From e9e36dc9d7efaabab419ee1be5e7eed9ce88281e Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Fri, 9 Aug 2024 15:44:10 +0000 Subject: [PATCH 1/5] test altering tiering in scenario tests --- .../olap/scenario/helpers/table_helper.py | 58 +++++ .../olap/scenario/helpers/tiering_helper.py | 233 ++++++++++++++++++ ydb/tests/olap/scenario/test_alter_tiering.py | 117 +++++++++ ydb/tests/olap/scenario/ya.make | 1 + 4 files changed, 409 insertions(+) create mode 100644 ydb/tests/olap/scenario/helpers/tiering_helper.py create mode 100644 ydb/tests/olap/scenario/test_alter_tiering.py diff --git a/ydb/tests/olap/scenario/helpers/table_helper.py b/ydb/tests/olap/scenario/helpers/table_helper.py index ef2088ecd692..8bae922e3726 100644 --- a/ydb/tests/olap/scenario/helpers/table_helper.py +++ b/ydb/tests/olap/scenario/helpers/table_helper.py @@ -218,6 +218,38 @@ def title(self) -> str: return f'drop column `{self._column}`' +class SetSetting(AlterTableAction): + """Set a setting value for a table-like object. + + Table-like objects are Tables and TableStore. + See {AlterTableLikeObject}. + + Example: + sth = ScenarioTestHelper(ctx) + sth.execute_scheme_query( + AlterTable('testTable').action(SetSetting('TIERING', 'tiering1)) + ) + """ + + def __init__(self, setting: str, value_literal: str) -> None: + """Constructor. + + Args: + column: Name of the column to be deleted.""" + + super().__init__() + self._setting = setting + self._value = value_literal + + @override + def to_yql(self) -> str: + return f'SET {self._setting} {self._value}' + + @override + def title(self) -> str: + return f'set {self._setting} = {self._value}' + + class AlterTableLikeObject(ScenarioTestHelper.IYqlble): """The base class for all requests to change table-like objects. @@ -277,6 +309,32 @@ def drop_column(self, column: str) -> AlterTableLikeObject: return self(DropColumn(column)) + def set_tiering(self, tiering_rule: str) -> AlterTableLikeObject: + """Set a tiering policy. + + The method is similar to calling {AlterTableLikeObject.action} with a {SetSetting} instance. + + Args: + tiering_rule: Name of a TIERING_RULE object. + + Returns: + self.""" + + return self(SetSetting('TIERING', f'"{tiering_rule}"')) + + def set_ttl(self, interval: str, column: str) -> AlterTableLikeObject: + """Set row TTL. + + The method is similar to calling {AlterTableLikeObject.action} with a {SetSetting} instance. + + Args: + tiering_rule: Name of a TIERING_RULE object. + + Returns: + self.""" + + return self(SetSetting('TTL', f'Interval("{interval}") ON `{column}`')) + @override def params(self) -> Dict[str, str]: return {self._type(): self._name, 'actions': ', '.join([a.title() for a in self._actions])} diff --git a/ydb/tests/olap/scenario/helpers/tiering_helper.py b/ydb/tests/olap/scenario/helpers/tiering_helper.py new file mode 100644 index 000000000000..da5c6743bf5c --- /dev/null +++ b/ydb/tests/olap/scenario/helpers/tiering_helper.py @@ -0,0 +1,233 @@ +# TODO: pre-review refactor +from __future__ import annotations +from ydb.tests.olap.scenario.helpers.scenario_tests_helper import ( + ScenarioTestHelper, + TestContext, +) +from abc import abstractmethod, ABC + +from typing import override, Dict, Optional +from dataclasses import dataclass +import json + + +@dataclass +class ObjectStorageParams: + endpoint: str + bucket: str + access_key: str + secret_key: str + scheme: str = 'HTTP' + verify_ssl: bool = False + + def to_proto_str(self) -> str: + return ( + f'Scheme: {self.scheme}\n' + f'VerifySSL: {str(self.verify_ssl).lower()}\n' + f'Endpoint: "{self.endpoint}"\n' + f'Bucket: "{self.bucket}"\n' + f'AccessKey: "{self.access_key}"\n' + f'SecretKey: "{self.secret_key}"\n' + ) + + +@dataclass +class TieringRule: + tier_name: str + duration_for_evict: str + + def to_dict(self): + return { + 'tierName': self.tier_name, + 'durationForEvict': self.duration_for_evict, + } + + +@dataclass +class TieringPolicy: + rules: list[TieringRule] + + def __init__(self): + self.rules = [] + + def with_rule(self, rule: TieringRule): + self.rules.append(rule) + return self + + def to_json(self) -> str: + return json.dumps({"rules": list(map(lambda x: x.to_dict(), self.rules))}) + + +@dataclass +class TierConfig: + name: str + s3_params: ObjectStorageParams + + def to_proto_str(self) -> str: + return ( + f'Name: "{self.name}"\n' + f'ObjectStorage: {{\n{self.s3_params.to_proto_str()}\n}}' + ) + + +class UpdateTieringRuleBase(ScenarioTestHelper.IYqlble): + """Create or alter a tiering rule. + + See {ScenarioTestHelper.IYqlble}. + """ + + def __init__(self, name: str, default_column: str, config: TieringPolicy, verb: str, preposition: str) -> None: + """Constructor. + + Args: + name: Name (relative path) of the altered object. + default_column: Default column. + config: Tiering rules to apply. + verb: CREATE or ALTER. + preposition: WITH or SET.""" + + super().__init__(name) + self._default_column: str = default_column + self._config: TieringPolicy = config + self._verb: str = verb + self._preposition = preposition + + @override + def params(self) -> Dict[str, str]: + return {'tiering_rule': self._name, 'config': self._config.to_json()} + + @override + def title(self): + return f'Alter tiering rule' + + @override + def to_yql(self, ctx: TestContext) -> str: + return f'{self._verb} OBJECT `{self._name}` (TYPE TIERING_RULE) {self._preposition} (defaultColumn = {self._default_column}, description = `{self._config.to_json()}`)' + + @override + def _type(self) -> str: + return 'tiering_rule' + + +class CreateTieringRule(UpdateTieringRuleBase): + """Create a tiering rule. + + See {ScenarioTestHelper.IYqlble}. + """ + + def __init__(self, name: str, default_column: str, config: TieringPolicy) -> None: + """Constructor. + + Args: + name: Name (relative path) of the altered object. + default_column: Default column. + config: Tiering configuration.""" + + super().__init__(name, default_column, config, 'CREATE', 'WITH') + + +class CreateTieringRule(UpdateTieringRuleBase): + """Create a tiering rule. + + See {ScenarioTestHelper.IYqlble}. + """ + + def __init__(self, name: str, default_column: str, config: TieringPolicy) -> None: + """Constructor. + + Args: + name: Name (relative path) of the altered object. + default_column: Default column. + config: Tiering configuration.""" + + super().__init__(name, default_column, config, 'ALTER', 'SET') + + +class CreateTier(ScenarioTestHelper.IYqlble): + """Alter a tiering rule. + + See {ScenarioTestHelper.IYqlble}. + """ + + def __init__(self, name: str, config: TierConfig) -> None: + """Constructor. + + Args: + name: Name (relative path) of the altered object.""" + + super().__init__(name) + self._config: TierConfig = config + + @override + def params(self) -> Dict[str, str]: + return {self._type(): self._name, 'config': self._config.to_proto_str()} + + @override + def title(self): + return f'Create tier' + + @override + def to_yql(self, ctx: TestContext) -> str: + return f'CREATE OBJECT `{self._name}` (TYPE TIER) WITH (tierConfig = `{self._config.to_proto_str()}`)' + + @override + def _type(self) -> str: + 'tier' + + +class DropObjectBase(ScenarioTestHelper.IYqlble): + """Drop a tier. + + See {ScenarioTestHelper.IYqlble}. + """ + + def __init__(self, name: str, object_type: str) -> None: + """Constructor. + + Args: + name: Name (relative path) of the altered object.""" + + super().__init__(name) + self._object_type = object_type + + @override + def params(self) -> Dict[str, str]: + return {'object_type': self._object_type} + + @override + def title(self): + return f'Drop {self._object_type.lower()}' + + @override + def to_yql(self, ctx: TestContext) -> str: + return f'DROP OBJECT `{self._name}` (TYPE {self._object_type})' + + +class DropTier(DropObjectBase): + """Drop a tier. + + See {ScenarioTestHelper.IYqlble}. + """ + + def __init__(self, name: str) -> None: + """Constructor. + + Args: + name: Name (relative path) of the altered object.""" + + super().__init__(name, 'TIER') + + +class DropTieringRule(DropObjectBase): + """Drop a tier. + + See {ScenarioTestHelper.IYqlble}. + """ + + def __init__(self, name: str) -> None: + """Constructor. + + Args: + name: Name (relative path) of the altered object.""" + + super().__init__(name, 'TIERING_RULE') \ No newline at end of file diff --git a/ydb/tests/olap/scenario/test_alter_tiering.py b/ydb/tests/olap/scenario/test_alter_tiering.py new file mode 100644 index 000000000000..7b60a63ecb02 --- /dev/null +++ b/ydb/tests/olap/scenario/test_alter_tiering.py @@ -0,0 +1,117 @@ +from conftest import BaseTestSet +from ydb.tests.olap.scenario.helpers import ( + ScenarioTestHelper, + TestContext, + CreateTable, + CreateTableStore, + DropTable, +) +import datetime +import random +from time import sleep +from ydb.issues import GenericError +from ydb import PrimitiveType +from ydb import StatusCode +import threading +import allure +from helpers.tiering_helper import ObjectStorageParams, CreateTier, CreateTieringRule, TierConfig, TieringPolicy, TieringRule, DropTier, DropTieringRule +from helpers.table_helper import AlterTable +import helpers.data_generators as dg +from sys import stderr +from typing import Iterable + + +class TestAlterTiering(BaseTestSet): + schema1 = ( + ScenarioTestHelper.Schema() + .with_column(name='timestamp', type=PrimitiveType.Timestamp, not_null=True) + .with_column(name='level', type=PrimitiveType.Uint64) + .with_key_columns('timestamp') + ) + + class TestThread(threading.Thread): + def run(self) -> None: + self.exc = None + try: + self.ret = self._target(*self._args, **self._kwargs) + except BaseException as e: + self.exc = e + + def join(self, timeout=None): + super().join(timeout) + if self.exc: + raise self.exc + return self.ret + + def _drop_tables(self, prefix: str, count: int, ctx: TestContext): + sth = ScenarioTestHelper(ctx) + for i in range(count): + sth.execute_scheme_query(DropTable(f'store/{prefix}_{i}')) + + def _scan(self, ctx: TestContext, table: str, duration: datetime.timedelta): + deadline = datetime.datetime.now() + duration + sth = ScenarioTestHelper(ctx) + while datetime.datetime.now() < deadline: + sth.execute_scan_query(f'SELECT MIN(level) FROM `{sth.get_full_path(table)}`') + + def _upsert(self, ctx: TestContext, table: str, duration: datetime.timedelta): + deadline = datetime.datetime.now() + duration + sth = ScenarioTestHelper(ctx) + while datetime.datetime.now() < deadline: + sth.bulk_upsert(table, dg.DataGeneratorPerColumn(self.schema1, 100, dg.ColumnValueGeneratorRandom(null_probability=0.1)), comment="random data") + + def _change_tiering_rule(self, ctx: TestContext, table: str, tiering_rules: Iterable[str], duration: datetime.timedelta): + deadline = datetime.datetime.now() + duration + sth = ScenarioTestHelper(ctx) + while datetime.datetime.now() < deadline: + for tiering_rule in tiering_rules: + sth.execute_scheme_query(AlterTable(table).set_tiering(tiering_rule), expected_status={StatusCode.SUCCESS, StatusCode.OVERLOADED}) + + def scenario_alter_tiering_rule_while_writing(self, ctx: TestContext): + test_duration = datetime.timedelta(seconds=10) + n_altering_threads = 10 + + s3_config = ObjectStorageParams( + scheme= 'HTTP', + verify_ssl= False, + endpoint= 'storage.yandexcloud.net', + bucket= 'ydb-test-test', + access_key= 'YCAJEETLVEXjyGNJqQL5tz9h5', + secret_key= 'YCPKcqXpGpodOtpTq9yD6CVsEryhLC6hJatBL_hv', + ) + + sth = ScenarioTestHelper(ctx) + + tiers: list[str] = [] + tiering_rules: list[str] = [] + for i in range(10): + tiers.append(f'TestAlterTiering:tier{i}') + tiering_rules.append(f'TestAlterTiering:tiering_rule{i}') + + sth.execute_scheme_query(CreateTier(tiers[-1], TierConfig('tier1', s3_config)), expected_status={StatusCode.SUCCESS, StatusCode.GENERIC_ERROR}) + sth.execute_scheme_query(CreateTieringRule(tiering_rules[-1], 'timestamp', TieringPolicy().with_rule(TieringRule(tiers[-1], '30000d'))), expected_status={StatusCode.SUCCESS, StatusCode.GENERIC_ERROR}) + + try: + sth.execute_scheme_query(CreateTableStore('store').with_schema(self.schema1)) + sth.execute_scheme_query(CreateTable(f'store/table').with_schema(self.schema1)) + + threads = [] + + threads.append(self.TestThread(target=self._scan, args=[ctx, 'store/table', test_duration])) + threads.append(self.TestThread(target=self._upsert, args=[ctx, 'store/table', test_duration])) + + for _ in range(n_altering_threads): + threads.append( + self.TestThread(target=self._change_tiering_rule, args=[ctx, 'store/table', random.sample(tiering_rules, len(tiering_rules)), test_duration]) + ) + for t in threads: + t.start() + for t in threads: + t.join() + except Exception: + raise + finally: + for t in tiers: + sth.execute_scheme_query(DropTier(t)) + for t in tiering_rules: + sth.execute_scheme_query(DropTieringRule(t)) diff --git a/ydb/tests/olap/scenario/ya.make b/ydb/tests/olap/scenario/ya.make index 49eee4306f26..58a33a89fdba 100644 --- a/ydb/tests/olap/scenario/ya.make +++ b/ydb/tests/olap/scenario/ya.make @@ -11,6 +11,7 @@ PY3TEST() TEST_SRCS( test_simple.py test_scheme_load.py + test_alter_tiering.py ) PEERDIR( From a4d676a4b3cb658fb0a3da74bcd891abe3b530f1 Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Sun, 11 Aug 2024 11:25:01 +0000 Subject: [PATCH 2/5] eidt test --- ydb/tests/olap/scenario/test_alter_tiering.py | 73 ++++++++++--------- 1 file changed, 37 insertions(+), 36 deletions(-) diff --git a/ydb/tests/olap/scenario/test_alter_tiering.py b/ydb/tests/olap/scenario/test_alter_tiering.py index 7b60a63ecb02..a9191ebc04a4 100644 --- a/ydb/tests/olap/scenario/test_alter_tiering.py +++ b/ydb/tests/olap/scenario/test_alter_tiering.py @@ -6,18 +6,23 @@ CreateTableStore, DropTable, ) -import datetime -import random -from time import sleep -from ydb.issues import GenericError +from helpers.tiering_helper import ( + ObjectStorageParams, + CreateTier, + CreateTieringRule, + TierConfig, + TieringPolicy, + TieringRule, + DropTier, + DropTieringRule +) +import helpers.data_generators as dg +from helpers.table_helper import AlterTable from ydb import PrimitiveType from ydb import StatusCode +import datetime +import random import threading -import allure -from helpers.tiering_helper import ObjectStorageParams, CreateTier, CreateTieringRule, TierConfig, TieringPolicy, TieringRule, DropTier, DropTieringRule -from helpers.table_helper import AlterTable -import helpers.data_generators as dg -from sys import stderr from typing import Iterable @@ -65,11 +70,10 @@ def _change_tiering_rule(self, ctx: TestContext, table: str, tiering_rules: Iter sth = ScenarioTestHelper(ctx) while datetime.datetime.now() < deadline: for tiering_rule in tiering_rules: - sth.execute_scheme_query(AlterTable(table).set_tiering(tiering_rule), expected_status={StatusCode.SUCCESS, StatusCode.OVERLOADED}) + sth.execute_scheme_query(AlterTable(table).set_tiering(tiering_rule)) def scenario_alter_tiering_rule_while_writing(self, ctx: TestContext): - test_duration = datetime.timedelta(seconds=10) - n_altering_threads = 10 + test_duration = datetime.timedelta(seconds=60) s3_config = ObjectStorageParams( scheme= 'HTTP', @@ -91,27 +95,24 @@ def scenario_alter_tiering_rule_while_writing(self, ctx: TestContext): sth.execute_scheme_query(CreateTier(tiers[-1], TierConfig('tier1', s3_config)), expected_status={StatusCode.SUCCESS, StatusCode.GENERIC_ERROR}) sth.execute_scheme_query(CreateTieringRule(tiering_rules[-1], 'timestamp', TieringPolicy().with_rule(TieringRule(tiers[-1], '30000d'))), expected_status={StatusCode.SUCCESS, StatusCode.GENERIC_ERROR}) - try: - sth.execute_scheme_query(CreateTableStore('store').with_schema(self.schema1)) - sth.execute_scheme_query(CreateTable(f'store/table').with_schema(self.schema1)) - - threads = [] - - threads.append(self.TestThread(target=self._scan, args=[ctx, 'store/table', test_duration])) - threads.append(self.TestThread(target=self._upsert, args=[ctx, 'store/table', test_duration])) - - for _ in range(n_altering_threads): - threads.append( - self.TestThread(target=self._change_tiering_rule, args=[ctx, 'store/table', random.sample(tiering_rules, len(tiering_rules)), test_duration]) - ) - for t in threads: - t.start() - for t in threads: - t.join() - except Exception: - raise - finally: - for t in tiers: - sth.execute_scheme_query(DropTier(t)) - for t in tiering_rules: - sth.execute_scheme_query(DropTieringRule(t)) + sth.execute_scheme_query(CreateTableStore('store').with_schema(self.schema1)) + sth.execute_scheme_query(CreateTable(f'store/table').with_schema(self.schema1)) + + threads = [] + + threads.append(self.TestThread(target=self._scan, args=[ctx, 'store/table', test_duration])) + threads.append(self.TestThread(target=self._upsert, args=[ctx, 'store/table', test_duration])) + threads.append(self.TestThread( + target=self._change_tiering_rule, + args=[ctx, 'store/table', random.sample(tiering_rules, len(tiering_rules)), test_duration] + )) + + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + for tier in tiers: + sth.execute_scheme_query(DropTier(tier)) + for tiering in tiering_rules: + sth.execute_scheme_query(DropTieringRule(tiering)) From 4068c8596b86712145de3609bf8a03deb681519a Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Sun, 11 Aug 2024 12:49:40 +0000 Subject: [PATCH 3/5] improve the test --- .../olap/scenario/helpers/table_helper.py | 41 +++++++++++ .../olap/scenario/helpers/tiering_helper.py | 73 ++++++++----------- ydb/tests/olap/scenario/test_alter_tiering.py | 36 +++++++-- 3 files changed, 103 insertions(+), 47 deletions(-) diff --git a/ydb/tests/olap/scenario/helpers/table_helper.py b/ydb/tests/olap/scenario/helpers/table_helper.py index 8bae922e3726..7e029eb8ed45 100644 --- a/ydb/tests/olap/scenario/helpers/table_helper.py +++ b/ydb/tests/olap/scenario/helpers/table_helper.py @@ -250,6 +250,37 @@ def title(self) -> str: return f'set {self._setting} = {self._value}' +class ResetSetting(AlterTableAction): + """Reset value of a setting for a table-like object. + + Table-like objects are Tables and TableStore. + See {AlterTableLikeObject}. + + Example: + sth = ScenarioTestHelper(ctx) + sth.execute_scheme_query( + AlterTable('testTable').action(ResetSetting('TIERING')) + ) + """ + + def __init__(self, setting: str) -> None: + """Constructor. + + Args: + setting: Name of altered setting.""" + + super().__init__() + self._setting = setting + + @override + def to_yql(self) -> str: + return f'RESET ({self._setting})' + + @override + def title(self) -> str: + return f'reset {self._setting}' + + class AlterTableLikeObject(ScenarioTestHelper.IYqlble): """The base class for all requests to change table-like objects. @@ -322,6 +353,16 @@ def set_tiering(self, tiering_rule: str) -> AlterTableLikeObject: return self(SetSetting('TIERING', f'"{tiering_rule}"')) + def reset_tiering(self) -> AlterTableLikeObject: + """Remove a tiering policy. + + The method is similar to calling {AlterTableLikeObject.action} with a {SetSetting} instance. + + Returns: + self.""" + + return self(ResetSetting('TIERING')) + def set_ttl(self, interval: str, column: str) -> AlterTableLikeObject: """Set row TTL. diff --git a/ydb/tests/olap/scenario/helpers/tiering_helper.py b/ydb/tests/olap/scenario/helpers/tiering_helper.py index da5c6743bf5c..f739d68b77cf 100644 --- a/ydb/tests/olap/scenario/helpers/tiering_helper.py +++ b/ydb/tests/olap/scenario/helpers/tiering_helper.py @@ -70,27 +70,23 @@ def to_proto_str(self) -> str: ) -class UpdateTieringRuleBase(ScenarioTestHelper.IYqlble): - """Create or alter a tiering rule. +class AlterTieringRule(ScenarioTestHelper.IYqlble): + """Alter a tiering rule. See {ScenarioTestHelper.IYqlble}. """ - def __init__(self, name: str, default_column: str, config: TieringPolicy, verb: str, preposition: str) -> None: + def __init__(self, name: str, default_column: str, config: TieringPolicy) -> None: """Constructor. Args: name: Name (relative path) of the altered object. default_column: Default column. - config: Tiering rules to apply. - verb: CREATE or ALTER. - preposition: WITH or SET.""" + config: Tiering rules to apply.""" super().__init__(name) self._default_column: str = default_column self._config: TieringPolicy = config - self._verb: str = verb - self._preposition = preposition @override def params(self) -> Dict[str, str]: @@ -98,53 +94,33 @@ def params(self) -> Dict[str, str]: @override def title(self): - return f'Alter tiering rule' + return 'Alter tiering rule' @override def to_yql(self, ctx: TestContext) -> str: - return f'{self._verb} OBJECT `{self._name}` (TYPE TIERING_RULE) {self._preposition} (defaultColumn = {self._default_column}, description = `{self._config.to_json()}`)' + return f'ALTER OBJECT `{self._name}` (TYPE TIERING_RULE) SET (defaultColumn = {self._default_column}, description = `{self._config.to_json()}`)' @override def _type(self) -> str: return 'tiering_rule' -class CreateTieringRule(UpdateTieringRuleBase): +class CreateTieringRule(AlterTieringRule): """Create a tiering rule. See {ScenarioTestHelper.IYqlble}. """ + @override + def title(self): + return 'Create tiering rule' - def __init__(self, name: str, default_column: str, config: TieringPolicy) -> None: - """Constructor. - - Args: - name: Name (relative path) of the altered object. - default_column: Default column. - config: Tiering configuration.""" - - super().__init__(name, default_column, config, 'CREATE', 'WITH') - - -class CreateTieringRule(UpdateTieringRuleBase): - """Create a tiering rule. - - See {ScenarioTestHelper.IYqlble}. - """ - - def __init__(self, name: str, default_column: str, config: TieringPolicy) -> None: - """Constructor. - - Args: - name: Name (relative path) of the altered object. - default_column: Default column. - config: Tiering configuration.""" - - super().__init__(name, default_column, config, 'ALTER', 'SET') + @override + def to_yql(self, ctx: TestContext) -> str: + return f'CREATE OBJECT `{self._name}` (TYPE TIERING_RULE) WITH (defaultColumn = {self._default_column}, description = `{self._config.to_json()}`)' -class CreateTier(ScenarioTestHelper.IYqlble): - """Alter a tiering rule. +class AlterTier(ScenarioTestHelper.IYqlble): + """Alter a tier. See {ScenarioTestHelper.IYqlble}. """ @@ -164,17 +140,32 @@ def params(self) -> Dict[str, str]: @override def title(self): - return f'Create tier' + return 'Alter tier' @override def to_yql(self, ctx: TestContext) -> str: - return f'CREATE OBJECT `{self._name}` (TYPE TIER) WITH (tierConfig = `{self._config.to_proto_str()}`)' + return f'ALTER OBJECT `{self._name}` (TYPE TIER) SET (tierConfig = `{self._config.to_proto_str()}`)' @override def _type(self) -> str: 'tier' +class CreateTier(AlterTier): + """Create a tier. + + See {ScenarioTestHelper.IYqlble}. + """ + + @override + def title(self): + return 'Create tier' + + @override + def to_yql(self, ctx: TestContext) -> str: + return f'CREATE OBJECT `{self._name}` (TYPE TIER) WITH (tierConfig = `{self._config.to_proto_str()}`)' + + class DropObjectBase(ScenarioTestHelper.IYqlble): """Drop a tier. diff --git a/ydb/tests/olap/scenario/test_alter_tiering.py b/ydb/tests/olap/scenario/test_alter_tiering.py index a9191ebc04a4..189405f237a1 100644 --- a/ydb/tests/olap/scenario/test_alter_tiering.py +++ b/ydb/tests/olap/scenario/test_alter_tiering.py @@ -8,7 +8,9 @@ ) from helpers.tiering_helper import ( ObjectStorageParams, + AlterTier, CreateTier, + AlterTieringRule, CreateTieringRule, TierConfig, TieringPolicy, @@ -63,7 +65,11 @@ def _upsert(self, ctx: TestContext, table: str, duration: datetime.timedelta): deadline = datetime.datetime.now() + duration sth = ScenarioTestHelper(ctx) while datetime.datetime.now() < deadline: - sth.bulk_upsert(table, dg.DataGeneratorPerColumn(self.schema1, 100, dg.ColumnValueGeneratorRandom(null_probability=0.1)), comment="random data") + sth.bulk_upsert( + table, + dg.DataGeneratorPerColumn(self.schema1, 100, dg.ColumnValueGeneratorRandom(null_probability=0.1)), + comment="random data", + ) def _change_tiering_rule(self, ctx: TestContext, table: str, tiering_rules: Iterable[str], duration: datetime.timedelta): deadline = datetime.datetime.now() + duration @@ -71,9 +77,10 @@ def _change_tiering_rule(self, ctx: TestContext, table: str, tiering_rules: Iter while datetime.datetime.now() < deadline: for tiering_rule in tiering_rules: sth.execute_scheme_query(AlterTable(table).set_tiering(tiering_rule)) + sth.execute_scheme_query(AlterTable(table).reset_tiering()) def scenario_alter_tiering_rule_while_writing(self, ctx: TestContext): - test_duration = datetime.timedelta(seconds=60) + test_duration = datetime.timedelta(seconds=10) s3_config = ObjectStorageParams( scheme= 'HTTP', @@ -92,8 +99,25 @@ def scenario_alter_tiering_rule_while_writing(self, ctx: TestContext): tiers.append(f'TestAlterTiering:tier{i}') tiering_rules.append(f'TestAlterTiering:tiering_rule{i}') - sth.execute_scheme_query(CreateTier(tiers[-1], TierConfig('tier1', s3_config)), expected_status={StatusCode.SUCCESS, StatusCode.GENERIC_ERROR}) - sth.execute_scheme_query(CreateTieringRule(tiering_rules[-1], 'timestamp', TieringPolicy().with_rule(TieringRule(tiers[-1], '30000d'))), expected_status={StatusCode.SUCCESS, StatusCode.GENERIC_ERROR}) + sth.execute_scheme_query( + CreateTier(tiers[-1], TierConfig('tier1', s3_config)), + expected_status={StatusCode.SUCCESS, StatusCode.GENERIC_ERROR}, + ) + sth.execute_scheme_query( + CreateTieringRule( + tiering_rules[-1], + 'timestamp', + TieringPolicy().with_rule(TieringRule(tiers[-1], '30000d')), + ), + expected_status={StatusCode.SUCCESS, StatusCode.GENERIC_ERROR}, + ) + + sth.execute_scheme_query(AlterTier(tiers[-1], TierConfig('tier1', s3_config))) + sth.execute_scheme_query(AlterTieringRule( + tiering_rules[-1], + 'timestamp', + TieringPolicy().with_rule(TieringRule(tiers[-1], '30000d')), + )) sth.execute_scheme_query(CreateTableStore('store').with_schema(self.schema1)) sth.execute_scheme_query(CreateTable(f'store/table').with_schema(self.schema1)) @@ -112,7 +136,7 @@ def scenario_alter_tiering_rule_while_writing(self, ctx: TestContext): for thread in threads: thread.join() - for tier in tiers: - sth.execute_scheme_query(DropTier(tier)) for tiering in tiering_rules: sth.execute_scheme_query(DropTieringRule(tiering)) + for tier in tiers: + sth.execute_scheme_query(DropTier(tier)) From 5a4fe38a2b8ed504e894755e1d8220853b8194c1 Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Sun, 11 Aug 2024 19:22:30 +0000 Subject: [PATCH 4/5] improve the test --- ydb/tests/olap/scenario/test_alter_tiering.py | 67 ++++++++++--------- 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/ydb/tests/olap/scenario/test_alter_tiering.py b/ydb/tests/olap/scenario/test_alter_tiering.py index 189405f237a1..c145217c4c64 100644 --- a/ydb/tests/olap/scenario/test_alter_tiering.py +++ b/ydb/tests/olap/scenario/test_alter_tiering.py @@ -32,7 +32,8 @@ class TestAlterTiering(BaseTestSet): schema1 = ( ScenarioTestHelper.Schema() .with_column(name='timestamp', type=PrimitiveType.Timestamp, not_null=True) - .with_column(name='level', type=PrimitiveType.Uint64) + .with_column(name='writer', type=PrimitiveType.Uint32) + .with_column(name='value', type=PrimitiveType.Uint64) .with_key_columns('timestamp') ) @@ -54,33 +55,38 @@ def _drop_tables(self, prefix: str, count: int, ctx: TestContext): sth = ScenarioTestHelper(ctx) for i in range(count): sth.execute_scheme_query(DropTable(f'store/{prefix}_{i}')) - - def _scan(self, ctx: TestContext, table: str, duration: datetime.timedelta): - deadline = datetime.datetime.now() + duration - sth = ScenarioTestHelper(ctx) - while datetime.datetime.now() < deadline: - sth.execute_scan_query(f'SELECT MIN(level) FROM `{sth.get_full_path(table)}`') - def _upsert(self, ctx: TestContext, table: str, duration: datetime.timedelta): + def _upsert(self, ctx: TestContext, table: str, writer_id: int, duration: datetime.timedelta): deadline = datetime.datetime.now() + duration sth = ScenarioTestHelper(ctx) + next_value = 0; while datetime.datetime.now() < deadline: sth.bulk_upsert( table, - dg.DataGeneratorPerColumn(self.schema1, 100, dg.ColumnValueGeneratorRandom(null_probability=0.1)), - comment="random data", + dg.DataGeneratorPerColumn(self.schema1, 100) + .with_column('timestamp', dg.ColumnValueGeneratorRandom(null_probability=0)) + .with_column('writer', dg.DataGeneratorConst(writer_id)) + .with_column('value', dg.ColumnValueGeneratorSequential(next_value)) ) + result = sth.execute_scan_query(f'SELECT COUNT(*) FROM `{sth.get_full_path(table)}` WHERE writer == {writer_id}') + assert result.result_set.rows[0][0] == next_value def _change_tiering_rule(self, ctx: TestContext, table: str, tiering_rules: Iterable[str], duration: datetime.timedelta): deadline = datetime.datetime.now() + duration sth = ScenarioTestHelper(ctx) while datetime.datetime.now() < deadline: for tiering_rule in tiering_rules: - sth.execute_scheme_query(AlterTable(table).set_tiering(tiering_rule)) - sth.execute_scheme_query(AlterTable(table).reset_tiering()) + sth.execute_scheme_query( + AlterTable(table).set_tiering(tiering_rule), + expected_status={StatusCode.SUCCESS, StatusCode.OVERLOADED}, + ) + sth.execute_scheme_query( + AlterTable(table).reset_tiering(), + expected_status={StatusCode.SUCCESS, StatusCode.OVERLOADED}, + ) def scenario_alter_tiering_rule_while_writing(self, ctx: TestContext): - test_duration = datetime.timedelta(seconds=10) + test_duration = datetime.timedelta(seconds=120) s3_config = ObjectStorageParams( scheme= 'HTTP', @@ -99,37 +105,34 @@ def scenario_alter_tiering_rule_while_writing(self, ctx: TestContext): tiers.append(f'TestAlterTiering:tier{i}') tiering_rules.append(f'TestAlterTiering:tiering_rule{i}') + tier_config = TierConfig('tier1', s3_config) + tiering_config = TieringPolicy().with_rule(TieringRule(tiers[-1], '10000d')) + + # If tiers exist, allow CREATE to fail, then alter them sth.execute_scheme_query( - CreateTier(tiers[-1], TierConfig('tier1', s3_config)), + CreateTier(tiers[-1], tier_config), expected_status={StatusCode.SUCCESS, StatusCode.GENERIC_ERROR}, ) sth.execute_scheme_query( - CreateTieringRule( - tiering_rules[-1], - 'timestamp', - TieringPolicy().with_rule(TieringRule(tiers[-1], '30000d')), - ), + CreateTieringRule(tiering_rules[-1], 'timestamp', tiering_config), expected_status={StatusCode.SUCCESS, StatusCode.GENERIC_ERROR}, ) - sth.execute_scheme_query(AlterTier(tiers[-1], TierConfig('tier1', s3_config))) - sth.execute_scheme_query(AlterTieringRule( - tiering_rules[-1], - 'timestamp', - TieringPolicy().with_rule(TieringRule(tiers[-1], '30000d')), - )) + sth.execute_scheme_query(AlterTier(tiers[-1], tier_config)) + sth.execute_scheme_query(AlterTieringRule(tiering_rules[-1], 'timestamp', tiering_config)) sth.execute_scheme_query(CreateTableStore('store').with_schema(self.schema1)) - sth.execute_scheme_query(CreateTable(f'store/table').with_schema(self.schema1)) + sth.execute_scheme_query(CreateTable('store/table').with_schema(self.schema1)) threads = [] - threads.append(self.TestThread(target=self._scan, args=[ctx, 'store/table', test_duration])) - threads.append(self.TestThread(target=self._upsert, args=[ctx, 'store/table', test_duration])) - threads.append(self.TestThread( - target=self._change_tiering_rule, - args=[ctx, 'store/table', random.sample(tiering_rules, len(tiering_rules)), test_duration] - )) + for i in range(4): + threads.append(self.TestThread(target=self._upsert, args=[ctx, 'store/table', i, test_duration])) + for _ in range(2): + threads.append(self.TestThread( + target=self._change_tiering_rule, + args=[ctx, 'store/table', random.sample(tiering_rules, len(tiering_rules)), test_duration] + )) for thread in threads: thread.start() From 229eee51f86340033605837d487c3034d3e4fd09 Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Sun, 11 Aug 2024 19:59:32 +0000 Subject: [PATCH 5/5] nit --- .../olap/scenario/helpers/data_generators.py | 26 ++++++-- .../olap/scenario/helpers/table_helper.py | 2 +- .../olap/scenario/helpers/tiering_helper.py | 62 ++++++++----------- ydb/tests/olap/scenario/test_alter_tiering.py | 12 ++-- 4 files changed, 55 insertions(+), 47 deletions(-) diff --git a/ydb/tests/olap/scenario/helpers/data_generators.py b/ydb/tests/olap/scenario/helpers/data_generators.py index 4945415a1085..286f4d8347a0 100644 --- a/ydb/tests/olap/scenario/helpers/data_generators.py +++ b/ydb/tests/olap/scenario/helpers/data_generators.py @@ -42,14 +42,30 @@ def next_row(self) -> None: pass -class ColumnValueGeneratorNull(IColumnValueGenerator): - """NULL column value generator. +class ColumnValueGeneratorConst(IColumnValueGenerator): + """Const column value generator. - Allways generate NULL value.""" + Allways generate specified value.""" + + def __init__(self, value: Any) -> None: + """Constructor. + + Args: + value: Value to generate. + Example: + DataGeneratorPerColumn( + self.schema2, 10, + ColumnValueGeneratorDefault(init_value=10)) + .with_column('not_level', ColumnValueGeneratorConst(42) + ) + """ + + super().__init__() + self._value = value @override - def generate_value(column: ScenarioTestHelper.Column) -> Any: - return None + def generate_value(self, column: ScenarioTestHelper.Column) -> Any: + return self._value class ColumnValueGeneratorRandom(IColumnValueGenerator): diff --git a/ydb/tests/olap/scenario/helpers/table_helper.py b/ydb/tests/olap/scenario/helpers/table_helper.py index 7e029eb8ed45..8b1963fd13e0 100644 --- a/ydb/tests/olap/scenario/helpers/table_helper.py +++ b/ydb/tests/olap/scenario/helpers/table_helper.py @@ -364,7 +364,7 @@ def reset_tiering(self) -> AlterTableLikeObject: return self(ResetSetting('TIERING')) def set_ttl(self, interval: str, column: str) -> AlterTableLikeObject: - """Set row TTL. + """Set TTL for rows. The method is similar to calling {AlterTableLikeObject.action} with a {SetSetting} instance. diff --git a/ydb/tests/olap/scenario/helpers/tiering_helper.py b/ydb/tests/olap/scenario/helpers/tiering_helper.py index f739d68b77cf..5801aadc7af3 100644 --- a/ydb/tests/olap/scenario/helpers/tiering_helper.py +++ b/ydb/tests/olap/scenario/helpers/tiering_helper.py @@ -1,12 +1,11 @@ -# TODO: pre-review refactor from __future__ import annotations from ydb.tests.olap.scenario.helpers.scenario_tests_helper import ( ScenarioTestHelper, TestContext, ) -from abc import abstractmethod, ABC +from abc import abstractmethod -from typing import override, Dict, Optional +from typing import override, Dict from dataclasses import dataclass import json @@ -55,7 +54,7 @@ def with_rule(self, rule: TieringRule): return self def to_json(self) -> str: - return json.dumps({"rules": list(map(lambda x: x.to_dict(), self.rules))}) + return json.dumps({'rules': list(map(lambda x: x.to_dict(), self.rules))}) @dataclass @@ -81,7 +80,7 @@ def __init__(self, name: str, default_column: str, config: TieringPolicy) -> Non Args: name: Name (relative path) of the altered object. - default_column: Default column. + default_column: Default column used for tiering. config: Tiering rules to apply.""" super().__init__(name) @@ -98,11 +97,8 @@ def title(self): @override def to_yql(self, ctx: TestContext) -> str: - return f'ALTER OBJECT `{self._name}` (TYPE TIERING_RULE) SET (defaultColumn = {self._default_column}, description = `{self._config.to_json()}`)' - - @override - def _type(self) -> str: - return 'tiering_rule' + return f'ALTER OBJECT `{self._name}` (TYPE TIERING_RULE)' \ + f' SET (defaultColumn = {self._default_column}, description = `{self._config.to_json()}`)' class CreateTieringRule(AlterTieringRule): @@ -110,13 +106,15 @@ class CreateTieringRule(AlterTieringRule): See {ScenarioTestHelper.IYqlble}. """ + @override def title(self): return 'Create tiering rule' @override def to_yql(self, ctx: TestContext) -> str: - return f'CREATE OBJECT `{self._name}` (TYPE TIERING_RULE) WITH (defaultColumn = {self._default_column}, description = `{self._config.to_json()}`)' + return f'CREATE OBJECT `{self._name}` (TYPE TIERING_RULE)' \ + f' WITH (defaultColumn = {self._default_column}, description = `{self._config.to_json()}`)' class AlterTier(ScenarioTestHelper.IYqlble): @@ -129,14 +127,15 @@ def __init__(self, name: str, config: TierConfig) -> None: """Constructor. Args: - name: Name (relative path) of the altered object.""" + name: Name (relative path) of the altered object. + config: Tier configuration.""" super().__init__(name) self._config: TierConfig = config @override def params(self) -> Dict[str, str]: - return {self._type(): self._name, 'config': self._config.to_proto_str()} + return {'tier': self._name, 'config': self._config.to_proto_str()} @override def title(self): @@ -146,10 +145,6 @@ def title(self): def to_yql(self, ctx: TestContext) -> str: return f'ALTER OBJECT `{self._name}` (TYPE TIER) SET (tierConfig = `{self._config.to_proto_str()}`)' - @override - def _type(self) -> str: - 'tier' - class CreateTier(AlterTier): """Create a tier. @@ -172,26 +167,29 @@ class DropObjectBase(ScenarioTestHelper.IYqlble): See {ScenarioTestHelper.IYqlble}. """ - def __init__(self, name: str, object_type: str) -> None: + def __init__(self, name: str) -> None: """Constructor. Args: name: Name (relative path) of the altered object.""" super().__init__(name) - self._object_type = object_type @override def params(self) -> Dict[str, str]: - return {'object_type': self._object_type} + return {'object_type': self._object_type()} @override def title(self): - return f'Drop {self._object_type.lower()}' + return f'Drop {self._object_type().lower()}' @override def to_yql(self, ctx: TestContext) -> str: - return f'DROP OBJECT `{self._name}` (TYPE {self._object_type})' + return f'DROP OBJECT `{self._name}` (TYPE {self._object_type()})' + + @abstractmethod + def _object_type(self) -> str: + pass class DropTier(DropObjectBase): @@ -200,13 +198,9 @@ class DropTier(DropObjectBase): See {ScenarioTestHelper.IYqlble}. """ - def __init__(self, name: str) -> None: - """Constructor. - - Args: - name: Name (relative path) of the altered object.""" - - super().__init__(name, 'TIER') + @override + def _object_type(self): + return 'TIER' class DropTieringRule(DropObjectBase): @@ -215,10 +209,6 @@ class DropTieringRule(DropObjectBase): See {ScenarioTestHelper.IYqlble}. """ - def __init__(self, name: str) -> None: - """Constructor. - - Args: - name: Name (relative path) of the altered object.""" - - super().__init__(name, 'TIERING_RULE') \ No newline at end of file + @override + def _object_type(self): + return 'TIERING_RULE' \ No newline at end of file diff --git a/ydb/tests/olap/scenario/test_alter_tiering.py b/ydb/tests/olap/scenario/test_alter_tiering.py index c145217c4c64..e84048ff371c 100644 --- a/ydb/tests/olap/scenario/test_alter_tiering.py +++ b/ydb/tests/olap/scenario/test_alter_tiering.py @@ -16,10 +16,11 @@ TieringPolicy, TieringRule, DropTier, - DropTieringRule + DropTieringRule, ) import helpers.data_generators as dg from helpers.table_helper import AlterTable + from ydb import PrimitiveType from ydb import StatusCode import datetime @@ -32,9 +33,9 @@ class TestAlterTiering(BaseTestSet): schema1 = ( ScenarioTestHelper.Schema() .with_column(name='timestamp', type=PrimitiveType.Timestamp, not_null=True) - .with_column(name='writer', type=PrimitiveType.Uint32) - .with_column(name='value', type=PrimitiveType.Uint64) - .with_key_columns('timestamp') + .with_column(name='writer', type=PrimitiveType.Uint32, not_null=True) + .with_column(name='value', type=PrimitiveType.Uint64, not_null=True) + .with_key_columns('timestamp', 'writer', 'value') ) class TestThread(threading.Thread): @@ -65,9 +66,10 @@ def _upsert(self, ctx: TestContext, table: str, writer_id: int, duration: dateti table, dg.DataGeneratorPerColumn(self.schema1, 100) .with_column('timestamp', dg.ColumnValueGeneratorRandom(null_probability=0)) - .with_column('writer', dg.DataGeneratorConst(writer_id)) + .with_column('writer', dg.ColumnValueGeneratorConst(writer_id)) .with_column('value', dg.ColumnValueGeneratorSequential(next_value)) ) + next_value += 100 result = sth.execute_scan_query(f'SELECT COUNT(*) FROM `{sth.get_full_path(table)}` WHERE writer == {writer_id}') assert result.result_set.rows[0][0] == next_value