From 0702d45d81cae81991a3ddf18d60704073626a95 Mon Sep 17 00:00:00 2001 From: Tony-Romanov <150126326+Tony-Romanov@users.noreply.github.com> Date: Tue, 11 Jun 2024 14:10:45 +0200 Subject: [PATCH] Prepare for LLVM implementation. (#5431) --- .../minikql/comp_nodes/mkql_wide_top_sort.cpp | 49 ++++++++----------- 1 file changed, 20 insertions(+), 29 deletions(-) diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp index fbbe9c64ef78..b624e0a15bcd 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp @@ -182,16 +182,15 @@ class TSpilledUnboxedValuesIterator { return Width_; } - void Pop() { + TStorage Pop() { + auto data(std::move(Data)); HasValue = false; Read(); + return data; } - NKikimr::NUdf::TUnboxedValue* GetValue() { - return &*Data.begin(); - } - const NKikimr::NUdf::TUnboxedValue* GetValue() const { - return &*Data.begin(); + const NUdf::TUnboxedValue* GetValue() const { + return &Data.front(); } }; @@ -719,15 +718,20 @@ using TBase = TComputationValue; } } - void Seal() { - if (!SpilledStates.empty()) { + bool Seal() { + if (SpilledStates.empty()) { + SealInMemory(); + } else { SwitchMode(EOperatingMode::Spilling); - return; } - SealInMemory(); + return IsReadyToContinue(); } NUdf::TUnboxedValue* Extract() { + if (!IsReadyToContinue()) { + return nullptr; + } + if (SpilledUnboxedValuesIterators.empty()) { // No spilled data return ExtractInMemory(); @@ -742,21 +746,12 @@ using TBase = TComputationValue; std::pop_heap(SpilledUnboxedValuesIterators.begin(), SpilledUnboxedValuesIterators.end()); auto ¤tIt = SpilledUnboxedValuesIterators.back(); - return currentIt.GetValue(); - } - - void Clean() { - if (SpilledUnboxedValuesIterators.empty()) { - // No spilled data - return; - } - auto ¤tIt = SpilledUnboxedValuesIterators.back(); - currentIt.Pop(); + Storage = currentIt.Pop(); if (currentIt.IsFinished()) { SpilledUnboxedValuesIterators.pop_back(); } + return Storage.data(); } - private: EOperatingMode GetMode() const { return Mode; } @@ -902,18 +897,15 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode; case EFetchResult::One: ptr->Put(); continue; + case EFetchResult::Finish: + if (ptr->Seal()) + break; + [[fallthrough]]; case EFetchResult::Yield: return EFetchResult::Yield; - case EFetchResult::Finish: - ptr->Seal(); - break; } } - if (!ptr->IsReadyToContinue()) { - return EFetchResult::Yield; - } - if (auto extract = ptr->Extract()) { for (const auto index : Indexes) { if (const auto to = output[index]) @@ -921,7 +913,6 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode; else ++extract; } - ptr->Clean(); return EFetchResult::One; }