Skip to content

Commit

Permalink
Merge 2ef04c8 into 3a074dd
Browse files Browse the repository at this point in the history
  • Loading branch information
vladl2802 authored Oct 28, 2024
2 parents 3a074dd + 2ef04c8 commit b8db0fc
Showing 1 changed file with 26 additions and 2 deletions.
28 changes: 26 additions & 2 deletions ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,26 @@ class TState : public TComputationValue<TState> {
}
Throat = States.GetKey(itInsert) + KeyWidth;
if (isNew) {
States.CheckGrow();
SafeGrow();
}
return isNew;
}

void SafeGrow() {
if (!IsOutOfMemory) {
try {
States.CheckGrow();
} catch (const TMemoryLimitExceededException& e) {
Cerr << "State " << (void *)this << " no longer growing\n";
IsOutOfMemory = true;
}
}
}

bool CheckIsOutOfMemory() const {
return IsOutOfMemory;
}

template<bool SkipYields>
bool ReadMore() {
if constexpr (SkipYields) {
Expand Down Expand Up @@ -331,6 +346,7 @@ class TState : public TComputationValue<TState> {
private:
std::optional<TStorageIterator> ExtractIt;
const ui32 KeyWidth, StateWidth;
bool IsOutOfMemory = false;
ui64 CurrentPosition = 0;
TRow* CurrentPage = nullptr;
TStorage Storage;
Expand Down Expand Up @@ -451,6 +467,9 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
ETasteResult TasteIt() {
if (GetMode() == EOperatingMode::InMemory) {
bool isNew = InMemoryProcessingState.TasteIt();
if (InMemoryProcessingState.CheckIsOutOfMemory()) {
StateWantsToSpill = true;
}
Throat = InMemoryProcessingState.Throat;
return isNew ? ETasteResult::Init : ETasteResult::Update;
}
Expand Down Expand Up @@ -649,7 +668,11 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
}

bool CheckMemoryAndSwitchToSpilling() {
if (AllowSpilling && Ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) {
if (!(AllowSpilling && Ctx.SpillerFactory)) {
return false;
}
if (StateWantsToSpill || IsSwitchToSpillingModeCondition()) {
StateWantsToSpill = false;
LogMemoryUsage();

SwitchMode(EOperatingMode::SplittingState);
Expand Down Expand Up @@ -840,6 +863,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
NUdf::TUnboxedValuePod* Throat = nullptr;

private:
bool StateWantsToSpill = false;
bool IsEverythingExtracted = false;

TState InMemoryProcessingState;
Expand Down

0 comments on commit b8db0fc

Please sign in to comment.