From e7f81967dc1cc2eec35391fadc47c824a9b57bc0 Mon Sep 17 00:00:00 2001 From: Filitov Mikhail Date: Wed, 26 Jun 2024 15:09:10 +0300 Subject: [PATCH 1/7] Added runtime nodes for gracejoin with spilling --- .../yql/minikql/comp_nodes/mkql_factory.cpp | 2 + .../minikql/comp_nodes/mkql_grace_join.cpp | 17 +++-- .../yql/minikql/comp_nodes/mkql_grace_join.h | 2 + .../yql/minikql/mkql_program_builder.cpp | 64 ++++++++++--------- .../yql/minikql/mkql_program_builder.h | 8 +++ .../yql/minikql/mkql_runtime_version.h | 2 +- 6 files changed, 56 insertions(+), 39 deletions(-) diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp index 631e65762fd3..24d0ab217db3 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp @@ -234,6 +234,8 @@ struct TCallableComputationNodeBuilderFuncMapFiller { {"JoinDict", &WrapJoinDict}, {"GraceJoin", &WrapGraceJoin}, {"GraceSelfJoin", &WrapGraceSelfJoin}, + {"GraceJoinWithSpilling", &WrapGraceJoinWithSpilling}, + {"GraceSelfJoinWithSpilling", &WrapGraceSelfJoinWithSpilling}, {"MapJoinCore", &WrapMapJoinCore}, {"CommonJoinCore", &WrapCommonJoinCore}, {"CombineCore", &WrapCombineCore}, diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp index 16958c1f37e5..c843aff68129 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp @@ -569,7 +569,7 @@ class TGraceJoinSpillingSupportState : public TComputationValue& leftKeyColumns, const std::vector& rightKeyColumns, const std::vector& leftRenames, const std::vector& rightRenames, const std::vector& leftColumnsTypes, const std::vector& rightColumnsTypes, const THolderFactory & holderFactory, - const bool isSelfJoin) + const bool isSelfJoin, bool isSpillingAllowed) : TBase(memInfo) , FlowLeft(flowLeft) , FlowRight(flowRight) @@ -588,6 +588,7 @@ class TGraceJoinSpillingSupportState : public TComputationValue>() ) , IsSelfJoin_(isSelfJoin) , SelfJoinSameKeys_(isSelfJoin && (leftKeyColumns == rightKeyColumns)) + , IsSpillingAllowed(isSpillingAllowed) { if (JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Exclusion || IsSelfJoin_) { LeftPacker->BatchSize = std::numeric_limits::max(); @@ -631,9 +632,7 @@ class TGraceJoinSpillingSupportState : public TComputationValueGetUsed(); const auto limit = TlsAllocState->GetLimit(); @@ -950,6 +949,8 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const* bool IsSpillingFinalized = false; ui32 NextBucketToJoin = 0; + + bool IsSpillingAllowed = false; }; class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode { @@ -960,7 +961,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode&& leftKeyColumns, std::vector&& rightKeyColumns, std::vector&& leftRenames, std::vector&& rightRenames, std::vector&& leftColumnsTypes, std::vector&& rightColumnsTypes, - std::vector&& outputRepresentations, bool isSelfJoin) + std::vector&& outputRepresentations, bool isSelfJoin, bool isSpillingAllowed) : TBaseComputation(mutables, nullptr, EValueRepresentation::Boxed) , FlowLeft(flowLeft) , FlowRight(flowRight) @@ -974,6 +975,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode( FlowLeft, FlowRight, JoinKind, AnyJoinSettings_, LeftKeyColumns, RightKeyColumns, LeftRenames, RightRenames, LeftColumnsTypes, RightColumnsTypes, - ctx.HolderFactory, IsSelfJoin_); + ctx.HolderFactory, IsSelfJoin_, IsSpillingAllowed); } IComputationWideFlowNode *const FlowLeft; @@ -1086,6 +1088,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode RightColumnsTypes; const std::vector OutputRepresentations; const bool IsSelfJoin_; + const bool IsSpillingAllowed; }; } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.h b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.h index 5672cca5b54b..7560d3a1334d 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.h @@ -6,6 +6,8 @@ namespace NMiniKQL { IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx); IComputationNode* WrapGraceSelfJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx); +IComputationNode* WrapGraceJoinWithSpilling(TCallable& callable, const TComputationNodeFactoryContext& ctx); +IComputationNode* WrapGraceSelfJoinWithSpilling(TCallable& callable, const TComputationNodeFactoryContext& ctx); } } diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index c672b8ba358d..c9a181ec9fae 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -2104,14 +2104,16 @@ TRuntimeNode TProgramBuilder::JoinDict(TRuntimeNode dict1, bool isMulti1, TRunti return TRuntimeNode(callableBuilder.Build(), false); } -TRuntimeNode TProgramBuilder::GraceJoin(TRuntimeNode flowLeft, TRuntimeNode flowRight, EJoinKind joinKind, +TRuntimeNode TProgramBuilder::GraceJoinCommon(const TStringBuf& funcName, TRuntimeNode flowLeft, TRuntimeNode flowRight, EJoinKind joinKind, const TArrayRef& leftKeyColumns, const TArrayRef& rightKeyColumns, const TArrayRef& leftRenames, const TArrayRef& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings ) { MKQL_ENSURE(!leftKeyColumns.empty(), "At least one key column must be specified"); - MKQL_ENSURE(!rightKeyColumns.empty(), "At least one key column must be specified"); + if (flowRight) { + MKQL_ENSURE(!rightKeyColumns.empty(), "At least one key column must be specified"); + } - TRuntimeNode::TList leftKeyColumnsNodes, rightKeyColumnsNodes, leftRenamesNodes, rightRenamesNodes; + TRuntimeNode::TList leftKeyColumnsNodes, rightKeyColumnsNodes, leftRenamesNodes, rightRenamesNodes; leftKeyColumnsNodes.reserve(leftKeyColumns.size()); std::transform(leftKeyColumns.cbegin(), leftKeyColumns.cend(), std::back_inserter(leftKeyColumnsNodes), [this](const ui32 idx) { return NewDataLiteral(idx); }); @@ -2126,9 +2128,11 @@ TRuntimeNode TProgramBuilder::GraceJoin(TRuntimeNode flowLeft, TRuntimeNode flow std::transform(rightRenames.cbegin(), rightRenames.cend(), std::back_inserter(rightRenamesNodes), [this](const ui32 idx) { return NewDataLiteral(idx); }); - TCallableBuilder callableBuilder(Env, __func__, returnType); + TCallableBuilder callableBuilder(Env, funcName, returnType); callableBuilder.Add(flowLeft); - callableBuilder.Add(flowRight); + if (flowRight) { + callableBuilder.Add(flowRight); + } callableBuilder.Add(NewDataLiteral((ui32)joinKind)); callableBuilder.Add(NewTuple(leftKeyColumnsNodes)); callableBuilder.Add(NewTuple(rightKeyColumnsNodes)); @@ -2136,9 +2140,14 @@ TRuntimeNode TProgramBuilder::GraceJoin(TRuntimeNode flowLeft, TRuntimeNode flow callableBuilder.Add(NewTuple(rightRenamesNodes)); callableBuilder.Add(NewDataLiteral((ui32)anyJoinSettings)); - return TRuntimeNode(callableBuilder.Build(), false); +} + +TRuntimeNode TProgramBuilder::GraceJoin(TRuntimeNode flowLeft, TRuntimeNode flowRight, EJoinKind joinKind, + const TArrayRef& leftKeyColumns, const TArrayRef& rightKeyColumns, + const TArrayRef& leftRenames, const TArrayRef& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings ) { + return GraceJoinCommon(__func__, flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings); } TRuntimeNode TProgramBuilder::GraceSelfJoin(TRuntimeNode flowLeft, EJoinKind joinKind, const TArrayRef& leftKeyColumns, const TArrayRef& rightKeyColumns, @@ -2148,38 +2157,31 @@ TRuntimeNode TProgramBuilder::GraceSelfJoin(TRuntimeNode flowLeft, EJoinKind jo THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - MKQL_ENSURE(!leftKeyColumns.empty(), "At least one key column must be specified"); - - - TRuntimeNode::TList leftKeyColumnsNodes, rightKeyColumnsNodes, leftRenamesNodes, rightRenamesNodes; - - leftKeyColumnsNodes.reserve(leftKeyColumns.size()); - std::transform(leftKeyColumns.cbegin(), leftKeyColumns.cend(), std::back_inserter(leftKeyColumnsNodes), [this](const ui32 idx) { return NewDataLiteral(idx); }); - - rightKeyColumnsNodes.reserve(rightKeyColumns.size()); - std::transform(rightKeyColumns.cbegin(), rightKeyColumns.cend(), std::back_inserter(rightKeyColumnsNodes), [this](const ui32 idx) { return NewDataLiteral(idx); }); - - leftRenamesNodes.reserve(leftRenames.size()); - std::transform(leftRenames.cbegin(), leftRenames.cend(), std::back_inserter(leftRenamesNodes), [this](const ui32 idx) { return NewDataLiteral(idx); }); + return GraceJoinCommon(__func__, flowLeft, {}, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings); +} - rightRenamesNodes.reserve(rightRenames.size()); - std::transform(rightRenames.cbegin(), rightRenames.cend(), std::back_inserter(rightRenamesNodes), [this](const ui32 idx) { return NewDataLiteral(idx); }); +TRuntimeNode TProgramBuilder::GraceJoinWithSpilling(TRuntimeNode flowLeft, TRuntimeNode flowRight, EJoinKind joinKind, + const TArrayRef& leftKeyColumns, const TArrayRef& rightKeyColumns, + const TArrayRef& leftRenames, const TArrayRef& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings ) { + if constexpr (RuntimeVersion < 50U) { + THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; + } - TCallableBuilder callableBuilder(Env, __func__, returnType); - callableBuilder.Add(flowLeft); - callableBuilder.Add(NewDataLiteral((ui32)joinKind)); - callableBuilder.Add(NewTuple(leftKeyColumnsNodes)); - callableBuilder.Add(NewTuple(rightKeyColumnsNodes)); - callableBuilder.Add(NewTuple(leftRenamesNodes)); - callableBuilder.Add(NewTuple(rightRenamesNodes)); - callableBuilder.Add(NewDataLiteral((ui32)anyJoinSettings)); + return GraceJoinCommon(__func__, flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings); +} - return TRuntimeNode(callableBuilder.Build(), false); +TRuntimeNode TProgramBuilder::GraceSelfJoinWithSpilling(TRuntimeNode flowLeft, EJoinKind joinKind, + const TArrayRef& leftKeyColumns, const TArrayRef& rightKeyColumns, + const TArrayRef& leftRenames, const TArrayRef& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings ) { + if constexpr (RuntimeVersion < 50U) { + THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; + } + + return GraceJoinCommon(__func__, flowLeft, {}, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings); } - TRuntimeNode TProgramBuilder::ToSortedDict(TRuntimeNode list, bool all, const TUnaryLambda& keySelector, const TUnaryLambda& payloadSelector, bool isCompact, ui64 itemsCountHint) { return ToDict(list, all, keySelector, payloadSelector, __func__, isCompact, itemsCountHint); diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index 5a78d8bdb003..97daf163c586 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -440,11 +440,19 @@ class TProgramBuilder : public TTypeBuilder { ui64 memLimit, std::optional sortedTableOrder, EAnyJoinSettings anyJoinSettings, const ui32 tableIndexField, TType* returnType); + TRuntimeNode GraceJoinCommon(const TStringBuf& funcName, TRuntimeNode flowLeft, TRuntimeNode flowRight, EJoinKind joinKind, + const TArrayRef& leftKeyColumns, const TArrayRef& rightKeyColumns, + const TArrayRef& leftRenames, const TArrayRef& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings = EAnyJoinSettings::None); TRuntimeNode GraceJoin(TRuntimeNode flowLeft, TRuntimeNode flowRight, EJoinKind joinKind, const TArrayRef& leftKeyColumns, const TArrayRef& rightKeyColumns, const TArrayRef& leftRenames, const TArrayRef& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings = EAnyJoinSettings::None); TRuntimeNode GraceSelfJoin(TRuntimeNode flowLeft, EJoinKind joinKind, const TArrayRef& leftKeyColumns, const TArrayRef& rightKeyColumns, const TArrayRef& leftRenames, const TArrayRef& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings = EAnyJoinSettings::None); + TRuntimeNode GraceJoinWithSpilling(TRuntimeNode flowLeft, TRuntimeNode flowRight, EJoinKind joinKind, + const TArrayRef& leftKeyColumns, const TArrayRef& rightKeyColumns, + const TArrayRef& leftRenames, const TArrayRef& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings = EAnyJoinSettings::None); + TRuntimeNode GraceSelfJoinWithSpilling(TRuntimeNode flowLeft, EJoinKind joinKind, const TArrayRef& leftKeyColumns, const TArrayRef& rightKeyColumns, + const TArrayRef& leftRenames, const TArrayRef& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings = EAnyJoinSettings::None); TRuntimeNode CombineCore(TRuntimeNode stream, const TUnaryLambda& keyExtractor, const TBinaryLambda& init, diff --git a/ydb/library/yql/minikql/mkql_runtime_version.h b/ydb/library/yql/minikql/mkql_runtime_version.h index d16f0184db2c..22072157e87f 100644 --- a/ydb/library/yql/minikql/mkql_runtime_version.h +++ b/ydb/library/yql/minikql/mkql_runtime_version.h @@ -24,7 +24,7 @@ namespace NMiniKQL { // 1. Bump this version every time incompatible runtime nodes are introduced. // 2. Make sure you provide runtime node generation for previous runtime versions. #ifndef MKQL_RUNTIME_VERSION -#define MKQL_RUNTIME_VERSION 49U +#define MKQL_RUNTIME_VERSION 50U #endif // History: From 02a5067f58940cf452411b333ebe46865792f92b Mon Sep 17 00:00:00 2001 From: Filitov Mikhail Date: Wed, 26 Jun 2024 16:22:17 +0300 Subject: [PATCH 2/7] update --- .../minikql/comp_nodes/mkql_grace_join.cpp | 58 ++++++++++++++++++- .../common/mkql/yql_provider_mkql.cpp | 8 ++- 2 files changed, 63 insertions(+), 3 deletions(-) diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp index c843aff68129..1e2c8faa9aae 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp @@ -1093,6 +1093,60 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNodeAsValue().Get(); + + const EAnyJoinSettings anyJoinSettings = GetAnyJoinSettings(AS_VALUE(TDataLiteral, callable.GetInput(7))->AsValue().Get()); + + const auto flowLeft = dynamic_cast (LocateNode(ctx.NodeLocator, callable, 0)); + const auto flowRight = dynamic_cast (LocateNode(ctx.NodeLocator, callable, 1)); + + const auto outputFlowComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())); + std::vector outputRepresentations; + outputRepresentations.reserve(outputFlowComponents.size()); + for (ui32 i = 0U; i < outputFlowComponents.size(); ++i) { + outputRepresentations.emplace_back(GetValueRepresentation(outputFlowComponents[i])); + } + + std::vector leftKeyColumns, leftRenames, rightKeyColumns, rightRenames; + std::vector leftColumnsTypes(leftFlowComponents.begin(), leftFlowComponents.end()); + std::vector rightColumnsTypes(rightFlowComponents.begin(), rightFlowComponents.end()); + + leftKeyColumns.reserve(leftKeyColumnsNode->GetValuesCount()); + for (ui32 i = 0; i < leftKeyColumnsNode->GetValuesCount(); ++i) { + leftKeyColumns.emplace_back(AS_VALUE(TDataLiteral, leftKeyColumnsNode->GetValue(i))->AsValue().Get()); + } + + leftRenames.reserve(leftRenamesNode->GetValuesCount()); + for (ui32 i = 0; i < leftRenamesNode->GetValuesCount(); ++i) { + leftRenames.emplace_back(AS_VALUE(TDataLiteral, leftRenamesNode->GetValue(i))->AsValue().Get()); + } + + rightKeyColumns.reserve(rightKeyColumnsNode->GetValuesCount()); + for (ui32 i = 0; i < rightKeyColumnsNode->GetValuesCount(); ++i) { + rightKeyColumns.emplace_back(AS_VALUE(TDataLiteral, rightKeyColumnsNode->GetValue(i))->AsValue().Get()); + } + + rightRenames.reserve(rightRenamesNode->GetValuesCount()); + for (ui32 i = 0; i < rightRenamesNode->GetValuesCount(); ++i) { + rightRenames.emplace_back(AS_VALUE(TDataLiteral, rightRenamesNode->GetValue(i))->AsValue().Get()); + } + + return new TGraceJoinWrapper( + ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind), anyJoinSettings, + std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames), + std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), false, false); +} + IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args"); @@ -1146,7 +1200,7 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto return new TGraceJoinWrapper( ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind), anyJoinSettings, std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames), - std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), false); + std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), false, false); } @@ -1205,7 +1259,7 @@ IComputationNode* WrapGraceSelfJoin(TCallable& callable, const TComputationNodeF return new TGraceJoinWrapper( ctx.Mutables, flowLeft, nullptr, GetJoinKind(rawJoinKind), anyJoinSettings, std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames), - std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), true); + std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), true, false); } diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp index cd2e1cae7836..0bdf5a7707e8 100644 --- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp +++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp @@ -1655,7 +1655,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() { return ctx.ProgramBuilder.MapJoinCore(list, dict, joinKind, leftKeyColumns, leftRenames, rightRenames, returnType); }); - AddCallable({"GraceJoinCore", "GraceSelfJoinCore"}, [](const TExprNode& node, TMkqlBuildContext& ctx) { + AddCallable({"GraceJoinCore", "GraceSelfJoinCore", "GraceJoinCoreWithSpilling", "GraceSelfJoinCoreWithSpilling"}, [](const TExprNode& node, TMkqlBuildContext& ctx) { bool selfJoin = node.Content() == "GraceSelfJoinCore"; int shift = selfJoin ? 0 : 1; const auto flowLeft = MkqlBuildExpr(*node.Child(0), ctx); @@ -1703,6 +1703,12 @@ TMkqlCommonCallableCompiler::TShared::TShared() { }); const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder); + + if (IsSpillingAllowed) { + return selfJoin + ? ctx.ProgramBuilder.GraceSelfJoinWithSpilling(flowLeft, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings) + : ctx.ProgramBuilder.GraceJoinWithSpilling(flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings); + } return selfJoin ? ctx.ProgramBuilder.GraceSelfJoin(flowLeft, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings) : ctx.ProgramBuilder.GraceJoin(flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings); From 164d20aa9145408e94cf50d0e1ac371460a5335c Mon Sep 17 00:00:00 2001 From: Filitov Mikhail Date: Wed, 26 Jun 2024 17:19:56 +0300 Subject: [PATCH 3/7] finished --- .../minikql/comp_nodes/mkql_grace_join.cpp | 167 +++++------------- .../common/mkql/yql_provider_mkql.cpp | 2 + 2 files changed, 50 insertions(+), 119 deletions(-) diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp index 1e2c8faa9aae..70746c4d2adb 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp @@ -1093,22 +1093,33 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNodeAsValue().Get()); + const auto leftFlowComponents = GetWideComponents(AS_TYPE(TFlowType, leftFlowNode)); - const auto rightFlowComponents = GetWideComponents(AS_TYPE(TFlowType, rightFlowNode)); - const auto joinKindNode = callable.GetInput(2); - const auto leftKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(3)); - const auto rightKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(4)); - const auto leftRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(5)); - const auto rightRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(6)); const ui32 rawJoinKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get(); - - const EAnyJoinSettings anyJoinSettings = GetAnyJoinSettings(AS_VALUE(TDataLiteral, callable.GetInput(7))->AsValue().Get()); + const auto flowLeft = dynamic_cast (LocateNode(ctx.NodeLocator, callable, 0)); - const auto flowRight = dynamic_cast (LocateNode(ctx.NodeLocator, callable, 1)); + IComputationWideFlowNode* flowRight = nullptr; + if (isSelfJoin) { + flowRight = dynamic_cast (LocateNode(ctx.NodeLocator, callable, 1)); + } const auto outputFlowComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())); std::vector outputRepresentations; @@ -1119,7 +1130,14 @@ IComputationNode* WrapGraceJoinCommon(TCallable& callable, const TComputationNod std::vector leftKeyColumns, leftRenames, rightKeyColumns, rightRenames; std::vector leftColumnsTypes(leftFlowComponents.begin(), leftFlowComponents.end()); - std::vector rightColumnsTypes(rightFlowComponents.begin(), rightFlowComponents.end()); + std::vector rightColumnsTypes; + if (isSelfJoin) { + rightColumnsTypes = {leftColumnsTypes}; + } else { + const auto rightFlowNode = callable.GetInput(rightFlowNodeIndex); + const auto rightFlowComponents = GetWideComponents(AS_TYPE(TFlowType, rightFlowNode)); + rightColumnsTypes = {rightFlowComponents.begin(), rightFlowComponents.end()}; + } leftKeyColumns.reserve(leftKeyColumnsNode->GetValuesCount()); for (ui32 i = 0; i < leftKeyColumnsNode->GetValuesCount(); ++i) { @@ -1136,6 +1154,10 @@ IComputationNode* WrapGraceJoinCommon(TCallable& callable, const TComputationNod rightKeyColumns.emplace_back(AS_VALUE(TDataLiteral, rightKeyColumnsNode->GetValue(i))->AsValue().Get()); } + if (isSelfJoin) { + MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Number of key columns for self join should be equal"); + } + rightRenames.reserve(rightRenamesNode->GetValuesCount()); for (ui32 i = 0; i < rightRenamesNode->GetValuesCount(); ++i) { rightRenames.emplace_back(AS_VALUE(TDataLiteral, rightRenamesNode->GetValue(i))->AsValue().Get()); @@ -1144,124 +1166,31 @@ IComputationNode* WrapGraceJoinCommon(TCallable& callable, const TComputationNod return new TGraceJoinWrapper( ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind), anyJoinSettings, std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames), - std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), false, false); + std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), isSelfJoin, isSpillingAllowed); } IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args"); - const auto leftFlowNode = callable.GetInput(0); - const auto rightFlowNode = callable.GetInput(1); - const auto leftFlowComponents = GetWideComponents(AS_TYPE(TFlowType, leftFlowNode)); - const auto rightFlowComponents = GetWideComponents(AS_TYPE(TFlowType, rightFlowNode)); - const auto joinKindNode = callable.GetInput(2); - const auto leftKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(3)); - const auto rightKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(4)); - const auto leftRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(5)); - const auto rightRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(6)); - const ui32 rawJoinKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get(); - - const EAnyJoinSettings anyJoinSettings = GetAnyJoinSettings(AS_VALUE(TDataLiteral, callable.GetInput(7))->AsValue().Get()); - - const auto flowLeft = dynamic_cast (LocateNode(ctx.NodeLocator, callable, 0)); - const auto flowRight = dynamic_cast (LocateNode(ctx.NodeLocator, callable, 1)); - - const auto outputFlowComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())); - std::vector outputRepresentations; - outputRepresentations.reserve(outputFlowComponents.size()); - for (ui32 i = 0U; i < outputFlowComponents.size(); ++i) { - outputRepresentations.emplace_back(GetValueRepresentation(outputFlowComponents[i])); - } - - std::vector leftKeyColumns, leftRenames, rightKeyColumns, rightRenames; - std::vector leftColumnsTypes(leftFlowComponents.begin(), leftFlowComponents.end()); - std::vector rightColumnsTypes(rightFlowComponents.begin(), rightFlowComponents.end()); - - leftKeyColumns.reserve(leftKeyColumnsNode->GetValuesCount()); - for (ui32 i = 0; i < leftKeyColumnsNode->GetValuesCount(); ++i) { - leftKeyColumns.emplace_back(AS_VALUE(TDataLiteral, leftKeyColumnsNode->GetValue(i))->AsValue().Get()); - } - - leftRenames.reserve(leftRenamesNode->GetValuesCount()); - for (ui32 i = 0; i < leftRenamesNode->GetValuesCount(); ++i) { - leftRenames.emplace_back(AS_VALUE(TDataLiteral, leftRenamesNode->GetValue(i))->AsValue().Get()); - } - - rightKeyColumns.reserve(rightKeyColumnsNode->GetValuesCount()); - for (ui32 i = 0; i < rightKeyColumnsNode->GetValuesCount(); ++i) { - rightKeyColumns.emplace_back(AS_VALUE(TDataLiteral, rightKeyColumnsNode->GetValue(i))->AsValue().Get()); - } - - rightRenames.reserve(rightRenamesNode->GetValuesCount()); - for (ui32 i = 0; i < rightRenamesNode->GetValuesCount(); ++i) { - rightRenames.emplace_back(AS_VALUE(TDataLiteral, rightRenamesNode->GetValue(i))->AsValue().Get()); - } - - return new TGraceJoinWrapper( - ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind), anyJoinSettings, - std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames), - std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), false, false); - + return WrapGraceJoinCommon(callable, ctx, false, false); } IComputationNode* WrapGraceSelfJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 7, "Expected 7 args"); + + return WrapGraceJoinCommon(callable, ctx, true, false); +} - const auto leftFlowNode = callable.GetInput(0); - const auto leftFlowComponents = GetWideComponents(AS_TYPE(TFlowType, leftFlowNode)); - const auto joinKindNode = callable.GetInput(1); - const auto leftKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(2)); - const auto rightKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(3)); - const auto leftRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(4)); - const auto rightRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(5)); - const ui32 rawJoinKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get(); - - const EAnyJoinSettings anyJoinSettings = GetAnyJoinSettings(AS_VALUE(TDataLiteral, callable.GetInput(6))->AsValue().Get()); - - const auto flowLeft = dynamic_cast (LocateNode(ctx.NodeLocator, callable, 0)); - - const auto outputFlowComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())); - std::vector outputRepresentations; - outputRepresentations.reserve(outputFlowComponents.size()); - for (ui32 i = 0U; i < outputFlowComponents.size(); ++i) { - outputRepresentations.emplace_back(GetValueRepresentation(outputFlowComponents[i])); - } - - std::vector leftKeyColumns, leftRenames, rightKeyColumns, rightRenames; - std::vector leftColumnsTypes(leftFlowComponents.begin(), leftFlowComponents.end()); - std::vector rightColumnsTypes{leftColumnsTypes}; - - leftKeyColumns.reserve(leftKeyColumnsNode->GetValuesCount()); - for (ui32 i = 0; i < leftKeyColumnsNode->GetValuesCount(); ++i) { - leftKeyColumns.emplace_back(AS_VALUE(TDataLiteral, leftKeyColumnsNode->GetValue(i))->AsValue().Get()); - } - - leftRenames.reserve(leftRenamesNode->GetValuesCount()); - for (ui32 i = 0; i < leftRenamesNode->GetValuesCount(); ++i) { - leftRenames.emplace_back(AS_VALUE(TDataLiteral, leftRenamesNode->GetValue(i))->AsValue().Get()); - } - - - rightKeyColumns.reserve(rightKeyColumnsNode->GetValuesCount()); - for (ui32 i = 0; i < rightKeyColumnsNode->GetValuesCount(); ++i) { - rightKeyColumns.emplace_back(AS_VALUE(TDataLiteral, rightKeyColumnsNode->GetValue(i))->AsValue().Get()); - } - - MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Number of key columns for self join should be equal"); - -// MKQL_ENSURE(leftKeyColumns == rightKeyColumns, "Key columns for self join should be equal"); - - rightRenames.reserve(rightRenamesNode->GetValuesCount()); - for (ui32 i = 0; i < rightRenamesNode->GetValuesCount(); ++i) { - rightRenames.emplace_back(AS_VALUE(TDataLiteral, rightRenamesNode->GetValue(i))->AsValue().Get()); - } - - return new TGraceJoinWrapper( - ctx.Mutables, flowLeft, nullptr, GetJoinKind(rawJoinKind), anyJoinSettings, - std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames), - std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), true, false); +IComputationNode* WrapGraceJoinWithSpilling(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args"); + return WrapGraceJoinCommon(callable, ctx, false, true); +} +IComputationNode* WrapGraceSelfJoinWithSpilling(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 7, "Expected 7 args"); + + return WrapGraceJoinCommon(callable, ctx, true, true); } diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp index 0bdf5a7707e8..7db89d4e4c28 100644 --- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp +++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp @@ -1704,6 +1704,8 @@ TMkqlCommonCallableCompiler::TShared::TShared() { const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder); + // TODO: use PRAGMA + bool IsSpillingAllowed = false; if (IsSpillingAllowed) { return selfJoin ? ctx.ProgramBuilder.GraceSelfJoinWithSpilling(flowLeft, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings) From ea447cd92b736a9dcdc9efa50f06f3e86141d244 Mon Sep 17 00:00:00 2001 From: Filitov Mikhail Date: Wed, 26 Jun 2024 17:24:41 +0300 Subject: [PATCH 4/7] fixup --- ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp index 70746c4d2adb..9f5316e97f72 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp @@ -945,12 +945,11 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const* const std::unique_ptr> JoinedTuple; const bool IsSelfJoin_; const bool SelfJoinSameKeys_; + const bool IsSpillingAllowed; bool IsSpillingFinalized = false; ui32 NextBucketToJoin = 0; - - bool IsSpillingAllowed = false; }; class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode { From 61dda39db329ef82015402cea4f77074ef86ef9f Mon Sep 17 00:00:00 2001 From: Filitov Mikhail Date: Wed, 26 Jun 2024 19:10:14 +0300 Subject: [PATCH 5/7] added runtime check --- ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp index 7db89d4e4c28..f72f6f32e1bb 100644 --- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp +++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp @@ -1655,7 +1655,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() { return ctx.ProgramBuilder.MapJoinCore(list, dict, joinKind, leftKeyColumns, leftRenames, rightRenames, returnType); }); - AddCallable({"GraceJoinCore", "GraceSelfJoinCore", "GraceJoinCoreWithSpilling", "GraceSelfJoinCoreWithSpilling"}, [](const TExprNode& node, TMkqlBuildContext& ctx) { + AddCallable({"GraceJoinCore", "GraceSelfJoinCore"}, [](const TExprNode& node, TMkqlBuildContext& ctx) { bool selfJoin = node.Content() == "GraceSelfJoinCore"; int shift = selfJoin ? 0 : 1; const auto flowLeft = MkqlBuildExpr(*node.Child(0), ctx); @@ -1706,7 +1706,7 @@ TMkqlCommonCallableCompiler::TShared::TShared() { // TODO: use PRAGMA bool IsSpillingAllowed = false; - if (IsSpillingAllowed) { + if (RuntimeVersion >= 50U && IsSpillingAllowed) { return selfJoin ? ctx.ProgramBuilder.GraceSelfJoinWithSpilling(flowLeft, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings) : ctx.ProgramBuilder.GraceJoinWithSpilling(flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings); From c66149dae15ced1df70333e383ad93c211920e6b Mon Sep 17 00:00:00 2001 From: Filitov Mikhail Date: Wed, 26 Jun 2024 20:33:27 +0300 Subject: [PATCH 6/7] fixup --- ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp index 9f5316e97f72..b25a1408a8ff 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp @@ -1116,7 +1116,7 @@ IComputationNode* WrapGraceJoinCommon(TCallable& callable, const TComputationNod const auto flowLeft = dynamic_cast (LocateNode(ctx.NodeLocator, callable, 0)); IComputationWideFlowNode* flowRight = nullptr; - if (isSelfJoin) { + if (!isSelfJoin) { flowRight = dynamic_cast (LocateNode(ctx.NodeLocator, callable, 1)); } From b3e56e8101c14f319a7e98f7ddf8241e0393a968 Mon Sep 17 00:00:00 2001 From: Filitov Mikhail Date: Wed, 26 Jun 2024 23:23:15 +0200 Subject: [PATCH 7/7] Update mkql_program_builder.cpp --- ydb/library/yql/minikql/mkql_program_builder.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index c9a181ec9fae..2272ae640b3e 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -2113,7 +2113,7 @@ TRuntimeNode TProgramBuilder::GraceJoinCommon(const TStringBuf& funcName, TRunti MKQL_ENSURE(!rightKeyColumns.empty(), "At least one key column must be specified"); } - TRuntimeNode::TList leftKeyColumnsNodes, rightKeyColumnsNodes, leftRenamesNodes, rightRenamesNodes; + TRuntimeNode::TList leftKeyColumnsNodes, rightKeyColumnsNodes, leftRenamesNodes, rightRenamesNodes; leftKeyColumnsNodes.reserve(leftKeyColumns.size()); std::transform(leftKeyColumns.cbegin(), leftKeyColumns.cend(), std::back_inserter(leftKeyColumnsNodes), [this](const ui32 idx) { return NewDataLiteral(idx); });