Skip to content

Commit

Permalink
Merge a5685dd into ed1e090
Browse files Browse the repository at this point in the history
  • Loading branch information
lll-phill-lll authored Jun 26, 2024
2 parents ed1e090 + a5685dd commit 4ba8071
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 108 deletions.
2 changes: 2 additions & 0 deletions ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ struct TCallableComputationNodeBuilderFuncMapFiller {
{"JoinDict", &WrapJoinDict},
{"GraceJoin", &WrapGraceJoin},
{"GraceSelfJoin", &WrapGraceSelfJoin},
{"GraceJoinWithSpilling", &WrapGraceJoinWithSpilling},
{"GraceSelfJoinWithSpilling", &WrapGraceSelfJoinWithSpilling},
{"MapJoinCore", &WrapMapJoinCore},
{"CommonJoinCore", &WrapCommonJoinCore},
{"CombineCore", &WrapCombineCore},
Expand Down
135 changes: 60 additions & 75 deletions ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
EJoinKind joinKind, EAnyJoinSettings anyJoinSettings, const std::vector<ui32>& leftKeyColumns, const std::vector<ui32>& rightKeyColumns,
const std::vector<ui32>& leftRenames, const std::vector<ui32>& rightRenames,
const std::vector<TType*>& leftColumnsTypes, const std::vector<TType*>& rightColumnsTypes, const THolderFactory & holderFactory,
const bool isSelfJoin)
const bool isSelfJoin, bool isSpillingAllowed)
: TBase(memInfo)
, FlowLeft(flowLeft)
, FlowRight(flowRight)
Expand All @@ -588,6 +588,7 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
, JoinedTuple(std::make_unique<std::vector<NUdf::TUnboxedValue*>>() )
, IsSelfJoin_(isSelfJoin)
, SelfJoinSameKeys_(isSelfJoin && (leftKeyColumns == rightKeyColumns))
, IsSpillingAllowed(isSpillingAllowed)
{
if (JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Exclusion || IsSelfJoin_) {
LeftPacker->BatchSize = std::numeric_limits<ui64>::max();
Expand Down Expand Up @@ -631,9 +632,7 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
}

// Reports whether the join should switch to spilling mode.
// Returns true exactly when HasMemoryForProcessing() reports that there is
// not enough memory left; the previous unconditional `return false` made this
// check unreachable, so spilling could never be requested.
bool IsSwitchToSpillingModeCondition() const {
    return !HasMemoryForProcessing();
}

void SwitchMode(EOperatingMode mode, TComputationContext& ctx) {
Expand Down Expand Up @@ -764,7 +763,7 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
}

bool isYield = FetchAndPackData(ctx);
if (ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) {
if (IsSpillingAllowed && ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) {
const auto used = TlsAllocState->GetUsed();
const auto limit = TlsAllocState->GetLimit();

Expand Down Expand Up @@ -946,6 +945,7 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const*
const std::unique_ptr<std::vector<NUdf::TUnboxedValue*>> JoinedTuple;
const bool IsSelfJoin_;
const bool SelfJoinSameKeys_;
const bool IsSpillingAllowed;

bool IsSpillingFinalized = false;

Expand All @@ -960,7 +960,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr
EJoinKind joinKind, EAnyJoinSettings anyJoinSettings, std::vector<ui32>&& leftKeyColumns, std::vector<ui32>&& rightKeyColumns,
std::vector<ui32>&& leftRenames, std::vector<ui32>&& rightRenames,
std::vector<TType*>&& leftColumnsTypes, std::vector<TType*>&& rightColumnsTypes,
std::vector<EValueRepresentation>&& outputRepresentations, bool isSelfJoin)
std::vector<EValueRepresentation>&& outputRepresentations, bool isSelfJoin, bool isSpillingAllowed)
: TBaseComputation(mutables, nullptr, EValueRepresentation::Boxed)
, FlowLeft(flowLeft)
, FlowRight(flowRight)
Expand All @@ -974,6 +974,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr
, RightColumnsTypes(std::move(rightColumnsTypes))
, OutputRepresentations(std::move(outputRepresentations))
, IsSelfJoin_(isSelfJoin)
, IsSpillingAllowed(isSpillingAllowed)
{}

EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
Expand Down Expand Up @@ -1071,7 +1072,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr
state = ctx.HolderFactory.Create<TGraceJoinSpillingSupportState>(
FlowLeft, FlowRight, JoinKind, AnyJoinSettings_, LeftKeyColumns, RightKeyColumns,
LeftRenames, RightRenames, LeftColumnsTypes, RightColumnsTypes,
ctx.HolderFactory, IsSelfJoin_);
ctx.HolderFactory, IsSelfJoin_, IsSpillingAllowed);
}

IComputationWideFlowNode *const FlowLeft;
Expand All @@ -1086,28 +1087,38 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr
const std::vector<TType *> RightColumnsTypes;
const std::vector<EValueRepresentation> OutputRepresentations;
const bool IsSelfJoin_;
const bool IsSpillingAllowed;
};

}

IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args");
IComputationNode* WrapGraceJoinCommon(TCallable& callable, const TComputationNodeFactoryContext& ctx, bool isSelfJoin, bool isSpillingAllowed) {
const auto leftFlowNodeIndex = 0;
const auto rightFlowNodeIndex = 1;
const auto joinKindNodeIndex = isSelfJoin ? 1 : 2;
const auto leftKeyColumnsNodeIndex = joinKindNodeIndex + 1;
const auto rightKeyColumnsNodeIndex = leftKeyColumnsNodeIndex + 1;
const auto leftRenamesNodeIndex = rightKeyColumnsNodeIndex + 1;
const auto rightRenamesNodeIndex = leftRenamesNodeIndex + 1;
const auto anyJoinSettingsIndex = rightRenamesNodeIndex + 1;

const auto leftFlowNode = callable.GetInput(leftFlowNodeIndex);
const auto joinKindNode = callable.GetInput(joinKindNodeIndex);
const auto leftKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(leftKeyColumnsNodeIndex));
const auto rightKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(rightKeyColumnsNodeIndex));
const auto leftRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(leftRenamesNodeIndex));
const auto rightRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(rightRenamesNodeIndex));
const EAnyJoinSettings anyJoinSettings = GetAnyJoinSettings(AS_VALUE(TDataLiteral, callable.GetInput(anyJoinSettingsIndex))->AsValue().Get<ui32>());

const auto leftFlowNode = callable.GetInput(0);
const auto rightFlowNode = callable.GetInput(1);
const auto leftFlowComponents = GetWideComponents(AS_TYPE(TFlowType, leftFlowNode));
const auto rightFlowComponents = GetWideComponents(AS_TYPE(TFlowType, rightFlowNode));
const auto joinKindNode = callable.GetInput(2);
const auto leftKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(3));
const auto rightKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(4));
const auto leftRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(5));
const auto rightRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(6));
const ui32 rawJoinKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get<ui32>();

const EAnyJoinSettings anyJoinSettings = GetAnyJoinSettings(AS_VALUE(TDataLiteral, callable.GetInput(7))->AsValue().Get<ui32>());


const auto flowLeft = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 0));
const auto flowRight = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 1));
// The right flow exists only for a regular (non-self) join. For a self join
// input 1 holds the join kind rather than a flow, so flowRight stays nullptr.
IComputationWideFlowNode* flowRight = nullptr;
if (!isSelfJoin) {
    flowRight = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, rightFlowNodeIndex));
}

const auto outputFlowComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetType()->GetReturnType()));
std::vector<EValueRepresentation> outputRepresentations;
Expand All @@ -1118,7 +1129,14 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto

std::vector<ui32> leftKeyColumns, leftRenames, rightKeyColumns, rightRenames;
std::vector<TType*> leftColumnsTypes(leftFlowComponents.begin(), leftFlowComponents.end());
std::vector<TType*> rightColumnsTypes(rightFlowComponents.begin(), rightFlowComponents.end());
std::vector<TType*> rightColumnsTypes;
if (isSelfJoin) {
rightColumnsTypes = {leftColumnsTypes};
} else {
const auto rightFlowNode = callable.GetInput(rightFlowNodeIndex);
const auto rightFlowComponents = GetWideComponents(AS_TYPE(TFlowType, rightFlowNode));
rightColumnsTypes = {rightFlowComponents.begin(), rightFlowComponents.end()};
}

leftKeyColumns.reserve(leftKeyColumnsNode->GetValuesCount());
for (ui32 i = 0; i < leftKeyColumnsNode->GetValuesCount(); ++i) {
Expand All @@ -1135,6 +1153,10 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto
rightKeyColumns.emplace_back(AS_VALUE(TDataLiteral, rightKeyColumnsNode->GetValue(i))->AsValue().Get<ui32>());
}

if (isSelfJoin) {
MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Number of key columns for self join should be equal");
}

rightRenames.reserve(rightRenamesNode->GetValuesCount());
for (ui32 i = 0; i < rightRenamesNode->GetValuesCount(); ++i) {
rightRenames.emplace_back(AS_VALUE(TDataLiteral, rightRenamesNode->GetValue(i))->AsValue().Get<ui32>());
Expand All @@ -1143,68 +1165,31 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto
return new TGraceJoinWrapper(
ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind), anyJoinSettings,
std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames),
std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), false);
std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), isSelfJoin, isSpillingAllowed);
}

// Factory for the "GraceJoin" callable: a non-self join with spilling disabled.
// MKQL_ENSURE rejects any callable that does not carry exactly 8 inputs.
IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
    MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args");

    const bool selfJoin = false;
    const bool spillingAllowed = false;
    return WrapGraceJoinCommon(callable, ctx, selfJoin, spillingAllowed);
}

