Skip to content

Commit

Permalink
[Feature][Connector-V2][Clickhouse] Add clickhouse connector time zon…
Browse files Browse the repository at this point in the history
…e key,default system time zone (#5078)

* Add clickhouse connector time zone key,default system time zone

* Modify the document and add clickhouse server_time_zone configuration
  • Loading branch information
gaopeng666 authored Jul 14, 2023
1 parent cf3d0bb commit 309b58d
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 9 deletions.
22 changes: 14 additions & 8 deletions docs/en/connector-v2/source/Clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ supports query SQL and can achieve projection effect.

## Options

| name | type | required | default value |
|----------------|--------|----------|---------------|
| host | string | yes | - |
| database | string | yes | - |
| sql | string | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| common-options | | no | - |
| name | type | required | default value |
|------------------|--------|----------|------------------------|
| host | string | yes | - |
| database | string | yes | - |
| sql | string | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| server_time_zone | string | no | ZoneId.systemDefault() |
| common-options | | no | - |

### host [string]

Expand All @@ -49,6 +50,10 @@ The query sql used to search data though Clickhouse server

`ClickHouse` user password

### server_time_zone [string]

The session time zone in database server. If not set, then ZoneId.systemDefault() is used to determine the server time zone.

### common options

Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details
Expand All @@ -64,6 +69,7 @@ source {
sql = "select * from test where age = 20 limit 100"
username = "default"
password = ""
server_time_zone = "UTC"
result_table_name = "test"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

import java.time.ZoneId;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -75,6 +76,15 @@ public class ClickhouseConfig {
.noDefaultValue()
.withDescription("Clickhouse server password");

/** Clickhouse server timezone */
public static final Option<String> SERVER_TIME_ZONE =
Options.key("server_time_zone")
.stringType()
.defaultValue(ZoneId.systemDefault().getId())
.withDescription(
"The session time zone in database server."
+ "If not set, then ZoneId.systemDefault() is used to determine the server time zone");

/** Split mode when table is distributed engine */
public static final Option<Boolean> SPLIT_MODE =
Options.key("split_mode")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SERVER_TIME_ZONE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SPLIT_MODE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SUPPORT_UPSERT;
Expand Down Expand Up @@ -101,6 +102,7 @@ public void prepare(Config config) throws PrepareFailException {
ImmutableMap.<String, Object>builder()
.put(BULK_SIZE.key(), BULK_SIZE.defaultValue())
.put(SPLIT_MODE.key(), SPLIT_MODE.defaultValue())
.put(SERVER_TIME_ZONE.key(), SERVER_TIME_ZONE.defaultValue())
.build();

config = config.withFallback(ConfigFactory.parseMap(defaultConfig));
Expand All @@ -111,13 +113,15 @@ public void prepare(Config config) throws PrepareFailException {
ClickhouseUtil.createNodes(
config.getString(HOST.key()),
config.getString(DATABASE.key()),
config.getString(SERVER_TIME_ZONE.key()),
null,
null);
} else {
nodes =
ClickhouseUtil.createNodes(
config.getString(HOST.key()),
config.getString(DATABASE.key()),
config.getString(SERVER_TIME_ZONE.key()),
config.getString(USERNAME.key()),
config.getString(PASSWORD.key()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_FREE_PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_PASS;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SERVER_TIME_ZONE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
Expand Down Expand Up @@ -114,6 +115,7 @@ public void prepare(Config config) throws PrepareFailException {
ClickhouseUtil.createNodes(
config.getString(HOST.key()),
config.getString(DATABASE.key()),
config.getString(SERVER_TIME_ZONE.key()),
config.getString(USERNAME.key()),
config.getString(PASSWORD.key()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
Expand All @@ -44,13 +45,16 @@
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseResponse;
import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableMap;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SERVER_TIME_ZONE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SQL;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;

Expand Down Expand Up @@ -86,10 +90,17 @@ public void prepare(Config config) throws PrepareFailException {
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
Map<String, Object> defaultConfig =
ImmutableMap.<String, Object>builder()
.put(SERVER_TIME_ZONE.key(), SERVER_TIME_ZONE.defaultValue())
.build();

config = config.withFallback(ConfigFactory.parseMap(defaultConfig));
servers =
ClickhouseUtil.createNodes(
config.getString(HOST.key()),
config.getString(DATABASE.key()),
config.getString(SERVER_TIME_ZONE.key()),
config.getString(USERNAME.key()),
config.getString(PASSWORD.key()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@
public class ClickhouseUtil {

public static List<ClickHouseNode> createNodes(
String nodeAddress, String database, String username, String password) {
String nodeAddress,
String database,
String serverTimeZone,
String username,
String password) {
return Arrays.stream(nodeAddress.split(","))
.map(
address -> {
Expand All @@ -42,12 +46,14 @@ public static List<ClickHouseNode> createNodes(
ClickHouseProtocol.HTTP,
Integer.parseInt(nodeAndPort[1]))
.database(database)
.timeZone(serverTimeZone)
.build();
}
return ClickHouseNode.builder()
.host(nodeAndPort[0])
.port(ClickHouseProtocol.HTTP, Integer.parseInt(nodeAndPort[1]))
.database(database)
.timeZone(serverTimeZone)
.credentials(
ClickHouseCredentials.fromUserAndPassword(
username, password))
Expand Down

0 comments on commit 309b58d

Please sign in to comment.