def get_tables_from_regex_sql(conn, no_wc, mysql_database, include_tables_regex, exclude_tables_regex=None, non_partitioned_tables_only=False):
    """Build the information_schema query listing the BASE TABLEs of
    `mysql_database` whose names match `include_tables_regex`.

    Optional filters:
      exclude_tables_regex        -- drop tables whose name matches this regex
      non_partitioned_tables_only -- keep only tables that have exactly one
                                     row in information_schema.partitions
                                     (i.e. are not partitioned)

    Returns the SQL text; `conn` and `no_wc` are accepted for signature
    symmetry with get_tables_from_regex but are not used here.

    NOTE(review): the schema name and regexes are interpolated directly into
    the SQL (rlike '...'); a single quote in them breaks the statement. These
    values come from the operator's command line, but parameterized queries
    would be safer -- TODO confirm whether execute_mysql supports parameters.
    """
    schema = mysql_database
    exclude_regex_clause = ""
    if exclude_tables_regex is not None:
        exclude_regex_clause = f"and table_name not rlike '{exclude_tables_regex}'"
    non_partitioned_tables_clause = ""
    if non_partitioned_tables_only:
        non_partitioned_tables_clause = f" and (table_schema, table_name) in (select table_schema, table_name from information_schema.partitions where table_schema = '{schema}' group by table_schema, table_name having count(*) = 1 )"

    strCommand = f"select TABLE_SCHEMA as table_schema, TABLE_NAME as table_name from information_schema.tables where table_type='BASE TABLE' and table_schema = '{schema}' and table_name rlike '{include_tables_regex}' {exclude_regex_clause} {non_partitioned_tables_clause} order by 1"
    return strCommand


def get_tables_from_regex(conn, no_wc, mysql_database, include_tables_regex, exclude_tables_regex=None, non_partitioned_tables_only=False):
    """Return the rowset of (table_schema, table_name) rows matching the
    filters (see get_tables_from_regex_sql).

    When `no_wc` is true the regex is treated as a literal table name and no
    query is issued; the result mimics a one-row rowset: [[name]].
    """
    if no_wc:
        # BUG FIX: the original returned the undefined name `tables_regex`,
        # raising NameError whenever no_wc was requested.
        return [[include_tables_regex]]

    strCommand = get_tables_from_regex_sql(conn, no_wc, mysql_database, include_tables_regex,
                                           exclude_tables_regex=exclude_tables_regex,
                                           non_partitioned_tables_only=non_partitioned_tables_only)
    (rowset, rowcount) = execute_mysql(conn, strCommand)
    return rowset


def get_partitions_from_regex(conn, mysql_database, include_tables_regex, exclude_tables_regex=None, include_partitions_regex=None, non_partitioned_tables_only=False):
    """Return rows (table_schema, table_name, partition_name) for every
    partition of the tables selected by the same filters as
    get_tables_from_regex, optionally restricted to partition names matching
    `include_partitions_regex`.

    NOTE(review): for non-partitioned tables information_schema.partitions
    reports partition_name = NULL; those rows are not filtered out here --
    verify the caller tolerates None partition names.
    """
    # Reuse the table-selection SQL as an IN (...) subquery.
    table_sql = get_tables_from_regex_sql(conn, False, mysql_database, include_tables_regex,
                                          exclude_tables_regex=exclude_tables_regex,
                                          non_partitioned_tables_only=non_partitioned_tables_only)

    include_regex_clause = ""
    if include_partitions_regex is not None:
        include_regex_clause = f"and partition_name rlike '{include_partitions_regex}'"

    strCommand = f"select TABLE_SCHEMA as table_schema, TABLE_NAME as table_name, PARTITION_NAME as partition_name from information_schema.partitions where table_schema = '{mysql_database}' {include_regex_clause} and (table_schema, table_name) IN ({table_sql}) order by 1,2,3"
    (rowset, rowcount) = execute_mysql(conn, strCommand)
    return rowset
