Document the use of SMTs to customize _id field #83

Merged
emlaver merged 11 commits into master from 65-sink-id-transform
Jul 22, 2022

Contributor

@emlaver emlaver commented Jul 19, 2022

Checklist

  • Tick to sign-off your agreement to the Developer Certificate of Origin (DCO) 1.1
  • Added tests for code changes or test/build only changes
  • Updated the change log file (CHANGES.md|CHANGELOG.md) or test/build only changes
  • Completed the PR template below:

Description

fixes #65

Approach

  • Modify the ConnectRecordMapper to check for the presence of a custom header on the record and use the value of that header as the ID.
  • Use the HeaderFrom transform to convert a key to the header, avoiding the need for us to have any custom SMT.
  • Add README section for using SMTs to rename, replace, or filter out _id field. Also document how to remove tombstone records.
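As a rough illustration of the second bullet, the connector properties for Kafka's built-in `HeaderFrom` transform might look like the sketch below, expressed as a Java map so the keys are easy to check. The alias `moveIdToHeader` is hypothetical, the key field name `docid` and the header name `cloudant_doc_id` are taken from later comments in this PR, and the `header.converter` line follows the README note about `StringConverter`. Treat it as an assumption-laden example, not the README's exact wording.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class HeaderFromConfigSketch {
    // Builds connector properties that wire in Kafka's built-in HeaderFrom SMT.
    static Map<String, String> headerFromConfig() {
        Map<String, String> props = new LinkedHashMap<>();
        // "moveIdToHeader" is a hypothetical alias chosen for this sketch.
        props.put("transforms", "moveIdToHeader");
        // HeaderFrom$Key reads fields from the record key; HeaderFrom$Value reads the record value.
        props.put("transforms.moveIdToHeader.type",
                "org.apache.kafka.connect.transforms.HeaderFrom$Key");
        // Field in the key to take the ID from (name assumed from the test key in this thread).
        props.put("transforms.moveIdToHeader.fields", "docid");
        // Header the connector checks for a document ID.
        props.put("transforms.moveIdToHeader.headers", "cloudant_doc_id");
        // "move" removes the field from the key; "copy" would leave it in place.
        props.put("transforms.moveIdToHeader.operation", "move");
        // The document ID only supports strings, so headers are converted as strings.
        props.put("header.converter", "org.apache.kafka.connect.storage.StringConverter");
        return props;
    }

    public static void main(String[] args) {
        // Prints the entries in key=value form, as they would appear in a properties file.
        headerFromConfig().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The same keys and values would go into the sink connector's properties file or JSON config.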

Schema & API Changes

"No change"

Security and Privacy

"No change"

Testing

Monitoring and Logging

"No change"

For reviewers:

  • This example requires a conditional SMT to drop the field if _id == null. I need to re-review the SMT docs to see if we can filter on the value of fields.
  • I have yet to successfully test the header-to-_id changes in a local Kafka environment. Kafka 3.2.0 added the option to set headers when using kafka-console-producer.sh. If you don't have 3.2.0 then you'll have to install and use kcat.
    I'm using the example:
echo 'header:value\t{"test":"1", "try": 0, "time": true}'  | ./bin/kafka-console-producer.sh --topic kafka_test2 --bootstrap-server localhost:9092 --property "parse.headers=true" -

And this throws the error:

Caused by: org.apache.kafka.connect.errors.DataException: Only Map objects supported in absence of schema for [header move], found: null
	at org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
	at org.apache.kafka.connect.transforms.HeaderFrom.applySchemaless(HeaderFrom.java:161)
	at org.apache.kafka.connect.transforms.HeaderFrom.apply(HeaderFrom.java:113)
	at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.TransformationChain$$Lambda$613/0x0000000000000000.call(Unknown Source)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
	... 14 more

emlaver added 4 commits June 17, 2022 11:45
…bstone records

WIP decide if we want custom conditional SMT that drops _id field if _id == null
…header on the record and use the value of that header as the ID.

- Document the existing HeaderFrom transform to convert a key to the header
@emlaver emlaver self-assigned this Jul 19, 2022
Member

@ricellis ricellis left a comment

For the error

Caused by: org.apache.kafka.connect.errors.DataException: Only Map objects supported in absence of schema for [header move], found: null
	at org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
	at org.apache.kafka.connect.transforms.HeaderFrom.applySchemaless(HeaderFrom.java:161)
	at org.apache.kafka.connect.transforms.HeaderFrom.apply(HeaderFrom.java:113)

It looks like only Map is supported for the message key when there is no schema, so you either need to make your test message have a map key or add a schema to it.

- Rename header to "cloudant_doc_id"
- Add README priority order section, rename header
@emlaver
Contributor Author

emlaver commented Jul 20, 2022

> For the error
>
> Caused by: org.apache.kafka.connect.errors.DataException: Only Map objects supported in absence of schema for [header move], found: null
> 	at org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
> 	at org.apache.kafka.connect.transforms.HeaderFrom.applySchemaless(HeaderFrom.java:161)
> 	at org.apache.kafka.connect.transforms.HeaderFrom.apply(HeaderFrom.java:113)
>
> It looks like only Map is supported for the message key when there is no schema, so you either need to make your test message have a map key or add a schema to it.

Right, I realized my mistake: I was producing a record that had headers and a value but no key.
Using the Java producer API with the Kafka "map" key "{\"docid\":\"value1\"}", I successfully created the bulk docs JSON body {"docs":[{"hello":"Message_1","_id":"value1"}]}. This was with the key.converter=org.apache.kafka.connect.json.JsonConverter config.
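To make the failure and the fix concrete, here is a plain-Java sketch of what a schemaless "move" from the record key to a header amounts to. It is not Kafka's `HeaderFrom` implementation: the class and method names are hypothetical, headers are modelled as a simple map, and an `IllegalArgumentException` stands in for the `DataException` in the stack trace.

```java
import java.util.HashMap;
import java.util.Map;

final class HeaderMoveSketch {
    // Simplified stand-in for a schemaless "move": takes a field out of the
    // record key and stores its value under the given header name.
    @SuppressWarnings("unchecked")
    static Map<String, Object> moveKeyFieldToHeader(
            Object key, String field, String header, Map<String, Object> headers) {
        // Without a schema the key must be a Map; a missing (null) key fails here,
        // which corresponds to the "found: null" error reported above.
        if (!(key instanceof Map)) {
            throw new IllegalArgumentException(
                    "Only Map objects supported in absence of schema, found: "
                            + (key == null ? "null" : key.getClass().getName()));
        }
        // Copy the key so the caller's map is left untouched.
        Map<String, Object> updatedKey = new HashMap<>((Map<String, Object>) key);
        // Remove the field from the key and record its value as a header.
        headers.put(header, updatedKey.remove(field));
        return updatedKey;
    }

    public static void main(String[] args) {
        Map<String, Object> key = new HashMap<>();
        key.put("docid", "value1");
        Map<String, Object> headers = new HashMap<>();

        Map<String, Object> updatedKey =
                moveKeyFieldToHeader(key, "docid", "cloudant_doc_id", headers);
        System.out.println("headers: " + headers + ", key: " + updatedKey);

        try {
            // A record produced with headers and a value but no key.
            moveKeyFieldToHeader(null, "docid", "cloudant_doc_id", headers);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With a map key the `docid` value ends up in the `cloudant_doc_id` header, which the connector can then use as the document `_id`.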

Member

@ricellis ricellis left a comment

+1 though I'd advocate for adding the empty check you proposed, plus a couple of nits.

README.md Outdated
Comment on lines 79 to 87
3. If you have messages where the `_id` field is absent or `null` then Cloudant will generate
a document ID. If you don't want this to happen then set an `_id` (see earlier examples).
Alternatively filter out those documents. For example if you have messages where the `_id`
field is `null` then you'll need to use a transform and predicate to filter out and remove this
field:
```
TODO
```

Contributor Author

@emlaver emlaver Jul 20, 2022

I did some more digging, and the current built-in SMT predicates and filter won't handle either a) dropping the field or b) filtering out the document if _id == null.
At this point, I wouldn't want to delay this PR by trying to create a custom SMT. What if we update this to:

Suggested change
3. If you have messages where the `_id` field is absent or `null` then Cloudant will generate
a document ID. If you don't want this to happen then set an `_id` (see earlier examples).
Alternatively filter out those documents. For example if you have messages where the `_id`
field is `null` then you'll need to use a transform and predicate to filter out and remove this
field:
```
TODO
```
3. If you have messages where the `_id` field is absent or `null` then Cloudant will generate
a document ID. If you don't want this to happen then set an `_id` (see earlier examples).
If you need to filter out those documents or drop `_id` fields when the value is `null` then you'll need to create a custom SMT.

I don't think this needs to be a numbered point. This could be a note at the end of this section.

Member

> I wouldn't want to delay this PR by trying to create a custom SMT

Agreed, I don't think it is a use case that warrants us delivering a custom SMT for it anyway.

> I don't think this needs to be a numbered point. This could be a note at the end of this section.

I think it can stay a numbered point, but I might move it to the bottom of the list.
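For anyone who does need this behaviour, the core logic of such a custom SMT is small. The sketch below shows only that logic on a schemaless document map; the class and method names are hypothetical, and a real transform would additionally implement Kafka Connect's `org.apache.kafka.connect.transforms.Transformation` interface and handle records with schemas.

```java
import java.util.HashMap;
import java.util.Map;

final class NullIdSketch {
    // Returns a copy of the document with "_id" removed when it is present but null,
    // so that Cloudant generates a document ID instead of receiving a null one.
    static Map<String, Object> dropNullId(Map<String, Object> doc) {
        Map<String, Object> copy = new HashMap<>(doc);
        if (copy.containsKey("_id") && copy.get("_id") == null) {
            copy.remove("_id");
        }
        return copy;
    }

    // Returns true when the document has no usable "_id" (absent or null),
    // which is the condition a filtering transform would drop records on.
    static boolean hasNullOrMissingId(Map<String, Object> doc) {
        return doc.get("_id") == null;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("_id", null);
        doc.put("hello", "world");
        System.out.println("filter out? " + hasNullOrMissingId(doc));
        System.out.println("after drop: " + dropNullId(doc));
    }
}
```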

@emlaver emlaver requested a review from tomblench July 20, 2022 14:45
README.md Outdated

**Note**: The `header.converter` is required to be set to `StringConverter` since the document ID field only supports strings.

**Note**: For any of the SMTs above, if the field does not exist it will skip over that message and continue processing the next message.
Contributor Author

wdyt:

Suggested change
**Note**: For any of the SMTs above, if the field does not exist it will skip over that message and continue processing the next message.
**Note**: For any of the SMTs above, if the field does not exist it will leave the message unmodified and continue processing the next message.

- Add numbered point at the end of section about handling _id fields that are null
- Update numbering
- Fix final note
Contributor

@tomblench tomblench left a comment

Looks good, just a few minor issues especially around the README.

@@ -45,6 +59,22 @@ public void testConvertToMapNoSchema() {
assertEquals("world", converted.get("hello"));
}

@Test
Contributor

What do you think about having a negative test, e.g. where the header value is not a string, to show that conversion doesn't blow up?

Contributor Author

I've added two tests in 097f306:

  • Convert to map with no schema, existing _id field, and invalid map header. Assert that the _id field never changed.
  • Convert to struct with invalid boolean header. Assert that the _id field is null.
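The behaviour these negative tests pin down can be sketched without Kafka on the classpath. In the sketch below, headers are modelled as a list of name/value entries and `lastWithName` is a hand-written stand-in for the Kafka Connect `Headers` method of the same name; `withDocIdFromHeader` is a hypothetical helper, not the connector's actual mapper code.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class HeaderDocIdSketch {
    // Header name used by this PR after the rename.
    static final String HEADER_DOC_ID_KEY = "cloudant_doc_id";

    // Returns the value of the last header with the given name, or null if none exists.
    static Object lastWithName(List<Map.Entry<String, Object>> headers, String name) {
        Object found = null;
        for (Map.Entry<String, Object> header : headers) {
            if (name.equals(header.getKey())) {
                found = header.getValue();
            }
        }
        return found;
    }

    // Copies the document and sets "_id" from the header only when the header value
    // is a String; any other type (or no header) leaves the document unchanged.
    static Map<String, Object> withDocIdFromHeader(
            Map<String, Object> doc, List<Map.Entry<String, Object>> headers) {
        Map<String, Object> result = new HashMap<>(doc);
        Object value = lastWithName(headers, HEADER_DOC_ID_KEY);
        if (value instanceof String) {
            result.put("_id", value);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("_id", "foo");
        doc.put("hello", "world");

        List<Map.Entry<String, Object>> headers = new ArrayList<>();
        // Invalid (boolean) header: the existing _id is kept.
        headers.add(new SimpleImmutableEntry<>(HEADER_DOC_ID_KEY, Boolean.TRUE));
        System.out.println(withDocIdFromHeader(doc, headers));

        // Valid string header added later: it takes over as the _id.
        headers.add(new SimpleImmutableEntry<>(HEADER_DOC_ID_KEY, "value1"));
        System.out.println(withDocIdFromHeader(doc, headers));
    }
}
```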

private String getHeaderForDocId(ConnectRecord<R> record) {
Header value = record.headers().lastWithName(HEADER_DOC_ID_KEY);
if (value != null && value.value() instanceof String) {
return value.value().toString();
Contributor

Could also be a cast to String but it's a style issue as they both achieve the same thing.

Contributor Author

I'd prefer to keep this as-is

@emlaver emlaver merged commit 947c8d6 into master Jul 22, 2022
@emlaver emlaver deleted the 65-sink-id-transform branch July 22, 2022 14:14
@ricellis ricellis added this to the 0.200.0 milestone Sep 20, 2022
Successfully merging this pull request may close these issues.

Sink connector ID configuration
3 participants