diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp index 9c90c8da1689..7032a172babc 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp @@ -13,7 +13,6 @@ std::unique_ptr TSpecialReadContext::Build ui64 TSpecialReadContext::GetMemoryForSources(const THashMap>& sources) { ui64 result = 0; for (auto&& i : sources) { - auto fetchingPlan = GetColumnsFetchingPlan(i.second); AFL_VERIFY(i.second->GetIntervalsCount()); const ui64 sourceMemory = std::max(1, i.second->GetResourceGuardsMemory() / i.second->GetIntervalsCount()); result += sourceMemory; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp index 28b723c893dc..0535f72b85e2 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp @@ -222,6 +222,7 @@ TConclusion TAllocateMemoryStep::DoExecuteInplace(const std::shared_ptrGetSourceIdx())("memory", size); auto allocation = std::make_shared(source, size, step, StageIndex); NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(source->GetContext()->GetProcessMemoryControlId(), diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp index 9da043a366c1..7f44376f3ad9 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp @@ -10,15 +10,12 @@ void TFetchingInterval::ConstructResult() { if (ready != WaitSourcesCount) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_construct_result")("interval_idx", IntervalIdx)( "count", WaitSourcesCount)("ready", ready)("interval_id", GetIntervalId()); - return; - } else { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "start_construct_result")("interval_idx", IntervalIdx)( - "interval_id", GetIntervalId()); - } - if (AtomicCas(&SourcesFinalized, 1, 0)) { + } else if (AtomicCas(&SourcesFinalized, 1, 0)) { IntervalStateGuard.SetStatus(NColumnShard::TScanCounters::EIntervalStatus::WaitMergerStart); MergingContext->SetIntervalChunkMemory(Context->GetMemoryForSources(Sources)); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "start_construct_result")("interval_idx", IntervalIdx)( + "interval_id", GetIntervalId())("memory", MergingContext->GetIntervalChunkMemory())("count", WaitSourcesCount); auto task = std::make_shared(MergingContext, Context, std::move(Sources)); task->SetPriority(NConveyor::ITask::EPriority::High); @@ -81,6 +78,8 @@ void TFetchingInterval::OnPartSendingComplete() { } IntervalStateGuard.SetStatus(NColumnShard::TScanCounters::EIntervalStatus::WaitMergerContinue); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "continue_construct_result")("interval_idx", IntervalIdx)( + "interval_id", GetIntervalId())("memory", MergingContext->GetIntervalChunkMemory())("count", WaitSourcesCount); auto task = std::make_shared(MergingContext, Context, std::move(Merger)); task->SetPriority(NConveyor::ITask::EPriority::High); NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(Context->GetProcessMemoryControlId(), diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp index 479eae69d0bf..241040efd333 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp @@ -33,6 +33,8 @@ void TBaseMergeTask::PrepareResultBatch() { LastPK = nullptr; return; } + const ui64 dataSizeBefore = NArrow::GetTableDataSize(ResultBatch); + const ui64 memorySizeBefore = NArrow::GetTableMemorySize(ResultBatch); { ResultBatch = NArrow::TColumnOperator().VerifyIfAbsent().Extract(ResultBatch, Context->GetProgramInputColumns()->GetColumnNamesVector()); AFL_VERIFY((ui32)ResultBatch->num_columns() == Context->GetProgramInputColumns()->GetColumnNamesVector().size()); @@ -45,7 +47,10 @@ void TBaseMergeTask::PrepareResultBatch() { } else { ShardedBatch = NArrow::TShardedRecordBatch(ResultBatch); } - AllocationGuard->Update(NArrow::GetTableMemorySize(ResultBatch)); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "update_memory_merger")("before_data", dataSizeBefore)( + "before_memory", memorySizeBefore)("after_memory", NArrow::GetTableMemorySize(ResultBatch))( + "after_data", NArrow::GetTableDataSize(ResultBatch))("guard", AllocationGuard->GetMemory()); + // AllocationGuard->Update(NArrow::GetTableMemorySize(ResultBatch)); AFL_VERIFY(!!LastPK == !!ShardedBatch->GetRecordsCount())("lpk", !!LastPK)("sb", ShardedBatch->GetRecordsCount()); } else { AllocationGuard = nullptr; diff --git a/ydb/core/tx/limiter/grouped_memory/usage/abstract.h b/ydb/core/tx/limiter/grouped_memory/usage/abstract.h index 43ef21bf0bcc..df314b8a9475 100644 --- a/ydb/core/tx/limiter/grouped_memory/usage/abstract.h +++ b/ydb/core/tx/limiter/grouped_memory/usage/abstract.h @@ -129,9 +129,13 @@ class TStageFeatures { Waiting.Sub(volume); if (HardLimit < Usage.Val() + volume) { Counters->OnCannotAllocate(); + AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", Name)("event", "cannot_allocate")("limit", HardLimit)( + "usage", Usage.Val())( + "delta", volume); return TConclusionStatus::Fail(TStringBuilder() << Name << "::(limit:" << HardLimit << ";val:" << Usage.Val() << ";delta=" << volume << ");"); } Usage.Add(volume); + AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", Name)("event", "allocate")("usage", Usage.Val())("delta", volume); if (Counters) { Counters->Add(volume, true); Counters->Sub(volume, false); @@ -155,6 +159,7 @@ class TStageFeatures { } else { Waiting.Sub(volume); } + AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", Name)("event", "free")("usage", Usage.Val())("delta", volume); if (withOwner && Owner) { Owner->Free(volume, allocated); @@ -166,6 +171,8 @@ class TStageFeatures { Counters->Sub(from, allocated); Counters->Add(to, allocated); } + AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", Name)("event", "update")("usage", Usage.Val())("waiting", Waiting.Val())( + "allocated", allocated)("from", from)("to", to); if (allocated) { Usage.Sub(from); Usage.Add(to);