Fixing temporal data types
Arnaud Adant committed Oct 24, 2023
1 parent f45406f commit 418d4ec
Showing 4 changed files with 39 additions and 17 deletions.
9 changes: 6 additions & 3 deletions sink-connector/python/db_compare/clickhouse_table_checksum.py
@@ -153,7 +153,7 @@ def get_table_checksum_query(table):
excluded_columns = "','".join(args.exclude_columns)
excluded_columns = [f'{column}' for column in excluded_columns.split(',')]
#excluded_columns = "'"+excluded_columns+"'"
- #logging.info(f"Excluded columns, {excluded_columns}")
+ logging.info(f"Excluded columns, {excluded_columns}")
excluded_columns_str = ','.join((f"'{col}'" for col in excluded_columns))
checksum_query="select name, type, if(match(type,'Nullable'),1,0) is_nullable, numeric_scale from system.columns where database='" + args.clickhouse_database+"' and table = '"+table+"' and name not in ("+ excluded_columns_str +") order by position"

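For reference, here is how the exclusion list from the new argparse default further down in this file ('_sign,_version,is_deleted') ends up as the quoted IN-list used above; a minimal trace of the code in this hunk:

    # minimal trace, assuming the argparse default (a single comma-separated item)
    exclude_columns = ['_sign,_version,is_deleted']
    excluded_columns = "','".join(exclude_columns)          # "_sign,_version,is_deleted" (one element, nothing inserted)
    excluded_columns = [f'{column}' for column in excluded_columns.split(',')]
    excluded_columns_str = ','.join(f"'{col}'" for col in excluded_columns)
    print(excluded_columns_str)                             # '_sign','_version','is_deleted'
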
@@ -197,6 +197,8 @@ def get_table_checksum_query(table):
# requires this function : CREATE OR REPLACE FUNCTION format_decimal AS (x, scale) -> if(locate(toString(x),'.')>0,concat(toString(x),repeat('0',toUInt8(scale-(length(toString(x))-locate(toString(x),'.'))))),concat(toString(x),'.',repeat('0',toUInt8(scale))))
select += "format_decimal("+column_name + \
","+str(numeric_scale)+")"
elif "DateTime64(0)" == data_type:
select += f"toString({column_name})"
elif "DateTime" in data_type:
select += f"trim(TRAILING '.' from (trim(TRAILING '0' FROM toString({column_name}))))"
else:
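A rough sketch of the type dispatch this hunk extends. Only the two DateTime branches are taken from the diff; the decimal condition and the final fallback are illustrative guesses, and the helper name is hypothetical:

    def normalized_expr(column_name, data_type, numeric_scale):
        # Decimal guard is assumed; the script's actual condition is not shown in this hunk
        if 'Decimal' in data_type:
            # relies on the format_decimal UDF described in the comment above
            return f"format_decimal({column_name},{numeric_scale})"
        if data_type == 'DateTime64(0)':
            # new branch from this commit: emit the value as-is
            return f"toString({column_name})"
        if 'DateTime' in data_type:
            # strip trailing zeros and a trailing dot so sub-second values compare as text
            return f"trim(TRAILING '.' from (trim(TRAILING '0' FROM toString({column_name}))))"
        return f"toString({column_name})"  # simplified fallback, not the script's real else-branch
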
@@ -255,7 +257,8 @@ def select_table_statements(table, query, select_query, order_by, external_colum
where = args.where

# skip deleted rows
where+= f" and {args.sign_column} > 0 "
if args.sign_column != '':
where+= f" and {args.sign_column} > 0 "

sql = """ select
count(*) as "cnt",
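In effect, the deleted-row filter is now appended only when a sign column is actually configured, so an empty --sign_column no longer produces a dangling " and  > 0 " fragment. A tiny sketch with hypothetical values:

    where = "1=1"          # hypothetical base filter from --where
    sign_column = ""       # e.g. the table has no _sign column
    if sign_column != '':
        where += f" and {sign_column} > 0 "
    # sign_column == ""      -> where stays "1=1"
    # sign_column == "_sign" -> where becomes "1=1 and _sign > 0 "
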
@@ -380,7 +383,7 @@ def main():
action='store_true', default=False)
# TODO change this to standard MaterializedMySQL columns https://github.com/Altinity/clickhouse-sink-connector/issues/78
parser.add_argument('--exclude_columns', help='columns exclude',
- nargs='*', default=['_sign,_version'])
+ nargs='*', default=['_sign,_version,is_deleted'])
parser.add_argument('--threads', type=int,
help='number of parallel threads', default=1)

13 changes: 6 additions & 7 deletions sink-connector/python/db_compare/mysql_table_checksum.py
@@ -78,26 +78,25 @@ def get_table_checksum_query(table, conn):

if is_nullable == 'YES':
nullables.append(column_name)

if 'datetime' == data_type or 'datetime(1)'== data_type or 'datetime(2)' == data_type or 'datetime(3)' == data_type:
# CH datetime range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/datetime64/
select += f"case when {column_name} > substr('2283-11-11 23:59:59.999', 1, length({column_name})) then TRIM(TRAILING '0' FROM CAST('2283-11-11 23:59:59.999' AS datetime(3))) else case when {column_name} <= '1925-01-01 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('1925-01-01 00:00:00.000' AS datetime(3)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
select += f"case when {column_name} > substr('2299-12-31 23:59:59.999', 1, length({column_name})) then substr(TRIM(TRAILING '0' FROM CAST('2299-12-31 23:59:59.999' AS datetime(3))),1,length({column_name})) else case when {column_name} <= '1900-01-01 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('1900-01-01 00:00:00.000' AS datetime(3)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
elif 'datetime(4)' == data_type or 'datetime(5)' == data_type or 'datetime(6)' == data_type:
# CH datetime range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/datetime64/
select += f"case when {column_name} > substr('2283-11-11 23:59:59.999999', 1, length({column_name})) then TRIM(TRAILING '0' FROM CAST('2283-11-11 23:59:59.999999' AS datetime(6))) else case when {column_name} <= '1925-01-01 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('1925-01-01 00:00:00.000000' AS datetime(6)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
select += f"case when {column_name} > substr('2299-12-31 23:59:59.999999', 1, length({column_name})) then substr(TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('2299-12-31 23:59:59.999999' AS datetime(6)))),1,length({column_name})) else case when {column_name} <= '1900-01-01 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('1900-01-01 00:00:00.000000' AS datetime(6)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
elif 'time' == data_type or 'time(1)' == data_type or 'time(2)' == data_type or 'time(3)' == data_type or 'time(4)' == data_type or 'time(5)' == data_type or 'time(6)' == data_type:
select += f"cast({column_name} as time(6))"
select += f"substr(cast({column_name} as time(6)),1,length({column_name}))"
elif 'timestamp' == data_type or 'timestamp(1)' == data_type or 'timestamp(2)' == data_type or 'timestamp(3)' == data_type or 'timestamp(4)' == data_type or 'timestamp(5)' == data_type or 'timestamp(6)' == data_type:
select += f"TRIM(TRAILING '.' from (TRIM(TRAILING '0' from cast({column_name} as char))))"
select += f"substr(TRIM(TRAILING '.' from (TRIM(TRAILING '0' from cast({column_name} as char)))),1,length({column_name}))"
#elif 'datetime' in data_type:
# # CH datetime range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/datetime/
# select += f"case when {column_name} >='2283-11-11' then CAST('2283-11-11' AS {data_type}) else case when {column_name} <= '1970-01-01' then CAST('1925-01-01 00:00:00' AS {data_type}) else {column_name} end end"*/
# select += f"case when {column_name} >='2299-12-31' then CAST('2299-12-31' AS {data_type}) else case when {column_name} <= '1900-01-01' then CAST('1900-01-01 00:00:00' AS {data_type}) else {column_name} end end"*/
# elif "float" in data_type:
# select += f"CAST({column_name} as DECIMAL(64,8))"
else:
if 'date' == data_type:
# CH date range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/date
select += f"case when {column_name} >='2283-11-11' then CAST('2283-11-11' AS {data_type}) else case when {column_name} <= '1925-01-01' then CAST('1925-01-01' AS {data_type}) else {column_name} end end"
select += f"case when {column_name} >='2299-12-31' then CAST('2299-12-31' AS {data_type}) else case when {column_name} <= '1900-01-01' then CAST('1900-01-01' AS {data_type}) else {column_name} end end"
else:
if is_binary_datatype(data_type):
select += "lower(hex(cast("+column_name+"as binary)))"
14 changes: 11 additions & 3 deletions sink-connector/python/db_load/clickhouse_loader.py
@@ -487,10 +487,18 @@ def load_data_mysqlshell(args, timezone, schema_map, dry_run=False):
if structure != "":
structure += ", "
structure +=" "+column_name + " "
- if column['nullable'] == True:
-     structure +=" Nullable(String)"
+ datatype = column['datatype']
+ mysql_datetype = column['mysql_datatype']
+ if 'timestamp' in mysql_datetype.lower():
+     if column['nullable'] == True:
+         structure +=f" Nullable({datatype})"
+     else:
+         structure +=f" {datatype}"
else:
-     structure +=" String"
+     if column['nullable'] == True:
+         structure +=" Nullable(String)"
+     else:
+         structure +=" String"

cmd = f"""export TZ={timezone}; zstd -d --stdout {data_file} | clickhouse-client --use_client_time_zone 1 --throw_if_no_data_to_insert=0 -h {clickhouse_host} --query="INSERT INTO {ch_schema}.{table_name}({columns}) SELECT {transformed_columns} FROM input('{structure}') FORMAT TSV" -u{args.clickhouse_user} --password '{password}' -mn """
futures.append(executor.submit(execute_load, cmd))
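A sketch of the structure string this now produces for a hypothetical table with a nullable timestamp and a varchar column. The column dictionaries mimic the columnMap entries emitted by the parser change below; the DateTime64(0) mapping for timestamp is assumed, not taken from the diff:

    columns = [
        {'column_name': 'updated_at', 'datatype': 'DateTime64(0)', 'mysql_datatype': 'timestamp',   'nullable': True},
        {'column_name': 'name',       'datatype': 'String',        'mysql_datatype': 'varchar(64)', 'nullable': True},
    ]
    structure = ""
    for column in columns:
        if structure != "":
            structure += ", "
        structure += " " + column['column_name'] + " "
        datatype = column['datatype']
        mysql_datetype = column['mysql_datatype']
        if 'timestamp' in mysql_datetype.lower():
            structure += f" Nullable({datatype})" if column['nullable'] else f" {datatype}"
        else:
            structure += " Nullable(String)" if column['nullable'] else " String"
    print(structure)   # " updated_at  Nullable(DateTime64(0)),  name  Nullable(String)"

Timestamp columns now keep a real temporal type inside input(), while every other column still loads as (Nullable) String and is reshaped by the transformed_columns expressions in the INSERT ... SELECT.
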
20 changes: 16 additions & 4 deletions (MySQL CREATE TABLE parser listener)
@@ -57,6 +57,7 @@ def translateColumnDefinition(self, column_name, columnDefinition):
# data type modifier (NULL / NOT NULL / PRIMARY KEY)
notNull = False
notSymbol = True
+ nullable = True
for child in columnDefinition.getChildren():
if child.getRuleIndex() == MySqlParser.RULE_columnConstraint:

@@ -65,19 +66,28 @@ def translateColumnDefinition(self, column_name, columnDefinition):
if nullNotNull:
text = self.extract_original_text(child)
column_buffer += " " + text
if "NULL" == text:
nullable = True
notNull = True
continue

if nullNotNull.NOT():
notSymbol = True
- if nullNotNull.NULL_LITERAL() and notSymbol:
+ if (nullNotNull.NULL_LITERAL() or nullNotNull.NULL_SPEC_LITERAL()) and notSymbol:
notNull = True

+ nullable = False
else:
notNull = False
+ nullable = True

if isinstance(child, MySqlParser.PrimaryKeyColumnConstraintContext) and child.PRIMARY():
self.primary_key = column_name
+ # columns without nullable info are nullable by default in MySQL, while they are NOT NULL by default in ClickHouse
+ if not notNull:
+     column_buffer += " NULL"
+     nullable = True

- return (column_buffer, dataType, not notNull)
+ return (column_buffer, dataType, nullable)


def exitColumnDeclaration(self, ctx):
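In effect, the listener now tracks an explicit nullable flag next to notNull. A simplified, hypothetical helper (not the listener API) that captures the decision table and the MySQL-vs-ClickHouse default it encodes:

    def nullability(has_not_null, has_null):
        # Simplified sketch of translateColumnDefinition's decision (hypothetical helper).
        # Returns (suffix appended to the column definition, nullable flag).
        if has_not_null:
            return ('', False)       # NOT NULL was already copied into column_buffer
        if has_null:
            return ('', True)        # explicit NULL was already copied into column_buffer
        # nothing specified: nullable by default in MySQL, so make it explicit,
        # because ClickHouse columns are NOT NULL by default
        return (' NULL', True)

    assert nullability(True,  False) == ('', False)
    assert nullability(False, True)  == ('', True)
    assert nullability(False, False) == (' NULL', True)
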
@@ -90,14 +100,16 @@ def exitColumnDeclaration(self, ctx):

# columns have an identifier and a column definition
columnDefinition = ctx.columnDefinition()
+ dataType = columnDefinition.dataType()
+ originalDataTypeText = self.extract_original_text(dataType)

(columnDefinition_buffer, dataType, nullable) = self.translateColumnDefinition(column_name, columnDefinition)

column_buffer += columnDefinition_buffer

self.columns.append(column_buffer)
dataTypeText = self.convertDataType(dataType)
- columnMap = {'column_name': column_name, 'datatype': dataTypeText, 'nullable': nullable}
+ columnMap = {'column_name': column_name, 'datatype': dataTypeText, 'nullable': nullable, 'mysql_datatype':originalDataTypeText}
logging.info(str(columnMap))
self.columns_map.append(columnMap)

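Each entry in columns_map now records the raw MySQL type next to the converted ClickHouse type, which is what clickhouse_loader.py inspects above. A hypothetical entry for a timestamp column (the DateTime64 mapping is assumed, not taken from the diff):

    columnMap = {
        'column_name': 'updated_at',
        'datatype': 'DateTime64(0)',      # converted ClickHouse type (assumed mapping)
        'nullable': True,
        'mysql_datatype': 'timestamp',    # raw MySQL type recorded by this change
    }

The loader only checks for the substring 'timestamp' in mysql_datatype, so timestamp columns take the typed path in the input() structure while other types, including datetime, keep the String path there.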