Skip to content

Commit

Permalink
add feature flag: enable olap tiering (#8719)
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 authored Sep 5, 2024
1 parent 8e9e6ad commit dc3cf10
Show file tree
Hide file tree
Showing 12 changed files with 55 additions and 25 deletions.
45 changes: 25 additions & 20 deletions ydb/core/kqp/ut/common/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,32 +31,37 @@ namespace NKqp {

using namespace NYdb;

TTestHelper::TTestHelper(const TKikimrSettings& settings)
: Kikimr(settings)
, TableClient(Kikimr.GetTableClient())
, Session(TableClient.CreateSession().GetValueSync().GetSession())
{}
TTestHelper::TTestHelper(const TKikimrSettings& settings) {
TKikimrSettings kikimrSettings(settings);
if (!kikimrSettings.FeatureFlags.HasEnableTieringInColumnShard()) {
kikimrSettings.SetEnableTieringInColumnShard(true);
}

Kikimr = std::make_unique<TKikimrRunner>(kikimrSettings);
TableClient = std::make_unique<NYdb::NTable::TTableClient>(Kikimr->GetTableClient());
Session = std::make_unique<NYdb::NTable::TSession>(TableClient->CreateSession().GetValueSync().GetSession());
}

NKikimr::NKqp::TKikimrRunner& TTestHelper::GetKikimr() {
return Kikimr;
return *Kikimr;
}

TTestActorRuntime& TTestHelper::GetRuntime() {
return *Kikimr.GetTestServer().GetRuntime();
return *Kikimr->GetTestServer().GetRuntime();
}

NYdb::NTable::TSession& TTestHelper::GetSession() {
return Session;
return *Session;
}

void TTestHelper::CreateTable(const TColumnTableBase& table, const EStatus expectedStatus) {
std::cerr << (table.BuildQuery()) << std::endl;
auto result = Session.ExecuteSchemeQuery(table.BuildQuery()).GetValueSync();
auto result = GetSession().ExecuteSchemeQuery(table.BuildQuery()).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), expectedStatus, result.GetIssues().ToString());
}

void TTestHelper::CreateTier(const TString& tierName) {
auto result = Session.ExecuteSchemeQuery("CREATE OBJECT " + tierName + " (TYPE TIER) WITH tierConfig = `" + GetConfigProtoWithName(tierName) + "`").GetValueSync();
auto result = GetSession().ExecuteSchemeQuery("CREATE OBJECT " + tierName + " (TYPE TIER) WITH tierConfig = `" + GetConfigProtoWithName(tierName) + "`").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

Expand All @@ -70,43 +75,43 @@ namespace NKqp {
}
]
})";
auto result = Session.ExecuteSchemeQuery("CREATE OBJECT IF NOT EXISTS " + ruleName + " (TYPE TIERING_RULE) WITH (defaultColumn = " + columnName + ", description = `" + configTieringStr + "`)").GetValueSync();
auto result = GetSession().ExecuteSchemeQuery("CREATE OBJECT IF NOT EXISTS " + ruleName + " (TYPE TIERING_RULE) WITH (defaultColumn = " + columnName + ", description = `" + configTieringStr + "`)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
return ruleName;
}

