Skip to content

Commit

Permalink
Merge 41d2c69 into 5f44598
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 authored Dec 1, 2024
2 parents 5f44598 + 41d2c69 commit 85e779e
Show file tree
Hide file tree
Showing 119 changed files with 1,584 additions and 2,656 deletions.
1 change: 0 additions & 1 deletion ydb/core/grpc_services/rpc_create_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ class TCreateTableRPC : public TRpcSchemeRequestActor<TCreateTableRPC, TEvCreate
return false;
}
}
tableDesc->MutableTtlSettings()->SetUseTiering(req.tiering());

return true;
}
Expand Down
8 changes: 0 additions & 8 deletions ydb/core/grpc_services/rpc_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,6 @@ class TCreateLogTableRPC : public TRpcSchemeRequestActor<TCreateLogTableRPC, TEv
if (!FillTtlSettings(*create->MutableTtlSettings()->MutableEnabled(), req->ttl_settings(), status, error)) {
return Reply(status, error, NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx);
}
} else if (req->has_tiering_settings()) {
create->MutableTtlSettings()->SetUseTiering(req->tiering_settings().tiering_id());
}

create->SetColumnShardCount(req->shards_count());
Expand Down Expand Up @@ -600,12 +598,6 @@ class TAlterLogTableRPC : public TRpcSchemeRequestActor<TAlterLogTableRPC, TEvAl
alter->MutableAlterTtlSettings()->MutableDisabled();
}

if (req->has_set_tiering_settings()) {
alter->MutableAlterTtlSettings()->SetUseTiering(req->set_tiering_settings().tiering_id());
} else if (req->has_drop_tiering_settings()) {
alter->MutableAlterTtlSettings()->SetUseTiering("");
}

ctx.Send(MakeTxProxyID(), proposeRequest.release());
}
};
Expand Down
20 changes: 9 additions & 11 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,16 +276,6 @@ bool ConvertCreateTableSettingsToProto(NYql::TKikimrTableMetadataPtr metadata, Y
}
}

if (const auto& tiering = metadata->TableSettings.Tiering) {
if (tiering.IsSet()) {
proto.set_tiering(tiering.GetValueSet());
} else {
code = Ydb::StatusIds::BAD_REQUEST;
error = "Can't reset TIERING";
return false;
}
}

