Merge pull request #110 from Altinity/refactor_insert_batch

Refactor insert batch
subkanthi authored Oct 8, 2022
2 parents ae1d6dc + 19d621f commit e7a9a99
Showing 23 changed files with 455 additions and 150 deletions.
10 changes: 10 additions & 0 deletions deploy/configure_datatypes.sh
@@ -0,0 +1,10 @@
DATABASE=datatypes
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE
docker exec -it clickhouse clickhouse-client -uroot --password root -mn --query "drop database if exists $DATABASE;create database $DATABASE;"

docker cp ../tests/data_types.sql mysql-master:/tmp
docker exec -it mysql-master mysql -uroot -proot -e "DROP DATABASE IF EXISTS $DATABASE;CREATE DATABASE $DATABASE;"

docker exec -it mysql-master mysql -uroot -proot -e "use $DATABASE;source /tmp/data_types.sql;"

./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE
2 changes: 1 addition & 1 deletion deploy/debezium-connector-setup-database.sh
@@ -84,7 +84,7 @@ else
"tasks.max": "1",
"snapshot.mode": "initial",
"snapshot.locking.mode": "minimal",
"snapshot.delay.ms": 10000,
"snapshot.delay.ms": 1,
"include.schema.changes":"true",
"include.schema.comments": "true",
"database.hostname": "${MYSQL_HOST}",
4 changes: 3 additions & 1 deletion deploy/sink-connector-setup-database.sh
@@ -105,7 +105,9 @@ else
"auto.create.tables": true,
"schema.evolution": false,
"deduplication.policy": "off"
"deduplication.policy": "off",
"metadata.max.age.ms" : 10000
}
}
EOF
6 changes: 3 additions & 3 deletions pom.xml
@@ -36,10 +36,10 @@
<url>https://github.com/altinity/clickhouse-kafka-sink-connector</url>
</scm>

<!-- Set our Language Level to Java 8 -->
<!-- Set our Language Level to Java 9 -->
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.9</maven.compiler.source>
<maven.compiler.target>1.9</maven.compiler.target>
</properties>


4 changes: 2 additions & 2 deletions python/db_compare/mysql_table_checksum.py
@@ -81,14 +81,14 @@ def get_table_checksum_query(table, conn):

if 'datetime' in data_type:
# CH datetime range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/datetime64/
select += f"case when {column_name} > substr('2299-12-31 23:59:59.99999999', 1, length({column_name})) then CAST(substr('2299-12-31 23:59:59.99999999', 1, length({column_name})) AS {data_type}) else case when {column_name} <= '1900-01-01 00:00:00' then CAST('1900-01-01 00:00:00' AS {data_type}) else {column_name} end end"
select += f"case when {column_name} > substr('2283-11-11 23:59:59.99999999', 1, length({column_name})) then CAST(substr('2283-11-11 23:59:59.99999999', 1, length({column_name})) AS {data_type}) else case when {column_name} <= '1925-01-01 00:00:00' then CAST('1925-01-01 00:00:00' AS {data_type}) else {column_name} end end"
#elif 'datetime' in data_type:
# # CH datetime range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/datetime/
# select += f"case when {column_name} >='2283-11-11' then CAST('2283-11-11' AS {data_type}) else case when {column_name} <= '1970-01-01' then CAST('1925-01-01 00:00:00' AS {data_type}) else {column_name} end end"*/
else:
if 'date' == data_type:
# CH date range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/date
select += f"case when {column_name} >='2299-12-31' then CAST('2299-12-31' AS {data_type}) else case when {column_name} <= '1900-01-01' then CAST('1900-01-01' AS {data_type}) else {column_name} end end"
select += f"case when {column_name} >='2283-11-11' then CAST('2283-11-11' AS {data_type}) else case when {column_name} <= '1925-01-01' then CAST('1925-01-01' AS {data_type}) else {column_name} end end"
else:
if is_binary_datatype(data_type):
select += "lower(hex(cast("+column_name+"as binary)))"
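The same boundary clamp shows up in the Java DateConverter further down in this commit. As a rough illustration of the idea (the class and constant names below are invented for the sketch; the bounds are taken from the SQL above), clamping an epoch-day value into ClickHouse's supported Date32 window might look like:

import java.time.LocalDate;

// Sketch only: clamp an epoch-day value into the assumed Date32 window
// ('1925-01-01' .. '2283-11-11') used by the checksum query above.
public class Date32Clamp {
    static final int MIN_DAYS = (int) LocalDate.parse("1925-01-01").toEpochDay();
    static final int MAX_DAYS = (int) LocalDate.parse("2283-11-11").toEpochDay();

    static int clamp(int epochDays) {
        if (epochDays < MIN_DAYS) return MIN_DAYS;
        if (epochDays > MAX_DAYS) return MAX_DAYS;
        return epochDays;
    }

    public static void main(String[] args) {
        // A date below the window clamps to the minimum.
        int clamped = clamp((int) LocalDate.parse("1899-12-31").toEpochDay());
        System.out.println(LocalDate.ofEpochDay(clamped)); // prints 1925-01-01
    }
}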
3 changes: 0 additions & 3 deletions python/requirements.txt

This file was deleted.

@@ -50,9 +50,9 @@ public class ClickHouseDataTypeMapper {
dataTypesMap.put(new MutablePair(Schema.INT64_SCHEMA.type(), MicroTime.SCHEMA_NAME), ClickHouseDataType.String);

// Timestamp -> DateTime
dataTypesMap.put(new MutablePair(Schema.INT64_SCHEMA.type(), Timestamp.SCHEMA_NAME), ClickHouseDataType.DateTime);

dataTypesMap.put(new MutablePair(Schema.INT64_SCHEMA.type(), MicroTimestamp.SCHEMA_NAME), ClickHouseDataType.String);
dataTypesMap.put(new MutablePair(Schema.INT64_SCHEMA.type(), Timestamp.SCHEMA_NAME), ClickHouseDataType.DateTime64);
// Datetime with microseconds precision
dataTypesMap.put(new MutablePair(Schema.INT64_SCHEMA.type(), MicroTimestamp.SCHEMA_NAME), ClickHouseDataType.DateTime64);

// BLOB -> String
dataTypesMap.put(new MutablePair(Schema.Type.BYTES, null), ClickHouseDataType.String);
@@ -61,7 +61,7 @@ public class ClickHouseDataTypeMapper {
dataTypesMap.put(new MutablePair<>(Schema.Type.BOOLEAN, null), ClickHouseDataType.Bool);

// Timestamp -> ZonedTimeStamp -> DateTime
dataTypesMap.put(new MutablePair<>(Schema.Type.STRING, ZonedTimestamp.SCHEMA_NAME), ClickHouseDataType.DateTime);
dataTypesMap.put(new MutablePair<>(Schema.Type.STRING, ZonedTimestamp.SCHEMA_NAME), ClickHouseDataType.DateTime64);

dataTypesMap.put(new MutablePair<>(Schema.Type.STRING, Enum.LOGICAL_NAME), ClickHouseDataType.String);

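The mapping change above is the crux of this file: Debezium's Timestamp, MicroTimestamp, and ZonedTimestamp logical types now all resolve to ClickHouse DateTime64. As a hedged sketch of the lookup pattern (simplified stand-ins, not the connector's actual classes; the Debezium logical names are assumed), keying a map on the (schema type, logical name) pair looks like:

import java.util.HashMap;
import java.util.Map;

// Sketch of the pair-keyed lookup; names are stand-ins, not connector API.
public class TypeMapSketch {
    enum ChType { String, DateTime64, Bool }

    static final Map<Map.Entry<String, String>, ChType> TYPES = new HashMap<>();
    static {
        // Assumed Debezium logical names; all three land on DateTime64 per the diff.
        TYPES.put(Map.entry("INT64", "io.debezium.time.Timestamp"), ChType.DateTime64);
        TYPES.put(Map.entry("INT64", "io.debezium.time.MicroTimestamp"), ChType.DateTime64);
        TYPES.put(Map.entry("STRING", "io.debezium.time.ZonedTimestamp"), ChType.DateTime64);
    }

    public static void main(String[] args) {
        // Map.entry gives value-based equals/hashCode, so lookups by a fresh pair work.
        System.out.println(TYPES.get(Map.entry("INT64", "io.debezium.time.Timestamp"))); // DateTime64
    }
}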
@@ -1,21 +1,27 @@
package com.altinity.clickhouse.sink.connector.converters;

import com.altinity.clickhouse.sink.connector.metadata.DataTypeRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Date;
import java.text.SimpleDateFormat;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.TimeZone;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;

public class DebeziumConverter {

private static final int MICROS_IN_SEC = 1000000;
private static final int MICROS_IN_MILLI = 1000;

private static final Logger log = LoggerFactory.getLogger(DebeziumConverter.class);


public static class MicroTimeConverter {
/**
* Function to convert Long(Epoch)
@@ -27,70 +33,66 @@ public static String convert(Object value) {
Long milliTimestamp = (Long) value / 1000;
java.util.Date date = new java.util.Date(milliTimestamp);

SimpleDateFormat bqTimeSecondsFormat = new SimpleDateFormat("HH:mm:ss");
String formattedSecondsTimestamp = bqTimeSecondsFormat.format(date);
Instant i = Instant.EPOCH.plus((Long) value, ChronoUnit.MICROS);


LocalTime time = i.atZone(ZoneOffset.UTC).toLocalTime();
String formattedSecondsTimestamp= time.format(DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS"));


return formattedSecondsTimestamp;

//return Timestamp.from(Instant.ofEpochMilli((Long) value));
}
}

public static class MicroTimestampConverter {

//ToDO: IF values exceed the ones supported by clickhouse
public static String convert(Object value) {
public static Timestamp convert(Object value) {
Long microTimestamp = (Long) value;

Long milliTimestamp = microTimestamp / MICROS_IN_MILLI;
java.util.Date date = new java.util.Date(milliTimestamp);

SimpleDateFormat bqDatetimeSecondsFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
bqDatetimeSecondsFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
String formattedSecondsTimestamp = bqDatetimeSecondsFormat.format(date);

Long microRemainder = microTimestamp % MICROS_IN_SEC;
//Long milliTimestamp = microTimestamp / MICROS_IN_MILLI;
//Instant receivedDT = Instant.ofEpochMilli(microTimestamp/MICROS_IN_MILLI).plusNanos(microTimestamp%1_000);
//Instant receivedDT = Instant.ofEpochMilli(microTimestamp/MICROS_IN_MILLI).pl
Instant receivedDT = Instant.EPOCH.plus(microTimestamp, ChronoUnit.MICROS);
Instant modifiedDT = checkIfDateTimeExceedsSupportedRange(receivedDT, true);

return formattedSecondsTimestamp;
return Timestamp.from(modifiedDT);
}
}

public static class TimestampConverter {

/**
* Function to convert Debezium Timestamp fields to
* Function to convert Debezium Timestamp fields to DATETIME(0), DATETIME(1), DATETIME(2)
* Timestamp does not have microseconds
* ISO formatted String.
* @param value
* @return
*/
public static String convert(Object value, boolean isDateTime64) {
LocalDateTime date = LocalDateTime.ofInstant(Instant.ofEpochMilli((long) value), ZoneId.systemDefault());
public static Long convert(Object value, boolean isDateTime64) {
Instant providedDT = Instant.ofEpochMilli((long) value);

LocalDateTime modifiedDate = checkIfDateTimeExceedsSupportedRange(date, isDateTime64);
//DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME;
DateTimeFormatter destFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
Instant modifiedDT = checkIfDateTimeExceedsSupportedRange(providedDT, isDateTime64);

return modifiedDate.format(destFormatter);
return modifiedDT.toEpochMilli();
}

public static LocalDateTime checkIfDateTimeExceedsSupportedRange(LocalDateTime providedDateTime, boolean isDateTime64) {

LocalDateTime minSupportedDateTime = LocalDateTime.parse(DataTypeRange.CLICKHOUSE_MIN_SUPPORTED_DATETIME);
LocalDateTime maxSupportedDateTime = LocalDateTime.parse(DataTypeRange.CLICKHOUSE_MAX_SUPPORTED_DATETIME);
}

if(isDateTime64 == true) {
minSupportedDateTime = LocalDateTime.parse(DataTypeRange.CLICKHOUSE_MIN_SUPPORTED_DATETIME);
maxSupportedDateTime = LocalDateTime.parse(DataTypeRange.CLICKHOUSE_MAX_SUPPORTED_DATETIME);
}
public static Instant checkIfDateTimeExceedsSupportedRange(Instant providedDateTime, boolean isDateTime64) {

if(providedDateTime.isBefore(minSupportedDateTime)) {
return minSupportedDateTime;
} else if (providedDateTime.isAfter(maxSupportedDateTime)){
return maxSupportedDateTime;
}
if(providedDateTime.isBefore(DataTypeRange.CLICKHOUSE_MIN_SUPPORTED_DATETIME)) {
return DataTypeRange.CLICKHOUSE_MIN_SUPPORTED_DATETIME;
} else if (providedDateTime.isAfter(DataTypeRange.CLICKHOUSE_MAX_SUPPORTED_DATETIME)){
return DataTypeRange.CLICKHOUSE_MAX_SUPPORTED_DATETIME;
}

return providedDateTime;
return providedDateTime;

}
}

public static class DateConverter {


@@ -104,25 +106,25 @@ public static class DateConverter {
* @return
*/
public static Date convert(Object value) {
long msSinceEpoch = TimeUnit.DAYS.toMillis((Integer) value);
Integer epochInDays = checkIfDateExceedsSupportedRange((Integer) value);

// The value is epoch in Days
long msSinceEpoch = TimeUnit.DAYS.toMillis(epochInDays);
java.util.Date date = new java.util.Date(msSinceEpoch);

java.util.Date modifiedDate = checkIfDateExceedsSupportedRange(date);

return new java.sql.Date(modifiedDate.getTime());
return new java.sql.Date(date.getTime());
}

public static java.util.Date checkIfDateExceedsSupportedRange(java.util.Date providedDate) {
java.util.Date minSupportedDate = Date.valueOf(DataTypeRange.CLICKHOUSE_MIN_SUPPORTED_DATE);
java.util.Date maxSupportedDate = Date.valueOf(DataTypeRange.CLICKHOUSE_MAX_SUPPORTED_DATE);
public static Integer checkIfDateExceedsSupportedRange(Integer epochInDays) {

if(providedDate.before(minSupportedDate)) {
return minSupportedDate;
} else if (providedDate.after(maxSupportedDate)){
return maxSupportedDate;
if(epochInDays < DataTypeRange.CLICKHOUSE_MIN_SUPPORTED_DATE32) {
return DataTypeRange.CLICKHOUSE_MIN_SUPPORTED_DATE32;
} else if (epochInDays > DataTypeRange.CLICKHOUSE_MAX_SUPPORTED_DATE32){
return DataTypeRange.CLICKHOUSE_MAX_SUPPORTED_DATE32;
}

return providedDate;
return epochInDays;

}
}
@@ -136,10 +138,50 @@ public static class ZonedTimestampConverter {
* @return
*/
public static String convert(Object value) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
LocalDateTime zd = LocalDateTime.parse((String) value, formatter);
DateTimeFormatter destFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
return zd.format(destFormatter);

// TemporalAccessor parsedTime = ZonedTimestamp.FORMATTER.parse((String) value);
// DateTimeFormatter bqZonedTimestampFormat =
// new DateTimeFormatterBuilder()
// .append(DateTimeFormatter.ISO_LOCAL_DATE)
// .appendLiteral(' ')
// .append(DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS"))
// .toFormatter();
// return bqZonedTimestampFormat.format(parsedTime);

String result = "";
DateTimeFormatter destFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");

// The order of this array matters,
// for example you might truncate microseconds
// to milliseconds(3) if .SSS is above .SSSSSS
String[] date_formats = {
"yyyy-MM-dd'T'HH:mm:ss'Z'",
"yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'",
"yyyy-MM-dd'T'HH:mm:ss.SSSSS'Z'",
"yyyy-MM-dd'T'HH:mm:ss.SSSS'Z'",
"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
"yyyy-MM-dd'T'HH:mm:ss.SS'Z'",
"yyyy-MM-dd'T'HH:mm:ss.S'Z'"
};

boolean parsingSuccesful = false;
for (String formatString : date_formats) {
try {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(formatString);
LocalDateTime zd = LocalDateTime.parse((String) value, formatter);
result = zd.format(destFormatter);
//result = StringUtils.stripEnd(result, "0");
parsingSuccesful = true;
break;
} catch(Exception e) {
// Continue
}
}
if(parsingSuccesful == false) {
log.error("Error parsing zonedtimestamp " + (String) value);
}

return result;
}
}
}
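The net effect of this refactor is that the converters now clamp against Instant bounds from DataTypeRange and hand the driver native Timestamp/Long values instead of pre-formatted strings. DataTypeRange's actual constants are not shown in this diff, so the bounds in this standalone sketch are assumptions based on the DateTime64 limits used in the Python checksum change:

import java.sql.Timestamp;
import java.time.Instant;

// Standalone sketch of checkIfDateTimeExceedsSupportedRange; the bounds are
// assumed (DataTypeRange's real constants are not part of this diff).
public class DateTimeClamp {
    static final Instant MIN = Instant.parse("1925-01-01T00:00:00Z");
    static final Instant MAX = Instant.parse("2283-11-11T23:59:59.999999Z");

    static Instant clamp(Instant dt) {
        if (dt.isBefore(MIN)) return MIN;
        if (dt.isAfter(MAX)) return MAX;
        return dt;
    }

    public static void main(String[] args) {
        // Out-of-range values clamp instead of failing the insert.
        Timestamp ts = Timestamp.from(clamp(Instant.parse("1900-06-15T12:00:00Z")));
        System.out.println(ts.toInstant()); // 1925-01-01T00:00:00Z
    }
}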
@@ -0,0 +1,25 @@
package com.altinity.clickhouse.sink.connector.db;

public class ClickHouseDbConstants {

public static final String ALTER_TABLE = "ALTER TABLE";
public static final String ALTER_TABLE_ADD_COLUMN = "add column";
public static final String ALTER_TABLE_DELETE_COLUMN = "delete column";


public static final String VERSION_COLUMN = "_version";
public static final String VERSION_COLUMN_DATA_TYPE = "UInt64";
public static final String SIGN_COLUMN = "_sign";
public static final String SIGN_COLUMN_DATA_TYPE = "Int8";

public static final String CREATE_TABLE = "CREATE TABLE";

public static final String NULL = "NULL";
public static final String NOT_NULL = "NOT NULL";

public static final String PRIMARY_KEY = "PRIMARY KEY";

public static final String ORDER_BY = "ORDER BY";

public static final String ORDER_BY_TUPLE = "ORDER BY tuple()";
}
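A plausible use of these constants is assembling DDL for auto-created tables. The connector's real table builder is not part of this diff, so the following is only a sketch (table name, columns, and the engine choice are illustrative):

// Sketch: compose CREATE TABLE DDL from constants like the ones above.
public class DdlSketch {
    static final String CREATE_TABLE = "CREATE TABLE";
    static final String SIGN_COLUMN = "_sign Int8";
    static final String VERSION_COLUMN = "_version UInt64";
    static final String ORDER_BY_TUPLE = "ORDER BY tuple()";

    public static void main(String[] args) {
        String ddl = CREATE_TABLE + " orders (id Int32 NOT NULL, "
                + SIGN_COLUMN + ", " + VERSION_COLUMN + ") "
                + "ENGINE = ReplacingMergeTree(_version) " // engine is illustrative
                + ORDER_BY_TUPLE;
        System.out.println(ddl);
    }
}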
@@ -499,19 +499,22 @@ private Field getFieldByColumnName(List<Field> fields, String colName) {
public void insertPreparedStatement(Map<String, Integer> columnNameToIndexMap, PreparedStatement ps, List<Field> fields,
ClickHouseStruct record, Struct struct, boolean beforeSection) throws Exception {


// int index = 1;
// Use this map's key natural ordering as the source of truth.
//for (Map.Entry<String, String> entry : this.columnNameToDataTypeMap.entrySet()) {
for (Field f : fields) {
String colName = f.name();
for (Map.Entry<String, String> entry : this.columnNameToDataTypeMap.entrySet()) {
//for (Field f : fields) {
String colName = entry.getKey();
//String colName = f.name();

if(colName == null) {
continue;
}
if(columnNameToIndexMap == null) {
log.error("Column Name to Index map error");
}

int index = -1;
int index = -1;
//int index = 1;
if(true == columnNameToIndexMap.containsKey(colName)) {
index = columnNameToIndexMap.get(colName);
} else {
@@ -547,7 +550,7 @@ public void insertPreparedStatement(Map<String, Integer> columnNameToIndexMap, P
//ToDo: Map the Clickhouse types as a Enum.


// Field f = getFieldByColumnName(fields, colName);
Field f = getFieldByColumnName(fields, colName);
Schema.Type type = f.schema().type();
String schemaName = f.schema().name();
Object value = struct.get(f);
@@ -625,14 +628,14 @@ public void insertPreparedStatement(Map<String, Integer> columnNameToIndexMap, P
if (isFieldDateTime) {
if (schemaName != null && schemaName.equalsIgnoreCase(MicroTimestamp.SCHEMA_NAME)) {
// Handle microtimestamp first
ps.setString(index, DebeziumConverter.MicroTimestampConverter.convert(value));
ps.setTimestamp(index, DebeziumConverter.MicroTimestampConverter.convert(value));
}
else if (value instanceof Long) {
boolean isColumnDateTime64 = false;
if(schemaName.equalsIgnoreCase(Timestamp.SCHEMA_NAME) && type == Schema.INT64_SCHEMA.type()){
isColumnDateTime64 = true;
}
ps.setString(index, DebeziumConverter.TimestampConverter.convert(value, isColumnDateTime64));
ps.setLong(index, DebeziumConverter.TimestampConverter.convert(value, isColumnDateTime64));
}
} else if (isFieldTime) {
ps.setString(index, DebeziumConverter.MicroTimeConverter.convert(value));
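In short, the prepared-statement path now iterates columnNameToDataTypeMap for column ordering and binds native values: setTimestamp for MicroTimestamp (keeping microseconds) and setLong with epoch millis for Timestamp. A hedged, self-contained sketch of that binding style (JDBC URL, table, and column names are placeholders, not values from this repository, and a ClickHouse JDBC driver is assumed on the classpath):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.time.Instant;

// Sketch of the new binding style; connection details are placeholders.
public class BindSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:clickhouse://localhost:8123/default");
             PreparedStatement ps = conn.prepareStatement("INSERT INTO t (dt64, dt) VALUES (?, ?)")) {
            // MicroTimestamp path: java.sql.Timestamp preserves microseconds.
            ps.setTimestamp(1, Timestamp.from(Instant.parse("2022-10-08T12:34:56.123456Z")));
            // Timestamp path: epoch milliseconds as a Long.
            ps.setLong(2, Instant.parse("2022-10-08T12:34:56Z").toEpochMilli());
            ps.addBatch();
            ps.executeBatch();
        }
    }
}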