diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp index 0a7e47a84a8e..7251fb8d330c 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp @@ -66,15 +66,17 @@ void TScanHead::OnSourceReady(const std::shared_ptr& source, std::s } FetchedCount += finishedSource->GetResultRecordsCount(); FinishedSources.erase(FinishedSources.begin()); - --IntervalsInFlightCount; + if (Context->IsActive()) { + --IntervalsInFlightCount; + } AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "source_finished")("source_id", finishedSource->GetSourceId())( "source_idx", finishedSource->GetSourceIdx())("limit", Context->GetCommonContext()->GetReadMetadata()->GetLimitRobust())( "fetched", finishedSource->GetResultRecordsCount()); - if (FetchedCount > (ui64)Context->GetCommonContext()->GetReadMetadata()->GetLimitRobust()) { + if (FetchedCount > (ui64)Context->GetCommonContext()->GetReadMetadata()->GetLimitRobust() && SortedSources.size()) { AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "limit_exhausted")( "limit", Context->GetCommonContext()->GetReadMetadata()->GetLimitRobust())("fetched", FetchedCount); SortedSources.clear(); - break; + IntervalsInFlightCount = GetInFlightIntervalsCount(); } } } @@ -135,14 +137,7 @@ TConclusion TScanHead::BuildNextInterval() { if (InFlightLimit <= IntervalsInFlightCount) { return false; } - ui32 inFlightCountLocal = 0; - for (auto it = FetchingInFlightSources.begin(); it != FetchingInFlightSources.end(); ++it) { - if (SortedSources.empty() || (*it)->GetFinish() < SortedSources.front()->GetStart()) { - ++inFlightCountLocal; - } else { - break; - } - } + ui32 inFlightCountLocal = GetInFlightIntervalsCount(); AFL_VERIFY(IntervalsInFlightCount == inFlightCountLocal)("count_global", IntervalsInFlightCount)("count_local", inFlightCountLocal); while (SortedSources.size() && inFlightCountLocal < InFlightLimit) { SortedSources.front()->StartProcessing(SortedSources.front()); @@ -150,14 +145,7 @@ TConclusion TScanHead::BuildNextInterval() { AFL_VERIFY(FetchingSourcesByIdx.emplace(SortedSources.front()->GetSourceIdx(), SortedSources.front()).second); AFL_VERIFY(FetchingInFlightSources.emplace(SortedSources.front()).second); SortedSources.pop_front(); - ui32 inFlightCountLocalNew = 0; - for (auto it = FetchingInFlightSources.begin(); it != FetchingInFlightSources.end(); ++it) { - if (SortedSources.empty() || (*it)->GetFinish() < SortedSources.front()->GetStart()) { - ++inFlightCountLocalNew; - } else { - break; - } - } + const ui32 inFlightCountLocalNew = GetInFlightIntervalsCount(); AFL_VERIFY(inFlightCountLocal <= inFlightCountLocalNew); inFlightCountLocal = inFlightCountLocalNew; changed = true; @@ -188,4 +176,30 @@ void TScanHead::Abort() { Y_ABORT_UNLESS(IsFinished()); } +TScanHead::~TScanHead() { + AFL_VERIFY(!IntervalsInFlightCount || !Context->IsActive()); +} + +ui32 TScanHead::GetInFlightIntervalsCount() const { + if (SortedSources.empty()) { + return FetchingInFlightSources.size() + FinishedSources.size(); + } + ui32 inFlightCountLocal = 0; + for (auto it = FinishedSources.begin(); it != FinishedSources.end(); ++it) { + if (SortedSources.empty() || (*it)->GetFinish() < SortedSources.front()->GetStart()) { + ++inFlightCountLocal; + } else { + break; + } + } + for (auto it = FetchingInFlightSources.begin(); it != FetchingInFlightSources.end(); ++it) { + if (SortedSources.empty() || (*it)->GetFinish() < SortedSources.front()->GetStart()) { + ++inFlightCountLocal; + } else { + break; + } + } + return inFlightCountLocal; +} + } // namespace NKikimr::NOlap::NReader::NSimple diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h index 08df0906f2a0..a7a5f6f40af4 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h @@ -34,7 +34,11 @@ class TScanHead { ui64 FetchedCount = 0; ui64 InFlightLimit = 1; ui64 MaxInFlight = 256; + + ui32 GetInFlightIntervalsCount() const; + public: + ~TScanHead(); void ContinueSource(const ui32 sourceIdx) const { auto it = FetchingSourcesByIdx.find(sourceIdx);