From dc3cf109753a8f8f41fe5792f5aaed23dfe62c03 Mon Sep 17 00:00:00 2001 From: Semyon Date: Thu, 5 Sep 2024 15:44:02 +0300 Subject: [PATCH] add feature flag: enable olap tiering (#8719) --- ydb/core/kqp/ut/common/columnshard.cpp | 45 ++++++++++--------- ydb/core/kqp/ut/common/columnshard.h | 6 +-- ydb/core/protos/feature_flags.proto | 1 + ydb/core/testlib/basics/feature_flags.h | 1 + .../olap/operations/alter_table.cpp | 7 +++ ydb/core/tx/schemeshard/olap/ttl/ya.make | 1 + .../tx/schemeshard/ut_helpers/test_env.cpp | 1 + ydb/core/tx/schemeshard/ut_helpers/test_env.h | 1 + ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp | 4 +- ydb/core/tx/tiering/rule/manager.cpp | 4 ++ ydb/core/tx/tiering/tier/manager.cpp | 4 ++ ydb/core/tx/tiering/ut/ut_tiers.cpp | 5 ++- 12 files changed, 55 insertions(+), 25 deletions(-) diff --git a/ydb/core/kqp/ut/common/columnshard.cpp b/ydb/core/kqp/ut/common/columnshard.cpp index dd9314d90643..955e260b5985 100644 --- a/ydb/core/kqp/ut/common/columnshard.cpp +++ b/ydb/core/kqp/ut/common/columnshard.cpp @@ -31,32 +31,37 @@ namespace NKqp { using namespace NYdb; - TTestHelper::TTestHelper(const TKikimrSettings& settings) - : Kikimr(settings) - , TableClient(Kikimr.GetTableClient()) - , Session(TableClient.CreateSession().GetValueSync().GetSession()) - {} + TTestHelper::TTestHelper(const TKikimrSettings& settings) { + TKikimrSettings kikimrSettings(settings); + if (!kikimrSettings.FeatureFlags.HasEnableTieringInColumnShard()) { + kikimrSettings.SetEnableTieringInColumnShard(true); + } + + Kikimr = std::make_unique(kikimrSettings); + TableClient = std::make_unique(Kikimr->GetTableClient()); + Session = std::make_unique(TableClient->CreateSession().GetValueSync().GetSession()); + } NKikimr::NKqp::TKikimrRunner& TTestHelper::GetKikimr() { - return Kikimr; + return *Kikimr; } TTestActorRuntime& TTestHelper::GetRuntime() { - return *Kikimr.GetTestServer().GetRuntime(); + return *Kikimr->GetTestServer().GetRuntime(); } NYdb::NTable::TSession& TTestHelper::GetSession() { - return Session; + return *Session; } void TTestHelper::CreateTable(const TColumnTableBase& table, const EStatus expectedStatus) { std::cerr << (table.BuildQuery()) << std::endl; - auto result = Session.ExecuteSchemeQuery(table.BuildQuery()).GetValueSync(); + auto result = GetSession().ExecuteSchemeQuery(table.BuildQuery()).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), expectedStatus, result.GetIssues().ToString()); } void TTestHelper::CreateTier(const TString& tierName) { - auto result = Session.ExecuteSchemeQuery("CREATE OBJECT " + tierName + " (TYPE TIER) WITH tierConfig = `" + GetConfigProtoWithName(tierName) + "`").GetValueSync(); + auto result = GetSession().ExecuteSchemeQuery("CREATE OBJECT " + tierName + " (TYPE TIER) WITH tierConfig = `" + GetConfigProtoWithName(tierName) + "`").GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } @@ -70,43 +75,43 @@ namespace NKqp { } ] })"; - auto result = Session.ExecuteSchemeQuery("CREATE OBJECT IF NOT EXISTS " + ruleName + " (TYPE TIERING_RULE) WITH (defaultColumn = " + columnName + ", description = `" + configTieringStr + "`)").GetValueSync(); + auto result = GetSession().ExecuteSchemeQuery("CREATE OBJECT IF NOT EXISTS " + ruleName + " (TYPE TIERING_RULE) WITH (defaultColumn = " + columnName + ", description = `" + configTieringStr + "`)").GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); return ruleName; } void TTestHelper::SetTiering(const TString& tableName, const TString& ruleName) { auto alterQuery = TStringBuilder() << "ALTER TABLE `" << tableName << "` SET (TIERING = '" << ruleName << "')"; - auto result = Session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + auto result = GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } void TTestHelper::ResetTiering(const TString& tableName) { auto alterQuery = TStringBuilder() << "ALTER TABLE `" << tableName << "` RESET (TIERING)"; - auto result = Session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + auto result = GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } void TTestHelper::DropTable(const TString& tableName) { - auto result = Session.DropTable(tableName).GetValueSync(); + auto result = GetSession().DropTable(tableName).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } void TTestHelper::BulkUpsert(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const Ydb::StatusIds_StatusCode& opStatus /*= Ydb::StatusIds::SUCCESS*/) { Y_UNUSED(opStatus); - NKikimr::Tests::NCS::THelper helper(Kikimr.GetTestServer()); + NKikimr::Tests::NCS::THelper helper(GetKikimr().GetTestServer()); auto batch = updates.BuildArrow(); helper.SendDataViaActorSystem(table.GetName(), batch, opStatus); } void TTestHelper::BulkUpsert(const TColumnTable& table, std::shared_ptr batch, const Ydb::StatusIds_StatusCode& opStatus /*= Ydb::StatusIds::SUCCESS*/) { Y_UNUSED(opStatus); - NKikimr::Tests::NCS::THelper helper(Kikimr.GetTestServer()); + NKikimr::Tests::NCS::THelper helper(GetKikimr().GetTestServer()); helper.SendDataViaActorSystem(table.GetName(), batch, opStatus); } void TTestHelper::ReadData(const TString& query, const TString& expected, const EStatus opStatus /*= EStatus::SUCCESS*/) { - auto it = TableClient.StreamExecuteScanQuery(query).GetValueSync(); + auto it = TableClient->StreamExecuteScanQuery(query).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); // Means stream successfully get TString result = StreamResultToYson(it, false, opStatus); if (opStatus == EStatus::SUCCESS) { @@ -115,17 +120,17 @@ namespace NKqp { } void TTestHelper::RebootTablets(const TString& tableName) { - auto runtime = Kikimr.GetTestServer().GetRuntime(); + auto runtime = GetKikimr().GetTestServer().GetRuntime(); TActorId sender = runtime->AllocateEdgeActor(); TVector shards; { - auto describeResult = DescribeTable(&Kikimr.GetTestServer(), sender, tableName); + auto describeResult = DescribeTable(&GetKikimr().GetTestServer(), sender, tableName); for (auto shard : describeResult.GetPathDescription().GetColumnTableDescription().GetSharding().GetColumnShards()) { shards.push_back(shard); } } for (auto shard : shards) { - Kikimr.GetTestServer().GetRuntime()->Send(MakePipePerNodeCacheID(false), NActors::TActorId(), new TEvPipeCache::TEvForward( + GetKikimr().GetTestServer().GetRuntime()->Send(MakePipePerNodeCacheID(false), NActors::TActorId(), new TEvPipeCache::TEvForward( new TEvents::TEvPoisonPill(), shard, false)); } } diff --git a/ydb/core/kqp/ut/common/columnshard.h b/ydb/core/kqp/ut/common/columnshard.h index 10f8ab1e9296..201b44ad8ea5 100644 --- a/ydb/core/kqp/ut/common/columnshard.h +++ b/ydb/core/kqp/ut/common/columnshard.h @@ -63,9 +63,9 @@ namespace NKqp { }; private: - TKikimrRunner Kikimr; - NYdb::NTable::TTableClient TableClient; - NYdb::NTable::TSession Session; + std::unique_ptr Kikimr; + std::unique_ptr TableClient; + std::unique_ptr Session; public: TTestHelper(const TKikimrSettings& settings); diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto index 8a76b20a69a0..fc7346a8c3cb 100644 --- a/ydb/core/protos/feature_flags.proto +++ b/ydb/core/protos/feature_flags.proto @@ -156,4 +156,5 @@ message TFeatureFlags { optional bool EnableGranularTimecast = 137 [default = true]; optional bool EnableAlterShardingInColumnShard = 138 [default = false]; optional bool EnablePgSyntax = 139 [default = true]; + optional bool EnableTieringInColumnShard = 140 [default = false]; } diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h index b2bfe23c95fb..1dbdab825a3c 100644 --- a/ydb/core/testlib/basics/feature_flags.h +++ b/ydb/core/testlib/basics/feature_flags.h @@ -65,6 +65,7 @@ class TTestFeatureFlagsHolder { FEATURE_FLAG_SETTER(EnableBackupService) FEATURE_FLAG_SETTER(EnableGranularTimecast) FEATURE_FLAG_SETTER(EnablePgSyntax) + FEATURE_FLAG_SETTER(EnableTieringInColumnShard) #undef FEATURE_FLAG_SETTER }; diff --git a/ydb/core/tx/schemeshard/olap/operations/alter_table.cpp b/ydb/core/tx/schemeshard/olap/operations/alter_table.cpp index 0409fad44a3c..4fb76b4a75a0 100644 --- a/ydb/core/tx/schemeshard/olap/operations/alter_table.cpp +++ b/ydb/core/tx/schemeshard/olap/operations/alter_table.cpp @@ -271,6 +271,13 @@ class TAlterColumnTable: public TSubOperation { return result; } + const bool hasTiering = Transaction.HasAlterColumnTable() && Transaction.GetAlterColumnTable().HasAlterTtlSettings() && + Transaction.GetAlterColumnTable().GetAlterTtlSettings().HasUseTiering(); + if (hasTiering && HasAppData() && !AppDataVerified().FeatureFlags.GetEnableTieringInColumnShard()) { + result->SetError(NKikimrScheme::StatusPreconditionFailed, "Tiering functionality is disabled for OLAP tables"); + return result; + } + const TString& parentPathStr = Transaction.GetWorkingDir(); const TString& name = Transaction.HasAlterColumnTable() ? Transaction.GetAlterColumnTable().GetName() : Transaction.GetAlterTable().GetName(); LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, diff --git a/ydb/core/tx/schemeshard/olap/ttl/ya.make b/ydb/core/tx/schemeshard/olap/ttl/ya.make index 0eb0e83c9a22..8aea246ebddf 100644 --- a/ydb/core/tx/schemeshard/olap/ttl/ya.make +++ b/ydb/core/tx/schemeshard/olap/ttl/ya.make @@ -6,6 +6,7 @@ SRCS( ) PEERDIR( + ydb/core/base ydb/core/protos ) diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp index 217194a7844a..2c418d6a9626 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp @@ -543,6 +543,7 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe app.SetEnableAddColumsWithDefaults(opts.EnableAddColumsWithDefaults_); app.SetEnableReplaceIfExistsForExternalEntities(opts.EnableReplaceIfExistsForExternalEntities_); app.SetEnableChangefeedsOnIndexTables(opts.EnableChangefeedsOnIndexTables_); + app.SetEnableTieringInColumnShard(opts.EnableTieringInColumnShard_); app.ColumnShardConfig.SetDisabledOnSchemeShard(false); diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.h b/ydb/core/tx/schemeshard/ut_helpers/test_env.h index 388b50caa579..c433855c9c76 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.h +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.h @@ -66,6 +66,7 @@ namespace NSchemeShardUT_Private { OPTION(std::optional, EnableReplaceIfExistsForExternalEntities, std::nullopt); OPTION(std::optional, GraphBackendType, std::nullopt); OPTION(std::optional, EnableChangefeedsOnIndexTables, std::nullopt); + OPTION(std::optional, EnableTieringInColumnShard, std::nullopt); #undef OPTION }; diff --git a/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp b/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp index 1681aab2ee75..90a14c747f8a 100644 --- a/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp +++ b/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp @@ -561,7 +561,9 @@ Y_UNIT_TEST_SUITE(TOlap) { Y_UNIT_TEST(AlterTtl) { TTestBasicRuntime runtime; - TTestEnv env(runtime); + TTestEnvOptions options; + options.EnableTieringInColumnShard(true); + TTestEnv env(runtime, options); ui64 txId = 100; TString olapSchema = R"( diff --git a/ydb/core/tx/tiering/rule/manager.cpp b/ydb/core/tx/tiering/rule/manager.cpp index 321bfec34593..a97ba742467a 100644 --- a/ydb/core/tx/tiering/rule/manager.cpp +++ b/ydb/core/tx/tiering/rule/manager.cpp @@ -13,6 +13,10 @@ void TTieringRulesManager::DoPrepareObjectsBeforeModification(std::vector