From 39d1a7748e4edd781cc7b600f662cd9d5c3f1d92 Mon Sep 17 00:00:00 2001
From: Arnaud Adant <aadant@fpif-vcsl1.w2k.jumptrading.com>
Date: Mon, 11 Dec 2023 01:30:53 -0600
Subject: [PATCH 1/2] Allow for password-less command lines

---
 sink-connector/python/db/clickhouse.py        |  65 ++++
 .../python/{db_compare => db}/mysql.py        |  32 +-
 .../db_compare/clickhouse_table_checksum.py   | 109 ++----
 .../python/db_compare/mysql_table_checksum.py |  75 +++--
 .../python/db_load/clickhouse_loader.py       | 177 +++++-----
 .../CreateTableMySQLParserListener.py         | 315 +++++++++---------
 sink-connector/python/requirements.txt        |   1 +
 7 files changed, 426 insertions(+), 348 deletions(-)
 create mode 100644 sink-connector/python/db/clickhouse.py
 rename sink-connector/python/{db_compare => db}/mysql.py (50%)

diff --git a/sink-connector/python/db/clickhouse.py b/sink-connector/python/db/clickhouse.py
new file mode 100644
index 000000000..14bd95522
--- /dev/null
+++ b/sink-connector/python/db/clickhouse.py
@@ -0,0 +1,65 @@
+import logging
+import warnings
+from clickhouse_driver import connect
+import xml.etree.ElementTree as ET
+import yaml
+import os
+
+
+def clickhouse_connection(host, database='default', user='default', password='', port=9000,
+                          secure=False):
+    conn = connect(host=host,
+                   user=user,
+                   password=password,
+                   port=port,
+                   database=database,
+                   connect_timeout=20,
+                   secure=secure
+                   )
+    return conn
+
+
+def clickhouse_execute_conn(conn, sql):
+    logging.debug(sql)
+    cursor = conn.cursor()
+    cursor.execute(sql)
+    result = cursor.fetchall()
+    return result
+
+
+def execute_sql(conn, strSql):
+    """
+    # -- =======================================================================
+    # -- Connect to the SQL server and execute the command
+    # -- =======================================================================
+    """
+    logging.debug("SQL="+strSql)
+    rowset = None
+    with warnings.catch_warnings(record=True) as w:
+        warnings.simplefilter('always')
+        rowset = clickhouse_execute_conn(conn, strSql)
+        rowcount = len(rowset)
+    if len(w) > 0:
+        logging.warning("SQL warnings : "+str(len(w)))
+        logging.warning("first warning : "+str(w[0].message))
+
+    return (rowset, rowcount)
+
+
+def resolve_credentials_from_config(config_file):
+    assert config_file is not None, "A config file --clickhouse_config_file must be passed if --clickhouse_password is not specified"
+    assert os.path.isfile(config_file), f"Path {config_file} must exist"
+    assert config_file.endswith(".xml") or config_file.endswith(".yml") or config_file.endswith(".yaml"), "Supported configuration extensions: .xml, .yml or .yaml"
+
+    if config_file.endswith(".xml"):
+        tree = ET.parse(config_file)
+        root = tree.getroot()
+        clickhouse_user = root.findtext('user')
+        clickhouse_password = root.findtext('password')
+    elif config_file.endswith(".yml") or config_file.endswith(".yaml"):
+        with open(config_file, 'r') as f:
+            valuesYaml = yaml.load(f, Loader=yaml.FullLoader)
+            clickhouse_user = valuesYaml['config']['user']
+            clickhouse_password = valuesYaml['config']['password']
+    logging.debug(f"clickhouse_user {clickhouse_user}")  # never log the resolved password
+    return (clickhouse_user, clickhouse_password)
\ No newline at end of file
diff --git a/sink-connector/python/db_compare/mysql.py b/sink-connector/python/db/mysql.py
similarity index 50%
rename from sink-connector/python/db_compare/mysql.py
rename to sink-connector/python/db/mysql.py
index 0347abc20..4a9ee6932 100644
--- a/sink-connector/python/db_compare/mysql.py
+++ b/sink-connector/python/db/mysql.py
@@ -1,14 +1,20 @@
 from sqlalchemy import create_engine
 import logging
 import warnings
+import os
+import configparser
+config = configparser.ConfigParser()
+
+binary_datatypes = ('blob', 'varbinary', 'point', 'geometry', 'bit', 'binary', 'linestring',
+                    'geomcollection', 'multilinestring', 'multipolygon', 'multipoint', 'polygon')
 
-binary_datatypes = ('blob', 'varbinary', 'point', 'geometry', 'bit', 'binary', 'linestring', 'geomcollection', 'multilinestring', 'multipolygon', 'multipoint', 'polygon')
 
 def is_binary_datatype(datatype):
-    if "blob" in datatype or "binary" in datatype or "varbinary" in datatype or "bit" in datatype:
-        return True
-    else:
-        return datatype.lower() in binary_datatypes
+    if "blob" in datatype or "binary" in datatype or "varbinary" in datatype or "bit" in datatype:
+        return True
+    else:
+        return datatype.lower() in binary_datatypes
+
 
 def get_mysql_connection(mysql_host, mysql_user, mysql_passwd, mysql_port, mysql_database):
     url = 'mysql+pymysql://{user}:{passwd}@{host}:{port}/{db}?charset=utf8mb4'.format(
@@ -17,6 +23,7 @@ def get_mysql_connection(mysql_host, mysql_user, mysql_passwd, mysql_port, mysql
     conn = engine.connect()
     return conn
 
+
 def execute_mysql(conn, strSql):
     """
     # -- =======================================================================
@@ -34,3 +41,18 @@ def execute_mysql(conn, strSql):
             logging.warning("first warning : "+str(w[0].message))
 
     return (rowset, rowcount)
+
+
+def resolve_credentials_from_config(config_file):
+    assert config_file is not None, "A config file --defaults_file must be passed if --mysql_password is not specified"
+    config_file = os.path.expanduser(config_file)
+    assert os.path.isfile(config_file), f"Path {config_file} must exist"
+    assert config_file.endswith(".cnf"), "Supported configuration extension: .cnf"
+    # ini file read
+    config = configparser.ConfigParser()
+    config.read(config_file)
+    assert 'client' in config, f"Expected a [client] section in {config_file}"
+    mysql_user = config['client']['user']
+    mysql_password = config['client']['password']
+    logging.debug(f"mysql_user {mysql_user}")  # never log the resolved password
+    return (mysql_user, mysql_password)
\ No newline at end of file
diff --git a/sink-connector/python/db_compare/clickhouse_table_checksum.py b/sink-connector/python/db_compare/clickhouse_table_checksum.py
index 86af84e39..f640a9b18 100644
--- a/sink-connector/python/db_compare/clickhouse_table_checksum.py
+++ b/sink-connector/python/db_compare/clickhouse_table_checksum.py
@@ -16,77 +16,24 @@
 import re
 import os
 import hashlib
-from clickhouse_driver import connect
 import concurrent.futures
-
+from db.clickhouse import *
 
 runTime = datetime.datetime.now().strftime("%Y.%m.%d-%H.%M.%S")
 
 
-def clickhouse_connection(host, database='default', user='default', port=9000, password='',
-                          secure=False):
-    conn = connect(host=host,
-                   user=user,
-                   password=password,
-                   port=port,
-                   database=database,
-                   connect_timeout=20,
-                   secure=secure
-                   )
-    return conn
-
-
-def clickhouse_execute_conn(conn, sql):
-    logging.debug(sql)
-    cursor = conn.cursor()
-    cursor.execute(sql)
-    result = cursor.fetchall()
-    return result
-
-
-def get_connection():
+def get_connection(clickhouse_user, clickhouse_password):
     conn = clickhouse_connection(args.clickhouse_host, database=args.clickhouse_database,
-                                 user=args.clickhouse_user, password=args.clickhouse_password,
+                                 user=clickhouse_user, password=clickhouse_password,
                                  port=args.clickhouse_port, secure=args.secure)
     return conn
 
 
-def execute_sql(conn, strSql):
-    """
-    # -- 
======================================================================= - # -- Connect to the SQL server and execute the command - # -- ======================================================================= - """ - logging.debug("SQL="+strSql) - rowset = None - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter('always') - rowset = clickhouse_execute_conn(conn, strSql) - rowcount = len(rowset) - if len(w) > 0: - logging.warning("SQL warnings : "+str(len(w))) - logging.warning("first warning : "+str(w[0].message)) +def compute_checksum(table, clickhouse_user, clickhouse_password, statements): - return (rowset, rowcount) - - -def execute_statement(strSql): - """ - # -- ======================================================================= - # -- Connect to the SQL server and execute the command - # -- ======================================================================= - """ - conn = get_connection() - (rowset, rowcount) = execute_sql(conn, strSql) - conn.close() - return (rowset, rowcount) - - -def compute_checksum(table, statements): - - conn = get_connection() + conn = get_connection(clickhouse_user, clickhouse_password) debug_out = None if args.debug_output: @@ -132,7 +79,7 @@ def compute_checksum(table, statements): conn.close() -def get_primary_key_columns(table_schema, table_name): +def get_primary_key_columns(conn, table_schema, table_name): sql = """ SELECT name @@ -140,7 +87,7 @@ def get_primary_key_columns(table_schema, table_name): WHERE (database = '{table_schema}') AND (table = '{table_name}') AND (is_in_primary_key = 1) ORDER BY position ASC """.format(table_schema=table_schema, table_name=table_name) - (rowset, count) = execute_statement(sql) + (rowset, count) = execute_sql(conn, sql) res = [] for row in rowset: if row[0] is not None: @@ -148,7 +95,7 @@ def get_primary_key_columns(table_schema, table_name): return res -def get_table_checksum_query(table): +def get_table_checksum_query(conn, table): #logging.info(f"Excluded columns before join, {args.exclude_columns}") excluded_columns = "','".join(args.exclude_columns) excluded_columns = [f'{column}' for column in excluded_columns.split(',')] @@ -157,7 +104,7 @@ def get_table_checksum_query(table): excluded_columns_str = ','.join((f"'{col}'" for col in excluded_columns)) checksum_query="select name, type, if(match(type,'Nullable'),1,0) is_nullable, numeric_scale from system.columns where database='" + args.clickhouse_database+"' and table = '"+table+"' and name not in ("+ excluded_columns_str +") order by position" - (rowset, rowcount) = execute_statement(checksum_query) + (rowset, rowcount) = execute_sql(conn, checksum_query) #logging.info(f"CHECKSUM QUERY: {checksum_query}") select = "" @@ -223,7 +170,7 @@ def get_table_checksum_query(table): query = "select "+select+"||',' as query from " + \ args.clickhouse_database+"."+table - primary_key_columns = get_primary_key_columns( + primary_key_columns = get_primary_key_columns(conn, args.clickhouse_database, table) logging.debug(str(primary_key_columns)) order_by_columns = "" @@ -284,7 +231,7 @@ def select_table_statements(table, query, select_query, order_by, external_colum return statements -def get_tables_from_regex(strDSN): +def get_tables_from_regex(conn, strDSN): if args.no_wc: return [[args.tables_regex]] @@ -292,12 +239,12 @@ def get_tables_from_regex(strDSN): strCommand = "select name from system.tables where database = '{d}' and match(name,'{t}') order by 1".format( d=schema, t=args.tables_regex) logging.info(f"REGEX QUERY: {strCommand}") - 
(rowset, rowcount) = execute_statement(strCommand)
+    (rowset, rowcount) = execute_sql(conn, strCommand)
 
     x = rowset
     return x
 
 
-def calculate_checksum(table):
+def calculate_checksum(table, clickhouse_user, clickhouse_password):
     if args.ignore_tables_regex:
         rex_ignore_tables = re.compile(args.ignore_tables_regex, re.IGNORECASE)
         if rex_ignore_tables.match(table):
@@ -316,7 +263,8 @@
     if args.where:
         sql = sql + " where " + args.where
 
-    (rowset, rowcount) = execute_statement(sql)
+    conn = get_connection(clickhouse_user, clickhouse_password)
+    (rowset, rowcount) = execute_sql(conn, sql)
     if rowcount == 0:
         logging.info("No rows in ClickHouse. Nothing to sync.")
         logging.info("Checksum for table {schema}.{table} = d41d8cd98f00b204e9800998ecf8427e count 0".format(
@@ -325,10 +273,10 @@
 
     # generate the file from ClickHouse
     (query, select_query, distributed_by,
-     external_table_types) = get_table_checksum_query(table)
+     external_table_types) = get_table_checksum_query(conn, table)
     statements = select_table_statements(
         table, query, select_query, distributed_by, external_table_types)
-    compute_checksum(table, statements)
+    compute_checksum(table, clickhouse_user, clickhouse_password, statements)
 
 
 # hack to add the user to the logger, which needs it apparently
@@ -356,9 +304,11 @@ def main():
     parser.add_argument('--clickhouse_host',
                         help='ClickHouse host', required=True)
     parser.add_argument('--clickhouse_user',
-                        help='ClickHouse user', required=True)
+                        help='ClickHouse user', required=False)
     parser.add_argument('--clickhouse_password',
-                        help='ClickHouse password', required=True)
+                        help='CH password (discouraged, use a configuration file instead)', required=False, default=None)
+    parser.add_argument('--clickhouse_config_file',
+                        help='CH config file either xml or yaml', required=False, default='./clickhouse-client.xml')
     parser.add_argument('--clickhouse_database',
                         help='ClickHouse database', required=True)
     parser.add_argument('--clickhouse_port',
@@ -405,17 +355,26 @@ def main():
         root.setLevel(logging.DEBUG)
         handler.setLevel(logging.DEBUG)
 
-    thisScript = argv[0]
+    clickhouse_user = args.clickhouse_user
+    clickhouse_password = args.clickhouse_password
 
+    # check parameters
+    if args.clickhouse_password:
+        logging.warning("Using a password on the command line is not secure, please use a config file instead")
+        assert args.clickhouse_user is not None, "--clickhouse_user must be specified"
+    else:
+        config_file = args.clickhouse_config_file
+        (clickhouse_user, clickhouse_password) = resolve_credentials_from_config(config_file)
     try:
-        tables = get_tables_from_regex(args.tables_regex)
+        conn = get_connection(clickhouse_user, clickhouse_password)
+        tables = get_tables_from_regex(conn, args.tables_regex)
         # CH does not print decimal with trailing zero, we need a custom function
-        execute_statement(create_function_format_decimal)
+        execute_sql(conn, create_function_format_decimal)
 
         with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:
             futures = []
             for table in tables:
-                futures.append(executor.submit(calculate_checksum, table[0]))
+                futures.append(executor.submit(calculate_checksum, table[0], clickhouse_user, clickhouse_password))
 
             for future in concurrent.futures.as_completed(futures):
                 if future.exception() is not None:
                     raise future.exception()
diff --git a/sink-connector/python/db_compare/mysql_table_checksum.py b/sink-connector/python/db_compare/mysql_table_checksum.py
index 94990c22f..16a860fda 100644
--- 
a/sink-connector/python/db_compare/mysql_table_checksum.py +++ b/sink-connector/python/db_compare/mysql_table_checksum.py @@ -16,7 +16,7 @@ import os import hashlib import concurrent.futures -from mysql import * +from db.mysql import * runTime = datetime.datetime.now().strftime("%Y.%m.%d-%H.%M.%S") @@ -25,7 +25,7 @@ def compute_checksum(table, statements, conn): debug_out = None if args.debug_output: out_file = f"out.{table}.mysql.txt" - #logging.info(f"Debug output to {out_file}") + # logging.info(f"Debug output to {out_file}") debug_out = open(out_file, 'w') try: for statement in statements: @@ -58,11 +58,11 @@ def compute_checksum(table, statements, conn): finally: conn.close() - + def get_table_checksum_query(table, conn, binary_encoding): (rowset, rowcount) = execute_mysql(conn, "select COLUMN_NAME as column_name, column_type as data_type, IS_NULLABLE as is_nullable from information_schema.columns where table_schema='" + - args.mysql_database+"' and table_name = '"+table+"' order by ordinal_position") + args.mysql_database+"' and table_name = '"+table+"' order by ordinal_position") select = "" nullables = [] @@ -76,11 +76,11 @@ def get_table_checksum_query(table, conn, binary_encoding): is_nullable = row['is_nullable'] if not first_column: - select += "," + select += "," if is_nullable == 'YES': nullables.append(column_name) - if 'datetime' == data_type or 'datetime(1)'== data_type or 'datetime(2)' == data_type or 'datetime(3)' == data_type: + if 'datetime' == data_type or 'datetime(1)' == data_type or 'datetime(2)' == data_type or 'datetime(3)' == data_type: # CH datetime range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/datetime64/ select += f"case when {column_name} > substr('{max_date_value} 23:59:59.999', 1, length({column_name})) then substr(TRIM(TRAILING '0' FROM CAST('{max_date_value} 23:59:59.999' AS datetime(3))),1,length({column_name})) else case when {column_name} <= '{min_date_value} 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('{min_date_value} 00:00:00.000' AS datetime(3)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end" elif 'datetime(4)' == data_type or 'datetime(5)' == data_type or 'datetime(6)' == data_type: @@ -91,17 +91,19 @@ def get_table_checksum_query(table, conn, binary_encoding): elif 'timestamp' == data_type or 'timestamp(1)' == data_type or 'timestamp(2)' == data_type or 'timestamp(3)' == data_type or 'timestamp(4)' == data_type or 'timestamp(5)' == data_type or 'timestamp(6)' == data_type: select += f"substr(TRIM(TRAILING '.' 
from (TRIM(TRAILING '0' from cast({column_name} as char)))),1,length({column_name}))" else: - if 'date' == data_type: # Date are converted to Date32 in CH + if 'date' == data_type: # Date are converted to Date32 in CH # CH date range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/date select += f"case when {column_name} >='{max_date_value}' then CAST('{max_date_value}' AS {data_type}) else case when {column_name} <= '{min_date_value}' then CAST('{min_date_value}' AS {data_type}) else {column_name} end end" else: if is_binary_datatype(data_type): - binary_encode = "lower(hex(cast("+column_name+"as binary)))" - if binary_encoding == 'base64': - binary_encode = "replace(to_base64(cast("+column_name+" as binary)),'\\n','')" - select += binary_encode + binary_encode = "lower(hex(cast(" + \ + column_name+"as binary)))" + if binary_encoding == 'base64': + binary_encode = "replace(to_base64(cast(" + \ + column_name+" as binary)),'\\n','')" + select += binary_encode else: - select += column_name + "" + select += column_name + "" first_column = False data_types[row['column_name']] = data_type @@ -176,34 +178,33 @@ def select_table_statements(table, query, select_query, order_by, external_colum return statements -def get_tables_from_regex(strDSN): +def get_tables_from_regex(conn, strDSN): if args.no_wc: return [[args.tables_regex]] - - conn = get_mysql_connection(args.mysql_host, args.mysql_user, args.mysql_password, args.mysql_port, args.mysql_database) schema = args.mysql_database strCommand = "select TABLE_NAME as table_name from information_schema.tables where table_type='BASE TABLE' and table_schema = '{d}' and table_name rlike '{t}' order by 1".format( d=schema, t=args.tables_regex) (rowset, rowcount) = execute_mysql(conn, strCommand) x = rowset conn.close() - + return x -def calculate_sql_checksum(table): - conn = get_mysql_connection(args.mysql_host, args.mysql_user, args.mysql_password, args.mysql_port, args.mysql_database) +def calculate_sql_checksum(conn, table): + try: if args.ignore_tables_regex: - rex_ignore_tables = re.compile(args.ignore_tables_regex, re.IGNORECASE) + rex_ignore_tables = re.compile( + args.ignore_tables_regex, re.IGNORECASE) if rex_ignore_tables.match(table): logging.info("Ignoring "+table + " due to ignore_regex_tables") return statements = [] - + (query, select_query, distributed_by, - external_table_types) = get_table_checksum_query(table, conn, args.binary_encoding) + external_table_types) = get_table_checksum_query(table, conn, args.binary_encoding) statements = select_table_statements( table, query, select_query, distributed_by, external_table_types) compute_checksum(table, statements, conn) @@ -211,7 +212,7 @@ def calculate_sql_checksum(table): conn.close() -def calculate_checksum(mysql_table): +def calculate_checksum(mysql_table, mysql_user, mysql_password): if args.ignore_tables_regex: rex_ignore_tables = re.compile(args.ignore_tables_regex, re.IGNORECASE) if rex_ignore_tables.match(mysql_table): @@ -220,7 +221,8 @@ def calculate_checksum(mysql_table): return statements = [] - calculate_sql_checksum(mysql_table) + conn = get_mysql_connection(args.mysql_host, mysql_user, mysql_password, args.mysql_port, args.mysql_database) + calculate_sql_checksum(conn, mysql_table) # hack to add the user to the logger, which needs it apparently @@ -242,9 +244,11 @@ def main(): ''') # Required parser.add_argument('--mysql_host', help='MySQL host', required=True) - parser.add_argument('--mysql_user', help='MySQL user', required=True) + 
parser.add_argument('--mysql_user', help='MySQL user', required=False)
     parser.add_argument('--mysql_password',
-                        help='MySQL password', required=True)
+                        help='MySQL password (discouraged, use a config file instead)', required=False)
+    parser.add_argument('--defaults_file',
+                        help='MySQL config file either xml or yaml', required=False, default='~/.my.cnf')
     parser.add_argument('--mysql_database',
                         help='MySQL database', required=True)
     parser.add_argument('--mysql_port', help='MySQL port',
@@ -291,16 +295,29 @@ def main():
         root.setLevel(logging.DEBUG)
         handler.setLevel(logging.DEBUG)
 
+    mysql_user = args.mysql_user
+    mysql_password = args.mysql_password
+
+    # check parameters
+    if args.mysql_password:
+        logging.warning("Using a password on the command line is not secure, please use a config file instead")
+        assert args.mysql_user is not None, "--mysql_user must be specified"
+    else:
+        config_file = args.defaults_file
+        (mysql_user, mysql_password) = resolve_credentials_from_config(config_file)
+
     try:
-        tables = get_tables_from_regex(args.tables_regex)
+        conn = get_mysql_connection(args.mysql_host, mysql_user,
+                                    mysql_password, args.mysql_port, args.mysql_database)
+        tables = get_tables_from_regex(conn, args.tables_regex)
 
         with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:
             futures = []
             for table in tables.fetchall():
                 futures.append(executor.submit(
-                    calculate_sql_checksum, table['table_name']))
+                    calculate_checksum, table['table_name'], mysql_user, mysql_password))
             for future in concurrent.futures.as_completed(futures):
-                if future.exception() is not None:
-                    raise future.exception()
+                if future.exception() is not None:
+                    raise future.exception()
 
     except (KeyboardInterrupt, SystemExit):
         logging.info("Received interrupt")
diff --git a/sink-connector/python/db_load/clickhouse_loader.py b/sink-connector/python/db_load/clickhouse_loader.py
index e3ff47dac..cf4236eb5 100644
--- a/sink-connector/python/db_load/clickhouse_loader.py
+++ b/sink-connector/python/db_load/clickhouse_loader.py
@@ -1,7 +1,6 @@
 # python db_load/clickhouse_myloader.py --clickhouse_host localhost --clickhouse_schema world --dump_dir $HOME/dbdumps/world --db_user root --db_password root --threads 16 --ch_module clickhouse-client-22.5.1.2079 --mysql_source_schema world
 from subprocess import Popen, PIPE
-from db_compare.mysql import is_binary_datatype
-from clickhouse_driver import connect
+from db.mysql import is_binary_datatype
 import argparse
 import sys
 import logging
@@ -18,14 +17,12 @@
 import time
 import datetime
 import zoneinfo
+from db.clickhouse import *
 from db_load.mysql_parser.mysql_parser import convert_to_clickhouse_table_antlr
 
 SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
 sys.path.append(os.path.dirname(SCRIPT_DIR))
 
-from db_compare.mysql import is_binary_datatype
-from subprocess import Popen, PIPE
-
 
 def run_command(cmd):
     """
@@ -61,30 +58,10 @@ def run_quick_command(cmd):
         logging.error("command failed : terminating")
     return rc, stdout
 
-def clickhouse_connection(host, database='default', user='default', port=9000, password=''):
-    conn = connect(host=host,
-                   user=user,
-                   password=password,
-                   port=port,
-                   database=database,
-                   connect_timeout=20,
-                   secure=False
-                   )
-    return conn
-
-
-def clickhouse_execute_conn(conn, sql):
-    logging.debug(sql)
-    cursor = conn.cursor()
-    cursor.execute(sql)
-    result = cursor.fetchall()
-    return result
-
-
-def get_connection(args, database='default'):
+def get_connection(args, clickhouse_user, clickhouse_password, database='default'):
     conn = 
clickhouse_connection(args.clickhouse_host, database=database, - user=args.clickhouse_user, password=args.clickhouse_password, port=args.clickhouse_port) + user=clickhouse_user, password=clickhouse_password, port=args.clickhouse_port) return conn @@ -153,12 +130,12 @@ def find_partitioning_options(source): def convert_to_clickhouse_table_regexp(user_name, table_name, source, rmt_delete_support, datetime_timezone): - + # do we have a table in the source - - if not find_create_table(source) : + + if not find_create_table(source): return ('', []) - + primary_key = find_primary_key(source) if primary_key is None: logging.warning("No PK found for "+table_name + @@ -186,15 +163,15 @@ def convert_to_clickhouse_table_regexp(user_name, table_name, source, rmt_delete src = re.sub(r'\stimestamp\s', ' DateTime64(3) ', src) src = re.sub(r'\stimestamp(.*?)\s', ' DateTime64\\1 ', src) src = re.sub(r'\spoint\s', ' Point ', src) - #src = re.sub(r'\sdouble\s', ' Decimal(38,10) ', src) + # src = re.sub(r'\sdouble\s', ' Decimal(38,10) ', src) src = re.sub(r'\sgeometry\s', ' Geometry ', src) # dangerous src = re.sub(r'\bDEFAULT\b.*,', ',', src) src = re.sub(r'\sCOLLATE\s(.*?)([\s,])', ' \\2', src, ) src = re.sub(r'\sCHARACTER\sSET\s(.*?)([\s,])', ' \\2', src) # it is a challenge to convert MySQL expression in generated columns - src = re.sub(r'.*GENERATED ALWAYS AS.*',' ',src) - src = re.sub(r'\bVIRTUAL\b',' ', src) + src = re.sub(r'.*GENERATED ALWAYS AS.*', ' ', src) + src = re.sub(r'\bVIRTUAL\b', ' ', src) # ClickHouse does not support constraints, indices, primary and unique keys src = re.sub(r'.*\bCONSTRAINT\b.*', '', src) src = re.sub(r'.*\bPRIMARY KEY\b.*\(.*', '', src) @@ -212,9 +189,9 @@ def convert_to_clickhouse_table_regexp(user_name, table_name, source, rmt_delete src = re.sub(r'\) ENGINE', ' '+virtual_columns+') ENGINE', src) - rmt_engine = "ENGINE = ReplacingMergeTree(_version) " + rmt_engine = "ENGINE = ReplacingMergeTree(_version) " if rmt_delete_support: - rmt_engine = "ENGINE = ReplacingMergeTree(_version, is_deleted) " + rmt_engine = "ENGINE = ReplacingMergeTree(_version, is_deleted) " src = re.sub(r'ENGINE=InnoDB[^;]*', rmt_engine + partitioning_options + ' ORDER BY ('+primary_key+') SETTINGS '+settings, src) @@ -235,12 +212,13 @@ def convert_to_clickhouse_table_regexp(user_name, table_name, source, rmt_delete match = re.match(columns_pattern, altered_line) if match: column_name = match.group(1) - datatype = match.group(2) - nullable = False if "NOT NULL" in line else True + datatype = match.group(2) + nullable = False if "NOT NULL" in line else True logging.info(f"{column_name} {datatype}") - columns.append({'column_name':column_name,'datatype':datatype,'nullable':nullable}) - - # tables with no PK miss commas + columns.append({'column_name': column_name, + 'datatype': datatype, 'nullable': nullable}) + + # tables with no PK miss commas if altered_line.strip() != "" and not altered_line.endswith(',') and not altered_line.endswith(';'): altered_line += "," @@ -271,7 +249,7 @@ def convert_to_clickhouse_table(user_name, table_name, source, rmt_delete_suppor return ('', []) src = source - #if use_regexp_parser == True: + # if use_regexp_parser == True: # return convert_to_clickhouse_table_regexp(user_name, table_name, source, rmt_delete_support, datetime_timezone) # the progressive grammar trims the comment partition_options = find_partitioning_options(source) @@ -304,14 +282,14 @@ def get_unix_timezone_from_mysql_timezone(timezone): return tz -def load_schema(args, dry_run=False, 
datetime_timezone=None): +def load_schema(args, clickhouse_user=None, clickhouse_password=None, dry_run=False, datetime_timezone=None): if args.mysqlshell: - return load_schema_mysqlshell(args, dry_run=dry_run, datetime_timezone=datetime_timezone) + return load_schema_mysqlshell(args, clickhouse_user=clickhouse_user, clickhouse_password=clickhouse_password, dry_run=dry_run, datetime_timezone=datetime_timezone) schema_map = {} # create database - with get_connection(args) as conn: + with get_connection(args, clickhouse_user, clickhouse_password) as conn: database_file = args.dump_dir + \ f"/{args.mysql_source_database}-schema-create.sql.gz" @@ -324,7 +302,7 @@ def load_schema(args, dry_run=False, datetime_timezone=None): # create tables timezone = None - with get_connection(args, args.clickhouse_database) as conn: + with get_connection(args, clickhouse_user, clickhouse_password, args.clickhouse_database) as conn: schema_file = args.dump_dir + '/*-schema.sql.gz' @@ -334,7 +312,8 @@ def load_schema(args, dry_run=False, datetime_timezone=None): with gzip.open(file, "r") as schema_file: source = schema_file.read().decode('UTF-8') logging.info(source) - (table_source, columns) = convert_to_clickhouse_table(db, table, source, args.rmt_delete_support, args.use_regexp_parser, datetime_timezone) + (table_source, columns) = convert_to_clickhouse_table( + db, table, source, args.rmt_delete_support, args.use_regexp_parser, datetime_timezone) logging.info(table_source) timezone = find_dump_timezone(source) logging.info(f"Timezone {timezone}") @@ -349,11 +328,11 @@ def load_schema(args, dry_run=False, datetime_timezone=None): return (tz, schema_map) -def load_schema_mysqlshell(args, dry_run=False, datetime_timezone=None): +def load_schema_mysqlshell(args, clickhouse_user, clickhouse_password, dry_run=False, datetime_timezone=None): schema_map = {} # create database - with get_connection(args) as conn: + with get_connection(args, clickhouse_user, clickhouse_password) as conn: source = f"create database if not exists {args.clickhouse_database}" if not dry_run: @@ -363,9 +342,10 @@ def load_schema_mysqlshell(args, dry_run=False, datetime_timezone=None): logging.error(f"Database create error: {e}") # create tables timezone = '+00:00' - with get_connection(args, args.clickhouse_database) as conn: + with get_connection(args, clickhouse_user, clickhouse_password, args.clickhouse_database) as conn: - schema_file_wildcard = args.dump_dir + f"/{args.mysql_source_database}@*.sql" + schema_file_wildcard = args.dump_dir + \ + f"/{args.mysql_source_database}@*.sql" schema_files = glob.glob(schema_file_wildcard) if len(schema_files) == 0: logging.error("Cannot find schema files") @@ -380,7 +360,8 @@ def load_schema_mysqlshell(args, dry_run=False, datetime_timezone=None): with open(file, "r") as schema_file: source = schema_file.read() logging.info(source) - (table_source, columns) = convert_to_clickhouse_table(db, table, source, args.rmt_delete_support, args.use_regexp_parser, datetime_timezone) + (table_source, columns) = convert_to_clickhouse_table( + db, table, source, args.rmt_delete_support, args.use_regexp_parser, datetime_timezone) logging.info(table_source) # timezone = find_dump_timezone(source) logging.info(f"Timezone {timezone}") @@ -422,14 +403,20 @@ def get_column_list(schema_map, schema, table, virtual_columns, transform=False, return column_list -def load_data(args, timezone, schema_map, dry_run=False): +def load_data(args, timezone, schema_map, clickhouse_user=None, clickhouse_password=None, 
dry_run=False):
     if args.mysqlshell:
-        load_data_mysqlshell(args, timezone, schema_map, dry_run=False)
+        load_data_mysqlshell(args, timezone, schema_map, clickhouse_user=clickhouse_user, clickhouse_password=clickhouse_password, dry_run=dry_run)
 
     clickhouse_host = args.clickhouse_host
     ch_schema = args.clickhouse_database
-    password= args.clickhouse_password
+    password = clickhouse_password
+    password_option = ""
+    if password is not None:
+        password_option = f"--password '{password}'"
+    config_file_option = ""
+    if args.clickhouse_config_file is not None:
+        config_file_option = f"--config-file '{args.clickhouse_config_file}'"
     schema_file = args.dump_dir + '/*-schema.sql.gz'
     for files in glob.glob(schema_file):
         (schema, table_name) = parse_schema_path(files)
@@ -442,24 +429,34 @@ def load_data(args, timezone, schema_map, dry_run=False):
             schema_map, schema, table_name, args.virtual_columns, transform=True)
         for data_file in data_files:
             # double quote escape logic https://github.com/ClickHouse/ClickHouse/issues/10624
-            structure = columns.replace(","," Nullable(String),")+" Nullable(String)"
-            cmd = f"""export TZ={timezone}; gunzip --stdout {data_file} | sed -e 's/\\\\"/""/g' | sed -e "s/\\\\\\'/'/g" | clickhouse-client --use_client_time_zone 1 -h {clickhouse_host} --query="INSERT INTO {ch_schema}.{table_name}({columns}) SELECT {transformed_columns} FROM input('{structure}') FORMAT CSV" -u{args.clickhouse_user} --password '{password}' -mn """
+            structure = columns.replace(
+                ",", " Nullable(String),")+" Nullable(String)"
+            cmd = f"""export TZ={timezone}; gunzip --stdout {data_file} | sed -e 's/\\\\"/""/g' | sed -e "s/\\\\\\'/'/g" | clickhouse-client {config_file_option} --use_client_time_zone 1 -h {clickhouse_host} --query="INSERT INTO {ch_schema}.{table_name}({columns}) SELECT {transformed_columns} FROM input('{structure}') FORMAT CSV" -u{clickhouse_user} {password_option} -mn """
             execute_load(cmd)
 
+
 def execute_load(cmd):
     logging.info(cmd)
     (rc, result) = run_quick_command(cmd)
     logging.debug(result)
     if rc != '0':
-        raise AssertionError("command "+cmd+ " failed")
+        raise AssertionError("command "+cmd + " failed")
+
 
-def load_data_mysqlshell(args, timezone, schema_map, dry_run=False):
+def load_data_mysqlshell(args, timezone, schema_map, clickhouse_user=None, clickhouse_password=None, dry_run=False):
     clickhouse_host = args.clickhouse_host
     ch_schema = args.clickhouse_database
     schema_files = args.dump_dir + f"/{args.mysql_source_database}@*.sql"
-    password = args.clickhouse_password
+    password = clickhouse_password
+    password_option = ""
+    if password is not None:
+        password_option = f"--password '{password}'"
+    config_file_option = ""
+    if args.clickhouse_config_file is not None:
+        config_file_option = f"--config-file '{args.clickhouse_config_file}'"
+
     with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:
         futures = []
         for file in glob.glob(schema_files):
@@ -482,25 +479,25 @@ def load_data_mysqlshell(args, timezone, schema_map, dry_run=False):
             for column in column_metadata_list:
                 logging.info(str(column))
                 if column['column_name'] in args.virtual_columns:
-                    continue
-                column_name = column['column_name'].replace('`','\\`')
+                    continue
+                column_name = column['column_name'].replace('`', '\\`')
                 if structure != "":
-                    structure += ", "
-                structure +=" "+column_name + " "
+                    structure += ", "
+                structure += " "+column_name + " "
                 datatype = column['datatype']
                 mysql_datetype = column['mysql_datatype']
                 if 'timestamp' in mysql_datetype.lower():
-                    if column['nullable'] == True:
-                        structure +=f" Nullable({datatype})"
-                    else:
-                        structure +=f" {datatype}"
+                    if column['nullable'] == True:
+                        structure += f" Nullable({datatype})"
+                    else:
+                        structure += f" {datatype}"
                 else:
-                    if column['nullable'] == True:
-                        structure +=" Nullable(String)"
-                    else:
-                        structure +=" String"
+                    if column['nullable'] == True:
+                        structure += " Nullable(String)"
+                    else:
+                        structure += " String"
 
-            cmd = f"""export TZ={timezone}; zstd -d --stdout {data_file} | clickhouse-client --use_client_time_zone 1 --throw_if_no_data_to_insert=0 -h {clickhouse_host} --query="INSERT INTO {ch_schema}.{table_name}({columns}) SELECT {transformed_columns} FROM input('{structure}') FORMAT TSV" -u{args.clickhouse_user} --password '{password}' -mn """
+            cmd = f"""export TZ={timezone}; zstd -d --stdout {data_file} | clickhouse-client {config_file_option} --use_client_time_zone 1 --throw_if_no_data_to_insert=0 -h {clickhouse_host} --query="INSERT INTO {ch_schema}.{table_name}({columns}) SELECT {transformed_columns} FROM input('{structure}') FORMAT TSV" -u{clickhouse_user} {password_option} -mn """
             futures.append(executor.submit(execute_load, cmd))
 
         for future in concurrent.futures.as_completed(futures):
@@ -533,11 +530,13 @@ def main():
     ''')
 
     parser.add_argument('--clickhouse_host', help='CH host', required=True)
-    parser.add_argument('--clickhouse_user', help='CH user', required=True)
+    parser.add_argument('--clickhouse_user', help='CH user', required=False)
     parser.add_argument('--clickhouse_password',
-                        help='CH password', required=True)
-    parser.add_argument('--clickhouse_port', type=int, default=9000,
-                        help='ClickHouse port', required=False)
+                        help='CH password (discouraged, use a configuration file instead)', required=False, default=None)
+    parser.add_argument('--clickhouse_config_file',
+                        help='CH config file either xml or yaml', required=False, default='./clickhouse-client.xml')
+    parser.add_argument('--clickhouse_port', type=int,
+                        default=9000, help='ClickHouse port', required=False)
     parser.add_argument('--clickhouse_database',
                         help='Clickhouse database name', required=True)
     parser.add_argument('--mysql_source_database',
@@ -552,23 +551,34 @@ def main():
         action='store_true', default=False)
     parser.add_argument('--data_only', dest='data_only',
                         action='store_true', default=False)
-    parser.add_argument('--use_regexp_parser',
+    parser.add_argument('--use_regexp_parser',
                         action='store_true', default=False)
     parser.add_argument('--dry_run', dest='dry_run',
                         action='store_true', default=False)
-    parser.add_argument('--virtual_columns', help='virtual_columns', nargs='+', default=['`_sign`', '`_version`', '`is_deleted`'])
+    parser.add_argument('--virtual_columns', help='virtual_columns',
+                        nargs='+', default=['`_sign`', '`_version`', '`is_deleted`'])
     parser.add_argument('--mysqlshell', help='using a util.dumpSchemas',
                         dest='mysqlshell', action='store_true', default=False)
     parser.add_argument('--rmt_delete_support', help='Use RMT deletes',
                         dest='rmt_delete_support', action='store_true', default=False)
     parser.add_argument('--clickhouse_datetime_timezone',
-                        help='Timezone for CH date times', required=False, default=None)
+                        help='Timezone for CH date times', required=False, default=None)
     args = parser.parse_args()
 
     schema = not args.data_only
     data = not args.schema_only
     timezone = None
     schema_map = {}
+    clickhouse_user = args.clickhouse_user
+    clickhouse_password = args.clickhouse_password
+
+    # check parameters
+    if args.clickhouse_password:
+        logging.warning("Using a password on the command line is not secure, please use a config file instead")
+        assert args.clickhouse_user is not None, "--clickhouse_user must be specified"
+    else:
+        config_file = args.clickhouse_config_file
+        (clickhouse_user, clickhouse_password) = resolve_credentials_from_config(config_file)
 
     # check dependencies
     assert check_program_exists(
@@ -577,13 +587,14 @@ def main():
         'clickhouse-client'), "clickhouse-client should be in the PATH"
     assert check_program_exists(
         'zstd'), "zstd should be in the PATH for util.dumpSchemas load"
 
     if schema:
-        (timezone, schema_map) = load_schema(args, dry_run=args.dry_run, datetime_timezone = args.clickhouse_datetime_timezone)
+        (timezone, schema_map) = load_schema(args, clickhouse_user=clickhouse_user, clickhouse_password=clickhouse_password, dry_run=args.dry_run,
+                                             datetime_timezone=args.clickhouse_datetime_timezone)
 
     if data:
         if timezone is None:
-            (timezone, schema_map) = load_schema(args, dry_run=True)
+            (timezone, schema_map) = load_schema(args, clickhouse_user=clickhouse_user, clickhouse_password=clickhouse_password, dry_run=True)
         logging.debug(str(schema_map))
-        load_data(args, timezone, schema_map, dry_run=args.dry_run)
+        load_data(args, timezone, schema_map, clickhouse_user=clickhouse_user, clickhouse_password=clickhouse_password, dry_run=args.dry_run)
 
 
 if __name__ == '__main__':
diff --git a/sink-connector/python/db_load/mysql_parser/CreateTableMySQLParserListener.py b/sink-connector/python/db_load/mysql_parser/CreateTableMySQLParserListener.py
index ab2f9606c..3dee7093a 100644
--- a/sink-connector/python/db_load/mysql_parser/CreateTableMySQLParserListener.py
+++ b/sink-connector/python/db_load/mysql_parser/CreateTableMySQLParserListener.py
@@ -1,22 +1,22 @@
 from antlr4 import *
 from db_load.mysql_parser.MySqlParserListener import MySqlParserListener
 from db_load.mysql_parser.MySqlParser import MySqlParser
-from db_compare.mysql import is_binary_datatype
+from db.mysql import is_binary_datatype
 import re
 import logging
 
 
 class CreateTableMySQLParserListener(MySqlParserListener):
     def __init__(self, rmt_delete_support, partition_options, datetime_timezone=None):
-    self.buffer = ""
-    self.columns = ""
-    self.primary_key = ""
-    self.columns_map = {}
-    self.alter_list = []
-    self.rename_list = []
-    self.rmt_delete_support = rmt_delete_support
-    self.partition_options = partition_options
-    self.datatime_timezone = datetime_timezone
+        self.buffer = ""
+        self.columns = ""
+        self.primary_key = ""
+        self.columns_map = {}
+        self.alter_list = []
+        self.rename_list = []
+        self.rmt_delete_support = rmt_delete_support
+        self.partition_options = partition_options
+        self.datatime_timezone = datetime_timezone
 
     def extract_original_text(self, ctx):
         token_source = ctx.start.getTokenSource()
@@ -32,29 +32,28 @@ def add_timezone(self, dataTypeText):
     def convertDataType(self, dataType):
         dataTypeText = self.extract_original_text(dataType)
         dataTypeText = re.sub("CHARACTER SET.*", '',
-                          dataTypeText, flags=re.IGNORECASE)
+                              dataTypeText, flags=re.IGNORECASE)
         dataTypeText = re.sub("CHARSET.*", '', dataTypeText, re.IGNORECASE)
-
+
         if isinstance(dataType, MySqlParser.SimpleDataTypeContext) and dataType.DATE():
-        dataTypeText = 'Date32'
+            dataTypeText = 'Date32'
 
         if isinstance(dataType, MySqlParser.DimensionDataTypeContext):
-        if dataType.DATETIME() or dataType.TIMESTAMP():
-            dataTypeText = 
'DateTime64(0)' + dataTypeText = self.add_timezone(dataTypeText) + if dataType.lengthOneDimension(): + dataTypeText = 'DateTime64'+dataType.lengthOneDimension().getText() + dataTypeText = self.add_timezone(dataTypeText) + elif dataType.TIME(): + dataTypeText = "String" + if (isinstance(dataType, MySqlParser.SpatialDataTypeContext) and dataType.JSON()) or is_binary_datatype(dataTypeText): - dataTypeText = 'String' + dataTypeText = 'String' return dataTypeText - def translateColumnDefinition(self, column_name, columnDefinition): - column_buffer ='' + column_buffer = '' dataType = columnDefinition.dataType() dataTypeText = self.convertDataType(dataType) # data type @@ -66,28 +65,28 @@ def translateColumnDefinition(self, column_name, columnDefinition): nullable = True for child in columnDefinition.getChildren(): if child.getRuleIndex() == MySqlParser.RULE_columnConstraint: - - if isinstance(child, MySqlParser.NullColumnConstraintContext): - nullNotNull = child.nullNotnull() - if nullNotNull: - text = self.extract_original_text(child) - column_buffer += " " + text - if "NULL" == text: - nullable = True - notNull = True - continue - - if nullNotNull.NOT(): - notSymbol = True - if (nullNotNull.NULL_LITERAL() or nullNotNull.NULL_SPEC_LITERAL()) and notSymbol: - notNull = True - nullable = False - else: - notNull = False - nullable = True - - if isinstance(child, MySqlParser.PrimaryKeyColumnConstraintContext) and child.PRIMARY(): - self.primary_key = column_name + + if isinstance(child, MySqlParser.NullColumnConstraintContext): + nullNotNull = child.nullNotnull() + if nullNotNull: + text = self.extract_original_text(child) + column_buffer += " " + text + if "NULL" == text: + nullable = True + notNull = True + continue + + if nullNotNull.NOT(): + notSymbol = True + if (nullNotNull.NULL_LITERAL() or nullNotNull.NULL_SPEC_LITERAL()) and notSymbol: + notNull = True + nullable = False + else: + notNull = False + nullable = True + + if isinstance(child, MySqlParser.PrimaryKeyColumnConstraintContext) and child.PRIMARY(): + self.primary_key = column_name # column without nullable info are default nullable in MySQL, while they are not null in ClickHouse if not notNull: column_buffer += " NULL" @@ -95,7 +94,6 @@ def translateColumnDefinition(self, column_name, columnDefinition): return (column_buffer, dataType, nullable) - def exitColumnDeclaration(self, ctx): column_text = self.extract_original_text(ctx) @@ -109,154 +107,159 @@ def exitColumnDeclaration(self, ctx): dataType = columnDefinition.dataType() originalDataTypeText = self.extract_original_text(dataType) - (columnDefinition_buffer, dataType, nullable) = self.translateColumnDefinition(column_name, columnDefinition) + (columnDefinition_buffer, dataType, nullable) = self.translateColumnDefinition( + column_name, columnDefinition) column_buffer += columnDefinition_buffer self.columns.append(column_buffer) - dataTypeText = self.convertDataType(dataType) - columnMap = {'column_name': column_name, 'datatype': dataTypeText, 'nullable': nullable, 'mysql_datatype':originalDataTypeText} + dataTypeText = self.convertDataType(dataType) + columnMap = {'column_name': column_name, 'datatype': dataTypeText, + 'nullable': nullable, 'mysql_datatype': originalDataTypeText} logging.info(str(columnMap)) self.columns_map.append(columnMap) - def exitPrimaryKeyTableConstraint(self, ctx): - - text = self.extract_original_text(ctx.indexColumnNames()) - self.primary_key = text + text = self.extract_original_text(ctx.indexColumnNames()) + self.primary_key = text def 
enterColumnCreateTable(self, ctx): - self.buffer = "" - self.columns = [] - self.columns_map = [] - self.primary_key = 'tuple()' - self.partition_keys = None - + self.buffer = "" + self.columns = [] + self.columns_map = [] + self.primary_key = 'tuple()' + self.partition_keys = None def exitPartitionClause(self, ctx): - if ctx.partitionTypeDef(): - partitionTypeDef = ctx.partitionTypeDef() - if partitionTypeDef.RANGE_SYMBOL() and partitionTypeDef.COLUMNS_SYMBOL(): - text = self.extract_original_text(partitionTypeDef.identifierList()) - self.partition_keys = text - + if ctx.partitionTypeDef(): + partitionTypeDef = ctx.partitionTypeDef() + if partitionTypeDef.RANGE_SYMBOL() and partitionTypeDef.COLUMNS_SYMBOL(): + text = self.extract_original_text( + partitionTypeDef.identifierList()) + self.partition_keys = text def exitColumnCreateTable(self, ctx): tableName = self.extract_original_text(ctx.tableName()) self.buffer = f"CREATE TABLE {tableName} (" self.columns.append("`_version` UInt64 DEFAULT 0") - # is_deleted and _sign are redundant, so exclusive in the schema + # is_deleted and _sign are redundant, so exclusive in the schema if self.rmt_delete_support: - self.columns.append("`is_deleted` UInt8 DEFAULT 0") + self.columns.append("`is_deleted` UInt8 DEFAULT 0") else: - self.columns.append("`_sign` Int8 DEFAULT 1") + self.columns.append("`_sign` Int8 DEFAULT 1") for column in self.columns: - self.buffer += column - if column != self.columns[-1]: - self.buffer += ',' - self.buffer += '\n' + self.buffer += column + if column != self.columns[-1]: + self.buffer += ',' + self.buffer += '\n' partition_by = self.partition_options if self.partition_keys: - partition_by = f"partition by {self.partition_keys}" + partition_by = f"partition by {self.partition_keys}" rmt_params = "_version" if self.rmt_delete_support: - rmt_params+=',is_deleted' + rmt_params += ',is_deleted' self.buffer += f") engine=ReplacingMergeTree({rmt_params}) {partition_by} order by " + \ self.primary_key logging.info(self.buffer) - def get_clickhouse_sql(self): - return (self.buffer, self.columns_map) - + return (self.buffer, self.columns_map) def exitAlterList(self, ctx): - for child in ctx.getChildren(): - if isinstance(child,MySqlParser.AlterListItemContext) : - alter = self.extract_original_text(child) - - if child.ADD_SYMBOL() and child.fieldDefinition(): - fieldDefinition = child.fieldDefinition() - if child.identifier(): - identifier = self.extract_original_text(child.identifier()) - place = self.extract_original_text(child.place()) if child.place() else '' - - (fieldDefinition_buffer, dataType) = self.translateFieldDefinition(identifier, fieldDefinition) - alter = f"add column {identifier} {fieldDefinition_buffer} {place}" - self.alter_list.append(alter) - - if child.MODIFY_SYMBOL() and child.fieldDefinition(): - fieldDefinition = child.fieldDefinition() - if child.columnInternalRef(): - identifier = self.extract_original_text(child.columnInternalRef().identifier()) - place = self.extract_original_text(child.place()) if child.place() else '' - (fieldDefinition_buffer, dataType) = self.translateFieldDefinition(identifier, fieldDefinition) - alter = f"modify column {identifier} {fieldDefinition_buffer} {place}" - self.alter_list.append(alter) - - if child.CHANGE_SYMBOL() and child.fieldDefinition(): - fieldDefinition = child.fieldDefinition() - if child.columnInternalRef(): - identifier = self.extract_original_text(child.columnInternalRef().identifier()) - place = self.extract_original_text(child.place()) if child.place() 
else '' - (fieldDefinition_buffer, dataType) = self.translateFieldDefinition(identifier, fieldDefinition) - alter = f"modify column {identifier} {fieldDefinition_buffer} {place}" - self.alter_list.append(alter) - new_identifier = self.extract_original_text(child.identifier()) - rename_column = f"rename column {identifier} to {new_identifier}" - self.alter_list.append(rename_column) - - if child.DROP_SYMBOL() and child.COLUMN_SYMBOL(): - self.alter_list.append(alter) - - if child.RENAME_SYMBOL() and child.COLUMN_SYMBOL(): - self.alter_list.append(alter) - - if child.RENAME_SYMBOL() and child.tableName(): - to_table = self.extract_original_text(child.tableName().qualifiedIdentifier()) - rename = f" to {to_table}" - self.rename_list.append(rename) - + for child in ctx.getChildren(): + if isinstance(child, MySqlParser.AlterListItemContext): + alter = self.extract_original_text(child) + + if child.ADD_SYMBOL() and child.fieldDefinition(): + fieldDefinition = child.fieldDefinition() + if child.identifier(): + identifier = self.extract_original_text( + child.identifier()) + place = self.extract_original_text( + child.place()) if child.place() else '' + + (fieldDefinition_buffer, dataType) = self.translateFieldDefinition( + identifier, fieldDefinition) + alter = f"add column {identifier} {fieldDefinition_buffer} {place}" + self.alter_list.append(alter) + + if child.MODIFY_SYMBOL() and child.fieldDefinition(): + fieldDefinition = child.fieldDefinition() + if child.columnInternalRef(): + identifier = self.extract_original_text( + child.columnInternalRef().identifier()) + place = self.extract_original_text( + child.place()) if child.place() else '' + (fieldDefinition_buffer, dataType) = self.translateFieldDefinition( + identifier, fieldDefinition) + alter = f"modify column {identifier} {fieldDefinition_buffer} {place}" + self.alter_list.append(alter) + + if child.CHANGE_SYMBOL() and child.fieldDefinition(): + fieldDefinition = child.fieldDefinition() + if child.columnInternalRef(): + identifier = self.extract_original_text( + child.columnInternalRef().identifier()) + place = self.extract_original_text( + child.place()) if child.place() else '' + (fieldDefinition_buffer, dataType) = self.translateFieldDefinition( + identifier, fieldDefinition) + alter = f"modify column {identifier} {fieldDefinition_buffer} {place}" + self.alter_list.append(alter) + new_identifier = self.extract_original_text( + child.identifier()) + rename_column = f"rename column {identifier} to {new_identifier}" + self.alter_list.append(rename_column) + + if child.DROP_SYMBOL() and child.COLUMN_SYMBOL(): + self.alter_list.append(alter) + + if child.RENAME_SYMBOL() and child.COLUMN_SYMBOL(): + self.alter_list.append(alter) + + if child.RENAME_SYMBOL() and child.tableName(): + to_table = self.extract_original_text( + child.tableName().qualifiedIdentifier()) + rename = f" to {to_table}" + self.rename_list.append(rename) def exitAlterTable(self, ctx): - tableName = self.extract_original_text(ctx.tableName()) - - for child in ctx.getChildren(): - if isinstance(child,MySqlParser.AlterByRenameContext) : - if child.RENAME(): - if child.uid(): - rename = self.extract_original_text(child.uid()) - if child.fullId(): - rename = self.extract_original_text(child.fullId()) - - self.buffer += f" rename table {tableName} to {rename}" - - #if len(self.alter_list): - # self.buffer = f"ALTER TABLE {tableName}" - # for alter in self.alter_list: - # self.buffer += ' ' + alter - # if self.alter_list[-1] != alter: - # self.buffer += ', ' - # self.buffer += 
';'
 
-    #for rename in self.rename_list:
-    #    self.buffer += f" rename table {tableName} {rename}"
-    #    self.buffer += ';'
 
-    def exitRenameTable(self, ctx):
-        # same syntax as CH
-        self.buffer = self.extract_original_text(ctx)
 
-    def exitTruncateTable(self, ctx):
-        # same syntax as CH
-        self.buffer = self.extract_original_text(ctx)
+            # for rename in self.rename_list:
+            #     self.buffer += f" rename table {tableName} {rename}"
+            #     self.buffer += ';'
+
+    def exitRenameTable(self, ctx):
+        # same syntax as CH
+        self.buffer = self.extract_original_text(ctx)
+
+    def exitTruncateTable(self, ctx):
+        # same syntax as CH
+        self.buffer = self.extract_original_text(ctx)
 
     def exitDropTable(self, ctx):
-        # same syntax as CH
-        self.buffer = self.extract_original_text(ctx)
+        # same syntax as CH
+        self.buffer = self.extract_original_text(ctx)
diff --git a/sink-connector/python/requirements.txt b/sink-connector/python/requirements.txt
index 8d2485d59..ad961be2e 100644
--- a/sink-connector/python/requirements.txt
+++ b/sink-connector/python/requirements.txt
@@ -1,4 +1,5 @@
 clickhouse-driver==0.2.5
 pymysql
+pyyaml
 sqlalchemy == 1.4
 antlr4-python3-runtime==4.11.1

From 65da4e8c7b622470f12df34e80d026e497b2aaae Mon Sep 17 00:00:00 2001
From: Arnaud Adant <aadant@fpif-vcsl1.w2k.jumptrading.com>
Date: Mon, 11 Dec 2023 01:38:36 -0600
Subject: [PATCH 2/2] Fix help

---
 sink-connector/python/db_compare/clickhouse_table_checksum.py | 2 +-
 sink-connector/python/db_compare/mysql_table_checksum.py      | 4 ++--
 sink-connector/python/db_load/clickhouse_loader.py            | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/sink-connector/python/db_compare/clickhouse_table_checksum.py b/sink-connector/python/db_compare/clickhouse_table_checksum.py
index f640a9b18..b5a6b0b82 100644
--- a/sink-connector/python/db_compare/clickhouse_table_checksum.py
+++ b/sink-connector/python/db_compare/clickhouse_table_checksum.py
@@ -308,7 +308,7 @@ def main():
     parser.add_argument('--clickhouse_password',
                         help='CH password (discouraged, use a configuration file instead)', required=False, default=None)
     parser.add_argument('--clickhouse_config_file',
-                        help='CH config file either xml or yaml', required=False, default='./clickhouse-client.xml')
+                        help='CH config file either xml or yaml, default is ./clickhouse-client.xml', required=False, default='./clickhouse-client.xml')
     parser.add_argument('--clickhouse_database',
                         help='ClickHouse database', required=True)
     parser.add_argument('--clickhouse_port',
diff --git a/sink-connector/python/db_compare/mysql_table_checksum.py b/sink-connector/python/db_compare/mysql_table_checksum.py
index 16a860fda..2bee3e50a 100644
--- a/sink-connector/python/db_compare/mysql_table_checksum.py
+++ b/sink-connector/python/db_compare/mysql_table_checksum.py
@@ -248,7 +248,7 @@ def main():
     parser.add_argument('--mysql_password',
                         help='MySQL password (discouraged, use a config file instead)', required=False)
     
parser.add_argument('--defaults_file',
-                        help='MySQL config file either xml or yaml', required=False, default='~/.my.cnf')
+                        help='MySQL config file default is ~/.my.cnf', required=False, default='~/.my.cnf')
     parser.add_argument('--mysql_database',
                         help='MySQL database', required=True)
     parser.add_argument('--mysql_port', help='MySQL port',
@@ -297,7 +297,7 @@ def main():
 
     mysql_user = args.mysql_user
     mysql_password = args.mysql_password
-    
+
     # check parameters
     if args.mysql_password:
         logging.warning("Using a password on the command line is not secure, please use a config file instead")
diff --git a/sink-connector/python/db_load/clickhouse_loader.py b/sink-connector/python/db_load/clickhouse_loader.py
index cf4236eb5..c12aa9806 100644
--- a/sink-connector/python/db_load/clickhouse_loader.py
+++ b/sink-connector/python/db_load/clickhouse_loader.py
@@ -534,7 +534,7 @@ def main():
     parser.add_argument('--clickhouse_password',
                         help='CH password (discouraged, use a configuration file instead)', required=False, default=None)
    parser.add_argument('--clickhouse_config_file',
-                        help='CH config file either xml or yaml', required=False, default='./clickhouse-client.xml')
+                        help='CH config file either xml or yaml, default is ./clickhouse-client.xml', required=False, default='./clickhouse-client.xml')
     parser.add_argument('--clickhouse_port', type=int,
                         default=9000, help='ClickHouse port', required=False)
     parser.add_argument('--clickhouse_database',
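
---
Note on the credential files: the patch does not include sample config files, so
the following are minimal sketches inferred from the parsing code in
db/clickhouse.py and db/mysql.py (the XML reader takes <user>/<password> children
of the document root, the YAML reader a top-level `config` mapping, and the MySQL
reader a [client] section). All names and values below are placeholders.

clickhouse-client.xml:

    <config>
        <user>default</user>
        <password>secret</password>
    </config>

clickhouse-client.yml:

    config:
        user: default
        password: secret

~/.my.cnf:

    [client]
    user = root
    password = secret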
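With one of those files in place, the tools can be invoked without a password on
the command line. Illustrative invocations only — hosts, databases and the table
regex are placeholders, and the scripts are assumed to be run from
sink-connector/python so the db package resolves:

    python db_compare/clickhouse_table_checksum.py --clickhouse_host localhost \
        --clickhouse_database test --tables_regex '.*' \
        --clickhouse_config_file ./clickhouse-client.xml

    python db_compare/mysql_table_checksum.py --mysql_host localhost \
        --mysql_database test --tables_regex '.*' \
        --defaults_file ~/.my.cnf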