Skip to content

Commit

Permalink
Fixed: Make block combine use stream instead of flow (#10707)
Browse files (browse the repository at this point in the history)
  • Loading branch information
vladl2802 authored Oct 23, 2024
1 parent: f1d756b; commit: 930e312
Show file tree
Hide file tree
Showing 25 changed files with 1,307 additions and 855 deletions.
14 changes: 7 additions & 7 deletions ydb/library/yql/core/type_ann/type_ann_blocks.cpp
Original file line number | Diff line number | Diff line change
Expand Up @@ -791,7 +791,7 @@ IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input,
}

TTypeAnnotationNode::TListType blockItemTypes;
if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
if (!EnsureWideStreamBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}

Expand All @@ -817,7 +817,7 @@ IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input,
}

auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType);
input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType));
input->SetTypeAnn(ctx.Expr.MakeType<TStreamExprType>(outputItemType));
return IGraphTransformer::TStatus::Ok;
}

Expand All @@ -828,7 +828,7 @@ IGraphTransformer::TStatus BlockCombineHashedWrapper(const TExprNode::TPtr& inpu
}

TTypeAnnotationNode::TListType blockItemTypes;
if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
if (!EnsureWideStreamBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}

Expand Down Expand Up @@ -867,7 +867,7 @@ IGraphTransformer::TStatus BlockCombineHashedWrapper(const TExprNode::TPtr& inpu

retMultiType.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64)));
auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType);
input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType));
input->SetTypeAnn(ctx.Expr.MakeType<TStreamExprType>(outputItemType));
return IGraphTransformer::TStatus::Ok;
}

Expand All @@ -879,7 +879,7 @@ IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr
}

TTypeAnnotationNode::TListType blockItemTypes;
if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
if (!EnsureWideStreamBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
YQL_ENSURE(blockItemTypes.size() > 0);
Expand Down Expand Up @@ -917,7 +917,7 @@ IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr
}

// disallow any scalar columns except for streamIndex column
auto itemTypes = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems();
auto itemTypes = input->Head().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems();
for (ui32 i = 0; i + 1 < itemTypes.size(); ++i) {
bool isScalar = itemTypes[i]->GetKind() == ETypeAnnotationKind::Scalar;
if (isScalar && i != streamIndex) {
Expand All @@ -929,7 +929,7 @@ IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr

retMultiType.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64)));
auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType);
input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType));
input->SetTypeAnn(ctx.Expr.MakeType<TStreamExprType>(outputItemType));
return IGraphTransformer::TStatus::Ok;
}

Expand Down
57 changes: 37 additions & 20 deletions ydb/library/yql/core/yql_aggregate_expander.cpp
Original file line number | Diff line number | Diff line change
Expand Up @@ -699,7 +699,8 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() {
} else {
stream = AggList;
}
auto blocks = MakeInputBlocks(stream, keyIdxs, outputColumns, aggs, false, false);

TExprNode::TPtr blocks = MakeInputBlocks(stream, keyIdxs, outputColumns, aggs, false, false);
if (!blocks) {
return nullptr;
}
Expand All @@ -708,22 +709,30 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() {
if (hashed) {
aggWideFlow = Ctx.Builder(Node->Pos())
.Callable("WideFromBlocks")
.Callable(0, "BlockCombineHashed")
.Add(0, blocks)
.Callable(1, "Void")
.Callable(0, "ToFlow")
.Callable(0, "BlockCombineHashed")
.Callable(0, "FromFlow")
.Add(0, blocks)
.Seal()
.Callable(1, "Void")
.Seal()
.Add(2, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
.Add(3, Ctx.NewList(Node->Pos(), std::move(aggs)))
.Seal()
.Add(2, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
.Add(3, Ctx.NewList(Node->Pos(), std::move(aggs)))
.Seal()
.Seal()
.Build();
} else {
aggWideFlow = Ctx.Builder(Node->Pos())
.Callable("BlockCombineAll")
.Add(0, blocks)
.Callable(1, "Void")
.Callable("ToFlow")
.Callable(0, "BlockCombineAll")
.Callable(0, "FromFlow")
.Add(0, blocks)
.Seal()
.Callable(1, "Void")
.Seal()
.Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
.Seal()
.Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
.Seal()
.Build();
}
Expand Down Expand Up @@ -2891,23 +2900,31 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalizeHashed() {
TExprNode::TPtr aggBlocks;
if (!isMany) {
aggBlocks = Ctx.Builder(Node->Pos())
.Callable("BlockMergeFinalizeHashed")
.Add(0, blocks)
.Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
.Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
.Callable("ToFlow")
.Callable(0, "BlockMergeFinalizeHashed")
.Callable(0, "FromFlow")
.Add(0, blocks)
.Seal()
.Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
.Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
.Seal()
.Seal()
.Build();
} else {
auto manyStreamsSetting = GetSetting(*Node->Child(3), "many_streams");
YQL_ENSURE(manyStreamsSetting, "Missing many_streams setting");

aggBlocks = Ctx.Builder(Node->Pos())
.Callable("BlockMergeManyFinalizeHashed")
.Add(0, blocks)
.Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
.Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
.Atom(3, ToString(streamIdxColumn))
.Add(4, manyStreamsSetting->TailPtr())
.Callable("ToFlow")
.Callable(0, "BlockMergeManyFinalizeHashed")
.Callable(0, "FromFlow")
.Add(0, blocks)
.Seal()
.Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
.Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
.Atom(3, ToString(streamIdxColumn))
.Add(4, manyStreamsSetting->TailPtr())
.Seal()
.Seal()
.Build();
}
Expand Down
24 changes: 14 additions & 10 deletions ydb/library/yql/dq/opt/dq_opt_peephole.cpp
Original file line number | Diff line number | Diff line change
Expand Up @@ -786,16 +786,20 @@ NNodes::TExprBase DqPeepholeRewriteLength(const NNodes::TExprBase& node, TExprCo
if (typesCtx.IsBlockEngineEnabled()) {
return NNodes::TExprBase(ctx.Builder(node.Pos())
.Callable("NarrowMap")
.Callable(0, "BlockCombineAll")
.Callable(0, "WideToBlocks")
.Add(0, MakeExpandMap(node.Pos(), {}, dqPhyLength.Input().Ptr(), ctx))
.Seal()
.Callable(1, "Void")
.Seal()
.List(2)
.List(0)
.Callable(0, "AggBlockApply")
.Atom(0, "count_all")
.Callable(0, "ToFlow")
.Callable(0, "BlockCombineAll")
.Callable(0, "FromFlow")
.Callable(0, "WideToBlocks")
.Add(0, MakeExpandMap(node.Pos(), {}, dqPhyLength.Input().Ptr(), ctx))
.Seal()
.Seal()
.Callable(1, "Void")
.Seal()
.List(2)
.List(0)
.Callable(0, "AggBlockApply")
.Atom(0, "count_all")
.Seal()
.Seal()
.Seal()
.Seal()
Expand Down
Loading

0 comments on commit 930e312

Please sign in to comment.