From 38d0188089614eb44f02811c93c24f34434e50f1 Mon Sep 17 00:00:00 2001 From: yumkam Date: Thu, 23 Jan 2025 18:00:07 +0300 Subject: [PATCH] Fix multiple streamlookup (#13426) --- .../dq_input_transform_lookup.cpp | 10 ++ ydb/library/yql/dq/type_ann/dq_type_ann.cpp | 73 +++++++++++- .../yql/providers/dq/opt/logical_optimize.cpp | 78 +++++++++---- .../providers/dq/opt/physical_optimize.cpp | 10 +- .../dq/planner/execution_planner.cpp | 2 +- ydb/tests/fq/generic/streaming/test_join.py | 109 ++++++++++++++++++ .../fq/generic/streaming/ydb/01_basic.sh | 2 +- 7 files changed, 251 insertions(+), 33 deletions(-) diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp index 405c4c0cc96e..2ac92d9587fb 100644 --- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp +++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp @@ -522,6 +522,16 @@ std::pair< } else { result[i] = { EOutputRowItemSource::LookupOther, lookupPayloadColumns.at(name) }; } + } else if (leftLabel.empty()) { + const auto name = prefixedName; + if (auto j = leftJoinColumns.FindPtr(name)) { + result[i] = { EOutputRowItemSource::InputKey, lookupKeyColumns.at(rightNames[*j]) }; + } else if (auto k = inputColumns.FindPtr(name)) { + result[i] = { EOutputRowItemSource::InputOther, otherInputIndexes.size() }; + otherInputIndexes.push_back(*k); + } else { + Y_ABORT(); + } } else { Y_ABORT(); } diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp index 258dad5297f7..63b745a8ae27 100644 --- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp +++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp @@ -616,6 +616,56 @@ TStatus AnnotateDqConnection(const TExprNode::TPtr& input, TExprContext& ctx) { } TStatus AnnotateDqCnStreamLookup(const TExprNode::TPtr& input, TExprContext& ctx) { + if (!EnsureArgsCount(*input, 11, ctx)) { + return TStatus::Error; + } + if (!EnsureCallable(*input->Child(TDqCnStreamLookup::idx_Output), ctx)) { + return TStatus::Error; + } + if (!TDqOutput::Match(input->Child(TDqCnStreamLookup::idx_Output))) { + ctx.AddError(TIssue(ctx.GetPosition(input->Child(TDqCnStreamLookup::idx_Output)->Pos()), TStringBuilder() << "Expected " << TDqOutput::CallableName())); + return TStatus::Error; + } + if (!EnsureAtom(*input->Child(TDqCnStreamLookup::idx_LeftLabel), ctx)) { + return TStatus::Error; + } + if (!EnsureCallable(*input->Child(TDqCnStreamLookup::idx_RightInput), ctx)) { + return TStatus::Error; + } + if (!EnsureAtom(*input->Child(TDqCnStreamLookup::idx_RightLabel), ctx)) { + return TStatus::Error; + } + if (!EnsureAtom(*input->Child(TDqCnStreamLookup::idx_JoinType), ctx)) { + return TStatus::Error; + } + if (!EnsureTuple(*input->Child(TDqCnStreamLookup::idx_JoinKeys), ctx)) { + return TStatus::Error; + } + for (auto& child: input->Child(TDqCnStreamLookup::idx_JoinKeys)->Children()) { + if (!EnsureTupleSize(*child, 4, ctx)) { + return TStatus::Error; + } + for (auto& subChild: child->Children()) { + if (!EnsureAtom(*subChild, ctx)) { + return TStatus::Error; + } + } + } + if (!EnsureTupleOfAtoms(*input->Child(TDqCnStreamLookup::idx_LeftJoinKeyNames), ctx)) { + return TStatus::Error; + } + if (!EnsureTupleOfAtoms(*input->Child(TDqCnStreamLookup::idx_RightJoinKeyNames), ctx)) { + return TStatus::Error; + } + if (!EnsureAtom(*input->Child(TDqCnStreamLookup::idx_TTL), ctx)) { + return TStatus::Error; + } + if (!EnsureAtom(*input->Child(TDqCnStreamLookup::idx_MaxDelayedRows), ctx)) { + return TStatus::Error; + } + if (!EnsureAtom(*input->Child(TDqCnStreamLookup::idx_MaxCachedRows), ctx)) { + return TStatus::Error; + } auto cnStreamLookup = TDqCnStreamLookup(input); auto leftInputType = GetDqConnectionType(TDqConnection(input), ctx); if (!leftInputType) { @@ -625,18 +675,33 @@ TStatus AnnotateDqCnStreamLookup(const TExprNode::TPtr& input, TExprContext& ctx ctx.AddError(TIssue(ctx.GetPosition(joinType.Pos()), "Streamlookup supports only LEFT JOIN ... ANY")); return TStatus::Error; } - const auto leftRowType = GetSeqItemType(leftInputType); - const auto rightRowType = GetSeqItemType(cnStreamLookup.RightInput().Raw()->GetTypeAnn()); + auto rightInput = cnStreamLookup.RightInput(); + if (!rightInput.Raw()->IsCallable("TDqLookupSourceWrap")) { + ctx.AddError(TIssue(ctx.GetPosition(rightInput.Pos()), TStringBuilder() << "DqCnStreamLookup: RightInput: Expected TDqLookupSourceWrap, but got " << rightInput.Raw()->Content())); + return TStatus::Error; + } + const auto& leftRowType = GetSeqItemType(*leftInputType); + if (!EnsureStructType(input->Pos(), leftRowType, ctx)) { + return TStatus::Error; + } + const auto rightInputType = rightInput.Raw()->GetTypeAnn(); + const auto& rightRowType = GetSeqItemType(*rightInputType); + if (!EnsureStructType(input->Pos(), rightRowType, ctx)) { + return TStatus::Error; + } const auto outputRowType = GetDqJoinResultType( input->Pos(), - *leftRowType->Cast(), + *leftRowType.Cast(), cnStreamLookup.LeftLabel().Cast().StringValue(), - *rightRowType->Cast(), + *rightRowType.Cast(), cnStreamLookup.RightLabel().StringValue(), cnStreamLookup.JoinType().StringValue(), cnStreamLookup.JoinKeys(), ctx ); + if (!outputRowType) { + return TStatus::Error; + } if (!EnsureConvertibleTo(cnStreamLookup.MaxCachedRows().Ref(), "MaxCachedRows", ctx) || !EnsureConvertibleTo(cnStreamLookup.TTL().Ref(), "TTL", ctx) || !EnsureConvertibleTo(cnStreamLookup.MaxDelayedRows().Ref(), "MaxDelayedRows", ctx)) { diff --git a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp index 242fbc0d4de2..20dc54a24c94 100644 --- a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp @@ -225,36 +225,64 @@ class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase { return TDqLookupSourceWrap(lookupSourceWrap); } - TMaybeNode RewriteStreamEquiJoinWithLookup(TExprBase node, TExprContext& ctx) { - Y_UNUSED(ctx); - const auto equiJoin = node.Cast(); - if (equiJoin.ArgCount() != 4) { // 2 parties join - return node; + // Recursively walk join tree and replace right-side of StreamLookupJoin + ui32 RewriteStreamJoinTuple(ui32 idx, const TCoEquiJoin& equiJoin, const TCoEquiJoinTuple& joinTuple, std::vector& args, TExprContext& ctx, bool& changed) { + // recursion depth O(args.size()) + Y_ENSURE(idx < args.size()); + // handle left side + if (!joinTuple.LeftScope().Maybe()) { + idx = RewriteStreamJoinTuple(idx, equiJoin, joinTuple.LeftScope().Cast(), args, ctx, changed); + } else { + ++idx; + } + // handle right side + if (!joinTuple.RightScope().Maybe()) { + return RewriteStreamJoinTuple(idx, equiJoin, joinTuple.RightScope().Cast(), args, ctx, changed); } - const auto left = equiJoin.Arg(0).Cast().List(); - const auto right = equiJoin.Arg(1).Cast().List(); - const auto joinTuple = equiJoin.Arg(equiJoin.ArgCount() - 2).Cast(); + Y_ENSURE(idx < args.size()); if (!IsStreamLookup(joinTuple)) { - return node; + return idx + 1; } - if (!right.Maybe() && !right.Maybe()) { - return node; + auto right = equiJoin.Arg(idx).Cast(); + auto rightList = right.List(); + if (auto maybeExtractMembers = rightList.Maybe()) { + rightList = maybeExtractMembers.Cast().Input(); } + TExprNode::TPtr lookupSourceWrap; + if (auto maybeSource = rightList.Maybe()) { + lookupSourceWrap = LookupSourceFromSource(maybeSource.Cast(), ctx).Ptr(); + } else if (auto maybeRead = rightList.Maybe()) { + lookupSourceWrap = LookupSourceFromRead(maybeRead.Cast(), ctx).Ptr(); + } else { + return idx + 1; + } + changed = true; + args[idx] = + Build(ctx, joinTuple.Pos()) + .List(lookupSourceWrap) + .Scope(right.Scope()) + .Done().Ptr(); + return idx + 1; + } - TDqLookupSourceWrap lookupSourceWrap = right.Maybe() - ? LookupSourceFromSource(right.Cast(), ctx) - : LookupSourceFromRead(right.Cast(), ctx) - ; - - return Build(ctx, node.Pos()) - .Add(equiJoin.Arg(0)) - .Add() - .List(lookupSourceWrap) - .Scope(equiJoin.Arg(1).Cast().Scope()) - .Build() - .Add(equiJoin.Arg(2)) - .Add(equiJoin.Arg(3)) - .Done(); + TMaybeNode RewriteStreamEquiJoinWithLookup(TExprBase node, TExprContext& ctx) { + const auto equiJoin = node.Cast(); + auto argCount = equiJoin.ArgCount(); + const auto joinTuple = equiJoin.Arg(argCount - 2).Cast(); + std::vector args(argCount); + bool changed = false; + auto rightIdx = RewriteStreamJoinTuple(0u, equiJoin, joinTuple, args, ctx, changed); + Y_ENSURE(rightIdx + 2 == argCount); + if (!changed) { + return node; + } + // fill copies of remaining args + for (ui32 i = 0; i < argCount; ++i) { + if (!args[i]) { + args[i] = equiJoin.Arg(i).Ptr(); + } + } + return Build(ctx, node.Pos()).Add(std::move(args)).Done(); } TMaybeNode OptimizeEquiJoinWithCosts(TExprBase node, TExprContext& ctx) { diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index c44a91125355..98691d51131d 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -312,10 +312,16 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase { if (!maxDelayedRows) { maxDelayedRows = ctx.NewAtom(pos, 1'000'000); } + auto rightInput = join.RightInput().Ptr(); + if (auto maybe = TExprBase(rightInput).Maybe()) { + rightInput = maybe.Cast().Input().Ptr(); + } + auto leftLabel = join.LeftLabel().Maybe() ? join.LeftLabel().Cast().Ptr() : ctx.NewAtom(pos, ""); + Y_ENSURE(join.RightLabel().Maybe()); auto cn = Build(ctx, pos) .Output(left.Output().Cast()) - .LeftLabel(join.LeftLabel().Cast()) - .RightInput(join.RightInput()) + .LeftLabel(leftLabel) + .RightInput(rightInput) .RightLabel(join.RightLabel().Cast()) .JoinKeys(join.JoinKeys()) .JoinType(join.JoinType()) diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp index d38dfb612b11..90bd08f2a91e 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp +++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp @@ -608,7 +608,7 @@ namespace NYql::NDqs { settings.SetRightLabel(streamLookup.RightLabel().StringValue()); settings.SetJoinType(streamLookup.JoinType().StringValue()); for (const auto& k: streamLookup.LeftJoinKeyNames()) { - *settings.AddLeftJoinKeyNames() = RemoveAliases(k.StringValue()); + *settings.AddLeftJoinKeyNames() = streamLookup.LeftLabel().StringValue().empty() ? k.StringValue() : RemoveAliases(k.StringValue()); } for (const auto& k: streamLookup.RightJoinKeyNames()) { *settings.AddRightJoinKeyNames() = RemoveAliases(k.StringValue()); diff --git a/ydb/tests/fq/generic/streaming/test_join.py b/ydb/tests/fq/generic/streaming/test_join.py index c69f66ceaa97..d982410e22d9 100644 --- a/ydb/tests/fq/generic/streaming/test_join.py +++ b/ydb/tests/fq/generic/streaming/test_join.py @@ -412,6 +412,115 @@ def freeze(json): ] ), ), + # 8 + ( + R''' + $input = SELECT * FROM myyds.`{input_topic}` + WITH ( + FORMAT=json_each_row, + SCHEMA ( + za Int32, + yb STRING, + yc Int32, + zd Int32, + ) + ) ; + + $enriched1 = select a, b, c, d, e, f, za, yb, yc, zd + from + $input as e + left join {streamlookup} any ydb_conn_{table_name}.db as u + on(e.za = u.a AND e.yb = u.b) + ; + + $enriched2 = SELECT e.a AS a, e.b AS b, e.c AS c, e.d AS d, e.e AS e, e.f AS f, za, yb, yc, zd, u.c AS c2, u.d AS d2 + from + $enriched1 as e + left join {streamlookup} any ydb_conn_{table_name}.db as u + on(e.za = u.a AND e.yb = u.b) + ; + + $enriched = select a, b, c, d, e, f, za, yb, yc, zd, (c2 IS NOT DISTINCT FROM c) as eq1, (d2 IS NOT DISTINCT FROM d) as eq2 + from + $enriched2 as e + ; + + insert into myyds.`{output_topic}` + select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched; + ''', + ResequenceId( + [ + ( + '{"id":1,"za":1,"yb":"2","yc":100,"zd":101}', + '{"a":1,"b":"2","c":3,"d":4,"e":5,"f":6,"za":1,"yb":"2","yc":100,"zd":101,"eq1":true,"eq2":true}', + ), + ( + '{"id":2,"za":7,"yb":"8","yc":106,"zd":107}', + '{"a":7,"b":"8","c":9,"d":10,"e":11,"f":12,"za":7,"yb":"8","yc":106,"zd":107,"eq1":true,"eq2":true}', + ), + ( + '{"id":3,"za":2,"yb":"1","yc":114,"zd":115}', + '{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":2,"yb":"1","yc":114,"zd":115,"eq1":true,"eq2":true}', + ), + ( + '{"id":3,"za":null,"yb":"1","yc":114,"zd":115}', + '{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":null,"yb":"1","yc":114,"zd":115,"eq1":true,"eq2":true}', + ), + ] + ), + ), + # 9 + ( + R''' + $input = SELECT * FROM myyds.`{input_topic}` + WITH ( + FORMAT=json_each_row, + SCHEMA ( + a Int32, + b STRING, + c Int32, + d Int32, + ) + ) ; + + $enriched12 = select u.a as a, u.b as b, u.c as c, u.d as d, u.e as e, u.f as f, e.a as za, e.b as yb, e.c as yc, e.d as zd, u2.c as c2, u2.d as d2 + from + $input as e + left join {streamlookup} any ydb_conn_{table_name}.db as u + on(e.a = u.a AND e.b = u.b) + left join {streamlookup} any ydb_conn_{table_name}.db as u2 + on(e.b = u2.b AND e.a = u2.a) + ; + + $enriched = select a, b, c, d, e, f, za, yb, yc, zd, (c2 IS NOT DISTINCT FROM c) as eq1, (d2 IS NOT DISTINCT FROM d) as eq2 + from + $enriched12 as e + ; + + insert into myyds.`{output_topic}` + select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched; + ''', + ResequenceId( + [ + ( + '{"id":1,"a":1,"b":"2","c":100,"d":101}', + '{"a":1,"b":"2","c":3,"d":4,"e":5,"f":6,"za":1,"yb":"2","yc":100,"zd":101,"eq1":true,"eq2":true}', + ), + ( + '{"id":2,"a":7,"b":"8","c":106,"d":107}', + '{"a":7,"b":"8","c":9,"d":10,"e":11,"f":12,"za":7,"yb":"8","yc":106,"zd":107,"eq1":true,"eq2":true}', + ), + ( + '{"id":3,"a":2,"b":"1","c":114,"d":115}', + '{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":2,"yb":"1","yc":114,"zd":115,"eq1":true,"eq2":true}', + ), + ( + '{"id":3,"a":null,"b":"1","c":114,"d":115}', + '{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":null,"yb":"1","yc":114,"zd":115,"eq1":true,"eq2":true}', + ), + ] + ), + ), ] diff --git a/ydb/tests/fq/generic/streaming/ydb/01_basic.sh b/ydb/tests/fq/generic/streaming/ydb/01_basic.sh index e53213f2e6ad..d8a73368b075 100755 --- a/ydb/tests/fq/generic/streaming/ydb/01_basic.sh +++ b/ydb/tests/fq/generic/streaming/ydb/01_basic.sh @@ -35,7 +35,7 @@ set -ex (56, 12, "2a02:1812:1713:4f00:517e:1d79:c88b:704", "Elena", 2), (18, 17, "ivalid ip", "newUser", 12); COMMIT; - CREATE TABLE db (b STRING NOT NULL, c Int32, a Int32 NOT NULL, d Int32, f Int32, e Int32, PRIMARY KEY(b, a)); + CREATE TABLE db (b STRING NOT NULL, c Int32, a Int32 NOT NULL, d Int32, f Int32, e Int32, g Int32, h Int32, PRIMARY KEY(b, a)); COMMIT; INSERT INTO db (a, b, c, d, e, f) VALUES (1, "2", 3, 4, 5, 6),