[Improve] [Connector-V2] Change Connector Custom Config Prefix To Map #3719

Merged · 8 commits · Jan 3, 2023
15 changes: 8 additions & 7 deletions docs/en/connector-v2/sink/Clickhouse.md
@@ -31,7 +31,7 @@ Write data to Clickhouse can also be done using JDBC
| username | string | yes | - |
| password | string | yes | - |
| fields | string | yes | - |
| clickhouse.* | string | no | |
| clickhouse.config | map | no | |
| bulk_size | string | no | 20000 |
| split_mode | string | no | false |
| sharding_key | string | no | - |
@@ -64,12 +64,10 @@ The table name

The data field that needs to be output to `ClickHouse`; if not configured, it will be automatically adapted according to the sink table `schema`.

### clickhouse [string]
### clickhouse.config [map]

In addition to the above mandatory parameters that must be specified by `clickhouse-jdbc` , users can also specify multiple optional parameters, which cover all the [parameters](https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-client#configuration) provided by `clickhouse-jdbc` .

The way to specify the parameter is to add the prefix `clickhouse.` to the original parameter name. For example, the way to specify `socket_timeout` is: `clickhouse.socket_timeout = 50000` . If these non-essential parameters are not specified, they will use the default values given by `clickhouse-jdbc`.
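
With the map-style option introduced by this change, the same parameter is nested under `clickhouse.config` instead. A minimal sketch, reusing the `socket_timeout` value from the prefix example above:

```hocon
# Sketch only: optional clickhouse-jdbc parameters nested in the map-style option
clickhouse.config = {
  socket_timeout = "50000"
}
```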

### bulk_size [number]

The number of rows written through [Clickhouse-jdbc](https://github.com/ClickHouse/clickhouse-jdbc) each time, the `default is 20000` .
@@ -114,6 +112,10 @@ sink {
table = "fake_all"
username = "default"
password = ""
clickhouse.config = {
max_rows_to_read = "100"
read_overflow_mode = "throw"
}
}
}
```
@@ -185,7 +187,6 @@ sink {
### next version

- [Improve] Clickhouse Sink support nest type and array type([3047](https://github.com/apache/incubator-seatunnel/pull/3047))

- [Improve] Clickhouse Sink support geo type([3141](https://github.com/apache/incubator-seatunnel/pull/3141))

- [Feature] Support CDC write DELETE/UPDATE/INSERT events ([3653](https://github.com/apache/incubator-seatunnel/pull/3653))
- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
44 changes: 25 additions & 19 deletions docs/en/connector-v2/sink/Kafka.md
@@ -15,19 +15,19 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly on

## Options

| name | type | required | default value |
|----------------------|-----------------------| -------- | ------------- |
| topic | string | yes | - |
| bootstrap.servers | string | yes | - |
| kafka.* | kafka producer config | no | - |
| semantic | string | no | NON |
| partition_key_fields | array | no | - |
| partition | int | no | - |
| assign_partitions | array | no | - |
| transaction_prefix | string | no | - |
| format | String | no | json |
| field_delimiter | String | no | , |
| common-options | config | no | - |
| name | type | required | default value |
|----------------------|--------|----------|---------------|
| topic | string | yes | - |
| bootstrap.servers | string | yes | - |
| kafka.config | map | no | - |
| semantic | string | no | NON |
| partition_key_fields | array | no | - |
| partition | int | no | - |
| assign_partitions | array | no | - |
| transaction_prefix | string | no | - |
| format | String | no | json |
| field_delimiter | String | no | , |
| common-options | config | no | - |

### topic [string]

@@ -61,10 +61,10 @@ For example, if you want to use value of fields from upstream data as key, you c

Upstream data is the following:

| name | age | data |
| ---- | ---- | ------------- |
| Jack | 16 | data-example1 |
| Mary | 23 | data-example2 |
| name | age | data |
|------|-----|---------------|
| Jack | 16 | data-example1 |
| Mary | 23 | data-example2 |

If name is set as the key, then the hash value of the name column will determine which partition the message is sent to.
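
A sketch of how this could be expressed in the sink options (the topic and bootstrap servers below are illustrative placeholders, not values from this document's full example):

```hocon
# Sketch only: the sink options relevant to keying by the "name" field from the table above
topic = "topic_1"
bootstrap.servers = "localhost:9092"
partition_key_fields = ["name"]
```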

@@ -81,7 +81,7 @@ We can decide which partition to send based on the content of the message. The f
For example, there are five partitions in total, and the assign_partitions field in config is as follows:
assign_partitions = ["shoe", "clothing"]

Then the message containing "shoe" will be sent to partition zero, because "shoe" has index zero in assign_partitions, and the message containing "clothing" will be sent to partition one. For other messages, the hash algorithm will be used to distribute them among the remaining partitions.

This function is implemented by the `MessageContentPartitioner` class, which implements the `org.apache.kafka.clients.producer.Partitioner` interface. If we need custom partitioning, we need to implement this interface as well.

@@ -115,6 +115,11 @@ sink {
format = json
kafka.request.timeout.ms = 60000
semantics = EXACTLY_ONCE
kafka.config = {
acks = "all"
request.timeout.ms = 60000
buffer.memory = 33554432
}
}

}
@@ -186,4 +191,5 @@ sink {
### next version

- [Improve] Support to specify multiple partition keys [3230](https://github.com/apache/incubator-seatunnel/pull/3230)
- [Improve] Add text format for kafka sink connector [3711](https://github.com/apache/incubator-seatunnel/pull/3711)
- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
38 changes: 24 additions & 14 deletions docs/en/connector-v2/sink/Rabbitmq.md
@@ -13,20 +13,21 @@ Used to write data to Rabbitmq.

## Options

| name | type | required | default value |
|-----------------------------|---------|-----------|---------------|
| host | string | yes | - |
| port | int | yes | - |
| virtual_host | string | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| queue_name | string | yes | - |
| url | string | no | - |
| network_recovery_interval | int | no | - |
| topology_recovery_enabled | boolean | no | - |
| automatic_recovery_enabled | boolean | no | - |
| connection_timeout | int | no | - |
| common-options | | no | - |
| name | type | required | default value |
|----------------------------|---------|-----------|---------------|
| host | string | yes | - |
| port | int | yes | - |
| virtual_host | string | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| queue_name | string | yes | - |
| url | string | no | - |
| network_recovery_interval | int | no | - |
| topology_recovery_enabled | boolean | no | - |
| automatic_recovery_enabled | boolean | no | - |
| connection_timeout | int | no | - |
| rabbitmq.config | map | no | - |
| common-options | | no | - |

### host [string]

@@ -78,6 +79,10 @@ if true, enables connection recovery

connection TCP establishment timeout in milliseconds; zero for infinite

### rabbitmq.config [map]

In addition to the above parameters that must be specified by the RabbitMQ client, the user can also specify multiple non-mandatory parameters for the client, covering [all the parameters specified in the official RabbitMQ document](https://www.rabbitmq.com/configure.html).
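
For example, a couple of client settings could be passed through the map like this (a sketch; the two properties shown mirror the example further down this page):

```hocon
# Sketch only: non-mandatory RabbitMQ client parameters passed as a map
rabbitmq.config = {
  requested-heartbeat = 10
  connection-timeout = 10
}
```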

### common options

Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
@@ -95,6 +100,10 @@ sink {
username = "guest"
password = "guest"
queue_name = "test1"
rabbitmq.config = {
requested-heartbeat = 10
connection-timeout = 10
}
}
}
```
@@ -104,3 +113,4 @@ sink {
### next version

- Add Rabbitmq Sink Connector
- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
83 changes: 43 additions & 40 deletions docs/en/connector-v2/sink/StarRocks.md
@@ -12,21 +12,21 @@ The internal implementation of StarRocks sink connector is cached and imported b

## Options

| name | type | required | default value |
|-----------------------------|------------------------------|----------|-----------------|
| node_urls | list | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| database | string | yes | - |
| table | string | yes | - |
| labelPrefix | string | no | - |
| batch_max_rows | long | no | 1024 |
| batch_max_bytes | int | no | 5 * 1024 * 1024 |
| batch_interval_ms | int | no | - |
| max_retries | int | no | - |
| retry_backoff_multiplier_ms | int | no | - |
| max_retry_backoff_ms | int | no | - |
| sink.properties.* | starrocks stream load config | no | - |
| name | type | required | default value |
|-----------------------------|--------|----------|-----------------|
| node_urls | list | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| database | string | yes | - |
| table | string | yes | - |
| labelPrefix | string | no | - |
| batch_max_rows | long | no | 1024 |
| batch_max_bytes | int | no | 5 * 1024 * 1024 |
| batch_interval_ms | int | no | - |
| max_retries | int | no | - |
| retry_backoff_multiplier_ms | int | no | - |
| max_retry_backoff_ms | int | no | - |
| starrocks.config | map | no | - |

### node_urls [list]

@@ -76,11 +76,9 @@ Using as a multiplier for generating the next delay for backoff

The amount of time to wait before attempting to retry a request to `StarRocks`

### sink.properties.* [starrocks stream load config]
### starrocks.config [map]

The parameter of the stream load `data_desc`
The way to specify the parameter is to add the prefix `sink.properties.` to the original stream load parameter name
For example, the way to specify `strip_outer_array` is: `sink.properties.strip_outer_array`
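
With the map-style option introduced by this change, the same stream load parameter is nested under `starrocks.config` instead. A minimal sketch, reusing the `strip_outer_array` example above:

```hocon
# Sketch only: stream load properties nested in the map-style option
starrocks.config = {
  strip_outer_array = true
}
```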

#### Supported import data formats

@@ -90,42 +88,47 @@ The supported formats include CSV and JSON. Default value: CSV

Use JSON format to import data

```
```hocon
sink {
StarRocks {
nodeUrls = ["e2e_starRocksdb:8030"]
username = root
password = ""
database = "test"
table = "e2e_table_sink"
batch_max_rows = 10
sink.properties.format = "JSON"
sink.properties.strip_outer_array = true
StarRocks {
nodeUrls = ["e2e_starRocksdb:8030"]
username = root
password = ""
database = "test"
table = "e2e_table_sink"
batch_max_rows = 10
starrocks.config = {
format = "JSON"
strip_outer_array = true
}
}
}

```

Use CSV format to import data

```
```hocon
sink {
StarRocks {
nodeUrls = ["e2e_starRocksdb:8030"]
username = root
password = ""
database = "test"
table = "e2e_table_sink"
batch_max_rows = 10
sink.properties.format = "CSV"
sink.properties.column_separator = "\x01"
sink.properties.row_delimiter = "\x02"
StarRocks {
nodeUrls = ["e2e_starRocksdb:8030"]
username = root
password = ""
database = "test"
table = "e2e_table_sink"
batch_max_rows = 10
starrocks.config = {
format = "CSV"
column_separator = "\\x01"
row_delimiter = "\\x02"
}
}
}
```

## Changelog

### next version

- Add StarRocks Sink Connector
- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
21 changes: 12 additions & 9 deletions docs/en/connector-v2/source/kafka.md
@@ -18,13 +18,13 @@ Source connector for Apache Kafka.
## Options

| name | type | required | default value |
|-------------------------------------|---------| -------- |--------------------------|
|-------------------------------------|---------|----------|--------------------------|
| topic | String | yes | - |
| bootstrap.servers | String | yes | - |
| pattern | Boolean | no | false |
| consumer.group | String | no | SeaTunnel-Consumer-Group |
| commit_on_checkpoint | Boolean | no | true |
| kafka.* | String | no | - |
| kafka.config | Map | no | - |
| common-options | config | no | - |
| schema | | no | - |
| format | String | no | json |
@@ -58,12 +58,10 @@ If true the consumer's offset will be periodically committed in the background.

The interval for dynamically discovering topics and partitions.

### kafka.* [string]
### kafka.config [map]

In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs).

The way to specify parameters is to add the prefix `kafka.` to the original parameter name. For example, the way to specify `auto.offset.reset` is: `kafka.auto.offset.reset = latest` . If these non-essential parameters are not specified, they will use the default values given in the official Kafka documentation.
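
With the map-style option introduced by this change, the same setting is nested under `kafka.config` instead. A minimal sketch, reusing the `auto.offset.reset` example above:

```hocon
# Sketch only: non-mandatory consumer parameters nested in the map-style option
kafka.config = {
  auto.offset.reset = "latest"
}
```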

### common-options [config]

Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
@@ -88,7 +86,7 @@ The initial consumption pattern of consumers, there are several types:

## start_mode.timestamp

The time required for consumption mode to be timestamp.
The time required for consumption mode to be "timestamp".

## start_mode.offsets

@@ -120,11 +118,15 @@ source {
}
}
format = text
field_delimiter = "#
field_delimiter = "#"
topic = "topic_1,topic_2,topic_3"
bootstrap.servers = "localhost:9092"
kafka.max.poll.records = 500
kafka.client.id = client_1
kafka.config = {
client.id = client_1
max.poll.records = 500
auto.offset.reset = "earliest"
enable.auto.commit = "false"
}
}

}
@@ -210,3 +212,4 @@ source {

- [Improve] Support setting read starting offset or time at startup config ([3157](https://github.com/apache/incubator-seatunnel/pull/3157))
- [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/incubator-seatunnel/pull/3125))
- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)