if (metadata->TableSettings.StoreExternalBlobs) {
auto& storageSettings = *proto.mutable_storage_settings();
TString value = to_lower(metadata->TableSettings.StoreExternalBlobs.GetRef());
Expand Down Expand Up @@ -520,7 +510,15 @@ bool FillCreateColumnTableDesc(NYql::TKikimrTableMetadataPtr metadata,
const auto& inputSettings = metadata->TableSettings.TtlSettings.GetValueSet();
auto& resultSettings = *tableDesc.MutableTtlSettings();
resultSettings.MutableEnabled()->SetColumnName(inputSettings.ColumnName);
resultSettings.MutableEnabled()->SetExpireAfterSeconds(inputSettings.ExpireAfter.Seconds());
for (const auto& tier : inputSettings.Tiers) {
auto* tierProto = resultSettings.MutableEnabled()->AddTiers();
tierProto->SetApplyAfterSeconds(tier.ApplyAfter.Seconds());
if (tier.StorageName) {
tierProto->MutableEvictToExternalStorage()->SetStorageName(*tier.StorageName);
} else {
tierProto->MutableDelete();
}
}
if (inputSettings.ColumnUnit) {
resultSettings.MutableEnabled()->SetColumnUnit(static_cast<NKikimrSchemeOp::TTTLSettings::EUnit>(*inputSettings.ColumnUnit));
}
Expand Down
7 changes: 0 additions & 7 deletions ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1657,13 +1657,6 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
ConvertTtlSettingsToProto(ttlSettings, *alterTableRequest.mutable_set_ttl_settings());
} else if (name == "resetTtlSettings") {
alterTableRequest.mutable_drop_ttl_settings();
} else if (name == "setTiering") {
const auto tieringName = TString(
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
);
alterTableRequest.set_set_tiering(tieringName);
} else if (name == "resetTiering") {
alterTableRequest.mutable_drop_tiering();
} else {
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
TStringBuilder() << "Unknown table profile setting: " << name));
Expand Down
32 changes: 16 additions & 16 deletions ydb/core/kqp/provider/yql_kikimr_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,42 +101,36 @@ bool TTtlSettings::TryParse(const NNodes::TCoNameValueTupleList& node, TTtlSetti
if (name == "columnName") {
YQL_ENSURE(field.Value().Maybe<TCoAtom>());
settings.ColumnName = field.Value().Cast<TCoAtom>().StringValue();
} else if (name == "expireAfter") {
// TODO (yentsovsemyon): remove this clause after extending TTL syntax in YQL
YQL_ENSURE(field.Value().Maybe<TCoInterval>());
auto value = FromString<i64>(field.Value().Cast<TCoInterval>().Literal().Value());
if (value < 0) {
error = "Interval value cannot be negative";
return false;
}

settings.ExpireAfter = TDuration::FromValue(value);
} else if (name == "tiers") {
YQL_ENSURE(field.Value().Maybe<TExprList>());
auto listNode = field.Value().Cast<TExprList>();

for (size_t i = 0; i < listNode.Size(); ++i) {
auto tierNode = listNode.Item(i);

std::optional<TString> storageName;
TDuration evictionDelay;
YQL_ENSURE(tierNode.Maybe<TCoNameValueTupleList>());
for (const auto& tierField : tierNode.Cast<TCoNameValueTupleList>()) {
auto tierFieldName = tierField.Name().Value();
if (tierFieldName == "storageName") {
error = "TTL cannot contain tiered storage: tiering in TTL syntax is not supported";
return false;
YQL_ENSURE(tierField.Value().Maybe<TCoAtom>());
storageName = tierField.Value().Cast<TCoAtom>().StringValue();
} else if (tierFieldName == "evictionDelay") {
YQL_ENSURE(tierField.Value().Maybe<TCoInterval>());
auto value = FromString<i64>(tierField.Value().Cast<TCoInterval>().Literal().Value());
if (value < 0) {
error = "Interval value cannot be negative";
return false;
}
settings.ExpireAfter = TDuration::FromValue(value);
evictionDelay = TDuration::FromValue(value);
} else {
error = TStringBuilder() << "Unknown field: " << tierFieldName;
return false;
}
}

settings.Tiers.emplace_back(evictionDelay, storageName);
}
} else if (name == "columnUnit") {
YQL_ENSURE(field.Value().Maybe<TCoAtom>());
Expand Down Expand Up @@ -318,9 +312,15 @@ void ConvertTtlSettingsToProto(const NYql::TTtlSettings& settings, Ydb::Table::T
opts.set_column_name(settings.ColumnName);
opts.set_column_unit(static_cast<Ydb::Table::ValueSinceUnixEpochModeSettings::Unit>(*settings.ColumnUnit));
}
auto* deleteTier = proto.add_tiers();
deleteTier->set_apply_after_seconds(settings.ExpireAfter.Seconds());
deleteTier->mutable_delete_();
for (const auto& tier : settings.Tiers) {
auto* tierProto = proto.add_tiers();
tierProto->set_apply_after_seconds(tier.ApplyAfter.Seconds());
if (tier.StorageName) {
tierProto->mutable_evict_to_external_storage()->set_storage_name(*tier.StorageName);
} else {
tierProto->mutable_delete_();
}
}
}

Ydb::FeatureFlag::Status GetFlagValue(const TMaybe<bool>& value) {
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/kqp/provider/yql_kikimr_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,14 @@ struct TTtlSettings {
Nanoseconds = 4,
};

struct TTier {
TDuration ApplyAfter;
std::optional<TString> StorageName;
};

TString ColumnName;
TDuration ExpireAfter;
TMaybe<EUnit> ColumnUnit;
std::vector<TTier> Tiers;

static bool TryParse(const NNodes::TCoNameValueTupleList& node, TTtlSettings& settings, TString& error);
};
Expand All @@ -241,7 +246,6 @@ struct TTableSettings {
TMaybe<TString> KeyBloomFilter;
TMaybe<TString> ReadReplicasSettings;
TResetableSetting<TTtlSettings, void> TtlSettings;
TResetableSetting<TString, void> Tiering;
TMaybe<TString> PartitionByHashFunction;
TMaybe<TString> StoreExternalBlobs;

Expand Down
8 changes: 0 additions & 8 deletions ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1251,14 +1251,6 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
"Can't reset TTL settings"));
return TStatus::Error;
} else if (name == "setTiering") {
meta->TableSettings.Tiering.Set(TString(
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
));
} else if (name == "resetTiering") {
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
"Can't reset TIERING"));
return TStatus::Error;
} else if (name == "storeType") {
TMaybe<TString> storeType = TString(setting.Value().Cast<TCoAtom>().Value());
if (storeType && to_lower(storeType.GetRef()) == "column") {
Expand Down
61 changes: 21 additions & 40 deletions ydb/core/kqp/ut/common/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,24 @@
#include <ydb/core/formats/arrow/serializer/parsing.h>
#include <ydb/core/testlib/cs_helper.h>

#include <format>

extern "C" {
#include <yql/essentials/parser/pg_wrapper/postgresql/src/include/catalog/pg_type_d.h>
}

namespace NKikimr {
namespace NKqp {

TString GetConfigProtoWithName(const TString & tierName) {
return TStringBuilder() << "Name : \"" << tierName << "\"\n" <<
R"(
ObjectStorage : {
Endpoint: "fake"
Bucket: "fake"
SecretableAccessKey: {
Value: {
Data: "secretAccessKey"
}
}
SecretableSecretKey: {
Value: {
Data: "fakeSecret"
}
}
}
)";
}

using namespace NYdb;

TTestHelper::TTestHelper(const TKikimrSettings& settings) {
TKikimrSettings kikimrSettings(settings);
if (!kikimrSettings.FeatureFlags.HasEnableTieringInColumnShard()) {
kikimrSettings.SetEnableTieringInColumnShard(true);
}
if (!kikimrSettings.FeatureFlags.HasEnableExternalDataSources()) {
kikimrSettings.SetEnableExternalDataSources(true);
}

Kikimr = std::make_unique<TKikimrRunner>(kikimrSettings);
TableClient = std::make_unique<NYdb::NTable::TTableClient>(Kikimr->GetTableClient());
Expand All @@ -64,33 +48,30 @@ namespace NKqp {
}

void TTestHelper::CreateTier(const TString& tierName) {
auto result = GetSession().ExecuteSchemeQuery("CREATE OBJECT " + tierName + " (TYPE TIER) WITH tierConfig = `" + GetConfigProtoWithName(tierName) + "`").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

TString TTestHelper::CreateTieringRule(const TString& tierName, const TString& columnName) {
const TString ruleName = tierName + "_" + columnName;
const TString configTieringStr = TStringBuilder() << R"({
"rules" : [
{
"tierName" : ")" << tierName << R"(",
"durationForEvict" : "10d"
}
]
})";
auto result = GetSession().ExecuteSchemeQuery("CREATE OBJECT IF NOT EXISTS " + ruleName + " (TYPE TIERING_RULE) WITH (defaultColumn = " + columnName + ", description = `" + configTieringStr + "`)").GetValueSync();
// auto result = GetSession().ExecuteSchemeQuery(R"(
auto result = Kikimr->GetTableClient(NYdb::NTable::TClientSettings().AuthToken("root@builtin")).GetSession().GetValueSync().GetSession().ExecuteSchemeQuery(R"(
UPSERT OBJECT `accessKey` (TYPE SECRET) WITH (value = `secretAccessKey`);
UPSERT OBJECT `secretKey` (TYPE SECRET) WITH (value = `fakeSecret`);
CREATE EXTERNAL DATA SOURCE `)" + tierName + R"(` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="http://fake.fake/fake",
AUTH_METHOD="AWS",
AWS_ACCESS_KEY_ID_SECRET_NAME="accessKey",
AWS_SECRET_ACCESS_KEY_SECRET_NAME="secretKey",
AWS_REGION="ru-central1"
);
)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
return ruleName;
}

void TTestHelper::SetTiering(const TString& tableName, const TString& ruleName) {
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << tableName << "` SET (TIERING = '" << ruleName << "')";
void TTestHelper::SetTiering(const TString& tableName, const TString& tierName, const TString& columnName) {
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << tableName << "` SET TTL Interval(\"P10D\") TO EXTERNAL DATA SOURCE `" << tierName << "` ON `" << columnName << "`;";
auto result = GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

void TTestHelper::ResetTiering(const TString& tableName) {
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << tableName << "` RESET (TIERING)";
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << tableName << "` RESET (TTL)";
auto result = GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/ut/common/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,10 @@ class TTestHelper {
NYdb::NTable::TSession& GetSession();
void CreateTable(const TColumnTableBase& table, const NYdb::EStatus expectedStatus = NYdb::EStatus::SUCCESS);
void DropTable(const TString& tableName);
void EnsureSecret(const TString& name, const TString& value);
void CreateTier(const TString& tierName);
TString CreateTieringRule(const TString& tierName, const TString& columnName);
void SetTiering(const TString& tableName, const TString& ruleName);
void SetTiering(const TString& tableName, const TString& tierName, const TString& columnName);
void ResetTiering(const TString& tableName);
void BulkUpsert(
const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS);
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kqp/ut/common/kqp_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ class TKikimrRunner {

NYdb::TDriverConfig GetDriverConfig() const { return DriverConfig; }

NYdb::NTable::TTableClient GetTableClient() const {
return NYdb::NTable::TTableClient(*Driver, NYdb::NTable::TClientSettings()
.UseQueryCache(false));
NYdb::NTable::TTableClient GetTableClient(
NYdb::NTable::TClientSettings settings = NYdb::NTable::TClientSettings()) const {
return NYdb::NTable::TTableClient(*Driver, settings.UseQueryCache(false));
}

NYdb::NQuery::TQueryClient GetQueryClient(
Expand Down
Loading

0 comments on commit 85e779e

Please sign in to comment.