Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clickhouse_loader.py : fixing temporal and binary data types #338

Merged
merged 5 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions sink-connector/python/db_compare/clickhouse_table_checksum.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def get_table_checksum_query(table):
excluded_columns = "','".join(args.exclude_columns)
excluded_columns = [f'{column}' for column in excluded_columns.split(',')]
#excluded_columns = "'"+excluded_columns+"'"
#logging.info(f"Excluded columns, {excluded_columns}")
logging.info(f"Excluded columns, {excluded_columns}")
excluded_columns_str = ','.join((f"'{col}'" for col in excluded_columns))
checksum_query="select name, type, if(match(type,'Nullable'),1,0) is_nullable, numeric_scale from system.columns where database='" + args.clickhouse_database+"' and table = '"+table+"' and name not in ("+ excluded_columns_str +") order by position"

Expand Down Expand Up @@ -197,6 +197,8 @@ def get_table_checksum_query(table):
# requires this function : CREATE OR REPLACE FUNCTION format_decimal AS (x, scale) -> if(locate(toString(x),'.')>0,concat(toString(x),repeat('0',toUInt8(scale-(length(toString(x))-locate(toString(x),'.'))))),concat(toString(x),'.',repeat('0',toUInt8(scale))))
select += "format_decimal("+column_name + \
","+str(numeric_scale)+")"
elif "DateTime64(0)" == data_type:
select += f"toString({column_name})"
elif "DateTime" in data_type:
select += f"trim(TRAILING '.' from (trim(TRAILING '0' FROM toString({column_name}))))"
else:
Expand Down Expand Up @@ -255,7 +257,8 @@ def select_table_statements(table, query, select_query, order_by, external_colum
where = args.where

# skip deleted rows
where+= f" and {args.sign_column} > 0 "
if args.sign_column != '':
where+= f" and {args.sign_column} > 0 "

sql = """ select
count(*) as "cnt",
Expand Down Expand Up @@ -380,7 +383,7 @@ def main():
action='store_true', default=False)
# TODO change this to standard MaterializedMySQL columns https://github.com/Altinity/clickhouse-sink-connector/issues/78
parser.add_argument('--exclude_columns', help='columns exclude',
nargs='*', default=['_sign,_version'])
nargs='*', default=['_sign,_version,is_deleted'])
parser.add_argument('--threads', type=int,
help='number of parallel threads', default=1)

Expand Down
39 changes: 22 additions & 17 deletions sink-connector/python/db_compare/mysql_table_checksum.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def compute_checksum(table, statements, conn):
conn.close()


def get_table_checksum_query(table, conn):
def get_table_checksum_query(table, conn, binary_encoding):

(rowset, rowcount) = execute_mysql(conn, "select COLUMN_NAME as column_name, column_type as data_type, IS_NULLABLE as is_nullable from information_schema.columns where table_schema='" +
args.mysql_database+"' and table_name = '"+table+"' order by ordinal_position")
Expand All @@ -68,6 +68,8 @@ def get_table_checksum_query(table, conn):
nullables = []
data_types = {}
first_column = True
min_date_value = args.min_date_value
max_date_value = args.max_date_value
for row in rowset:
column_name = '`'+row['column_name']+'`'
data_type = row['data_type']
Expand All @@ -78,29 +80,26 @@ def get_table_checksum_query(table, conn):

if is_nullable == 'YES':
nullables.append(column_name)

if 'datetime' == data_type or 'datetime(1)'== data_type or 'datetime(2)' == data_type or 'datetime(3)' == data_type:
# CH datetime range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/datetime64/
select += f"case when {column_name} > substr('2283-11-11 23:59:59.999', 1, length({column_name})) then TRIM(TRAILING '0' FROM CAST('2283-11-11 23:59:59.999' AS datetime(3))) else case when {column_name} <= '1925-01-01 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('1925-01-01 00:00:00.000' AS datetime(3)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
select += f"case when {column_name} > substr('{max_date_value} 23:59:59.999', 1, length({column_name})) then substr(TRIM(TRAILING '0' FROM CAST('{max_date_value} 23:59:59.999' AS datetime(3))),1,length({column_name})) else case when {column_name} <= '{min_date_value} 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('{min_date_value} 00:00:00.000' AS datetime(3)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
elif 'datetime(4)' == data_type or 'datetime(5)' == data_type or 'datetime(6)' == data_type:
# CH datetime range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/datetime64/
select += f"case when {column_name} > substr('2283-11-11 23:59:59.999999', 1, length({column_name})) then TRIM(TRAILING '0' FROM CAST('2283-11-11 23:59:59.999999' AS datetime(6))) else case when {column_name} <= '1925-01-01 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('1925-01-01 00:00:00.000000' AS datetime(6)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
# CH datetime range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/datetime64/
select += f"case when {column_name} > substr('{max_date_value} 23:59:59.999999', 1, length({column_name})) then substr(TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('{max_date_value} 23:59:59.999999' AS datetime(6)))),1,length({column_name})) else case when {column_name} <= '{min_date_value} 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('{min_date_value} 00:00:00.000000' AS datetime(6)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
elif 'time' == data_type or 'time(1)' == data_type or 'time(2)' == data_type or 'time(3)' == data_type or 'time(4)' == data_type or 'time(5)' == data_type or 'time(6)' == data_type:
select += f"cast({column_name} as time(6))"
select += f"substr(cast({column_name} as time(6)),1,length({column_name}))"
elif 'timestamp' == data_type or 'timestamp(1)' == data_type or 'timestamp(2)' == data_type or 'timestamp(3)' == data_type or 'timestamp(4)' == data_type or 'timestamp(5)' == data_type or 'timestamp(6)' == data_type:
select += f"TRIM(TRAILING '.' from (TRIM(TRAILING '0' from cast({column_name} as char))))"
#elif 'datetime' in data_type:
# # CH datetime range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/datetime/
# select += f"case when {column_name} >='2283-11-11' then CAST('2283-11-11' AS {data_type}) else case when {column_name} <= '1970-01-01' then CAST('1925-01-01 00:00:00' AS {data_type}) else {column_name} end end"*/
# elif "float" in data_type:
# select += f"CAST({column_name} as DECIMAL(64,8))"
select += f"substr(TRIM(TRAILING '.' from (TRIM(TRAILING '0' from cast({column_name} as char)))),1,length({column_name}))"
else:
if 'date' == data_type:
if 'date' == data_type: # Date are converted to Date32 in CH
# CH date range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/date
select += f"case when {column_name} >='2283-11-11' then CAST('2283-11-11' AS {data_type}) else case when {column_name} <= '1925-01-01' then CAST('1925-01-01' AS {data_type}) else {column_name} end end"
select += f"case when {column_name} >='{max_date_value}' then CAST('{max_date_value}' AS {data_type}) else case when {column_name} <= '{min_date_value}' then CAST('{min_date_value}' AS {data_type}) else {column_name} end end"
else:
if is_binary_datatype(data_type):
select += "lower(hex(cast("+column_name+"as binary)))"
binary_encode = "lower(hex(cast("+column_name+"as binary)))"
if binary_encoding == 'base64':
binary_encode = "replace(to_base64(cast("+column_name+" as binary)),'\\n','')"
select += binary_encode
else:
select += column_name + ""
first_column = False
Expand Down Expand Up @@ -204,7 +203,7 @@ def calculate_sql_checksum(table):
statements = []

(query, select_query, distributed_by,
external_table_types) = get_table_checksum_query(table, conn)
external_table_types) = get_table_checksum_query(table, conn, args.binary_encoding)
statements = select_table_statements(
table, query, select_query, distributed_by, external_table_types)
compute_checksum(table, statements, conn)
Expand Down Expand Up @@ -239,7 +238,7 @@ def record_factory(*args, **kwargs):

def main():

parser = argparse.ArgumentParser(description='''
parser = argparse.ArgumentParser(description='''Compute a ClickHouse compatible checksum.
''')
# Required
parser.add_argument('--mysql_host', help='MySQL host', required=True)
Expand All @@ -261,6 +260,12 @@ def main():
help='Output the raw format to a file called out.txt', required=False)
parser.add_argument(
'--debug_limit', help='Limit the debug output in lines', required=False)
parser.add_argument(
'--binary_encoding', help='either hex or base64 to encode MySQL binary content', default='hex', required=False)
parser.add_argument(
'--min_date_value', help='Minimum Date32/DateTime64 date', default='1900-01-01', required=False)
parser.add_argument(
'--max_date_value', help='Maximum Date32/DateTime64 date', default='2299-12-31', required=False)
parser.add_argument('--debug', dest='debug',
action='store_true', default=False)
parser.add_argument('--exclude_columns', help='columns exclude',
Expand Down
14 changes: 11 additions & 3 deletions sink-connector/python/db_load/clickhouse_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,10 +487,18 @@ def load_data_mysqlshell(args, timezone, schema_map, dry_run=False):
if structure != "":
structure += ", "
structure +=" "+column_name + " "
if column['nullable'] == True:
structure +=" Nullable(String)"
datatype = column['datatype']
mysql_datetype = column['mysql_datatype']
if 'timestamp' in mysql_datetype.lower():
if column['nullable'] == True:
structure +=f" Nullable({datatype})"
else:
structure +=f" {datatype}"
else:
structure +=" String"
if column['nullable'] == True:
structure +=" Nullable(String)"
else:
structure +=" String"

cmd = f"""export TZ={timezone}; zstd -d --stdout {data_file} | clickhouse-client --use_client_time_zone 1 --throw_if_no_data_to_insert=0 -h {clickhouse_host} --query="INSERT INTO {ch_schema}.{table_name}({columns}) SELECT {transformed_columns} FROM input('{structure}') FORMAT TSV" -u{args.clickhouse_user} --password '{password}' -mn """
futures.append(executor.submit(execute_load, cmd))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def translateColumnDefinition(self, column_name, columnDefinition):
# data type modifier (NULL / NOT NULL / PRIMARY KEY)
notNull = False
notSymbol = True
nullable = True
for child in columnDefinition.getChildren():
if child.getRuleIndex() == MySqlParser.RULE_columnConstraint:

Expand All @@ -65,19 +66,28 @@ def translateColumnDefinition(self, column_name, columnDefinition):
if nullNotNull:
text = self.extract_original_text(child)
column_buffer += " " + text
if "NULL" == text:
nullable = True
notNull = True
continue

if nullNotNull.NOT():
notSymbol = True
if nullNotNull.NULL_LITERAL() and notSymbol:
if (nullNotNull.NULL_LITERAL() or nullNotNull.NULL_SPEC_LITERAL()) and notSymbol:
notNull = True

nullable = False
else:
notNull = False
nullable = True

if isinstance(child, MySqlParser.PrimaryKeyColumnConstraintContext) and child.PRIMARY():
self.primary_key = column_name
# column without nullable info are default nullable in MySQL, while they are not null in ClickHouse
if not notNull:
column_buffer += " NULL"
nullable = True

return (column_buffer, dataType, not notNull)
return (column_buffer, dataType, nullable)


def exitColumnDeclaration(self, ctx):
Expand All @@ -90,14 +100,16 @@ def exitColumnDeclaration(self, ctx):

# columns have an identifier and a column definition
columnDefinition = ctx.columnDefinition()
dataType = columnDefinition.dataType()
originalDataTypeText = self.extract_original_text(dataType)

(columnDefinition_buffer, dataType, nullable) = self.translateColumnDefinition(column_name, columnDefinition)

column_buffer += columnDefinition_buffer

self.columns.append(column_buffer)
dataTypeText = self.convertDataType(dataType)
columnMap = {'column_name': column_name, 'datatype': dataTypeText, 'nullable': nullable}
columnMap = {'column_name': column_name, 'datatype': dataTypeText, 'nullable': nullable, 'mysql_datatype':originalDataTypeText}
logging.info(str(columnMap))
self.columns_map.append(columnMap)

Expand Down