Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added runtime nodes for gracejoin with spilling #5972

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ struct TCallableComputationNodeBuilderFuncMapFiller {
{"JoinDict", &WrapJoinDict},
{"GraceJoin", &WrapGraceJoin},
{"GraceSelfJoin", &WrapGraceSelfJoin},
{"GraceJoinWithSpilling", &WrapGraceJoinWithSpilling},
{"GraceSelfJoinWithSpilling", &WrapGraceSelfJoinWithSpilling},
{"MapJoinCore", &WrapMapJoinCore},
{"CommonJoinCore", &WrapCommonJoinCore},
{"CombineCore", &WrapCombineCore},
Expand Down
135 changes: 60 additions & 75 deletions ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
EJoinKind joinKind, EAnyJoinSettings anyJoinSettings, const std::vector<ui32>& leftKeyColumns, const std::vector<ui32>& rightKeyColumns,
const std::vector<ui32>& leftRenames, const std::vector<ui32>& rightRenames,
const std::vector<TType*>& leftColumnsTypes, const std::vector<TType*>& rightColumnsTypes, const THolderFactory & holderFactory,
const bool isSelfJoin)
const bool isSelfJoin, bool isSpillingAllowed)
: TBase(memInfo)
, FlowLeft(flowLeft)
, FlowRight(flowRight)
Expand All @@ -588,6 +588,7 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
, JoinedTuple(std::make_unique<std::vector<NUdf::TUnboxedValue*>>() )
, IsSelfJoin_(isSelfJoin)
, SelfJoinSameKeys_(isSelfJoin && (leftKeyColumns == rightKeyColumns))
, IsSpillingAllowed(isSpillingAllowed)
{
if (JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Exclusion || IsSelfJoin_) {
LeftPacker->BatchSize = std::numeric_limits<ui64>::max();
Expand Down Expand Up @@ -631,9 +632,7 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
}

// Reports whether the join should switch to spilling mode.
// NOTE(review): this span is a rendered diff hunk (header: -631,9 +632,7), so it
// shows both sides of the change: `return false;` and the two comment lines
// below it appear to be the removed lines, and the final
// `return !HasMemoryForProcessing();` the line that replaces them — confirm
// against the merged file before relying on either behavior.
bool IsSwitchToSpillingModeCondition() const {
return false;
// TODO: YQL-18033
// return !HasMemoryForProcessing();
return !HasMemoryForProcessing();
}

void SwitchMode(EOperatingMode mode, TComputationContext& ctx) {
Expand Down Expand Up @@ -764,7 +763,7 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
}

bool isYield = FetchAndPackData(ctx);
if (ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) {
if (IsSpillingAllowed && ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) {
const auto used = TlsAllocState->GetUsed();
const auto limit = TlsAllocState->GetLimit();

Expand Down Expand Up @@ -946,6 +945,7 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const*
const std::unique_ptr<std::vector<NUdf::TUnboxedValue*>> JoinedTuple;
const bool IsSelfJoin_;
const bool SelfJoinSameKeys_;
const bool IsSpillingAllowed;

bool IsSpillingFinalized = false;

Expand All @@ -960,7 +960,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr
EJoinKind joinKind, EAnyJoinSettings anyJoinSettings, std::vector<ui32>&& leftKeyColumns, std::vector<ui32>&& rightKeyColumns,
std::vector<ui32>&& leftRenames, std::vector<ui32>&& rightRenames,
std::vector<TType*>&& leftColumnsTypes, std::vector<TType*>&& rightColumnsTypes,
std::vector<EValueRepresentation>&& outputRepresentations, bool isSelfJoin)
std::vector<EValueRepresentation>&& outputRepresentations, bool isSelfJoin, bool isSpillingAllowed)
: TBaseComputation(mutables, nullptr, EValueRepresentation::Boxed)
, FlowLeft(flowLeft)
, FlowRight(flowRight)
Expand All @@ -974,6 +974,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr
, RightColumnsTypes(std::move(rightColumnsTypes))
, OutputRepresentations(std::move(outputRepresentations))
, IsSelfJoin_(isSelfJoin)
, IsSpillingAllowed(isSpillingAllowed)
{}

EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
Expand Down Expand Up @@ -1071,7 +1072,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr
state = ctx.HolderFactory.Create<TGraceJoinSpillingSupportState>(
FlowLeft, FlowRight, JoinKind, AnyJoinSettings_, LeftKeyColumns, RightKeyColumns,
LeftRenames, RightRenames, LeftColumnsTypes, RightColumnsTypes,
ctx.HolderFactory, IsSelfJoin_);
ctx.HolderFactory, IsSelfJoin_, IsSpillingAllowed);
}

IComputationWideFlowNode *const FlowLeft;
Expand All @@ -1086,28 +1087,38 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr
const std::vector<TType *> RightColumnsTypes;
const std::vector<EValueRepresentation> OutputRepresentations;
const bool IsSelfJoin_;
const bool IsSpillingAllowed;
};

}

IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args");
IComputationNode* WrapGraceJoinCommon(TCallable& callable, const TComputationNodeFactoryContext& ctx, bool isSelfJoin, bool isSpillingAllowed) {
const auto leftFlowNodeIndex = 0;
const auto rightFlowNodeIndex = 1;
const auto joinKindNodeIndex = isSelfJoin ? 1 : 2;
const auto leftKeyColumnsNodeIndex = joinKindNodeIndex + 1;
const auto rightKeyColumnsNodeIndex = leftKeyColumnsNodeIndex + 1;
const auto leftRenamesNodeIndex = rightKeyColumnsNodeIndex + 1;
const auto rightRenamesNodeIndex = leftRenamesNodeIndex + 1;
const auto anyJoinSettingsIndex = rightRenamesNodeIndex + 1;

const auto leftFlowNode = callable.GetInput(leftFlowNodeIndex);
const auto joinKindNode = callable.GetInput(joinKindNodeIndex);
const auto leftKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(leftKeyColumnsNodeIndex));
const auto rightKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(rightKeyColumnsNodeIndex));
const auto leftRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(leftRenamesNodeIndex));
const auto rightRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(rightRenamesNodeIndex));
const EAnyJoinSettings anyJoinSettings = GetAnyJoinSettings(AS_VALUE(TDataLiteral, callable.GetInput(anyJoinSettingsIndex))->AsValue().Get<ui32>());

const auto leftFlowNode = callable.GetInput(0);
const auto rightFlowNode = callable.GetInput(1);
const auto leftFlowComponents = GetWideComponents(AS_TYPE(TFlowType, leftFlowNode));
const auto rightFlowComponents = GetWideComponents(AS_TYPE(TFlowType, rightFlowNode));
const auto joinKindNode = callable.GetInput(2);
const auto leftKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(3));
const auto rightKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(4));
const auto leftRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(5));
const auto rightRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(6));
const ui32 rawJoinKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get<ui32>();

const EAnyJoinSettings anyJoinSettings = GetAnyJoinSettings(AS_VALUE(TDataLiteral, callable.GetInput(7))->AsValue().Get<ui32>());


const auto flowLeft = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 0));
const auto flowRight = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 1));
IComputationWideFlowNode* flowRight = nullptr;
if (!isSelfJoin) {
flowRight = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 1));
}

const auto outputFlowComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetType()->GetReturnType()));
std::vector<EValueRepresentation> outputRepresentations;
Expand All @@ -1118,7 +1129,14 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto

std::vector<ui32> leftKeyColumns, leftRenames, rightKeyColumns, rightRenames;
std::vector<TType*> leftColumnsTypes(leftFlowComponents.begin(), leftFlowComponents.end());
std::vector<TType*> rightColumnsTypes(rightFlowComponents.begin(), rightFlowComponents.end());
std::vector<TType*> rightColumnsTypes;
if (isSelfJoin) {
rightColumnsTypes = {leftColumnsTypes};
} else {
const auto rightFlowNode = callable.GetInput(rightFlowNodeIndex);
const auto rightFlowComponents = GetWideComponents(AS_TYPE(TFlowType, rightFlowNode));
rightColumnsTypes = {rightFlowComponents.begin(), rightFlowComponents.end()};
}

leftKeyColumns.reserve(leftKeyColumnsNode->GetValuesCount());
for (ui32 i = 0; i < leftKeyColumnsNode->GetValuesCount(); ++i) {
Expand All @@ -1135,6 +1153,10 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto
rightKeyColumns.emplace_back(AS_VALUE(TDataLiteral, rightKeyColumnsNode->GetValue(i))->AsValue().Get<ui32>());
}

if (isSelfJoin) {
MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Number of key columns for self join should be equal");
}

rightRenames.reserve(rightRenamesNode->GetValuesCount());
for (ui32 i = 0; i < rightRenamesNode->GetValuesCount(); ++i) {
rightRenames.emplace_back(AS_VALUE(TDataLiteral, rightRenamesNode->GetValue(i))->AsValue().Get<ui32>());
Expand All @@ -1143,68 +1165,31 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto
return new TGraceJoinWrapper(
ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind), anyJoinSettings,
std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames),
std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), false);
std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), isSelfJoin, isSpillingAllowed);
}

// Factory entry point registered under the "GraceJoin" callable name.
// Checks that the callable carries exactly 8 inputs and then delegates node
// construction to WrapGraceJoinCommon as a non-self join with spilling
// not allowed.
IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
    const bool isSelfJoin = false;
    const bool isSpillingAllowed = false;

    MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args");
    return WrapGraceJoinCommon(callable, ctx, isSelfJoin, isSpillingAllowed);
}

// Factory entry point registered under the "GraceSelfJoin" callable name.
// Checks that the callable carries exactly 7 inputs and then delegates node
// construction to WrapGraceJoinCommon as a self join with spilling not allowed.
IComputationNode* WrapGraceSelfJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
    const bool isSelfJoin = true;
    const bool isSpillingAllowed = false;

    MKQL_ENSURE(callable.GetInputsCount() == 7, "Expected 7 args");
    return WrapGraceJoinCommon(callable, ctx, isSelfJoin, isSpillingAllowed);
}

