diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index d77eff49ae8a..54735259c970 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -634,6 +634,7 @@ message TLimiterConfig { message TGroupedMemoryLimiterConfig { optional bool Enabled = 1 [default = true]; optional uint64 MemoryLimit = 2; + /* Hard limit of the grouped memory limiter; TConfig::DeserializeFromProto copies it into TConfig::HardMemoryLimit only when the field is set. */ optional uint64 HardMemoryLimit = 3; } message TExternalIndexConfig { diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h index e3ec8567b862..e885d4461dc8 100644 --- a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h +++ b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h @@ -52,6 +52,7 @@ class TReadContext { const TActorId ResourceSubscribeActorId; const TActorId ReadCoordinatorActorId; const TComputeShardingPolicy ComputeShardingPolicy; + /* Stays 0 until the first AbortWithError call flips it to 1. */ TAtomic AbortFlag = 0; public: template @@ -61,6 +62,13 @@ class TReadContext { return result; } + /* Sends a failed TConclusionStatus carrying errorMessage to ScanActorId. AtomicCas flips AbortFlag from 0 to 1, so only the first call sends; later calls do nothing. */ void AbortWithError(const TString& errorMessage) { + if (AtomicCas(&AbortFlag, 1, 0)) { + NActors::TActivationContext::Send( + ScanActorId, std::make_unique(TConclusionStatus::Fail(errorMessage))); + } + } + bool IsReverse() const { return ReadMetadata->IsDescSorted(); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp index 19cbd7d8578e..40f1c4d5aad8 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp @@ -189,6 +189,14 @@ TAllocateMemoryStep::TFetchingStepAllocation::TFetchingStepAllocation( , TasksGuard(source->GetContext()->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard()) { } +void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(const TString& errorMessage) { /* Reports the failure through the common context's AbortWithError, naming the step; does nothing when Source.lock() returns null. */ + auto sourcePtr = Source.lock(); + if 
(sourcePtr) { + sourcePtr->GetContext()->GetCommonContext()->AbortWithError( + "cannot allocate memory for step " + Step.GetName() + ": '" + errorMessage + "'"); + } +} + TConclusion TAllocateMemoryStep::DoExecuteInplace( const std::shared_ptr& source, const TFetchingScriptCursor& step) const { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h index 1fc9f8bce543..f9e5774c4aba 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h @@ -237,7 +237,7 @@ class TAllocateMemoryStep: public IFetchingStep { NColumnShard::TCounterGuard TasksGuard; virtual bool DoOnAllocated(std::shared_ptr&& guard, const std::shared_ptr& allocation) override; - + /* Failure counterpart of DoOnAllocated; defined in fetching.cpp. */ virtual void DoOnAllocationImpossible(const TString& errorMessage) override; public: TFetchingStepAllocation(const std::shared_ptr& source, const ui64 mem, const TFetchingScriptCursor& step); }; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.h index bbe2d11ccb3a..074a1c42f960 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.h @@ -77,6 +77,9 @@ class TBaseMergeTask: public IDataTasksProcessor::ITask, public NGroupedMemoryMa virtual bool DoApply(IDataReader& indexedDataRead) const override; virtual bool DoOnAllocated(std::shared_ptr&& guard, const std::shared_ptr& allocation) override; + /* Reports a refused memory request for the merge task through the common context's AbortWithError. */ virtual void DoOnAllocationImpossible(const TString& errorMessage) override { + Context->GetCommonContext()->AbortWithError("cannot allocate memory for merge task: '" + errorMessage + "'"); + } public: TBaseMergeTask(const std::shared_ptr& mergingContext, const std::shared_ptr& readContext) diff --git 
a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp index b13dbcf950d3..55811c2b65df 100644 --- a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp +++ b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp @@ -195,7 +195,12 @@ TStatsIterator::TFetchingAccessorAllocation::TFetchingAccessorAllocation( , AccessorsManager(context->GetDataAccessorsManager()) , Request(request) , WaitingCountersGuard(context->GetCounters().GetFetcherAcessorsGuard()) - , OwnerId(context->GetScanActorId()) { + , OwnerId(context->GetScanActorId()) + , Context(context) /* kept so DoOnAllocationImpossible can report a failure through it */ { +} + +void TStatsIterator::TFetchingAccessorAllocation::DoOnAllocationImpossible(const TString& errorMessage) { /* Forwards the refusal to Context->AbortWithError with a prefix naming the accessors request. */ + Context->AbortWithError("cannot allocate memory for take accessors info: " + errorMessage); } } // namespace NKikimr::NOlap::NReader::NSysView::NChunks diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h index 1022f8b3f76c..b932e86533f7 100644 --- a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h +++ b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h @@ -109,12 +109,15 @@ class TStatsIterator: public NAbstract::TStatsIterator Request; NColumnShard::TCounterGuard WaitingCountersGuard; const NActors::TActorId OwnerId; + /* Read context passed to the constructor; used only by DoOnAllocationImpossible. */ const std::shared_ptr Context; + virtual bool DoOnAllocated(std::shared_ptr&& guard, const std::shared_ptr& /*selfPtr*/) override { Guard = std::move(guard); AccessorsManager->AskData(std::move(Request)); return true; } + /* Failure counterpart of DoOnAllocated; defined in chunks.cpp. */ virtual void DoOnAllocationImpossible(const TString& errorMessage) override; virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) override { if (result.HasErrors()) { diff --git a/ydb/core/tx/limiter/grouped_memory/service/allocation.h b/ydb/core/tx/limiter/grouped_memory/service/allocation.h index dcbf2971367c..e2ca06bd2861 100644 --- 
a/ydb/core/tx/limiter/grouped_memory/service/allocation.h +++ b/ydb/core/tx/limiter/grouped_memory/service/allocation.h @@ -48,12 +48,16 @@ class TAllocationInfo: public NColumnShard::TMonitoringObjectsCounterGetName()); AFL_VERIFY(Allocation)("status", GetAllocationStatus())("volume", AllocatedVolume)("id", Identifier)("stage", Stage->GetName())( "allocation_internal_group_id", AllocationInternalGroupId); + /* Reserve the volume in the stage first; if the stage refuses, notify the allocation and skip OnAllocated. */ auto allocationResult = Stage->Allocate(AllocatedVolume); + if (allocationResult.IsFail()) { + AllocationFailed = true; + Allocation->OnAllocationImpossible(allocationResult.GetErrorMessage()); + return false; + } const bool result = Allocation->OnAllocated( std::make_shared(ProcessId, ScopeId, Allocation->GetIdentifier(), ownerId, Allocation->GetMemory()), Allocation); - if (result) { - Stage->Allocate(AllocatedVolume); - } else { - Stage->Free(AllocatedVolume, false); + if (!result) { + /* Stage->Allocate above already succeeded, so the volume is released as allocated. */ Stage->Free(AllocatedVolume, true); AllocationFailed = true; } Allocation = nullptr; diff --git a/ydb/core/tx/limiter/grouped_memory/service/counters.h b/ydb/core/tx/limiter/grouped_memory/service/counters.h index 3c96b3b8b9a4..1d55b7b17f4a 100644 --- a/ydb/core/tx/limiter/grouped_memory/service/counters.h +++ b/ydb/core/tx/limiter/grouped_memory/service/counters.h @@ -10,6 +10,7 @@ class TStageCounters: public NColumnShard::TCommonCountersOwner { NMonitoring::TDynamicCounters::TCounterPtr AllocatedChunks; NMonitoring::TDynamicCounters::TCounterPtr WaitingBytes; NMonitoring::TDynamicCounters::TCounterPtr WaitingChunks; + NMonitoring::TDynamicCounters::TCounterPtr AllocationFailCount; public: TStageCounters(const TCommonCountersOwner& owner, const TString& name) @@ -17,7 +18,12 @@ class TStageCounters: public NColumnShard::TCommonCountersOwner { , AllocatedBytes(TBase::GetValue("Allocated/Bytes")) , AllocatedChunks(TBase::GetValue("Allocated/Count")) , WaitingBytes(TBase::GetValue("Waiting/Bytes")) - , WaitingChunks(TBase::GetValue("Waiting/Count")) { + , 
WaitingChunks(TBase::GetValue("Waiting/Count")) + , AllocationFailCount(TBase::GetValue("AllocationFails/Count")) { + } + + /* Adds one refused allocation to the AllocationFails/Count counter. */ void OnCannotAllocate() { + AllocationFailCount->Add(1); } void Add(const ui64 volume, const bool allocated) { diff --git a/ydb/core/tx/limiter/grouped_memory/usage/abstract.h b/ydb/core/tx/limiter/grouped_memory/usage/abstract.h index d92120f46fb6..b0b1b11dce83 100644 --- a/ydb/core/tx/limiter/grouped_memory/usage/abstract.h +++ b/ydb/core/tx/limiter/grouped_memory/usage/abstract.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace NKikimr::NOlap::NGroupedMemoryManager { @@ -95,6 +96,7 @@ class TStageFeatures { private: YDB_READONLY_DEF(TString, Name); YDB_READONLY(ui64, Limit, 0); + /* Upper bound checked by Allocate: a request is refused when Usage + volume would exceed it. */ YDB_READONLY(ui64, HardLimit, 0); YDB_ACCESSOR_DEF(TPositiveControlInteger, Usage); YDB_ACCESSOR_DEF(TPositiveControlInteger, Waiting); std::shared_ptr Owner; @@ -114,15 +116,22 @@ class TStageFeatures { return Usage.Val() + Waiting.Val(); } - TStageFeatures( - const TString& name, const ui64 limit, const std::shared_ptr& owner, const std::shared_ptr& counters) + TStageFeatures(const TString& name, const ui64 limit, const ui64 hardLimit, const std::shared_ptr& owner, + const std::shared_ptr& counters) : Name(name) , Limit(limit) + , HardLimit(hardLimit) , Owner(owner) , Counters(counters) { } - void Allocate(const ui64 volume) { + /* Moves volume from Waiting to Usage and forwards the request to Owner, if any. Returns a failed status, leaving this stage's Usage and Waiting untouched, when Usage + volume would exceed HardLimit; returns the owner's failure when the owner refuses. NOTE(review): on the hard-limit failure Waiting is not reduced here - confirm the caller releases it. */ [[nodiscard]] TConclusionStatus Allocate(const ui64 volume) { + if (HardLimit < Usage.Val() + volume) { + /* Counters may be null (the DEFAULT stage in service.h is built without counters), so guard the call like the code below does. */ if (Counters) { + Counters->OnCannotAllocate(); + } + return TConclusionStatus::Fail(TStringBuilder() << "limit:" << HardLimit << ";val:" << Usage.Val() << ";delta=" << volume << ";"); + } Waiting.Sub(volume); Usage.Add(volume); if (Counters) { @@ -130,8 +139,13 @@ class TStageFeatures { Counters->Sub(volume, false); } if (Owner) { - Owner->Allocate(volume); + const auto ownerResult = Owner->Allocate(volume); + if (ownerResult.IsFail()) { + /* Roll back this stage's own accounting. NOTE(review): Free's body is not visible here; if it forwards to Owner, this would also release volume the owner never accepted - confirm. */ Free(volume, true); + return ownerResult; + } } + return TConclusionStatus::Success(); } void 
Free(const ui64 volume, const bool allocated) { @@ -199,6 +213,7 @@ class IAllocation { YDB_READONLY(ui64, Identifier, Counter.Inc()); YDB_READONLY(ui64, Memory, 0); bool Allocated = false; + /* Hook invoked through OnAllocationImpossible with the refusal message; each implementation decides how to report it. */ virtual void DoOnAllocationImpossible(const TString& errorMessage) = 0; virtual bool DoOnAllocated( std::shared_ptr&& guard, const std::shared_ptr& allocation) = 0; @@ -216,6 +231,10 @@ class IAllocation { return Allocated; } + /* Public entry point for a refused allocation; forwards to DoOnAllocationImpossible. */ void OnAllocationImpossible(const TString& errorMessage) { + DoOnAllocationImpossible(errorMessage); + } + [[nodiscard]] bool OnAllocated( std::shared_ptr&& guard, const std::shared_ptr& allocation); }; diff --git a/ydb/core/tx/limiter/grouped_memory/usage/config.cpp b/ydb/core/tx/limiter/grouped_memory/usage/config.cpp index 17fe55975744..ee01e1ef3f66 100644 --- a/ydb/core/tx/limiter/grouped_memory/usage/config.cpp +++ b/ydb/core/tx/limiter/grouped_memory/usage/config.cpp @@ -7,13 +7,16 @@ bool TConfig::DeserializeFromProto(const NKikimrConfig::TGroupedMemoryLimiterCon if (config.HasMemoryLimit()) { MemoryLimit = config.GetMemoryLimit(); } + /* Keep the compiled-in default unless the proto field is set. */ if (config.HasHardMemoryLimit()) { + HardMemoryLimit = config.GetHardMemoryLimit(); + } Enabled = config.GetEnabled(); return true; } TString TConfig::DebugString() const { TStringBuilder sb; - sb << "MemoryLimit=" << MemoryLimit << ";Enabled=" << Enabled << ";"; + sb << "MemoryLimit=" << MemoryLimit << ";HardMemoryLimit=" << HardMemoryLimit << ";Enabled=" << Enabled << ";"; return sb; } diff --git a/ydb/core/tx/limiter/grouped_memory/usage/config.h b/ydb/core/tx/limiter/grouped_memory/usage/config.h index 91a9b5bc7afe..c3a69680a199 100644 --- a/ydb/core/tx/limiter/grouped_memory/usage/config.h +++ b/ydb/core/tx/limiter/grouped_memory/usage/config.h @@ -8,6 +8,7 @@ class TConfig { private: YDB_READONLY(bool, Enabled, true); YDB_READONLY(ui64, MemoryLimit, ui64(3) << 30); + /* Default hard limit is 10 << 30 (presumably bytes, i.e. 10 GiB - confirm). */ YDB_READONLY(ui64, HardMemoryLimit, ui64(10) << 30); public: diff --git a/ydb/core/tx/limiter/grouped_memory/usage/service.h 
b/ydb/core/tx/limiter/grouped_memory/usage/service.h index 8192743218b1..b662494d7b0b 100644 --- a/ydb/core/tx/limiter/grouped_memory/usage/service.h +++ b/ydb/core/tx/limiter/grouped_memory/usage/service.h @@ -15,13 +15,14 @@ class TServiceOperatorImpl { private: TConfig ServiceConfig = TConfig::BuildDisabledConfig(); std::shared_ptr Counters; - std::shared_ptr DefaultStageFeatures = std::make_shared("DEFAULT", ((ui64)3) << 30, nullptr, nullptr); + /* Initial stage, replaced by Register(): limit 3 << 30, hard limit 10 << 30 (presumably bytes), no owner and no counters. */ std::shared_ptr DefaultStageFeatures = + std::make_shared("DEFAULT", ((ui64)3) << 30, ((ui64)10) << 30, nullptr, nullptr); using TSelf = TServiceOperatorImpl; static void Register(const TConfig& serviceConfig, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) { Singleton()->Counters = std::make_shared(counters, TMemoryLimiterPolicy::Name); Singleton()->ServiceConfig = serviceConfig; - Singleton()->DefaultStageFeatures = std::make_shared( - "GLOBAL", serviceConfig.GetMemoryLimit(), nullptr, Singleton()->Counters->BuildStageCounters("general")); + /* Rebuilds the default stage as GLOBAL with both limits taken from the service config and with its own counters. */ Singleton()->DefaultStageFeatures = std::make_shared("GLOBAL", serviceConfig.GetMemoryLimit(), + serviceConfig.GetHardMemoryLimit(), nullptr, Singleton()->Counters->BuildStageCounters("general")); } static const TString& GetMemoryLimiterName() { Y_ABORT_UNLESS(TMemoryLimiterPolicy::Name.size() == 4); @@ -35,7 +36,7 @@ class TServiceOperatorImpl { } else { AFL_VERIFY(Singleton()->DefaultStageFeatures); return std::make_shared( - name, limit, Singleton()->DefaultStageFeatures, Singleton()->Counters->BuildStageCounters(name)); + /* Named stages get Max() as their own hard limit and DefaultStageFeatures as owner. NOTE(review): Max() presumably lost a template argument in this text - confirm. */ name, limit, Max(), Singleton()->DefaultStageFeatures, Singleton()->Counters->BuildStageCounters(name)); } }