def get_tables_from_regexp(conn, tables_regexp):
    """Resolve the tables matching `tables_regexp` for this run.

    Thin wrapper around db.mysql.get_tables_from_regex that forwards the
    --no_wc and --mysql_database options parsed from the command line
    (module-level `args`).
    """
    no_wildcard = args.no_wc
    database = args.mysql_database
    return get_tables_from_regex(conn, no_wildcard, database, tables_regexp)
# -- ============================================================================
"""
# -- ============================================================================
# -- FileName : mysql_dumper.py
# -- Date :
# -- Summary : dumps a MySQL database using mysqlsh
# -- Credits : https://dev.mysql.com/doc/mysql-shell/8.0/en/mysql-shell-utilities-dump-instance-schema.html
# --
"""
import logging
import argparse
import traceback
import sys
import datetime
import os
from db.mysql import *
from subprocess import Popen, PIPE
import subprocess
import time

runTime = datetime.datetime.now().strftime("%Y.%m.%d-%H.%M.%S")

# Make the parent directory (sink-connector/python) importable so that
# `from db.mysql import *` resolves when this script is run directly.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.dirname(SCRIPT_DIR))


def check_program_exists(name):
    """Return True if `name` resolves to an executable via /usr/bin/which."""
    p = Popen(['/usr/bin/which', name], stdout=PIPE, stderr=PIPE)
    p.communicate()
    return p.returncode == 0


# hack to add the user to the logger, which needs it apparently
old_factory = logging.getLogRecordFactory()


def record_factory(*args, **kwargs):
    """Log-record factory that injects a constant `user` attribute."""
    record = old_factory(*args, **kwargs)
    record.user = "me"
    return record


logging.setLogRecordFactory(record_factory)


def run_command(cmd):
    """
    # -- ======================================================================
    # -- run the command that is passed as cmd, streaming its combined
    # -- stdout/stderr to the log; return the exit status as a string.
    # -- ======================================================================
    """
    logging.debug("cmd " + cmd)
    process = subprocess.Popen(cmd,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT,
                               shell=True)
    for line in process.stdout:
        logging.info(line.decode().strip())
        time.sleep(0.02)  # presumably throttles log flooding -- TODO confirm
    # BUG FIX: poll() can still return None right after stdout reaches EOF
    # (the child may not have been reaped yet), which turned into the string
    # "None" and made the caller's `rc == "0"` assertion fail spuriously.
    # wait() blocks until the real exit status is available.
    rc = str(process.wait())
    logging.debug("return code = " + str(rc))
    return rc


def run_quick_command(cmd):
    """Run `cmd` through the shell, capture its output, return (rc, stdout).

    rc is the exit status as a string; communicate() has already waited for
    the child, so poll() is reliable here. On failure the error is only
    logged -- the caller decides whether to abort.
    """
    logging.debug("cmd " + cmd)
    process = subprocess.Popen(cmd,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT,
                               shell=True)
    stdout, stderr = process.communicate()
    rc = str(process.poll())
    if stdout:
        logging.info(str(stdout).strip())
    logging.debug("return code = " + rc)
    if rc != "0":
        logging.error("command failed : terminating")
    return rc, stdout


def generate_mysqlsh_dump_tables_clause(dump_dir,
                                        dry_run,
                                        database,
                                        tables_to_dump,
                                        data_only,
                                        schema_only,
                                        where,
                                        partition_map,
                                        threads):
    """Build the util.dumpTables(...) JS snippet passed to mysqlsh -e.

    NOTE(review): the `where` parameter is accepted but not used in the
    generated options -- TODO confirm whether a "where" option was intended.
    """
    table_array_clause = tables_to_dump
    dump_options = {"dryRun": int(dry_run), "ddlOnly": int(schema_only), "dataOnly": int(data_only), "threads": threads}
    if partition_map:
        dump_options['partitions'] = partition_map
    dump_clause = f""" util.dumpTables('{database}',{table_array_clause}, '{dump_dir}', {dump_options} ); """
    return dump_clause


