diff --git a/sink-connector/python/db_compare/clickhouse_table_checksum.py b/sink-connector/python/db_compare/clickhouse_table_checksum.py
index 12ff168b5..86af84e39 100644
--- a/sink-connector/python/db_compare/clickhouse_table_checksum.py
+++ b/sink-connector/python/db_compare/clickhouse_table_checksum.py
@@ -153,7 +153,7 @@ def get_table_checksum_query(table):
     excluded_columns = "','".join(args.exclude_columns)
     excluded_columns = [f'{column}' for column in excluded_columns.split(',')]
     #excluded_columns = "'"+excluded_columns+"'"
-    #logging.info(f"Excluded columns, {excluded_columns}")
+    logging.info(f"Excluded columns, {excluded_columns}")
     excluded_columns_str = ','.join((f"'{col}'" for col in excluded_columns))
     checksum_query="select name, type, if(match(type,'Nullable'),1,0) is_nullable, numeric_scale from system.columns where database='" + args.clickhouse_database+"' and table = '"+table+"' and name not in ("+ excluded_columns_str +") order by position"
@@ -197,6 +197,8 @@ def get_table_checksum_query(table):
                 # requires this function : CREATE OR REPLACE FUNCTION format_decimal AS (x, scale) -> if(locate(toString(x),'.')>0,concat(toString(x),repeat('0',toUInt8(scale-(length(toString(x))-locate(toString(x),'.'))))),concat(toString(x),'.',repeat('0',toUInt8(scale))))
                 select += "format_decimal("+column_name + \
                     ","+str(numeric_scale)+")"
+            elif "DateTime64(0)" == data_type:
+                select += f"toString({column_name})"
             elif "DateTime" in data_type:
                 select += f"trim(TRAILING '.' from (trim(TRAILING '0' FROM toString({column_name}))))"
             else:
@@ -255,7 +257,8 @@ def select_table_statements(table, query, select_query, order_by, external_colum
         where = args.where

     # skip deleted rows
-    where+= f" and {args.sign_column} > 0 "
+    if args.sign_column != '':
+        where+= f" and {args.sign_column} > 0 "

     sql = """
    select count(*) as "cnt",
@@ -380,7 +383,7 @@ def main():
                        action='store_true', default=False)
    # TODO change this to standard MaterializedMySQL columns https://github.com/Altinity/clickhouse-sink-connector/issues/78
    parser.add_argument('--exclude_columns', help='columns exclude',
-                       nargs='*', default=['_sign,_version'])
+                       nargs='*', default=['_sign,_version,is_deleted'])
    parser.add_argument('--threads', type=int,
                        help='number of parallel threads', default=1)
diff --git a/sink-connector/python/db_compare/mysql_table_checksum.py b/sink-connector/python/db_compare/mysql_table_checksum.py
index ec28f4648..94990c22f 100644
--- a/sink-connector/python/db_compare/mysql_table_checksum.py
+++ b/sink-connector/python/db_compare/mysql_table_checksum.py
@@ -59,7 +59,7 @@ def compute_checksum(table, statements, conn):
     conn.close()


-def get_table_checksum_query(table, conn):
+def get_table_checksum_query(table, conn, binary_encoding):
     (rowset, rowcount) = execute_mysql(conn, "select COLUMN_NAME as column_name, column_type as data_type, IS_NULLABLE as is_nullable from information_schema.columns where table_schema='" + args.mysql_database+"' and table_name = '"+table+"' order by ordinal_position")

@@ -68,6 +68,8 @@ def get_table_checksum_query(table, conn):
     nullables = []
     data_types = {}
     first_column = True
+    min_date_value = args.min_date_value
+    max_date_value = args.max_date_value
     for row in rowset:
         column_name = '`'+row['column_name']+'`'
         data_type = row['data_type']
@@ -78,29 +80,26 @@
         if is_nullable == 'YES':
             nullables.append(column_name)
-
         if 'datetime' == data_type or 'datetime(1)'== data_type or 'datetime(2)' == data_type or 'datetime(3)' == data_type:
             # CH datetime range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/datetime64/
-            select += f"case when {column_name} > substr('2283-11-11 23:59:59.999', 1, length({column_name})) then TRIM(TRAILING '0' FROM CAST('2283-11-11 23:59:59.999' AS datetime(3))) else case when {column_name} <= '1925-01-01 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('1925-01-01 00:00:00.000' AS datetime(3)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
+            select += f"case when {column_name} > substr('{max_date_value} 23:59:59.999', 1, length({column_name})) then substr(TRIM(TRAILING '0' FROM CAST('{max_date_value} 23:59:59.999' AS datetime(3))),1,length({column_name})) else case when {column_name} <= '{min_date_value} 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('{min_date_value} 00:00:00.000' AS datetime(3)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
         elif 'datetime(4)' == data_type or 'datetime(5)' == data_type or 'datetime(6)' == data_type:
-            # CH datetime range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/datetime64/
-            select += f"case when {column_name} > substr('2283-11-11 23:59:59.999999', 1, length({column_name})) then TRIM(TRAILING '0' FROM CAST('2283-11-11 23:59:59.999999' AS datetime(6))) else case when {column_name} <= '1925-01-01 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('1925-01-01 00:00:00.000000' AS datetime(6)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
+            # CH datetime range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/datetime64/ii
+            select += f"case when {column_name} > substr('{max_date_value} 23:59:59.999999', 1, length({column_name})) then substr(TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('{max_date_value} 23:59:59.999999' AS datetime(6)))),1,length({column_name})) else case when {column_name} <= '{min_date_value} 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('{min_date_value} 00:00:00.000000' AS datetime(6)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
         elif 'time' == data_type or 'time(1)' == data_type or 'time(2)' == data_type or 'time(3)' == data_type or 'time(4)' == data_type or 'time(5)' == data_type or 'time(6)' == data_type:
-            select += f"cast({column_name} as time(6))"
+            select += f"substr(cast({column_name} as time(6)),1,length({column_name}))"
         elif 'timestamp' == data_type or 'timestamp(1)' == data_type or 'timestamp(2)' == data_type or 'timestamp(3)' == data_type or 'timestamp(4)' == data_type or 'timestamp(5)' == data_type or 'timestamp(6)' == data_type:
-            select += f"TRIM(TRAILING '.' from (TRIM(TRAILING '0' from cast({column_name} as char))))"
-        #elif 'datetime' in data_type:
-        #    # CH datetime range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/datetime/
-        #    select += f"case when {column_name} >='2283-11-11' then CAST('2283-11-11' AS {data_type}) else case when {column_name} <= '1970-01-01' then CAST('1925-01-01 00:00:00' AS {data_type}) else {column_name} end end"*/
-        # elif "float" in data_type:
-        #     select += f"CAST({column_name} as DECIMAL(64,8))"
+            select += f"substr(TRIM(TRAILING '.' from (TRIM(TRAILING '0' from cast({column_name} as char)))),1,length({column_name}))"
         else:
-            if 'date' == data_type:
+            if 'date' == data_type: # Dates are converted to Date32 in CH
                 # CH date range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/date
-                select += f"case when {column_name} >='2283-11-11' then CAST('2283-11-11' AS {data_type}) else case when {column_name} <= '1925-01-01' then CAST('1925-01-01' AS {data_type}) else {column_name} end end"
+                select += f"case when {column_name} >='{max_date_value}' then CAST('{max_date_value}' AS {data_type}) else case when {column_name} <= '{min_date_value}' then CAST('{min_date_value}' AS {data_type}) else {column_name} end end"
             else:
                 if is_binary_datatype(data_type):
-                    select += "lower(hex(cast("+column_name+"as binary)))"
+                    binary_encode = "lower(hex(cast("+column_name+"as binary)))"
+                    if binary_encoding == 'base64':
+                        binary_encode = "replace(to_base64(cast("+column_name+" as binary)),'\\n','')"
+                    select += binary_encode
                 else:
                     select += column_name + ""
         first_column = False
@@ -204,7 +203,7 @@ def calculate_sql_checksum(table):
     statements = []

     (query, select_query, distributed_by,
-     external_table_types) = get_table_checksum_query(table, conn)
+     external_table_types) = get_table_checksum_query(table, conn, args.binary_encoding)
     statements = select_table_statements(
         table, query, select_query, distributed_by, external_table_types)
     compute_checksum(table, statements, conn)
@@ -239,7 +238,7 @@ def record_factory(*args, **kwargs):


 def main():
-    parser = argparse.ArgumentParser(description='''
+    parser = argparse.ArgumentParser(description='''Compute a ClickHouse compatible checksum.
     ''')
     # Required
     parser.add_argument('--mysql_host', help='MySQL host', required=True)
@@ -261,6 +260,12 @@ def main():
         help='Output the raw format to a file called out.txt', required=False)
     parser.add_argument(
         '--debug_limit', help='Limit the debug output in lines', required=False)
+    parser.add_argument(
+        '--binary_encoding', help='either hex or base64 to encode MySQL binary content', default='hex', required=False)
+    parser.add_argument(
+        '--min_date_value', help='Minimum Date32/DateTime64 date', default='1900-01-01', required=False)
+    parser.add_argument(
+        '--max_date_value', help='Maximum Date32/DateTime64 date', default='2299-12-31', required=False)
     parser.add_argument('--debug', dest='debug',
                         action='store_true', default=False)
     parser.add_argument('--exclude_columns', help='columns exclude',
diff --git a/sink-connector/python/db_load/clickhouse_loader.py b/sink-connector/python/db_load/clickhouse_loader.py
index 76c963a61..159828d15 100644
--- a/sink-connector/python/db_load/clickhouse_loader.py
+++ b/sink-connector/python/db_load/clickhouse_loader.py
@@ -487,10 +487,18 @@ def load_data_mysqlshell(args, timezone, schema_map, dry_run=False):
                     if structure != "":
                         structure += ", "
                     structure +=" "+column_name + " "
-                    if column['nullable'] == True:
-                        structure +=" Nullable(String)"
+                    datatype = column['datatype']
+                    mysql_datetype = column['mysql_datatype']
+                    if 'timestamp' in mysql_datetype.lower():
+                        if column['nullable'] == True:
+                            structure +=f" Nullable({datatype})"
+                        else:
+                            structure +=f" {datatype}"
                     else:
-                        structure +=" String"
+                        if column['nullable'] == True:
+                            structure +=" Nullable(String)"
+                        else:
+                            structure +=" String"

                 cmd = f"""export TZ={timezone}; zstd -d --stdout {data_file} | clickhouse-client --use_client_time_zone 1 --throw_if_no_data_to_insert=0 -h {clickhouse_host} --query="INSERT INTO {ch_schema}.{table_name}({columns}) SELECT {transformed_columns} FROM input('{structure}') FORMAT TSV" -u{args.clickhouse_user} --password '{password}' -mn """
                 futures.append(executor.submit(execute_load, cmd))
diff --git a/sink-connector/python/db_load/mysql_parser/CreateTableMySQLParserListener.py b/sink-connector/python/db_load/mysql_parser/CreateTableMySQLParserListener.py
index 1609d6b29..bc26bfa67 100644
--- a/sink-connector/python/db_load/mysql_parser/CreateTableMySQLParserListener.py
+++ b/sink-connector/python/db_load/mysql_parser/CreateTableMySQLParserListener.py
@@ -57,6 +57,7 @@ def translateColumnDefinition(self, column_name, columnDefinition):
         # data type modifier (NULL / NOT NULL / PRIMARY KEY)
         notNull = False
         notSymbol = True
+        nullable = True
         for child in columnDefinition.getChildren():
             if child.getRuleIndex() == MySqlParser.RULE_columnConstraint:
@@ -65,19 +66,28 @@
                 if nullNotNull:
                     text = self.extract_original_text(child)
                     column_buffer += " " + text
+                    if "NULL" == text:
+                        nullable = True
+                        notNull = True
+                        continue
+
                     if nullNotNull.NOT():
                         notSymbol = True
-                    if nullNotNull.NULL_LITERAL() and notSymbol:
+                    if (nullNotNull.NULL_LITERAL() or nullNotNull.NULL_SPEC_LITERAL()) and notSymbol:
                         notNull = True
-
+                        nullable = False
+                    else:
+                        notNull = False
+                        nullable = True
                 if isinstance(child, MySqlParser.PrimaryKeyColumnConstraintContext) and child.PRIMARY():
                     self.primary_key = column_name

         # column without nullable info are default nullable in MySQL, while they are not null in ClickHouse
         if not notNull:
             column_buffer += " NULL"
+            nullable = True

-        return (column_buffer, dataType, not notNull)
+        return (column_buffer, dataType, nullable)


     def exitColumnDeclaration(self, ctx):
@@ -90,6 +100,8 @@

         # columns have an identifier and a column definition
         columnDefinition = ctx.columnDefinition()
+        dataType = columnDefinition.dataType()
+        originalDataTypeText = self.extract_original_text(dataType)
         (columnDefinition_buffer, dataType, nullable) = self.translateColumnDefinition(column_name, columnDefinition)
@@ -97,7 +109,7 @@
         self.columns.append(column_buffer)

         dataTypeText = self.convertDataType(dataType)
-        columnMap = {'column_name': column_name, 'datatype': dataTypeText, 'nullable': nullable}
+        columnMap = {'column_name': column_name, 'datatype': dataTypeText, 'nullable': nullable, 'mysql_datatype':originalDataTypeText}
         logging.info(str(columnMap))
         self.columns_map.append(columnMap)
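
A note on the new DateTime64(0) branch in clickhouse_table_checksum.py: the generic DateTime path trims trailing '0' characters and then a trailing '.', which is only safe while a fractional part guards the seconds field. The reading below is an inference from the patch, and trim_datetime is a hypothetical helper written only to illustrate it; a minimal Python sketch of the failure mode the dedicated elif avoids, assuming toString() of a DateTime64(0) value renders no fractional part:

    def trim_datetime(s: str) -> str:
        # Mirrors the generic branch: trim trailing zeros, then a trailing dot.
        return s.rstrip('0').rstrip('.')

    # With a fractional part present, the dot guards the whole seconds:
    assert trim_datetime("2021-06-01 12:30:40.500") == "2021-06-01 12:30:40.5"
    assert trim_datetime("2021-06-01 12:30:40.000") == "2021-06-01 12:30:40"

    # Without one, seconds ending in zero would be corrupted, which is why
    # the patch emits plain toString({column_name}) for DateTime64(0):
    assert trim_datetime("2021-06-01 12:30:40") == "2021-06-01 12:30:4"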
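The --min_date_value/--max_date_value flags replace the hard-coded 1925-01-01/2283-11-11 bounds in the generated CASE expressions, pinning out-of-range MySQL values to configurable boundaries so both sides hash identical text. A rough Python equivalent of the clamp the SQL performs (illustrative only; the real work happens server-side, and clamp_date is a hypothetical name):

    def clamp_date(value: str, min_date: str = '1900-01-01', max_date: str = '2299-12-31') -> str:
        # ISO dates compare correctly as strings, which is also what the
        # generated CASE WHEN relies on; defaults match the new CLI flags.
        if value >= max_date:
            return max_date
        if value <= min_date:
            return min_date
        return value

    assert clamp_date('9999-12-31') == '2299-12-31'
    assert clamp_date('0001-01-01') == '1900-01-01'
    assert clamp_date('2024-05-01') == '2024-05-01'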
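On --binary_encoding: the base64 branch wraps TO_BASE64 in replace(...,'\n','') because MySQL's TO_BASE64 line-wraps its output every 76 characters, while ClickHouse produces unwrapped base64, so long binary values would otherwise never checksum equal. The mismatch can be reproduced with the Python standard library:

    import base64

    payload = bytes(range(256))  # long enough to trigger line wrapping

    wrapped = base64.encodebytes(payload).decode()   # wraps at 76 chars, like MySQL TO_BASE64
    unwrapped = base64.b64encode(payload).decode()   # one line, like ClickHouse

    assert wrapped != unwrapped
    assert wrapped.replace('\n', '') == unwrapped    # the replace() in the generated SQL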