diff --git a/ydb/tests/olap/scenario/helpers/data_generators.py b/ydb/tests/olap/scenario/helpers/data_generators.py index 4945415a1085..286f4d8347a0 100644 --- a/ydb/tests/olap/scenario/helpers/data_generators.py +++ b/ydb/tests/olap/scenario/helpers/data_generators.py @@ -42,14 +42,30 @@ def next_row(self) -> None: pass -class ColumnValueGeneratorNull(IColumnValueGenerator): - """NULL column value generator. +class ColumnValueGeneratorConst(IColumnValueGenerator): + """Const column value generator. - Allways generate NULL value.""" + Allways generate specified value.""" + + def __init__(self, value: Any) -> None: + """Constructor. + + Args: + value: Value to generate. + Example: + DataGeneratorPerColumn( + self.schema2, 10, + ColumnValueGeneratorDefault(init_value=10)) + .with_column('not_level', ColumnValueGeneratorConst(42) + ) + """ + + super().__init__() + self._value = value @override - def generate_value(column: ScenarioTestHelper.Column) -> Any: - return None + def generate_value(self, column: ScenarioTestHelper.Column) -> Any: + return self._value class ColumnValueGeneratorRandom(IColumnValueGenerator): diff --git a/ydb/tests/olap/scenario/helpers/table_helper.py b/ydb/tests/olap/scenario/helpers/table_helper.py index ef2088ecd692..8b1963fd13e0 100644 --- a/ydb/tests/olap/scenario/helpers/table_helper.py +++ b/ydb/tests/olap/scenario/helpers/table_helper.py @@ -218,6 +218,69 @@ def title(self) -> str: return f'drop column `{self._column}`' +class SetSetting(AlterTableAction): + """Set a setting value for a table-like object. + + Table-like objects are Tables and TableStore. + See {AlterTableLikeObject}. + + Example: + sth = ScenarioTestHelper(ctx) + sth.execute_scheme_query( + AlterTable('testTable').action(SetSetting('TIERING', 'tiering1)) + ) + """ + + def __init__(self, setting: str, value_literal: str) -> None: + """Constructor. + + Args: + column: Name of the column to be deleted.""" + + super().__init__() + self._setting = setting + self._value = value_literal + + @override + def to_yql(self) -> str: + return f'SET {self._setting} {self._value}' + + @override + def title(self) -> str: + return f'set {self._setting} = {self._value}' + + +class ResetSetting(AlterTableAction): + """Reset value of a setting for a table-like object. + + Table-like objects are Tables and TableStore. + See {AlterTableLikeObject}. + + Example: + sth = ScenarioTestHelper(ctx) + sth.execute_scheme_query( + AlterTable('testTable').action(ResetSetting('TIERING')) + ) + """ + + def __init__(self, setting: str) -> None: + """Constructor. + + Args: + setting: Name of altered setting.""" + + super().__init__() + self._setting = setting + + @override + def to_yql(self) -> str: + return f'RESET ({self._setting})' + + @override + def title(self) -> str: + return f'reset {self._setting}' + + class AlterTableLikeObject(ScenarioTestHelper.IYqlble): """The base class for all requests to change table-like objects. @@ -277,6 +340,42 @@ def drop_column(self, column: str) -> AlterTableLikeObject: return self(DropColumn(column)) + def set_tiering(self, tiering_rule: str) -> AlterTableLikeObject: + """Set a tiering policy. + + The method is similar to calling {AlterTableLikeObject.action} with a {SetSetting} instance. + + Args: + tiering_rule: Name of a TIERING_RULE object. + + Returns: + self.""" + + return self(SetSetting('TIERING', f'"{tiering_rule}"')) + + def reset_tiering(self) -> AlterTableLikeObject: + """Remove a tiering policy. + + The method is similar to calling {AlterTableLikeObject.action} with a {SetSetting} instance. + + Returns: + self.""" + + return self(ResetSetting('TIERING')) + + def set_ttl(self, interval: str, column: str) -> AlterTableLikeObject: + """Set TTL for rows. + + The method is similar to calling {AlterTableLikeObject.action} with a {SetSetting} instance. + + Args: + tiering_rule: Name of a TIERING_RULE object. + + Returns: + self.""" + + return self(SetSetting('TTL', f'Interval("{interval}") ON `{column}`')) + @override def params(self) -> Dict[str, str]: return {self._type(): self._name, 'actions': ', '.join([a.title() for a in self._actions])} diff --git a/ydb/tests/olap/scenario/helpers/tiering_helper.py b/ydb/tests/olap/scenario/helpers/tiering_helper.py new file mode 100644 index 000000000000..5801aadc7af3 --- /dev/null +++ b/ydb/tests/olap/scenario/helpers/tiering_helper.py @@ -0,0 +1,214 @@ +from __future__ import annotations +from ydb.tests.olap.scenario.helpers.scenario_tests_helper import ( + ScenarioTestHelper, + TestContext, +) +from abc import abstractmethod + +from typing import override, Dict +from dataclasses import dataclass +import json + + +@dataclass +class ObjectStorageParams: + endpoint: str + bucket: str + access_key: str + secret_key: str + scheme: str = 'HTTP' + verify_ssl: bool = False + + def to_proto_str(self) -> str: + return ( + f'Scheme: {self.scheme}\n' + f'VerifySSL: {str(self.verify_ssl).lower()}\n' + f'Endpoint: "{self.endpoint}"\n' + f'Bucket: "{self.bucket}"\n' + f'AccessKey: "{self.access_key}"\n' + f'SecretKey: "{self.secret_key}"\n' + ) + + +@dataclass +class TieringRule: + tier_name: str + duration_for_evict: str + + def to_dict(self): + return { + 'tierName': self.tier_name, + 'durationForEvict': self.duration_for_evict, + } + + +@dataclass +class TieringPolicy: + rules: list[TieringRule] + + def __init__(self): + self.rules = [] + + def with_rule(self, rule: TieringRule): + self.rules.append(rule) + return self + + def to_json(self) -> str: + return json.dumps({'rules': list(map(lambda x: x.to_dict(), self.rules))}) + + +@dataclass +class TierConfig: + name: str + s3_params: ObjectStorageParams + + def to_proto_str(self) -> str: + return ( + f'Name: "{self.name}"\n' + f'ObjectStorage: {{\n{self.s3_params.to_proto_str()}\n}}' + ) + + +class AlterTieringRule(ScenarioTestHelper.IYqlble): + """Alter a tiering rule. + + See {ScenarioTestHelper.IYqlble}. + """ + + def __init__(self, name: str, default_column: str, config: TieringPolicy) -> None: + """Constructor. + + Args: + name: Name (relative path) of the altered object. + default_column: Default column used for tiering. + config: Tiering rules to apply.""" + + super().__init__(name) + self._default_column: str = default_column + self._config: TieringPolicy = config + + @override + def params(self) -> Dict[str, str]: + return {'tiering_rule': self._name, 'config': self._config.to_json()} + + @override + def title(self): + return 'Alter tiering rule' + + @override + def to_yql(self, ctx: TestContext) -> str: + return f'ALTER OBJECT `{self._name}` (TYPE TIERING_RULE)' \ + f' SET (defaultColumn = {self._default_column}, description = `{self._config.to_json()}`)' + + +class CreateTieringRule(AlterTieringRule): + """Create a tiering rule. + + See {ScenarioTestHelper.IYqlble}. + """ + + @override + def title(self): + return 'Create tiering rule' + + @override + def to_yql(self, ctx: TestContext) -> str: + return f'CREATE OBJECT `{self._name}` (TYPE TIERING_RULE)' \ + f' WITH (defaultColumn = {self._default_column}, description = `{self._config.to_json()}`)' + + +class AlterTier(ScenarioTestHelper.IYqlble): + """Alter a tier. + + See {ScenarioTestHelper.IYqlble}. + """ + + def __init__(self, name: str, config: TierConfig) -> None: + """Constructor. + + Args: + name: Name (relative path) of the altered object. + config: Tier configuration.""" + + super().__init__(name) + self._config: TierConfig = config + + @override + def params(self) -> Dict[str, str]: + return {'tier': self._name, 'config': self._config.to_proto_str()} + + @override + def title(self): + return 'Alter tier' + + @override + def to_yql(self, ctx: TestContext) -> str: + return f'ALTER OBJECT `{self._name}` (TYPE TIER) SET (tierConfig = `{self._config.to_proto_str()}`)' + + +class CreateTier(AlterTier): + """Create a tier. + + See {ScenarioTestHelper.IYqlble}. + """ + + @override + def title(self): + return 'Create tier' + + @override + def to_yql(self, ctx: TestContext) -> str: + return f'CREATE OBJECT `{self._name}` (TYPE TIER) WITH (tierConfig = `{self._config.to_proto_str()}`)' + + +class DropObjectBase(ScenarioTestHelper.IYqlble): + """Drop a tier. + + See {ScenarioTestHelper.IYqlble}. + """ + + def __init__(self, name: str) -> None: + """Constructor. + + Args: + name: Name (relative path) of the altered object.""" + + super().__init__(name) + + @override + def params(self) -> Dict[str, str]: + return {'object_type': self._object_type()} + + @override + def title(self): + return f'Drop {self._object_type().lower()}' + + @override + def to_yql(self, ctx: TestContext) -> str: + return f'DROP OBJECT `{self._name}` (TYPE {self._object_type()})' + + @abstractmethod + def _object_type(self) -> str: + pass + + +class DropTier(DropObjectBase): + """Drop a tier. + + See {ScenarioTestHelper.IYqlble}. + """ + + @override + def _object_type(self): + return 'TIER' + + +class DropTieringRule(DropObjectBase): + """Drop a tier. + + See {ScenarioTestHelper.IYqlble}. + """ + + @override + def _object_type(self): + return 'TIERING_RULE' \ No newline at end of file diff --git a/ydb/tests/olap/scenario/test_alter_tiering.py b/ydb/tests/olap/scenario/test_alter_tiering.py new file mode 100644 index 000000000000..13fa7b3ffcd6 --- /dev/null +++ b/ydb/tests/olap/scenario/test_alter_tiering.py @@ -0,0 +1,147 @@ +from conftest import BaseTestSet +from ydb.tests.olap.scenario.helpers import ( + ScenarioTestHelper, + TestContext, + CreateTable, + CreateTableStore, + DropTable, +) +from helpers.tiering_helper import ( + ObjectStorageParams, + AlterTier, + CreateTier, + AlterTieringRule, + CreateTieringRule, + TierConfig, + TieringPolicy, + TieringRule, + DropTier, + DropTieringRule, +) +import helpers.data_generators as dg +from helpers.table_helper import AlterTable + +from ydb import PrimitiveType +from ydb import StatusCode +import datetime +import random +import threading +from typing import Iterable + + +class TestAlterTiering(BaseTestSet): + schema1 = ( + ScenarioTestHelper.Schema() + .with_column(name='timestamp', type=PrimitiveType.Timestamp, not_null=True) + .with_column(name='writer', type=PrimitiveType.Uint32, not_null=True) + .with_column(name='value', type=PrimitiveType.Uint64, not_null=True) + .with_key_columns('timestamp', 'writer', 'value') + ) + + class TestThread(threading.Thread): + def run(self) -> None: + self.exc = None + try: + self.ret = self._target(*self._args, **self._kwargs) + except BaseException as e: + self.exc = e + + def join(self, timeout=None): + super().join(timeout) + if self.exc: + raise self.exc + return self.ret + + def _drop_tables(self, prefix: str, count: int, ctx: TestContext): + sth = ScenarioTestHelper(ctx) + for i in range(count): + sth.execute_scheme_query(DropTable(f'store/{prefix}_{i}')) + + def _upsert(self, ctx: TestContext, table: str, writer_id: int, duration: datetime.timedelta): + deadline = datetime.datetime.now() + duration + sth = ScenarioTestHelper(ctx) + next_value = 0; + while datetime.datetime.now() < deadline: + sth.bulk_upsert( + table, + dg.DataGeneratorPerColumn(self.schema1, 100) + .with_column('timestamp', dg.ColumnValueGeneratorRandom(null_probability=0)) + .with_column('writer', dg.ColumnValueGeneratorConst(writer_id)) + .with_column('value', dg.ColumnValueGeneratorSequential(next_value)) + ) + next_value += 100 + result = sth.execute_scan_query(f'SELECT COUNT(*) FROM `{sth.get_full_path(table)}` WHERE writer == {writer_id}') + assert result.result_set.rows[0][0] == next_value + + def _change_tiering_rule(self, ctx: TestContext, table: str, tiering_rules: Iterable[str], duration: datetime.timedelta): + deadline = datetime.datetime.now() + duration + sth = ScenarioTestHelper(ctx) + while datetime.datetime.now() < deadline: + for tiering_rule in tiering_rules: + sth.execute_scheme_query( + AlterTable(table).set_tiering(tiering_rule), + expected_status={StatusCode.SUCCESS, StatusCode.OVERLOADED}, + ) + sth.execute_scheme_query( + AlterTable(table).reset_tiering(), + expected_status={StatusCode.SUCCESS, StatusCode.OVERLOADED}, + ) + + def scenario_alter_tiering_rule_while_writing(self, ctx: TestContext): + test_duration = datetime.timedelta(seconds=120) + + s3_config = ObjectStorageParams( + scheme = 'HTTP', + verify_ssl = False, + endpoint = 'storage.yandexcloud.net', + bucket = 'fake', + access_key = 'fake', + secret_key = 'fake', + ) + + sth = ScenarioTestHelper(ctx) + + tiers: list[str] = [] + tiering_rules: list[str] = [] + for i in range(10): + tiers.append(f'TestAlterTiering:tier{i}') + tiering_rules.append(f'TestAlterTiering:tiering_rule{i}') + + tier_config = TierConfig('tier1', s3_config) + tiering_config = TieringPolicy().with_rule(TieringRule(tiers[-1], '10000d')) + + # If tiers exist, allow CREATE to fail, then alter them + sth.execute_scheme_query( + CreateTier(tiers[-1], tier_config), + expected_status={StatusCode.SUCCESS, StatusCode.GENERIC_ERROR}, + ) + sth.execute_scheme_query( + CreateTieringRule(tiering_rules[-1], 'timestamp', tiering_config), + expected_status={StatusCode.SUCCESS, StatusCode.GENERIC_ERROR}, + ) + + sth.execute_scheme_query(AlterTier(tiers[-1], tier_config)) + sth.execute_scheme_query(AlterTieringRule(tiering_rules[-1], 'timestamp', tiering_config)) + + sth.execute_scheme_query(CreateTableStore('store').with_schema(self.schema1)) + sth.execute_scheme_query(CreateTable('store/table').with_schema(self.schema1)) + + threads = [] + + for i in range(4): + threads.append(self.TestThread(target=self._upsert, args=[ctx, 'store/table', i, test_duration])) + for _ in range(2): + threads.append(self.TestThread( + target=self._change_tiering_rule, + args=[ctx, 'store/table', random.sample(tiering_rules, len(tiering_rules)), test_duration] + )) + + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + for tiering in tiering_rules: + sth.execute_scheme_query(DropTieringRule(tiering)) + for tier in tiers: + sth.execute_scheme_query(DropTier(tier)) diff --git a/ydb/tests/olap/scenario/ya.make b/ydb/tests/olap/scenario/ya.make index 49eee4306f26..58a33a89fdba 100644 --- a/ydb/tests/olap/scenario/ya.make +++ b/ydb/tests/olap/scenario/ya.make @@ -11,6 +11,7 @@ PY3TEST() TEST_SRCS( test_simple.py test_scheme_load.py + test_alter_tiering.py ) PEERDIR(