const auto leftFlowNode = callable.GetInput(0);
const auto leftFlowComponents = GetWideComponents(AS_TYPE(TFlowType, leftFlowNode));
const auto joinKindNode = callable.GetInput(1);
const auto leftKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(2));
const auto rightKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(3));
const auto leftRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(4));
const auto rightRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(5));
const ui32 rawJoinKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get<ui32>();

const EAnyJoinSettings anyJoinSettings = GetAnyJoinSettings(AS_VALUE(TDataLiteral, callable.GetInput(6))->AsValue().Get<ui32>());

const auto flowLeft = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 0));

const auto outputFlowComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetType()->GetReturnType()));
std::vector<EValueRepresentation> outputRepresentations;
outputRepresentations.reserve(outputFlowComponents.size());
for (ui32 i = 0U; i < outputFlowComponents.size(); ++i) {
outputRepresentations.emplace_back(GetValueRepresentation(outputFlowComponents[i]));
}

std::vector<ui32> leftKeyColumns, leftRenames, rightKeyColumns, rightRenames;
std::vector<TType*> leftColumnsTypes(leftFlowComponents.begin(), leftFlowComponents.end());
std::vector<TType*> rightColumnsTypes{leftColumnsTypes};

leftKeyColumns.reserve(leftKeyColumnsNode->GetValuesCount());
for (ui32 i = 0; i < leftKeyColumnsNode->GetValuesCount(); ++i) {
leftKeyColumns.emplace_back(AS_VALUE(TDataLiteral, leftKeyColumnsNode->GetValue(i))->AsValue().Get<ui32>());
}

leftRenames.reserve(leftRenamesNode->GetValuesCount());
for (ui32 i = 0; i < leftRenamesNode->GetValuesCount(); ++i) {
leftRenames.emplace_back(AS_VALUE(TDataLiteral, leftRenamesNode->GetValue(i))->AsValue().Get<ui32>());
}


rightKeyColumns.reserve(rightKeyColumnsNode->GetValuesCount());
for (ui32 i = 0; i < rightKeyColumnsNode->GetValuesCount(); ++i) {
rightKeyColumns.emplace_back(AS_VALUE(TDataLiteral, rightKeyColumnsNode->GetValue(i))->AsValue().Get<ui32>());
}

MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Number of key columns for self join should be equal");

// MKQL_ENSURE(leftKeyColumns == rightKeyColumns, "Key columns for self join should be equal");

rightRenames.reserve(rightRenamesNode->GetValuesCount());
for (ui32 i = 0; i < rightRenamesNode->GetValuesCount(); ++i) {
rightRenames.emplace_back(AS_VALUE(TDataLiteral, rightRenamesNode->GetValue(i))->AsValue().Get<ui32>());
}

return new TGraceJoinWrapper(
ctx.Mutables, flowLeft, nullptr, GetJoinKind(rawJoinKind), anyJoinSettings,
std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames),
std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), true);
// Factory entry point registered under the "GraceJoinWithSpilling" callable
// name. Checks that the callable carries exactly 8 inputs and then delegates
// node construction to WrapGraceJoinCommon as a non-self join with spilling
// allowed.
IComputationNode* WrapGraceJoinWithSpilling(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
    const bool isSelfJoin = false;
    const bool isSpillingAllowed = true;

    MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args");
    return WrapGraceJoinCommon(callable, ctx, isSelfJoin, isSpillingAllowed);
}

// Factory entry point registered under the "GraceSelfJoinWithSpilling"
// callable name. Checks that the callable carries exactly 7 inputs and then
// delegates node construction to WrapGraceJoinCommon as a self join with
// spilling allowed.
IComputationNode* WrapGraceSelfJoinWithSpilling(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
    const bool isSelfJoin = true;
    const bool isSpillingAllowed = true;

    MKQL_ENSURE(callable.GetInputsCount() == 7, "Expected 7 args");
    return WrapGraceJoinCommon(callable, ctx, isSelfJoin, isSpillingAllowed);
}


Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/minikql/comp_nodes/mkql_grace_join.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ namespace NMiniKQL {

// Builds the computation node for the "GraceJoin" callable.
// Requires exactly 8 callable inputs; spilling is not allowed for this node.
IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx);
// Builds the computation node for the "GraceSelfJoin" callable.
// Requires exactly 7 callable inputs; spilling is not allowed for this node.
IComputationNode* WrapGraceSelfJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx);
// Builds the computation node for the "GraceJoinWithSpilling" callable.
// Requires exactly 8 callable inputs; the node is created with spilling allowed.
IComputationNode* WrapGraceJoinWithSpilling(TCallable& callable, const TComputationNodeFactoryContext& ctx);
// Builds the computation node for the "GraceSelfJoinWithSpilling" callable.
// Requires exactly 7 callable inputs; the node is created with spilling allowed.
IComputationNode* WrapGraceSelfJoinWithSpilling(TCallable& callable, const TComputationNodeFactoryContext& ctx);

}
}
Loading
Loading