Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop wide combiner state from growing unlimited #10997

Merged
37 changes: 32 additions & 5 deletions ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,8 @@ class TState : public TComputationValue<TState> {
return KeyWidth + StateWidth;
}
public:
TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal)
: TBase(memInfo), KeyWidth(keyWidth), StateWidth(stateWidth), States(hash, equal, CountRowsOnPage) {
TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal, bool allowOutOfMemory = false)
: TBase(memInfo), KeyWidth(keyWidth), StateWidth(stateWidth), AllowOutOfMemory(allowOutOfMemory), States(hash, equal, CountRowsOnPage) {
CurrentPage = &Storage.emplace_back(RowSize() * CountRowsOnPage, NUdf::TUnboxedValuePod());
CurrentPosition = 0;
Tongue = CurrentPage->data();
Expand Down Expand Up @@ -275,11 +275,28 @@ class TState : public TComputationValue<TState> {
}
Throat = States.GetKey(itInsert) + KeyWidth;
if (isNew) {
States.CheckGrow();
GrowStates();
}
return isNew;
}

void GrowStates() {
try {
States.CheckGrow();
} catch (TMemoryLimitExceededException) {
YQL_LOG(INFO) << "State failed to grow";
if (IsOutOfMemory < AllowOutOfMemory) {
IsOutOfMemory = true;
} else {
throw;
}
}
}

bool CheckIsOutOfMemory() const {
return IsOutOfMemory;
}

template<bool SkipYields>
bool ReadMore() {
if constexpr (SkipYields) {
Expand Down Expand Up @@ -331,6 +348,8 @@ class TState : public TComputationValue<TState> {
private:
std::optional<TStorageIterator> ExtractIt;
const ui32 KeyWidth, StateWidth;
const bool AllowOutOfMemory;
bool IsOutOfMemory = false;
ui64 CurrentPosition = 0;
TRow* CurrentPage = nullptr;
TStorage Storage;
Expand Down Expand Up @@ -386,7 +405,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
const THashFunc& hash, const TEqualsFunc& equal, bool allowSpilling, TComputationContext& ctx
)
: TBase(memInfo)
, InMemoryProcessingState(memInfo, keyWidth, keyAndStateType->GetElementsCount() - keyWidth, hash, equal)
, InMemoryProcessingState(memInfo, keyWidth, keyAndStateType->GetElementsCount() - keyWidth, hash, equal, allowSpilling && ctx.SpillerFactory)
, UsedInputItemType(usedInputItemType)
, KeyAndStateType(keyAndStateType)
, KeyWidth(keyWidth)
Expand Down Expand Up @@ -451,6 +470,9 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
ETasteResult TasteIt() {
if (GetMode() == EOperatingMode::InMemory) {
bool isNew = InMemoryProcessingState.TasteIt();
if (InMemoryProcessingState.CheckIsOutOfMemory()) {
StateWantsToSpill = true;
}
Throat = InMemoryProcessingState.Throat;
return isNew ? ETasteResult::Init : ETasteResult::Update;
}
Expand Down Expand Up @@ -649,7 +671,11 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
}

bool CheckMemoryAndSwitchToSpilling() {
if (AllowSpilling && Ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) {
if (!(AllowSpilling && Ctx.SpillerFactory)) {
return false;
}
if (StateWantsToSpill || IsSwitchToSpillingModeCondition()) {
StateWantsToSpill = false;
LogMemoryUsage();

SwitchMode(EOperatingMode::SplittingState);
Expand Down Expand Up @@ -840,6 +866,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
NUdf::TUnboxedValuePod* Throat = nullptr;

private:
bool StateWantsToSpill = false;
bool IsEverythingExtracted = false;

TState InMemoryProcessingState;
Expand Down
Loading