diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp index 631e65762fd3..24d0ab217db3 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp @@ -234,6 +234,8 @@ struct TCallableComputationNodeBuilderFuncMapFiller { {"JoinDict", &WrapJoinDict}, {"GraceJoin", &WrapGraceJoin}, {"GraceSelfJoin", &WrapGraceSelfJoin}, + {"GraceJoinWithSpilling", &WrapGraceJoinWithSpilling}, + {"GraceSelfJoinWithSpilling", &WrapGraceSelfJoinWithSpilling}, {"MapJoinCore", &WrapMapJoinCore}, {"CommonJoinCore", &WrapCommonJoinCore}, {"CombineCore", &WrapCombineCore}, diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp index 16958c1f37e5..b25a1408a8ff 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp @@ -569,7 +569,7 @@ class TGraceJoinSpillingSupportState : public TComputationValue& leftKeyColumns, const std::vector& rightKeyColumns, const std::vector& leftRenames, const std::vector& rightRenames, const std::vector& leftColumnsTypes, const std::vector& rightColumnsTypes, const THolderFactory & holderFactory, - const bool isSelfJoin) + const bool isSelfJoin, bool isSpillingAllowed) : TBase(memInfo) , FlowLeft(flowLeft) , FlowRight(flowRight) @@ -588,6 +588,7 @@ class TGraceJoinSpillingSupportState : public TComputationValue>() ) , IsSelfJoin_(isSelfJoin) , SelfJoinSameKeys_(isSelfJoin && (leftKeyColumns == rightKeyColumns)) + , IsSpillingAllowed(isSpillingAllowed) { if (JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Exclusion || IsSelfJoin_) { LeftPacker->BatchSize = std::numeric_limits::max(); @@ -631,9 +632,7 @@ class TGraceJoinSpillingSupportState : public TComputationValueGetUsed(); const auto limit = TlsAllocState->GetLimit(); @@ -946,6 +945,7 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const* const std::unique_ptr> JoinedTuple; const bool IsSelfJoin_; const bool SelfJoinSameKeys_; + const bool IsSpillingAllowed; bool IsSpillingFinalized = false; @@ -960,7 +960,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode&& leftKeyColumns, std::vector&& rightKeyColumns, std::vector&& leftRenames, std::vector&& rightRenames, std::vector&& leftColumnsTypes, std::vector&& rightColumnsTypes, - std::vector&& outputRepresentations, bool isSelfJoin) + std::vector&& outputRepresentations, bool isSelfJoin, bool isSpillingAllowed) : TBaseComputation(mutables, nullptr, EValueRepresentation::Boxed) , FlowLeft(flowLeft) , FlowRight(flowRight) @@ -974,6 +974,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode( FlowLeft, FlowRight, JoinKind, AnyJoinSettings_, LeftKeyColumns, RightKeyColumns, LeftRenames, RightRenames, LeftColumnsTypes, RightColumnsTypes, - ctx.HolderFactory, IsSelfJoin_); + ctx.HolderFactory, IsSelfJoin_, IsSpillingAllowed); } IComputationWideFlowNode *const FlowLeft; @@ -1086,28 +1087,38 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode RightColumnsTypes; const std::vector OutputRepresentations; const bool IsSelfJoin_; + const bool IsSpillingAllowed; }; } -IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) { - MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args"); +IComputationNode* WrapGraceJoinCommon(TCallable& callable, const TComputationNodeFactoryContext& ctx, bool isSelfJoin, bool isSpillingAllowed) { + const auto leftFlowNodeIndex = 0; + const auto rightFlowNodeIndex = 1; + const auto joinKindNodeIndex = isSelfJoin ? 1 : 2; + const auto leftKeyColumnsNodeIndex = joinKindNodeIndex + 1; + const auto rightKeyColumnsNodeIndex = leftKeyColumnsNodeIndex + 1; + const auto leftRenamesNodeIndex = rightKeyColumnsNodeIndex + 1; + const auto rightRenamesNodeIndex = leftRenamesNodeIndex + 1; + const auto anyJoinSettingsIndex = rightRenamesNodeIndex + 1; + + const auto leftFlowNode = callable.GetInput(leftFlowNodeIndex); + const auto joinKindNode = callable.GetInput(joinKindNodeIndex); + const auto leftKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(leftKeyColumnsNodeIndex)); + const auto rightKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(rightKeyColumnsNodeIndex)); + const auto leftRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(leftRenamesNodeIndex)); + const auto rightRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(rightRenamesNodeIndex)); + const EAnyJoinSettings anyJoinSettings = GetAnyJoinSettings(AS_VALUE(TDataLiteral, callable.GetInput(anyJoinSettingsIndex))->AsValue().Get()); - const auto leftFlowNode = callable.GetInput(0); - const auto rightFlowNode = callable.GetInput(1); const auto leftFlowComponents = GetWideComponents(AS_TYPE(TFlowType, leftFlowNode)); - const auto rightFlowComponents = GetWideComponents(AS_TYPE(TFlowType, rightFlowNode)); - const auto joinKindNode = callable.GetInput(2); - const auto leftKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(3)); - const auto rightKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(4)); - const auto leftRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(5)); - const auto rightRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(6)); const ui32 rawJoinKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get(); - - const EAnyJoinSettings anyJoinSettings = GetAnyJoinSettings(AS_VALUE(TDataLiteral, callable.GetInput(7))->AsValue().Get()); + const auto flowLeft = dynamic_cast (LocateNode(ctx.NodeLocator, callable, 0)); - const auto flowRight = dynamic_cast (LocateNode(ctx.NodeLocator, callable, 1)); + IComputationWideFlowNode* flowRight = nullptr; + if (!isSelfJoin) { + flowRight = dynamic_cast (LocateNode(ctx.NodeLocator, callable, 1)); + } const auto outputFlowComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())); std::vector outputRepresentations; @@ -1118,7 +1129,14 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto std::vector leftKeyColumns, leftRenames, rightKeyColumns, rightRenames; std::vector leftColumnsTypes(leftFlowComponents.begin(), leftFlowComponents.end()); - std::vector rightColumnsTypes(rightFlowComponents.begin(), rightFlowComponents.end()); + std::vector rightColumnsTypes; + if (isSelfJoin) { + rightColumnsTypes = {leftColumnsTypes}; + } else { + const auto rightFlowNode = callable.GetInput(rightFlowNodeIndex); + const auto rightFlowComponents = GetWideComponents(AS_TYPE(TFlowType, rightFlowNode)); + rightColumnsTypes = {rightFlowComponents.begin(), rightFlowComponents.end()}; + } leftKeyColumns.reserve(leftKeyColumnsNode->GetValuesCount()); for (ui32 i = 0; i < leftKeyColumnsNode->GetValuesCount(); ++i) { @@ -1135,6 +1153,10 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto rightKeyColumns.emplace_back(AS_VALUE(TDataLiteral, rightKeyColumnsNode->GetValue(i))->AsValue().Get()); } + if (isSelfJoin) { + MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Number of key columns for self join should be equal"); + } + rightRenames.reserve(rightRenamesNode->GetValuesCount()); for (ui32 i = 0; i < rightRenamesNode->GetValuesCount(); ++i) { rightRenames.emplace_back(AS_VALUE(TDataLiteral, rightRenamesNode->GetValue(i))->AsValue().Get()); @@ -1143,68 +1165,31 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto return new TGraceJoinWrapper( ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind), anyJoinSettings, std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames), - std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), false); + std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), isSelfJoin, isSpillingAllowed); +} +IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args"); + + return WrapGraceJoinCommon(callable, ctx, false, false); } IComputationNode* WrapGraceSelfJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 7, "Expected 7 args"); + + return WrapGraceJoinCommon(callable, ctx, true, false); +} - const auto leftFlowNode = callable.GetInput(0); - const auto leftFlowComponents = GetWideComponents(AS_TYPE(TFlowType, leftFlowNode)); - const auto joinKindNode = callable.GetInput(1); - const auto leftKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(2)); - const auto rightKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(3)); - const auto leftRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(4)); - const auto rightRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(5)); - const ui32 rawJoinKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get(); - - const EAnyJoinSettings anyJoinSettings = GetAnyJoinSettings(AS_VALUE(TDataLiteral, callable.GetInput(6))->AsValue().Get()); - - const auto flowLeft = dynamic_cast (LocateNode(ctx.NodeLocator, callable, 0)); - - const auto outputFlowComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())); - std::vector outputRepresentations; - outputRepresentations.reserve(outputFlowComponents.size()); - for (ui32 i = 0U; i < outputFlowComponents.size(); ++i) { - outputRepresentations.emplace_back(GetValueRepresentation(outputFlowComponents[i])); - } - - std::vector leftKeyColumns, leftRenames, rightKeyColumns, rightRenames; - std::vector leftColumnsTypes(leftFlowComponents.begin(), leftFlowComponents.end()); - std::vector rightColumnsTypes{leftColumnsTypes}; - - leftKeyColumns.reserve(leftKeyColumnsNode->GetValuesCount()); - for (ui32 i = 0; i < leftKeyColumnsNode->GetValuesCount(); ++i) { - leftKeyColumns.emplace_back(AS_VALUE(TDataLiteral, leftKeyColumnsNode->GetValue(i))->AsValue().Get()); - } - - leftRenames.reserve(leftRenamesNode->GetValuesCount()); - for (ui32 i = 0; i < leftRenamesNode->GetValuesCount(); ++i) { - leftRenames.emplace_back(AS_VALUE(TDataLiteral, leftRenamesNode->GetValue(i))->AsValue().Get()); - } - - - rightKeyColumns.reserve(rightKeyColumnsNode->GetValuesCount()); - for (ui32 i = 0; i < rightKeyColumnsNode->GetValuesCount(); ++i) { - rightKeyColumns.emplace_back(AS_VALUE(TDataLiteral, rightKeyColumnsNode->GetValue(i))->AsValue().Get()); - } - - MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Number of key columns for self join should be equal"); - -// MKQL_ENSURE(leftKeyColumns == rightKeyColumns, "Key columns for self join should be equal"); - - rightRenames.reserve(rightRenamesNode->GetValuesCount()); - for (ui32 i = 0; i < rightRenamesNode->GetValuesCount(); ++i) { - rightRenames.emplace_back(AS_VALUE(TDataLiteral, rightRenamesNode->GetValue(i))->AsValue().Get()); - } - - return new TGraceJoinWrapper( - ctx.Mutables, flowLeft, nullptr, GetJoinKind(rawJoinKind), anyJoinSettings, - std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames), - std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), true); +IComputationNode* WrapGraceJoinWithSpilling(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args"); + return WrapGraceJoinCommon(callable, ctx, false, true); +} +IComputationNode* WrapGraceSelfJoinWithSpilling(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 7, "Expected 7 args"); + + return WrapGraceJoinCommon(callable, ctx, true, true); } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.h b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.h index 5672cca5b54b..7560d3a1334d 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.h @@ -6,6 +6,8 @@ namespace NMiniKQL { IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx); IComputationNode* WrapGraceSelfJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx); +IComputationNode* WrapGraceJoinWithSpilling(TCallable& callable, const TComputationNodeFactoryContext& ctx); +IComputationNode* WrapGraceSelfJoinWithSpilling(TCallable& callable, const TComputationNodeFactoryContext& ctx); } } diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index c672b8ba358d..2272ae640b3e 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -2104,12 +2104,14 @@ TRuntimeNode TProgramBuilder::JoinDict(TRuntimeNode dict1, bool isMulti1, TRunti return TRuntimeNode(callableBuilder.Build(), false); } -TRuntimeNode TProgramBuilder::GraceJoin(TRuntimeNode flowLeft, TRuntimeNode flowRight, EJoinKind joinKind, +TRuntimeNode TProgramBuilder::GraceJoinCommon(const TStringBuf& funcName, TRuntimeNode flowLeft, TRuntimeNode flowRight, EJoinKind joinKind, const TArrayRef& leftKeyColumns, const TArrayRef& rightKeyColumns, const TArrayRef& leftRenames, const TArrayRef& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings ) { MKQL_ENSURE(!leftKeyColumns.empty(), "At least one key column must be specified"); - MKQL_ENSURE(!rightKeyColumns.empty(), "At least one key column must be specified"); + if (flowRight) { + MKQL_ENSURE(!rightKeyColumns.empty(), "At least one key column must be specified"); + } TRuntimeNode::TList leftKeyColumnsNodes, rightKeyColumnsNodes, leftRenamesNodes, rightRenamesNodes; @@ -2126,9 +2128,11 @@ TRuntimeNode TProgramBuilder::GraceJoin(TRuntimeNode flowLeft, TRuntimeNode flow std::transform(rightRenames.cbegin(), rightRenames.cend(), std::back_inserter(rightRenamesNodes), [this](const ui32 idx) { return NewDataLiteral(idx); }); - TCallableBuilder callableBuilder(Env, __func__, returnType); + TCallableBuilder callableBuilder(Env, funcName, returnType); callableBuilder.Add(flowLeft); - callableBuilder.Add(flowRight); + if (flowRight) { + callableBuilder.Add(flowRight); + } callableBuilder.Add(NewDataLiteral((ui32)joinKind)); callableBuilder.Add(NewTuple(leftKeyColumnsNodes)); callableBuilder.Add(NewTuple(rightKeyColumnsNodes)); @@ -2136,9 +2140,14 @@ TRuntimeNode TProgramBuilder::GraceJoin(TRuntimeNode flowLeft, TRuntimeNode flow callableBuilder.Add(NewTuple(rightRenamesNodes)); callableBuilder.Add(NewDataLiteral((ui32)anyJoinSettings)); - return TRuntimeNode(callableBuilder.Build(), false); +} +TRuntimeNode TProgramBuilder::GraceJoin(TRuntimeNode flowLeft, TRuntimeNode flowRight, EJoinKind joinKind, + const TArrayRef& leftKeyColumns, const TArrayRef& rightKeyColumns, + const TArrayRef& leftRenames, const TArrayRef& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings ) { + + return GraceJoinCommon(__func__, flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings); } TRuntimeNode TProgramBuilder::GraceSelfJoin(TRuntimeNode flowLeft, EJoinKind joinKind, const TArrayRef& leftKeyColumns, const TArrayRef& rightKeyColumns, @@ -2148,38 +2157,31 @@ TRuntimeNode TProgramBuilder::GraceSelfJoin(TRuntimeNode flowLeft, EJoinKind jo THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - MKQL_ENSURE(!leftKeyColumns.empty(), "At least one key column must be specified"); - - - TRuntimeNode::TList leftKeyColumnsNodes, rightKeyColumnsNodes, leftRenamesNodes, rightRenamesNodes; - - leftKeyColumnsNodes.reserve(leftKeyColumns.size()); - std::transform(leftKeyColumns.cbegin(), leftKeyColumns.cend(), std::back_inserter(leftKeyColumnsNodes), [this](const ui32 idx) { return NewDataLiteral(idx); }); - - rightKeyColumnsNodes.reserve(rightKeyColumns.size()); - std::transform(rightKeyColumns.cbegin(), rightKeyColumns.cend(), std::back_inserter(rightKeyColumnsNodes), [this](const ui32 idx) { return NewDataLiteral(idx); }); - - leftRenamesNodes.reserve(leftRenames.size()); - std::transform(leftRenames.cbegin(), leftRenames.cend(), std::back_inserter(leftRenamesNodes), [this](const ui32 idx) { return NewDataLiteral(idx); }); + return GraceJoinCommon(__func__, flowLeft, {}, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings); +} - rightRenamesNodes.reserve(rightRenames.size()); - std::transform(rightRenames.cbegin(), rightRenames.cend(), std::back_inserter(rightRenamesNodes), [this](const ui32 idx) { return NewDataLiteral(idx); }); +TRuntimeNode TProgramBuilder::GraceJoinWithSpilling(TRuntimeNode flowLeft, TRuntimeNode flowRight, EJoinKind joinKind, + const TArrayRef& leftKeyColumns, const TArrayRef& rightKeyColumns, + const TArrayRef& leftRenames, const TArrayRef& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings ) { + if constexpr (RuntimeVersion < 50U) { + THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; + } - TCallableBuilder callableBuilder(Env, __func__, returnType); - callableBuilder.Add(flowLeft); - callableBuilder.Add(NewDataLiteral((ui32)joinKind)); - callableBuilder.Add(NewTuple(leftKeyColumnsNodes)); - callableBuilder.Add(NewTuple(rightKeyColumnsNodes)); - callableBuilder.Add(NewTuple(leftRenamesNodes)); - callableBuilder.Add(NewTuple(rightRenamesNodes)); - callableBuilder.Add(NewDataLiteral((ui32)anyJoinSettings)); + return GraceJoinCommon(__func__, flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings); +} - return TRuntimeNode(callableBuilder.Build(), false); +TRuntimeNode TProgramBuilder::GraceSelfJoinWithSpilling(TRuntimeNode flowLeft, EJoinKind joinKind, + const TArrayRef& leftKeyColumns, const TArrayRef& rightKeyColumns, + const TArrayRef& leftRenames, const TArrayRef& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings ) { + if constexpr (RuntimeVersion < 50U) { + THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; + } + + return GraceJoinCommon(__func__, flowLeft, {}, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings); } - TRuntimeNode TProgramBuilder::ToSortedDict(TRuntimeNode list, bool all, const TUnaryLambda& keySelector, const TUnaryLambda& payloadSelector, bool isCompact, ui64 itemsCountHint) { return ToDict(list, all, keySelector, payloadSelector, __func__, isCompact, itemsCountHint); diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index 5a78d8bdb003..97daf163c586 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -440,11 +440,19 @@ class TProgramBuilder : public TTypeBuilder { ui64 memLimit, std::optional sortedTableOrder, EAnyJoinSettings anyJoinSettings, const ui32 tableIndexField, TType* returnType); + TRuntimeNode GraceJoinCommon(const TStringBuf& funcName, TRuntimeNode flowLeft, TRuntimeNode flowRight, EJoinKind joinKind, + const TArrayRef& leftKeyColumns, const TArrayRef& rightKeyColumns, + const TArrayRef& leftRenames, const TArrayRef& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings = EAnyJoinSettings::None); TRuntimeNode GraceJoin(TRuntimeNode flowLeft, TRuntimeNode flowRight, EJoinKind joinKind, const TArrayRef& leftKeyColumns, const TArrayRef& rightKeyColumns, const TArrayRef& leftRenames, const TArrayRef& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings = EAnyJoinSettings::None); TRuntimeNode GraceSelfJoin(TRuntimeNode flowLeft, EJoinKind joinKind, const TArrayRef& leftKeyColumns, const TArrayRef& rightKeyColumns, const TArrayRef& leftRenames, const TArrayRef& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings = EAnyJoinSettings::None); + TRuntimeNode GraceJoinWithSpilling(TRuntimeNode flowLeft, TRuntimeNode flowRight, EJoinKind joinKind, + const TArrayRef& leftKeyColumns, const TArrayRef& rightKeyColumns, + const TArrayRef& leftRenames, const TArrayRef& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings = EAnyJoinSettings::None); + TRuntimeNode GraceSelfJoinWithSpilling(TRuntimeNode flowLeft, EJoinKind joinKind, const TArrayRef& leftKeyColumns, const TArrayRef& rightKeyColumns, + const TArrayRef& leftRenames, const TArrayRef& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings = EAnyJoinSettings::None); TRuntimeNode CombineCore(TRuntimeNode stream, const TUnaryLambda& keyExtractor, const TBinaryLambda& init, diff --git a/ydb/library/yql/minikql/mkql_runtime_version.h b/ydb/library/yql/minikql/mkql_runtime_version.h index d16f0184db2c..22072157e87f 100644 --- a/ydb/library/yql/minikql/mkql_runtime_version.h +++ b/ydb/library/yql/minikql/mkql_runtime_version.h @@ -24,7 +24,7 @@ namespace NMiniKQL { // 1. Bump this version every time incompatible runtime nodes are introduced. // 2. Make sure you provide runtime node generation for previous runtime versions. #ifndef MKQL_RUNTIME_VERSION -#define MKQL_RUNTIME_VERSION 49U +#define MKQL_RUNTIME_VERSION 50U #endif // History: diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp index cd2e1cae7836..f72f6f32e1bb 100644 --- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp +++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp @@ -1703,6 +1703,14 @@ TMkqlCommonCallableCompiler::TShared::TShared() { }); const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder); + + // TODO: use PRAGMA + bool IsSpillingAllowed = false; + if (RuntimeVersion >= 50U && IsSpillingAllowed) { + return selfJoin + ? ctx.ProgramBuilder.GraceSelfJoinWithSpilling(flowLeft, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings) + : ctx.ProgramBuilder.GraceJoinWithSpilling(flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings); + } return selfJoin ? ctx.ProgramBuilder.GraceSelfJoin(flowLeft, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings) : ctx.ProgramBuilder.GraceJoin(flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType, anyJoinSettings);