Skip to content

Commit

Permalink
Prepare for LLVM implementation. (ydb-platform#5431)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tony-Romanov authored Jun 11, 2024
1 parent bd66b9f commit 0702d45
Showing 1 changed file with 20 additions and 29 deletions.
49 changes: 20 additions & 29 deletions ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,16 +182,15 @@ class TSpilledUnboxedValuesIterator {
return Width_;
}

void Pop() {
TStorage Pop() {
auto data(std::move(Data));
HasValue = false;
Read();
return data;
}

NKikimr::NUdf::TUnboxedValue* GetValue() {
return &*Data.begin();
}
const NKikimr::NUdf::TUnboxedValue* GetValue() const {
return &*Data.begin();
const NUdf::TUnboxedValue* GetValue() const {
return &Data.front();
}
};

Expand Down Expand Up @@ -719,15 +718,20 @@ using TBase = TComputationValue<TSpillingSupportState>;
}
}

void Seal() {
if (!SpilledStates.empty()) {
bool Seal() {
if (SpilledStates.empty()) {
SealInMemory();
} else {
SwitchMode(EOperatingMode::Spilling);
return;
}
SealInMemory();
return IsReadyToContinue();
}

NUdf::TUnboxedValue* Extract() {
if (!IsReadyToContinue()) {
return nullptr;
}

if (SpilledUnboxedValuesIterators.empty()) {
// No spilled data
return ExtractInMemory();
Expand All @@ -742,21 +746,12 @@ using TBase = TComputationValue<TSpillingSupportState>;

std::pop_heap(SpilledUnboxedValuesIterators.begin(), SpilledUnboxedValuesIterators.end());
auto &currentIt = SpilledUnboxedValuesIterators.back();
return currentIt.GetValue();
}

void Clean() {
if (SpilledUnboxedValuesIterators.empty()) {
// No spilled data
return;
}
auto &currentIt = SpilledUnboxedValuesIterators.back();
currentIt.Pop();
Storage = currentIt.Pop();
if (currentIt.IsFinished()) {
SpilledUnboxedValuesIterators.pop_back();
}
return Storage.data();
}

private:
EOperatingMode GetMode() const { return Mode; }

Expand Down Expand Up @@ -902,26 +897,22 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideSortWrapper>;
case EFetchResult::One:
ptr->Put();
continue;
case EFetchResult::Finish:
if (ptr->Seal())
break;
[[fallthrough]];
case EFetchResult::Yield:
return EFetchResult::Yield;
case EFetchResult::Finish:
ptr->Seal();
break;
}
}

if (!ptr->IsReadyToContinue()) {
return EFetchResult::Yield;
}

if (auto extract = ptr->Extract()) {
for (const auto index : Indexes) {
if (const auto to = output[index])
*to = std::move(*extract++);
else
++extract;
}
ptr->Clean();
return EFetchResult::One;
}

Expand Down

0 comments on commit 0702d45

Please sign in to comment.