Skip to content

Commit

Permalink
correction
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 committed Nov 25, 2024
1 parent ceea2f0 commit 6c14aa4
Showing 1 changed file with 17 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,25 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
ContinueSource(*sourceIdxToContinue);
break;
}
if (isFinished) {
AFL_VERIFY(FetchingSourcesByIdx.erase(frontSource->GetSourceIdx()));
if (Context->GetCommonContext()->GetReadMetadata()->Limit) {
FinishedSources.emplace(*FetchingSources.begin());
if (!isFinished) {
break
}
AFL_VERIFY(FetchingSourcesByIdx.erase(frontSource->GetSourceIdx()));
if (Context->GetCommonContext()->GetReadMetadata()->Limit) {
FinishedSources.emplace(*FetchingSources.begin());
}
FetchingSources.erase(FetchingSources.begin());
while (FetchingSources.size() && FinishedSources.size()) {
auto finishedSource = *FinishedSources.begin();
auto fetchingSource = *FetchingSources.begin();
if (finishedSource->GetFinish() < fetchingSource->GetStart()) {
FetchedCount += finishedSource->GetRecordsCount();
}
FetchingSources.erase(FetchingSources.begin());
while (FetchingSources.size() && FinishedSources.size()) {
auto finishedSource = *FinishedSources.begin();
auto fetchingSource = *FetchingSources.begin();
if (finishedSource->GetFinish() < fetchingSource->GetStart()) {
FetchedCount += finishedSource->GetRecordsCount();
}
FinishedSources.erase(FinishedSources.begin());
if (FetchedCount > Context->GetCommonContext()->GetReadMetadata()->Limit) {
Context->Abort();
Abort();
}
FinishedSources.erase(FinishedSources.begin());
if (FetchedCount > Context->GetCommonContext()->GetReadMetadata()->Limit) {
Context->Abort();
Abort();
}
} else {
break;
}
}
}
Expand Down

0 comments on commit 6c14aa4

Please sign in to comment.