// Factory for the "GraceSelfJoin" callable: a self join with spilling disabled.
// MKQL_ENSURE rejects any callable that does not carry exactly 7 inputs.
IComputationNode* WrapGraceSelfJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
    MKQL_ENSURE(callable.GetInputsCount() == 7, "Expected 7 args");

    const bool selfJoin = true;
    const bool spillingAllowed = false;
    return WrapGraceJoinCommon(callable, ctx, selfJoin, spillingAllowed);
}

const auto leftFlowNode = callable.GetInput(0);
const auto leftFlowComponents = GetWideComponents(AS_TYPE(TFlowType, leftFlowNode));
const auto joinKindNode = callable.GetInput(1);
const auto leftKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(2));
const auto rightKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(3));
const auto leftRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(4));
const auto rightRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(5));
const ui32 rawJoinKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get<ui32>();

const EAnyJoinSettings anyJoinSettings = GetAnyJoinSettings(AS_VALUE(TDataLiteral, callable.GetInput(6))->AsValue().Get<ui32>());

const auto flowLeft = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 0));

const auto outputFlowComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetType()->GetReturnType()));
std::vector<EValueRepresentation> outputRepresentations;
outputRepresentations.reserve(outputFlowComponents.size());
for (ui32 i = 0U; i < outputFlowComponents.size(); ++i) {
outputRepresentations.emplace_back(GetValueRepresentation(outputFlowComponents[i]));
}

std::vector<ui32> leftKeyColumns, leftRenames, rightKeyColumns, rightRenames;
std::vector<TType*> leftColumnsTypes(leftFlowComponents.begin(), leftFlowComponents.end());
std::vector<TType*> rightColumnsTypes{leftColumnsTypes};

leftKeyColumns.reserve(leftKeyColumnsNode->GetValuesCount());
for (ui32 i = 0; i < leftKeyColumnsNode->GetValuesCount(); ++i) {
leftKeyColumns.emplace_back(AS_VALUE(TDataLiteral, leftKeyColumnsNode->GetValue(i))->AsValue().Get<ui32>());
}

leftRenames.reserve(leftRenamesNode->GetValuesCount());
for (ui32 i = 0; i < leftRenamesNode->GetValuesCount(); ++i) {
leftRenames.emplace_back(AS_VALUE(TDataLiteral, leftRenamesNode->GetValue(i))->AsValue().Get<ui32>());
}


rightKeyColumns.reserve(rightKeyColumnsNode->GetValuesCount());
for (ui32 i = 0; i < rightKeyColumnsNode->GetValuesCount(); ++i) {
rightKeyColumns.emplace_back(AS_VALUE(TDataLiteral, rightKeyColumnsNode->GetValue(i))->AsValue().Get<ui32>());
}

MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Number of key columns for self join should be equal");

// MKQL_ENSURE(leftKeyColumns == rightKeyColumns, "Key columns for self join should be equal");

rightRenames.reserve(rightRenamesNode->GetValuesCount());
for (ui32 i = 0; i < rightRenamesNode->GetValuesCount(); ++i) {
rightRenames.emplace_back(AS_VALUE(TDataLiteral, rightRenamesNode->GetValue(i))->AsValue().Get<ui32>());
}

return new TGraceJoinWrapper(
ctx.Mutables, flowLeft, nullptr, GetJoinKind(rawJoinKind), anyJoinSettings,
std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames),
std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), true);
// Factory for the "GraceJoinWithSpilling" callable: a non-self join that is
// allowed to spill. MKQL_ENSURE rejects any callable without exactly 8 inputs.
IComputationNode* WrapGraceJoinWithSpilling(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
    MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args");

    const bool selfJoin = false;
    const bool spillingAllowed = true;
    return WrapGraceJoinCommon(callable, ctx, selfJoin, spillingAllowed);
}

// Factory for the "GraceSelfJoinWithSpilling" callable: a self join that is
// allowed to spill. MKQL_ENSURE rejects any callable without exactly 7 inputs.
IComputationNode* WrapGraceSelfJoinWithSpilling(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
    MKQL_ENSURE(callable.GetInputsCount() == 7, "Expected 7 args");

    const bool selfJoin = true;
    const bool spillingAllowed = true;
    return WrapGraceJoinCommon(callable, ctx, selfJoin, spillingAllowed);
}


Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/minikql/comp_nodes/mkql_grace_join.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ namespace NMiniKQL {

// Builds a grace join node from a callable with exactly 8 inputs; spilling is not allowed.
IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx);
// Builds a grace self-join node from a callable with exactly 7 inputs; spilling is not allowed.
IComputationNode* WrapGraceSelfJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx);
// Same as WrapGraceJoin (8 inputs), but the created node is allowed to spill.
IComputationNode* WrapGraceJoinWithSpilling(TCallable& callable, const TComputationNodeFactoryContext& ctx);
// Same as WrapGraceSelfJoin (7 inputs), but the created node is allowed to spill.
IComputationNode* WrapGraceSelfJoinWithSpilling(TCallable& callable, const TComputationNodeFactoryContext& ctx);

}
}
Loading

0 comments on commit 4ba8071

Please sign in to comment.