diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/columns_set.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/columns_set.h index 23514aff1dce..a0a3df00d04b 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/columns_set.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/columns_set.h @@ -15,9 +15,10 @@ enum class EMemType { }; enum class EStageFeaturesIndexes { - Filter = 0, - Fetching = 1, - Merge = 2 + Accessors = 0, + Filter = 1, + Fetching = 2, + Merge = 3 }; class TIndexesSet { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp index 28d31123ebaf..5e7be055cbc7 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp @@ -27,7 +27,10 @@ std::shared_ptr TSpecialReadContext::GetColumnsFetchingPlan(con if (source->NeedAccessorsFetching()) { if (!AskAccumulatorsScript) { AskAccumulatorsScript = std::make_shared(*this); - AskAccumulatorsScript->AddStep(std::make_shared()); + if (ui64 size = source->PredictAccessorsMemory()) { + AskAccumulatorsScript->AddStep(size, EStageFeaturesIndexes::Accessors); + } + AskAccumulatorsScript->AddStep(); } AskAccumulatorsScript->AddStep(*FFColumns); return AskAccumulatorsScript; @@ -54,8 +57,8 @@ std::shared_ptr TSpecialReadContext::GetColumnsFetchingPlan(con } } { - auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0] - [useIndexes ? 1 : 0][needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0]; + auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0] + [needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0]; if (!result) { TGuard wg(Mutex); result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0] @@ -274,11 +277,11 @@ std::shared_ptr TSpecialReadContext::BuildColumnsFetchingPlan(c TSpecialReadContext::TSpecialReadContext(const std::shared_ptr& commonContext) : CommonContext(commonContext) { - ReadMetadata = dynamic_pointer_cast(CommonContext->GetReadMetadata()); Y_ABORT_UNLESS(ReadMetadata); Y_ABORT_UNLESS(ReadMetadata->SelectInfo); + double kffAccessors = 0.01; double kffFilter = 0.45; double kffFetching = 0.45; double kffMerge = 0.10; @@ -287,15 +290,19 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr& co stagePrefix = "EF"; kffFilter = 0.7; kffFetching = 0.15; - kffMerge = 0.15; + kffMerge = 0.14; + kffAccessors = 0.01; } else { stagePrefix = "FO"; kffFilter = 0.1; kffFetching = 0.75; - kffMerge = 0.15; + kffMerge = 0.14; + kffAccessors = 0.01; } - std::vector> stages = { + std::vector> stages = { + NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures( + stagePrefix + "::ACCESSORS", kffAccessors * TGlobalLimits::ScanMemoryLimit), NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures( stagePrefix + "::FILTER", kffFilter * TGlobalLimits::ScanMemoryLimit), NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures( @@ -304,8 +311,8 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr& co }; ProcessMemoryGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildProcessGuard(CommonContext->GetReadMetadata()->GetTxId(), stages); - ProcessScopeGuard = - NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard(CommonContext->GetReadMetadata()->GetTxId(), GetCommonContext()->GetScanId()); + ProcessScopeGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard( + CommonContext->GetReadMetadata()->GetTxId(), GetCommonContext()->GetScanId()); auto readSchema = ReadMetadata->GetResultSchema(); SpecColumns = std::make_shared(TIndexInfo::GetSnapshotColumnIdsSet(), readSchema); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp index f09e0e5b9a0a..fbeaba7f098b 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp @@ -200,7 +200,7 @@ void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(cons TConclusion TAllocateMemoryStep::DoExecuteInplace( const std::shared_ptr& source, const TFetchingScriptCursor& step) const { - ui64 size = 0; + ui64 size = PredefinedSize.value_or(0); for (auto&& i : Packs) { ui32 sizeLocal = source->GetColumnsVolume(i.GetColumns().GetColumnIds(), i.GetMemType()); if (source->GetStageData().GetUseFilter() && source->GetContext()->GetReadMetadata()->Limit && i.GetMemType() != EMemType::Blob) { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h index f9e5774c4aba..f630c36ebae5 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h @@ -227,6 +227,7 @@ class TAllocateMemoryStep: public IFetchingStep { std::vector Packs; THashMap> Control; const EStageFeaturesIndexes StageIndex; + std::optional PredefinedSize; protected: class TFetchingStepAllocation: public NGroupedMemoryManager::IAllocation { @@ -238,6 +239,7 @@ class TAllocateMemoryStep: public IFetchingStep { virtual bool DoOnAllocated(std::shared_ptr&& guard, const std::shared_ptr& allocation) override; virtual void DoOnAllocationImpossible(const TString& errorMessage) override; + public: TFetchingStepAllocation(const std::shared_ptr& source, const ui64 mem, const TFetchingScriptCursor& step); }; @@ -266,6 +268,12 @@ class TAllocateMemoryStep: public IFetchingStep { , StageIndex(stageIndex) { AddAllocation(columns, memType); } + + TAllocateMemoryStep(const ui64 size, const EStageFeaturesIndexes stageIndex) + : TBase("ALLOCATE_MEMORY::" + ::ToString(stageIndex)) + , StageIndex(stageIndex) + , PredefinedSize(size) { + } }; class TDetectInMemStep: public IFetchingStep { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h index 563dab447720..fe164bd212b8 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h @@ -77,7 +77,7 @@ class IDataSource { public: virtual bool NeedAccessorsForRead() const = 0; virtual bool NeedAccessorsFetching() const = 0; - + virtual ui64 PredictAccessorsMemory() const = 0; bool StartFetchingAccessor(const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step) { return DoStartFetchingAccessor(sourcePtr, step); } @@ -318,6 +318,10 @@ class TPortionDataSource: public IDataSource { virtual bool DoStartFetchingAccessor(const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step) override; public: + virtual ui64 PredictAccessorsMemory() const override { + return Portion->GetApproxChunksCount(GetContext()->GetCommonContext()->GetReadMetadata()->GetResultSchema()->GetColumnsCount()) * sizeof(TColumnRecord); + } + virtual bool NeedAccessorsForRead() const override { return true; } @@ -430,6 +434,10 @@ class TCommittedDataSource: public IDataSource { } public: + virtual ui64 PredictAccessorsMemory() const override { + return 0; + } + virtual bool NeedAccessorsForRead() const override { return false; }