Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

native memory control #11559

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,7 @@ message TLimiterConfig {
// Settings of the grouped memory limiter.
message TGroupedMemoryLimiterConfig {
// Switches the limiter on or off; defaults to true when the field is absent.
optional bool Enabled = 1 [default = true];
// Regular memory limit (presumably in bytes - TODO confirm against the service implementation).
optional uint64 MemoryLimit = 2;
// Hard memory limit (presumably in bytes - TODO confirm); NOTE(review): expected to be
// the ceiling above which an allocation is rejected rather than queued - confirm.
optional uint64 HardMemoryLimit = 3;
}

message TExternalIndexConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class TReadContext {
const TActorId ResourceSubscribeActorId;
const TActorId ReadCoordinatorActorId;
const TComputeShardingPolicy ComputeShardingPolicy;
TAtomic AbortFlag = 0;

public:
template <class T>
Expand All @@ -61,6 +62,13 @@ class TReadContext {
return result;
}

// Reports a fatal read error to the scan actor.
// Only the caller that wins the compare-and-swap on AbortFlag sends the failure event,
// so the scan actor is notified at most once per context.
void AbortWithError(const TString& errorMessage) {
    const bool isFirstAbort = AtomicCas(&AbortFlag, 1, 0);
    if (!isFirstAbort) {
        return;
    }
    NActors::TActivationContext::Send(
        ScanActorId, std::make_unique<NColumnShard::TEvPrivate::TEvTaskProcessedResult>(TConclusionStatus::Fail(errorMessage)));
}

// Returns whatever ReadMetadata->IsDescSorted() reports, i.e. whether the read is sorted in descending order.
bool IsReverse() const {
return ReadMetadata->IsDescSorted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,14 @@ TAllocateMemoryStep::TFetchingStepAllocation::TFetchingStepAllocation(
, TasksGuard(source->GetContext()->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard()) {
}

void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(const TString& errorMessage) {
auto sourcePtr = Source.lock();
if (sourcePtr) {
sourcePtr->GetContext()->GetCommonContext()->AbortWithError(
"cannot allocate memory for step " + Step.GetName() + ": '" + errorMessage + "'");
}
}

TConclusion<bool> TAllocateMemoryStep::DoExecuteInplace(
const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ class TAllocateMemoryStep: public IFetchingStep {
NColumnShard::TCounterGuard TasksGuard;
virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) override;

virtual void DoOnAllocationImpossible(const TString& errorMessage) override;
public:
TFetchingStepAllocation(const std::shared_ptr<IDataSource>& source, const ui64 mem, const TFetchingScriptCursor& step);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ class TBaseMergeTask: public IDataTasksProcessor::ITask, public NGroupedMemoryMa
virtual bool DoApply(IDataReader& indexedDataRead) const override;
virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) override;
virtual void DoOnAllocationImpossible(const TString& errorMessage) override {
Context->GetCommonContext()->AbortWithError("cannot allocate memory for merge task: '" + errorMessage + "'");
}

public:
TBaseMergeTask(const std::shared_ptr<TMergingContext>& mergingContext, const std::shared_ptr<TSpecialReadContext>& readContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,12 @@ TStatsIterator::TFetchingAccessorAllocation::TFetchingAccessorAllocation(
, AccessorsManager(context->GetDataAccessorsManager())
, Request(request)
, WaitingCountersGuard(context->GetCounters().GetFetcherAcessorsGuard())
, OwnerId(context->GetScanActorId()) {
, OwnerId(context->GetScanActorId())
, Context(context) {
}

void TStatsIterator::TFetchingAccessorAllocation::DoOnAllocationImpossible(const TString& errorMessage) {
Context->AbortWithError("cannot allocate memory for take accessors info: " + errorMessage);
}

} // namespace NKikimr::NOlap::NReader::NSysView::NChunks
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,15 @@ class TStatsIterator: public NAbstract::TStatsIterator<NKikimr::NSysView::Schema
std::shared_ptr<TDataAccessorsRequest> Request;
NColumnShard::TCounterGuard WaitingCountersGuard;
const NActors::TActorId OwnerId;
const std::shared_ptr<NReader::TReadContext> Context;

// Called once the requested memory has been granted.
// Takes ownership of the allocation guard, then forwards the pending accessors request to the manager.
// Always returns true.
virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
const std::shared_ptr<NGroupedMemoryManager::IAllocation>& /*selfPtr*/) override {
// NOTE(review): the guard is stored before the request is issued - presumably so it is already held
// if AskData completes synchronously; confirm before reordering.
Guard = std::move(guard);
AccessorsManager->AskData(std::move(Request));
return true;
}
virtual void DoOnAllocationImpossible(const TString& errorMessage) override;

virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) override {
if (result.HasErrors()) {
Expand Down
12 changes: 8 additions & 4 deletions ydb/core/tx/limiter/grouped_memory/service/allocation.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,16 @@ class TAllocationInfo: public NColumnShard::TMonitoringObjectsCounter<TAllocatio
AFL_TRACE(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "allocated")("allocation_id", Identifier)("stage", Stage->GetName());
AFL_VERIFY(Allocation)("status", GetAllocationStatus())("volume", AllocatedVolume)("id", Identifier)("stage", Stage->GetName())(
"allocation_internal_group_id", AllocationInternalGroupId);
auto allocationResult = Stage->Allocate(AllocatedVolume);
if (allocationResult.IsFail()) {
AllocationFailed = true;
Allocation->OnAllocationImpossible(allocationResult.GetErrorMessage());
return false;
}
const bool result = Allocation->OnAllocated(
std::make_shared<TAllocationGuard>(ProcessId, ScopeId, Allocation->GetIdentifier(), ownerId, Allocation->GetMemory()), Allocation);
if (result) {
Stage->Allocate(AllocatedVolume);
} else {
Stage->Free(AllocatedVolume, false);
if (!result) {
Stage->Free(AllocatedVolume, true);
AllocationFailed = true;
}
Allocation = nullptr;
Expand Down
8 changes: 7 additions & 1 deletion ydb/core/tx/limiter/grouped_memory/service/counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,20 @@ class TStageCounters: public NColumnShard::TCommonCountersOwner {
NMonitoring::TDynamicCounters::TCounterPtr AllocatedChunks;
NMonitoring::TDynamicCounters::TCounterPtr WaitingBytes;
NMonitoring::TDynamicCounters::TCounterPtr WaitingChunks;
NMonitoring::TDynamicCounters::TCounterPtr AllocationFailCount;

public:
// Builds the counter set of one stage: the base owner is created from `owner` with the "stage" / `name` pair,
// and every counter pointer is obtained from the base via GetValue().
TStageCounters(const TCommonCountersOwner& owner, const TString& name)
: TBase(owner, "stage", name)
, AllocatedBytes(TBase::GetValue("Allocated/Bytes"))
, AllocatedChunks(TBase::GetValue("Allocated/Count"))
, WaitingBytes(TBase::GetValue("Waiting/Bytes"))
, WaitingChunks(TBase::GetValue("Waiting/Count")) {
, WaitingChunks(TBase::GetValue("Waiting/Count"))
, AllocationFailCount(TBase::GetValue("AllocationFails/Count")) {
}

// Records one failed allocation attempt by adding 1 to the "AllocationFails/Count" counter.
void OnCannotAllocate() {
AllocationFailCount->Add(1);
}

void Add(const ui64 volume, const bool allocated) {
Expand Down
25 changes: 21 additions & 4 deletions ydb/core/tx/limiter/grouped_memory/usage/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/actorid.h>
#include <ydb/library/actors/core/log.h>
#include <ydb/library/conclusion/status.h>

namespace NKikimr::NOlap::NGroupedMemoryManager {

Expand Down Expand Up @@ -95,6 +96,7 @@ class TStageFeatures {
private:
YDB_READONLY_DEF(TString, Name);
YDB_READONLY(ui64, Limit, 0);
YDB_READONLY(ui64, HardLimit, 0);
YDB_ACCESSOR_DEF(TPositiveControlInteger, Usage);
YDB_ACCESSOR_DEF(TPositiveControlInteger, Waiting);
std::shared_ptr<TStageFeatures> Owner;
Expand All @@ -114,24 +116,34 @@ class TStageFeatures {
return Usage.Val() + Waiting.Val();
}

// Creates a stage descriptor.
//   name      - stage name.
//   limit     - value stored in Limit.
//   hardLimit - value stored in HardLimit; Allocate() fails when usage would exceed it.
//   owner     - parent stage that Allocate() also allocates on; may be null.
//   counters  - monitoring counters of the stage; may be null.
TStageFeatures(
const TString& name, const ui64 limit, const std::shared_ptr<TStageFeatures>& owner, const std::shared_ptr<TStageCounters>& counters)
TStageFeatures(const TString& name, const ui64 limit, const ui64 hardLimit, const std::shared_ptr<TStageFeatures>& owner,
const std::shared_ptr<TStageCounters>& counters)
: Name(name)
, Limit(limit)
, HardLimit(hardLimit)
, Owner(owner)
, Counters(counters) {
}

void Allocate(const ui64 volume) {
[[nodiscard]] TConclusionStatus Allocate(const ui64 volume) {
if (HardLimit < Usage.Val() + volume) {
Counters->OnCannotAllocate();
return TConclusionStatus::Fail(TStringBuilder() << "limit:" << HardLimit << ";val:" << Usage.Val() << ";delta=" << volume << ";");
}
Waiting.Sub(volume);
Usage.Add(volume);
if (Counters) {
Counters->Add(volume, true);
Counters->Sub(volume, false);
}
if (Owner) {
Owner->Allocate(volume);
const auto ownerResult = Owner->Allocate(volume);
if (ownerResult.IsFail()) {
Free(volume, true);
return ownerResult;
}
}
return TConclusionStatus::Success();
}

void Free(const ui64 volume, const bool allocated) {
Expand Down Expand Up @@ -199,6 +211,7 @@ class IAllocation {
YDB_READONLY(ui64, Identifier, Counter.Inc());
YDB_READONLY(ui64, Memory, 0);
bool Allocated = false;
virtual void DoOnAllocationImpossible(const TString& errorMessage) = 0;
virtual bool DoOnAllocated(
std::shared_ptr<TAllocationGuard>&& guard, const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) = 0;

Expand All @@ -216,6 +229,10 @@ class IAllocation {
return Allocated;
}

// Public entry point that reports an allocation that cannot be satisfied;
// forwards the error message to the implementation-specific DoOnAllocationImpossible().
void OnAllocationImpossible(const TString& errorMessage) {
DoOnAllocationImpossible(errorMessage);
}

[[nodiscard]] bool OnAllocated(
std::shared_ptr<TAllocationGuard>&& guard, const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation);
};
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tx/limiter/grouped_memory/usage/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ bool TConfig::DeserializeFromProto(const NKikimrConfig::TGroupedMemoryLimiterCon
if (config.HasMemoryLimit()) {
MemoryLimit = config.GetMemoryLimit();
}
if (config.HasHardMemoryLimit()) {
HardMemoryLimit = config.GetHardMemoryLimit();
}
Enabled = config.GetEnabled();
return true;
}

// Renders the configuration as a ';'-terminated list of key=value pairs for logging.
TString TConfig::DebugString() const {
TStringBuilder sb;
sb << "MemoryLimit=" << MemoryLimit << ";Enabled=" << Enabled << ";";
sb << "MemoryLimit=" << MemoryLimit << ";HardMemoryLimit=" << HardMemoryLimit << ";Enabled=" << Enabled << ";";
return sb;
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/limiter/grouped_memory/usage/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class TConfig {
private:
YDB_READONLY(bool, Enabled, true);
YDB_READONLY(ui64, MemoryLimit, ui64(3) << 30);
YDB_READONLY(ui64, HardMemoryLimit, ui64(10) << 30);

public:

Expand Down
9 changes: 5 additions & 4 deletions ydb/core/tx/limiter/grouped_memory/usage/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ class TServiceOperatorImpl {
private:
TConfig ServiceConfig = TConfig::BuildDisabledConfig();
std::shared_ptr<TCounters> Counters;
std::shared_ptr<TStageFeatures> DefaultStageFeatures = std::make_shared<TStageFeatures>("DEFAULT", ((ui64)3) << 30, nullptr, nullptr);
std::shared_ptr<TStageFeatures> DefaultStageFeatures =
std::make_shared<TStageFeatures>("DEFAULT", ((ui64)3) << 30, ((ui64)10) << 30, nullptr, nullptr);
using TSelf = TServiceOperatorImpl<TMemoryLimiterPolicy>;
// Initializes the singleton operator: creates the counters for this limiter policy, stores the service
// configuration and rebuilds the default stage as "GLOBAL" with the configured limits and "general" stage counters.
static void Register(const TConfig& serviceConfig, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) {
Singleton<TSelf>()->Counters = std::make_shared<TCounters>(counters, TMemoryLimiterPolicy::Name);
Singleton<TSelf>()->ServiceConfig = serviceConfig;
Singleton<TSelf>()->DefaultStageFeatures = std::make_shared<TStageFeatures>(
"GLOBAL", serviceConfig.GetMemoryLimit(), nullptr, Singleton<TSelf>()->Counters->BuildStageCounters("general"));
Singleton<TSelf>()->DefaultStageFeatures = std::make_shared<TStageFeatures>("GLOBAL", serviceConfig.GetMemoryLimit(),
serviceConfig.GetHardMemoryLimit(), nullptr, Singleton<TSelf>()->Counters->BuildStageCounters("general"));
}
static const TString& GetMemoryLimiterName() {
Y_ABORT_UNLESS(TMemoryLimiterPolicy::Name.size() == 4);
Expand All @@ -35,7 +36,7 @@ class TServiceOperatorImpl {
} else {
AFL_VERIFY(Singleton<TSelf>()->DefaultStageFeatures);
return std::make_shared<TStageFeatures>(
name, limit, Singleton<TSelf>()->DefaultStageFeatures, Singleton<TSelf>()->Counters->BuildStageCounters(name));
name, limit, Max<ui64>(), Singleton<TSelf>()->DefaultStageFeatures, Singleton<TSelf>()->Counters->BuildStageCounters(name));
}
}

Expand Down
Loading