Commit: Resolve conflict problems

chenzy15 committed Jul 25, 2023
1 parent 9ce682b commit 639be27
Showing 2 changed files with 66 additions and 105 deletions.
docs/en/connector-v2/sink/Kafka.md (64 additions, 88 deletions)

# Kafka

> Kafka sink connector
>
## Support Those Engines

> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>

## Key Features

- [x] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [cdc](../../concept/connector-v2-features.md)

> By default, we will use 2pc to guarantee the message is sent to kafka exactly once.

## Description

Write Rows to a Kafka topic.

## Supported DataSource Info

In order to use the Kafka connector, the following dependencies are required.
They can be downloaded via install-plugin.sh or from the Maven central repository.

| Datasource | Supported Versions | Maven |
|------------|--------------------|-------------------------------------------------------------------------------------------------------------|
| Kafka | Universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-kafka) |

## Sink Options

| Name                 | Type   | Required | Default | Description |
|----------------------|--------|----------|---------|-------------|
| topic                | String | Yes      | -       | When the table is used as sink, the topic name is the topic to write data to. |
| bootstrap.servers    | String | Yes      | -       | Comma separated list of Kafka brokers. |
| kafka.config         | Map    | No       | -       | In addition to the above parameters that must be specified by the `Kafka producer` client, the user can also specify multiple non-mandatory parameters for the `producer` client, covering [all the producer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#producerconfigs). |
| semantics            | String | No       | NON     | Semantics that can be chosen: EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON. |
| partition_key_fields | Array  | No       | -       | Configure which fields are used as the key of the kafka message. |
| partition            | Int    | No       | -       | We can specify the partition; all messages will be sent to this partition. |
| assign_partitions    | Array  | No       | -       | We can decide which partition to send a message to based on the content of the message. The function of this parameter is to distribute information. |
| transaction_prefix   | String | No       | -       | If semantics is specified as EXACTLY_ONCE, the producer will write all messages in a Kafka transaction. Kafka distinguishes different transactions by different transactionIds, and this parameter is the prefix of the kafka transactionId; make sure different jobs use different prefixes. |
| format               | String | No       | json    | Data format. The default format is json. Optional formats are text, canal-json and debezium-json. For json or text format, the default field delimiter is ","; if you customize the delimiter, add the "field_delimiter" option. For canal format, please refer to [canal-json](../formats/canal-json.md); for debezium format, please refer to [debezium-json](../formats/debezium-json.md). |
| field_delimiter      | String | No       | ,       | Customize the field delimiter for the data format. |
| common-options       | Config | No       | -       | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. |
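
As an illustration, a minimal text-format sketch with a custom field delimiter (the topic name, broker address, and `|` delimiter are placeholders, not values from this document):

```hocon
sink {
  kafka {
    topic = "test_topic"
    bootstrap.servers = "localhost:9092"
    # Serialize rows as delimiter-joined text instead of the default json
    format = text
    field_delimiter = "|"
  }
}
```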

## Parameter Interpretation

### Topic Formats

Currently two formats are supported:

1. `topic`, e.g. `test_topic`.

2. `${name}`, a dynamic topic, where `name` is the name of a field of the upstream data. For example, the upstream data is the following:

| name | age | data          |
|------|-----|---------------|
| Jack | 16  | data-example1 |
| Mary | 23  | data-example2 |

If `${name}` is set as the topic, the first row is sent to the Jack topic and the second row is sent to the Mary topic.
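
A minimal sketch of the dynamic-topic form, assuming the upstream rows carry a `name` field as above (the broker address is illustrative):

```hocon
sink {
  kafka {
    # Each row is routed to the topic named by its "name" field,
    # e.g. rows with name=Jack go to the "Jack" topic
    topic = "${name}"
    bootstrap.servers = "localhost:9092"
    format = json
  }
}
```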

### Semantics

In EXACTLY_ONCE, the producer will write all messages in a Kafka transaction that will be committed to Kafka on a checkpoint.

In AT_LEAST_ONCE, the producer will wait for all outstanding messages in the Kafka buffers to be acknowledged by the Kafka producer on a checkpoint.

NON does not provide any guarantees: messages may be lost in case of issues on the Kafka broker, and messages may be duplicated.
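
For example, a sketch enabling exactly-once delivery (the transaction prefix value is illustrative; make sure each job uses its own):

```hocon
sink {
  kafka {
    topic = "test_topic"
    bootstrap.servers = "localhost:9092"
    # Messages are written in a Kafka transaction committed on each checkpoint
    semantics = EXACTLY_ONCE
    # Illustrative prefix; different jobs must use different prefixes
    transaction_prefix = "seatunnel_job_1_"
  }
}
```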

### Partition Key Fields

For example, if you want to use the value of fields from upstream data as the key, you can assign field names to this property.

Upstream data is the following:

| name | age | data          |
|------|-----|---------------|
| Jack | 16  | data-example1 |
| Mary | 23 | data-example2 |

If name is set as the key, then the hash value of the name column will determine which partition the message is sent to.

If partition key fields are not set, a null message key will be sent.

The format of the message key is JSON. If name is set as the key, the key will look like '{"name":"Jack"}'.

The selected field must be an existing field in the upstream.
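
A sketch keying messages by the `name` field, so rows with the same name hash to the same partition (topic and broker are placeholders):

```hocon
sink {
  kafka {
    topic = "test_topic"
    bootstrap.servers = "localhost:9092"
    # Message keys are JSON-encoded from the listed fields, e.g. {"name":"Jack"}
    partition_key_fields = ["name"]
  }
}
```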

### Assign Partitions

For example, there are five partitions in total, and the assign_partitions field in the config is as follows:

`assign_partitions = ["shoe", "clothing"]`

Then the message containing "shoe" will be sent to partition zero, because "shoe" is subscribed as zero in assign_partitions, and the message containing "clothing" will be sent to partition one. For other messages, the hash algorithm will be used to divide them into the remaining partitions.

This function is implemented by the `MessageContentPartitioner` class, which implements the `org.apache.kafka.clients.producer.Partitioner` interface. If we need custom partitioning, we need to implement this interface as well.
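
Putting this together, a sketch that routes by message content, assuming a five-partition topic as in the example above (topic and broker are placeholders):

```hocon
sink {
  kafka {
    topic = "test_topic"
    bootstrap.servers = "localhost:9092"
    # Messages containing "shoe" go to partition 0, "clothing" to partition 1;
    # all other messages are hashed across the remaining partitions
    assign_partitions = ["shoe", "clothing"]
  }
}
```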

## Task Example

### Simple:

> This example defines a SeaTunnel synchronization task that automatically generates data through FakeSource and sends it to the Kafka sink. FakeSource generates a total of 16 rows of data (row.num=16), with each row having two fields, name (string type) and age (int type). The final target topic is test_topic, and there will also be 16 rows of data in the topic. If you have not yet installed and deployed SeaTunnel, follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy it, and then follow the instructions in [Quick Start With SeaTunnel Engine](../../start-v2/locally/quick-start-seatunnel-engine.md) to run this job.

```hocon
# Defining the runtime environment
env {
  # You can set flink configuration here
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    parallelism = 1
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

sink {
  kafka {
    topic = "test_topic"
    bootstrap.servers = "localhost:9092"
    partition = 3
    format = json
    kafka.request.timeout.ms = 60000
    semantics = EXACTLY_ONCE
    kafka.config = {
      acks = "all"
      request.timeout.ms = 60000
      buffer.memory = 33554432
    }
  }
}
```

### AWS MSK SASL/SCRAM

Replace the following `${username}` and `${password}` with the configuration values in the AWS MSK SASL/SCRAM instance.

```hocon
sink {
  kafka {
    topic = "seatunnel"
    bootstrap.servers = "localhost:9092"
    format = json
    kafka.config = {
      security.protocol=SASL_SSL
      sasl.mechanism=SCRAM-SHA-512
      sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required \nusername=${username}\npassword=${password};"
    }
  }
}
```

### AWS MSK IAM

Download the `aws-msk-iam-auth` jar from https://github.com/aws/aws-msk-iam-auth/releases and put it in the `$SEATUNNEL_HOME/plugin/kafka/lib` dir, and ensure the IAM policy grants `"kafka-cluster:Connect"`.

```hocon
sink {
  kafka {
    topic = "seatunnel"
    bootstrap.servers = "localhost:9092"
    format = json
    kafka.config = {
      security.protocol=SASL_SSL
      sasl.mechanism=AWS_MSK_IAM
      sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;"
      sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }
  }
}
```

## Changelog

### 2.3.0-beta 2022-10-20

- Add Kafka Sink Connector

### Next Version

- [Improve] Support to specify multiple partition keys [3230](https://github.com/apache/incubator-seatunnel/pull/3230)
- [Improve] Add text format for kafka sink connector [3711](https://github.com/apache/incubator-seatunnel/pull/3711)
- [Improve] Support extract topic from SeaTunnelRow fields [3742](https://github.com/apache/incubator-seatunnel/pull/3742)
- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
- [Improve] Support read canal format message [3950](https://github.com/apache/incubator-seatunnel/pull/3950)
- [Improve] Support read debezium format message [3981](https://github.com/apache/incubator-seatunnel/pull/3981)

docs/en/connector-v2/source/kafka.md (2 additions, 17 deletions)

They can be downloaded via install-plugin.sh or from the Maven central repository.

## Source Options

| Name | Type | Required | Default | Description |
|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| topic | String | Yes | - | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like 'topic-1,topic-2'. |
| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. |

## Changelog

### 2.3.0-beta 2022-10-20

- Add Kafka Source Connector

### Next Version

- [Improve] Support setting read starting offset or time at startup config ([3157](https://github.com/apache/seatunnel/pull/3157))
- [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/seatunnel/pull/3125))
- [Improve] Change Connector Custom Config Prefix To Map ([3719](https://github.com/apache/seatunnel/pull/3719))
- [Bug] Fixed the problem that parsing the offset format failed when the startup mode was offset ([3810](https://github.com/apache/seatunnel/pull/3810))
- [Feature] Kafka source supports data deserialization failure skipping ([4364](https://github.com/apache/seatunnel/pull/4364))

