diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 93e1d345b929..9ee043258b89 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -2698,6 +2698,18 @@ Y_UNIT_TEST_SUITE(KqpOlap) { UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::GENERIC_ERROR, alterResult.GetIssues().ToString()); } + { + auto alterQuery = + TStringBuilder() << + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, `COMPACTION_PLANNER.CLASS_NAME`=`lc-buckets`, `COMPACTION_PLANNER.FEATURES`=` + {"levels" : [{"class_name" : "Zero", "portions_live_duration" : "180s", "expected_blobs_size" : 2048000}, + {"class_name" : "Zero", "expected_blobs_size" : 2048000}, {"class_name" : "Zero"}]}`); + )"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + { auto it = tableClient.StreamExecuteScanQuery(R"( --!syntax_v1 diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 50505ae2f27c..1ebb1a67babc 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -460,6 +460,20 @@ message TStorageTierConfig { optional TCompressionOptions Compression = 3; } +message TCompactionLevelConstructorContainer { + optional string ClassName = 1; + + message TZeroLevel { + optional uint32 PortionsLiveDurationSeconds = 1; + optional uint64 ExpectedBlobsSize = 2; + } + + oneof Implementation { + TZeroLevel ZeroLevel = 10; + } + +} + message TCompactionPlannerConstructorContainer { optional string ClassName = 1; @@ -473,7 +487,7 @@ message TCompactionPlannerConstructorContainer { } message TLCOptimizer { - + repeated TCompactionLevelConstructorContainer Levels = 1; } oneof Implementation { diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp index ed41f5de42f7..bf813d2cc686 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp @@ -4,7 +4,7 @@ namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { NKikimr::TConclusion> TOptimizerPlannerConstructor::DoBuildPlanner(const TBuildContext& context) const { - return std::make_shared(context.GetPathId(), context.GetStorages(), context.GetPKSchema()); + return std::make_shared(context.GetPathId(), context.GetStorages(), context.GetPKSchema(), Levels); } bool TOptimizerPlannerConstructor::DoApplyToCurrentObject(IOptimizerPlanner& current) const { @@ -23,6 +23,9 @@ bool TOptimizerPlannerConstructor::DoIsEqualTo(const IOptimizerPlannerConstructo void TOptimizerPlannerConstructor::DoSerializeToProto(TProto& proto) const { *proto.MutableLCBuckets() = NKikimrSchemeOp::TCompactionPlannerConstructorContainer::TLCOptimizer(); + for (auto&& i : Levels) { + *proto.MutableLCBuckets()->AddLevels() = i.SerializeToProto(); + } } bool TOptimizerPlannerConstructor::DoDeserializeFromProto(const TProto& proto) { @@ -30,7 +33,40 @@ bool TOptimizerPlannerConstructor::DoDeserializeFromProto(const TProto& proto) { AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("error", "cannot parse lc-buckets optimizer from proto")("proto", proto.DebugString()); return false; } + for (auto&& i : proto.GetLCBuckets().GetLevels()) { + TLevelConstructorContainer lContainer; + if (!lContainer.DeserializeFromProto(i)) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("error", "cannot parse lc-bucket level")("proto", i.DebugString()); + return false; + } + Levels.emplace_back(std::move(lContainer)); + } return true; } +NKikimr::TConclusionStatus TOptimizerPlannerConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) { + if (!jsonInfo.Has("levels")) { + return TConclusionStatus::Fail("no levels description"); + } + if (!jsonInfo["levels"].IsArray()) { + return TConclusionStatus::Fail("levels have to been array in json description"); + } + auto& arr = jsonInfo["levels"].GetArray(); + if (!arr.size()) { + return TConclusionStatus::Fail("no objects in json array 'levels'"); + } + for (auto&& i : arr) { + const auto className = i["class_name"].GetStringRobust(); + auto level = ILevelConstructor::TFactory::MakeHolder(className); + if (!level) { + return TConclusionStatus::Fail("incorrect level class_name: " + className); + } + if (!level->DeserializeFromJson(i["description"])) { + return TConclusionStatus::Fail("cannot parse level: " + className + ": " + i["description"].GetStringRobust()); + } + Levels.emplace_back(TLevelConstructorContainer(std::shared_ptr(level.Release()))); + } + return TConclusionStatus::Success(); +} + } // namespace NKikimr::NOlap::NStorageOptimizer::NLBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.h index f1d47481c177..f85249435eac 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.h @@ -1,31 +1,75 @@ #pragma once #include +#include +#include namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { +class ILevelConstructor { +private: + virtual std::shared_ptr DoBuildLevel( + const std::shared_ptr& nextLevel, const ui32 indexLevel, const TLevelCounters& counters) const = 0; + virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& json) = 0; + virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) = 0; + virtual void DoSerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const = 0; + +public: + using TFactory = NObjectFactory::TObjectFactory; + using TProto = NKikimrSchemeOp::TCompactionLevelConstructorContainer; + + virtual ~ILevelConstructor() = default; + + std::shared_ptr BuildLevel( + const std::shared_ptr& nextLevel, const ui32 indexLevel, const TLevelCounters& counters) const { + return DoBuildLevel(nextLevel, indexLevel, counters); + } + + TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& json) { + return DoDeserializeFromJson(json); + } + + bool DeserializeFromProto(const TProto& proto) { + return DoDeserializeFromProto(proto); + } + void SerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const { + return DoSerializeToProto(proto); + } + virtual TString GetClassName() const = 0; +}; + +class TLevelConstructorContainer: public NBackgroundTasks::TInterfaceProtoContainer { +private: + using TBase = NBackgroundTasks::TInterfaceProtoContainer; + +public: + using TBase::TBase; +}; + class TOptimizerPlannerConstructor: public IOptimizerPlannerConstructor { public: static TString GetClassNameStatic() { return "lc-buckets"; } + private: - static inline const TFactory::TRegistrator Registrator = TFactory::TRegistrator(GetClassNameStatic()); + std::vector Levels; + + static inline const TFactory::TRegistrator Registrator = + TFactory::TRegistrator(GetClassNameStatic()); virtual void DoSerializeToProto(TProto& proto) const override; virtual bool DoDeserializeFromProto(const TProto& proto) override; - virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& /*jsonInfo*/) override { - return TConclusionStatus::Success(); - } + virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override; virtual bool DoApplyToCurrentObject(IOptimizerPlanner& current) const override; virtual TConclusion> DoBuildPlanner(const TBuildContext& context) const override; virtual bool DoIsEqualTo(const IOptimizerPlannerConstructor& item) const override; + public: virtual TString GetClassName() const override { return GetClassNameStatic(); } - }; -} // namespace NKikimr::NOlap::NStorageOptimizer::NLBuckets +} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/ya.make index f95d3abf7469..86918b521992 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/ya.make +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/ya.make @@ -2,6 +2,7 @@ LIBRARY() SRCS( GLOBAL constructor.cpp + GLOBAL zero_level.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/zero_level.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/zero_level.cpp new file mode 100644 index 000000000000..6a02746bc447 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/zero_level.cpp @@ -0,0 +1,57 @@ +#include "zero_level.h" + +#include + +namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { + +TConclusionStatus TZeroLevelConstructor::DoDeserializeFromJson(const NJson::TJsonValue& json) { + if (json.Has("portions_live_duration")) { + const auto& jsonValue = json["portions_live_duration"]; + if (!jsonValue.IsString()) { + return TConclusionStatus::Fail("incorrect portions_live_duration value (have to be similar as 10s, 20m, 30d, etc)"); + } + TDuration d; + if (!TDuration::TryParse(jsonValue.GetString(), d)) { + return TConclusionStatus::Fail("cannot parse portions_live_duration value " + jsonValue.GetString()); + } + PortionsLiveDuration = d; + } + if (json.Has("expected_blobs_size")) { + const auto& jsonValue = json["expected_blobs_size"]; + if (!jsonValue.IsUInteger()) { + return TConclusionStatus::Fail("incorrect expected_blobs_size value (have to be unsigned int)"); + } + ExpectedBlobsSize = jsonValue.GetUInteger(); + } + return TConclusionStatus::Success(); +} + +bool TZeroLevelConstructor::DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) { + if (!proto.HasZeroLevel()) { + return true; + } + if (proto.GetZeroLevel().HasPortionsLiveDurationSeconds()) { + PortionsLiveDuration = TDuration::Seconds(proto.GetZeroLevel().GetPortionsLiveDurationSeconds()); + } + if (proto.GetZeroLevel().HasExpectedBlobsSize()) { + ExpectedBlobsSize = proto.GetZeroLevel().GetExpectedBlobsSize(); + } + return true; +} + +void TZeroLevelConstructor::DoSerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const { + if (PortionsLiveDuration) { + proto.MutableZeroLevel()->SetPortionsLiveDurationSeconds(PortionsLiveDuration->Seconds()); + } + if (ExpectedBlobsSize) { + proto.MutableZeroLevel()->SetExpectedBlobsSize(*ExpectedBlobsSize); + } +} + +std::shared_ptr TZeroLevelConstructor::DoBuildLevel( + const std::shared_ptr& nextLevel, const ui32 indexLevel, const TLevelCounters& counters) const { + return std::make_shared( + indexLevel, nextLevel, counters, PortionsLiveDuration.value_or(TDuration::Max()), ExpectedBlobsSize.value_or((ui64)1 << 20)); +} + +} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/zero_level.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/zero_level.h new file mode 100644 index 000000000000..531c60f3690d --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/zero_level.h @@ -0,0 +1,30 @@ +#pragma once +#include "constructor.h" + +namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { + +class TZeroLevelConstructor: public ILevelConstructor { +public: + static TString GetClassNameStatic() { + return "Zero"; + } + +private: + std::optional PortionsLiveDuration; + std::optional ExpectedBlobsSize; + + virtual std::shared_ptr DoBuildLevel( + const std::shared_ptr& nextLevel, const ui32 indexLevel, const TLevelCounters& counters) const override; + virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& json) override; + virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) override; + virtual void DoSerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const override; + + static const inline TFactory::TRegistrator Registrator = TFactory::TRegistrator(GetClassNameStatic()); + +public: + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp index 361a88c457a9..b26a2b90d041 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp @@ -3,12 +3,14 @@ #include "optimizer.h" #include "zero_level.h" +#include + #include namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { -TOptimizerPlanner::TOptimizerPlanner( - const ui64 pathId, const std::shared_ptr& storagesManager, const std::shared_ptr& primaryKeysSchema) +TOptimizerPlanner::TOptimizerPlanner(const ui64 pathId, const std::shared_ptr& storagesManager, + const std::shared_ptr& primaryKeysSchema, const std::vector& levelConstructors) : TBase(pathId) , Counters(std::make_shared()) , StoragesManager(storagesManager) @@ -19,9 +21,19 @@ TOptimizerPlanner::TOptimizerPlanner( Levels.emplace_back( std::make_shared(2, 0.9, maxPortionBlobBytes, nullptr, PortionsInfo, Counters->GetLevelCounters(2))); */ - Levels.emplace_back(std::make_shared(2, nullptr, Counters->GetLevelCounters(2), TDuration::Max())); - Levels.emplace_back(std::make_shared(1, Levels.back(), Counters->GetLevelCounters(1), TDuration::Max())); - Levels.emplace_back(std::make_shared(0, Levels.back(), Counters->GetLevelCounters(0), TDuration::Seconds(180))); + if (levelConstructors.size()) { + std::shared_ptr nextLevel; + ui32 idx = levelConstructors.size(); + for (auto it = levelConstructors.rbegin(); it != levelConstructors.rend(); ++it) { + --idx; + Levels.emplace_back((*it)->BuildLevel(nextLevel, idx, Counters->GetLevelCounters(idx))); + } + } else { + Levels.emplace_back(std::make_shared(2, nullptr, Counters->GetLevelCounters(2), TDuration::Max(), 1 << 20)); + Levels.emplace_back(std::make_shared(1, Levels.back(), Counters->GetLevelCounters(1), TDuration::Max(), 1 << 20)); + Levels.emplace_back( + std::make_shared(0, Levels.back(), Counters->GetLevelCounters(0), TDuration::Seconds(180), 1 << 20)); + } std::reverse(Levels.begin(), Levels.end()); RefreshWeights(); } diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h index 0f48b4680691..0f576672243e 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h @@ -4,6 +4,8 @@ namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { +class TLevelConstructorContainer; + class TOptimizerPlanner: public IOptimizerPlanner { private: using TBase = IOptimizerPlanner; @@ -144,8 +146,8 @@ class TOptimizerPlanner: public IOptimizerPlanner { return result; } - TOptimizerPlanner( - const ui64 pathId, const std::shared_ptr& storagesManager, const std::shared_ptr& primaryKeysSchema); + TOptimizerPlanner(const ui64 pathId, const std::shared_ptr& storagesManager, + const std::shared_ptr& primaryKeysSchema, const std::vector& levelConstructors); }; } // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.cpp index e8854f74261d..18d09fe007aa 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.cpp @@ -26,7 +26,7 @@ ui64 TZeroLevelPortions::DoGetWeight() const { return 0; } if (PredOptimization && TInstant::Now() - *PredOptimization < DurationToDrop) { - if (PortionsInfo.PredictPackedBlobBytes(GetPackKff()) < (1 << 20)) { + if (PortionsInfo.PredictPackedBlobBytes(GetPackKff()) < ExpectedBlobsSize) { return 0; } } diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.h index 7cdbf863b27a..cd7385501e3e 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.h @@ -9,6 +9,7 @@ class TZeroLevelPortions: public IPortionsLevel { using TBase = IPortionsLevel; const TLevelCounters LevelCounters; const TDuration DurationToDrop; + const ui64 ExpectedBlobsSize; class TOrderedPortion { private: YDB_READONLY_DEF(TPortionInfo::TConstPtr, Portion); @@ -91,10 +92,11 @@ class TZeroLevelPortions: public IPortionsLevel { virtual TCompactionTaskData DoGetOptimizationTask() const override; public: - TZeroLevelPortions(const ui32 levelIdx, const std::shared_ptr& nextLevel, const TLevelCounters& levelCounters, const TDuration durationToDrop) + TZeroLevelPortions(const ui32 levelIdx, const std::shared_ptr& nextLevel, const TLevelCounters& levelCounters, const TDuration durationToDrop, const ui64 expectedBlobsSize) : TBase(levelIdx, nextLevel) , LevelCounters(levelCounters) , DurationToDrop(durationToDrop) + , ExpectedBlobsSize(expectedBlobsSize) { } };