Document the use of SMTs to customize _id field #83

Merged · 11 commits · Jul 22, 2022
6 changes: 3 additions & 3 deletions .secrets.baseline
@@ -3,7 +3,7 @@
"files": "performance/resources/.+\\.ipynb|src/test/resources/.+\\.json|^.secrets.baseline$",
"lines": null
},
"generated_at": "2022-06-15T15:41:43Z",
"generated_at": "2022-07-20T15:13:05Z",
"plugins_used": [
{
"name": "AWSKeyDetector"
@@ -97,7 +97,7 @@
{
"hashed_secret": "333f0f8814d63e7268f80e1e65e7549137d2350c",
"is_verified": false,
"line_number": 168,
"line_number": 225,
"type": "Secret Keyword",
"verified_result": null
}
@@ -106,7 +106,7 @@
{
"hashed_secret": "b48edfdb054b3af80a24bbb8e0666a0dada90c3e",
"is_verified": false,
"line_number": 217,
"line_number": 218,
"type": "Secret Keyword",
"verified_result": null
}
59 changes: 58 additions & 1 deletion README.md
@@ -45,7 +45,7 @@ For the sink connector:
1. Kafka keys are currently ignored; therefore the key converter settings are not relevant.
1. We assume that the values in Kafka are serialized JSON objects, and therefore `JsonConverter` is supported. If your values contain a schema (`{"schema": {...}, "payload": {...}}`), then set `value.converter.schemas.enable=true`; otherwise set `value.converter.schemas.enable=false` (see the example converter configuration after this list). Any other converter that converts the message values into `org.apache.kafka.connect.data.Struct` or `java.util.Map` types should also work. Note, however, that the subsequent serialization of `Map` or `Struct` values to JSON documents in the sink may not match expectations if a schema has not been provided.
1. Inserting only a single revision of any `_id` is currently supported. This means it cannot update or delete documents.
1. The `_rev` field in message values is preserved. To remove `_rev` during data flow, use the `ReplaceField` Single Message Transform (SMT).
1. The `_rev` field in event values is preserved. To remove `_rev` during data flow, use the `ReplaceField` Single Message Transform (SMT).
Example configuration:
```
transforms=ReplaceField
@@ -54,6 +54,63 @@
```
See the [Kafka Connect transforms](https://kafka.apache.org/31/documentation.html#connect_transforms) documentation for more details.
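
For reference, here is a minimal sketch of the sink value converter settings for schemaless JSON values (a hypothetical example; adjust to your deployment):
```
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
```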

**Note:** The ID of each document written to Cloudant by the sink connector is determined as follows, in order of precedence:

1. From the value of the `cloudant_doc_id` header on the event. The value passed in this header must be a string, and the `header.converter=org.apache.kafka.connect.storage.StringConverter` config is required. This value overwrites the `_id` field if it already exists.
1. From the value of the `_id` field in the JSON event value.
1. If neither of those yields a non-null, non-empty value, the document is created with a generated UUID as its ID.
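
As an illustration, a producer can attach this header when publishing an event. The following is a minimal sketch, assuming a local broker and hypothetical topic and document ID values:
```
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DocIdHeaderExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // event value is a serialized JSON object, as the sink connector expects
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("example-topic", "{\"hello\":\"world\"}");
            // the sink connector reads this header (as a string) and uses it as the document _id
            record.headers().add("cloudant_doc_id", "example-doc-id".getBytes(StandardCharsets.UTF_8));
            producer.send(record);
        }
    }
}
```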

#### Single Message Transforms

Single Message Transforms, or SMTs, can be used to customize fields or values of events during data flow. The examples below show how to modify fields of events flowing from a Kafka topic to a Cloudant database using the sink connector.

1. If the event value contains an existing field, not called `_id`, that is suitable to use as the Cloudant document ID, then you can rename it with the `ReplaceField` transform (aliased `RenameField` below):
```
transforms=RenameField
transforms.RenameField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameField.renames=name:_id
```
1. If your events have `_id` fields and you would prefer Cloudant to generate a UUID for the document ID, use the `ReplaceField` transform to exclude the existing `_id` field:
```
transforms=ReplaceField
transforms.ReplaceField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.ReplaceField.exclude=_id
```

1. If you have events with no value, e.g. tombstones, and you don't want the Cloudant sink connector to write an empty document with a generated ID, use the `Filter` transform with a `RecordIsTombstone` predicate (aliased `dropNullRecords` and `isNullRecord` below) to drop these tombstone events:
```
transforms=dropNullRecords
transforms.dropNullRecords.type=org.apache.kafka.connect.transforms.Filter
transforms.dropNullRecords.predicate=isNullRecord

predicates=isNullRecord
predicates.isNullRecord.type=org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
```

1. If you want to use the event key or another custom value as the document ID, use the `cloudant_doc_id` custom header.
The value set in this custom header is written to the `_id` field. If the `_id` field already exists, it is overwritten
with the value in this header.
You can use the `HeaderFrom` SMT to move or copy a field from the event key to the custom header. The example config below moves
the `docid` field of the event key to the `cloudant_doc_id` custom header and sets the header converter to string:
```
transforms=moveFieldsToHeaders
transforms.moveFieldsToHeaders.type=org.apache.kafka.connect.transforms.HeaderFrom$Key
transforms.moveFieldsToHeaders.fields=docid
transforms.moveFieldsToHeaders.headers=cloudant_doc_id
transforms.moveFieldsToHeaders.operation=move

header.converter=org.apache.kafka.connect.storage.StringConverter
```

**Note**: The `header.converter` must be set to `StringConverter` because the document ID field only supports strings.

1. If you have events where the `_id` field is absent or `null` then Cloudant will generate
a document ID. If you don't want this to happen then set an `_id` (see earlier examples).
If you need to filter out those documents or drop `_id` fields when the value is `null` then you'll need to create a custom SMT; a minimal sketch follows the note below.

**Note**: For any of the SMTs above, if the configured field does not exist, the transform leaves the event unmodified and continues processing the next event.
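
The following is a minimal sketch of such a custom SMT, assuming schemaless (`Map`) event values; the package and class names are hypothetical, and a complete implementation would also need to handle `Struct` values:
```
package com.example.transforms; // hypothetical package

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Hypothetical SMT: removes an _id entry whose value is null from schemaless
// (Map) event values, so the sink connector generates a UUID instead.
public class DropNullId<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        if (record.value() instanceof Map) {
            Map<?, ?> value = (Map<?, ?>) record.value();
            if (value.containsKey("_id") && value.get("_id") == null) {
                // copy the value rather than mutating the original record
                Map<Object, Object> updated = new HashMap<>(value);
                updated.remove("_id");
                return record.newRecord(record.topic(), record.kafkaPartition(),
                        record.keySchema(), record.key(),
                        record.valueSchema(), updated, record.timestamp());
            }
        }
        return record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef(); // no configuration options
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure
    }

    @Override
    public void close() {
        // no resources to release
    }
}
```
To use it, package the class onto the Connect worker's plugin path and register it like any other transform, e.g. `transforms.dropNullId.type=com.example.transforms.DropNullId`.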

### Authentication

To read from or write to Cloudant, some authentication properties must be configured. These properties are common to both the source and sink connectors.
1 change: 1 addition & 0 deletions build.gradle
@@ -25,6 +25,7 @@ dependencies {
implementation group: 'org.json', name: 'json', version: '20210307'
implementation group: 'org.slf4j', name: 'slf4j-api', version: '1.7.36'
implementation group: 'org.apache.kafka', name: 'connect-api', version: "${kafkaVersion}"
implementation group: 'org.apache.kafka', name: 'connect-transforms', version: "${kafkaVersion}"
testImplementation group: 'junit', name: 'junit', version: '4.12'
testImplementation group: 'org.powermock', name: 'powermock-api-easymock', version: '1.6.4'
testImplementation group: 'org.easymock', name: 'easymock', version: '3.4'
src/main/java/com/ibm/cloudant/kafka/schema/ConnectRecordMapper.java
@@ -1,10 +1,24 @@
/*
* Copyright © 2022 IBM Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package com.ibm.cloudant.kafka.schema;

import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -13,6 +27,8 @@
import java.util.function.Function;

public class ConnectRecordMapper<R extends ConnectRecord<R>> implements Function<ConnectRecord<R>, Map<String, Object>> {

static final String HEADER_DOC_ID_KEY = "cloudant_doc_id";

private static Logger LOG = LoggerFactory.getLogger(ConnectRecordMapper.class);

@@ -23,21 +39,29 @@ public Map<String, Object> apply(ConnectRecord<R> record) {
switch (schemaType) {
case MAP:
if (record.value() instanceof Map) {
return convertMap((Map) record.value(), toReturn);
convertMap((Map) record.value(), toReturn);
break;
} else {
throw new IllegalArgumentException(String.format("Type %s not supported with schema of type Map (or no schema)",
record.value().getClass()));
}
case STRUCT:
if (record.value() instanceof Struct) {
return convertStruct((Struct) record.value(), toReturn);
convertStruct((Struct) record.value(), toReturn);
break;
} else {
throw new IllegalArgumentException(String.format("Type %s not supported with schema of type Struct",
record.value().getClass()));
}
default:
throw new IllegalArgumentException(String.format("Schema type %s not supported", record.valueSchema().type()));
}
// Check if custom header exists on the record and use the value for the document's id
String headerValue = getHeaderForDocId(record);
if (!toReturn.isEmpty() && headerValue != null && !headerValue.isEmpty()) {
toReturn.put("_id", headerValue);
}
return toReturn;
}

// convert struct to map by adding key/values to passed in map, and returning it
@@ -100,4 +124,12 @@ private Object getField(Type type, Object value) {

}

private String getHeaderForDocId(ConnectRecord<R> record) {
Header value = record.headers().lastWithName(HEADER_DOC_ID_KEY);
if (value != null && value.value() instanceof String) {
return value.value().toString();
}
return null;
}

}

**Review comment (Contributor)** on `return value.value().toString();`: Could also be a cast to String but it's a style issue as they both achieve the same thing.

**Reply (Contributor, author):** I'd prefer to keep this as-is.
src/test/java/com/ibm/cloudant/kafka/schema/ConnectRecordMapperTests.java
@@ -1,3 +1,16 @@
/*
* Copyright © 2022 IBM Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package com.ibm.cloudant.kafka.schema;

import org.apache.kafka.connect.data.Schema;
@@ -11,8 +24,10 @@
import java.util.HashMap;
import java.util.Map;

import static com.ibm.cloudant.kafka.schema.ConnectRecordMapper.HEADER_DOC_ID_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertNull;

public class ConnectRecordMapperTests {

@@ -45,6 +60,41 @@ public void testConvertToMapNoSchema() {
assertEquals("world", converted.get("hello"));
}

**Review comment (Contributor):** What do you think about having a negative test, e.g. where the header value is not a string, to show that conversion doesn't blow up?

**Reply (Contributor, author):** I've added two tests in 097f306:

  • Convert to map with no schema, existing _id field, and invalid map header. Assert that the _id field never changed.
  • Convert to struct with invalid boolean header. Assert that the _id field is null.

@Test
public void testConvertToMapNoSchemaWithHeader() {
// given...
Schema s = null; // no schema
String headerValue = "example-doc-id";
Map<String, String> value = new HashMap<>();
value.put("hello", "world");
SinkRecord sr = new SinkRecord("test", 13, null, "0001", s, value, 0);
sr.headers().addString(HEADER_DOC_ID_KEY, headerValue);
// when...
Map<String, Object> converted = mapper.apply(sr);
// then...
assertEquals("world", converted.get("hello"));
assertEquals(headerValue, converted.get("_id"));
}

@Test
public void testConvertToMapNoSchemaWithInvalidHeader() {
// given...
Schema s = null; // no schema
Schema headerSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA);
Map<String, String> headerValue = new HashMap<>();
headerValue.put("invalid", "value");
Map<String, String> value = new HashMap<>();
value.put("_id", "foo");
value.put("hello", "world");
SinkRecord sr = new SinkRecord("test", 13, null, "0001", s, value, 0);
sr.headers().addMap(HEADER_DOC_ID_KEY, headerValue, headerSchema);
// when...
Map<String, Object> converted = mapper.apply(sr);
// then...
assertEquals("world", converted.get("hello"));
assertEquals("foo", converted.get("_id"));
}

@Test
public void testConvertComplexStruct() {
// given...
@@ -111,6 +161,92 @@ public void testConvertComplexStruct() {

}

@Test
public void testConvertStructOverwriteIdWithHeader() {
// given...
String headerValue = "example-doc-id";
Schema s = SchemaBuilder.struct()
.field("_id", Schema.STRING_SCHEMA)
.field("_rev", Schema.STRING_SCHEMA)
.build();

Struct value = new Struct(s);
value.put("_id", "foo1");
value.put("_rev", "foo2");

// when...
try {
value.validate();
} catch (DataException de) {
fail("Data invalid according to schema");
}

// do conversion
SinkRecord sr = new SinkRecord("test", 13, null, "0001", s, value, 0);
sr.headers().addString(HEADER_DOC_ID_KEY, headerValue);
Map<String, Object> converted = mapper.apply(sr);

// then...
assertEquals(headerValue, converted.get("_id"));
assertEquals("foo2", converted.get("_rev"));
}

@Test
public void testConvertStructWithHeader() {
// given...
String headerValue = "example-doc-id";
Schema s = SchemaBuilder.struct()
.field("_rev", Schema.STRING_SCHEMA)
.build();

Struct value = new Struct(s);
value.put("_rev", "foo2");

// when...
try {
value.validate();
} catch (DataException de) {
fail("Data invalid according to schema");
}

// do conversion
SinkRecord sr = new SinkRecord("test", 13, null, "0001", s, value, 0);
sr.headers().addString(HEADER_DOC_ID_KEY, headerValue);
Map<String, Object> converted = mapper.apply(sr);

// then...
assertEquals(headerValue, converted.get("_id"));
assertEquals("foo2", converted.get("_rev"));
}

@Test
public void testConvertStructWithInvalidHeader() {
// given...
int headerValue = 100;
Schema s = SchemaBuilder.struct()
.field("_rev", Schema.STRING_SCHEMA)
.build();

Struct value = new Struct(s);
value.put("_rev", "foo2");

// when...
try {
value.validate();
} catch (DataException de) {
fail("Data invalid according to schema");
}

// do conversion
SinkRecord sr = new SinkRecord("test", 13, null, "0001", s, value, 0);
sr.headers().addInt(HEADER_DOC_ID_KEY, headerValue);
Map<String, Object> converted = mapper.apply(sr);

// then...
assertNull(converted.get("_id"));
assertEquals("foo2", converted.get("_rev"));
}

@Test
public void testConvertStringFails() {
Schema s = null; // no schema