Skip to content

Commit

Permalink
fix scanner in flight control (#13690)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jan 22, 2025
1 parent 8f9ba1d commit bcf28e3
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,17 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
}
FetchedCount += finishedSource->GetResultRecordsCount();
FinishedSources.erase(FinishedSources.begin());
--IntervalsInFlightCount;
if (Context->IsActive()) {
--IntervalsInFlightCount;
}
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "source_finished")("source_id", finishedSource->GetSourceId())(
"source_idx", finishedSource->GetSourceIdx())("limit", Context->GetCommonContext()->GetReadMetadata()->GetLimitRobust())(
"fetched", finishedSource->GetResultRecordsCount());
if (FetchedCount > (ui64)Context->GetCommonContext()->GetReadMetadata()->GetLimitRobust()) {
if (FetchedCount > (ui64)Context->GetCommonContext()->GetReadMetadata()->GetLimitRobust() && SortedSources.size()) {
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "limit_exhausted")(
"limit", Context->GetCommonContext()->GetReadMetadata()->GetLimitRobust())("fetched", FetchedCount);
SortedSources.clear();
break;
IntervalsInFlightCount = GetInFlightIntervalsCount();
}
}
}
Expand Down Expand Up @@ -135,29 +137,15 @@ TConclusion<bool> TScanHead::BuildNextInterval() {
if (InFlightLimit <= IntervalsInFlightCount) {
return false;
}
ui32 inFlightCountLocal = 0;
for (auto it = FetchingInFlightSources.begin(); it != FetchingInFlightSources.end(); ++it) {
if (SortedSources.empty() || (*it)->GetFinish() < SortedSources.front()->GetStart()) {
++inFlightCountLocal;
} else {
break;
}
}
ui32 inFlightCountLocal = GetInFlightIntervalsCount();
AFL_VERIFY(IntervalsInFlightCount == inFlightCountLocal)("count_global", IntervalsInFlightCount)("count_local", inFlightCountLocal);
while (SortedSources.size() && inFlightCountLocal < InFlightLimit) {
SortedSources.front()->StartProcessing(SortedSources.front());
FetchingSources.emplace_back(SortedSources.front());
AFL_VERIFY(FetchingSourcesByIdx.emplace(SortedSources.front()->GetSourceIdx(), SortedSources.front()).second);
AFL_VERIFY(FetchingInFlightSources.emplace(SortedSources.front()).second);
SortedSources.pop_front();
ui32 inFlightCountLocalNew = 0;
for (auto it = FetchingInFlightSources.begin(); it != FetchingInFlightSources.end(); ++it) {
if (SortedSources.empty() || (*it)->GetFinish() < SortedSources.front()->GetStart()) {
++inFlightCountLocalNew;
} else {
break;
}
}
const ui32 inFlightCountLocalNew = GetInFlightIntervalsCount();
AFL_VERIFY(inFlightCountLocal <= inFlightCountLocalNew);
inFlightCountLocal = inFlightCountLocalNew;
changed = true;
Expand Down Expand Up @@ -188,4 +176,30 @@ void TScanHead::Abort() {
Y_ABORT_UNLESS(IsFinished());
}

TScanHead::~TScanHead() {
AFL_VERIFY(!IntervalsInFlightCount || !Context->IsActive());
}

ui32 TScanHead::GetInFlightIntervalsCount() const {
if (SortedSources.empty()) {
return FetchingInFlightSources.size() + FinishedSources.size();
}
ui32 inFlightCountLocal = 0;
for (auto it = FinishedSources.begin(); it != FinishedSources.end(); ++it) {
if (SortedSources.empty() || (*it)->GetFinish() < SortedSources.front()->GetStart()) {
++inFlightCountLocal;
} else {
break;
}
}
for (auto it = FetchingInFlightSources.begin(); it != FetchingInFlightSources.end(); ++it) {
if (SortedSources.empty() || (*it)->GetFinish() < SortedSources.front()->GetStart()) {
++inFlightCountLocal;
} else {
break;
}
}
return inFlightCountLocal;
}

} // namespace NKikimr::NOlap::NReader::NSimple
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ class TScanHead {
ui64 FetchedCount = 0;
ui64 InFlightLimit = 1;
ui64 MaxInFlight = 256;

ui32 GetInFlightIntervalsCount() const;

public:
~TScanHead();

void ContinueSource(const ui32 sourceIdx) const {
auto it = FetchingSourcesByIdx.find(sourceIdx);
Expand Down

0 comments on commit bcf28e3

Please sign in to comment.