Skip to content

Commit

Permalink
Stop state from growing and spill growed is spilling is enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
vladl2802 committed Oct 28, 2024
1 parent 1620cd0 commit e5d9b3d
Showing 1 changed file with 22 additions and 3 deletions.
25 changes: 22 additions & 3 deletions ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,12 +274,21 @@ class TState : public TComputationValue<TState> {
Tongue = CurrentPage->data() + CurrentPosition;
}
Throat = States.GetKey(itInsert) + KeyWidth;
if (isNew) {
States.CheckGrow();
if (isNew && !IsOutOfMemory) {
try {
States.CheckGrow();
} catch (const TMemoryLimitExceededException& e) {
Cerr << "State " << (void *)this << " no longer growing\n";
IsOutOfMemory = true;
}
}
return isNew;
}

bool CheckIsOutOfMemory() const {
return IsOutOfMemory;
}

template<bool SkipYields>
bool ReadMore() {
if constexpr (SkipYields) {
Expand Down Expand Up @@ -331,6 +340,7 @@ class TState : public TComputationValue<TState> {
private:
std::optional<TStorageIterator> ExtractIt;
const ui32 KeyWidth, StateWidth;
bool IsOutOfMemory = false;
ui64 CurrentPosition = 0;
TRow* CurrentPage = nullptr;
TStorage Storage;
Expand Down Expand Up @@ -451,6 +461,9 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
ETasteResult TasteIt() {
if (GetMode() == EOperatingMode::InMemory) {
bool isNew = InMemoryProcessingState.TasteIt();
if (InMemoryProcessingState.CheckIsOutOfMemory()) {
StateWantsToSpill = true;
}
Throat = InMemoryProcessingState.Throat;
return isNew ? ETasteResult::Init : ETasteResult::Update;
}
Expand Down Expand Up @@ -649,7 +662,11 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
}

bool CheckMemoryAndSwitchToSpilling() {
if (AllowSpilling && Ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) {
if (!(AllowSpilling && Ctx.SpillerFactory)) {
return false;
}
if (StateWantsToSpill || IsSwitchToSpillingModeCondition()) {
StateWantsToSpill = false;
LogMemoryUsage();

SwitchMode(EOperatingMode::SplittingState);
Expand Down Expand Up @@ -831,6 +848,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
}

bool IsSwitchToSpillingModeCondition() const {
return false;
return !HasMemoryForProcessing() || TlsAllocState->GetMaximumLimitValueReached();
}

Expand All @@ -840,6 +858,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
NUdf::TUnboxedValuePod* Throat = nullptr;

private:
bool StateWantsToSpill = false;
bool IsEverythingExtracted = false;

TState InMemoryProcessingState;
Expand Down

0 comments on commit e5d9b3d

Please sign in to comment.