Param source timezone (review) #374

Closed
wants to merge 10 commits
Fixed some remarks
IlyaTsoi committed Nov 11, 2023
commit 7de7462513ebdf4148ca6c6e6dbde96daaff3fec
@@ -263,6 +263,17 @@ static ConfigDef newConfigDef() {
1,
ConfigDef.Width.NONE,
ClickHouseSinkConnectorConfigVariables.REPLACING_MERGE_TREE_DELETE_COLUMN.toString())
.define(
ClickHouseSinkConnectorConfigVariables.SOURCE_DATETIME_TIMEZONE.toString(),
Type.STRING,
"UTC",
Collaborator Author:
Not sure it is a good idea. Leave it null by default and initialize it with the source timezone from Debezium.
Also add it to the default config.yaml, commented out (see the sketch after this hunk).

Importance.LOW,
"Config variable to define source timezone " +
"All incoming DateTime* values will be recalculated to UTC considering specified timezone",
CONFIG_GROUP_CONNECTOR_CONFIG,
1,
ConfigDef.Width.NONE,
ClickHouseSinkConnectorConfigVariables.SOURCE_DATETIME_TIMEZONE.toString())
.define(
ClickHouseSinkConnectorConfigVariables.ENABLE_KAFKA_OFFSET.toString(),
Type.BOOLEAN,
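A minimal sketch of the change suggested above, assuming a null default that is later initialized from Debezium's database.connectionTimeZone (that wiring is not shown here and is an assumption):

```java
// Sketch only: no hard-coded "UTC" default, so an unset value can be
// detected and filled in from the Debezium source timezone at runtime.
.define(
    ClickHouseSinkConnectorConfigVariables.SOURCE_DATETIME_TIMEZONE.toString(),
    Type.STRING,
    null, // unset by default
    Importance.LOW,
    "Config variable to define the source timezone. " +
    "All incoming DateTime* values will be recalculated to UTC considering the specified timezone",
    CONFIG_GROUP_CONNECTOR_CONFIG,
    1,
    ConfigDef.Width.NONE,
    ClickHouseSinkConnectorConfigVariables.SOURCE_DATETIME_TIMEZONE.toString())
```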
@@ -53,6 +53,8 @@ public enum ClickHouseSinkConnectorConfigVariables {
//Config variable for auto creating tables if they don't exist.
AUTO_CREATE_TABLES("auto.create.tables"),

//Config variable to define source timezone.
SOURCE_DATETIME_TIMEZONE("source.datetime.timezone"),
// Config variable when set to true, columns will be added.
ENABLE_SCHEMA_EVOLUTION("schema.evolution"),

@@ -108,6 +108,7 @@ public class ClickHouseDataTypeMapper {
*/
public static boolean convert(Schema.Type type, String schemaName,
Object value,
String sourceTimeZone,
int index,
PreparedStatement ps) throws SQLException {

@@ -154,7 +155,7 @@ public static boolean convert(Schema.Type type, String schemaName,
if (type == Schema.Type.STRING) {
if (schemaName != null && schemaName.equalsIgnoreCase(ZonedTimestamp.SCHEMA_NAME)) {
// MySQL(Timestamp) -> String, name(ZonedTimestamp) -> Clickhouse(DateTime)
ps.setString(index, DebeziumConverter.ZonedTimestampConverter.convert(value));
ps.setString(index, DebeziumConverter.ZonedTimestampConverter.convert(value, sourceTimeZone));

} else if(schemaName != null && schemaName.equalsIgnoreCase(Json.LOGICAL_NAME)) {
// if the column is JSON, it should be written, String otherwise
@@ -186,14 +187,14 @@ public static boolean convert(Schema.Type type, String schemaName,
if (isFieldDateTime) {
if (schemaName != null && schemaName.equalsIgnoreCase(MicroTimestamp.SCHEMA_NAME)) {
// Handle microtimestamp first
ps.setTimestamp(index, DebeziumConverter.MicroTimestampConverter.convert(value));
ps.setString(index, DebeziumConverter.MicroTimestampConverter.convert(value));
}
else if (value instanceof Long) {
boolean isColumnDateTime64 = false;
if(schemaName.equalsIgnoreCase(Timestamp.SCHEMA_NAME) && type == Schema.INT64_SCHEMA.type()){
isColumnDateTime64 = true;
}
ps.setLong(index, DebeziumConverter.TimestampConverter.convert(value, isColumnDateTime64));
ps.setString(index, DebeziumConverter.TimestampConverter.convert(value, isColumnDateTime64));
}
} else if (isFieldTime) {
ps.setString(index, DebeziumConverter.MicroTimeConverter.convert(value));
@@ -46,16 +46,15 @@ public static String convert(Object value) {
public static class MicroTimestampConverter {

//TODO: if values exceed the ones supported by ClickHouse
public static Timestamp convert(Object value) {
public static String convert(Object value) {
Long microTimestamp = (Long) value;

//Long milliTimestamp = microTimestamp / MICROS_IN_MILLI;
//Instant receivedDT = Instant.ofEpochMilli(microTimestamp/MICROS_IN_MILLI).plusNanos(microTimestamp%1_000);
//Instant receivedDT = Instant.ofEpochMilli(microTimestamp/MICROS_IN_MILLI).pl
Instant receivedDT = Instant.EPOCH.plus(microTimestamp, ChronoUnit.MICROS);
Instant modifiedDT = checkIfDateTimeExceedsSupportedRange(receivedDT, true);

return Timestamp.from(modifiedDT);
return Timestamp.from(modifiedDT).toString();
}
}

@@ -68,12 +67,14 @@ public static class TimestampConverter {
* @param value
* @return
*/
public static Long convert(Object value, boolean isDateTime64) {
public static String convert(Object value, boolean isDateTime64) {
DateTimeFormatter destFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
Instant providedDT = Instant.ofEpochMilli((long) value);

Instant modifiedDT = checkIfDateTimeExceedsSupportedRange(providedDT, isDateTime64);

return modifiedDT.toEpochMilli();
//return modifiedDT.toEpochMilli();
return ZonedDateTime.ofInstant(modifiedDT, ZoneId.of("UTC")).format(destFormatter);
}
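With this change the converter returns a wall-clock string rather than epoch millis, so values land in ClickHouse via string parsing. Illustrative usage, with the expected output taken from the updated unit test further down:

```java
// 1640995260123 ms since the epoch, rendered in UTC with millisecond precision
String formatted = DebeziumConverter.TimestampConverter.convert(1640995260123L, false);
// formatted == "2022-01-01 00:01:00.123"
```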


@@ -134,7 +135,7 @@ public static class ZonedTimestampConverter {
* @param value
* @return
*/
public static String convert(Object value) {
public static String convert(Object value, String timeZone) {

// TemporalAccessor parsedTime = ZonedTimestamp.FORMATTER.parse((String) value);
// DateTimeFormatter bqZonedTimestampFormat =
@@ -196,7 +197,7 @@ public static String convert(Object value) {
}
//check date range
i = checkIfDateTimeExceedsSupportedRange(i, true);
result = ZonedDateTime.ofInstant(i, ZoneId.of("UTC")).format(destFormatter);
result = ZonedDateTime.ofInstant(i, ZoneId.of(timeZone)).format(destFormatter);
} else {
log.error("Error parsing zonedtimestamp " + (String) value);
}
@@ -6,11 +6,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.StringUtils;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.regex.Matcher;

public class BaseDbWriter {

@@ -75,7 +78,7 @@ protected void createConnection(String url, String clientName, String userName,
* Function that uses the DatabaseMetaData JDBC functionality
* to get the column name and column data type as key/value pair.
*/
public Map<String, String> getColumnsDataTypesForTable(String tableName) {
public Map<String, String> getColumnsDataTypesForTable(String tableName, String sourceTimeZone) {

LinkedHashMap<String, String> result = new LinkedHashMap<>();
try {
@@ -89,6 +92,9 @@ public Map<String, String> getColumnsDataTypesForTable(String tableName) {
while (columns.next()) {
String columnName = columns.getString("COLUMN_NAME");
String typeName = columns.getString("TYPE_NAME");
if (typeName.contains("DateTime")) {
typeName = addTimeZoneToColumnDefinition(typeName, sourceTimeZone);
}

// Object dataType = columns.getString("DATA_TYPE");
// String columnSize = columns.getString("COLUMN_SIZE");
@@ -141,6 +147,29 @@ public ResultSet executeQueryWithResultSet(String sql) throws SQLException {

}

public String addTimeZoneToColumnDefinition(String typeName, String timeZone) {
Collaborator Author:
Not sure it is a good idea to use a regexp here, as the target DateTime64 may already have a TZ!

Example: DateTime64(6,'UTC') becomes DateTime64(6,'America/Chicago')
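To make the concern concrete, here is a minimal, self-contained sketch that reproduces the regex and replacement logic of addTimeZoneToColumnDefinition (shown further down), with the commons-lang3 substringBetween call swapped for plain substring and the quote escaping dropped for readability:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TimeZoneOverwriteDemo {
    public static void main(String[] args) {
        String typeName = "DateTime64(6,'UTC')";
        String timeZone = "America/Chicago";
        Pattern pattern = Pattern.compile("DateTime(64)?(\\([^\\)]*\\))?");
        Matcher matcher = pattern.matcher(typeName);
        StringBuilder result = new StringBuilder();
        int cursor = 0;
        while (matcher.find()) {
            result.append(typeName, cursor, matcher.start());
            cursor = matcher.end();
            String occurrence = typeName.substring(matcher.start(), matcher.end());
            if (occurrence.contains("DateTime64")) {
                // Only the precision survives; an existing timezone parameter is dropped.
                int comma = occurrence.indexOf(',');
                int close = occurrence.indexOf(')');
                String precision = occurrence
                        .substring(occurrence.indexOf('(') + 1, comma >= 0 ? comma : close)
                        .trim();
                result.append(String.format("DateTime64(%s,'%s')", precision, timeZone));
            } else {
                result.append(String.format("DateTime('%s')", timeZone));
            }
        }
        result.append(typeName.substring(cursor));
        // Prints DateTime64(6,'America/Chicago') -- the original 'UTC' is gone.
        System.out.println(result);
    }
}
```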

Collaborator Author:
I wonder if it would work if you used a String instead; CH will do the conversion.

Exception I got with the maximum boundaries

```
`db_to` DateTime64(6,\'America/Chicago\'),`_version` UInt64,`is_deleted` UInt8')
118741 2023-11-13 02:53:56.165 [pool-1-thread-9] ERROR com.altinity.clickhouse.sink.connector.db.DbWriter  - ******* ERROR inserting Batch *****************
java.lang.IllegalArgumentException: DateTime(9904571999) should be between -1420070400 and 9904550399 inclusive of both values
	at com.clickhouse.data.ClickHouseChecker.newException(ClickHouseChecker.java:17)
	at com.clickhouse.data.ClickHouseChecker.between(ClickHouseChecker.java:109)
	at com.clickhouse.data.format.BinaryDataProcessor$DateTime64SerDe.serialize(BinaryDataProcessor.java:292)
	at com.clickhouse.data.ClickHouseDataProcessor.write(ClickHouseDataProcessor.java:534)
	at com.clickhouse.jdbc.internal.InputBasedPreparedStatement.addBatch(InputBasedPreparedStatement.java:345)
	at com.altinity.clickhouse.sink.connector.db.DbWriter.addToPreparedStatementBatch(DbWriter.java:479)
	at com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable.flushRecordsToClickHouse(ClickHouseBatchRunnable.java:199)
	at com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable.processRecordsByTopic(ClickHouseBatchRunnable.java:169)
	at com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable.run(ClickHouseBatchRunnable.java:101)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
```

Contributor:

This is quite logical. We can take source.timezone into account in the checkIfDateExceedsSupportedRange() methods; a rough sketch follows.
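One possible shape for a timezone-aware range check (hypothetical helper, not the PR's code; the bounds come from the driver error above, and the clamping assumes the value is later formatted as a wall-clock string and re-parsed by ClickHouse in the source timezone):

```java
import java.time.Instant;
import java.time.ZoneId;

public class RangeCheckSketch {
    // ClickHouse DateTime64 bounds in epoch seconds, per the error above:
    // 1925-01-01 00:00:00 UTC and 2283-11-11 23:59:59 UTC.
    private static final long MIN_EPOCH_SECONDS = -1420070400L;
    private static final long MAX_EPOCH_SECONDS = 9904550399L;

    public static Instant checkIfDateTimeExceedsSupportedRange(Instant dt, String sourceTimeZone) {
        // Offset of the source timezone at this instant, in seconds east of UTC.
        long offset = ZoneId.of(sourceTimeZone).getRules().getOffset(dt).getTotalSeconds();
        // If the UTC wall-clock rendering of dt is re-parsed in sourceTimeZone,
        // the resulting epoch shifts by -offset; clamp against that.
        long reparsed = dt.getEpochSecond() - offset;
        if (reparsed > MAX_EPOCH_SECONDS) {
            return Instant.ofEpochSecond(MAX_EPOCH_SECONDS + offset);
        }
        if (reparsed < MIN_EPOCH_SECONDS) {
            return Instant.ofEpochSecond(MIN_EPOCH_SECONDS + offset);
        }
        return dt;
    }
}
```

With sourceTimeZone = "America/Chicago" (offset -06:00), the clamped maximum becomes 2283-11-11 17:59:59 UTC, which parses in Chicago to exactly the 9904550399 upper bound instead of overflowing it by 21600 seconds as in the log above.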

Collaborator Author @aadant, Nov 13, 2023:

yes that works. @subkanthi FYI

Contributor:

> not sure it is a good idea to use a regexp here as the target DateTime64 may already have a TZ !
>
> example DateTime64(6,'UTC') becomes DateTime64(6,'America/Chicago')

Could you explain, please? I don't see any problem here (except possible slowness). I specifically override it within the input() function to make ClickHouse use the configured source.timezone to parse the incoming data instead of the column metadata.

Collaborator Author:

It is the wrong place to change the schema. The sink-connector should create the schema with the correct timezone (target timezone).

Collaborator Author @aadant, Nov 13, 2023:

In the sink-connector, there are two places where we fix the schema:

  • for the initial snapshot, via custom tools like https://github.com/Altinity/clickhouse-sink-connector/pull/375/files
  • via Debezium: the sink-connector needs to add this clickhouse.datetime.timezone parameter. In your case, it would be the same as the source. The default should be left unspecified and fall back to database.connectionTimeZone (Debezium); source.datetime.timezone should also default to database.connectionTimeZone. A rough sketch of that defaulting follows this comment.

What do you think?
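A rough sketch of the proposed defaulting, assuming the connector can see the Debezium properties (resolveSourceTimeZone and debeziumProps are illustrative names, not an existing API):

```java
import java.util.Properties;

// Illustrative only: resolve the effective timezone for DateTime handling.
static String resolveSourceTimeZone(ClickHouseSinkConnectorConfig config,
                                    Properties debeziumProps) {
    String tz = config.getString(
            ClickHouseSinkConnectorConfigVariables.SOURCE_DATETIME_TIMEZONE.toString());
    if (tz == null || tz.isEmpty()) {
        // Unspecified: fall back to Debezium's database.connectionTimeZone,
        // then to UTC as a last resort.
        tz = debeziumProps.getProperty("database.connectionTimeZone", "UTC");
    }
    return tz;
}
```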

Collaborator Author:

In terms of timing, let us wait for @subkanthi to merge all pending PRs to develop; we will then address this one.

Collaborator Author:

@subkanthi, let us implement this too: #349

    Pattern pattern = Pattern.compile("DateTime(64)?(\\([^\\)]*\\))?");
    Matcher matcher = pattern.matcher(typeName);
    StringBuffer result = new StringBuffer("");
    int cursor = 0;
    while (matcher.find()) {
        int start = matcher.start();
        int end = matcher.end();
        result.append(typeName.substring(cursor, start));
        cursor = end;
        String occurrence = typeName.substring(start, end);
        if (occurrence.contains("DateTime64")) {
            String[] params = StringUtils.substringBetween(occurrence, "(", ")").split(",");
            String precision = params[0].trim();
            result.append(String.format("DateTime64(%s,\\'%s\\')", precision, timeZone));
Collaborator Author:
escape clashing with another escape

Collaborator Author:
in #366

String dataType = ClickHouseUtils.escape(columnNameToDataTypeMap.get(entry.getKey()), '\'');

        } else {
            result.append(String.format("DateTime(\\'%s\\')", timeZone));
        }
    }
    result.append(typeName.substring(cursor));
    return result.toString();
}
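For reference, tracing the regex above yields the following rewrites (illustrative; outputs shown with the escaped quotes the format strings actually produce):

```java
// addTimeZoneToColumnDefinition("DateTime", "UTC")
//   -> "DateTime(\'UTC\')"
// addTimeZoneToColumnDefinition("DateTime64(6)", "UTC")
//   -> "DateTime64(6,\'UTC\')"
// addTimeZoneToColumnDefinition("Nullable(DateTime64(6,'UTC'))", "America/Chicago")
//   -> "Nullable(DateTime64(6,\'America/Chicago\'))"  // existing TZ overwritten
```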

/**
* Function to get the clickhouse version.
* @return version as string.
@@ -39,6 +39,10 @@ public DbKafkaOffsetWriter(

}

public Map<String, String> getColumnsDataTypesForTable(String tableName) {
return super.getColumnsDataTypesForTable(tableName, "UTC");
}

/**
* Function to create kafka offset table.
*/
@@ -60,6 +60,9 @@ public class DbWriter extends BaseDbWriter {
// Delete column for ReplacingMergeTree
private String replacingMergeTreeDeleteColumn = null;

// Source dateTime timezone
private String sourceTimeZone = null;

/**
* IMPORTANT: The logic to identify the new replacing mergetree
* table which lets you specify the is_deleted column in
@@ -84,11 +87,12 @@ public DbWriter(
this.tableName = tableName;

this.config = config;
this.sourceTimeZone = this.config.getString(ClickHouseSinkConnectorConfigVariables.SOURCE_DATETIME_TIMEZONE.toString());

try {
if (this.conn != null) {
// Order of the column names and the data type has to match.
this.columnNameToDataTypeMap = this.getColumnsDataTypesForTable(tableName);
this.columnNameToDataTypeMap = this.getColumnsDataTypesForTable(tableName, this.sourceTimeZone);
}

DBMetadata metadata = new DBMetadata();
@@ -118,7 +122,7 @@ public DbWriter(
}

act.createNewTable(record.getPrimaryKey(), tableName, fields, this.conn);
this.columnNameToDataTypeMap = this.getColumnsDataTypesForTable(tableName);
this.columnNameToDataTypeMap = this.getColumnsDataTypesForTable(tableName, this.sourceTimeZone);
response = metadata.getTableEngine(this.conn, database, tableName);
this.engine = response.getLeft();
} catch (Exception e) {
@@ -291,7 +295,7 @@ public Map<TopicPartition, Long> groupQueryWithRecords(ConcurrentLinkedQueue<Cli
if(enableSchemaEvolution) {
try {
alterTable(record.getAfterStruct().schema().fields());
this.columnNameToDataTypeMap = this.getColumnsDataTypesForTable(tableName);
this.columnNameToDataTypeMap = this.getColumnsDataTypesForTable(tableName, this.sourceTimeZone);

} catch(Exception e) {
log.error("**** ERROR ALTER TABLE: " + tableName, e);
@@ -600,7 +604,7 @@ public void insertPreparedStatement(Map<String, Integer> columnNameToIndexMap, P
if(type == Schema.Type.ARRAY) {
schemaName = f.schema().valueSchema().type().name();
}
if(false == ClickHouseDataTypeMapper.convert(type, schemaName, value, index, ps)) {
if(false == ClickHouseDataTypeMapper.convert(type, schemaName, value, sourceTimeZone, index, ps)) {
log.error(String.format("**** DATA TYPE NOT HANDLED type(%s), name(%s), column name(%s)", type.toString(),
schemaName, colName));
}
@@ -71,7 +71,7 @@ public void convert() throws SQLException {
ps.setString(index++, "Test");
ps.setDouble(index++, 0d);
ps.setDouble(index++, 0d);
ClickHouseDataTypeMapper.convert(Schema.FLOAT32_SCHEMA.type(), null, maxDoubleTest, index++, ps);
ClickHouseDataTypeMapper.convert(Schema.FLOAT32_SCHEMA.type(), null, maxDoubleTest, "UTC", index++, ps);
ps.setDouble(index, 1d);
ps.setInt(index++,1);
ps.setInt(index++, 12);
@@ -42,38 +42,34 @@ public void testMicroTimestampConverter() {
//Assert.assertTrue(resultWMilliSeconds == 1665076675L);


Timestamp result = DebeziumConverter.MicroTimestampConverter.convert(1664416228000000L);
System.out.println("");

Timestamp result2 = DebeziumConverter.MicroTimestampConverter.convert(253402300799999990L);
System.out.println("");
String result = DebeziumConverter.MicroTimestampConverter.convert(1664416228001000L);
Assert.assertTrue(result.toString().equalsIgnoreCase("2022-09-29 04:50:28.001"));
String result2 = DebeziumConverter.MicroTimestampConverter.convert(253402300799999990L);
Assert.assertTrue(result2.toString().equalsIgnoreCase("2283-11-12 02:59:59.999999999"));
}

@Test
public void testTimestampConverter() {

Object timestampEpoch = 1640995260000L;
Object timestampEpoch = 1640995260123L;
String formattedTimestamp = String.valueOf(DebeziumConverter.TimestampConverter.convert(timestampEpoch, false));

Assert.assertTrue(formattedTimestamp.equalsIgnoreCase("1640995260000"));
Assert.assertTrue(formattedTimestamp.equalsIgnoreCase("2022-01-01 00:01:00.123"));
}

@Test
public void testTimestampConverterMinRange() {

Object timestampEpoch = -2166681362000L;
String formattedTimestamp = String.valueOf(DebeziumConverter.TimestampConverter.convert(timestampEpoch, false));

Assert.assertTrue(formattedTimestamp.equalsIgnoreCase("-1420070400000"));
Assert.assertTrue(formattedTimestamp.equalsIgnoreCase("1925-01-01 00:00:00.000"));
}

@Test
public void testTimestampConverterMaxRange() {

Object timestampEpoch = 4807440238000L;
String formattedTimestamp = String.valueOf(DebeziumConverter.TimestampConverter.convert(timestampEpoch, false));

Assert.assertTrue(formattedTimestamp.equalsIgnoreCase("4807440238000"));
Assert.assertTrue(formattedTimestamp.equalsIgnoreCase("2122-05-05 16:03:58.000"));
}

@Test
@@ -118,23 +114,23 @@ public void testDateConverterWithinRange() {
@Test
public void testZonedTimestampConverter() {

String formattedTimestamp = DebeziumConverter.ZonedTimestampConverter.convert("2021-12-31T19:01:00Z");
String formattedTimestamp = DebeziumConverter.ZonedTimestampConverter.convert("2021-12-31T19:01:00Z","UTC");
Assert.assertTrue(formattedTimestamp.equalsIgnoreCase("2021-12-31 19:01:00.000000"));

String formattedTimestampWMicroSeconds = DebeziumConverter.ZonedTimestampConverter.convert("2038-01-19T03:14:07.999999Z");
Assert.assertTrue(formattedTimestampWMicroSeconds.equalsIgnoreCase("2038-01-19 03:14:07.999999"));
String formattedTimestampWMicroSeconds = DebeziumConverter.ZonedTimestampConverter.convert("2038-01-19T03:14:07.999999Z","Europe/Moscow");
Assert.assertTrue(formattedTimestampWMicroSeconds.equalsIgnoreCase("2038-01-19 06:14:07.999999"));

String formattedTimestamp3 = DebeziumConverter.ZonedTimestampConverter.convert("2038-01-19T03:14:07.99Z");
Assert.assertTrue(formattedTimestamp3.equalsIgnoreCase("2038-01-19 03:14:07.990000"));
String formattedTimestamp3 = DebeziumConverter.ZonedTimestampConverter.convert("2038-01-19T03:14:07.99Z","Canada/Yukon");
Assert.assertTrue(formattedTimestamp3.equalsIgnoreCase("2038-01-18 20:14:07.990000"));

String formattedTimestamp4 = DebeziumConverter.ZonedTimestampConverter.convert("0000-00-01T00:00:00");
Assert.assertTrue(formattedTimestamp4.equalsIgnoreCase("1925-01-01 00:00:00.000000"));
String formattedTimestamp4 = DebeziumConverter.ZonedTimestampConverter.convert("0000-00-01T00:00:00","Brazil/West");
Assert.assertTrue(formattedTimestamp4.equalsIgnoreCase("1924-12-31 20:00:00.000000"));

String formattedTimestamp5 = DebeziumConverter.ZonedTimestampConverter.convert("2023-10-20T22:44:15.123456+03:00");
String formattedTimestamp5 = DebeziumConverter.ZonedTimestampConverter.convert("2023-10-20T22:44:15.123456+03:00","UTC");
Assert.assertTrue(formattedTimestamp5.equalsIgnoreCase("2023-10-20 19:44:15.123456"));

String formattedTimestamp6 = DebeziumConverter.ZonedTimestampConverter.convert("2022-12-25T04:00:00.7654321-0300");
Assert.assertTrue(formattedTimestamp6.equalsIgnoreCase("2022-12-25 07:00:00.765432"));
String formattedTimestamp6 = DebeziumConverter.ZonedTimestampConverter.convert("2022-12-25T04:00:00.7654321-0300","Atlantic/Bermuda");
Assert.assertTrue(formattedTimestamp6.equalsIgnoreCase("2022-12-25 03:00:00.765432"));
}

@Test
@@ -157,7 +153,7 @@ public void testBatchArrays() {
Properties properties = new Properties();
properties.setProperty("client_name", "Test_1");

ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>());
ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(new HashMap<>());
DbWriter dbWriter = new DbWriter(hostName, port, database, tableName, userName, password, config, null);
String url = dbWriter.getConnectionString(hostName, port, database);
