diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/operations/drop_stat.cpp b/ydb/core/kqp/gateway/behaviour/tablestore/operations/drop_stat.cpp new file mode 100644 index 000000000000..94a18e7e4140 --- /dev/null +++ b/ydb/core/kqp/gateway/behaviour/tablestore/operations/drop_stat.cpp @@ -0,0 +1,21 @@ +#include "drop_stat.h" +#include + +namespace NKikimr::NKqp { + +TConclusionStatus TDropStatOperation::DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) { + { + auto fValue = features.Extract("NAME"); + if (!fValue) { + return TConclusionStatus::Fail("can't find parameter NAME"); + } + Name = *fValue; + } + return TConclusionStatus::Success(); +} + +void TDropStatOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const { + *schemaData.AddDropStatistics() = Name; +} + +} diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/operations/drop_stat.h b/ydb/core/kqp/gateway/behaviour/tablestore/operations/drop_stat.h new file mode 100644 index 000000000000..777aae036858 --- /dev/null +++ b/ydb/core/kqp/gateway/behaviour/tablestore/operations/drop_stat.h @@ -0,0 +1,19 @@ +#include "abstract.h" + +namespace NKikimr::NKqp { + +class TDropStatOperation : public ITableStoreOperation { + static TString GetTypeName() { + return "DROP_STAT"; + } + + static inline auto Registrator = TFactory::TRegistrator(GetTypeName()); +private: + TString Name; +public: + TConclusionStatus DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) override; + void DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const override; +}; + +} + diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_stat.cpp b/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_stat.cpp new file mode 100644 index 000000000000..9e8360dd5e35 --- /dev/null +++ b/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_stat.cpp @@ -0,0 +1,49 @@ +#include "upsert_stat.h" +#include +#include + +namespace NKikimr::NKqp { + +TConclusionStatus TUpsertStatOperation::DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) { + { + auto fValue = features.Extract("NAME"); + if (!fValue) { + return TConclusionStatus::Fail("can't find alter parameter NAME"); + } + Name = *fValue; + } + TString type; + { + auto fValue = features.Extract("TYPE"); + if (!fValue) { + return TConclusionStatus::Fail("can't find alter parameter TYPE"); + } + type = *fValue; + } + { + auto fValue = features.Extract("FEATURES"); + if (!fValue) { + return TConclusionStatus::Fail("can't find alter parameter FEATURES"); + } + if (!Constructor.Initialize(type)) { + return TConclusionStatus::Fail("can't initialize stat constructor object for type \"" + type + "\""); + } + NJson::TJsonValue jsonData; + if (!NJson::ReadJsonFastTree(*fValue, &jsonData)) { + return TConclusionStatus::Fail("incorrect json in request FEATURES parameter"); + } + auto result = Constructor->DeserializeFromJson(jsonData); + if (result.IsFail()) { + return result; + } + } + return TConclusionStatus::Success(); +} + +void TUpsertStatOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const { + auto* proto = schemaData.AddUpsertStatistics(); + proto->SetName(Name); + Constructor.SerializeToProto(*proto); +} + +} diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_stat.h b/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_stat.h new file mode 100644 index 000000000000..5d8abdffae8d --- /dev/null +++ b/ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_stat.h @@ -0,0 +1,23 @@ +#include "abstract.h" +#include + +namespace NKikimr::NKqp { + +class TUpsertStatOperation : public ITableStoreOperation { +private: + static TString GetTypeName() { + return "UPSERT_STAT"; + } + + static inline const auto Registrator = TFactory::TRegistrator(GetTypeName()); +private: + TString Name; + NOlap::NStatistics::TConstructorContainer Constructor; +public: + TConclusionStatus DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) override; + + void DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const override; +}; + +} + diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make b/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make index 3301f543b8f6..354f6d641b65 100644 --- a/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make +++ b/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make @@ -7,6 +7,8 @@ SRCS( GLOBAL drop_column.cpp GLOBAL upsert_index.cpp GLOBAL drop_index.cpp + GLOBAL upsert_stat.cpp + GLOBAL drop_stat.cpp ) PEERDIR( diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 23d9eaa68493..c04a8ea37b5e 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -233,6 +233,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } result += R"( Columns { Name: "pk_int" Type: "Int64" } + Columns { Name: "ts" Type: "Timestamp" } KeyColumnNames: "pk_int" Engine: COLUMN_ENGINE_REPLACING_TIMESERIES )"; @@ -3836,6 +3837,72 @@ Y_UNIT_TEST_SUITE(KqpOlap) { Y_ABORT_UNLESS(bytesPack / bytesUnpack < 0.1); } + Y_UNIT_TEST(StatsUsage) { + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); + { + auto settings = TKikimrSettings().SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + Tests::NCommon::TLoggerInit(kikimr).Initialize(); + TTypedLocalHelper helper("Utf8", kikimr); + helper.CreateTestOlapTable(); + auto tableClient = kikimr.GetTableClient(); + { + auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_STAT, TYPE=max, NAME=max_pk_int, FEATURES=`{\"column_name\": \"pk_int\"}`);"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + { + auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_STAT, TYPE=max, NAME=max_field, FEATURES=`{\"column_name\": \"field\"}`);"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_UNEQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + { + auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_STAT, TYPE=max, NAME=max_pk_int, FEATURES=`{\"column_name\": \"pk_int\"}`);"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_UNEQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + { + auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=DROP_STAT, NAME=max_pk_int);"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + } + } + + Y_UNIT_TEST(StatsUsageWithTTL) { + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); + { + auto settings = TKikimrSettings().SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + Tests::NCommon::TLoggerInit(kikimr).Initialize(); + TTypedLocalHelper helper("Utf8", kikimr); + helper.CreateTestOlapTable(); + auto tableClient = kikimr.GetTableClient(); + { + auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_STAT, TYPE=max, NAME=max_ts, FEATURES=`{\"column_name\": \"ts\"}`);"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + { + auto alterQuery = TStringBuilder() << "ALTER TABLE `/Root/olapStore/olapTable` SET (TTL = Interval(\"P1D\") ON ts);"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + { + auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=DROP_STAT, NAME=max_ts);"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_UNEQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + } + } + namespace { class TTransferStatus { private: diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 9228eb7837a3..ee3f4460437a 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -17,6 +17,7 @@ import "ydb/public/api/protos/ydb_value.proto"; import "ydb/library/actors/protos/actors.proto"; import "ydb/library/mkql_proto/protos/minikql.proto"; import "ydb/core/protos/index_builder.proto"; +import "ydb/core/tx/columnshard/engines/scheme/statistics/protos/data.proto"; import "google/protobuf/empty.proto"; @@ -512,6 +513,7 @@ message TColumnTableSchema { optional bool CompositeMarks = 9 [ default = false ]; repeated TOlapIndexDescription Indexes = 10; + repeated NKikimrColumnShardStatisticsProto.TOperatorContainer Statistics = 11; } message TAlterColumnTableSchema { @@ -521,6 +523,8 @@ message TAlterColumnTableSchema { repeated TOlapColumnDiff AlterColumns = 7; repeated TOlapIndexRequested UpsertIndexes = 8; repeated string DropIndexes = 9; + repeated NKikimrColumnShardStatisticsProto.TConstructorContainer UpsertStatistics = 10; + repeated string DropStatistics = 11; } // Schema presets are used to manage multiple tables with the same schema diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto index 1e07404ac8f6..f7a30d622091 100644 --- a/ydb/core/protos/tx_columnshard.proto +++ b/ydb/core/protos/tx_columnshard.proto @@ -1,4 +1,5 @@ import "ydb/library/actors/protos/actors.proto"; +import "ydb/core/tx/columnshard/engines/scheme/statistics/protos/data.proto"; import "ydb/core/protos/flat_scheme_op.proto"; import "ydb/core/protos/long_tx_service.proto"; import "ydb/core/protos/subdomains.proto"; @@ -281,6 +282,7 @@ message TIndexPortionMeta { optional bytes PrimaryKeyBorders = 6; // arrow::RecordBatch with first and last ReplaceKey rows optional TSnapshot RecordSnapshotMin = 7; optional TSnapshot RecordSnapshotMax = 8; + optional NKikimrColumnShardStatisticsProto.TPortionStorage StatisticsStorage = 9; } message TIndexColumnMeta { diff --git a/ydb/core/protos/ya.make b/ydb/core/protos/ya.make index 7eb2b832a82c..ddd9d5659b2b 100644 --- a/ydb/core/protos/ya.make +++ b/ydb/core/protos/ya.make @@ -163,6 +163,7 @@ PEERDIR( ydb/library/yql/public/types ydb/library/services ydb/library/ydb_issue/proto + ydb/core/tx/columnshard/engines/scheme/statistics/protos ) CPP_PROTO_PLUGIN0(config_proto_plugin ydb/core/config/tools/protobuf_plugin) diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h index 2d81e956c122..e5450b02313b 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/columnshard_ut_common.h @@ -3,6 +3,7 @@ #include "columnshard.h" #include "columnshard_impl.h" #include "blob_cache.h" +#include "engines/scheme/statistics/max/operator.h" #include #include @@ -105,11 +106,22 @@ struct TTestSchema { }; struct TTableSpecials : public TStorageTier { + private: + bool NeedTestStatisticsFlag = true; + public: std::vector Tiers; bool WaitEmptyAfter = false; TTableSpecials() noexcept = default; + bool NeedTestStatistics() const { + return NeedTestStatisticsFlag; + } + + void SetNeedTestStatistics(const bool value) { + NeedTestStatisticsFlag = value; + } + bool HasTiers() const { return !Tiers.empty(); } @@ -217,6 +229,12 @@ struct TTestSchema { for (ui32 i = 0; i < columns.size(); ++i) { *schema->MutableColumns()->Add() = columns[i].CreateColumn(i + 1); + if (!specials.NeedTestStatistics()) { + continue; + } + if (NOlap::NStatistics::NMax::TOperator::IsAvailableType(columns[i].GetType())) { + *schema->AddStatistics() = NOlap::NStatistics::TOperatorContainer(std::make_shared(i + 1)).SerializeToProto(); + } } Y_ABORT_UNLESS(pk.size() > 0); diff --git a/ydb/core/tx/columnshard/counters/engine_logs.cpp b/ydb/core/tx/columnshard/counters/engine_logs.cpp index 8086ccbe7934..2b269b0b18c4 100644 --- a/ydb/core/tx/columnshard/counters/engine_logs.cpp +++ b/ydb/core/tx/columnshard/counters/engine_logs.cpp @@ -47,6 +47,9 @@ TEngineLogsCounters::TEngineLogsCounters() PortionNoBorderCount = TBase::GetDeriviative("Ttl/PortionNoBorder/Count"); PortionNoBorderBytes = TBase::GetDeriviative("Ttl/PortionNoBorder/Bytes"); + + StatUsageForTTLCount = TBase::GetDeriviative("Ttl/StatUsageForTTLCount/Count"); + ChunkUsageForTTLCount = TBase::GetDeriviative("Ttl/ChunkUsageForTTLCount/Count"); } void TEngineLogsCounters::TPortionsInfoGuard::OnNewPortion(const std::shared_ptr& portion) const { diff --git a/ydb/core/tx/columnshard/counters/engine_logs.h b/ydb/core/tx/columnshard/counters/engine_logs.h index 20aa0a9757c8..e9842cbd9422 100644 --- a/ydb/core/tx/columnshard/counters/engine_logs.h +++ b/ydb/core/tx/columnshard/counters/engine_logs.h @@ -243,6 +243,9 @@ class TEngineLogsCounters: public TCommonCountersOwner { NMonitoring::TDynamicCounters::TCounterPtr PortionNoTtlColumnCount; NMonitoring::TDynamicCounters::TCounterPtr PortionNoTtlColumnBytes; + NMonitoring::TDynamicCounters::TCounterPtr StatUsageForTTLCount; + NMonitoring::TDynamicCounters::TCounterPtr ChunkUsageForTTLCount; + NMonitoring::TDynamicCounters::TCounterPtr PortionNoBorderCount; NMonitoring::TDynamicCounters::TCounterPtr PortionNoBorderBytes; @@ -302,6 +305,14 @@ class TEngineLogsCounters: public TCommonCountersOwner { PortionNoTtlColumnBytes->Add(size); } + void OnChunkUsageForTTL() const { + ChunkUsageForTTLCount->Add(1); + } + + void OnStatUsageForTTL() const { + StatUsageForTTLCount->Add(1); + } + void OnPortionNoBorder(const ui64 size) const { PortionNoBorderCount->Add(1); PortionNoBorderBytes->Add(size); diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index 351097b9bbc8..38fd1c380490 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -195,7 +195,8 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstruc for (auto&& i : packs) { TGeneralSerializedSlice slice(std::move(i), GetSplitSettings()); auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount()); - AppendedPortions.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), nullptr, GranuleMeta->GetPathId(), resultSchema->GetSnapshot(), SaverContext.GetStoragesManager())); + AppendedPortions.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), nullptr, GranuleMeta->GetPathId(), + resultSchema->GetSnapshot(), SaverContext.GetStoragesManager(), resultSchema)); NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey())); NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot()); AppendedPortions.back().GetPortionInfo().AddMetadata(*resultSchema, primaryKeys, snapshotKeys, IStoragesManager::DefaultStorageId); diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index c162cfcef622..0d7d0ffda905 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -124,7 +124,7 @@ std::vector TChangesWithAppend::MakeAppendedPortions(cons for (auto&& i : packs) { TGeneralSerializedSlice slice(std::move(i), GetSplitSettings()); auto b = batch->Slice(recordIdx, slice.GetRecordsCount()); - out.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), nullptr, pathId, snapshot, SaverContext.GetStoragesManager())); + out.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), nullptr, pathId, snapshot, SaverContext.GetStoragesManager(), resultSchema)); NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey())); NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot()); out.back().GetPortionInfo().AddMetadata(*resultSchema, primaryKeys, snapshotKeys, IStoragesManager::DefaultStorageId); diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 7359531dcdc2..11e9bb6492c0 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -348,74 +348,87 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip ttl through busy portion")("portion_id", info->GetAddress().DebugString()); continue; } + auto portionSchema = VersionedIndex.GetSchema(info->GetMinSnapshot()); + auto statOperator = portionSchema->GetIndexInfo().GetStatistics(NStatistics::TIdentifier(NStatistics::EType::Max, {ttlColumnId})); + std::shared_ptr max; + if (!statOperator) { + max = info->MaxValue(ttlColumnId); + if (!max) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_not_max"); + SignalCounters.OnPortionNoBorder(info->BlobsBytes()); + continue; + } else { + NYDBTest::TControllers::GetColumnShardController()->OnMaxValueUsage(); + SignalCounters.OnChunkUsageForTTL(); + } + } else { + NYDBTest::TControllers::GetColumnShardController()->OnStatisticsUsage(statOperator); + SignalCounters.OnStatUsageForTTL(); + max = statOperator.GetScalarVerified(info->GetMeta().GetStatisticsStorage()); + } const bool tryEvictPortion = ttl.HasTiers() && context.HasLimitsForEviction(); - if (auto max = info->MaxValue(ttlColumnId)) { - bool keep = !expireTimestampOpt; - if (expireTimestampOpt) { - auto mpiOpt = ttl.Ttl->ScalarToInstant(max); - Y_ABORT_UNLESS(mpiOpt); - const TInstant maxTtlPortionInstant = *mpiOpt; - const TDuration d = maxTtlPortionInstant - *expireTimestampOpt; - keep = !!d; - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "keep_detect")("max", maxTtlPortionInstant.Seconds())("expire", expireTimestampOpt->Seconds()); - if (d && dWaiting > d) { - dWaiting = d; - } + bool keep = !expireTimestampOpt; + if (expireTimestampOpt) { + auto mpiOpt = ttl.Ttl->ScalarToInstant(max); + Y_ABORT_UNLESS(mpiOpt); + const TInstant maxTtlPortionInstant = *mpiOpt; + const TDuration d = maxTtlPortionInstant - *expireTimestampOpt; + keep = !!d; + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "keep_detect")("max", maxTtlPortionInstant.Seconds())("expire", expireTimestampOpt->Seconds()); + if (d && dWaiting > d) { + dWaiting = d; } + } - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_result")("keep", keep)("tryEvictPortion", tryEvictPortion)("allowDrop", context.HasLimitsForTtl()); - if (keep && tryEvictPortion) { - const TString currentTierName = info->GetMeta().GetTierName() ? info->GetMeta().GetTierName() : IStoragesManager::DefaultStorageId; - TString tierName = ""; - const TInstant maxChangePortionInstant = info->RecordSnapshotMax().GetPlanInstant(); - if (now - maxChangePortionInstant < TDuration::Minutes(60)) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_portion_to_evict")("reason", "too_fresh")("delta", now - maxChangePortionInstant); + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_result")("keep", keep)("tryEvictPortion", tryEvictPortion)("allowDrop", context.HasLimitsForTtl()); + if (keep && tryEvictPortion) { + const TString currentTierName = info->GetMeta().GetTierName() ? info->GetMeta().GetTierName() : IStoragesManager::DefaultStorageId; + TString tierName = ""; + const TInstant maxChangePortionInstant = info->RecordSnapshotMax().GetPlanInstant(); + if (now - maxChangePortionInstant < TDuration::Minutes(60)) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_portion_to_evict")("reason", "too_fresh")("delta", now - maxChangePortionInstant); + continue; + } + for (auto& tierRef : ttl.GetOrderedTiers()) { + auto& tierInfo = tierRef.Get(); + if (!indexInfo.AllowTtlOverColumn(tierInfo.GetEvictColumnName())) { + SignalCounters.OnPortionNoTtlColumn(info->BlobsBytes()); continue; } - for (auto& tierRef : ttl.GetOrderedTiers()) { - auto& tierInfo = tierRef.Get(); - if (!indexInfo.AllowTtlOverColumn(tierInfo.GetEvictColumnName())) { - SignalCounters.OnPortionNoTtlColumn(info->BlobsBytes()); - continue; - } - auto mpiOpt = tierInfo.ScalarToInstant(max); - Y_ABORT_UNLESS(mpiOpt); - const TInstant maxTieringPortionInstant = *mpiOpt; - - const TDuration d = tierInfo.GetEvictInstant(context.Now) - maxTieringPortionInstant; - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering_choosing")("max", maxTieringPortionInstant.Seconds()) - ("evict", tierInfo.GetEvictInstant(context.Now).Seconds())("tier_name", tierInfo.GetName())("d", d); - if (d) { - tierName = tierInfo.GetName(); - break; - } else { - auto dWaitLocal = maxTieringPortionInstant - tierInfo.GetEvictInstant(context.Now); - if (dWaiting > dWaitLocal) { - dWaiting = dWaitLocal; - } + auto mpiOpt = tierInfo.ScalarToInstant(max); + Y_ABORT_UNLESS(mpiOpt); + const TInstant maxTieringPortionInstant = *mpiOpt; + + const TDuration d = tierInfo.GetEvictInstant(context.Now) - maxTieringPortionInstant; + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering_choosing")("max", maxTieringPortionInstant.Seconds()) + ("evict", tierInfo.GetEvictInstant(context.Now).Seconds())("tier_name", tierInfo.GetName())("d", d); + if (d) { + tierName = tierInfo.GetName(); + break; + } else { + auto dWaitLocal = maxTieringPortionInstant - tierInfo.GetEvictInstant(context.Now); + if (dWaiting > dWaitLocal) { + dWaiting = dWaitLocal; } } - if (!tierName) { - tierName = IStoragesManager::DefaultStorageId; - } - if (currentTierName != tierName) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering switch detected")("from", currentTierName)("to", tierName); - context.Changes->AddPortionToEvict(*info, TPortionEvictionFeatures(tierName, pathId)); - context.AppPortionForEvictionChecker(*info); - SignalCounters.OnPortionToEvict(info->BlobsBytes()); - } } - if (!keep && context.HasLimitsForTtl()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "portion_remove")("portion", info->DebugString()); - AFL_VERIFY(context.Changes->PortionsToRemove.emplace(info->GetAddress(), *info).second); - SignalCounters.OnPortionToDrop(info->BlobsBytes()); - context.AppPortionForTtlChecker(*info); + if (!tierName) { + tierName = IStoragesManager::DefaultStorageId; } - } else { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_not_max"); - SignalCounters.OnPortionNoBorder(info->BlobsBytes()); + if (currentTierName != tierName) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering switch detected")("from", currentTierName)("to", tierName); + context.Changes->AddPortionToEvict(*info, TPortionEvictionFeatures(tierName, pathId)); + context.AppPortionForEvictionChecker(*info); + SignalCounters.OnPortionToEvict(info->BlobsBytes()); + } + } + if (!keep && context.HasLimitsForTtl()) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "portion_remove")("portion", info->DebugString()); + AFL_VERIFY(context.Changes->PortionsToRemove.emplace(info->GetAddress(), *info).second); + SignalCounters.OnPortionToDrop(info->BlobsBytes()); + context.AppPortionForTtlChecker(*info); } } if (dWaiting > TDuration::MilliSeconds(500) && (!context.HasLimitsForEviction() || !context.HasLimitsForTtl())) { diff --git a/ydb/core/tx/columnshard/engines/portions/meta.cpp b/ydb/core/tx/columnshard/engines/portions/meta.cpp index a323f544c586..282211dd5cda 100644 --- a/ydb/core/tx/columnshard/engines/portions/meta.cpp +++ b/ydb/core/tx/columnshard/engines/portions/meta.cpp @@ -36,6 +36,14 @@ bool TPortionMeta::DeserializeFromProto(const NKikimrTxColumnShard::TIndexPortio return true; } FirstPkColumn = indexInfo.GetPKFirstColumnId(); + { + auto parsed = NStatistics::TPortionStorage::BuildFromProto(portionMeta.GetStatisticsStorage()); + if (!parsed) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", parsed.GetErrorMessage()); + return false; + } + StatisticsStorage = parsed.DetachResult(); + } TierName = portionMeta.GetTierName(); if (portionMeta.GetIsInserted()) { Produced = TPortionMeta::EProduced::INSERTED; @@ -72,7 +80,7 @@ bool TPortionMeta::DeserializeFromProto(const NKikimrTxColumnShard::TIndexPortio NKikimrTxColumnShard::TIndexPortionMeta TPortionMeta::SerializeToProto() const { NKikimrTxColumnShard::TIndexPortionMeta portionMeta; portionMeta.SetTierName(TierName); - + *portionMeta.MutableStatisticsStorage() = StatisticsStorage.SerializeToProto(); switch (Produced) { case TPortionMeta::EProduced::UNSPECIFIED: Y_ABORT_UNLESS(false); diff --git a/ydb/core/tx/columnshard/engines/portions/meta.h b/ydb/core/tx/columnshard/engines/portions/meta.h index d98b76dac95a..ffe52cc9dd67 100644 --- a/ydb/core/tx/columnshard/engines/portions/meta.h +++ b/ydb/core/tx/columnshard/engines/portions/meta.h @@ -1,6 +1,7 @@ #pragma once #include #include +#include #include #include #include @@ -15,6 +16,7 @@ struct TPortionMeta { private: std::shared_ptr ReplaceKeyEdges; // first and last PK rows YDB_ACCESSOR_DEF(TString, TierName); + YDB_READONLY_DEF(NStatistics::TPortionStorage, StatisticsStorage); public: using EProduced = NPortion::EProduced; @@ -26,6 +28,11 @@ struct TPortionMeta { EProduced Produced{EProduced::UNSPECIFIED}; ui32 FirstPkColumn = 0; + void SetStatisticsStorage(NStatistics::TPortionStorage&& storage) { + AFL_VERIFY(StatisticsStorage.IsEmpty()); + StatisticsStorage = std::move(storage); + } + bool IsChunkWithPortionInfo(const ui32 columnId, const ui32 chunkIdx) const { return columnId == FirstPkColumn && chunkIdx == 0; } diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index 185e0adcda14..4b5f84d252bc 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -36,9 +36,7 @@ class TPortionInfo { TPortionMeta Meta; ui64 DeprecatedGranuleId = 0; YDB_READONLY_DEF(std::vector, Indexes); - std::vector BlobIds; - TConclusionStatus DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TPortionInfo& proto, const TIndexInfo& info); public: const TBlobRange RestoreBlobRange(const TBlobRangeLink16& linkRange) const { @@ -56,6 +54,10 @@ class TPortionInfo { THashMap DecodeBlobAddresses(NBlobOperations::NRead::TCompositeReadBlobs&& blobs, const TIndexInfo& indexInfo) const; + void SetStatisticsStorage(NStatistics::TPortionStorage&& storage) { + Meta.SetStatisticsStorage(std::move(storage)); + } + const TString& GetColumnStorageId(const ui32 columnId, const TIndexInfo& indexInfo) const; ui64 GetTxVolume() const; // fake-correct method for determ volume on rewrite this portion in transaction progress diff --git a/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp index ef652b61f069..1b975c959064 100644 --- a/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp +++ b/ydb/core/tx/columnshard/engines/portions/with_blobs.cpp @@ -127,7 +127,7 @@ std::vector TPortionInfoWithBlobs::Restor } NKikimr::NOlap::TPortionInfoWithBlobs TPortionInfoWithBlobs::BuildByBlobs(std::vector&& chunks, - std::shared_ptr batch, const ui64 granule, const TSnapshot& snapshot, const std::shared_ptr& operators) + std::shared_ptr batch, const ui64 granule, const TSnapshot& snapshot, const std::shared_ptr& operators, const std::shared_ptr& schema) { TPortionInfoWithBlobs result(TPortionInfo(granule, 0, snapshot), batch); for (auto&& blob: chunks) { @@ -142,6 +142,7 @@ NKikimr::NOlap::TPortionInfoWithBlobs TPortionInfoWithBlobs::BuildByBlobs(std::v return l.GetAddress() < r.GetAddress(); }; std::sort(result.GetPortionInfo().Records.begin(), result.GetPortionInfo().Records.end(), pred); + schema->GetIndexInfo().FillStatistics(result); return result; } @@ -220,4 +221,17 @@ bool TPortionInfoWithBlobs::ExtractColumnChunks(const ui32 columnId, std::vector std::swap(chunksLocal, chunks); return true; } + +void TPortionInfoWithBlobs::FillStatistics(const std::map& operators, const TIndexInfo& index) { + NStatistics::TPortionStorage storage; + for (auto&& i : operators) { + THashMap>> data; + for (auto&& entityId : i.second->GetEntityIds()) { + data.emplace(entityId, GetEntityChunks(entityId)); + } + i.second->FillStatisticsData(data, storage, index); + } + PortionInfo.SetStatisticsStorage(std::move(storage)); +} + } diff --git a/ydb/core/tx/columnshard/engines/portions/with_blobs.h b/ydb/core/tx/columnshard/engines/portions/with_blobs.h index f171e7af7835..4a9f3cc25ddb 100644 --- a/ydb/core/tx/columnshard/engines/portions/with_blobs.h +++ b/ydb/core/tx/columnshard/engines/portions/with_blobs.h @@ -3,6 +3,8 @@ #include #include #include +#include +#include #include @@ -106,8 +108,11 @@ class TPortionInfoWithBlobs { return PortionInfo.BlobsBytes(); } + void FillStatistics(const std::map& operators, const TIndexInfo& index); + static TPortionInfoWithBlobs BuildByBlobs(std::vector&& chunks, - std::shared_ptr batch, const ui64 granule, const TSnapshot& snapshot, const std::shared_ptr& operators); + std::shared_ptr batch, const ui64 granule, const TSnapshot& snapshot, const std::shared_ptr& operators, + const std::shared_ptr& schema); std::optional ChangeSaver(ISnapshotSchema::TPtr currentSchema, const TSaverContext& saverContext) const; diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp index d3b35fd45d9e..d57fa0c2eb3e 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp @@ -1,5 +1,7 @@ #include "index_info.h" +#include "statistics/abstract/operator.h" +#include #include #include #include @@ -370,6 +372,17 @@ bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& return false; } + { + NStatistics::TPortionStorageCursor cursor; + for (const auto& stat : schema.GetStatistics()) { + NStatistics::TOperatorContainer container; + AFL_VERIFY(container.DeserializeFromProto(stat)); + container.SetCursor(cursor); + Statistics.emplace(container->GetIdentifier(), container); + container->ShiftCursor(cursor); + } + } + for (const auto& idx : schema.GetIndexes()) { NIndexes::TIndexMetaContainer meta; AFL_VERIFY(meta.DeserializeFromProto(idx)); @@ -476,4 +489,8 @@ void TIndexInfo::InitializeCaches(const std::shared_ptr& opera } } +void TIndexInfo::FillStatistics(TPortionInfoWithBlobs& portion) const { + portion.FillStatistics(Statistics, *this); +} + } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h index 1e495cabfca6..12c6b3ee80c7 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h @@ -3,6 +3,10 @@ #include "column_features.h" #include "tier_info.h" +#include "indexes/abstract/meta.h" +#include "statistics/abstract/operator.h" +#include "statistics/abstract/common.h" + #include #include @@ -12,7 +16,6 @@ #include #include #include -#include "indexes/abstract/meta.h" namespace arrow { class Array; @@ -26,6 +29,7 @@ namespace NKikimr::NArrow { namespace NKikimr::NOlap { +class TPortionInfoWithBlobs; struct TInsertedData; class TSnapshotColumnInfo; using TNameTypeInfo = std::pair; @@ -37,6 +41,7 @@ struct TIndexInfo : public NTable::TScheme::TTableSchema { THashMap ColumnFeatures; THashMap> ArrowColumnByColumnIdCache; THashMap Indexes; + std::map Statistics; TIndexInfo(const TString& name); bool DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema, const std::shared_ptr& operators); TColumnFeatures& GetOrCreateColumnFeatures(const ui32 columnId) const; @@ -49,6 +54,16 @@ struct TIndexInfo : public NTable::TScheme::TTableSchema { static const TString STORE_INDEX_STATS_TABLE; static const TString TABLE_INDEX_STATS_TABLE; + void FillStatistics(TPortionInfoWithBlobs& portion) const; + + NStatistics::TOperatorContainer GetStatistics(const NStatistics::TIdentifier& id) const { + auto it = Statistics.find(id); + if (it != Statistics.end()) { + return it->second; + } + return NStatistics::TOperatorContainer(); + } + const THashMap& GetIndexes() const { return Indexes; } diff --git a/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/common.cpp b/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/common.cpp new file mode 100644 index 000000000000..e7960e66809e --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/common.cpp @@ -0,0 +1,40 @@ +#include "common.h" +#include + +namespace NKikimr::NOlap::NStatistics { + +TIdentifier::TIdentifier(const EType type, const std::vector& entities) + : Type(type) + , EntityIds(entities) +{ + AFL_VERIFY(EntityIds.size()); +} + +bool TIdentifier::operator<(const TIdentifier& item) const { + if (Type != item.Type) { + return (ui32)Type < (ui32)item.Type; + } + for (ui32 i = 0; i < std::min(EntityIds.size(), item.EntityIds.size()); ++i) { + if (EntityIds[i] < item.EntityIds[i]) { + return true; + } + } + return false; +} + +bool TIdentifier::operator==(const TIdentifier& item) const { + if (Type != item.Type) { + return false; + } + if (EntityIds.size() != item.EntityIds.size()) { + return false; + } + for (ui32 i = 0; i < EntityIds.size(); ++i) { + if (EntityIds[i] != item.EntityIds[i]) { + return false; + } + } + return true; +} + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/common.h b/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/common.h new file mode 100644 index 000000000000..5a1b5b892740 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/common.h @@ -0,0 +1,23 @@ +#pragma once +#include +#include +#include + +namespace NKikimr::NOlap::NStatistics { +enum class EType { + Undefined /* "undefined" */, + Max /* "max" */ +}; + +class TIdentifier { +private: + YDB_READONLY(EType, Type, EType::Undefined); + YDB_READONLY_DEF(std::vector, EntityIds); +public: + TIdentifier(const EType type, const std::vector& entities); + + bool operator<(const TIdentifier& item) const; + bool operator==(const TIdentifier& item) const; +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/constructor.cpp b/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/constructor.cpp new file mode 100644 index 000000000000..5713317c7d21 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/constructor.cpp @@ -0,0 +1,5 @@ +#include "constructor.h" + +namespace NKikimr::NOlap::NStatistics { + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/constructor.h b/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/constructor.h new file mode 100644 index 000000000000..77d434d1eb5f --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/constructor.h @@ -0,0 +1,69 @@ +#pragma once +#include "common.h" +#include "portion_storage.h" +#include "operator.h" + +#include + +namespace NKikimr::NSchemeShard { +class TOlapSchema; +} + +namespace NKikimrColumnShardStatisticsProto { +class TOperatorContainer; +} + +namespace NKikimr::NOlap::NStatistics { + +class IConstructor { +private: + YDB_READONLY(EType, Type, EType::Undefined); + IConstructor() = default; +protected: + virtual TConclusion DoCreateOperator(const NSchemeShard::TOlapSchema& currentSchema) const = 0; + virtual bool DoDeserializeFromProto(const NKikimrColumnShardStatisticsProto::TConstructorContainer& proto) = 0; + virtual void DoSerializeToProto(NKikimrColumnShardStatisticsProto::TConstructorContainer& proto) const = 0; + virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonData) = 0; +public: + using TProto = NKikimrColumnShardStatisticsProto::TConstructorContainer; + using TFactory = NObjectFactory::TObjectFactory; + + virtual ~IConstructor() = default; + + IConstructor(const EType type) + :Type(type) { + + } + + TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& jsonData) { + return DoDeserializeFromJson(jsonData); + } + + TConclusion CreateOperator(const NSchemeShard::TOlapSchema& currentSchema) const { + return DoCreateOperator(currentSchema); + } + + TString GetClassName() const { + return ::ToString(Type); + } + + bool DeserializeFromProto(const NKikimrColumnShardStatisticsProto::TConstructorContainer& proto) { + if (!TryFromString(proto.GetClassName(), Type)) { + return false; + } + return DoDeserializeFromProto(proto); + } + + void SerializeToProto(NKikimrColumnShardStatisticsProto::TConstructorContainer& proto) const { + return DoSerializeToProto(proto); + } +}; + +class TConstructorContainer: public NBackgroundTasks::TInterfaceProtoContainer { +private: + using TBase = NBackgroundTasks::TInterfaceProtoContainer; +public: + using TBase::TBase; +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/operator.cpp b/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/operator.cpp new file mode 100644 index 000000000000..357d8bbd3934 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/operator.cpp @@ -0,0 +1,12 @@ +#include "operator.h" + +namespace NKikimr::NOlap::NStatistics { + +bool IOperator::DeserializeFromProto(const NKikimrColumnShardStatisticsProto::TOperatorContainer& proto) { + if (!TryFromString(proto.GetClassName(), Type)) { + return false; + } + return DoDeserializeFromProto(proto); +} + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/operator.h b/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/operator.h new file mode 100644 index 000000000000..f84966ae6df0 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/operator.h @@ -0,0 +1,74 @@ +#pragma once +#include "common.h" +#include "portion_storage.h" + +#include +#include + +#include + +namespace NKikimr::NOlap::NStatistics { + +class IOperator { +private: + YDB_READONLY(EType, Type, EType::Undefined); + IOperator() = default; +protected: + virtual void DoFillStatisticsData(const THashMap>>& data, TPortionStorage& portionStats, const TIndexInfo& index) const = 0; + virtual void DoShiftCursor(TPortionStorageCursor& cursor) const = 0; + virtual bool DoDeserializeFromProto(const NKikimrColumnShardStatisticsProto::TOperatorContainer& proto) = 0; + virtual void DoSerializeToProto(NKikimrColumnShardStatisticsProto::TOperatorContainer& proto) const = 0; +public: + using TProto = NKikimrColumnShardStatisticsProto::TOperatorContainer; + using TFactory = NObjectFactory::TObjectFactory; + + virtual ~IOperator() = default; + + virtual std::vector GetEntityIds() const = 0; + + IOperator(const EType type) + :Type(type) { + + } + + void ShiftCursor(TPortionStorageCursor& cursor) const { + DoShiftCursor(cursor); + } + + void FillStatisticsData(const THashMap>>& data, TPortionStorage& portionStats, const TIndexInfo& index) const { + DoFillStatisticsData(data, portionStats, index); + } + + TString GetClassName() const { + return ::ToString(Type); + } + + TIdentifier GetIdentifier() const { + return TIdentifier(Type, GetEntityIds()); + } + + bool DeserializeFromProto(const NKikimrColumnShardStatisticsProto::TOperatorContainer& proto); + + void SerializeToProto(NKikimrColumnShardStatisticsProto::TOperatorContainer& proto) const { + return DoSerializeToProto(proto); + } +}; + +class TOperatorContainer: public NBackgroundTasks::TInterfaceProtoContainer { +private: + std::optional Cursor; + using TBase = NBackgroundTasks::TInterfaceProtoContainer; +public: + using TBase::TBase; + void SetCursor(const TPortionStorageCursor& cursor) { + AFL_VERIFY(!Cursor); + Cursor = cursor; + } + + std::shared_ptr GetScalarVerified(const TPortionStorage& storage) { + AFL_VERIFY(!!Cursor); + return storage.GetScalarVerified(*Cursor); + } +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/portion_storage.cpp b/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/portion_storage.cpp new file mode 100644 index 000000000000..f6ea83924ba0 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/portion_storage.cpp @@ -0,0 +1,114 @@ +#include "portion_storage.h" +#include +#include + +namespace NKikimr::NOlap::NStatistics { + +NKikimrColumnShardStatisticsProto::TScalar TPortionStorage::ScalarToProto(const arrow::Scalar& scalar) { + NKikimrColumnShardStatisticsProto::TScalar result; + switch (scalar.type->id()) { + case arrow::Type::BOOL: + result.SetBool(static_cast(scalar).value); + break; + case arrow::Type::UINT8: + result.SetUint8(static_cast(scalar).value); + break; + case arrow::Type::UINT16: + result.SetUint16(static_cast(scalar).value); + break; + case arrow::Type::UINT32: + result.SetUint32(static_cast(scalar).value); + break; + case arrow::Type::UINT64: + result.SetUint64(static_cast(scalar).value); + break; + case arrow::Type::INT8: + result.SetInt8(static_cast(scalar).value); + break; + case arrow::Type::INT16: + result.SetInt16(static_cast(scalar).value); + break; + case arrow::Type::INT32: + result.SetInt32(static_cast(scalar).value); + break; + case arrow::Type::INT64: + result.SetInt64(static_cast(scalar).value); + break; + case arrow::Type::DOUBLE: + result.SetDouble(static_cast(scalar).value); + break; + case arrow::Type::TIMESTAMP: + { + auto* ts = result.MutableTimestamp(); + ts->SetValue(static_cast(scalar).value); + ts->SetUnit(static_cast(*scalar.type).unit()); + break; + } + default: + AFL_VERIFY(false)("problem", "incorrect type for statistics usage")("type", scalar.type->ToString()); + } + return result; +} + +std::shared_ptr TPortionStorage::ProtoToScalar(const NKikimrColumnShardStatisticsProto::TScalar& proto) { + if (proto.HasBool()) { + return std::make_shared(proto.GetBool()); + } else if (proto.HasUint8()) { + return std::make_shared(proto.GetUint8()); + } else if (proto.HasUint16()) { + return std::make_shared(proto.GetUint16()); + } else if (proto.HasUint32()) { + return std::make_shared(proto.GetUint32()); + } else if (proto.HasUint64()) { + return std::make_shared(proto.GetUint64()); + } else if (proto.HasInt8()) { + return std::make_shared(proto.GetInt8()); + } else if (proto.HasInt16()) { + return std::make_shared(proto.GetInt16()); + } else if (proto.HasInt32()) { + return std::make_shared(proto.GetInt32()); + } else if (proto.HasInt64()) { + return std::make_shared(proto.GetInt64()); + } else if (proto.HasDouble()) { + return std::make_shared(proto.GetDouble()); + } else if (proto.HasTimestamp()) { + arrow::TimeUnit::type unit = arrow::TimeUnit::type(proto.GetTimestamp().GetUnit()); + return std::make_shared(proto.GetTimestamp().GetValue(), std::make_shared(unit)); + } + AFL_VERIFY(false)("problem", "incorrect statistics proto")("proto", proto.DebugString()); + return nullptr; +} + +std::shared_ptr TPortionStorage::GetScalarVerified(const TPortionStorageCursor& cursor) const { + AFL_VERIFY(cursor.GetScalarsPosition() < Data.size()); + AFL_VERIFY(Data[cursor.GetScalarsPosition()]); + return Data[cursor.GetScalarsPosition()]; +} + +void TPortionStorage::AddScalar(const std::shared_ptr& scalar) { + const auto type = scalar->type->id(); + AFL_VERIFY(type == arrow::Type::BOOL || + type == arrow::Type::UINT8 || type == arrow::Type::UINT16 || type == arrow::Type::UINT32 || type == arrow::Type::UINT64 || + type == arrow::Type::INT8 || type == arrow::Type::INT16 || type == arrow::Type::INT32 || type == arrow::Type::INT64 || + type == arrow::Type::DOUBLE || type == arrow::Type::TIMESTAMP) + ("problem", "incorrect_stat_type")("incoming", scalar->type->ToString()); + Data.emplace_back(scalar); +} + +NKikimrColumnShardStatisticsProto::TPortionStorage TPortionStorage::SerializeToProto() const { + NKikimrColumnShardStatisticsProto::TPortionStorage result; + for (auto&& i : Data) { + AFL_VERIFY(i); + *result.AddScalars() = ScalarToProto(*i); + } + return result; +} + +NKikimr::TConclusionStatus TPortionStorage::DeserializeFromProto(const NKikimrColumnShardStatisticsProto::TPortionStorage& proto) { + for (auto&& i : proto.GetScalars()) { + Data.emplace_back(ProtoToScalar(i)); + } + return TConclusionStatus::Success(); +} + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/portion_storage.h b/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/portion_storage.h new file mode 100644 index 000000000000..a3e4b6bcb0dd --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/portion_storage.h @@ -0,0 +1,53 @@ +#pragma once + +#include +#include +#include + +#include + +namespace NKikimrColumnShardStatisticsProto { +class TScalar; +class TPortionStorage; +} + +namespace NKikimr::NOlap::NStatistics { +class TPortionStorageCursor { +private: + YDB_READONLY(ui32, ScalarsPosition, 0); +public: + TPortionStorageCursor() = default; + + void AddScalarsPosition(const ui32 shift) { + ScalarsPosition += shift; + } +}; + +class TPortionStorage { +private: + YDB_READONLY_DEF(std::vector>, Data); + static NKikimrColumnShardStatisticsProto::TScalar ScalarToProto(const arrow::Scalar& value); + static std::shared_ptr ProtoToScalar(const NKikimrColumnShardStatisticsProto::TScalar& proto); + TConclusionStatus DeserializeFromProto(const NKikimrColumnShardStatisticsProto::TPortionStorage& proto); + +public: + bool IsEmpty() const { + return Data.empty(); + } + + std::shared_ptr GetScalarVerified(const TPortionStorageCursor& cursor) const; + + void AddScalar(const std::shared_ptr& scalar); + + NKikimrColumnShardStatisticsProto::TPortionStorage SerializeToProto() const; + + static TConclusion BuildFromProto(const NKikimrColumnShardStatisticsProto::TPortionStorage& proto) { + TPortionStorage result; + auto parse = result.DeserializeFromProto(proto); + if (!parse) { + return parse; + } + return result; + } +}; +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/ya.make b/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/ya.make new file mode 100644 index 000000000000..40ede1168864 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/statistics/abstract/ya.make @@ -0,0 +1,20 @@ +LIBRARY() + +SRCS( + portion_storage.cpp + constructor.cpp + operator.cpp + common.cpp +) + +PEERDIR( + ydb/core/tx/columnshard/engines/scheme/statistics/protos + contrib/libs/apache/arrow + ydb/library/actors/core + ydb/library/conclusion + ydb/core/tx/columnshard/splitter +) + +GENERATE_ENUM_SERIALIZATION(common.h) + +END() diff --git a/ydb/core/tx/columnshard/engines/scheme/statistics/max/constructor.cpp b/ydb/core/tx/columnshard/engines/scheme/statistics/max/constructor.cpp new file mode 100644 index 000000000000..c10c2c7e741b --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/statistics/max/constructor.cpp @@ -0,0 +1,45 @@ +#include "constructor.h" +#include "operator.h" + +namespace NKikimr::NOlap::NStatistics::NMax { + +NKikimr::TConclusion TConstructor::DoCreateOperator(const NSchemeShard::TOlapSchema& currentSchema) const { + auto column = currentSchema.GetColumns().GetByName(ColumnName); + if (!TOperator::IsAvailableType(column->GetType())) { + return TConclusionStatus::Fail("incorrect type for stat calculation"); + } + return TOperatorContainer(std::make_shared(column->GetId())); +} + +bool TConstructor::DoDeserializeFromProto(const NKikimrColumnShardStatisticsProto::TConstructorContainer& proto) { + if (!proto.HasMax()) { + return false; + } + ColumnName = proto.GetMax().GetColumnName(); + if (!ColumnName) { + return false; + } + return true; +} + +void TConstructor::DoSerializeToProto(NKikimrColumnShardStatisticsProto::TConstructorContainer& proto) const { + AFL_VERIFY(!!ColumnName); + proto.MutableMax()->SetColumnName(ColumnName); +} + +NKikimr::TConclusionStatus TConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonData) { + if (!jsonData.Has("column_name")) { + return TConclusionStatus::Fail("no column_name field in json description"); + } + TString columnNameLocal; + if (!jsonData["column_name"].GetString(&columnNameLocal)) { + return TConclusionStatus::Fail("incorrect column_name field in json description (no string)"); + } + if (!columnNameLocal) { + return TConclusionStatus::Fail("empty column_name field in json description"); + } + ColumnName = columnNameLocal; + return TConclusionStatus::Success(); +} + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/statistics/max/constructor.h b/ydb/core/tx/columnshard/engines/scheme/statistics/max/constructor.h new file mode 100644 index 000000000000..56f384dedba0 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/statistics/max/constructor.h @@ -0,0 +1,33 @@ +#pragma once +#include +#include + +#include + +namespace NKikimr::NOlap::NStatistics::NMax { + +class TConstructor: public IConstructor { +private: + using TBase = IConstructor; + static inline const auto Registrator = TFactory::TRegistrator(::ToString(EType::Max)); + YDB_READONLY(TString, ColumnName, 0); +protected: + virtual TConclusion DoCreateOperator(const NSchemeShard::TOlapSchema& currentSchema) const override; + virtual bool DoDeserializeFromProto(const NKikimrColumnShardStatisticsProto::TConstructorContainer& proto) override; + virtual void DoSerializeToProto(NKikimrColumnShardStatisticsProto::TConstructorContainer& proto) const override; + virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonData) override; +public: + TConstructor(const TString& columnName) + : TBase(EType::Max) + , ColumnName(columnName) + { + + } + + TConstructor() + :TBase(EType::Max) { + + } +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/statistics/max/operator.cpp b/ydb/core/tx/columnshard/engines/scheme/statistics/max/operator.cpp new file mode 100644 index 000000000000..4ba2adcbfdc6 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/statistics/max/operator.cpp @@ -0,0 +1,39 @@ +#include "operator.h" +#include +#include + +namespace NKikimr::NOlap::NStatistics::NMax { + +void TOperator::DoFillStatisticsData(const THashMap>>& data, TPortionStorage& portionStats, const TIndexInfo& index) const { + AFL_VERIFY(data.size() == 1); + auto loader = index.GetColumnLoaderVerified(EntityId); + std::shared_ptr result; + for (auto&& i : data.begin()->second) { + auto rb = NArrow::TStatusValidator::GetValid(loader->Apply(i->GetData())); + AFL_VERIFY(rb->num_columns() == 1); + auto res = NArrow::FindMinMaxPosition(rb->column(0)); + auto currentScalarMax = NArrow::TStatusValidator::GetValid(rb->column(0)->GetScalar(res.second)); + if (!result || NArrow::ScalarCompare(result, currentScalarMax) < 0) { + result = currentScalarMax; + } + } + portionStats.AddScalar(result); +} + +bool TOperator::DoDeserializeFromProto(const NKikimrColumnShardStatisticsProto::TOperatorContainer& proto) { + if (!proto.HasMax()) { + return false; + } + EntityId = proto.GetMax().GetEntityId(); + if (!EntityId) { + return false; + } + return true; +} + +void TOperator::DoSerializeToProto(NKikimrColumnShardStatisticsProto::TOperatorContainer& proto) const { + AFL_VERIFY(EntityId); + proto.MutableMax()->SetEntityId(EntityId); +} + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/statistics/max/operator.h b/ydb/core/tx/columnshard/engines/scheme/statistics/max/operator.h new file mode 100644 index 000000000000..aabdf949be8d --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/statistics/max/operator.h @@ -0,0 +1,57 @@ +#pragma once +#include + +namespace NKikimr::NOlap::NStatistics::NMax { + +class TOperator: public IOperator { +private: + using TBase = IOperator; + ui32 EntityId = 0; + static inline auto Registrator = TFactory::TRegistrator(::ToString(EType::Max)); +protected: + virtual void DoFillStatisticsData(const THashMap>>& data, TPortionStorage& portionStats, const TIndexInfo& index) const override; + virtual void DoShiftCursor(TPortionStorageCursor& cursor) const override { + cursor.AddScalarsPosition(1); + } + virtual std::vector GetEntityIds() const override { + return {EntityId}; + } + virtual bool DoDeserializeFromProto(const NKikimrColumnShardStatisticsProto::TOperatorContainer& proto) override; + virtual void DoSerializeToProto(NKikimrColumnShardStatisticsProto::TOperatorContainer& proto) const override; +public: + + static bool IsAvailableType(const NScheme::TTypeInfo type) { + switch (type.GetTypeId()) { + case NScheme::NTypeIds::Int8: + case NScheme::NTypeIds::Uint8: + case NScheme::NTypeIds::Int16: + case NScheme::NTypeIds::Uint16: + case NScheme::NTypeIds::Int32: + case NScheme::NTypeIds::Uint32: + case NScheme::NTypeIds::Int64: + case NScheme::NTypeIds::Uint64: + case NScheme::NTypeIds::Timestamp: + case NScheme::NTypeIds::Double: + case NScheme::NTypeIds::Datetime: + case NScheme::NTypeIds::Date: + return true; + default: + break; + } + return false; + } + + TOperator() + : TBase(EType::Max) + { + + } + + TOperator(const ui32 entityId) + : TBase(EType::Max) + , EntityId(entityId) { + + } +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/statistics/max/ya.make b/ydb/core/tx/columnshard/engines/scheme/statistics/max/ya.make new file mode 100644 index 000000000000..53d06e168bd2 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/statistics/max/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +SRCS( + GLOBAL constructor.cpp + GLOBAL operator.cpp +) + +PEERDIR( + ydb/core/tx/columnshard/engines/scheme/statistics/abstract + ydb/core/tx/schemeshard/olap/schema + ydb/core/formats/arrow +) + +END() diff --git a/ydb/core/tx/columnshard/engines/scheme/statistics/protos/data.proto b/ydb/core/tx/columnshard/engines/scheme/statistics/protos/data.proto new file mode 100644 index 000000000000..dcbb3a5ff8bf --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/statistics/protos/data.proto @@ -0,0 +1,54 @@ +package NKikimrColumnShardStatisticsProto; + +message TScalar { + message TTimestamp { + optional uint64 Value = 1; + optional uint32 Unit = 2; + } + oneof Value { + bool Bool = 1; + uint32 Uint8 = 2; + uint32 Uint16 = 3; + uint32 Uint32 = 4; + uint64 Uint64 = 5; + + int32 Int8 = 6; + int32 Int16 = 7; + int32 Int32 = 8; + int64 Int64 = 9; + + double Double = 10; + + TTimestamp Timestamp = 11; + } +} + +message TPortionStorage { + repeated TScalar Scalars = 1; +} + +message TMaxConstructor { + optional string ColumnName = 3; +} + +message TConstructorContainer { + optional string Name = 1; + + optional string ClassName = 40; + oneof Implementation { + TMaxConstructor Max = 41; + } +} + +message TMaxOperator { + optional uint32 EntityId = 1; +} + +message TOperatorContainer { + optional string Name = 1; + + optional string ClassName = 40; + oneof Implementation { + TMaxOperator Max = 41; + } +} diff --git a/ydb/core/tx/columnshard/engines/scheme/statistics/protos/ya.make b/ydb/core/tx/columnshard/engines/scheme/statistics/protos/ya.make new file mode 100644 index 000000000000..f72b3b7cf620 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/statistics/protos/ya.make @@ -0,0 +1,11 @@ +PROTO_LIBRARY() + +SRCS( + data.proto +) + +PEERDIR( + +) + +END() diff --git a/ydb/core/tx/columnshard/engines/scheme/statistics/ya.make b/ydb/core/tx/columnshard/engines/scheme/statistics/ya.make new file mode 100644 index 000000000000..d1c042780ff6 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/statistics/ya.make @@ -0,0 +1,9 @@ +LIBRARY() + +PEERDIR( + ydb/core/tx/columnshard/engines/scheme/statistics/abstract + ydb/core/tx/columnshard/engines/scheme/statistics/max + ydb/core/tx/columnshard/engines/scheme/statistics/protos +) + +END() diff --git a/ydb/core/tx/columnshard/engines/scheme/ya.make b/ydb/core/tx/columnshard/engines/scheme/ya.make index 84d7a7072980..b6bf9874c4a6 100644 --- a/ydb/core/tx/columnshard/engines/scheme/ya.make +++ b/ydb/core/tx/columnshard/engines/scheme/ya.make @@ -15,6 +15,7 @@ PEERDIR( ydb/library/actors/core ydb/core/tx/columnshard/engines/scheme/indexes + ydb/core/tx/columnshard/engines/scheme/statistics ydb/core/tx/columnshard/blobs_action/abstract ) diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h index 663863feddbc..489f58afd6ea 100644 --- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h +++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h @@ -19,6 +19,9 @@ class TColumnShard; namespace NKikimr::NOlap { class TColumnEngineChanges; class IBlobsGCAction; +namespace NStatistics { +class TOperatorContainer; +} } namespace arrow { class RecordBatch; @@ -79,7 +82,11 @@ class ICSController { void OnDataSharingStarted(const ui64 tabletId, const TString& sessionId) { return DoOnDataSharingStarted(tabletId, sessionId); } - + virtual void OnStatisticsUsage(const NOlap::NStatistics::TOperatorContainer& /*statOperator*/) { + + } + virtual void OnMaxValueUsage() { + } void OnTabletInitCompleted(const NColumnShard::TColumnShard& shard) { DoOnTabletInitCompleted(shard); } diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp index 3bc851a02bb4..d189226aa000 100644 --- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp @@ -36,6 +36,8 @@ class TWaitCompactionController: public NKikimr::NYDBTest::NColumnShard::TContro NMetadata::NFetcher::ISnapshot::TPtr CurrentConfig; bool CompactionEnabledFlag = true; ui32 TiersModificationsCount = 0; + YDB_READONLY(TAtomicCounter, StatisticsUsageCount, 0); + YDB_READONLY(TAtomicCounter, MaxValueUsageCount, 0); protected: virtual void OnTieringModified(const std::shared_ptr& /*tiers*/) override { ++TiersModificationsCount; @@ -60,6 +62,12 @@ class TWaitCompactionController: public NKikimr::NYDBTest::NColumnShard::TContro return true; } public: + virtual void OnStatisticsUsage(const NOlap::NStatistics::TOperatorContainer& /*statOperator*/) override { + StatisticsUsageCount.Inc(); + } + virtual void OnMaxValueUsage() override { + MaxValueUsageCount.Inc(); + } void SetCompactionEnabled(const bool value) { CompactionEnabledFlag = value; } @@ -397,6 +405,14 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, UNIT_ASSERT(reader.IsCorrectlyFinished()); UNIT_ASSERT(CheckSame(rb, PORTION_ROWS, spec.TtlColumn, ts[0])); } + + if (spec.NeedTestStatistics()) { + AFL_VERIFY(csControllerGuard->GetStatisticsUsageCount().Val()); + AFL_VERIFY(!csControllerGuard->GetMaxValueUsageCount().Val()); + } else { + AFL_VERIFY(!csControllerGuard->GetStatisticsUsageCount().Val()); + AFL_VERIFY(csControllerGuard->GetMaxValueUsageCount().Val()); + } } class TCountersContainer { @@ -758,6 +774,14 @@ std::vector> TestTiers(bool reboots, const std::vectorGetStatisticsUsageCount().Val()); + AFL_VERIFY(!csControllerGuard->GetMaxValueUsageCount().Val()); + } else { + AFL_VERIFY(!csControllerGuard->GetStatisticsUsageCount().Val()); + AFL_VERIFY(csControllerGuard->GetMaxValueUsageCount().Val()); + } + return specRowsBytes; } @@ -904,10 +928,11 @@ std::vector> TestOneTierExport(const TTestSchema::TTableSp return rowsBytes; } -void TestTwoHotTiers(bool reboot, bool changeTtl, const EInitialEviction initial = EInitialEviction::None, +void TestTwoHotTiers(bool reboot, bool changeTtl, const bool statisticsUsage, const EInitialEviction initial = EInitialEviction::None, bool revCompaction = false) { TTestSchema::TTableSpecials spec; spec.SetTtlColumn("timestamp"); + spec.SetNeedTestStatistics(statisticsUsage); spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier0").SetTtlColumn("timestamp")); spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier1").SetTtlColumn("timestamp")); spec.Tiers[(revCompaction ? 0 : 1)].SetCodec("zstd"); @@ -940,13 +965,13 @@ void TestTwoHotTiers(bool reboot, bool changeTtl, const EInitialEviction initial } } -void TestHotAndColdTiers(bool reboot, const EInitialEviction initial) { +void TestHotAndColdTiers(bool reboot, const EInitialEviction initial, const bool statisticsUsage) { TTestSchema::TTableSpecials spec; spec.SetTtlColumn("timestamp"); spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier0").SetTtlColumn("timestamp")); spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier1").SetTtlColumn("timestamp")); spec.Tiers.back().S3 = TTestSchema::TStorageTier::FakeS3(); - + spec.SetNeedTestStatistics(statisticsUsage); TestTiersAndTtl(spec, reboot, initial); } @@ -1329,64 +1354,91 @@ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) { // TODO: EnableOneTierAfterTtl, EnableTtlAfterOneTier Y_UNIT_TEST(HotTiers) { - TestTwoHotTiers(false, false); + TestTwoHotTiers(false, false, false); } Y_UNIT_TEST(RebootHotTiers) { - TestTwoHotTiers(true, false); + TestTwoHotTiers(true, false, false); + } + + Y_UNIT_TEST(HotTiersWithStat) { + TestTwoHotTiers(false, false, true); + } + + Y_UNIT_TEST(RebootHotTiersWithStat) { + TestTwoHotTiers(true, false, true); } Y_UNIT_TEST(HotTiersRevCompression) { - TestTwoHotTiers(false, false, EInitialEviction::None, true); + TestTwoHotTiers(false, false, false, EInitialEviction::None, true); } Y_UNIT_TEST(RebootHotTiersRevCompression) { - TestTwoHotTiers(true, false, EInitialEviction::None, true); + TestTwoHotTiers(true, false, false, EInitialEviction::None, true); } Y_UNIT_TEST(HotTiersTtl) { NColumnShard::gAllowLogBatchingDefaultValue = false; - TestTwoHotTiers(false, true); + TestTwoHotTiers(false, true, false); } Y_UNIT_TEST(RebootHotTiersTtl) { NColumnShard::gAllowLogBatchingDefaultValue = false; - TestTwoHotTiers(true, true); + TestTwoHotTiers(true, true, false); + } + + Y_UNIT_TEST(HotTiersTtlWithStat) { + NColumnShard::gAllowLogBatchingDefaultValue = false; + TestTwoHotTiers(false, true, true); + } + + Y_UNIT_TEST(RebootHotTiersTtlWithStat) { + NColumnShard::gAllowLogBatchingDefaultValue = false; + TestTwoHotTiers(true, true, true); } Y_UNIT_TEST(HotTiersAfterTtl) { - TestTwoHotTiers(false, false, EInitialEviction::Ttl); + TestTwoHotTiers(false, false, false, EInitialEviction::Ttl); } Y_UNIT_TEST(RebootHotTiersAfterTtl) { - TestTwoHotTiers(true, false, EInitialEviction::Ttl); + TestTwoHotTiers(true, false, false, EInitialEviction::Ttl); } // TODO: EnableTtlAfterHotTiers Y_UNIT_TEST(ColdTiers) { - TestHotAndColdTiers(false, EInitialEviction::Tiering); + TestHotAndColdTiers(false, EInitialEviction::Tiering, false); } Y_UNIT_TEST(RebootColdTiers) { //NColumnShard::gAllowLogBatchingDefaultValue = false; - TestHotAndColdTiers(true, EInitialEviction::Tiering); + TestHotAndColdTiers(true, EInitialEviction::Tiering, false); + } + + Y_UNIT_TEST(ColdTiersWithStat) { + TestHotAndColdTiers(false, EInitialEviction::Tiering, true); + } + + Y_UNIT_TEST(RebootColdTiersWithStat) { + //NColumnShard::gAllowLogBatchingDefaultValue = false; + TestHotAndColdTiers(true, EInitialEviction::Tiering, true); } Y_UNIT_TEST(EnableColdTiersAfterNoEviction) { - TestHotAndColdTiers(false, EInitialEviction::None); + TestHotAndColdTiers(false, EInitialEviction::None, false); } Y_UNIT_TEST(RebootEnableColdTiersAfterNoEviction) { - TestHotAndColdTiers(true, EInitialEviction::None); + TestHotAndColdTiers(true, EInitialEviction::None, false); } Y_UNIT_TEST(EnableColdTiersAfterTtl) { - TestHotAndColdTiers(false, EInitialEviction::Ttl); + TestHotAndColdTiers(false, EInitialEviction::Ttl, false); } Y_UNIT_TEST(RebootEnableColdTiersAfterTtl) { - TestHotAndColdTiers(true, EInitialEviction::Ttl); + TestHotAndColdTiers(true, EInitialEviction::Ttl, false); } Y_UNIT_TEST(OneColdTier) { diff --git a/ydb/core/tx/schemeshard/common/validation.cpp b/ydb/core/tx/schemeshard/common/validation.cpp new file mode 100644 index 000000000000..51615e4e1119 --- /dev/null +++ b/ydb/core/tx/schemeshard/common/validation.cpp @@ -0,0 +1,30 @@ +#include "validation.h" + +namespace NKikimr::NSchemeShard::NValidation { + +bool TTTLValidator::ValidateUnit(const NScheme::TTypeId columnType, NKikimrSchemeOp::TTTLSettings::EUnit unit, TString& errStr) { + switch (columnType) { + case NScheme::NTypeIds::Date: + case NScheme::NTypeIds::Datetime: + case NScheme::NTypeIds::Timestamp: + if (unit != NKikimrSchemeOp::TTTLSettings::UNIT_AUTO) { + errStr = "To enable TTL on date type column 'DateTypeColumnModeSettings' should be specified"; + return false; + } + break; + case NScheme::NTypeIds::Uint32: + case NScheme::NTypeIds::Uint64: + case NScheme::NTypeIds::DyNumber: + if (unit == NKikimrSchemeOp::TTTLSettings::UNIT_AUTO) { + errStr = "To enable TTL on integral type column 'ValueSinceUnixEpochModeSettings' should be specified"; + return false; + } + break; + default: + errStr = "Unsupported column type"; + return false; + } + return true; +} + +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/common/validation.h b/ydb/core/tx/schemeshard/common/validation.h new file mode 100644 index 000000000000..12d1e801e2ad --- /dev/null +++ b/ydb/core/tx/schemeshard/common/validation.h @@ -0,0 +1,13 @@ +#pragma once +#include +#include + +#include + +namespace NKikimr::NSchemeShard::NValidation { + +class TTTLValidator { +public: + static bool ValidateUnit(const NScheme::TTypeId columnType, NKikimrSchemeOp::TTTLSettings::EUnit unit, TString& errStr); +}; +} \ No newline at end of file diff --git a/ydb/core/tx/schemeshard/common/ya.make b/ydb/core/tx/schemeshard/common/ya.make new file mode 100644 index 000000000000..179f7adf4b9a --- /dev/null +++ b/ydb/core/tx/schemeshard/common/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + validation.cpp +) + +PEERDIR( + ydb/core/protos + ydb/public/lib/scheme_types +) + +END() diff --git a/ydb/core/tx/schemeshard/olap/indexes/update.cpp b/ydb/core/tx/schemeshard/olap/indexes/update.cpp index ac2623f24443..727a21e7fae8 100644 --- a/ydb/core/tx/schemeshard/olap/indexes/update.cpp +++ b/ydb/core/tx/schemeshard/olap/indexes/update.cpp @@ -10,12 +10,13 @@ void TOlapIndexUpsert::SerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& re IndexConstructor.SerializeToProto(requestedProto); } -void TOlapIndexUpsert::DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& indexSchema) { +bool TOlapIndexUpsert::DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& indexSchema) { Name = indexSchema.GetName(); if (!!indexSchema.GetStorageId()) { StorageId = indexSchema.GetStorageId(); } AFL_VERIFY(IndexConstructor.DeserializeFromProto(indexSchema))("incorrect_proto", indexSchema.DebugString()); + return true; } bool TOlapIndexesUpdate::Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors) { @@ -28,7 +29,7 @@ bool TOlapIndexesUpdate::Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& a TSet upsertIndexNames; for (auto& indexSchema : alterRequest.GetUpsertIndexes()) { TOlapIndexUpsert index; - index.DeserializeFromProto(indexSchema); + AFL_VERIFY(index.DeserializeFromProto(indexSchema)); if (!upsertIndexNames.emplace(index.GetName()).second) { errors.AddError(NKikimrScheme::StatusAlreadyExists, TStringBuilder() << "index '" << index.GetName() << "' duplication for add"); return false; diff --git a/ydb/core/tx/schemeshard/olap/indexes/update.h b/ydb/core/tx/schemeshard/olap/indexes/update.h index c9bf0d61628e..4350c175b306 100644 --- a/ydb/core/tx/schemeshard/olap/indexes/update.h +++ b/ydb/core/tx/schemeshard/olap/indexes/update.h @@ -21,7 +21,7 @@ namespace NKikimr::NSchemeShard { return IndexConstructor; } - void DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& requestedProto); + bool DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& requestedProto); void SerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& requestedProto) const; }; diff --git a/ydb/core/tx/schemeshard/olap/schema/schema.cpp b/ydb/core/tx/schemeshard/olap/schema/schema.cpp index ff7ba1bbe4ca..5446b77c4fa8 100644 --- a/ydb/core/tx/schemeshard/olap/schema/schema.cpp +++ b/ydb/core/tx/schemeshard/olap/schema/schema.cpp @@ -1,93 +1,202 @@ #include "schema.h" +#include +#include namespace NKikimr::NSchemeShard { - bool TOlapSchema::Update(const TOlapSchemaUpdate& schemaUpdate, IErrorCollector& errors) { - if (!Columns.ApplyUpdate(schemaUpdate.GetColumns(), errors, NextColumnId)) { - return false; - } +namespace { +static inline bool IsDropped(const TOlapColumnsDescription::TColumn& col) { + Y_UNUSED(col); + return false; +} + +static inline ui32 GetType(const TOlapColumnsDescription::TColumn& col) { + Y_ABORT_UNLESS(col.GetType().GetTypeId() != NScheme::NTypeIds::Pg, "pg types are not supported"); + return col.GetType().GetTypeId(); +} + +} - if (!Indexes.ApplyUpdate(*this, schemaUpdate.GetIndexes(), errors, NextColumnId)) { +static bool ValidateColumnTableTtl(const NKikimrSchemeOp::TColumnDataLifeCycle::TTtl& ttl, + const THashMap& sourceColumns, + const THashMap& alterColumns, + const THashMap& colName2Id, + IErrorCollector& errors) { + const TString colName = ttl.GetColumnName(); + + auto it = colName2Id.find(colName); + if (it == colName2Id.end()) { + errors.AddError(Sprintf("Cannot enable TTL on unknown column: '%s'", colName.data())); + return false; + } + + const TOlapColumnsDescription::TColumn* column = nullptr; + const ui32 colId = it->second; + if (alterColumns.contains(colId)) { + column = &alterColumns.at(colId); + } else if (sourceColumns.contains(colId)) { + column = &sourceColumns.at(colId); + } else { + Y_ABORT_UNLESS("Unknown column"); + } + + if (IsDropped(*column)) { + errors.AddError(Sprintf("Cannot enable TTL on dropped column: '%s'", colName.data())); + return false; + } + + if (ttl.HasExpireAfterBytes()) { + errors.AddError("TTL with eviction by size is not supported yet"); + return false; + } + + if (!ttl.HasExpireAfterSeconds()) { + errors.AddError("TTL without eviction time"); + return false; + } + + auto unit = ttl.GetColumnUnit(); + + switch (GetType(*column)) { + case NScheme::NTypeIds::DyNumber: + errors.AddError("Unsupported column type for TTL in column tables"); return false; - } + default: + break; + } - if (!HasEngine()) { - Engine = schemaUpdate.GetEngineDef(NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES); - } else { - if (schemaUpdate.HasEngine()) { - errors.AddError(NKikimrScheme::StatusSchemeError, "No engine updates supported"); + TString errStr; + if (!NValidation::TTTLValidator::ValidateUnit(GetType(*column), unit, errStr)) { + errors.AddError(errStr); + return false; + } + return true; +} + +bool TOlapSchema::ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttl, IErrorCollector& errors) const { + using TTtlProto = NKikimrSchemeOp::TColumnDataLifeCycle; + switch (ttl.GetStatusCase()) { + case TTtlProto::kEnabled: + { + const auto* column = Columns.GetByName(ttl.GetEnabled().GetColumnName()); + if (!column) { + errors.AddError("Incorrect ttl column - not found in scheme"); return false; } + if (!Statistics.GetByIdOptional(NOlap::NStatistics::EType::Max, {column->GetId()})) { + TOlapStatisticsModification modification; + NOlap::NStatistics::TConstructorContainer container(std::make_shared(column->GetName())); + modification.AddUpsert("__TTL_PROVIDER::" + TGUID::CreateTimebased().AsUuidString(), container); + if (!Statistics.ApplyUpdate(*this, modification, errors)) { + return false; + } + } + return ValidateColumnTableTtl(ttl.GetEnabled(), {}, Columns.GetColumns(), Columns.GetColumnsByName(), errors); } - - ++Version; - return true; + case TTtlProto::kDisabled: + default: + break; } - void TOlapSchema::ParseFromLocalDB(const NKikimrSchemeOp::TColumnTableSchema& tableSchema) { - NextColumnId = tableSchema.GetNextColumnId(); - Version = tableSchema.GetVersion(); - Y_ABORT_UNLESS(tableSchema.HasEngine()); - Engine = tableSchema.GetEngine(); - CompositeMarksFlag = tableSchema.GetCompositeMarks(); + return true; +} - Columns.Parse(tableSchema); - Indexes.Parse(tableSchema); +bool TOlapSchema::Update(const TOlapSchemaUpdate& schemaUpdate, IErrorCollector& errors) { + if (!Columns.ApplyUpdate(schemaUpdate.GetColumns(), errors, NextColumnId)) { + return false; } - void TOlapSchema::Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const { - tableSchema.SetNextColumnId(NextColumnId); - tableSchema.SetVersion(Version); - tableSchema.SetCompositeMarks(CompositeMarksFlag); - - Y_ABORT_UNLESS(HasEngine()); - tableSchema.SetEngine(GetEngineUnsafe()); + if (!Indexes.ApplyUpdate(*this, schemaUpdate.GetIndexes(), errors, NextColumnId)) { + return false; + } - Columns.Serialize(tableSchema); - Indexes.Serialize(tableSchema); + if (!Statistics.ApplyUpdate(*this, schemaUpdate.GetStatistics(), errors)) { + return false; } - bool TOlapSchema::Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const { - if (!Columns.Validate(opSchema, errors)) { + if (!HasEngine()) { + Engine = schemaUpdate.GetEngineDef(NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES); + } else { + if (schemaUpdate.HasEngine()) { + errors.AddError(NKikimrScheme::StatusSchemeError, "No engine updates supported"); return false; } + } - if (!Indexes.Validate(opSchema, errors)) { - return false; - } + ++Version; + return true; +} - if (opSchema.GetEngine() != Engine) { - errors.AddError("Specified schema engine does not match schema preset"); - return false; - } - return true; +void TOlapSchema::ParseFromLocalDB(const NKikimrSchemeOp::TColumnTableSchema& tableSchema) { + NextColumnId = tableSchema.GetNextColumnId(); + Version = tableSchema.GetVersion(); + Y_ABORT_UNLESS(tableSchema.HasEngine()); + Engine = tableSchema.GetEngine(); + CompositeMarksFlag = tableSchema.GetCompositeMarks(); + + Columns.Parse(tableSchema); + Indexes.Parse(tableSchema); + Statistics.Parse(tableSchema); +} + +void TOlapSchema::Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const { + tableSchema.SetNextColumnId(NextColumnId); + tableSchema.SetVersion(Version); + tableSchema.SetCompositeMarks(CompositeMarksFlag); + + Y_ABORT_UNLESS(HasEngine()); + tableSchema.SetEngine(GetEngineUnsafe()); + + Columns.Serialize(tableSchema); + Indexes.Serialize(tableSchema); + Statistics.Serialize(tableSchema); +} + +bool TOlapSchema::Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const { + if (!Columns.Validate(opSchema, errors)) { + return false; } - void TOlapStoreSchemaPreset::ParseFromLocalDB(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto) { - Y_ABORT_UNLESS(presetProto.HasId()); - Y_ABORT_UNLESS(presetProto.HasName()); - Y_ABORT_UNLESS(presetProto.HasSchema()); - Id = presetProto.GetId(); - Name = presetProto.GetName(); - TOlapSchema::ParseFromLocalDB(presetProto.GetSchema()); + if (!Indexes.Validate(opSchema, errors)) { + return false; } - void TOlapStoreSchemaPreset::Serialize(NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto) const { - presetProto.SetId(Id); - presetProto.SetName(Name); - TOlapSchema::Serialize(*presetProto.MutableSchema()); + if (!Statistics.Validate(opSchema, errors)) { + return false; } - bool TOlapStoreSchemaPreset::ParseFromRequest(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, IErrorCollector& errors) { - if (presetProto.HasId()) { - errors.AddError("Schema preset id cannot be specified explicitly"); - return false; - } - if (!presetProto.GetName()) { - errors.AddError("Schema preset name cannot be empty"); - return false; - } - Name = presetProto.GetName(); - return true; + if (opSchema.GetEngine() != Engine) { + errors.AddError("Specified schema engine does not match schema preset"); + return false; } + return true; +} + +void TOlapStoreSchemaPreset::ParseFromLocalDB(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto) { + Y_ABORT_UNLESS(presetProto.HasId()); + Y_ABORT_UNLESS(presetProto.HasName()); + Y_ABORT_UNLESS(presetProto.HasSchema()); + Id = presetProto.GetId(); + Name = presetProto.GetName(); + TOlapSchema::ParseFromLocalDB(presetProto.GetSchema()); +} + +void TOlapStoreSchemaPreset::Serialize(NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto) const { + presetProto.SetId(Id); + presetProto.SetName(Name); + TOlapSchema::Serialize(*presetProto.MutableSchema()); +} + +bool TOlapStoreSchemaPreset::ParseFromRequest(const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, IErrorCollector& errors) { + if (presetProto.HasId()) { + errors.AddError("Schema preset id cannot be specified explicitly"); + return false; + } + if (!presetProto.GetName()) { + errors.AddError("Schema preset name cannot be empty"); + return false; + } + Name = presetProto.GetName(); + return true; +} } diff --git a/ydb/core/tx/schemeshard/olap/schema/schema.h b/ydb/core/tx/schemeshard/olap/schema/schema.h index ca9e8b14c64b..be85818febbe 100644 --- a/ydb/core/tx/schemeshard/olap/schema/schema.h +++ b/ydb/core/tx/schemeshard/olap/schema/schema.h @@ -1,8 +1,10 @@ #pragma once #include #include +#include #include #include +#include #include "update.h" namespace NKikimr::NSchemeShard { @@ -12,12 +14,17 @@ namespace NKikimr::NSchemeShard { YDB_READONLY_OPT(NKikimrSchemeOp::EColumnTableEngine, Engine); YDB_READONLY_DEF(TOlapColumnsDescription, Columns); YDB_READONLY_DEF(TOlapIndexesDescription, Indexes); + mutable TOlapStatisticsDescription Statistics; YDB_READONLY(ui32, NextColumnId, 1); YDB_READONLY(ui32, Version, 0); YDB_READONLY_FLAG(CompositeMarks, true); public: + const TOlapStatisticsDescription& GetStatistics() const { + return Statistics; + } + bool Update(const TOlapSchemaUpdate& schemaUpdate, IErrorCollector& errors); void ParseFromLocalDB(const NKikimrSchemeOp::TColumnTableSchema& tableSchema); diff --git a/ydb/core/tx/schemeshard/olap/schema/update.cpp b/ydb/core/tx/schemeshard/olap/schema/update.cpp index 92d7d9038fb8..05c2cd11eb5e 100644 --- a/ydb/core/tx/schemeshard/olap/schema/update.cpp +++ b/ydb/core/tx/schemeshard/olap/schema/update.cpp @@ -23,6 +23,10 @@ namespace NKikimr::NSchemeShard { return false; } + if (!Statistics.Parse(alterRequest, errors)) { + return false; + } + return true; } } diff --git a/ydb/core/tx/schemeshard/olap/schema/update.h b/ydb/core/tx/schemeshard/olap/schema/update.h index bb4173433f30..32e8b79e243a 100644 --- a/ydb/core/tx/schemeshard/olap/schema/update.h +++ b/ydb/core/tx/schemeshard/olap/schema/update.h @@ -1,10 +1,13 @@ #pragma once +#include +#include namespace NKikimr::NSchemeShard { class TOlapSchemaUpdate { YDB_READONLY_DEF(TOlapColumnsUpdate, Columns); YDB_READONLY_DEF(TOlapIndexesUpdate, Indexes); + YDB_READONLY_DEF(TOlapStatisticsModification, Statistics); YDB_READONLY_OPT(NKikimrSchemeOp::EColumnTableEngine, Engine); public: bool Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema, IErrorCollector& errors, bool allowNullKeys = false); diff --git a/ydb/core/tx/schemeshard/olap/schema/ya.make b/ydb/core/tx/schemeshard/olap/schema/ya.make index 9c21a43bbbb7..f4cf1eb53c78 100644 --- a/ydb/core/tx/schemeshard/olap/schema/ya.make +++ b/ydb/core/tx/schemeshard/olap/schema/ya.make @@ -8,6 +8,7 @@ SRCS( PEERDIR( ydb/core/tx/schemeshard/olap/columns ydb/core/tx/schemeshard/olap/indexes + ydb/core/tx/schemeshard/common ) END() diff --git a/ydb/core/tx/schemeshard/olap/statistics/schema.cpp b/ydb/core/tx/schemeshard/olap/statistics/schema.cpp new file mode 100644 index 000000000000..962242b97150 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/statistics/schema.cpp @@ -0,0 +1,94 @@ +#include "schema.h" +#include + +namespace NKikimr::NSchemeShard { + +void TOlapStatisticsSchema::SerializeToProto(NKikimrColumnShardStatisticsProto::TOperatorContainer& proto) const { + proto.SetName(Name); + Operator.SerializeToProto(proto); +} + +bool TOlapStatisticsSchema::DeserializeFromProto(const NKikimrColumnShardStatisticsProto::TOperatorContainer& proto) { + Name = proto.GetName(); + AFL_VERIFY(Operator.DeserializeFromProto(proto))("incorrect_proto", proto.DebugString()); + return true; +} + +bool TOlapStatisticsSchema::ApplyUpdate(const TOlapSchema& /*currentSchema*/, const TOlapStatisticsUpsert& upsert, IErrorCollector& errors) { + AFL_VERIFY(upsert.GetName() == GetName()); + AFL_VERIFY(!!upsert.GetConstructor()); + if (upsert.GetConstructor().GetClassName() != Operator.GetClassName()) { + errors.AddError("different index classes: " + upsert.GetConstructor().GetClassName() + " vs " + Operator.GetClassName()); + return false; + } + errors.AddError("cannot modify statistics calculation for " + GetName() + ". not implemented currently."); + return false; +} + +bool TOlapStatisticsDescription::ApplyUpdate(const TOlapSchema& currentSchema, const TOlapStatisticsModification& schemaUpdate, IErrorCollector& errors) { + for (auto&& stat : schemaUpdate.GetUpsert()) { + auto* current = MutableByNameOptional(stat.GetName()); + if (current) { + if (!current->ApplyUpdate(currentSchema, stat, errors)) { + return false; + } + } else { + auto meta = stat.GetConstructor()->CreateOperator(currentSchema); + if (!meta) { + errors.AddError(meta.GetErrorMessage()); + return false; + } + TOlapStatisticsSchema object(stat.GetName(), meta.DetachResult()); + Y_ABORT_UNLESS(ObjectsByName.emplace(stat.GetName(), std::move(object)).second); + } + } + + for (const auto& name : schemaUpdate.GetDrop()) { + auto info = GetByNameOptional(name); + if (!info) { + errors.AddError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "Unknown stat for drop: " << name); + return false; + } + AFL_VERIFY(ObjectsByName.erase(name)); + } + + return true; +} + +void TOlapStatisticsDescription::Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema) { + for (const auto& proto : tableSchema.GetStatistics()) { + TOlapStatisticsSchema object; + AFL_VERIFY(object.DeserializeFromProto(proto)); + AFL_VERIFY(ObjectsByName.emplace(proto.GetName(), std::move(object)).second); + } +} + +void TOlapStatisticsDescription::Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const { + for (const auto& object : ObjectsByName) { + object.second.SerializeToProto(*tableSchema.AddStatistics()); + } +} + +bool TOlapStatisticsDescription::Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const { + THashSet usedObjects; + for (const auto& proto : opSchema.GetStatistics()) { + if (proto.GetName().empty()) { + errors.AddError("Statistic cannot have an empty name"); + return false; + } + + const TString& name = proto.GetName(); + if (!GetByNameOptional(name)) { + errors.AddError("Stat '" + name + "' does not match schema preset"); + return false; + } + + if (!usedObjects.emplace(proto.GetName()).second) { + errors.AddError("Column '" + name + "' is specified multiple times"); + return false; + } + } + return true; +} + +} diff --git a/ydb/core/tx/schemeshard/olap/statistics/schema.h b/ydb/core/tx/schemeshard/olap/statistics/schema.h new file mode 100644 index 000000000000..7fa62e6b5994 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/statistics/schema.h @@ -0,0 +1,82 @@ +#pragma once +#include "update.h" + +namespace NKikimr::NSchemeShard { + +class TOlapSchema; + +class TOlapStatisticsSchema { +private: + YDB_READONLY_DEF(TString, Name); + YDB_READONLY_DEF(NOlap::NStatistics::TOperatorContainer, Operator); +public: + TOlapStatisticsSchema() = default; + + TOlapStatisticsSchema(const TString& name, const NOlap::NStatistics::TOperatorContainer& container) + : Name(name) + , Operator(container) + { + + } + + bool ApplyUpdate(const TOlapSchema& currentSchema, const TOlapStatisticsUpsert& upsert, IErrorCollector& errors); + + void SerializeToProto(NKikimrColumnShardStatisticsProto::TOperatorContainer& proto) const; + bool DeserializeFromProto(const NKikimrColumnShardStatisticsProto::TOperatorContainer& proto); +}; + +class TOlapStatisticsDescription { +public: + using TObjectsByName = THashMap; + +private: + YDB_READONLY_DEF(TObjectsByName, ObjectsByName); +public: + const TOlapStatisticsSchema* GetByIdOptional(const NOlap::NStatistics::EType type, const std::vector& entityIds) const noexcept { + for (auto&& i : ObjectsByName) { + if (!i.second.GetOperator()) { + continue; + } + if (i.second.GetOperator()->GetIdentifier() != NOlap::NStatistics::TIdentifier(type, entityIds)) { + continue; + } + return &i.second; + } + return nullptr; + } + + const TOlapStatisticsSchema* GetByNameOptional(const TString& name) const noexcept { + auto it = ObjectsByName.find(name); + if (it != ObjectsByName.end()) { + return &it->second; + } + return nullptr; + } + + const TOlapStatisticsSchema& GetByNameVerified(const TString& name) const noexcept { + auto object = GetByNameOptional(name); + AFL_VERIFY(object); + return *object; + } + + TOlapStatisticsSchema* MutableByNameOptional(const TString& name) noexcept { + auto it = ObjectsByName.find(name); + if (it != ObjectsByName.end()) { + return &it->second; + } + return nullptr; + } + + TOlapStatisticsSchema& MutableByNameVerified(const TString& name) noexcept { + auto* object = MutableByNameOptional(name); + AFL_VERIFY(object); + return *object; + } + + bool ApplyUpdate(const TOlapSchema& currentSchema, const TOlapStatisticsModification& schemaUpdate, IErrorCollector& errors); + + void Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema); + void Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const; + bool Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const; +}; +} diff --git a/ydb/core/tx/schemeshard/olap/statistics/update.cpp b/ydb/core/tx/schemeshard/olap/statistics/update.cpp new file mode 100644 index 000000000000..1c82c07c300c --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/statistics/update.cpp @@ -0,0 +1,35 @@ +#include "update.h" + +namespace NKikimr::NSchemeShard { + +void TOlapStatisticsUpsert::SerializeToProto(NKikimrColumnShardStatisticsProto::TConstructorContainer& requestedProto) const { + requestedProto.SetName(Name); + Constructor.SerializeToProto(requestedProto); +} + +bool TOlapStatisticsUpsert::DeserializeFromProto(const NKikimrColumnShardStatisticsProto::TConstructorContainer& proto) { + Name = proto.GetName(); + AFL_VERIFY(Constructor.DeserializeFromProto(proto))("incorrect_proto", proto.DebugString()); + return true; +} + +bool TOlapStatisticsModification::Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors) { + for (const auto& name : alterRequest.GetDropStatistics()) { + if (!Drop.emplace(name).second) { + errors.AddError(NKikimrScheme::StatusInvalidParameter, "Duplicated statistics for drop"); + return false; + } + } + TSet upsertNames; + for (auto& schema : alterRequest.GetUpsertStatistics()) { + TOlapStatisticsUpsert stat; + AFL_VERIFY(stat.DeserializeFromProto(schema)); + if (!upsertNames.emplace(stat.GetName()).second) { + errors.AddError(NKikimrScheme::StatusAlreadyExists, TStringBuilder() << "stat '" << stat.GetName() << "' duplication for add"); + return false; + } + Upsert.emplace_back(std::move(stat)); + } + return true; +} +} diff --git a/ydb/core/tx/schemeshard/olap/statistics/update.h b/ydb/core/tx/schemeshard/olap/statistics/update.h new file mode 100644 index 000000000000..96558928acf3 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/statistics/update.h @@ -0,0 +1,43 @@ +#pragma once +#include +#include +#include +#include +#include + +namespace NKikimr::NSchemeShard { + + class TOlapStatisticsUpsert { + private: + YDB_READONLY_DEF(TString, Name); + protected: + NOlap::NStatistics::TConstructorContainer Constructor; + public: + TOlapStatisticsUpsert() = default; + TOlapStatisticsUpsert(const TString& name, const NOlap::NStatistics::TConstructorContainer& constructor) + : Name(name) + , Constructor(constructor) + { + + } + + const NOlap::NStatistics::TConstructorContainer& GetConstructor() const { + return Constructor; + } + + bool DeserializeFromProto(const NKikimrColumnShardStatisticsProto::TConstructorContainer& requestedProto); + void SerializeToProto(NKikimrColumnShardStatisticsProto::TConstructorContainer& requestedProto) const; + }; + + class TOlapStatisticsModification { + private: + YDB_READONLY_DEF(TVector, Upsert); + YDB_READONLY_DEF(TSet, Drop); + public: + void AddUpsert(const TString& name, const NOlap::NStatistics::TConstructorContainer container) { + Upsert.emplace_back(TOlapStatisticsUpsert(name, container)); + } + + bool Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors); + }; +} diff --git a/ydb/core/tx/schemeshard/olap/statistics/ya.make b/ydb/core/tx/schemeshard/olap/statistics/ya.make new file mode 100644 index 000000000000..0303a9692f52 --- /dev/null +++ b/ydb/core/tx/schemeshard/olap/statistics/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + schema.cpp + update.cpp +) + +PEERDIR( + ydb/services/bg_tasks/abstract +) + +END() diff --git a/ydb/core/tx/schemeshard/olap/ya.make b/ydb/core/tx/schemeshard/olap/ya.make index 63e509c2630f..7043bc6ec61a 100644 --- a/ydb/core/tx/schemeshard/olap/ya.make +++ b/ydb/core/tx/schemeshard/olap/ya.make @@ -6,6 +6,7 @@ PEERDIR( ydb/core/tx/schemeshard/olap/schema ydb/core/tx/schemeshard/olap/common ydb/core/tx/schemeshard/olap/operations + ydb/core/tx/schemeshard/olap/statistics ) END() diff --git a/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp b/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp index 40a792d0f484..cc447f07015b 100644 --- a/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp @@ -1,60 +1,23 @@ #include "schemeshard_info_types.h" + +#include "common/validation.h" #include "olap/columns/schema.h" + #include namespace NKikimr { namespace NSchemeShard { -// Helper accessors for OLTP and OLAP tables that use different TColumn's namespace { - inline - bool IsDropped(const TOlapColumnsDescription::TColumn& col) { - Y_UNUSED(col); - return false; - } - - inline - ui32 GetType(const TOlapColumnsDescription::TColumn& col) { - Y_ABORT_UNLESS(col.GetType().GetTypeId() != NScheme::NTypeIds::Pg, "pg types are not supported"); - return col.GetType().GetTypeId(); - } - - inline - bool IsDropped(const TTableInfo::TColumn& col) { - return col.IsDropped(); - } +static inline bool IsDropped(const TTableInfo::TColumn& col) { + return col.IsDropped(); +} - inline - ui32 GetType(const TTableInfo::TColumn& col) { - Y_ABORT_UNLESS(col.PType.GetTypeId() != NScheme::NTypeIds::Pg, "pg types are not supported"); - return col.PType.GetTypeId(); - } +static inline ui32 GetType(const TTableInfo::TColumn& col) { + Y_ABORT_UNLESS(col.PType.GetTypeId() != NScheme::NTypeIds::Pg, "pg types are not supported"); + return col.PType.GetTypeId(); } -template -bool ValidateUnit(const TColumn& column, NKikimrSchemeOp::TTTLSettings::EUnit unit, TString& errStr) { - switch (GetType(column)) { - case NScheme::NTypeIds::Date: - case NScheme::NTypeIds::Datetime: - case NScheme::NTypeIds::Timestamp: - if (unit != NKikimrSchemeOp::TTTLSettings::UNIT_AUTO) { - errStr = "To enable TTL on date type column 'DateTypeColumnModeSettings' should be specified"; - return false; - } - break; - case NScheme::NTypeIds::Uint32: - case NScheme::NTypeIds::Uint64: - case NScheme::NTypeIds::DyNumber: - if (unit == NKikimrSchemeOp::TTTLSettings::UNIT_AUTO) { - errStr = "To enable TTL on integral type column 'ValueSinceUnixEpochModeSettings' should be specified"; - return false; - } - break; - default: - errStr = "Unsupported column type"; - return false; - } - return true; } bool ValidateTtlSettings(const NKikimrSchemeOp::TTTLSettings& ttl, @@ -92,7 +55,7 @@ bool ValidateTtlSettings(const NKikimrSchemeOp::TTTLSettings& ttl, } const auto unit = enabled.GetColumnUnit(); - if (!ValidateUnit(*column, unit, errStr)) { + if (!NValidation::TTTLValidator::ValidateUnit(GetType(*column), unit, errStr)) { return false; } @@ -117,75 +80,4 @@ bool ValidateTtlSettings(const NKikimrSchemeOp::TTTLSettings& ttl, return true; } -static bool ValidateColumnTableTtl(const NKikimrSchemeOp::TColumnDataLifeCycle::TTtl& ttl, - const THashMap& sourceColumns, - const THashMap& alterColumns, - const THashMap& colName2Id, - IErrorCollector& errors) -{ - const TString colName = ttl.GetColumnName(); - - auto it = colName2Id.find(colName); - if (it == colName2Id.end()) { - errors.AddError(Sprintf("Cannot enable TTL on unknown column: '%s'", colName.data())); - return false; - } - - const TOlapColumnsDescription::TColumn* column = nullptr; - const ui32 colId = it->second; - if (alterColumns.contains(colId)) { - column = &alterColumns.at(colId); - } else if (sourceColumns.contains(colId)) { - column = &sourceColumns.at(colId); - } else { - Y_ABORT_UNLESS("Unknown column"); - } - - if (IsDropped(*column)) { - errors.AddError(Sprintf("Cannot enable TTL on dropped column: '%s'", colName.data())); - return false; - } - - if (ttl.HasExpireAfterBytes()) { - errors.AddError("TTL with eviction by size is not supported yet"); - return false; - } - - if (!ttl.HasExpireAfterSeconds()) { - errors.AddError("TTL without eviction time"); - return false; - } - - auto unit = ttl.GetColumnUnit(); - - switch (GetType(*column)) { - case NScheme::NTypeIds::DyNumber: - errors.AddError("Unsupported column type for TTL in column tables"); - return false; - default: - break; - } - - TString errStr; - if (!ValidateUnit(*column, unit, errStr)) { - errors.AddError(errStr); - return false; - } - return true; -} - -bool TOlapSchema::ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttl, IErrorCollector& errors) const { - using TTtlProto = NKikimrSchemeOp::TColumnDataLifeCycle; - - switch (ttl.GetStatusCase()) { - case TTtlProto::kEnabled: - return ValidateColumnTableTtl(ttl.GetEnabled(), {}, Columns.GetColumns(), Columns.GetColumnsByName(), errors); - case TTtlProto::kDisabled: - default: - break; - } - - return true; -} - }} diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index 8cbf4b637d3c..7dac75c00b8c 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -258,6 +258,7 @@ PEERDIR( ydb/core/tablet_flat ydb/core/tx ydb/core/tx/datashard + ydb/core/tx/schemeshard/common ydb/core/tx/schemeshard/olap ydb/core/tx/scheme_board ydb/core/tx/tx_allocator_client