diff --git a/sink-connector/python/db_compare/clickhouse_table_checksum.py b/sink-connector/python/db_compare/clickhouse_table_checksum.py
index 621cb729a..aea6d92c1 100644
--- a/sink-connector/python/db_compare/clickhouse_table_checksum.py
+++ b/sink-connector/python/db_compare/clickhouse_table_checksum.py
@@ -1,5 +1,5 @@
 """
-# -- ============================================================================
+# -- ============================================================================
 # -- FileName : clickhouse_table_checksum
 # -- Date :
 # -- Summary : calculate a checksum for a clickhouse table
@@ -144,8 +144,10 @@ def get_table_checksum_query(conn, table):
            # requires this function : CREATE OR REPLACE FUNCTION format_decimal AS (x, scale) -> if(locate(toString(x),'.')>0,concat(toString(x),repeat('0',toUInt8(scale-(length(toString(x))-locate(toString(x),'.'))))),concat(toString(x),'.',repeat('0',toUInt8(scale))))
            select += "format_decimal("+column_name + \
                ","+str(numeric_scale)+")"
-        elif "DateTime64(0)" == data_type:
+        elif "DateTime64(0" in data_type:
            select += f"toString({column_name})"
+        elif "DateTime64(6" in data_type:
+            select += f"if(toString({column_name}) > '{args.max_datetime_value}', '{args.max_datetime_value}', toString({column_name}))"
        elif "DateTime" in data_type:
            select += f"trim(TRAILING '.' from (trim(TRAILING '0' FROM toString({column_name}))))"
        else:
@@ -336,6 +338,8 @@ def main():
                         nargs='*', default=['_sign,_version,is_deleted,_is_deleted'])
     parser.add_argument('--threads', type=int,
                         help='number of parallel threads', default=1)
+    parser.add_argument(
+        '--max_datetime_value', help='Maximum Datetime64 datetime', default='2299-12-31 23:59:59.000000', required=False)
 
     global args
     args = parser.parse_args()
diff --git a/sink-connector/python/db_dump/mysql_dumper.py b/sink-connector/python/db_dump/mysql_dumper.py
index 012f1816a..39c3aab17 100644
--- a/sink-connector/python/db_dump/mysql_dumper.py
+++ b/sink-connector/python/db_dump/mysql_dumper.py
@@ -17,6 +17,7 @@
 from subprocess import Popen, PIPE
 import subprocess
 import time
+import tempfile
 
 runTime = datetime.datetime.now().strftime("%Y.%m.%d-%H.%M.%S")
 
@@ -87,9 +88,11 @@ def generate_mysqlsh_dump_tables_clause(dump_dir,
                                         threads):
         table_array_clause = tables_to_dump
 
     dump_options = {"dryRun":int(dry_run), "ddlOnly":int(schema_only), "dataOnly":int(data_only), "threads":threads}
-    if partition_map:
+    if partition_map and not schema_only:
         dump_options['partitions'] = partition_map
+    logging.info(f"{dump_options}")
     dump_clause=f""" util.dumpTables('{database}',{table_array_clause}, '{dump_dir}', {dump_options} ); """
+    logging.info(dump_clause)
 
     return dump_clause
@@ -106,7 +109,7 @@ def generate_mysqlsh_command(dump_dir,
                              schema_only,
                              where,
                              partition_map,
-                             threads):
+                             threads, temp_file):
     mysql_user_clause = ""
     if mysql_user is not None:
         mysql_user_clause = f" --user {mysql_user}"
@@ -129,7 +132,9 @@ def generate_mysqlsh_command(dump_dir,
                                                       where,
                                                       partition_map,
                                                       threads)
 
-    cmd = f"""mysqlsh {defaults_file_clause} -h {mysql_host} {mysql_user_clause} {mysql_password_clause} {mysql_port_clause} -e "{dump_clause}" """
+    temp_file.write(dump_clause)
+    temp_file.flush()
+    cmd = f"""mysqlsh {defaults_file_clause} -h {mysql_host} {mysql_user_clause} {mysql_password_clause} {mysql_port_clause} -f {temp_file.name} """
     return cmd
 
@@ -148,7 +153,7 @@ def main():
     parser.add_argument('--mysql_port', help='MySQL port',
                         default=3306, required=False)
     parser.add_argument('--dump_dir', help='Location of dump files', required=True)
-    parser.add_argument('--include_tables_regex', help='table regexp', required=False, default=None)
+    parser.add_argument('--include_tables_regex', help='table regexp', required=False, default='.')
     parser.add_argument('--where', help='where clause', required=False)
     parser.add_argument('--exclude_tables_regex',
                         help='exclude table regexp', required=False)
@@ -163,6 +168,8 @@ def main():
                         action='store_true', default=False)
     parser.add_argument('--non_partitioned_tables_only', dest='non_partitioned_tables_only',
                         action='store_true', default=False)
+    parser.add_argument('--partitioned_tables_only', dest='partitioned_tables_only',
+                        action='store_true', default=False)
     parser.add_argument('--dry_run', dest='dry_run',
                         action='store_true', default=False)
 
@@ -214,9 +221,10 @@ def main():
 
         tables_to_dump = []
-        for table in tables.fetchall():
-            logging.debug(table['table_name'])
-            tables_to_dump.append(table['table_name'])
+        if not args.partitioned_tables_only:
+            for table in tables.fetchall():
+                logging.debug(table['table_name'])
+                tables_to_dump.append(table['table_name'])
 
         partition_map = {}
         for partition in partitions.fetchall():
@@ -225,11 +233,18 @@ def main():
             partition_name = partition['partition_name']
             key = schema+"."+table
             if key not in partition_map:
-                partition_map[key]=[partition_name]
+                partition_map[key]=[partition_name] if partition_name is not None else []
             else:
                 partition_map[key].append(partition_name)
+            if args.partitioned_tables_only:
+                if table not in tables_to_dump:
+                    tables_to_dump.append(table)
         logging.debug(partition_map)
-        cmd = generate_mysqlsh_command(args.dump_dir,
+        # the generated json can be bigger than the shell allows, so using the -f option with
+        # a temporary file
+        tmp = tempfile.NamedTemporaryFile()
+        with open(tmp.name, 'w') as temp_file:
+            cmd = generate_mysqlsh_command(args.dump_dir,
                                        args.dry_run,
                                        args.mysql_host,
                                        args.mysql_user,
@@ -242,10 +257,11 @@ def main():
                                        args.schema_only,
                                        args.where,
                                        partition_map,
-                                       args.threads
+                                       args.threads,
+                                       temp_file
                                        )
-        rc = run_command(cmd)
-        assert rc == "0", "mysqldumper failed, check the log."
+            rc = run_command(cmd)
+            assert rc == "0", "mysqldumper failed, check the log."
 
     except (KeyboardInterrupt, SystemExit):
         logging.info("Received interrupt")
diff --git a/sink-connector/python/db_load/mysql_parser/CreateTableMySQLParserListener.py b/sink-connector/python/db_load/mysql_parser/CreateTableMySQLParserListener.py
index b8efdb4ac..8c19bc84f 100644
--- a/sink-connector/python/db_load/mysql_parser/CreateTableMySQLParserListener.py
+++ b/sink-connector/python/db_load/mysql_parser/CreateTableMySQLParserListener.py
@@ -93,7 +93,9 @@ def translateColumnDefinition(self, column_name, columnDefinition):
             if isinstance(child, MySqlParser.GeneratedColumnConstraintContext):
                 expression = child.expression()
                 text = self.extract_original_text(expression)
-                generatedExpression = " ALIAS " + text
+                # generated columns are mapped to MATERIALIZED see https://github.com/Altinity/clickhouse-sink-connector/issues/459
+                # collations may be present before strings like _latin1 or _utf8mb4
+                generatedExpression = " MATERIALIZED " + re.sub(r"\b_.*?'","'", text)
                 generated = True
             # column without nullable info are default nullable in MySQL, while they are not null in ClickHouse
             if not notNull:
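
Notes on the patch. The sketches below are illustrative only; helper names such as checksum_expr are hypothetical and not part of the diff.

On clickhouse_table_checksum.py: the new DateTime64(6 branch clamps values above --max_datetime_value so MySQL datetimes that overflow the ClickHouse DateTime64 range checksum identically on both sides. A minimal sketch of the branch logic, under the assumption that it can be isolated as a standalone helper:

    # Sketch only: mirrors the select-expression branches added in the patch.
    DEFAULT_MAX = '2299-12-31 23:59:59.000000'   # the --max_datetime_value default

    def checksum_expr(column_name, data_type, max_dt=DEFAULT_MAX):
        if "DateTime64(0" in data_type:          # precision-0 values pass through
            return f"toString({column_name})"
        if "DateTime64(6" in data_type:
            # clamp out-of-range values to the configured maximum
            return (f"if(toString({column_name}) > '{max_dt}', "
                    f"'{max_dt}', toString({column_name}))")
        if "DateTime" in data_type:              # strip trailing zeros and dot
            return f"trim(TRAILING '.' from (trim(TRAILING '0' FROM toString({column_name}))))"
        return column_name

    print(checksum_expr("updated_at", "DateTime64(6)"))

Note the branch order matters: the DateTime64(0 and DateTime64(6 checks must precede the generic "DateTime" check, exactly as in the patched elif chain.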
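On the partition-map fix in mysql_dumper.py: information_schema.partitions returns one row with a NULL partition_name for a non-partitioned table, so seeding the map with [partition_name] would leak a JSON null into the mysqlsh partitions option; the patched code seeds an empty list instead. A self-contained illustration with fabricated rows:

    # Fabricated rows standing in for partitions.fetchall() results.
    rows = [
        {'table_schema': 'db', 'table_name': 'plain', 'partition_name': None},
        {'table_schema': 'db', 'table_name': 'sales', 'partition_name': 'p2023'},
        {'table_schema': 'db', 'table_name': 'sales', 'partition_name': 'p2024'},
    ]
    partition_map = {}
    for r in rows:
        key = r['table_schema'] + "." + r['table_name']
        name = r['partition_name']
        if key not in partition_map:
            # same guard as the patch: never store a bare None
            partition_map[key] = [name] if name is not None else []
        else:
            partition_map[key].append(name)
    print(partition_map)  # {'db.plain': [], 'db.sales': ['p2023', 'p2024']}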
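On the temp-file change: a large partition map can push the generated util.dumpTables() script past the OS argument-length limit when passed inline via -e, so the script is written to a file and executed with mysqlsh -f instead. A minimal sketch of that pattern, with host and dump clause as placeholders:

    import subprocess
    import tempfile

    # Placeholder script; in the dumper it comes from
    # generate_mysqlsh_dump_tables_clause() and may embed a huge partition map.
    dump_clause = "util.dumpTables('db', ['t1'], '/tmp/dump', {'threads': 4});"

    tmp = tempfile.NamedTemporaryFile()    # keeps the file alive while mysqlsh runs
    with open(tmp.name, 'w') as script:
        script.write(dump_clause)
        script.flush()
        # -f reads the script from the file, so the shell's argv limit no longer applies
        subprocess.run(f"mysqlsh -h 127.0.0.1 -f {script.name}", shell=True, check=True)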
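On CreateTableMySQLParserListener.py: MySQL may prefix string literals inside generated-column expressions with a character-set introducer (_latin1'...', _utf8mb4'...'), which ClickHouse does not accept, so the patch strips everything from the introducer through the opening quote while mapping the column to MATERIALIZED. A quick standalone check of the same re.sub:

    import re

    # character-set introducers before string literals, as MySQL emits them
    exprs = ["concat(_latin1'US-',zip)", "if(active=_utf8mb4'Y',1,0)"]
    for text in exprs:
        # \b_.*?' lazily consumes the introducer plus the opening quote;
        # the replacement puts the quote back
        print(re.sub(r"\b_.*?'", "'", text))
    # concat('US-',zip)
    # if(active='Y',1,0)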