Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
lll-phill-lll committed Jun 26, 2024
1 parent 65d3464 commit 7cf0b03
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 3 deletions.
58 changes: 56 additions & 2 deletions ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,60 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr

}

IComputationNode* WrapGraceJoinCommon(TCallable& callable, const TComputationNodeFactoryContext& ctx, bool IsSelfJoin, bool isSpillingAllowed) {
const auto leftFlowNode = callable.GetInput(0);
const auto rightFlowNode = callable.GetInput(1);
const auto leftFlowComponents = GetWideComponents(AS_TYPE(TFlowType, leftFlowNode));
const auto rightFlowComponents = GetWideComponents(AS_TYPE(TFlowType, rightFlowNode));
const auto joinKindNode = callable.GetInput(2);
const auto leftKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(3));
const auto rightKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(4));
const auto leftRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(5));
const auto rightRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(6));
const ui32 rawJoinKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get<ui32>();

const EAnyJoinSettings anyJoinSettings = GetAnyJoinSettings(AS_VALUE(TDataLiteral, callable.GetInput(7))->AsValue().Get<ui32>());

const auto flowLeft = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 0));
const auto flowRight = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 1));

const auto outputFlowComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetType()->GetReturnType()));
std::vector<EValueRepresentation> outputRepresentations;
outputRepresentations.reserve(outputFlowComponents.size());
for (ui32 i = 0U; i < outputFlowComponents.size(); ++i) {
outputRepresentations.emplace_back(GetValueRepresentation(outputFlowComponents[i]));
}

std::vector<ui32> leftKeyColumns, leftRenames, rightKeyColumns, rightRenames;
std::vector<TType*> leftColumnsTypes(leftFlowComponents.begin(), leftFlowComponents.end());
std::vector<TType*> rightColumnsTypes(rightFlowComponents.begin(), rightFlowComponents.end());

leftKeyColumns.reserve(leftKeyColumnsNode->GetValuesCount());
for (ui32 i = 0; i < leftKeyColumnsNode->GetValuesCount(); ++i) {
leftKeyColumns.emplace_back(AS_VALUE(TDataLiteral, leftKeyColumnsNode->GetValue(i))->AsValue().Get<ui32>());
}

leftRenames.reserve(leftRenamesNode->GetValuesCount());
for (ui32 i = 0; i < leftRenamesNode->GetValuesCount(); ++i) {
leftRenames.emplace_back(AS_VALUE(TDataLiteral, leftRenamesNode->GetValue(i))->AsValue().Get<ui32>());
}

rightKeyColumns.reserve(rightKeyColumnsNode->GetValuesCount());
for (ui32 i = 0; i < rightKeyColumnsNode->GetValuesCount(); ++i) {
rightKeyColumns.emplace_back(AS_VALUE(TDataLiteral, rightKeyColumnsNode->GetValue(i))->AsValue().Get<ui32>());
}

rightRenames.reserve(rightRenamesNode->GetValuesCount());
for (ui32 i = 0; i < rightRenamesNode->GetValuesCount(); ++i) {
rightRenames.emplace_back(AS_VALUE(TDataLiteral, rightRenamesNode->GetValue(i))->AsValue().Get<ui32>());
}

return new TGraceJoinWrapper(
ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind), anyJoinSettings,
std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames),
std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), false, false);
}

IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args");

Expand Down Expand Up @@ -1146,7 +1200,7 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto
return new TGraceJoinWrapper(
ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind), anyJoinSettings,
std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames),
std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), false);
std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), false, false);

}

Expand Down Expand Up @@ -1205,7 +1259,7 @@ IComputationNode* WrapGraceSelfJoin(TCallable& callable, const TComputationNodeF
return new TGraceJoinWrapper(
ctx.Mutables, flowLeft, nullptr, GetJoinKind(rawJoinKind), anyJoinSettings,
std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames),
std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), true);
std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), true, false);


}
Expand Down
8 changes: 7 additions & 1 deletion ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1655,7 +1655,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
return ctx.ProgramBuilder.MapJoinCore(list, dict, joinKind, leftKeyColumns, leftRenames, rightRenames, returnType);
});

AddCallable({"GraceJoinCore", "GraceSelfJoinCore"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
AddCallable({"GraceJoinCore", "GraceSelfJoinCore", "GraceJoinCoreWithSpilling", "GraceSelfJoinCoreWithSpilling"}, [](const TExprNode& node, TMkqlBuildContext& ctx) {
bool selfJoin = node.Content() == "GraceSelfJoinCore";
int shift = selfJoin ? 0 : 1;
const auto flowLeft = MkqlBuildExpr(*node.Child(0), ctx);
Expand Down Expand Up @@ -1703,6 +1703,12 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
});

const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);

if (IsSpillingAllowed) {
return selfJoin
? ctx.ProgramBuilder.GraceSelfJoinWithSpilling(flowLeft, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings)
: ctx.ProgramBuilder.GraceJoinWithSpilling(flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings);
}
return selfJoin
? ctx.ProgramBuilder.GraceSelfJoin(flowLeft, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings)
: ctx.ProgramBuilder.GraceJoin(flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings);
Expand Down

0 comments on commit 7cf0b03

Please sign in to comment.