clickhouse_loader.py : fixing temporal and binary data types #338

Merged
merged 5 commits into from
Nov 3, 2023
Merged
Changes from 1 commit
9 changes: 6 additions & 3 deletions sink-connector/python/db_compare/clickhouse_table_checksum.py
@@ -153,7 +153,7 @@ def get_table_checksum_query(table):
     excluded_columns = "','".join(args.exclude_columns)
     excluded_columns = [f'{column}' for column in excluded_columns.split(',')]
     #excluded_columns = "'"+excluded_columns+"'"
-    #logging.info(f"Excluded columns, {excluded_columns}")
+    logging.info(f"Excluded columns, {excluded_columns}")
     excluded_columns_str = ','.join((f"'{col}'" for col in excluded_columns))
     checksum_query="select name, type, if(match(type,'Nullable'),1,0) is_nullable, numeric_scale from system.columns where database='" + args.clickhouse_database+"' and table = '"+table+"' and name not in ("+ excluded_columns_str +") order by position"

@@ -197,6 +197,8 @@ def get_table_checksum_query(table):
             # requires this function : CREATE OR REPLACE FUNCTION format_decimal AS (x, scale) -> if(locate(toString(x),'.')>0,concat(toString(x),repeat('0',toUInt8(scale-(length(toString(x))-locate(toString(x),'.'))))),concat(toString(x),'.',repeat('0',toUInt8(scale))))
             select += "format_decimal("+column_name + \
                 ","+str(numeric_scale)+")"
+        elif "DateTime64(0)" == data_type:
+            select += f"toString({column_name})"
         elif "DateTime" in data_type:
             select += f"trim(TRAILING '.' from (trim(TRAILING '0' FROM toString({column_name}))))"
         else:
@@ -255,7 +257,8 @@ def select_table_statements(table, query, select_query, order_by, external_colum
     where = args.where

     # skip deleted rows
-    where+= f" and {args.sign_column} > 0 "
+    if args.sign_column != '':
+        where+= f" and {args.sign_column} > 0 "

     sql = """ select
               count(*) as "cnt",
@@ -380,7 +383,7 @@ def main():
                         action='store_true', default=False)
     # TODO change this to standard MaterializedMySQL columns https://github.com/Altinity/clickhouse-sink-connector/issues/78
     parser.add_argument('--exclude_columns', help='columns exclude',
-                        nargs='*', default=['_sign,_version'])
+                        nargs='*', default=['_sign,_version,is_deleted'])
     parser.add_argument('--threads', type=int,
                         help='number of parallel threads', default=1)
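As a side note on the exclude-column handling in the first hunk of this file, the following standalone sketch (illustrative only, not part of the PR) shows how the new default value ends up as the quoted list used in the NOT IN (...) clause of checksum_query:

# Illustrative walk-through of the logic above; not part of the PR.
exclude_columns = ['_sign,_version,is_deleted']     # the new --exclude_columns default
excluded_columns = "','".join(exclude_columns)      # single element, so the string is unchanged
excluded_columns = [f'{column}' for column in excluded_columns.split(',')]
excluded_columns_str = ','.join(f"'{col}'" for col in excluded_columns)
print(excluded_columns_str)                         # '_sign','_version','is_deleted'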

13 changes: 6 additions & 7 deletions sink-connector/python/db_compare/mysql_table_checksum.py
@@ -78,26 +78,25 @@ def get_table_checksum_query(table, conn):

         if is_nullable == 'YES':
             nullables.append(column_name)

         if 'datetime' == data_type or 'datetime(1)'== data_type or 'datetime(2)' == data_type or 'datetime(3)' == data_type:
             # CH datetime range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/datetime64/
-            select += f"case when {column_name} > substr('2283-11-11 23:59:59.999', 1, length({column_name})) then TRIM(TRAILING '0' FROM CAST('2283-11-11 23:59:59.999' AS datetime(3))) else case when {column_name} <= '1925-01-01 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('1925-01-01 00:00:00.000' AS datetime(3)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
+            select += f"case when {column_name} > substr('2299-12-31 23:59:59.999', 1, length({column_name})) then substr(TRIM(TRAILING '0' FROM CAST('2299-12-31 23:59:59.999' AS datetime(3))),1,length({column_name})) else case when {column_name} <= '1900-01-01 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('1900-01-01 00:00:00.000' AS datetime(3)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
Review comment (Collaborator):
It would be nice if these values (2299-12-31) were in a separate file or a separate variable, so that they can be updated easily.

Reply (Collaborator, author):
Good point; this was tested with ClickHouse 23.3. @subkanthi, do you know if there is a JDBC driver that supports those maximum values? I will add it as a parameter, as it may change again.

Reply (Collaborator, author):
@subkanthi Added --min_date_value and --max_date_value, please review!

         elif 'datetime(4)' == data_type or 'datetime(5)' == data_type or 'datetime(6)' == data_type:
             # CH datetime range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/datetime64/
-            select += f"case when {column_name} > substr('2283-11-11 23:59:59.999999', 1, length({column_name})) then TRIM(TRAILING '0' FROM CAST('2283-11-11 23:59:59.999999' AS datetime(6))) else case when {column_name} <= '1925-01-01 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('1925-01-01 00:00:00.000000' AS datetime(6)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
+            select += f"case when {column_name} > substr('2299-12-31 23:59:59.999999', 1, length({column_name})) then substr(TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('2299-12-31 23:59:59.999999' AS datetime(6)))),1,length({column_name})) else case when {column_name} <= '1900-01-01 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('1900-01-01 00:00:00.000000' AS datetime(6)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
         elif 'time' == data_type or 'time(1)' == data_type or 'time(2)' == data_type or 'time(3)' == data_type or 'time(4)' == data_type or 'time(5)' == data_type or 'time(6)' == data_type:
-            select += f"cast({column_name} as time(6))"
+            select += f"substr(cast({column_name} as time(6)),1,length({column_name}))"
         elif 'timestamp' == data_type or 'timestamp(1)' == data_type or 'timestamp(2)' == data_type or 'timestamp(3)' == data_type or 'timestamp(4)' == data_type or 'timestamp(5)' == data_type or 'timestamp(6)' == data_type:
-            select += f"TRIM(TRAILING '.' from (TRIM(TRAILING '0' from cast({column_name} as char))))"
+            select += f"substr(TRIM(TRAILING '.' from (TRIM(TRAILING '0' from cast({column_name} as char)))),1,length({column_name}))"
         #elif 'datetime' in data_type:
         #    # CH datetime range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/datetime/
-        #    select += f"case when {column_name} >='2283-11-11' then CAST('2283-11-11' AS {data_type}) else case when {column_name} <= '1970-01-01' then CAST('1925-01-01 00:00:00' AS {data_type}) else {column_name} end end"*/
+        #    select += f"case when {column_name} >='2299-12-31' then CAST('2299-12-31' AS {data_type}) else case when {column_name} <= '1900-01-01' then CAST('1900-01-01 00:00:00' AS {data_type}) else {column_name} end end"*/
         # elif "float" in data_type:
         #     select += f"CAST({column_name} as DECIMAL(64,8))"
         else:
             if 'date' == data_type:
                 # CH date range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/date
-                select += f"case when {column_name} >='2283-11-11' then CAST('2283-11-11' AS {data_type}) else case when {column_name} <= '1925-01-01' then CAST('1925-01-01' AS {data_type}) else {column_name} end end"
+                select += f"case when {column_name} >='2299-12-31' then CAST('2299-12-31' AS {data_type}) else case when {column_name} <= '1900-01-01' then CAST('1900-01-01' AS {data_type}) else {column_name} end end"
             else:
                 if is_binary_datatype(data_type):
                     select += "lower(hex(cast("+column_name+"as binary)))"
14 changes: 11 additions & 3 deletions sink-connector/python/db_load/clickhouse_loader.py
@@ -487,10 +487,18 @@ def load_data_mysqlshell(args, timezone, schema_map, dry_run=False):
             if structure != "":
                 structure += ", "
             structure +=" "+column_name + " "
-            if column['nullable'] == True:
-                structure +=" Nullable(String)"
+            datatype = column['datatype']
+            mysql_datetype = column['mysql_datatype']
+            if 'timestamp' in mysql_datetype.lower():
+                if column['nullable'] == True:
+                    structure +=f" Nullable({datatype})"
+                else:
+                    structure +=f" {datatype}"
             else:
-                structure +=" String"
+                if column['nullable'] == True:
+                    structure +=" Nullable(String)"
+                else:
+                    structure +=" String"

         cmd = f"""export TZ={timezone}; zstd -d --stdout {data_file} | clickhouse-client --use_client_time_zone 1 --throw_if_no_data_to_insert=0 -h {clickhouse_host} --query="INSERT INTO {ch_schema}.{table_name}({columns}) SELECT {transformed_columns} FROM input('{structure}') FORMAT TSV" -u{args.clickhouse_user} --password '{password}' -mn """
         futures.append(executor.submit(execute_load, cmd))
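To make the intent of the new structure-building branch easier to follow, here is a small self-contained sketch (illustrative only; the column entries are made up) of how the input() structure string is assembled: timestamp columns keep their ClickHouse type, everything else is read from the TSV as String, and either case is wrapped in Nullable() when the column is nullable.

# Illustrative only; mirrors the branch above with made-up column metadata.
columns_map = [
    {'column_name': 'id',         'datatype': 'Int32',         'mysql_datatype': 'int',           'nullable': False},
    {'column_name': 'updated_at', 'datatype': 'DateTime64(0)', 'mysql_datatype': 'timestamp',     'nullable': True},
    {'column_name': 'payload',    'datatype': 'String',        'mysql_datatype': 'varbinary(32)', 'nullable': True},
]

structure = ""
for column in columns_map:
    if structure != "":
        structure += ", "
    structure += column['column_name'] + " "
    if 'timestamp' in column['mysql_datatype'].lower():
        base = column['datatype']   # keep the ClickHouse type for timestamp columns
    else:
        base = 'String'             # everything else is parsed from the TSV as a plain string
    structure += f"Nullable({base})" if column['nullable'] else base

print(structure)
# id String, updated_at Nullable(DateTime64(0)), payload Nullable(String)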
Changes to the MySQL DDL parser listener under sink-connector/python/db_load (filename not shown in this view)
@@ -57,6 +57,7 @@ def translateColumnDefinition(self, column_name, columnDefinition):
         # data type modifier (NULL / NOT NULL / PRIMARY KEY)
         notNull = False
         notSymbol = True
+        nullable = True
         for child in columnDefinition.getChildren():
             if child.getRuleIndex() == MySqlParser.RULE_columnConstraint:

@@ -65,19 +66,28 @@ def translateColumnDefinition(self, column_name, columnDefinition):
                 if nullNotNull:
                     text = self.extract_original_text(child)
                     column_buffer += " " + text
+                    if "NULL" == text:
+                        nullable = True
+                        notNull = True
+                        continue

                     if nullNotNull.NOT():
                         notSymbol = True
-                    if nullNotNull.NULL_LITERAL() and notSymbol:
+                    if (nullNotNull.NULL_LITERAL() or nullNotNull.NULL_SPEC_LITERAL()) and notSymbol:
                         notNull = True
+                        nullable = False
                     else:
                         notNull = False
+                        nullable = True

             if isinstance(child, MySqlParser.PrimaryKeyColumnConstraintContext) and child.PRIMARY():
                 self.primary_key = column_name
         # column without nullable info are default nullable in MySQL, while they are not null in ClickHouse
         if not notNull:
             column_buffer += " NULL"
+            nullable = True

-        return (column_buffer, dataType, not notNull)
+        return (column_buffer, dataType, nullable)


def exitColumnDeclaration(self, ctx):
@@ -90,14 +100,16 @@ def exitColumnDeclaration(self, ctx):

         # columns have an identifier and a column definition
         columnDefinition = ctx.columnDefinition()
+        dataType = columnDefinition.dataType()
+        originalDataTypeText = self.extract_original_text(dataType)

         (columnDefinition_buffer, dataType, nullable) = self.translateColumnDefinition(column_name, columnDefinition)

         column_buffer += columnDefinition_buffer

         self.columns.append(column_buffer)
         dataTypeText = self.convertDataType(dataType)
-        columnMap = {'column_name': column_name, 'datatype': dataTypeText, 'nullable': nullable}
+        columnMap = {'column_name': column_name, 'datatype': dataTypeText, 'nullable': nullable, 'mysql_datatype':originalDataTypeText}
         logging.info(str(columnMap))
         self.columns_map.append(columnMap)
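For reference, an entry appended to self.columns_map after this change might look like the following hypothetical example; the exact 'datatype' value depends on convertDataType's mapping.

# Hypothetical example entry; 'mysql_datatype' is the new field carrying the
# original MySQL type text, which clickhouse_loader.py uses to detect timestamps.
columnMap = {
    'column_name': 'updated_at',
    'datatype': 'DateTime64(3)',       # assumed ClickHouse mapping for timestamp(3)
    'nullable': True,
    'mysql_datatype': 'timestamp(3)',
}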