Skip to content

Commit

Permalink
Documentation updates
Browse files Browse the repository at this point in the history
  • Loading branch information
ebartkus committed Jul 2, 2019
1 parent 97726b7 commit 9c4daca
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 39 deletions.
20 changes: 9 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

# kafka-connect-dynamodb

A [Kafka Connector](http://kafka.apache.org/documentation.html#connect) which implements a "source connector" for AWS DynamoDB table Streams. This source connector allows replicating DynamoDB tables into Kafka topics. Once data is in Kafka you can use various Kafka sink connectors to push this data to different destinations systems, e.g. - BigQuery for easy analytics.
A [Kafka Connector](http://kafka.apache.org/documentation.html#connect) which implements a "source connector" for AWS DynamoDB table Streams. This source connector allows replicating DynamoDB tables into Kafka topics. Once data is in Kafka you can use various Kafka sink connectors to push this data into different destinations systems, e.g. - BigQuery for easy analytics.

## Notable features
* `autodiscovery` - monitors and automatically discovers DynamoDB tables to start/stop syncing from (based on AWS TAG's)
Expand All @@ -12,11 +12,9 @@ A [Kafka Connector](http://kafka.apache.org/documentation.html#connect) which im

Prior our development we found only one existing implementation by [shikhar](https://github.com/shikhar/kafka-connect-dynamodb), but it seems to be missing major features (initial sync, handling shard changes) and is no longer supported.

Also it tries to manage DynamoDB Stream shards manually by using one Kafka Connect task to read from each DynamoDB Streams shard, but since DynamoDB Stream shards are dynamic compared to static ones in Kinesis streams this approach would require rebalancing all Kafka Connect cluster tasks far to often.

Contrary in our implementation we opted to use Amazon Kinesis Client with DynamoDB Streams Kinesis Adapter which takes care of all shard reading and tracking tasks.

Also, it tries to manage DynamoDB Stream shards manually by using one Kafka Connect task to read from each DynamoDB Streams shard. But, since DynamoDB Stream shards are dynamic contrary to static ones in "normal" Kinesis streams this approach would require rebalancing all Kafka Connect cluster tasks far to often.

In our implementation we opted to use Amazon Kinesis Client with DynamoDB Streams Kinesis Adapter which takes care of all shard reading and tracking tasks.

## Built with

Expand All @@ -28,8 +26,8 @@ Contrary in our implementation we opted to use Amazon Kinesis Client with Dynamo

## Documentation
* [Getting started](docs/getting-started.md)
* [In depth explanation](docs/details.md)
* [Connector option](docs/options.md)
* [In depth](docs/details.md)
* [Connector options](docs/options.md)
* [Produced Kafka messages](docs/data.md)

## Usage considerations, requirements and limitations
Expand All @@ -38,14 +36,14 @@ Contrary in our implementation we opted to use Amazon Kinesis Client with Dynamo

* Current implementation supports only one Kafka Connect task(= KCL worker) reading from one table at any given time.
* Due to this limitation we tested maximum throughput from one table to be **~2000 records(change events) per second**.
* This limitation is imposed by current connector logic and not by the KCL library or Kafka Connect framework. Running multiple tasks would require additional synchronization mechanisms for `INIT SYNC` state tracking and might be implemented incremental feature.
* This limitation is imposed by our connectors logic and not by the KCL library or Kafka Connect framework. We opted to skip this feature since running multiple tasks per table would require additional synchronization mechanisms for `INIT SYNC` state tracking. And this means higher complexity and longer development time. However this might be implemented in later versions.

* Running multiple tasks for different tables on the same JVM has negative impact on overall performance of both tasks.
* Running multiple KCL workers on the same JVM has negative impact on overall performance of all workers. (NOTE: one KCL worker is executed by each individual Connector task. And each task is responsible for one DynamoDB table.)
* This is because Amazon Kinesis Client library has some global locking happening.
* This issue has been solved in newer KCL versions, but reading from DynamoDB Streams requires usage of DynamoDB Streams Kinesis Adapter library. And this library still depends on older Amazon Kinesis Client 1.9.1.
* That being said you will only encounter this issue by running lots of tasks on one machine with really high load.
* However you will only encounter this issue by running lots of tasks on one machine with really high load.

* DynamoDB table unit capacity must be large enough to ensure `INIT_SYNC` to be finished in around 16 hours. Otherwise there is a risk `INIT_SYNC` being restarted just as soon as it's finished because DynamoDB Streams store change events only for 24 hours.
* Synced(Source) DynamoDB table unit capacity must be large enough to ensure `INIT_SYNC` to be finished in around 16 hours. Otherwise there is a risk `INIT_SYNC` being restarted just as soon as it's finished because DynamoDB Streams store change events only for 24 hours.

* Required AWS roles:
```json
Expand Down
6 changes: 3 additions & 3 deletions docs/data.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@

# Topic messages structure

* `Topic key` - contains all defined DynamoDB table keys.
* `Topic value` - contains full DynamoDB document serialised as DynamoDB Json string together with additional metadata.
* `key` - contains all defined DynamoDB table keys.
* `value` - contains full DynamoDB document serialised as DynamoDB Json string together with additional metadata.

```json
[
Expand Down Expand Up @@ -49,7 +49,7 @@
Note that when connector detects delete event, it creates two event messages:
* a delete event message with `op` type `d` and empty `document` field.
* a tombstone message contains same key as the delete message, but the entire message value is null.
* Kafka’s log compaction utilizes this to know that it can remove any earlier messages with the same key.
* Kafka’s log compaction utilizes this to know that it can delete all messages for this key.

### Tombstone message sample
```json
Expand Down
31 changes: 16 additions & 15 deletions docs/details.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@

# Main states

### 1. Source tables discovery & Source tasks configuration
### 1. "DISCOVERY"

This connector can sync multiple DynamoDB tables at the same time and it does so without requiring explicit configuration for each one. On start and at regular time intervals (by default 60s) it queries AWS api for DynamoDB tables which match following criteria and starts Kafka Connect task for each of them:
* table must have configured ingestion TAG key set
* table mush have configured stack(environment) TAG key and value set
* table must have DynamoDB streams enabled (in `new_image` or `new_and_old_image` mode)
This connector can sync multiple DynamoDB tables at the same time and it does so without requiring explicit configuration for each one. On start and at regular time intervals (by default 60s) after, it queries AWS api for DynamoDB tables which match following criteria and starts Kafka Connect task for each of them:
* ingestion TAG key set
* stack(environment) TAG key and value set
* DynamoDB streams enabled (in `new_image` or `new_and_old_image` mode)


### 2. "INIT_SYNC"

`INIT_SYNC` is a process when all existing table data is scanned and pushed into Kafka destination topic. Usually this happens only once after source task for specific table is started for the first time. But it can be repeated in case of unexpected issues, e.g. if source connector was down for long period of time and it is possible that it has missed some of the change events from the table stream (as data is stored only for 24 hours).
`INIT_SYNC` is a process when all existing table data is scanned and pushed into Kafka destination topic. Usually this happens only once after source task for specific table is started for the first time. But it can be repeated in case of unexpected issues, e.g. if source connector was down for long period of time and it is possible that it has missed some of the change events from the table stream (DynamoDB streams store data for 24 hours only).

### 3. "SYNC"

Expand All @@ -34,26 +34,27 @@ Connector tracks it's state at all stages and is able to continue where it stopp

Since we are using two different frameworks/libraries together there are two different ways how each of them stores state:
* Kafka connect leverages dedicated `state` topics where connector tasks can push offsets(state) for each partition they are consuming. This connector has no support for source table "partitions" and only one task is allowed to consume one table at a time, therefore it uses table name as partition key and leverage `offsets` dictionary to store tasks state and progress of that state.
* KCL library uses separate dedicated DynamoDB table for each DynamoDB Stream it tracks to remember it's own progress. It is used only to track which messages have been consumed already. Since we can only say that message has been consumed once it's delivered to Kafka special synchronization logic is implemented in this connector.
* KCL library uses separate dedicated DynamoDB table for each DynamoDB Stream it tracks to remember it's own progress. Since we can only say that message has been consumed once it's delivered to Kafka special synchronization logic is implemented in this connector.

> NOTE: KCL library uses `state` table in DynamoDB for each stream it tracks! This table is created automatically if it doesn't exist.
> NOTE: KCL library uses `state` table in DynamoDB for each stream it tracks and this table is created **automatically** if it doesn't exist.
### `DISCOVERY` and task configuration
### `DISCOVERY` state and task configuration

Connector uses resource group api to receive a list of DynamoDB tables which have ingestion TAG defined. Then it iterates over this list and checks if stack TAG is set and streams are actually enabled. For each table which meats all requirements separate dedicated Kafka Connect task is started.
Connector uses AWS resource group API to receive a list of DynamoDB tables which have ingestion TAG defined. Then it iterates over this list and checks if stack TAG is matched and streams are actually enabled. Connect task is started for each table which meats all requirements.

Same `discovery` phase is executed on start and after every 60 seconds(default config value). Each started task can be in one of the following states.
`discovery` phase is executed on start and every 60 seconds(default config value) after initial start.

Each started task can be in one of the following states:

`INIT_SYNC` state
#### `INIT_SYNC` state

During `INIT_SYNC` phase all records from source table are scanned in batches. After each batch `EXCLUSIVE_START_KEY` is set as offset data with each record. In case of restart `INIT_SYNC` will continues from this start key. Once all records have been read `INIT_SYNC` is marked as finished in offsets and `SYNC` mode starts.
During `INIT_SYNC` phase all records from source table are scanned in batches. After that each batches `EXCLUSIVE_START_KEY` is set as offset data with each record. In case of restart `INIT_SYNC` will continues from this start key. Once all records have been read `INIT_SYNC` is marked as finished in offsets and `SYNC` mode starts.

> NOTE: On start `INIT_SYNC` is delayed by configurable amount of time (by default 60s). This is done to give connect cluster time to settle down after restart and helps to lower amount of duplicates because of connect task rebalances.
### `SYNC` state
#### `SYNC` state

After `INIT_SYNC` connector starts reading messages from DynamoDB Stream. First it makes sure to drop all events which happened before `INIT_SYNC` was started (except for those created during last hour before `INIT_SYNC`). This is done to prevent unnecessary duplicate events(since we already have latest state) and to advance KCL reader into `save zone`.
After `INIT_SYNC` connector starts reading messages from DynamoDB Stream. As first step it makes sure to drop all events which happened before `INIT_SYNC` was started (except for those created during last hour before `INIT_SYNC`). This is done to prevent unnecessary duplicate events(since we already have latest state) and to advance KCL reader into `save zone`.

Events are considered to be in `save zone` if they there create no earlier then -20 hours before `now`. Otherwise connector has no way to validate that it hasn't skipped some of the events and it has to initiate `INIT_SYNC`!

Expand Down
14 changes: 8 additions & 6 deletions docs/getting-started.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## Getting started

These instructions will get you running this connector on your own machine. Check official [Confluent documentation](https://docs.confluent.io/current/connect/userguide.html#installing-plugins) on how to install this plugin in a production cluster.
These instructions will get you up and running this connector on your own machine.
> Check official [Confluent documentation](https://docs.confluent.io/current/connect/userguide.html#installing-plugins) on how to deploy to production cluster.
### Prerequisites

Expand All @@ -13,8 +14,8 @@ These instructions will get you running this connector on your own machine. Chec

### Setting up DynamoDB table

* Connect into AWS console UI
* Create `test-dynamodb-connector` DynamoDB table (or use any other name you choose)
* Login to AWS console UI
* Create `test-dynamodb-connector` DynamoDB table (or use any other name you choose)
* Enable DynamoDB streams with mode `new image` or `new and old image`
* Set TAG's:
* `stack=dev`
Expand All @@ -25,20 +26,21 @@ These instructions will get you running this connector on your own machine. Chec

* First we need to perform some configuration changes to make connector package available to Kafka Connect:

* Store downloaded connector jar file to a location in your filesystem. For instance: `/opt/connectors/my-dynamodb-connector`
* Store downloaded connector jar file to a location in your filesystem. For instance: `/opt/connectors/kafka-dynamodb-connector`
* Edit file `${CONFLUENT_PLATFORM_HOME}/etc/schema-registry/connect-avro-distributed.properties` and set `plugin.path=/opt/connectors`

* Next start confluent and configure actual connector, by executing:
*
```bash
cd ${CONFLUENT_PLATFORM_HOME}/bin

# Starts all the required services including Kafka and Confluent Connect
./confluent start

# check if "com.trustpilot.connector.dynamodb.DynamoDBSourceConnector" has been loaded
# Check if "com.trustpilot.connector.dynamodb.DynamoDBSourceConnector" has been loaded
curl localhost:8083/connector-plugins | jq

# Once everything is started, configure connector
# Configure connector
curl -X PUT -H "Content-Type: application/json" --data '{"connector.class":"DynamoDBSourceConnector","tasks.max":"100","name":"myDynamodbConnector"}' localhost:8083/connectors/myDynamodbConnector/config

# Check connector status
Expand Down
8 changes: 4 additions & 4 deletions docs/options.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
```json

{
"connector.class": "com.trustpilot.connector.dynamodb.DynamoDBSourceConnector",
"connector.class": "DynamoDBSourceConnector",
"tasks.max": "100"
}
```
Expand All @@ -15,7 +15,7 @@
## All config options (with default values)
```json
{
"connector.class": "com.trustpilot.connector.dynamodb.DynamoDBSourceConnector",
"connector.class": "DynamoDBSourceConnector",

"aws.region": "eu-west-1",
"aws.access.key.id": "",
Expand All @@ -38,11 +38,11 @@

`dynamodb.table.ingestion.tag.key` - only tables marked with this tag key will be ingested.

`kafka.topic.prefix` - all topics create by this connector will have this prefix in their name. Following this pattern `{prefix}-{dynamodb-table-name}`
`kafka.topic.prefix` - all topics created by this connector will have this prefix in their name. Following this pattern `{prefix}-{dynamodb-table-name}`

`tasks.max` - **MUST** always exceed number of tables found for tracking. If max tasks count is lower then found tables count, no tasks will be started!

`init.sync.delay.period` - time value in seconds. Defines how long `INIT_SYNC` should delay execution before starting. This is used to give time for Kafka Connect tasks to calm down after rebalance (Since multiple tasks rebalances can happen in quick succession and this would mean more duplicated data since `INIT_SYNC` process won't have time mark it's progress).
`init.sync.delay.period` - time interval in seconds. Defines how long `INIT_SYNC` should delay execution before starting. This is used to give time for Kafka Connect tasks to calm down after rebalance (Since multiple tasks rebalances can happen in quick succession and this would mean more duplicated data since `INIT_SYNC` process won't have time mark it's progress).

`connect.dynamodb.rediscovery.period` - time interval in milliseconds. Defines how often connector should try to find new DynamoDB tables (or detect removed ones). If changes are found tasks are automatically reconfigured.

Expand Down

0 comments on commit 9c4daca

Please sign in to comment.