void TTestHelper::SetTiering(const TString& tableName, const TString& ruleName) {
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << tableName << "` SET (TIERING = '" << ruleName << "')";
auto result = Session.ExecuteSchemeQuery(alterQuery).GetValueSync();
auto result = GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

void TTestHelper::ResetTiering(const TString& tableName) {
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << tableName << "` RESET (TIERING)";
auto result = Session.ExecuteSchemeQuery(alterQuery).GetValueSync();
auto result = GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

void TTestHelper::DropTable(const TString& tableName) {
auto result = Session.DropTable(tableName).GetValueSync();
auto result = GetSession().DropTable(tableName).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

void TTestHelper::BulkUpsert(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const Ydb::StatusIds_StatusCode& opStatus /*= Ydb::StatusIds::SUCCESS*/) {
Y_UNUSED(opStatus);
NKikimr::Tests::NCS::THelper helper(Kikimr.GetTestServer());
NKikimr::Tests::NCS::THelper helper(GetKikimr().GetTestServer());
auto batch = updates.BuildArrow();
helper.SendDataViaActorSystem(table.GetName(), batch, opStatus);
}

void TTestHelper::BulkUpsert(const TColumnTable& table, std::shared_ptr<arrow::RecordBatch> batch, const Ydb::StatusIds_StatusCode& opStatus /*= Ydb::StatusIds::SUCCESS*/) {
Y_UNUSED(opStatus);
NKikimr::Tests::NCS::THelper helper(Kikimr.GetTestServer());
NKikimr::Tests::NCS::THelper helper(GetKikimr().GetTestServer());
helper.SendDataViaActorSystem(table.GetName(), batch, opStatus);
}

void TTestHelper::ReadData(const TString& query, const TString& expected, const EStatus opStatus /*= EStatus::SUCCESS*/) {
auto it = TableClient.StreamExecuteScanQuery(query).GetValueSync();
auto it = TableClient->StreamExecuteScanQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); // Means stream successfully get
TString result = StreamResultToYson(it, false, opStatus);
if (opStatus == EStatus::SUCCESS) {
Expand All @@ -115,17 +120,17 @@ namespace NKqp {
}

void TTestHelper::RebootTablets(const TString& tableName) {
auto runtime = Kikimr.GetTestServer().GetRuntime();
auto runtime = GetKikimr().GetTestServer().GetRuntime();
TActorId sender = runtime->AllocateEdgeActor();
TVector<ui64> shards;
{
auto describeResult = DescribeTable(&Kikimr.GetTestServer(), sender, tableName);
auto describeResult = DescribeTable(&GetKikimr().GetTestServer(), sender, tableName);
for (auto shard : describeResult.GetPathDescription().GetColumnTableDescription().GetSharding().GetColumnShards()) {
shards.push_back(shard);
}
}
for (auto shard : shards) {
Kikimr.GetTestServer().GetRuntime()->Send(MakePipePerNodeCacheID(false), NActors::TActorId(), new TEvPipeCache::TEvForward(
GetKikimr().GetTestServer().GetRuntime()->Send(MakePipePerNodeCacheID(false), NActors::TActorId(), new TEvPipeCache::TEvForward(
new TEvents::TEvPoisonPill(), shard, false));
}
}
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kqp/ut/common/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ namespace NKqp {
};

private:
TKikimrRunner Kikimr;
NYdb::NTable::TTableClient TableClient;
NYdb::NTable::TSession Session;
std::unique_ptr<TKikimrRunner> Kikimr;
std::unique_ptr<NYdb::NTable::TTableClient> TableClient;
std::unique_ptr<NYdb::NTable::TSession> Session;

public:
TTestHelper(const TKikimrSettings& settings);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,5 @@ message TFeatureFlags {
optional bool EnableGranularTimecast = 137 [default = true];
optional bool EnableAlterShardingInColumnShard = 138 [default = false];
optional bool EnablePgSyntax = 139 [default = true];
optional bool EnableTieringInColumnShard = 140 [default = false];
}
1 change: 1 addition & 0 deletions ydb/core/testlib/basics/feature_flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class TTestFeatureFlagsHolder {
FEATURE_FLAG_SETTER(EnableBackupService)
FEATURE_FLAG_SETTER(EnableGranularTimecast)
FEATURE_FLAG_SETTER(EnablePgSyntax)
FEATURE_FLAG_SETTER(EnableTieringInColumnShard)

#undef FEATURE_FLAG_SETTER
};
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tx/schemeshard/olap/operations/alter_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,13 @@ class TAlterColumnTable: public TSubOperation {
return result;
}

const bool hasTiering = Transaction.HasAlterColumnTable() && Transaction.GetAlterColumnTable().HasAlterTtlSettings() &&
Transaction.GetAlterColumnTable().GetAlterTtlSettings().HasUseTiering();
if (hasTiering && HasAppData() && !AppDataVerified().FeatureFlags.GetEnableTieringInColumnShard()) {
result->SetError(NKikimrScheme::StatusPreconditionFailed, "Tiering functionality is disabled for OLAP tables");
return result;
}

const TString& parentPathStr = Transaction.GetWorkingDir();
const TString& name = Transaction.HasAlterColumnTable() ? Transaction.GetAlterColumnTable().GetName() : Transaction.GetAlterTable().GetName();
LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/olap/ttl/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ SRCS(
)

PEERDIR(
ydb/core/base
ydb/core/protos
)

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe
app.SetEnableAddColumsWithDefaults(opts.EnableAddColumsWithDefaults_);
app.SetEnableReplaceIfExistsForExternalEntities(opts.EnableReplaceIfExistsForExternalEntities_);
app.SetEnableChangefeedsOnIndexTables(opts.EnableChangefeedsOnIndexTables_);
app.SetEnableTieringInColumnShard(opts.EnableTieringInColumnShard_);

app.ColumnShardConfig.SetDisabledOnSchemeShard(false);

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/ut_helpers/test_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ namespace NSchemeShardUT_Private {
OPTION(std::optional<bool>, EnableReplaceIfExistsForExternalEntities, std::nullopt);
OPTION(std::optional<TString>, GraphBackendType, std::nullopt);
OPTION(std::optional<bool>, EnableChangefeedsOnIndexTables, std::nullopt);
OPTION(std::optional<bool>, EnableTieringInColumnShard, std::nullopt);

#undef OPTION
};
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,9 @@ Y_UNIT_TEST_SUITE(TOlap) {

Y_UNIT_TEST(AlterTtl) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
TTestEnvOptions options;
options.EnableTieringInColumnShard(true);
TTestEnv env(runtime, options);
ui64 txId = 100;

TString olapSchema = R"(
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/tiering/rule/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ void TTieringRulesManager::DoPrepareObjectsBeforeModification(std::vector<TTieri
NMetadata::NModifications::TOperationParsingResult TTieringRulesManager::DoBuildPatchFromSettings(
const NYql::TObjectSettingsImpl& settings,
TInternalModificationContext& /*context*/) const {
if (HasAppData() && !AppDataVerified().FeatureFlags.GetEnableTieringInColumnShard()) {
return TConclusionStatus::Fail("Tiering functionality is disabled for OLAP tables.");
}

NMetadata::NInternal::TTableRecord result;
result.SetColumn(TTieringRule::TDecoder::TieringRuleId, NMetadata::NInternal::TYDBValue::Utf8(settings.GetObjectId()));
if (settings.GetObjectId().StartsWith("$") || settings.GetObjectId().StartsWith("_")) {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/tiering/tier/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ NMetadata::NModifications::TOperationParsingResult TTiersManager::DoBuildPatchFr
const NYql::TObjectSettingsImpl& settings,
TInternalModificationContext& context) const
{
if (HasAppData() && !AppDataVerified().FeatureFlags.GetEnableTieringInColumnShard()) {
return TConclusionStatus::Fail("Tiering functionality is disabled for OLAP tables.");
}

NMetadata::NInternal::TTableRecord result;
result.SetColumn(TTierConfig::TDecoder::TierName, NMetadata::NInternal::TYDBValue::Utf8(settings.GetObjectId()));
if (settings.GetObjectId().StartsWith("$") || settings.GetObjectId().StartsWith("_")) {
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tx/tiering/ut/ut_tiers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,8 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
serverSettings.GrpcPort = grpcPort;
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableMetadataProvider(true);
.SetEnableMetadataProvider(true)
.SetEnableTieringInColumnShard(true)
;

Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
Expand Down Expand Up @@ -420,6 +421,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableMetadataProvider(true)
.SetEnableTieringInColumnShard(true)
.SetAppConfig(appConfig);

Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
Expand Down Expand Up @@ -550,6 +552,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableMetadataProvider(true)
.SetEnableTieringInColumnShard(true)
;

Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
Expand Down

0 comments on commit dc3cf10

Please sign in to comment.