def generate_mysqlsh_command(dump_dir,
                             dry_run,
                             mysql_host,
                             mysql_user,
                             mysql_password,
                             mysql_port,
                             defaults_file,
                             database,
                             tables_to_dump,
                             data_only,
                             schema_only,
                             where,
                             partition_map,
                             threads):
    """Assemble the full mysqlsh command line.

    Credential/port clauses are emitted only for explicitly provided values;
    otherwise mysqlsh falls back to the defaults file.
    """
    mysql_user_clause = ""
    if mysql_user is not None:
        mysql_user_clause = f" --user {mysql_user}"
    mysql_password_clause = ""
    if mysql_password is not None:
        mysql_password_clause = f""" --password "{mysql_password}" """
    mysql_port_clause = ""
    if mysql_port is not None:
        mysql_port_clause = f" --port {mysql_port}"
    defaults_file_clause = ""
    if defaults_file is not None:
        defaults_file_clause = f" --defaults-file={defaults_file}"

    dump_clause = generate_mysqlsh_dump_tables_clause(dump_dir,
                                                      dry_run,
                                                      database,
                                                      tables_to_dump,
                                                      data_only,
                                                      schema_only,
                                                      where,
                                                      partition_map,
                                                      threads)
    cmd = f"""mysqlsh {defaults_file_clause} -h {mysql_host} {mysql_user_clause} {mysql_password_clause} {mysql_port_clause} -e "{dump_clause}" """
    return cmd


def main():
    """Parse CLI options, resolve the tables/partitions to dump, and run
    mysqlsh util.dumpTables over them."""
    parser = argparse.ArgumentParser(description='''Wrapper for mysqlsh dump''')
    # Required
    parser.add_argument('--mysql_host', help='MySQL host', required=True)
    parser.add_argument('--mysql_user', help='MySQL user', required=False)
    parser.add_argument('--mysql_password',
                        help='MySQL password, discouraged, please use a config file', required=False)
    parser.add_argument('--defaults_file',
                        help='MySQL config file default is ~/.my.cnf', required=False, default='~/.my.cnf')
    parser.add_argument('--mysql_database',
                        help='MySQL database', required=True)
    parser.add_argument('--mysql_port', help='MySQL port',
                        default=3306, required=False)
    parser.add_argument('--dump_dir', help='Location of dump files', required=True)
    parser.add_argument('--include_tables_regex', help='table regexp', required=False, default=None)
    parser.add_argument('--where', help='where clause', required=False)
    parser.add_argument('--exclude_tables_regex',
                        help='exclude table regexp', required=False)
    parser.add_argument('--include_partitions_regex', help='partitions regex', required=False, default=None)
    parser.add_argument('--threads', type=int,
                        help='number of parallel threads', default=1)
    parser.add_argument('--debug', dest='debug',
                        action='store_true', default=False)
    parser.add_argument('--schema_only', dest='schema_only',
                        action='store_true', default=False)
    parser.add_argument('--data_only', dest='data_only',
                        action='store_true', default=False)
    parser.add_argument('--non_partitioned_tables_only', dest='non_partitioned_tables_only',
                        action='store_true', default=False)
    parser.add_argument('--dry_run', dest='dry_run',
                        action='store_true', default=False)

    global args
    args = parser.parse_args()

    root = logging.getLogger()
    root.setLevel(logging.INFO)

    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(logging.INFO)

    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - %(threadName)s - %(message)s')
    handler.setFormatter(formatter)
    root.addHandler(handler)

    if args.debug:
        root.setLevel(logging.DEBUG)
        handler.setLevel(logging.DEBUG)

    mysql_user = args.mysql_user
    mysql_password = args.mysql_password

    # BUG FIX: grammar in the assertion message ("should in" -> "should be in").
    assert check_program_exists("mysqlsh"), "mysqlsh should be in the PATH"

    # check parameters
    if args.mysql_password:
        logging.warning("Using password on the command line is not secure, please specify a config file ")
        assert args.mysql_user is not None, "--mysql_user must be specified"
    else:
        config_file = args.defaults_file
        (mysql_user, mysql_password) = resolve_credentials_from_config(config_file)

    try:
        conn = get_mysql_connection(args.mysql_host, mysql_user,
                                    mysql_password, args.mysql_port, args.mysql_database)
        tables = get_tables_from_regex(conn, False,
                                       args.mysql_database,
                                       args.include_tables_regex,
                                       exclude_tables_regex=args.exclude_tables_regex,
                                       non_partitioned_tables_only=args.non_partitioned_tables_only)
        partitions = get_partitions_from_regex(conn,
                                               args.mysql_database,
                                               args.include_tables_regex,
                                               exclude_tables_regex=args.exclude_tables_regex,
                                               include_partitions_regex=args.include_partitions_regex,
                                               non_partitioned_tables_only=args.non_partitioned_tables_only)

        tables_to_dump = []
        for table in tables.fetchall():
            logging.debug(table['table_name'])
            tables_to_dump.append(table['table_name'])

        # Group partition names per schema.table for util.dumpTables' options.
        partition_map = {}
        for partition in partitions.fetchall():
            schema = partition['table_schema']
            table = partition['table_name']
            partition_name = partition['partition_name']
            key = schema + "." + table
            if key not in partition_map:
                partition_map[key] = [partition_name]
            else:
                partition_map[key].append(partition_name)
        logging.debug(partition_map)
        # NOTE(review): the command is built from args.mysql_user/password
        # (not the config-resolved ones); mysqlsh then reads credentials from
        # the defaults file when they are None -- TODO confirm intended.
        cmd = generate_mysqlsh_command(args.dump_dir,
                                       args.dry_run,
                                       args.mysql_host,
                                       args.mysql_user,
                                       args.mysql_password,
                                       args.mysql_port,
                                       args.defaults_file,
                                       args.mysql_database,
                                       tables_to_dump,
                                       args.data_only,
                                       args.schema_only,
                                       args.where,
                                       partition_map,
                                       args.threads
                                       )
        rc = run_command(cmd)
        assert rc == "0", "mysqldumper failed, check the log."

    except (KeyboardInterrupt, SystemExit):
        logging.info("Received interrupt")
        os._exit(1)
    except Exception as e:
        logging.error("Exception in main thread : " + str(e))
        logging.error(traceback.format_exc())
        sys.exit(1)
    logging.debug("Exiting Main Thread")
    sys.exit(0)


