From e2bee3152a650c2f67b3a7a098b3d32e15e2c43f Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Fri, 6 Dec 2024 09:02:54 +0000 Subject: [PATCH 1/4] register memory allocated for metadata in tiering actualizer --- ydb/core/tx/columnshard/columnshard_impl.cpp | 48 +++++++++---- .../columnshard/columnshard_private_events.h | 13 +++- .../tx/columnshard/data_accessor/request.h | 7 +- .../tx/columnshard/engines/column_engine.h | 6 +- .../storage/actualizer/index/index.cpp | 7 +- .../storage/actualizer/tiering/tiering.cpp | 50 +++++++++++--- .../storage/actualizer/tiering/tiering.h | 13 +--- .../columnshard/engines/ut/ut_logs_engine.cpp | 2 +- .../tx/columnshard/hooks/abstract/abstract.h | 10 +++ .../tx/columnshard/hooks/testing/controller.h | 3 + .../test_helper/columnshard_ut_common.cpp | 2 +- .../test_helper/columnshard_ut_common.h | 17 ++--- .../tx/columnshard/test_helper/controllers.h | 4 ++ .../ut_schema/ut_columnshard_schema.cpp | 67 ++++++++++--------- ydb/library/accessor/validator.h | 6 +- 15 files changed, 167 insertions(+), 88 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 5e1244cf1bf1..d2e13ddbc1a1 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -616,7 +616,26 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask { } }; -class TDataAccessorsSubscriber: public NOlap::IDataAccessorRequestsSubscriber { +class TDataAccessorsSubscriberBase: public NOlap::IDataAccessorRequestsSubscriber { +protected: + std::shared_ptr ResourcesGuard; + + virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result, std::shared_ptr&& guard) = 0; + + virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result) override final { + AFL_VERIFY(ResourcesGuard); + DoOnRequestsFinished(std::move(result), std::move(ResourcesGuard)); + } + +public: + void SetResourcesGuard(const std::shared_ptr& guard) { + AFL_VERIFY(!ResourcesGuard); + AFL_VERIFY(guard); + ResourcesGuard = guard; + } +}; + +class TDataAccessorsSubscriber: public TDataAccessorsSubscriberBase { protected: const NActors::TActorId ShardActorId; std::shared_ptr Changes; @@ -625,8 +644,9 @@ class TDataAccessorsSubscriber: public NOlap::IDataAccessorRequestsSubscriber { virtual void DoOnRequestsFinishedImpl() = 0; - virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result) override final { + virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result, std::shared_ptr&& guard) override final { Changes->SetFetchedDataAccessors(std::move(result), NOlap::TDataAccessorsInitializationContext(VersionedIndex)); + Changes->ResourcesGuard = std::move(guard); DoOnRequestsFinishedImpl(); } @@ -822,7 +842,7 @@ class TAccessorsMemorySubscriber: public NOlap::NResourceBroker::NSubscribe::ITa private: using TBase = NOlap::NResourceBroker::NSubscribe::ITask; std::shared_ptr Request; - std::shared_ptr Subscriber; + std::shared_ptr Subscriber; std::shared_ptr DataAccessorsManager; virtual void DoOnAllocationSuccess(const std::shared_ptr& guard) override { @@ -833,7 +853,7 @@ class TAccessorsMemorySubscriber: public NOlap::NResourceBroker::NSubscribe::ITa public: TAccessorsMemorySubscriber(const ui64 memory, const TString& externalTaskId, const NOlap::NResourceBroker::NSubscribe::TTaskContext& context, - std::shared_ptr&& request, const std::shared_ptr& subscriber, + std::shared_ptr&& request, const std::shared_ptr& subscriber, const std::shared_ptr& dataAccessorsManager) : TBase(0, memory, externalTaskId, context) , Request(std::move(request)) @@ -852,7 +872,6 @@ class TCompactionDataAccessorsSubscriber: public TDataAccessorsSubscriberWithRea AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "compaction")("external_task_id", externalTaskId); auto ev = std::make_unique(VersionedIndex, Changes, CacheDataAfterWrite); - ev->IndexChanges->ResourcesGuard = ExtractResourcesGuard(); TActorContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor( std::make_shared(std::move(ev), ShardActorId, ShardTabletId, Counters, SnapshotModification))); } @@ -895,7 +914,6 @@ class TWriteEvictPortionsDataAccessorsSubscriber: public TDataAccessorsSubscribe virtual void DoOnRequestsFinishedImpl() override { ACFL_DEBUG("background", "ttl")("need_writes", true); auto ev = std::make_unique(VersionedIndex, Changes, false); - ev->IndexChanges->ResourcesGuard = ExtractResourcesGuard(); TActorContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor( std::make_shared(std::move(ev), ShardActorId, ShardTabletId, Counters, SnapshotModification))); } @@ -920,14 +938,14 @@ class TNoWriteEvictPortionsDataAccessorsSubscriber: public TDataAccessorsSubscri using TBase::TBase; }; -class TCSMetadataSubscriber: public NOlap::IDataAccessorRequestsSubscriber, public TObjectCounter { +class TCSMetadataSubscriber: public TDataAccessorsSubscriberBase, public TObjectCounter { private: NActors::TActorId TabletActorId; const std::shared_ptr Processor; const ui64 Generation; - virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result) override { + virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result, std::shared_ptr&& guard) override { NActors::TActivationContext::Send( - TabletActorId, std::make_unique(Processor, Generation, std::move(result))); + TabletActorId, std::make_unique(Processor, Generation, std::move(result), std::move(guard))); } public: @@ -947,8 +965,12 @@ void TColumnShard::SetupMetadata() { } std::vector requests = TablesManager.MutablePrimaryIndex().CollectMetadataRequests(); for (auto&& i : requests) { - i.GetRequest()->RegisterSubscriber(std::make_shared(SelfId(), i.GetProcessor(), Generation())); - DataAccessorsManager->AskData(i.GetRequest()); + const ui64 accessorsMemory = + i.GetRequest()->PredictAccessorsMemory(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema()); + NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(ResourceSubscribeActor, + std::make_shared(accessorsMemory, i.GetRequest()->GetTaskId(), TTLTaskSubscription, + std::shared_ptr(i.GetRequest()), + std::make_shared(SelfId(), i.GetProcessor(), Generation()), DataAccessorsManager.GetObjectPtrVerified())); } } @@ -1004,7 +1026,6 @@ class TCleanupPortionsDataAccessorsSubscriber: public TDataAccessorsSubscriber { virtual void DoOnRequestsFinishedImpl() override { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("background", "cleanup")("changes_info", Changes->DebugString()); auto ev = std::make_unique(VersionedIndex, Changes, false); - ev->IndexChanges->ResourcesGuard = ExtractResourcesGuard(); ev->SetPutStatus(NKikimrProto::OK); // No new blobs to write NActors::TActivationContext::Send(ShardActorId, std::move(ev)); } @@ -1093,7 +1114,8 @@ void TColumnShard::Handle(TEvPrivate::TEvStartCompaction::TPtr& ev, const TActor void TColumnShard::Handle(TEvPrivate::TEvMetadataAccessorsInfo::TPtr& ev, const TActorContext& /*ctx*/) { AFL_VERIFY(ev->Get()->GetGeneration() == Generation())("ev", ev->Get()->GetGeneration())("tablet", Generation()); - ev->Get()->GetProcessor()->ApplyResult(ev->Get()->ExtractResult(), TablesManager.MutablePrimaryIndexAsVerified()); + ev->Get()->GetProcessor()->ApplyResult( + ev->Get()->ExtractResult(), TablesManager.MutablePrimaryIndexAsVerified(), ev->Get()->ExtractResourcesGuard()); SetupMetadata(); } diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h index 023ba621b658..20c0bb02d62a 100644 --- a/ydb/core/tx/columnshard/columnshard_private_events.h +++ b/ydb/core/tx/columnshard/columnshard_private_events.h @@ -76,6 +76,7 @@ struct TEvPrivate { const std::shared_ptr Processor; const ui64 Generation; NOlap::TDataAccessorsResult Result; + std::shared_ptr ResourcesGuard; public: const std::shared_ptr& GetProcessor() const { @@ -87,12 +88,18 @@ struct TEvPrivate { NOlap::TDataAccessorsResult ExtractResult() { return std::move(Result); } + std::shared_ptr&& ExtractResourcesGuard() { + AFL_VERIFY(ResourcesGuard); + return std::move(ResourcesGuard); + } - TEvMetadataAccessorsInfo( - const std::shared_ptr& processor, const ui64 gen, NOlap::TDataAccessorsResult&& result) + TEvMetadataAccessorsInfo(const std::shared_ptr& processor, const ui64 gen, + NOlap::TDataAccessorsResult&& result, std::shared_ptr&& guard) : Processor(processor) , Generation(gen) - , Result(std::move(result)) { + , Result(std::move(result)) + , ResourcesGuard(std::move(guard)) { + AFL_VERIFY(ResourcesGuard); } }; diff --git a/ydb/core/tx/columnshard/data_accessor/request.h b/ydb/core/tx/columnshard/data_accessor/request.h index 050e0524040c..13f343c1cd39 100644 --- a/ydb/core/tx/columnshard/data_accessor/request.h +++ b/ydb/core/tx/columnshard/data_accessor/request.h @@ -2,12 +2,13 @@ #include #include #include +#include namespace NKikimr::NOlap { class TDataAccessorsRequest; -class TDataAccessorsResult { +class TDataAccessorsResult : private NNonCopyable::TMoveOnly { private: THashMap ErrorsByPathId; THashMap> AccessorsByPathId; @@ -315,6 +316,10 @@ class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter&& resourcesGuard) = 0; public: virtual ~IMetadataAccessorResultProcessor() = default; - void ApplyResult(TDataAccessorsResult&& result, TColumnEngineForLogs& engine) { - return DoApplyResult(std::move(result), engine); + void ApplyResult(TDataAccessorsResult&& result, TColumnEngineForLogs& engine, std::shared_ptr&& resourcesGuard) { + return DoApplyResult(std::move(result), engine, std::move(resourcesGuard)); } IMetadataAccessorResultProcessor() = default; diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.cpp b/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.cpp index cc726700f930..18f17aa16140 100644 --- a/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.cpp +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.cpp @@ -56,11 +56,8 @@ std::vector TGranuleActualizationIndex::CollectMetadataReque if (!TieringActualizer) { return {}; } - auto req = TieringActualizer->BuildMetadataRequest(PathId, portions, TieringActualizer); - if (!req) { - return {}; - } - return { *req }; + auto reqs = TieringActualizer->BuildMetadataRequests(PathId, portions, TieringActualizer); + return reqs; } } diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp index b63cd5cf4c0b..19a95ee262fc 100644 --- a/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp @@ -78,6 +78,17 @@ std::optional TTieringActualizer::Bu return {}; } +void TTieringActualizer::AddPortionImpl(const TPortionInfo& portion, const TInstant now) { + auto info = BuildActualizationInfo(portion, now); + if (!info) { + return; + } + AFL_VERIFY(PortionIdByWaitDuration[info->GetAddress()].AddPortion(*info, portion.GetPortionId(), now)); + auto address = info->GetAddress(); + TFindActualizationInfo findId(std::move(address), info->GetWaitInstant(now)); + AFL_VERIFY(PortionsInfo.emplace(portion.GetPortionId(), std::move(findId)).second); +} + void TTieringActualizer::DoAddPortion(const TPortionInfo& portion, const TAddExternalContext& addContext) { AFL_VERIFY(PathId == portion.GetPathId()); if (!addContext.GetPortionExclusiveGuarantee()) { @@ -107,6 +118,9 @@ void TTieringActualizer::ActualizePortionInfo(const TPortionDataAccessor& access if (!NewPortionIds.erase(accessor.GetPortionInfo().GetPortionId())) { return; } + if (NewPortionIds.empty()) { + NYDBTest::TControllers::GetColumnShardController()->OnTieringMetadataActualized(); + } auto& portion = accessor.GetPortionInfo(); if (Tiering) { std::shared_ptr portionSchema = portion.GetSchema(VersionedIndex); @@ -201,15 +215,18 @@ void TTieringActualizer::DoExtractTasks( void TTieringActualizer::Refresh(const std::optional& info, const TAddExternalContext& externalContext) { Tiering = info; + std::optional newTieringColumnId; if (Tiering) { - TieringColumnId = VersionedIndex.GetLastSchema()->GetColumnId(Tiering->GetEvictColumnName()); - } else { - TieringColumnId = {}; + newTieringColumnId = VersionedIndex.GetLastSchema()->GetColumnId(Tiering->GetEvictColumnName()); } TargetCriticalSchema = VersionedIndex.GetLastCriticalSchema(); PortionsInfo.clear(); NewPortionIds.clear(); PortionIdByWaitDuration.clear(); + if (newTieringColumnId != TieringColumnId) { + MaxByPortionId.clear(); + } + TieringColumnId = newTieringColumnId; for (auto&& i : externalContext.GetPortions()) { AddPortion(i.second, externalContext); @@ -220,7 +237,8 @@ namespace { class TActualizationReply: public IMetadataAccessorResultProcessor { private: std::weak_ptr TieringActualizer; - virtual void DoApplyResult(TDataAccessorsResult&& result, TColumnEngineForLogs& /*engine*/) override { + virtual void DoApplyResult(TDataAccessorsResult&& result, TColumnEngineForLogs& /*engine*/, + std::shared_ptr&& /*resourcesGuard*/) override { auto locked = TieringActualizer.lock(); if (!locked) { return; @@ -240,18 +258,32 @@ class TActualizationReply: public IMetadataAccessorResultProcessor { } // namespace -std::optional TTieringActualizer::BuildMetadataRequest( +std::vector TTieringActualizer::BuildMetadataRequests( const ui64 /*pathId*/, const THashMap& portions, const std::shared_ptr& index) { if (NewPortionIds.empty()) { - return std::nullopt; + NYDBTest::TControllers::GetColumnShardController()->OnTieringMetadataActualized(); + return {}; } - std::shared_ptr result = std::make_shared(); + + const ui64 batchMemorySoftLimit = !NYDBTest::TControllers::GetColumnShardController()->GetMetadataRequestSoftMemoryLimit(); + std::vector requests; + std::shared_ptr currentRequest; for (auto&& i : NewPortionIds) { + if (!currentRequest) { + currentRequest = std::make_shared(); + } auto it = portions.find(i); AFL_VERIFY(it != portions.end()); - result->AddPortion(it->second); + currentRequest->AddPortion(it->second); + if (currentRequest->PredictAccessorsMemory(it->second->GetSchema(VersionedIndex)) >= batchMemorySoftLimit) { + requests.emplace_back(currentRequest, std::make_shared(index)); + currentRequest.reset(); + } + } + if (currentRequest) { + requests.emplace_back(std::move(currentRequest), std::make_shared(index)); } - return TCSMetadataRequest(result, std::make_shared(index)); + return requests; } } // namespace NKikimr::NOlap::NActualizer diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.h b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.h index a421f3148f22..3f3e6aca9d60 100644 --- a/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.h +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.h @@ -126,23 +126,14 @@ class TTieringActualizer: public IActualizer { std::optional BuildActualizationInfo(const TPortionInfo& portion, const TInstant now) const; - void AddPortionImpl(const TPortionInfo& portion, const TInstant now) { - auto info = BuildActualizationInfo(portion, now); - if (!info) { - return; - } - AFL_VERIFY(PortionIdByWaitDuration[info->GetAddress()].AddPortion(*info, portion.GetPortionId(), now)); - auto address = info->GetAddress(); - TFindActualizationInfo findId(std::move(address), info->GetWaitInstant(now)); - AFL_VERIFY(PortionsInfo.emplace(portion.GetPortionId(), std::move(findId)).second); - } + void AddPortionImpl(const TPortionInfo& portion, const TInstant now); virtual void DoAddPortion(const TPortionInfo& portion, const TAddExternalContext& addContext) override; virtual void DoRemovePortion(const ui64 portionId) override; virtual void DoExtractTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& internalContext) override; public: void ActualizePortionInfo(const TPortionDataAccessor& accessor, const TActualizationContext& context); - std::optional BuildMetadataRequest( + std::vector BuildMetadataRequests( const ui64 pathId, const THashMap& portions, const std::shared_ptr& index); void Refresh(const std::optional& info, const TAddExternalContext& externalContext); diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp index e862de1929c7..d99b5dfd0638 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp @@ -443,7 +443,7 @@ class TTestMetadataAccessorsSubscriber: public NOlap::IDataAccessorRequestsSubsc TColumnEngineForLogs& Engine; virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) override { - Processor->ApplyResult(std::move(result), Engine); + Processor->ApplyResult(std::move(result), Engine, nullptr); } public: diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h index a747c5a5837a..347862587195 100644 --- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h +++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h @@ -24,6 +24,7 @@ namespace NKikimr::NOlap { class TColumnEngineChanges; class IBlobsGCAction; class TPortionInfo; +class TDataAccessorsResult; namespace NIndexes { class TIndexMetaContainer; } @@ -106,6 +107,9 @@ class ICSController { virtual ui64 DoGetRejectMemoryIntervalLimit(const ui64 defaultValue) const { return defaultValue; } + virtual ui64 DoGetMetadataRequestSoftMemoryLimit(const ui64 defaultValue) const { + return defaultValue; + } virtual ui64 DoGetReadSequentiallyBufferSize(const ui64 defaultValue) const { return defaultValue; } @@ -208,6 +212,10 @@ class ICSController { const ui64 defaultValue = NOlap::TGlobalLimits::DefaultRejectMemoryIntervalLimit; return DoGetRejectMemoryIntervalLimit(defaultValue); } + ui64 GetMetadataRequestSoftMemoryLimit() const { + const ui64 defaultValue = 100 * (1 << 20); + return DoGetMetadataRequestSoftMemoryLimit(defaultValue); + } virtual bool NeedForceCompactionBacketsConstruction() const { return false; } @@ -234,6 +242,8 @@ class ICSController { } virtual void OnPortionActualization(const NOlap::TPortionInfo& /*info*/) { } + virtual void OnTieringMetadataActualized() { + } virtual void OnMaxValueUsage() { } diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h index af375a612caf..9076e8183cd1 100644 --- a/ydb/core/tx/columnshard/hooks/testing/controller.h +++ b/ydb/core/tx/columnshard/hooks/testing/controller.h @@ -192,6 +192,9 @@ class TController: public TReadOnlyController { virtual ui64 DoGetRejectMemoryIntervalLimit(const ui64 def) const override { return OverrideRejectMemoryIntervalLimit.value_or(def); } + virtual ui64 DoGetMetadataRequestSoftMemoryLimit(const ui64 def) const override { + return 0; + } virtual EOptimizerCompactionWeightControl GetCompactionControl() const override { return CompactionControl; } diff --git a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp index e8d016f45d17..a55f13241b65 100644 --- a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp +++ b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp @@ -399,7 +399,7 @@ void TTestSchema::InitSchema(const std::vector& colu for (ui32 i = 0; i < columns.size(); ++i) { *schema->MutableColumns()->Add() = columns[i].CreateColumn(i + 1); - if (!specials.NeedTestStatistics()) { + if (!specials.NeedTestStatistics(pk)) { continue; } if (NOlap::NIndexes::NMax::TIndexMeta::IsAvailableType(columns[i].GetType())) { diff --git a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h index 20418f574cf9..2b7501d54c9c 100644 --- a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h @@ -118,20 +118,14 @@ struct TTestSchema { }; struct TTableSpecials : public TStorageTier { - private: - bool NeedTestStatisticsFlag = true; public: std::vector Tiers; bool WaitEmptyAfter = false; TTableSpecials() noexcept = default; - bool NeedTestStatistics() const { - return NeedTestStatisticsFlag; - } - - void SetNeedTestStatistics(const bool value) { - NeedTestStatisticsFlag = value; + bool NeedTestStatistics(const std::vector& pk) const { + return GetTtlColumn() != pk.front().GetName(); } bool HasTiers() const { @@ -161,6 +155,13 @@ struct TTestSchema { result << ";TTL=" << TStorageTier::DebugString(); return result; } + + TString GetTtlColumn() const { + for (const auto& tier : Tiers) { + UNIT_ASSERT_VALUES_EQUAL(tier.TtlColumn, TtlColumn); + } + return TtlColumn; + } }; using TTestColumn = NArrow::NTest::TTestColumn; static auto YdbSchema(const TTestColumn& firstKeyItem = TTestColumn("timestamp", TTypeInfo(NTypeIds::Timestamp))) { diff --git a/ydb/core/tx/columnshard/test_helper/controllers.h b/ydb/core/tx/columnshard/test_helper/controllers.h index 8886700d452f..f4516d59478a 100644 --- a/ydb/core/tx/columnshard/test_helper/controllers.h +++ b/ydb/core/tx/columnshard/test_helper/controllers.h @@ -10,6 +10,7 @@ class TWaitCompactionController: public NYDBTest::NColumnShard::TController { TAtomicCounter ExportsFinishedCount = 0; NMetadata::NFetcher::ISnapshot::TPtr CurrentConfig; ui32 TiersModificationsCount = 0; + YDB_READONLY(TAtomicCounter, TieringMetadataActualizationCount, 0); YDB_READONLY(TAtomicCounter, StatisticsUsageCount, 0); YDB_READONLY(TAtomicCounter, MaxValueUsageCount, 0); YDB_ACCESSOR_DEF(std::optional, SmallSizeDetector); @@ -53,6 +54,9 @@ class TWaitCompactionController: public NYDBTest::NColumnShard::TController { return ExportsFinishedCount.Val(); } + virtual void OnTieringMetadataActualized() override { + TieringMetadataActualizationCount.Inc(); + } virtual void OnStatisticsUsage(const NKikimr::NOlap::NIndexes::TIndexMetaContainer& /*statOperator*/) override { StatisticsUsageCount.Inc(); } diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp index 1220870814bf..24aa3d39ad1a 100644 --- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp @@ -100,11 +100,20 @@ bool TriggerTTL(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot s return (res.GetStatus() == NKikimrTxColumnShard::SUCCESS); } -bool TriggerMetadata(TTestBasicRuntime& runtime, TActorId& sender) { +bool TriggerMetadata( + TTestBasicRuntime& runtime, TActorId& sender, NYDBTest::TControllers::TGuard& controller) { + auto isDone = [initialCounter = controller->GetTieringMetadataActualizationCount().Val(), &controller]() { + return controller->GetTieringMetadataActualizationCount().Val() != initialCounter; + }; + auto event = std::make_unique(); ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, event.release()); - runtime.GrabEdgeEvent(sender, TDuration::Seconds(5)); - return true; + + const TInstant deadline = TInstant::Now() + TDuration::Seconds(5); + while (!isDone() && TInstant::Now() < deadline) { + runtime.SimulateSleep(TDuration::Seconds(1)); + } + return isDone(); } bool CheckSame(const std::shared_ptr& batch, const ui32 expectedSize, @@ -339,7 +348,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, UNIT_ASSERT(CheckSame(rb, PORTION_ROWS, spec.TtlColumn, ts[0])); } - if (spec.NeedTestStatistics() && spec.TtlColumn != "timestamp") { + if (spec.NeedTestStatistics(testYdbPk)) { AFL_VERIFY(csControllerGuard->GetStatisticsUsageCount().Val()); AFL_VERIFY(!csControllerGuard->GetMaxValueUsageCount().Val()); } else { @@ -629,7 +638,7 @@ std::vector> TestTiers(bool reboots, const std::vectorSetTiersSnapshot(runtime, sender, TTestSchema::BuildSnapshot(specs[i])); } - TriggerMetadata(runtime, sender); + UNIT_ASSERT(TriggerMetadata(runtime, sender, csControllerGuard)); if (eventLoss) { if (*eventLoss == i) { @@ -706,7 +715,7 @@ std::vector> TestTiers(bool reboots, const std::vectorGetStatisticsUsageCount().Val()); // AFL_VERIFY(!csControllerGuard->GetMaxValueUsageCount().Val()); // } else { @@ -860,11 +869,10 @@ std::vector> TestOneTierExport(const TTestSchema::TTableSp return rowsBytes; } -void TestTwoHotTiers(bool reboot, bool changeTtl, const bool statisticsUsage, const EInitialEviction initial = EInitialEviction::None, +void TestTwoHotTiers(bool reboot, bool changeTtl, const EInitialEviction initial = EInitialEviction::None, bool revCompaction = false) { TTestSchema::TTableSpecials spec; spec.SetTtlColumn("timestamp"); - spec.SetNeedTestStatistics(statisticsUsage); spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier0").SetTtlColumn("timestamp")); spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier1").SetTtlColumn("timestamp")); spec.Tiers[(revCompaction ? 0 : 1)].SetCodec("zstd"); @@ -897,13 +905,12 @@ void TestTwoHotTiers(bool reboot, bool changeTtl, const bool statisticsUsage, co } } -void TestHotAndColdTiers(bool reboot, const EInitialEviction initial, const bool statisticsUsage) { +void TestHotAndColdTiers(bool reboot, const EInitialEviction initial) { TTestSchema::TTableSpecials spec; spec.SetTtlColumn("timestamp"); spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier0").SetTtlColumn("timestamp")); spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier1").SetTtlColumn("timestamp")); spec.Tiers.back().S3 = TTestSchema::TStorageTier::FakeS3(); - spec.SetNeedTestStatistics(statisticsUsage); TestTiersAndTtl(spec, reboot, initial); } @@ -1293,91 +1300,91 @@ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) { // TODO: EnableOneTierAfterTtl, EnableTtlAfterOneTier Y_UNIT_TEST(HotTiers) { - TestTwoHotTiers(false, false, false); + TestTwoHotTiers(false, false); } Y_UNIT_TEST(RebootHotTiers) { - TestTwoHotTiers(true, false, false); + TestTwoHotTiers(true, false); } Y_UNIT_TEST(HotTiersWithStat) { - TestTwoHotTiers(false, false, true); + TestTwoHotTiers(false, false); } Y_UNIT_TEST(RebootHotTiersWithStat) { - TestTwoHotTiers(true, false, true); + TestTwoHotTiers(true, false); } Y_UNIT_TEST(HotTiersRevCompression) { - TestTwoHotTiers(false, false, false, EInitialEviction::None, true); + TestTwoHotTiers(false, false, EInitialEviction::None); } Y_UNIT_TEST(RebootHotTiersRevCompression) { - TestTwoHotTiers(true, false, false, EInitialEviction::None, true); + TestTwoHotTiers(true, false, EInitialEviction::None); } Y_UNIT_TEST(HotTiersTtl) { NColumnShard::gAllowLogBatchingDefaultValue = false; - TestTwoHotTiers(false, true, false); + TestTwoHotTiers(false, true); } Y_UNIT_TEST(RebootHotTiersTtl) { NColumnShard::gAllowLogBatchingDefaultValue = false; - TestTwoHotTiers(true, true, false); + TestTwoHotTiers(true, true); } Y_UNIT_TEST(HotTiersTtlWithStat) { NColumnShard::gAllowLogBatchingDefaultValue = false; - TestTwoHotTiers(false, true, true); + TestTwoHotTiers(false, true); } Y_UNIT_TEST(RebootHotTiersTtlWithStat) { NColumnShard::gAllowLogBatchingDefaultValue = false; - TestTwoHotTiers(true, true, true); + TestTwoHotTiers(true, true); } Y_UNIT_TEST(HotTiersAfterTtl) { - TestTwoHotTiers(false, false, false, EInitialEviction::Ttl); + TestTwoHotTiers(false, false, EInitialEviction::Ttl); } Y_UNIT_TEST(RebootHotTiersAfterTtl) { - TestTwoHotTiers(true, false, false, EInitialEviction::Ttl); + TestTwoHotTiers(true, false, EInitialEviction::Ttl); } // TODO: EnableTtlAfterHotTiers Y_UNIT_TEST(ColdTiers) { - TestHotAndColdTiers(false, EInitialEviction::Tiering, false); + TestHotAndColdTiers(false, EInitialEviction::Tiering); } Y_UNIT_TEST(RebootColdTiers) { //NColumnShard::gAllowLogBatchingDefaultValue = false; - TestHotAndColdTiers(true, EInitialEviction::Tiering, false); + TestHotAndColdTiers(true, EInitialEviction::Tiering); } Y_UNIT_TEST(ColdTiersWithStat) { - TestHotAndColdTiers(false, EInitialEviction::Tiering, true); + TestHotAndColdTiers(false, EInitialEviction::Tiering); } Y_UNIT_TEST(RebootColdTiersWithStat) { //NColumnShard::gAllowLogBatchingDefaultValue = false; - TestHotAndColdTiers(true, EInitialEviction::Tiering, true); + TestHotAndColdTiers(true, EInitialEviction::Tiering); } Y_UNIT_TEST(EnableColdTiersAfterNoEviction) { - TestHotAndColdTiers(false, EInitialEviction::None, false); + TestHotAndColdTiers(false, EInitialEviction::None); } Y_UNIT_TEST(RebootEnableColdTiersAfterNoEviction) { - TestHotAndColdTiers(true, EInitialEviction::None, false); + TestHotAndColdTiers(true, EInitialEviction::None); } Y_UNIT_TEST(EnableColdTiersAfterTtl) { - TestHotAndColdTiers(false, EInitialEviction::Ttl, false); + TestHotAndColdTiers(false, EInitialEviction::Ttl); } Y_UNIT_TEST(RebootEnableColdTiersAfterTtl) { - TestHotAndColdTiers(true, EInitialEviction::Ttl, false); + TestHotAndColdTiers(true, EInitialEviction::Ttl); } Y_UNIT_TEST(OneColdTier) { diff --git a/ydb/library/accessor/validator.h b/ydb/library/accessor/validator.h index 6182b524bfa7..5602005f3f77 100644 --- a/ydb/library/accessor/validator.h +++ b/ydb/library/accessor/validator.h @@ -10,8 +10,8 @@ class TValidator { return object; } template - static T& CheckNotNull(T& object) { + static T&& CheckNotNull(T&& object) { AFL_VERIFY(!!object); - return object; + return std::forward(object); } -}; \ No newline at end of file +}; From 117df93680d7be5ec259beb3b20c383cf2c1bf7e Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Fri, 6 Dec 2024 10:40:05 +0000 Subject: [PATCH 2/4] pass RG through result for metadata --- ydb/core/tx/columnshard/columnshard_impl.cpp | 5 +++-- ydb/core/tx/columnshard/columnshard_private_events.h | 11 ++--------- ydb/core/tx/columnshard/data_accessor/request.h | 11 +++++++++++ ydb/core/tx/columnshard/data_accessor/ya.make | 1 + ydb/core/tx/columnshard/engines/column_engine.h | 6 +++--- .../engines/storage/actualizer/tiering/tiering.cpp | 3 +-- 6 files changed, 21 insertions(+), 16 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index d2e13ddbc1a1..5ff59ba68dc8 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -944,8 +944,9 @@ class TCSMetadataSubscriber: public TDataAccessorsSubscriberBase, public TObject const std::shared_ptr Processor; const ui64 Generation; virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result, std::shared_ptr&& guard) override { + result.SetResourcesGuard(std::move(guard)); NActors::TActivationContext::Send( - TabletActorId, std::make_unique(Processor, Generation, std::move(result), std::move(guard))); + TabletActorId, std::make_unique(Processor, Generation, std::move(result))); } public: @@ -1115,7 +1116,7 @@ void TColumnShard::Handle(TEvPrivate::TEvStartCompaction::TPtr& ev, const TActor void TColumnShard::Handle(TEvPrivate::TEvMetadataAccessorsInfo::TPtr& ev, const TActorContext& /*ctx*/) { AFL_VERIFY(ev->Get()->GetGeneration() == Generation())("ev", ev->Get()->GetGeneration())("tablet", Generation()); ev->Get()->GetProcessor()->ApplyResult( - ev->Get()->ExtractResult(), TablesManager.MutablePrimaryIndexAsVerified(), ev->Get()->ExtractResourcesGuard()); + ev->Get()->ExtractResult(), TablesManager.MutablePrimaryIndexAsVerified()); SetupMetadata(); } diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h index 20c0bb02d62a..5bdb69509e8a 100644 --- a/ydb/core/tx/columnshard/columnshard_private_events.h +++ b/ydb/core/tx/columnshard/columnshard_private_events.h @@ -76,7 +76,6 @@ struct TEvPrivate { const std::shared_ptr Processor; const ui64 Generation; NOlap::TDataAccessorsResult Result; - std::shared_ptr ResourcesGuard; public: const std::shared_ptr& GetProcessor() const { @@ -88,18 +87,12 @@ struct TEvPrivate { NOlap::TDataAccessorsResult ExtractResult() { return std::move(Result); } - std::shared_ptr&& ExtractResourcesGuard() { - AFL_VERIFY(ResourcesGuard); - return std::move(ResourcesGuard); - } TEvMetadataAccessorsInfo(const std::shared_ptr& processor, const ui64 gen, - NOlap::TDataAccessorsResult&& result, std::shared_ptr&& guard) + NOlap::TDataAccessorsResult&& result) : Processor(processor) , Generation(gen) - , Result(std::move(result)) - , ResourcesGuard(std::move(guard)) { - AFL_VERIFY(ResourcesGuard); + , Result(std::move(result)) { } }; diff --git a/ydb/core/tx/columnshard/data_accessor/request.h b/ydb/core/tx/columnshard/data_accessor/request.h index 13f343c1cd39..6025140b65b3 100644 --- a/ydb/core/tx/columnshard/data_accessor/request.h +++ b/ydb/core/tx/columnshard/data_accessor/request.h @@ -14,6 +14,7 @@ class TDataAccessorsResult : private NNonCopyable::TMoveOnly { THashMap> AccessorsByPathId; THashMap PortionsById; std::vector Portions; + std::shared_ptr ResourcesGuard; public: const std::vector& GetPortions() const { @@ -61,6 +62,16 @@ class TDataAccessorsResult : private NNonCopyable::TMoveOnly { bool HasErrors() const { return ErrorsByPathId.size(); } + + void SetResourcesGuard(std::shared_ptr&& guard) { + AFL_VERIFY(!ResourcesGuard); + AFL_VERIFY(guard); + } + + std::shared_ptr&& ExtractResourcesGuard() { + AFL_VERIFY(ResourcesGuard); + return std::move(ResourcesGuard); + } }; class IDataAccessorRequestsSubscriber: public NColumnShard::TMonitoringObjectsCounter { diff --git a/ydb/core/tx/columnshard/data_accessor/ya.make b/ydb/core/tx/columnshard/data_accessor/ya.make index 4a40651b9d6e..f3212e91e74e 100644 --- a/ydb/core/tx/columnshard/data_accessor/ya.make +++ b/ydb/core/tx/columnshard/data_accessor/ya.make @@ -12,6 +12,7 @@ PEERDIR( ydb/core/tx/columnshard/engines/portions ydb/core/tx/columnshard/data_accessor/abstract ydb/core/tx/columnshard/data_accessor/local_db + ydb/core/tx/columnshard/resource_subscriber ) END() diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index b74ce2ef78e2..d628f1dd2373 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -252,13 +252,13 @@ class TColumnEngineStats { class TColumnEngineForLogs; class IMetadataAccessorResultProcessor { private: - virtual void DoApplyResult(TDataAccessorsResult&& result, TColumnEngineForLogs& engine, std::shared_ptr&& resourcesGuard) = 0; + virtual void DoApplyResult(TDataAccessorsResult&& result, TColumnEngineForLogs& engine) = 0; public: virtual ~IMetadataAccessorResultProcessor() = default; - void ApplyResult(TDataAccessorsResult&& result, TColumnEngineForLogs& engine, std::shared_ptr&& resourcesGuard) { - return DoApplyResult(std::move(result), engine, std::move(resourcesGuard)); + void ApplyResult(TDataAccessorsResult&& result, TColumnEngineForLogs& engine) { + return DoApplyResult(std::move(result), engine); } IMetadataAccessorResultProcessor() = default; diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp index 19a95ee262fc..636038d49043 100644 --- a/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp @@ -237,8 +237,7 @@ namespace { class TActualizationReply: public IMetadataAccessorResultProcessor { private: std::weak_ptr TieringActualizer; - virtual void DoApplyResult(TDataAccessorsResult&& result, TColumnEngineForLogs& /*engine*/, - std::shared_ptr&& /*resourcesGuard*/) override { + virtual void DoApplyResult(TDataAccessorsResult&& result, TColumnEngineForLogs& /*engine*/) override { auto locked = TieringActualizer.lock(); if (!locked) { return; From c034681c8f8ef1ab99b1edde6c90648f347b15dd Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Fri, 6 Dec 2024 12:53:08 +0000 Subject: [PATCH 3/4] fix test build --- ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp index d99b5dfd0638..e862de1929c7 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp @@ -443,7 +443,7 @@ class TTestMetadataAccessorsSubscriber: public NOlap::IDataAccessorRequestsSubsc TColumnEngineForLogs& Engine; virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) override { - Processor->ApplyResult(std::move(result), Engine, nullptr); + Processor->ApplyResult(std::move(result), Engine); } public: From b17caf92e9163912833c5da4b90234bb6b7a4544 Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Mon, 9 Dec 2024 07:40:05 +0000 Subject: [PATCH 4/4] clean up --- ydb/core/tx/columnshard/columnshard_impl.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 5ff59ba68dc8..64e3dbf47c18 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -617,16 +617,17 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask { }; class TDataAccessorsSubscriberBase: public NOlap::IDataAccessorRequestsSubscriber { -protected: +private: std::shared_ptr ResourcesGuard; - virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result, std::shared_ptr&& guard) = 0; - virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result) override final { AFL_VERIFY(ResourcesGuard); DoOnRequestsFinished(std::move(result), std::move(ResourcesGuard)); } +protected: + virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result, std::shared_ptr&& guard) = 0; + public: void SetResourcesGuard(const std::shared_ptr& guard) { AFL_VERIFY(!ResourcesGuard);