From 7cf0b0320f7363106c01381efca1f5696cb6865f Mon Sep 17 00:00:00 2001 From: Filitov Mikhail Date: Wed, 26 Jun 2024 16:22:17 +0300 Subject: [PATCH] update --- .../minikql/comp_nodes/mkql_grace_join.cpp | 58 ++++++++++++++++++- .../common/mkql/yql_provider_mkql.cpp | 8 ++- 2 files changed, 63 insertions(+), 3 deletions(-) diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp index c843aff68129..1e2c8faa9aae 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp @@ -1093,6 +1093,60 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNodeAsValue().Get(); + + const EAnyJoinSettings anyJoinSettings = GetAnyJoinSettings(AS_VALUE(TDataLiteral, callable.GetInput(7))->AsValue().Get()); + + const auto flowLeft = dynamic_cast (LocateNode(ctx.NodeLocator, callable, 0)); + const auto flowRight = dynamic_cast (LocateNode(ctx.NodeLocator, callable, 1)); + + const auto outputFlowComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())); + std::vector outputRepresentations; + outputRepresentations.reserve(outputFlowComponents.size()); + for (ui32 i = 0U; i < outputFlowComponents.size(); ++i) { + outputRepresentations.emplace_back(GetValueRepresentation(outputFlowComponents[i])); + } + + std::vector leftKeyColumns, leftRenames, rightKeyColumns, rightRenames; + std::vector leftColumnsTypes(leftFlowComponents.begin(), leftFlowComponents.end()); + std::vector rightColumnsTypes(rightFlowComponents.begin(), rightFlowComponents.end()); + + leftKeyColumns.reserve(leftKeyColumnsNode->GetValuesCount()); + for (ui32 i = 0; i < leftKeyColumnsNode->GetValuesCount(); ++i) { + leftKeyColumns.emplace_back(AS_VALUE(TDataLiteral, leftKeyColumnsNode->GetValue(i))->AsValue().Get()); + } + + leftRenames.reserve(leftRenamesNode->GetValuesCount()); + for (ui32 i = 0; i < leftRenamesNode->GetValuesCount(); ++i) { + leftRenames.emplace_back(AS_VALUE(TDataLiteral, leftRenamesNode->GetValue(i))->AsValue().Get()); + } + + rightKeyColumns.reserve(rightKeyColumnsNode->GetValuesCount()); + for (ui32 i = 0; i < rightKeyColumnsNode->GetValuesCount(); ++i) { + rightKeyColumns.emplace_back(AS_VALUE(TDataLiteral, rightKeyColumnsNode->GetValue(i))->AsValue().Get()); + } + + rightRenames.reserve(rightRenamesNode->GetValuesCount()); + for (ui32 i = 0; i < rightRenamesNode->GetValuesCount(); ++i) { + rightRenames.emplace_back(AS_VALUE(TDataLiteral, rightRenamesNode->GetValue(i))->AsValue().Get()); + } + + return new TGraceJoinWrapper( + ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind), anyJoinSettings, + std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames), + std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), false, false); +} + IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args"); @@ -1146,7 +1200,7 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto return new TGraceJoinWrapper( ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind), anyJoinSettings, std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames), - std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), false); + std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), false, false); } @@ -1205,7 +1259,7 @@ IComputationNode* WrapGraceSelfJoin(TCallable& callable, const TComputationNodeF return new TGraceJoinWrapper( ctx.Mutables, flowLeft, nullptr, GetJoinKind(rawJoinKind), anyJoinSettings, std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames), - std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), true); + std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), true, false); } diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp index cd2e1cae7836..0bdf5a7707e8 100644 --- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp +++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp @@ -1655,7 +1655,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() { return ctx.ProgramBuilder.MapJoinCore(list, dict, joinKind, leftKeyColumns, leftRenames, rightRenames, returnType); }); - AddCallable({"GraceJoinCore", "GraceSelfJoinCore"}, [](const TExprNode& node, TMkqlBuildContext& ctx) { + AddCallable({"GraceJoinCore", "GraceSelfJoinCore", "GraceJoinCoreWithSpilling", "GraceSelfJoinCoreWithSpilling"}, [](const TExprNode& node, TMkqlBuildContext& ctx) { bool selfJoin = node.Content() == "GraceSelfJoinCore"; int shift = selfJoin ? 0 : 1; const auto flowLeft = MkqlBuildExpr(*node.Child(0), ctx); @@ -1703,6 +1703,12 @@ TMkqlCommonCallableCompiler::TShared::TShared() { }); const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder); + + if (IsSpillingAllowed) { + return selfJoin + ? ctx.ProgramBuilder.GraceSelfJoinWithSpilling(flowLeft, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings) + : ctx.ProgramBuilder.GraceJoinWithSpilling(flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings); + } return selfJoin ? ctx.ProgramBuilder.GraceSelfJoin(flowLeft, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings) : ctx.ProgramBuilder.GraceJoin(flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings);