From e021073059def288cc0df880b5c030c4e116d855 Mon Sep 17 00:00:00 2001 From: Artem Alekseev Date: Thu, 29 Aug 2024 16:03:50 +0300 Subject: [PATCH] Add tests for olap partitioning --- ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp | 130 +++++++++++++----- ydb/tests/olap/scenario/conftest.py | 6 +- .../olap/scenario/helpers/table_helper.py | 71 ++++++++++ ydb/tests/olap/scenario/test_partitioning.py | 57 ++++++++ ydb/tests/olap/scenario/ya.make | 1 + 5 files changed, 227 insertions(+), 38 deletions(-) create mode 100644 ydb/tests/olap/scenario/test_partitioning.py diff --git a/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp b/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp index c57897a87594..b321d4272ce2 100644 --- a/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp +++ b/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp @@ -271,8 +271,15 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) { } class TReshardingTest { - private: - YDB_ACCESSOR(TString, ShardingType, "HASH_FUNCTION_CONSISTENCY_64"); + public: + TReshardingTest() + : Kikimr(TKikimrSettings().SetWithSampleTables(false)) + , CSController(NYDBTest::TControllers::RegisterCSControllerGuard()) + , TableClient(Kikimr.GetTableClient()) { + CSController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1)); + CSController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1)); + CSController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30); + } void WaitResharding(const TString& hint = "") { const TInstant start = TInstant::Now(); @@ -308,22 +315,20 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) { CompareYson(result, "[[" + ::ToString(expectation) + "u;]]"); } + protected: TKikimrRunner Kikimr; - public: + NKikimr::NYDBTest::TControllers::TGuard CSController; + NYdb::NTable::TTableClient TableClient; + }; - TReshardingTest() - : Kikimr(TKikimrSettings().SetWithSampleTables(false)) { + class TShardingTypeTest: public TReshardingTest { + YDB_ACCESSOR(TString, ShardingType, "HASH_FUNCTION_CONSISTENCY_64"); - } + public: + using TReshardingTest::TReshardingTest; void Execute() { - auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); - csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1)); - csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1)); - csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30); - TLocalHelper(Kikimr).SetShardingMethod(ShardingType).CreateTestOlapTable("olapTable", "olapStore", 24, 4); - auto tableClient = Kikimr.GetTableClient(); Tests::NCommon::TLoggerInit(Kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD, NKikimrServices::TX_COLUMNSHARD_SCAN }, "CS").SetPriority(NActors::NLog::PRI_DEBUG).Initialize(); @@ -340,71 +345,128 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) { CheckCount(230000); for (ui32 i = 0; i < 2; ++i) { auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=SPLIT);)"; - auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto session = TableClient.CreateSession().GetValueSync().GetSession(); auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString()); WaitResharding("SPLIT:" + ::ToString(i)); } { auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=SPLIT);)"; - auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto session = TableClient.CreateSession().GetValueSync().GetSession(); auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); UNIT_ASSERT_VALUES_UNEQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString()); } - AFL_VERIFY(csController->GetShardingFiltersCount().Val() == 0); + AFL_VERIFY(CSController->GetShardingFiltersCount().Val() == 0); CheckCount(230000); - i64 count = csController->GetShardingFiltersCount().Val(); + i64 count = CSController->GetShardingFiltersCount().Val(); AFL_VERIFY(count >= 16)("count", count); - csController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation); - csController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction); - csController->WaitIndexation(TDuration::Seconds(3)); - csController->WaitCompactions(TDuration::Seconds(3)); + CSController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation); + CSController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction); + CSController->WaitIndexation(TDuration::Seconds(3)); + CSController->WaitCompactions(TDuration::Seconds(3)); WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000); CheckCount(230000); - csController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation); - csController->WaitIndexation(TDuration::Seconds(5)); + CSController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation); + CSController->WaitIndexation(TDuration::Seconds(5)); CheckCount(230000); - csController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction); - csController->WaitCompactions(TDuration::Seconds(5)); - count = csController->GetShardingFiltersCount().Val(); + CSController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction); + CSController->WaitCompactions(TDuration::Seconds(5)); + count = CSController->GetShardingFiltersCount().Val(); CheckCount(230000); - csController->SetCompactionControl(NYDBTest::EOptimizerCompactionWeightControl::Disable); + CSController->SetCompactionControl(NYDBTest::EOptimizerCompactionWeightControl::Disable); CheckCount(230000); - AFL_VERIFY(count == csController->GetShardingFiltersCount().Val())("count", count)("val", csController->GetShardingFiltersCount().Val()); + AFL_VERIFY(count == CSController->GetShardingFiltersCount().Val())("count", count)( + "val", CSController->GetShardingFiltersCount().Val()); const ui32 portionsCount = 16; for (ui32 i = 0; i < 4; ++i) { { auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=MERGE);)"; - auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto session = TableClient.CreateSession().GetValueSync().GetSession(); auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString()); } WaitResharding("MERGE:" + ::ToString(i)); - // csController->WaitCleaning(TDuration::Seconds(5)); + // CSController->WaitCleaning(TDuration::Seconds(5)); CheckCount(230000); - AFL_VERIFY(count + portionsCount == csController->GetShardingFiltersCount().Val())("count", count)("val", csController->GetShardingFiltersCount().Val()); + AFL_VERIFY(count + portionsCount == CSController->GetShardingFiltersCount().Val())("count", count)( + "val", CSController->GetShardingFiltersCount().Val()); count += portionsCount; } { auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=MERGE);)"; - auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto session = TableClient.CreateSession().GetValueSync().GetSession(); auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); UNIT_ASSERT_VALUES_UNEQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString()); } - csController->CheckInvariants(); + CSController->CheckInvariants(); } }; Y_UNIT_TEST(TableReshardingConsistency64) { - TReshardingTest().SetShardingType("HASH_FUNCTION_CONSISTENCY_64").Execute(); + TShardingTypeTest().SetShardingType("HASH_FUNCTION_CONSISTENCY_64").Execute(); } Y_UNIT_TEST(TableReshardingModuloN) { - TReshardingTest().SetShardingType("HASH_FUNCTION_MODULO_N").Execute(); + TShardingTypeTest().SetShardingType("HASH_FUNCTION_CONSISTENCY_64").Execute(); + } + + class TAsyncReshardingTest: public TReshardingTest { + YDB_ACCESSOR(TString, ShardingType, "HASH_FUNCTION_CONSISTENCY_64"); + + public: + TAsyncReshardingTest() { + TLocalHelper(Kikimr).CreateTestOlapTable("olapTable", "olapStore", 24, 4); + } + + void AddBatch(int numRows) { + WriteTestData(Kikimr, "/Root/olapStore/olapTable", LastPathId, LastTs, numRows); + LastPathId += numRows * 10; + LastTs += numRows * 10; + NumRows += numRows; + } + + NYdb::EStatus StartResharding(TString modification) { + auto alterQuery = + TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=)" + << modification << ");)"; + auto session = TableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + + return alterResult.GetStatus(); + } + + void CheckCount() { + TReshardingTest::CheckCount(NumRows); + } + + private: + ui64 LastPathId{ 1000000 }; + ui64 LastTs{ 300000000 }; + ui64 NumRows{ 0 }; + }; + + Y_UNIT_TEST(UpsertWhileSplitTest) { + TAsyncReshardingTest tester; + + tester.AddBatch(10000); + + tester.CheckCount(); + + for (int i = 0; i < 4; i++) { + UNIT_ASSERT_VALUES_EQUAL_C(tester.StartResharding("SPLIT"), NYdb::EStatus::SUCCESS, ""); + UNIT_ASSERT_VALUES_UNEQUAL_C(tester.StartResharding("SPLIT"), NYdb::EStatus::SUCCESS, ""); + + tester.CheckCount(); + tester.AddBatch(10000); + tester.CheckCount(); + tester.WaitResharding(); + } + tester.AddBatch(10000); + tester.CheckCount(); } } } diff --git a/ydb/tests/olap/scenario/conftest.py b/ydb/tests/olap/scenario/conftest.py index 533375e989d4..c87120afb977 100644 --- a/ydb/tests/olap/scenario/conftest.py +++ b/ydb/tests/olap/scenario/conftest.py @@ -19,13 +19,11 @@ def get_suite_name(cls): @classmethod def setup_class(cls): - if not external_param_is_true('reuse-tables'): - ScenarioTestHelper(None).remove_path(cls.get_suite_name()) + pass @classmethod def teardown_class(cls): - if not external_param_is_true('keep-tables'): - ScenarioTestHelper(None).remove_path(cls.get_suite_name()) + pass def test(self, ctx: TestContext): allure_test_description(ctx.suite, ctx.test) diff --git a/ydb/tests/olap/scenario/helpers/table_helper.py b/ydb/tests/olap/scenario/helpers/table_helper.py index 8b1963fd13e0..28f947cc3820 100644 --- a/ydb/tests/olap/scenario/helpers/table_helper.py +++ b/ydb/tests/olap/scenario/helpers/table_helper.py @@ -187,6 +187,37 @@ def title(self) -> str: return f'add column `{self._column.name}`' +class AlterSharding(AlterTableAction): + """Change shards count for tablestore. + + Table-like objects are Tables and TableStore. + See {AlterTableLikeObject}. + + Example: + sth = ScenarioTestHelper(ctx) + sth.execute_scheme_query( + AlterTable('testTable') + .action(AlterSharding("SPLIT")) + ) + """ + + def __init__(self, modification) -> None: + """Constructor. + + Args: + column: Column description.""" + + super().__init__() + self._modification = modification + + @override + def to_yql(self) -> str: + return f'SET(ACTION=ALTER_SHARDING, MODIFICATION={self._modification})' + + @override + def title(self) -> str: + return self.to_yql() + class DropColumn(AlterTableAction): """Remove a column from a table-like object. @@ -426,3 +457,43 @@ class AlterTableStore(AlterTableLikeObject): @override def _type(self) -> str: return 'tablestore' + +class AlterObject(ScenarioTestHelper.IYqlble): + def __init__(self, name: str) -> None: + """Constructor. + + Args: + name: Name (relative path) of the altered object.""" + + super().__init__(name) + self._actions = [] + + @override + def params(self) -> Dict[str, str]: + return {self._type(): self._name, 'actions': ', '.join([a.title() for a in self._actions])} + + @override + def title(self): + return f'alter object (TYPE TABLESTORE)' + + @override + def to_yql(self, ctx: TestContext) -> str: + actions = ', '.join([a.to_yql() for a in self._actions]) + return f'ALTER OBJECT `{ScenarioTestHelper(ctx).get_full_path(self._name)}` (TYPE TABLESTORE) {actions}' + + def action(self, action: AlterTableAction) -> AlterTableLikeObject: + """Add an action with an object. + + Args: + action: Action on the object, such as creating or deleting a column. + + Returns: + self.""" + + self._actions.append(action) + return self + + @override + def _type(self) -> str: + return 'tablestore' + diff --git a/ydb/tests/olap/scenario/test_partitioning.py b/ydb/tests/olap/scenario/test_partitioning.py new file mode 100644 index 000000000000..d43a5a91849a --- /dev/null +++ b/ydb/tests/olap/scenario/test_partitioning.py @@ -0,0 +1,57 @@ +from conftest import BaseTestSet +from ydb.tests.olap.scenario.helpers import ( + ScenarioTestHelper, + TestContext, + CreateTable, + CreateTableStore, + AlterObject, + AlterSharding +) +from ydb import PrimitiveType, StatusCode +import ydb.tests.olap.scenario.helpers.data_generators as dg +import time + + +class TestPartitioning(BaseTestSet): + schema1 = ( + ScenarioTestHelper.Schema() + .with_column(name='id', type=PrimitiveType.Int32, not_null=True) + .with_column(name='level', type=PrimitiveType.Uint32) + .with_key_columns('id') + ) + + def _test_table(self, ctx: TestContext, table_name: str): + sth = ScenarioTestHelper(ctx) + sth.execute_scheme_query(CreateTable(table_name).with_schema(self.schema1).with_partitions_count(2)) + + sth.bulk_upsert(table_name, dg.DataGeneratorPerColumn(self.schema1, 100), comment="100 sequetial ids") + + sth.execute_scheme_query(AlterObject(table_name).action(AlterSharding("SPLIT"))) + + # sth.bulk_upsert(table_name, dg.DataGeneratorPerColumn(self.schema1, 100), comment="100 sequetial ids") + + time.sleep(240) + + sth.bulk_upsert(table_name, dg.DataGeneratorPerColumn(self.schema1, 100), comment="100 sequetial ids") + + sth.execute_scheme_query(AlterObject(table_name).action(AlterSharding("SPLIT"))) + + # sth.bulk_upsert(table_name, dg.DataGeneratorPerColumn(self.schema1, 100), comment="100 sequetial ids") + + time.sleep(240) + + sth.bulk_upsert(table_name, dg.DataGeneratorPerColumn(self.schema1, 100), comment="100 sequetial ids") + + sth.execute_scheme_query(AlterObject(table_name).action(AlterSharding("MERGE"))) + + # sth.bulk_upsert(table_name, dg.DataGeneratorPerColumn(self.schema1, 100), comment="100 sequetial ids") + + # gen = dg.DataGeneratorPerColumn(self.schema1, 100, dg.ColumnValueGeneratorDefault(init_value=200)) + # sth.bulk_upsert(table_name, gen, comment="100 sequetial ids") + + + def scenario_table(self, ctx: TestContext): + tablestore_name = "testStore5" + sth = ScenarioTestHelper(ctx) + sth.execute_scheme_query(CreateTableStore(tablestore_name).with_schema(self.schema1)) + self._test_table(ctx, tablestore_name + '/testTable') \ No newline at end of file diff --git a/ydb/tests/olap/scenario/ya.make b/ydb/tests/olap/scenario/ya.make index 58a33a89fdba..566139f84526 100644 --- a/ydb/tests/olap/scenario/ya.make +++ b/ydb/tests/olap/scenario/ya.make @@ -12,6 +12,7 @@ PY3TEST() test_simple.py test_scheme_load.py test_alter_tiering.py + test_partitioning.py ) PEERDIR(