Support for JSONConverter in sink connector #71
Conversation
Branch updated from `28305f7` to `f03cd16`.
```diff
@@ -116,6 +112,7 @@ public static JSONArray batchWrite(Map<String, String> props, JSONArray data)
        result.put(jsonResult);
      }
    } catch (Exception e) {
+     LOG.error("Exception caught in batchWrite()", e);
```
This may need to be revisited in another PR - the worrying thing is that we were just swallowing exceptions from the cloudant client, which I had managed to trigger with a misconfigured test.
Agreed, there needs to be a separate look at error handling to conform to the behaviours of the built-in `errors.tolerance=all` and `none` flags. (`all` implies silently ignoring bad messages, so I guess that's all we have right now!)
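For reference, a minimal sketch of the built-in error handling being discussed, using standard Kafka Connect sink properties (the DLQ topic name is illustrative):

```properties
# errors.tolerance=none (the default) fails the task on a bad record;
# errors.tolerance=all skips bad records instead
errors.tolerance=all
# optionally capture skipped records on a dead letter queue topic
errors.deadletterqueue.topic.name=cloudant-sink-dlq
errors.deadletterqueue.context.headers.enable=true
```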
Should we open a ticket specifically for investigating and improving error handling?
I made a note in my error handling epic. Strictly speaking, I think we should iterate the result and push each failed document/message to the DLQ (or whatever error handling is configured), but I'm OK with us improving that later.
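As an illustration of that per-document idea (not this PR's code): Kafka Connect 2.6+ exposes an `ErrantRecordReporter` on the `SinkTaskContext` that routes individual records through whatever `errors.tolerance`/DLQ configuration is in place. A hedged sketch:

```java
import java.util.List;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

// Hypothetical helper, not part of this PR: report each rejected
// document individually so the configured error handling (DLQ, log,
// or fail-fast) applies per message rather than per batch.
class PerRecordErrorHandler {
    private final ErrantRecordReporter reporter;

    PerRecordErrorHandler(SinkTaskContext context) {
        // available from Kafka 2.6 onwards; null on older runtimes
        this.reporter = context.errantRecordReporter();
    }

    void reportFailures(List<SinkRecord> failed) {
        for (SinkRecord record : failed) {
            ConnectException cause = new ConnectException(
                    "Cloudant rejected document from topic " + record.topic());
            if (reporter != null) {
                reporter.report(record, cause); // honours errors.tolerance / DLQ settings
            } else {
                throw cause; // no reporter available: fail the task
            }
        }
    }
}
```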
```
value.converter.schemas.enable=true
```

#### Converter configuration: sink connector
The source connector converter needs covering when we do the PR for that work - my intention is that we support JsonConverter on both source and sink, which simplifies things (as mentioned above, it's the default anyway so there's no need to explicitly set it in config).
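For context, a sink-side converter configuration using JsonConverter would look something like this (standard Kafka Connect properties; as noted above, JsonConverter is usually the distribution default anyway):

```properties
value.converter=org.apache.kafka.connect.json.JsonConverter
# true:  each message carries an inline schema envelope -> the sink sees a Struct
# false: plain JSON                                      -> the sink sees a Map
value.converter.schemas.enable=true
```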
```diff
@@ -88,13 +92,17 @@ public void testReplicateAll() throws Exception {
      // - no offset
      for (SourceRecord record : records) {
+
+       // source task returns strings but sink task expects structs or maps
+       // in a real kafka instance this would be fixed by using appropriate converters
+       Map recordValue = gson.fromJson((String) record.value(), Map.class);
```
This is a bit awkward but is needed until we do the PR to support JsonConverter in the source connector - at which point this line can be removed, since the source will return us a Map.
@ricellis your idea of enabling schemas here (to return a struct/map instead of a string) didn't work because it caused various limits to be exceeded (memory, HTTP request size): the test payloads are complex, resulting in a huge inline schema per doc.
> the test payloads are complex resulting in a huge inline schema per doc.

I'm fine with doing something other than the `schemas.enable` approach, but honestly this is a little concerning to me: yeah, they do have 100 or so properties, but they don't look that complex. Maybe this is something we need to cover in QA, though, to get a better handle on what is stressing it.
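For illustration, this is the shape of the envelope `JsonConverter` produces when schemas are enabled; the `schema` block is repeated inline in every message, so a document with ~100 properties drags ~100 field descriptors along with each doc (field names here are made up):

```json
{
  "schema": {
    "type": "struct",
    "fields": [
      {"field": "_id", "type": "string", "optional": false},
      {"field": "name", "type": "string", "optional": true}
    ]
  },
  "payload": {"_id": "doc1", "name": "example"}
}
```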
```diff
@@ -60,7 +63,8 @@ protected void setUp() throws Exception {
      data = new JSONArray(tokener);

      // Load data into the source database (create if it does not exist)
-     JavaCloudantUtil.batchWrite(sourceProperties, data);
+     JavaCloudantUtil.batchWrite(sourceProperties,
```
There are a few instances of this awkward mapping from `org.json` in test code - I've tried to make the changes as minimal as possible, but would love to get rid of that library altogether at a later date.
Agreed - I've already got this noted down for later work. There are 3 different JSON libs hanging around in various places; we should narrow that down to use only the one brought by Kafka itself, or the one from cloudant-java-sdk, or maybe both (as they'll both be there anyway), but we definitely shouldn't rely on an extra third one.
README.md (outdated):

> Assume these settings in a file `connect-standalone.properties` or `connect-distributed.properties`. Usually the kafka distribution defaults (`connect-(standalone|distributed).properties`) are as follows:
Do we need to expand on this if we say that the values below are usually the defaults? Would it be any better if we said:
"The Kafka distribution defaults are typically as follows:"
Done in ac8b55d
Looks good on the whole, just a few minor suggestions.
Resolved review threads (outdated) on:
- src/main/java/com/ibm/cloudant/kafka/common/utils/JavaCloudantUtil.java
- src/main/java/com/ibm/cloudant/kafka/connect/CloudantSinkTask.java
- src/main/java/com/ibm/cloudant/kafka/connect/StructToMapConverter.java (two threads)
- src/test/java/com/ibm/cloudant/kafka/connect/CloudantSinkTaskTest.java
These will probably need a follow-up commit to iron out any breakages... Co-authored-by: Rich Ellis <ricellis@users.noreply.github.com>
Test removed because we no longer support the KC_SCHEMA stuff, as discussed in PR.
I am +1 now, but a couple of minor things
Two resolved review threads (outdated) on src/main/java/com/ibm/cloudant/kafka/connect/CloudantSinkTask.java.
…java Co-authored-by: Rich Ellis <ricellis@users.noreply.github.com>
Looks good!
Checklist
- `CHANGES.md` / `CHANGELOG.md` updated (or test/build only changes)

Description
See #62
Approach
Support JSONConverter by expecting values from kafka sink tasks to be either a java `Map` or a kafka `Struct`:
- `Map`: pass through to `batchWrite` directly
- `Struct`: convert to `Map` using new `StructToMapConverter` (see the sketch after this list)
- Add documentation in README
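A rough sketch of the dispatch described above; the flat `Struct`-to-`Map` loop stands in for `StructToMapConverter`, whose real implementation presumably also handles nested types:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;

// Illustrative sketch of the sink-side value handling described above;
// class and method names are assumptions, not the PR's exact code.
class SinkValueDispatch {
    @SuppressWarnings("unchecked")
    static Map<String, Object> toMap(Object value) {
        if (value instanceof Map) {
            // JsonConverter with schemas.enable=false delivers a Map
            return (Map<String, Object>) value;
        }
        if (value instanceof Struct) {
            // JsonConverter with schemas.enable=true delivers a Struct;
            // flatten it field by field (nested Structs would need recursion)
            Struct struct = (Struct) value;
            Map<String, Object> map = new HashMap<>();
            for (Field field : struct.schema().fields()) {
                map.put(field.name(), struct.get(field));
            }
            return map;
        }
        throw new ConnectException("Unsupported record value type: "
                + (value == null ? "null" : value.getClass().getName()));
    }
}
```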
Schema & API Changes
Security and Privacy
Testing
See added `StructToMapConverterTests`.
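An illustrative test in the same spirit (this exercises the hypothetical `SinkValueDispatch` sketch above, not the PR's actual `StructToMapConverterTests`):

```java
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.junit.Assert;
import org.junit.Test;

public class StructToMapSketchTest {
    @Test
    public void convertsFlatStruct() {
        Schema schema = SchemaBuilder.struct()
                .field("_id", Schema.STRING_SCHEMA)
                .field("count", Schema.INT32_SCHEMA)
                .build();
        Struct struct = new Struct(schema)
                .put("_id", "doc1")
                .put("count", 42);

        Map<String, Object> map = SinkValueDispatch.toMap(struct);

        Assert.assertEquals("doc1", map.get("_id"));
        Assert.assertEquals(42, map.get("count"));
    }
}
```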
Monitoring and Logging
Added `slf4j-simple` to view log output when running tests.
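Assuming a Gradle build, that would be a test-only dependency along these lines (version illustrative):

```groovy
// hypothetical snippet: SLF4J simple binding on the test classpath only,
// so test runs produce readable log output without shipping a binding
dependencies {
    testImplementation 'org.slf4j:slf4j-simple:1.7.36'
}
```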