Skip to content

Commit

Permalink
Fix multiple streamlookup (#13426)
Browse files Browse the repository at this point in the history
  • Loading branch information
yumkam authored Jan 23, 2025
1 parent bd79c91 commit 38d0188
Show file tree
Hide file tree
Showing 7 changed files with 251 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,16 @@ std::pair<
} else {
result[i] = { EOutputRowItemSource::LookupOther, lookupPayloadColumns.at(name) };
}
} else if (leftLabel.empty()) {
const auto name = prefixedName;
if (auto j = leftJoinColumns.FindPtr(name)) {
result[i] = { EOutputRowItemSource::InputKey, lookupKeyColumns.at(rightNames[*j]) };
} else if (auto k = inputColumns.FindPtr(name)) {
result[i] = { EOutputRowItemSource::InputOther, otherInputIndexes.size() };
otherInputIndexes.push_back(*k);
} else {
Y_ABORT();
}
} else {
Y_ABORT();
}
Expand Down
73 changes: 69 additions & 4 deletions ydb/library/yql/dq/type_ann/dq_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,56 @@ TStatus AnnotateDqConnection(const TExprNode::TPtr& input, TExprContext& ctx) {
}

TStatus AnnotateDqCnStreamLookup(const TExprNode::TPtr& input, TExprContext& ctx) {
if (!EnsureArgsCount(*input, 11, ctx)) {
return TStatus::Error;
}
if (!EnsureCallable(*input->Child(TDqCnStreamLookup::idx_Output), ctx)) {
return TStatus::Error;
}
if (!TDqOutput::Match(input->Child(TDqCnStreamLookup::idx_Output))) {
ctx.AddError(TIssue(ctx.GetPosition(input->Child(TDqCnStreamLookup::idx_Output)->Pos()), TStringBuilder() << "Expected " << TDqOutput::CallableName()));
return TStatus::Error;
}
if (!EnsureAtom(*input->Child(TDqCnStreamLookup::idx_LeftLabel), ctx)) {
return TStatus::Error;
}
if (!EnsureCallable(*input->Child(TDqCnStreamLookup::idx_RightInput), ctx)) {
return TStatus::Error;
}
if (!EnsureAtom(*input->Child(TDqCnStreamLookup::idx_RightLabel), ctx)) {
return TStatus::Error;
}
if (!EnsureAtom(*input->Child(TDqCnStreamLookup::idx_JoinType), ctx)) {
return TStatus::Error;
}
if (!EnsureTuple(*input->Child(TDqCnStreamLookup::idx_JoinKeys), ctx)) {
return TStatus::Error;
}
for (auto& child: input->Child(TDqCnStreamLookup::idx_JoinKeys)->Children()) {
if (!EnsureTupleSize(*child, 4, ctx)) {
return TStatus::Error;
}
for (auto& subChild: child->Children()) {
if (!EnsureAtom(*subChild, ctx)) {
return TStatus::Error;
}
}
}
if (!EnsureTupleOfAtoms(*input->Child(TDqCnStreamLookup::idx_LeftJoinKeyNames), ctx)) {
return TStatus::Error;
}
if (!EnsureTupleOfAtoms(*input->Child(TDqCnStreamLookup::idx_RightJoinKeyNames), ctx)) {
return TStatus::Error;
}
if (!EnsureAtom(*input->Child(TDqCnStreamLookup::idx_TTL), ctx)) {
return TStatus::Error;
}
if (!EnsureAtom(*input->Child(TDqCnStreamLookup::idx_MaxDelayedRows), ctx)) {
return TStatus::Error;
}
if (!EnsureAtom(*input->Child(TDqCnStreamLookup::idx_MaxCachedRows), ctx)) {
return TStatus::Error;
}
auto cnStreamLookup = TDqCnStreamLookup(input);
auto leftInputType = GetDqConnectionType(TDqConnection(input), ctx);
if (!leftInputType) {
Expand All @@ -625,18 +675,33 @@ TStatus AnnotateDqCnStreamLookup(const TExprNode::TPtr& input, TExprContext& ctx
ctx.AddError(TIssue(ctx.GetPosition(joinType.Pos()), "Streamlookup supports only LEFT JOIN ... ANY"));
return TStatus::Error;
}
const auto leftRowType = GetSeqItemType(leftInputType);
const auto rightRowType = GetSeqItemType(cnStreamLookup.RightInput().Raw()->GetTypeAnn());
auto rightInput = cnStreamLookup.RightInput();
if (!rightInput.Raw()->IsCallable("TDqLookupSourceWrap")) {
ctx.AddError(TIssue(ctx.GetPosition(rightInput.Pos()), TStringBuilder() << "DqCnStreamLookup: RightInput: Expected TDqLookupSourceWrap, but got " << rightInput.Raw()->Content()));
return TStatus::Error;
}
const auto& leftRowType = GetSeqItemType(*leftInputType);
if (!EnsureStructType(input->Pos(), leftRowType, ctx)) {
return TStatus::Error;
}
const auto rightInputType = rightInput.Raw()->GetTypeAnn();
const auto& rightRowType = GetSeqItemType(*rightInputType);
if (!EnsureStructType(input->Pos(), rightRowType, ctx)) {
return TStatus::Error;
}
const auto outputRowType = GetDqJoinResultType<true>(
input->Pos(),
*leftRowType->Cast<TStructExprType>(),
*leftRowType.Cast<TStructExprType>(),
cnStreamLookup.LeftLabel().Cast<TCoAtom>().StringValue(),
*rightRowType->Cast<TStructExprType>(),
*rightRowType.Cast<TStructExprType>(),
cnStreamLookup.RightLabel().StringValue(),
cnStreamLookup.JoinType().StringValue(),
cnStreamLookup.JoinKeys(),
ctx
);
if (!outputRowType) {
return TStatus::Error;
}
if (!EnsureConvertibleTo<ui64>(cnStreamLookup.MaxCachedRows().Ref(), "MaxCachedRows", ctx) ||
!EnsureConvertibleTo<ui64>(cnStreamLookup.TTL().Ref(), "TTL", ctx) ||
!EnsureConvertibleTo<ui64>(cnStreamLookup.MaxDelayedRows().Ref(), "MaxDelayedRows", ctx)) {
Expand Down
78 changes: 53 additions & 25 deletions ydb/library/yql/providers/dq/opt/logical_optimize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,36 +225,64 @@ class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase {
return TDqLookupSourceWrap(lookupSourceWrap);
}

TMaybeNode<TExprBase> RewriteStreamEquiJoinWithLookup(TExprBase node, TExprContext& ctx) {
Y_UNUSED(ctx);
const auto equiJoin = node.Cast<TCoEquiJoin>();
if (equiJoin.ArgCount() != 4) { // 2 parties join
return node;
// Recursively walk join tree and replace right-side of StreamLookupJoin
ui32 RewriteStreamJoinTuple(ui32 idx, const TCoEquiJoin& equiJoin, const TCoEquiJoinTuple& joinTuple, std::vector<TExprNode::TPtr>& args, TExprContext& ctx, bool& changed) {
// recursion depth O(args.size())
Y_ENSURE(idx < args.size());
// handle left side
if (!joinTuple.LeftScope().Maybe<TCoAtom>()) {
idx = RewriteStreamJoinTuple(idx, equiJoin, joinTuple.LeftScope().Cast<TCoEquiJoinTuple>(), args, ctx, changed);
} else {
++idx;
}
// handle right side
if (!joinTuple.RightScope().Maybe<TCoAtom>()) {
return RewriteStreamJoinTuple(idx, equiJoin, joinTuple.RightScope().Cast<TCoEquiJoinTuple>(), args, ctx, changed);
}
const auto left = equiJoin.Arg(0).Cast<TCoEquiJoinInput>().List();
const auto right = equiJoin.Arg(1).Cast<TCoEquiJoinInput>().List();
const auto joinTuple = equiJoin.Arg(equiJoin.ArgCount() - 2).Cast<TCoEquiJoinTuple>();
Y_ENSURE(idx < args.size());
if (!IsStreamLookup(joinTuple)) {
return node;
return idx + 1;
}
if (!right.Maybe<TDqSourceWrap>() && !right.Maybe<TDqReadWrap>()) {
return node;
auto right = equiJoin.Arg(idx).Cast<TCoEquiJoinInput>();
auto rightList = right.List();
if (auto maybeExtractMembers = rightList.Maybe<TCoExtractMembers>()) {
rightList = maybeExtractMembers.Cast().Input();
}
TExprNode::TPtr lookupSourceWrap;
if (auto maybeSource = rightList.Maybe<TDqSourceWrap>()) {
lookupSourceWrap = LookupSourceFromSource(maybeSource.Cast(), ctx).Ptr();
} else if (auto maybeRead = rightList.Maybe<TDqReadWrap>()) {
lookupSourceWrap = LookupSourceFromRead(maybeRead.Cast(), ctx).Ptr();
} else {
return idx + 1;
}
changed = true;
args[idx] =
Build<TCoEquiJoinInput>(ctx, joinTuple.Pos())
.List(lookupSourceWrap)
.Scope(right.Scope())
.Done().Ptr();
return idx + 1;
}

TDqLookupSourceWrap lookupSourceWrap = right.Maybe<TDqSourceWrap>()
? LookupSourceFromSource(right.Cast<TDqSourceWrap>(), ctx)
: LookupSourceFromRead(right.Cast<TDqReadWrap>(), ctx)
;

return Build<TCoEquiJoin>(ctx, node.Pos())
.Add(equiJoin.Arg(0))
.Add<TCoEquiJoinInput>()
.List(lookupSourceWrap)
.Scope(equiJoin.Arg(1).Cast<TCoEquiJoinInput>().Scope())
.Build()
.Add(equiJoin.Arg(2))
.Add(equiJoin.Arg(3))
.Done();
TMaybeNode<TExprBase> RewriteStreamEquiJoinWithLookup(TExprBase node, TExprContext& ctx) {
const auto equiJoin = node.Cast<TCoEquiJoin>();
auto argCount = equiJoin.ArgCount();
const auto joinTuple = equiJoin.Arg(argCount - 2).Cast<TCoEquiJoinTuple>();
std::vector<TExprNode::TPtr> args(argCount);
bool changed = false;
auto rightIdx = RewriteStreamJoinTuple(0u, equiJoin, joinTuple, args, ctx, changed);
Y_ENSURE(rightIdx + 2 == argCount);
if (!changed) {
return node;
}
// fill copies of remaining args
for (ui32 i = 0; i < argCount; ++i) {
if (!args[i]) {
args[i] = equiJoin.Arg(i).Ptr();
}
}
return Build<TCoEquiJoin>(ctx, node.Pos()).Add(std::move(args)).Done();
}

TMaybeNode<TExprBase> OptimizeEquiJoinWithCosts(TExprBase node, TExprContext& ctx) {
Expand Down
10 changes: 8 additions & 2 deletions ydb/library/yql/providers/dq/opt/physical_optimize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,16 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
if (!maxDelayedRows) {
maxDelayedRows = ctx.NewAtom(pos, 1'000'000);
}
auto rightInput = join.RightInput().Ptr();
if (auto maybe = TExprBase(rightInput).Maybe<TCoExtractMembers>()) {
rightInput = maybe.Cast().Input().Ptr();
}
auto leftLabel = join.LeftLabel().Maybe<NNodes::TCoAtom>() ? join.LeftLabel().Cast<NNodes::TCoAtom>().Ptr() : ctx.NewAtom(pos, "");
Y_ENSURE(join.RightLabel().Maybe<NNodes::TCoAtom>());
auto cn = Build<TDqCnStreamLookup>(ctx, pos)
.Output(left.Output().Cast())
.LeftLabel(join.LeftLabel().Cast<NNodes::TCoAtom>())
.RightInput(join.RightInput())
.LeftLabel(leftLabel)
.RightInput(rightInput)
.RightLabel(join.RightLabel().Cast<NNodes::TCoAtom>())
.JoinKeys(join.JoinKeys())
.JoinType(join.JoinType())
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/dq/planner/execution_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ namespace NYql::NDqs {
settings.SetRightLabel(streamLookup.RightLabel().StringValue());
settings.SetJoinType(streamLookup.JoinType().StringValue());
for (const auto& k: streamLookup.LeftJoinKeyNames()) {
*settings.AddLeftJoinKeyNames() = RemoveAliases(k.StringValue());
*settings.AddLeftJoinKeyNames() = streamLookup.LeftLabel().StringValue().empty() ? k.StringValue() : RemoveAliases(k.StringValue());
}
for (const auto& k: streamLookup.RightJoinKeyNames()) {
*settings.AddRightJoinKeyNames() = RemoveAliases(k.StringValue());
Expand Down
109 changes: 109 additions & 0 deletions ydb/tests/fq/generic/streaming/test_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,115 @@ def freeze(json):
]
),
),
# 8
(
R'''
$input = SELECT * FROM myyds.`{input_topic}`
WITH (
FORMAT=json_each_row,
SCHEMA (
za Int32,
yb STRING,
yc Int32,
zd Int32,
)
) ;
$enriched1 = select a, b, c, d, e, f, za, yb, yc, zd
from
$input as e
left join {streamlookup} any ydb_conn_{table_name}.db as u
on(e.za = u.a AND e.yb = u.b)
;
$enriched2 = SELECT e.a AS a, e.b AS b, e.c AS c, e.d AS d, e.e AS e, e.f AS f, za, yb, yc, zd, u.c AS c2, u.d AS d2
from
$enriched1 as e
left join {streamlookup} any ydb_conn_{table_name}.db as u
on(e.za = u.a AND e.yb = u.b)
;
$enriched = select a, b, c, d, e, f, za, yb, yc, zd, (c2 IS NOT DISTINCT FROM c) as eq1, (d2 IS NOT DISTINCT FROM d) as eq2
from
$enriched2 as e
;
insert into myyds.`{output_topic}`
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
''',
ResequenceId(
[
(
'{"id":1,"za":1,"yb":"2","yc":100,"zd":101}',
'{"a":1,"b":"2","c":3,"d":4,"e":5,"f":6,"za":1,"yb":"2","yc":100,"zd":101,"eq1":true,"eq2":true}',
),
(
'{"id":2,"za":7,"yb":"8","yc":106,"zd":107}',
'{"a":7,"b":"8","c":9,"d":10,"e":11,"f":12,"za":7,"yb":"8","yc":106,"zd":107,"eq1":true,"eq2":true}',
),
(
'{"id":3,"za":2,"yb":"1","yc":114,"zd":115}',
'{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":2,"yb":"1","yc":114,"zd":115,"eq1":true,"eq2":true}',
),
(
'{"id":3,"za":null,"yb":"1","yc":114,"zd":115}',
'{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":null,"yb":"1","yc":114,"zd":115,"eq1":true,"eq2":true}',
),
]
),
),
# 9
(
R'''
$input = SELECT * FROM myyds.`{input_topic}`
WITH (
FORMAT=json_each_row,
SCHEMA (
a Int32,
b STRING,
c Int32,
d Int32,
)
) ;
$enriched12 = select u.a as a, u.b as b, u.c as c, u.d as d, u.e as e, u.f as f, e.a as za, e.b as yb, e.c as yc, e.d as zd, u2.c as c2, u2.d as d2
from
$input as e
left join {streamlookup} any ydb_conn_{table_name}.db as u
on(e.a = u.a AND e.b = u.b)
left join {streamlookup} any ydb_conn_{table_name}.db as u2
on(e.b = u2.b AND e.a = u2.a)
;
$enriched = select a, b, c, d, e, f, za, yb, yc, zd, (c2 IS NOT DISTINCT FROM c) as eq1, (d2 IS NOT DISTINCT FROM d) as eq2
from
$enriched12 as e
;
insert into myyds.`{output_topic}`
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
''',
ResequenceId(
[
(
'{"id":1,"a":1,"b":"2","c":100,"d":101}',
'{"a":1,"b":"2","c":3,"d":4,"e":5,"f":6,"za":1,"yb":"2","yc":100,"zd":101,"eq1":true,"eq2":true}',
),
(
'{"id":2,"a":7,"b":"8","c":106,"d":107}',
'{"a":7,"b":"8","c":9,"d":10,"e":11,"f":12,"za":7,"yb":"8","yc":106,"zd":107,"eq1":true,"eq2":true}',
),
(
'{"id":3,"a":2,"b":"1","c":114,"d":115}',
'{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":2,"yb":"1","yc":114,"zd":115,"eq1":true,"eq2":true}',
),
(
'{"id":3,"a":null,"b":"1","c":114,"d":115}',
'{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":null,"yb":"1","yc":114,"zd":115,"eq1":true,"eq2":true}',
),
]
),
),
]


Expand Down
2 changes: 1 addition & 1 deletion ydb/tests/fq/generic/streaming/ydb/01_basic.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ set -ex
(56, 12, "2a02:1812:1713:4f00:517e:1d79:c88b:704", "Elena", 2),
(18, 17, "ivalid ip", "newUser", 12);
COMMIT;
CREATE TABLE db (b STRING NOT NULL, c Int32, a Int32 NOT NULL, d Int32, f Int32, e Int32, PRIMARY KEY(b, a));
CREATE TABLE db (b STRING NOT NULL, c Int32, a Int32 NOT NULL, d Int32, f Int32, e Int32, g Int32, h Int32, PRIMARY KEY(b, a));
COMMIT;
INSERT INTO db (a, b, c, d, e, f) VALUES
(1, "2", 3, 4, 5, 6),
Expand Down

0 comments on commit 38d0188

Please sign in to comment.