Sink connector ID configuration #65

Closed
ricellis opened this issue Mar 11, 2022 · 2 comments · Fixed by #83

@ricellis
Member

ricellis commented Mar 11, 2022

Currently the sink connector handles _id as follows:

| replication option | _id present in SinkRecord | _id written |
|---|---|---|
| true | true | _id |
| true | false | none, new _id generated by Cloudant |
| false | true | `<topic-name>_<partition>_<offset>_<_id>` |
| false | false | none, new _id generated by Cloudant |
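To make the table concrete, here is a minimal Java sketch of the current rule as a single function. The class and method names are hypothetical and this is not the connector's actual code; it only restates the four rows, with `null` standing for "no _id written, Cloudant generates one".

```java
public class CurrentDocIdRule {

    /**
     * Returns the _id the sink currently writes, or null when no _id is
     * written and Cloudant generates one.
     */
    static String currentDocId(boolean replication, String recordId,
                               String topic, int partition, long offset) {
        if (recordId == null) {
            // No _id in the SinkRecord: Cloudant generates one whatever the option.
            return null;
        }
        if (replication) {
            // replication=true: the record's own _id is written unchanged.
            return recordId;
        }
        // replication=false: the _id is prefixed with the record's coordinates.
        return topic + "_" + partition + "_" + offset + "_" + recordId;
    }

    public static void main(String[] args) {
        System.out.println(currentDocId(true, "doc1", "orders", 0, 42L));  // doc1
        System.out.println(currentDocId(false, "doc1", "orders", 0, 42L)); // orders_0_42_doc1
        System.out.println(currentDocId(false, null, "orders", 0, 42L));   // null
    }
}
```

Seen this way, the `replication` flag silently decides the document ID, which is the coupling the issue objects to.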

I don't think the _id should be decided by a seemingly unrelated configuration option.

We should instead document the use of SMTs to facilitate customization of the _id, namely:

  • ReplaceField
    • Rename a current field to _id
    • Exclude the _id field to force Cloudant to generate
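As a rough sketch of the two ReplaceField uses, the connector properties can be built as a Java map (the same keys would go into a properties file or the REST config). The property names assume Apache Kafka 2.6 or later, where `exclude` replaced `blacklist`; the source field `orderNumber` and the aliases `renameId` and `dropId` are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ReplaceFieldIdConfig {

    /** Connector properties that rename a value field to _id via ReplaceField. */
    static Map<String, String> renameToId(String sourceField) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("transforms", "renameId");
        props.put("transforms.renameId.type",
                  "org.apache.kafka.connect.transforms.ReplaceField$Value");
        // "renames" takes a comma-separated list of from:to pairs.
        props.put("transforms.renameId.renames", sourceField + ":_id");
        return props;
    }

    /** Connector properties that drop any _id so that Cloudant generates one. */
    static Map<String, String> excludeId() {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("transforms", "dropId");
        props.put("transforms.dropId.type",
                  "org.apache.kafka.connect.transforms.ReplaceField$Value");
        // "exclude" lists value fields to remove before the record reaches the sink.
        props.put("transforms.dropId.exclude", "_id");
        return props;
    }

    public static void main(String[] args) {
        renameToId("orderNumber").forEach((k, v) -> System.out.println(k + "=" + v));
        excludeId().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```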

We should provide (and document) a new SMT class (KeyToDocId?) that inserts the message key into the value's _id field (noting that the key should have a String schema or be stringifiable).

If the required _id field is not present or the record has a null key, we should not pass an ID to Cloudant and should just let it generate a UUID. If the user wants every _id generated by Cloudant, they could use ReplaceField with an exclude to remove any existing _id, together with the default ID mode.
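A minimal, plain-Java sketch of the proposed KeyToDocId behaviour, modelling the record value as a schemaless `Map` rather than using the Kafka Connect `Transformation` API. The class name and the null-key handling are assumptions drawn from the two paragraphs above, not a finished SMT; a real implementation would also have to update the value schema for schemaful records.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyToDocIdSketch {

    /**
     * Copies the record key into the value's _id field as a string.
     * A null key leaves the value untouched, so Cloudant generates an _id
     * when the value does not already carry one.
     */
    static Map<String, Object> keyToDocId(Object key, Map<String, Object> value) {
        if (key == null) {
            return value;
        }
        // Copy rather than mutate, since SMTs produce new records.
        Map<String, Object> updated = new HashMap<>(value);
        // Stringify the key; a schema-aware SMT would also update the value schema.
        updated.put("_id", key.toString());
        return updated;
    }

    public static void main(String[] args) {
        Map<String, Object> value = Map.of("total", 25);
        System.out.println(keyToDocId(123, value).get("_id"));          // 123
        System.out.println(keyToDocId(null, value).containsKey("_id")); // false
    }
}
```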

We should also document that further SMTs can be used for customization, e.g.:

  • ValueToKey and ExtractField to convert some field to the key, then the KeyToDocId transform to use the new key as an ID.
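The chain could look roughly like this as connector properties, again written as a Java map. `ValueToKey` and `ExtractField$Key` are standard Kafka Connect transforms; `orderNumber` is a placeholder field, and `com.example.KeyToDocId` is a placeholder because the proposed SMT has no real class name yet.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KeyFromFieldConfig {

    /**
     * Connector properties chaining ValueToKey and ExtractField$Key so that a
     * value field becomes the record key, then a KeyToDocId-style SMT copies
     * the key into _id.
     */
    static Map<String, String> idFromValueField(String field) {
        Map<String, String> props = new LinkedHashMap<>();
        // SMTs run in the order listed here.
        props.put("transforms", "valueToKey,extractKey,keyToDocId");
        // Step 1: build a key containing just the chosen value field.
        props.put("transforms.valueToKey.type",
                  "org.apache.kafka.connect.transforms.ValueToKey");
        props.put("transforms.valueToKey.fields", field);
        // Step 2: unwrap that field so the key is the bare field value.
        props.put("transforms.extractKey.type",
                  "org.apache.kafka.connect.transforms.ExtractField$Key");
        props.put("transforms.extractKey.field", field);
        // Step 3: placeholder class for the proposed SMT (hypothetical name).
        props.put("transforms.keyToDocId.type", "com.example.KeyToDocId");
        return props;
    }

    public static void main(String[] args) {
        idFromValueField("orderNumber").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```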
tomblench added a commit that referenced this issue May 10, 2022
* WIP - converter for map/struct w/ and w/out schema

* Clarify converter config

* Don't log record value as it could be large

* test cleanup

* Add changelog entry

* Apply suggestions from code review

These will probably need a follow-up commit to iron out any breakages...

Co-authored-by: Rich Ellis <ricellis@users.noreply.github.com>

* Post-suggestion fixups

* PR suggestion: more explicit schema/type checks

* PR feedback: cleanup imports

* PR feedback: drop docid checks, to be addressed in #65

* PR feedback: fix tests and remove testNonReplicateSinkRecordSchema

Test removed because we no longer support the KC_SCHEMA stuff, as
discussed in PR.

* PR feedback: clarify defaults

* PR feedback: used linked list

* Update src/main/java/com/ibm/cloudant/kafka/connect/CloudantSinkTask.java

Co-authored-by: Rich Ellis <ricellis@users.noreply.github.com>

* PR feedback: streaming implementation of put()

Co-authored-by: Rich Ellis <ricellis@users.noreply.github.com>
@ricellis
Member Author

Rather than a sink connector configuration option, I think providing (and documenting) an SMT that inserts the message key into the message value as an _id field is the way to go.

@emlaver emlaver self-assigned this Jun 6, 2022
@ricellis ricellis added this to the 0.200.0 milestone Jun 8, 2022
@ricellis
Member Author

Looking at the complexity emerging in #82 from trying to handle every variation of inserting an _id into different message types, along with the associated schema updates, I think we need to rethink how we handle this.
One option would be to:

  1. Modify the ConnectRecordMapper to check for the presence of a custom header on the record and use the value of that header as the ID.
  2. Use the org.apache.kafka.connect.transforms.HeaderFrom transform to convert a key to the header, avoiding the need for any custom SMT of our own.
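Step 2 might be configured as below, written as a Java map of connector properties. This sketch assumes a Kafka version that ships `HeaderFrom`, and a record key that is a struct or map with an `id` field, because `HeaderFrom` copies named fields rather than the whole key. The header name `cloudant_doc_id` is a placeholder for the name the issue says must be documented.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KeyToHeaderConfig {

    // Placeholder header name; the real name is still to be documented.
    static final String DOC_ID_HEADER = "cloudant_doc_id";

    /** Connector properties copying a key field into the document ID header. */
    static Map<String, String> keyFieldToHeader(String keyField) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("transforms", "keyToHeader");
        // HeaderFrom$Key reads the named fields from the record key (struct or map).
        props.put("transforms.keyToHeader.type",
                  "org.apache.kafka.connect.transforms.HeaderFrom$Key");
        props.put("transforms.keyToHeader.fields", keyField);
        props.put("transforms.keyToHeader.headers", DOC_ID_HEADER);
        // "copy" leaves the key intact; "move" would remove the field from it.
        props.put("transforms.keyToHeader.operation", "copy");
        return props;
    }

    public static void main(String[] args) {
        keyFieldToHeader("id").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```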

For the header, we would ensure the StringConverter was used, as we only want strings for IDs. We would need to document the name of the header and the expectation that its value is a string. (This would cause errors for values that cannot convert cleanly to strings, in line with general Kafka converter behaviour.)

We would overwrite any existing _id in the JSON body before writing to Cloudant (with a warning).
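Step 1 and the overwrite rule can be sketched in plain Java as follows, with headers modelled as a simple `Map` (real Kafka Connect headers can repeat a key) and the document as a schemaless map. The class, method and header names are hypothetical; this illustrates the proposal and is not the connector's `ConnectRecordMapper`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

public class HeaderIdMapper {

    private static final Logger LOG = Logger.getLogger(HeaderIdMapper.class.getName());

    // Placeholder header name; the real name is still to be documented.
    static final String DOC_ID_HEADER = "cloudant_doc_id";

    /**
     * Returns the document to write: if the ID header is present, its string
     * value becomes _id, overwriting (with a warning) any _id already in the body.
     */
    static Map<String, Object> applyHeaderId(Map<String, Object> headers, Map<String, Object> doc) {
        Object headerValue = headers.get(DOC_ID_HEADER);
        if (headerValue == null) {
            // No header: keep the body as-is; Cloudant generates an _id if none is present.
            return doc;
        }
        if (!(headerValue instanceof String)) {
            // Only string IDs are accepted, matching the StringConverter expectation.
            throw new IllegalArgumentException("Header " + DOC_ID_HEADER + " must be a string");
        }
        Map<String, Object> out = new HashMap<>(doc);
        Object previous = out.put("_id", headerValue);
        if (previous != null) {
            LOG.warning("Overwriting existing _id " + previous + " with header value " + headerValue);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = Map.of("_id", "old", "total", 25);
        Map<String, Object> headers = Map.of(DOC_ID_HEADER, "order-1001");
        System.out.println(applyHeaderId(headers, doc).get("_id"));  // order-1001, plus a logged warning
        System.out.println(applyHeaderId(Map.of(), doc).get("_id")); // old
    }
}
```

Keeping the ID in a header means the mapper only ever handles a string, whatever the shape or schema of the message value.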

emlaver added a commit that referenced this issue Jul 22, 2022
* Add README section with SMT examples for customizing _id field and document the `HeaderFrom` transform to convert an event key to the header
* Modify the ConnectRecordMapper to check for the presence of a custom header on the event and use the value of that header as the document ID.
#65