From baf64133235f5c5dd90826230fe0d61449fa50f4 Mon Sep 17 00:00:00 2001
From: Artem Alekseev
Date: Thu, 22 Aug 2024 13:56:25 +0300
Subject: [PATCH] Add scan/insert while alter test (#8094)

---
 ydb/tools/olap_workload/__main__.py | 177 +++++++++++++++++++++++-----
 1 file changed, 146 insertions(+), 31 deletions(-)

diff --git a/ydb/tools/olap_workload/__main__.py b/ydb/tools/olap_workload/__main__.py
index f49232da9f43..02ee03f4f231 100644
--- a/ydb/tools/olap_workload/__main__.py
+++ b/ydb/tools/olap_workload/__main__.py
@@ -3,6 +3,8 @@
 import ydb
 import time
 import os
+import random
+import string
 
 ydb.interceptor.monkey_patch_event_handler()
 
@@ -15,12 +17,31 @@ def table_name_with_timestamp():
     return os.path.join("column_table_" + str(timestamp()))
 
 
+def random_string(length):
+    letters = string.ascii_lowercase
+    return bytes(''.join(random.choice(letters) for i in range(length)), encoding='utf8')
+
+
+def random_type():
+    return random.choice([ydb.PrimitiveType.Int64, ydb.PrimitiveType.String])
+
+
+def random_value(type):
+    if isinstance(type, ydb.OptionalType):
+        return random_value(type.item)
+    if type == ydb.PrimitiveType.Int64:
+        return random.randint(0, 1 << 31)
+    if type == ydb.PrimitiveType.String:
+        return random_string(random.randint(1, 32))
+
+
 class Workload(object):
-    def __init__(self, endpoint, database, duration):
+    def __init__(self, endpoint, database, duration, batch_size):
         self.database = database
         self.driver = ydb.Driver(ydb.DriverConfig(endpoint, database))
         self.pool = ydb.SessionPool(self.driver, size=200)
         self.duration = duration
+        self.batch_size = batch_size
 
     def __enter__(self):
         return self
@@ -29,47 +50,140 @@ def __exit__(self, exc_type, exc_val, exc_tb):
         self.pool.stop()
         self.driver.stop()
 
+    def run_query_ignore_errors(self, callee):
+        try:
+            self.pool.retry_operation_sync(callee)
+        except Exception as e:
+            print(type(e), e)
+
     def create_table(self, table_name):
-        with self.pool.checkout() as s:
-            try:
-                s.execute_scheme(
-                    """
-                    CREATE TABLE %s (
-                        id Int64 NOT NULL,
-                        i64Val Int64,
-                        PRIMARY KEY(id)
-                    )
-                    PARTITION BY HASH(id)
-                    WITH (
-                        STORE = COLUMN
-                    )
-                """
-                    % table_name
+        print(f"Create table {table_name}")
+
+        def callee(session):
+            session.execute_scheme(
+                f"""
+                CREATE TABLE {table_name} (
+                    id Int64 NOT NULL,
+                    i64Val Int64,
+                    PRIMARY KEY(id)
+                )
+                PARTITION BY HASH(id)
+                WITH (
+                    STORE = COLUMN
                 )
+            """
+            )
 
-                print("Table %s created" % table_name)
-            except ydb.SchemeError as e:
-                print(e)
+        self.run_query_ignore_errors(callee)
 
     def drop_table(self, table_name):
-        with self.pool.checkout() as s:
-            try:
-                s.drop_table(self.database + "/" + table_name)
+        print(f"Drop table {table_name}")
+
+        def callee(session):
+            session.drop_table(self.database + "/" + table_name)
+
+        self.run_query_ignore_errors(callee)
+
+    def add_column(self, table_name, col_name, col_type):
+        print(f"Add column {table_name}.{col_name} {str(col_type)}")
+
+        def callee(session):
+            session.execute_scheme(f"ALTER TABLE {table_name} ADD COLUMN {col_name} {str(col_type)}")
+
+        self.run_query_ignore_errors(callee)
+
+    def drop_column(self, table_name, col_name):
+        print(f"Drop column {table_name}.{col_name}")
+
+        def callee(session):
+            session.execute_scheme(f"ALTER TABLE {table_name} DROP COLUMN {col_name}")
+
+        self.run_query_ignore_errors(callee)
+
+    def generate_batch(self, schema):
+        data = []
+
+        for i in range(self.batch_size):
+            data.append({c.name: random_value(c.type) for c in schema})
+
+        return data
+
+    def add_batch(self, table_name, schema):
+        print(f"Add batch {table_name}")
{table_name}") - print("Table %s dropped" % table_name) - except ydb.SchemeError as e: - print(e) + column_types = ydb.BulkUpsertColumns() + + for c in schema: + column_types.add_column(c.name, c.type) + + batch = self.generate_batch(schema) + + self.driver.table_client.bulk_upsert(self.database + "/" + table_name, batch, column_types) + + def list_tables(self): + db = self.driver.scheme_client.list_directory(self.database) + return [t.name for t in db.children if t.type == ydb.SchemeEntryType.COLUMN_TABLE] + + def list_columns(self, table_name): + path = self.database + "/" + table_name + + def callee(session): + return session.describe_table(path).columns + + return self.pool.retry_operation_sync(callee) + + def rows_count(self, table_name): + return self.driver.table_client.scan_query(f"SELECT count(*) FROM {table_name}").next().result_set.rows[0][0] + + def select_n(self, table_name, limit): + print(f"Select {limit} from {table_name}") + self.driver.table_client.scan_query(f"SELECT * FROM {table_name} limit {limit}").next() + + def drop_all_tables(self): + for t in self.list_tables(): + if t.startswith("column_table_"): + self.drop_table(t) + + def drop_all_columns(self, table_name): + for c in self.list_columns(table_name): + if c.name != "id": + self.drop_column(table_name, c.name) + + def queries_while_alter(self): + table_name = "queries_while_alter" + + schema = self.list_columns(table_name) + + self.select_n(table_name, 1000) + self.add_batch(table_name, schema) + self.select_n(table_name, 100) + self.add_batch(table_name, schema) + self.select_n(table_name, 300) + + if len(schema) > 50: + self.drop_all_columns(table_name) + + if self.rows_count(table_name) > 100000: + self.drop_table(table_name) + + col = "col_" + str(timestamp()) + self.add_column(table_name, col, random_type()) def run(self): started_at = time.time() while time.time() - started_at < self.duration: - table_name = table_name_with_timestamp() - self.create_table(table_name) + try: + self.create_table("queries_while_alter") - time.sleep(5) + self.drop_all_tables() - self.drop_table(table_name) + self.queries_while_alter() + + table_name = table_name_with_timestamp() + self.create_table(table_name) + except Exception as e: + print(type(e), e) if __name__ == '__main__': @@ -78,7 +192,8 @@ def run(self): ) parser.add_argument('--endpoint', default='localhost:2135', help="An endpoint to be used") parser.add_argument('--database', default=None, required=True, help='A database to connect') - parser.add_argument('--duration', default=10**9, type=lambda x: int(x), help='A duration of workload in seconds.') + parser.add_argument('--duration', default=120, type=lambda x: int(x), help='A duration of workload in seconds.') + parser.add_argument('--batch_size', default=1000, help='Batch size for bulk insert') args = parser.parse_args() - with Workload(args.endpoint, args.database, args.duration) as workload: + with Workload(args.endpoint, args.database, args.duration, args.batch_size) as workload: workload.run()