Skip to content

Commit

Permalink
feat: Emit tombstones for deleted documents
Browse files Browse the repository at this point in the history
Update CHANGES and README.
  • Loading branch information
tomblench committed Aug 17, 2022
1 parent de506e6 commit fca5481
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# UNRELEASED
- [BREAKING CHANGE] Source connector now emits tombstone events for deleted documents. See [single message transforms](README.md#single-message-transforms) section in README for details.
- [BREAKING CHANGE] Publish releases to https://github.com/IBM/cloudant-kafka-connector/releases.
- [BREAKING CHANGE] Rename package from `com.ibm.cloudant.kafka` to `com.ibm.cloud.cloudant.kafka`. Existing `connector.class` property values must be updated to use the new package.
- [BREAKING CHANGE] Rename module from `kafka-connect-cloudant` to `cloudant-kafka-connector`.
Expand Down
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,11 @@ Single Message Transforms, or SMTs, can be used to customize fields or values of
transforms.ReplaceField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.ReplaceField.exclude=_id
```
1. If you have events where there is no value ([_tombstone_ events](https://kafka.apache.org/documentation.html#compaction)), you may wish to filter these out.
- In the Cloudant sink connector, these may be undesirable as they will generate an empty document.
- In the Cloudant source connector, tombstone events are generated by default for deleted documents (in addition to the deleted document itself).
- In either case, you will need to use a `dropNullRecords` transform and predicate to filter out and remove these tombstone events:
1. If you have events where there is no value e.g. tombstones (and don't want the Cloudant sink connector to generate an empty doc with a generated ID) then
you'll need to use a `dropNullRecords` transform and predicate to filter out and remove these tombstone events:
```
transforms=dropNullRecords
transforms.dropNullRecords.type=org.apache.kafka.connect.transforms.Filter
Expand Down Expand Up @@ -169,6 +171,8 @@ For use with `vpc` authentication.
For use with `iam`, `container`, or `vpc` authentication.
### Cloudant as source
In addition to those properties related to authentication, the Cloudant source connector supports the following properties:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,21 @@ public List<SourceRecord> poll() throws InterruptedException {
SourceRecord sourceRecord = new SourceRecord(offset(url, db),
offsetValue(latestSequenceNumber),
topic, // topics
//Integer.valueOf(row_.getId())%3, // partition
Schema.STRING_SCHEMA, // key schema
id, // key
docSchema, // value schema
docValue); // value
records.add(sourceRecord);
if (doc.isDeleted() != null && doc.isDeleted()) {
SourceRecord tombstone = new SourceRecord(offset(url, db),
offsetValue(latestSequenceNumber),
topic, // topics
Schema.STRING_SCHEMA, // key schema
id, // key
null, // value schema
null); // value
records.add(tombstone);
}
}
}
}
Expand Down

0 comments on commit fca5481

Please sign in to comment.