From 22cd3a3ee172d61e6a1e1ad8f20101a3164e8e82 Mon Sep 17 00:00:00 2001 From: Semyon Date: Tue, 20 Aug 2024 17:10:31 +0300 Subject: [PATCH] Add a scenario test for altering tiering config (#7636) --- ydb/tests/olap/scenario/conftest.py | 3 +- .../olap/scenario/helpers/data_generators.py | 26 +- .../olap/scenario/helpers/table_helper.py | 99 +++++++ .../olap/scenario/helpers/tiering_helper.py | 246 ++++++++++++++++++ ydb/tests/olap/scenario/test_alter_tiering.py | 152 +++++++++++ ydb/tests/olap/scenario/ya.make | 1 + 6 files changed, 521 insertions(+), 6 deletions(-) create mode 100644 ydb/tests/olap/scenario/helpers/tiering_helper.py create mode 100644 ydb/tests/olap/scenario/test_alter_tiering.py diff --git a/ydb/tests/olap/scenario/conftest.py b/ydb/tests/olap/scenario/conftest.py index 51de3c62ada0..533375e989d4 100644 --- a/ydb/tests/olap/scenario/conftest.py +++ b/ydb/tests/olap/scenario/conftest.py @@ -19,7 +19,8 @@ def get_suite_name(cls): @classmethod def setup_class(cls): - ScenarioTestHelper(None).remove_path(cls.get_suite_name()) + if not external_param_is_true('reuse-tables'): + ScenarioTestHelper(None).remove_path(cls.get_suite_name()) @classmethod def teardown_class(cls): diff --git a/ydb/tests/olap/scenario/helpers/data_generators.py b/ydb/tests/olap/scenario/helpers/data_generators.py index 4945415a1085..286f4d8347a0 100644 --- a/ydb/tests/olap/scenario/helpers/data_generators.py +++ b/ydb/tests/olap/scenario/helpers/data_generators.py @@ -42,14 +42,30 @@ def next_row(self) -> None: pass -class ColumnValueGeneratorNull(IColumnValueGenerator): - """NULL column value generator. +class ColumnValueGeneratorConst(IColumnValueGenerator): + """Const column value generator. - Allways generate NULL value.""" + Always generates the specified value.""" + + def __init__(self, value: Any) -> None: + """Constructor. + + Args: + value: Value to generate. 
+ Example: + DataGeneratorPerColumn( + self.schema2, 10, + ColumnValueGeneratorDefault(init_value=10)) + .with_column('not_level', ColumnValueGeneratorConst(42) + ) + """ + + super().__init__() + self._value = value @override - def generate_value(column: ScenarioTestHelper.Column) -> Any: - return None + def generate_value(self, column: ScenarioTestHelper.Column) -> Any: + return self._value class ColumnValueGeneratorRandom(IColumnValueGenerator): diff --git a/ydb/tests/olap/scenario/helpers/table_helper.py b/ydb/tests/olap/scenario/helpers/table_helper.py index ef2088ecd692..8b1963fd13e0 100644 --- a/ydb/tests/olap/scenario/helpers/table_helper.py +++ b/ydb/tests/olap/scenario/helpers/table_helper.py @@ -218,6 +218,69 @@ def title(self) -> str: return f'drop column `{self._column}`' +class SetSetting(AlterTableAction): + """Set a setting value for a table-like object. + + Table-like objects are Tables and TableStore. + See {AlterTableLikeObject}. + + Example: + sth = ScenarioTestHelper(ctx) + sth.execute_scheme_query( + AlterTable('testTable').action(SetSetting('TIERING', '"tiering1"')) + ) + """ + + def __init__(self, setting: str, value_literal: str) -> None: + """Constructor. + + Args: + setting: Name of the setting to set; value_literal: New value, written as a YQL literal.""" + + super().__init__() + self._setting = setting + self._value = value_literal + + @override + def to_yql(self) -> str: + return f'SET {self._setting} {self._value}' + + @override + def title(self) -> str: + return f'set {self._setting} = {self._value}' + + +class ResetSetting(AlterTableAction): + """Reset value of a setting for a table-like object. + + Table-like objects are Tables and TableStore. + See {AlterTableLikeObject}. + + Example: + sth = ScenarioTestHelper(ctx) + sth.execute_scheme_query( + AlterTable('testTable').action(ResetSetting('TIERING')) + ) + """ + + def __init__(self, setting: str) -> None: + """Constructor. 
+ + Args: + setting: Name of the setting to reset.""" + + super().__init__() + self._setting = setting + + @override + def to_yql(self) -> str: + return f'RESET ({self._setting})' + + @override + def title(self) -> str: + return f'reset {self._setting}' + + class AlterTableLikeObject(ScenarioTestHelper.IYqlble): """The base class for all requests to change table-like objects. @@ -277,6 +340,42 @@ def drop_column(self, column: str) -> AlterTableLikeObject: return self(DropColumn(column)) + def set_tiering(self, tiering_rule: str) -> AlterTableLikeObject: + """Set a tiering policy. + + The method is similar to calling {AlterTableLikeObject.action} with a {SetSetting} instance. + + Args: + tiering_rule: Name of a TIERING_RULE object. + + Returns: + self.""" + + return self(SetSetting('TIERING', f'"{tiering_rule}"')) + + def reset_tiering(self) -> AlterTableLikeObject: + """Remove a tiering policy. + + The method is similar to calling {AlterTableLikeObject.action} with a {ResetSetting} instance. + + Returns: + self.""" + + return self(ResetSetting('TIERING')) + + def set_ttl(self, interval: str, column: str) -> AlterTableLikeObject: + """Set TTL for rows. + + The method is similar to calling {AlterTableLikeObject.action} with a {SetSetting} instance. + + Args: + interval: TTL interval literal, e.g. 'P1D'; column: Name of the column the TTL is applied on. 
+ + Returns: + self.""" + + return self(SetSetting('TTL', f'Interval("{interval}") ON `{column}`')) + @override def params(self) -> Dict[str, str]: return {self._type(): self._name, 'actions': ', '.join([a.title() for a in self._actions])} diff --git a/ydb/tests/olap/scenario/helpers/tiering_helper.py b/ydb/tests/olap/scenario/helpers/tiering_helper.py new file mode 100644 index 000000000000..712aed66cac9 --- /dev/null +++ b/ydb/tests/olap/scenario/helpers/tiering_helper.py @@ -0,0 +1,246 @@ +from __future__ import annotations +from ydb.tests.olap.scenario.helpers.scenario_tests_helper import ( + ScenarioTestHelper, + TestContext, +) +from abc import abstractmethod + +from typing import override, Dict +from dataclasses import dataclass +import json + + +@dataclass +class ObjectStorageParams: + endpoint: str + bucket: str + access_key: str + secret_key: str + scheme: str = 'HTTP' + verify_ssl: bool = False + + def to_proto_str(self) -> str: + return ( + f'Scheme: {self.scheme}\n' + f'VerifySSL: {str(self.verify_ssl).lower()}\n' + f'Endpoint: "{self.endpoint}"\n' + f'Bucket: "{self.bucket}"\n' + f'AccessKey: "{self.access_key}"\n' + f'SecretKey: "{self.secret_key}"\n' + ) + + +@dataclass +class TieringRule: + tier_name: str + duration_for_evict: str + + def to_dict(self): + return { + 'tierName': self.tier_name, + 'durationForEvict': self.duration_for_evict, + } + + +@dataclass +class TieringPolicy: + rules: list[TieringRule] + + def __init__(self): + self.rules = [] + + def with_rule(self, rule: TieringRule): + self.rules.append(rule) + return self + + def to_json(self) -> str: + return json.dumps({'rules': list(map(lambda x: x.to_dict(), self.rules))}) + + +@dataclass +class TierConfig: + name: str + s3_params: ObjectStorageParams + + def to_proto_str(self) -> str: + return ( + f'Name: "{self.name}"\n' + f'ObjectStorage: {{\n{self.s3_params.to_proto_str()}\n}}' + ) + + +class AlterTieringRule(ScenarioTestHelper.IYqlble): + """Alter a tiering rule. 
+ + See {ScenarioTestHelper.IYqlble}. + """ + + def __init__(self, name: str, default_column: str, config: TieringPolicy) -> None: + """Constructor. + + Args: + name: Name (relative path) of the altered object. + default_column: Default column used for tiering. + config: Tiering rules to apply.""" + + super().__init__(name) + self._default_column: str = default_column + self._config: TieringPolicy = config + + @override + def params(self) -> Dict[str, str]: + return {'tiering_rule': self._name, 'config': self._config.to_json()} + + @override + def title(self): + return 'Alter tiering rule' + + @override + def to_yql(self, ctx: TestContext) -> str: + return f'ALTER OBJECT `{self._name}` (TYPE TIERING_RULE)' \ + f' SET (defaultColumn = {self._default_column}, description = `{self._config.to_json()}`)' + + +class CreateTieringRule(AlterTieringRule): + """Create a tiering rule. + + See {ScenarioTestHelper.IYqlble}. + """ + + @override + def title(self): + return 'Create tiering rule' + + @override + def to_yql(self, ctx: TestContext) -> str: + return f'CREATE OBJECT `{self._name}` (TYPE TIERING_RULE)' \ + f' WITH (defaultColumn = {self._default_column}, description = `{self._config.to_json()}`)' + + +class CreateTieringRuleIfNotExists(AlterTieringRule): + """Create a tiering rule. If it exists, do nothing. + + See {ScenarioTestHelper.IYqlble}. + """ + + @override + def title(self): + return 'Create tiering rule' + + @override + def to_yql(self, ctx: TestContext) -> str: + return f'CREATE OBJECT IF NOT EXISTS `{self._name}` (TYPE TIERING_RULE)' \ + f' WITH (defaultColumn = {self._default_column}, description = `{self._config.to_json()}`)' + + +class AlterTier(ScenarioTestHelper.IYqlble): + """Alter a tier. + + See {ScenarioTestHelper.IYqlble}. + """ + + def __init__(self, name: str, config: TierConfig) -> None: + """Constructor. + + Args: + name: Name (relative path) of the altered object. 
+ config: Tier configuration.""" + + super().__init__(name) + self._config: TierConfig = config + + @override + def params(self) -> Dict[str, str]: + return {'tier': self._name, 'config': self._config.to_proto_str()} + + @override + def title(self): + return 'Alter tier' + + @override + def to_yql(self, ctx: TestContext) -> str: + return f'ALTER OBJECT `{self._name}` (TYPE TIER) SET (tierConfig = `{self._config.to_proto_str()}`)' + + +class CreateTier(AlterTier): + """Create a tier. + + See {ScenarioTestHelper.IYqlble}. + """ + + @override + def title(self): + return 'Create tier' + + @override + def to_yql(self, ctx: TestContext) -> str: + return f'CREATE OBJECT `{self._name}` (TYPE TIER) WITH (tierConfig = `{self._config.to_proto_str()}`)' + + +class CreateTierIfNotExists(AlterTier): + """Create a tier. If it exists, do nothing. + + See {ScenarioTestHelper.IYqlble}. + """ + + @override + def title(self): + return 'Create tier' + + @override + def to_yql(self, ctx: TestContext) -> str: + return f'CREATE OBJECT IF NOT EXISTS `{self._name}` (TYPE TIER)' \ + f' WITH (tierConfig = `{self._config.to_proto_str()}`)' + + +class DropObjectBase(ScenarioTestHelper.IYqlble): + """Base class for requests that drop an object of a given type. + + See {ScenarioTestHelper.IYqlble}. + """ + + def __init__(self, name: str) -> None: + """Constructor. + + Args: + name: Name (relative path) of the dropped object.""" + + super().__init__(name) + + @override + def params(self) -> Dict[str, str]: + return {'object_type': self._object_type()} + + @override + def title(self): + return f'Drop {self._object_type().lower()}' + + @override + def to_yql(self, ctx: TestContext) -> str: + return f'DROP OBJECT `{self._name}` (TYPE {self._object_type()})' + + @abstractmethod + def _object_type(self) -> str: + pass + + +class DropTier(DropObjectBase): + """Drop a tier. + + See {ScenarioTestHelper.IYqlble}. + """ + + @override + def _object_type(self): + return 'TIER' + + +class DropTieringRule(DropObjectBase): + """Drop a tiering rule. 
+ + See {ScenarioTestHelper.IYqlble}. + """ + + @override + def _object_type(self): + return 'TIERING_RULE' diff --git a/ydb/tests/olap/scenario/test_alter_tiering.py b/ydb/tests/olap/scenario/test_alter_tiering.py new file mode 100644 index 000000000000..cae96e9da48d --- /dev/null +++ b/ydb/tests/olap/scenario/test_alter_tiering.py @@ -0,0 +1,152 @@ +from conftest import BaseTestSet +from ydb.tests.olap.scenario.helpers import ( + ScenarioTestHelper, + TestContext, + CreateTable, + CreateTableStore, + DropTable, +) +from helpers.tiering_helper import ( + ObjectStorageParams, + AlterTier, + CreateTierIfNotExists, + AlterTieringRule, + CreateTieringRuleIfNotExists, + TierConfig, + TieringPolicy, + TieringRule, + DropTier, + DropTieringRule, +) +import helpers.data_generators as dg +from helpers.table_helper import AlterTable + +from ydb.tests.olap.lib.utils import get_external_param +from ydb import PrimitiveType +import datetime +import random +import threading +from typing import Iterable +import time + + +class TestAlterTiering(BaseTestSet): + schema1 = ( + ScenarioTestHelper.Schema() + .with_column(name='timestamp', type=PrimitiveType.Timestamp, not_null=True) + .with_column(name='writer', type=PrimitiveType.Uint32, not_null=True) + .with_column(name='value', type=PrimitiveType.Uint64, not_null=True) + .with_column(name='data', type=PrimitiveType.String, not_null=True) + .with_key_columns('timestamp', 'writer', 'value') + ) + + class TestThread(threading.Thread): + def run(self) -> None: + self.exc = None + try: + self.ret = self._target(*self._args, **self._kwargs) + except BaseException as e: + self.exc = e + + def join(self, timeout=None): + super().join(timeout) + if self.exc: + raise self.exc + return self.ret + + def _drop_tables(self, prefix: str, count: int, ctx: TestContext): + sth = ScenarioTestHelper(ctx) + for i in range(count): + sth.execute_scheme_query(DropTable(f'store/{prefix}_{i}')) + + def _upsert(self, ctx: TestContext, table: str, writer_id: 
int, duration: datetime.timedelta): + deadline = datetime.datetime.now() + duration + sth = ScenarioTestHelper(ctx) + rows_written = 0 + i = 0 + while datetime.datetime.now() < deadline: + sth.bulk_upsert( + table, + dg.DataGeneratorPerColumn(self.schema1, 1000) + .with_column('timestamp', dg.ColumnValueGeneratorRandom(null_probability=0)) + .with_column('writer', dg.ColumnValueGeneratorConst(writer_id)) + .with_column('value', dg.ColumnValueGeneratorSequential(rows_written)) + .with_column('data', dg.ColumnValueGeneratorConst(random.randbytes(1024))) + ) + rows_written += 1000 + i += 1 + if rows_written > 100000 and i % 10 == 0: + scan_result = sth.execute_scan_query(f'SELECT COUNT(*) FROM `{sth.get_full_path('store/table')}` WHERE writer == {writer_id}') + assert scan_result.result_set.rows[0][0] == rows_written + + def _change_tiering_rule(self, ctx: TestContext, table: str, tiering_rules: Iterable[str], duration: datetime.timedelta): + deadline = datetime.datetime.now() + duration + sth = ScenarioTestHelper(ctx) + while datetime.datetime.now() < deadline: + for tiering_rule in tiering_rules: + sth.execute_scheme_query(AlterTable(table).set_tiering(tiering_rule)) + sth.execute_scheme_query(AlterTable(table).reset_tiering()) + + def scenario_alter_tiering_rule_while_writing(self, ctx: TestContext): + test_duration = datetime.timedelta(seconds=400) + + s3_endpoint = get_external_param('s3-endpoint', 'storage.yandexcloud.net') + s3_access_key = get_external_param('s3-access-key', 'YCAJEM3Pg9fMyuX9ZUOJ_fake') + s3_secret_key = get_external_param('s3-secret-key', 'YCM7Ovup55wDkymyEtO8pw5F10_L5jtVY8w_fake') + s3_buckets = get_external_param('s3-buckets', 'ydb-tiering-test-1,ydb-tiering-test-2').split(',') + + s3_configs = [ + ObjectStorageParams( + scheme='HTTP', + verify_ssl=False, + endpoint=s3_endpoint, + bucket=bucket, + access_key=s3_access_key, + secret_key=s3_secret_key + ) for bucket in s3_buckets + ] + + sth = ScenarioTestHelper(ctx) + + tiers: list[str] = [] 
+ tiering_rules: list[str] = [] + for i, s3_config in enumerate(s3_configs): + tiers.append(f'TestAlterTiering:tier{i}') + tiering_rules.append(f'TestAlterTiering:tiering_rule{i}') + + tier_config = TierConfig(tiers[-1], s3_config) + tiering_config = TieringPolicy().with_rule(TieringRule(tiers[-1], '1s')) + + sth.execute_scheme_query(CreateTierIfNotExists(tiers[-1], tier_config)) + sth.execute_scheme_query(CreateTieringRuleIfNotExists(tiering_rules[-1], 'timestamp', tiering_config)) + + sth.execute_scheme_query(AlterTier(tiers[-1], tier_config)) + sth.execute_scheme_query(AlterTieringRule(tiering_rules[-1], 'timestamp', tiering_config)) + + sth.execute_scheme_query(CreateTableStore('store').with_schema(self.schema1)) + sth.execute_scheme_query(CreateTable('store/table').with_schema(self.schema1)) + + threads = [] + + threads.append(self.TestThread( + target=self._change_tiering_rule, + args=[ctx, 'store/table', tiering_rules, test_duration] + )) + writer_id_offset = random.randint(0, 1 << 30) + for i in range(4): + threads.append(self.TestThread(target=self._upsert, args=[ctx, 'store/table', writer_id_offset + i, test_duration])) + + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + for tiering in tiering_rules: + sth.execute_scheme_query(DropTieringRule(tiering)) + for tier in tiers: + sth.execute_scheme_query(DropTier(tier)) + + sth.execute_scheme_query(AlterTable('store/table').set_ttl('P1D', 'timestamp')) + + while sth.execute_scan_query(f'SELECT COUNT(*) FROM `{sth.get_full_path('store/table')}`').result_set.rows[0][0]: + time.sleep(10) diff --git a/ydb/tests/olap/scenario/ya.make b/ydb/tests/olap/scenario/ya.make index 49eee4306f26..58a33a89fdba 100644 --- a/ydb/tests/olap/scenario/ya.make +++ b/ydb/tests/olap/scenario/ya.make @@ -11,6 +11,7 @@ PY3TEST() TEST_SRCS( test_simple.py test_scheme_load.py + test_alter_tiering.py ) PEERDIR(