if __name__ == '__main__':
    main()
{data_file} | clickhouse-client {config_file_option} --use_client_time_zone 1 --throw_if_no_data_to_insert=0 -h {clickhouse_host} --query="INSERT INTO {ch_schema}.{table_name}({columns}) SELECT {transformed_columns} FROM input('{structure}') FORMAT TSV" -u{args.clickhouse_user} {password_option} -mn """ + cmd = f"""export TZ={timezone}; zstd -d --stdout {data_file} | clickhouse-client {config_file_option} --use_client_time_zone 1 --throw_if_no_data_to_insert=0 -h {clickhouse_host} --query="INSERT INTO {ch_schema}.{table_name}({columns}) SELECT {transformed_columns} FROM input('{structure}') FORMAT TSV" -u{clickhouse_user} {password_option} -mn """ futures.append(executor.submit(execute_load, cmd)) for future in concurrent.futures.as_completed(futures): @@ -557,7 +557,7 @@ def main(): parser.add_argument('--dry_run', dest='dry_run', action='store_true', default=False) parser.add_argument('--virtual_columns', help='virtual_columns', - nargs='+', default=['`_sign`', '`_version`', '`is_deleted`']) + nargs='+', default=['`_sign`', '`_version`', '`is_deleted`','`_is_deleted`']) parser.add_argument('--mysqlshell', help='using a util.dumpSchemas', dest='mysqlshell', action='store_true', default=False) parser.add_argument('--rmt_delete_support', help='Use RMT deletes', dest='rmt_delete_support', diff --git a/sink-connector/python/db_load/mysql_parser/CreateTableMySQLParserListener.py b/sink-connector/python/db_load/mysql_parser/CreateTableMySQLParserListener.py index 3dee7093a..b8efdb4ac 100644 --- a/sink-connector/python/db_load/mysql_parser/CreateTableMySQLParserListener.py +++ b/sink-connector/python/db_load/mysql_parser/CreateTableMySQLParserListener.py @@ -17,6 +17,7 @@ def __init__(self, rmt_delete_support, partition_options, datetime_timezone=None self.rmt_delete_support = rmt_delete_support self.partition_options = partition_options self.datatime_timezone = datetime_timezone + self.has_is_deleted_column = False def extract_original_text(self, ctx): token_source = 
ctx.start.getTokenSource() @@ -63,6 +64,8 @@ def translateColumnDefinition(self, column_name, columnDefinition): notNull = False notSymbol = True nullable = True + generated = False + generatedExpression = None for child in columnDefinition.getChildren(): if child.getRuleIndex() == MySqlParser.RULE_columnConstraint: @@ -87,12 +90,18 @@ def translateColumnDefinition(self, column_name, columnDefinition): if isinstance(child, MySqlParser.PrimaryKeyColumnConstraintContext) and child.PRIMARY(): self.primary_key = column_name + if isinstance(child, MySqlParser.GeneratedColumnConstraintContext): + expression = child.expression() + text = self.extract_original_text(expression) + generatedExpression = " ALIAS " + text + generated = True # column without nullable info are default nullable in MySQL, while they are not null in ClickHouse if not notNull: column_buffer += " NULL" nullable = True - - return (column_buffer, dataType, nullable) + if generatedExpression: + column_buffer += generatedExpression + return (column_buffer, dataType, nullable, generated) def exitColumnDeclaration(self, ctx): column_text = self.extract_original_text(ctx) @@ -107,15 +116,18 @@ def exitColumnDeclaration(self, ctx): dataType = columnDefinition.dataType() originalDataTypeText = self.extract_original_text(dataType) - (columnDefinition_buffer, dataType, nullable) = self.translateColumnDefinition( + (columnDefinition_buffer, dataType, nullable, generated) = self.translateColumnDefinition( column_name, columnDefinition) column_buffer += columnDefinition_buffer self.columns.append(column_buffer) dataTypeText = self.convertDataType(dataType) + if column_name in ['is_deleted','`is_deleted`']: + self.has_is_deleted_column = True columnMap = {'column_name': column_name, 'datatype': dataTypeText, - 'nullable': nullable, 'mysql_datatype': originalDataTypeText} + 'nullable': nullable, 'mysql_datatype': originalDataTypeText, 'generated': generated, + 'has_is_deleted_column':self.has_is_deleted_column} 
logging.info(str(columnMap)) self.columns_map.append(columnMap) @@ -143,9 +155,12 @@ def exitColumnCreateTable(self, ctx): tableName = self.extract_original_text(ctx.tableName()) self.buffer = f"CREATE TABLE {tableName} (" self.columns.append("`_version` UInt64 DEFAULT 0") + is_deleted_column = 'is_deleted' + if self.has_is_deleted_column: + is_deleted_column = '_is_deleted' # is_deleted and _sign are redundant, so exclusive in the schema - if self.rmt_delete_support: - self.columns.append("`is_deleted` UInt8 DEFAULT 0") + if self.rmt_delete_support: + self.columns.append(f"`{is_deleted_column}` UInt8 DEFAULT 0") else: self.columns.append("`_sign` Int8 DEFAULT 1") @@ -160,7 +175,7 @@ def exitColumnCreateTable(self, ctx): partition_by = f"partition by {self.partition_keys}" rmt_params = "_version" if self.rmt_delete_support: - rmt_params += ',is_deleted' + rmt_params += f",{is_deleted_column}" self.buffer += f") engine=ReplacingMergeTree({rmt_params}) {partition_by} order by " + \ self.primary_key