Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-20487][table-planner-blink] Remove restriction on StreamPhysicalGroupWindowAggregate which only supports insert-only input node #14830

Merged
merged 1 commit into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
createNewNode(agg, children, providedTrait, requiredTrait, requester)

case window: StreamPhysicalGroupWindowAggregateBase =>
// WindowAggregate and WindowTableAggregate support insert-only in input
val children = visitChildren(window, ModifyKindSetTrait.INSERT_ONLY)
// WindowAggregate and WindowTableAggregate support all changes in input
val children = visitChildren(window, ModifyKindSetTrait.ALL_CHANGES)
val builder = ModifyKindSet.newBuilder()
.addContainedKind(ModifyKind.INSERT)
if (window.emitStrategy.produceUpdates) {
Expand Down Expand Up @@ -470,20 +470,20 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti

case _: StreamPhysicalGroupAggregate | _: StreamPhysicalGroupTableAggregate |
_: StreamPhysicalLimit | _: StreamPhysicalPythonGroupAggregate |
_: StreamPhysicalPythonGroupTableAggregate =>
// Aggregate, TableAggregate and Limit requires update_before if there are updates
_: StreamPhysicalPythonGroupTableAggregate |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use StreamPhysicalGroupWindowAggregateBase instead of three sub-classes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use three sub-classes as previous version. I thought author maybe have reason to use three sub-classes instead of StreamPhysicalGroupWindowAggregateBase , for example, prevent the misuse of the parent class when adding a new child class in the future ?

_: StreamPhysicalGroupWindowAggregateBase =>
// Aggregate, TableAggregate, Limit and GroupWindowAggregate requires update_before if
// there are updates
val requiredChildTrait = beforeAfterOrNone(getModifyKindSet(rel.getInput(0)))
val children = visitChildren(rel, requiredChildTrait)
// use requiredTrait as providedTrait, because they should support all kinds of UpdateKind
createNewNode(rel, children, requiredTrait)

case _: StreamPhysicalGroupWindowAggregate | _: StreamPhysicalGroupWindowTableAggregate |
_: StreamPhysicalWindowAggregate | _: StreamPhysicalWindowRank |
case _: StreamPhysicalWindowAggregate | _: StreamPhysicalWindowRank |
_: StreamPhysicalDeduplicate | _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
_: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin |
_: StreamPhysicalPythonGroupWindowAggregate | _: StreamPhysicalPythonOverAggregate |
_: StreamPhysicalWindowJoin =>
// WindowAggregate, WindowAggregate, WindowTableAggregate, Deduplicate, TemporalSort, CEP,
_: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin =>
// WindowAggregate, WindowTableAggregate, Deduplicate, TemporalSort, CEP,
// OverAggregate, and IntervalJoin, WindowJoin require nothing about UpdateKind.
val children = visitChildren(rel, UpdateKindTrait.NONE)
createNewNode(rel, children, requiredTrait)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,39 @@ Calc(select=[a, 3:BIGINT AS $1])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[a, b, rowtime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
<TestCase name="testLastRowWithWindowOnRowtime">
<Resource name="explain">
<![CDATA[== Abstract Syntax Tree ==
LogicalProject(b=[$0], EXPR$1=[$2], EXPR$2=[TUMBLE_START($1)])
+- LogicalAggregate(group=[{0, 1}], EXPR$1=[SUM($2)])
+- LogicalProject(b=[$1], $f1=[$TUMBLE($2, 4:INTERVAL SECOND)], a=[$0])
+- LogicalFilter(condition=[=($3, 1)])
+- LogicalProject(a=[$0], b=[$1], ts=[$2], rowNum=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 DESC NULLS LAST)])
+- LogicalWatermarkAssigner(rowtime=[ts], watermark=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, T, source: [CollectionTableSource(a, b, ts)]]])

== Optimized Physical Plan ==
Calc(select=[b, EXPR$1, w$start AS EXPR$2])
+- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, ts, 4)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, SUM(a) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[b, ts, a])
+- Deduplicate(keep=[LastRow], key=[a], order=[ROWTIME])
+- Exchange(distribution=[hash[a]])
+- WatermarkAssigner(rowtime=[ts], watermark=[ts])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [CollectionTableSource(a, b, ts)]]], fields=[a, b, ts])

== Optimized Execution Plan ==
Calc(select=[b, EXPR$1, w$start AS EXPR$2])
+- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, ts, 4)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, SUM(a) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[b, ts, a])
+- Deduplicate(keep=[LastRow], key=[a], order=[ROWTIME])
+- Exchange(distribution=[hash[a]])
+- WatermarkAssigner(rowtime=[ts], watermark=[ts])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [CollectionTableSource(a, b, ts)]]], fields=[a, b, ts])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,30 @@ Union(all=[true], union=[b, ts, a], changelogMode=[I,UA,D])
+- GroupAggregate(groupBy=[a], select=[a, MAX(ts) AS t, MAX(b) AS b], changelogMode=[I,UA])
+- Exchange(distribution=[hash[a]], changelogMode=[I])
+- TableSourceScan(table=[[default_catalog, default_database, append_src]], fields=[ts, a, b], changelogMode=[I])
]]>
</Resource>
</TestCase>
<TestCase name="testUpsertSourceWithComputedColumnAndWatermark">
<Resource name="sql">
<![CDATA[SELECT a, b, c FROM src WHERE a > 1]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$1], b=[$2], c=[$3])
+- LogicalFilter(condition=[>($1, 1)])
+- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 1000:INTERVAL SECOND)])
+- LogicalProject(id=[$0], a=[$1], b=[+($1, 1)], c=[$2], ts=[TO_TIMESTAMP($2)])
+- LogicalTableScan(table=[[default_catalog, default_database, src]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c], where=[>(a, 1)], changelogMode=[I,UB,UA,D])
+- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
+- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[UA,D])
+- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[UA,D])
+- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D])
]]>
</Resource>
</TestCase>
Expand All @@ -618,27 +642,28 @@ Calc(select=[id, Reinterpret(TO_TIMESTAMP(c)) AS ts], changelogMode=[I,UA,D])
]]>
</Resource>
</TestCase>
<TestCase name="testUpsertSourceWithComputedColumnAndWatermark">
<TestCase name="testWindowAggregateOnChangelogSource">
<Resource name="sql">
<![CDATA[SELECT a, b, c FROM src WHERE a > 1]]>
<![CDATA[
SELECT TUMBLE_START(ts, INTERVAL '10' SECOND), COUNT(*)
FROM src
GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$1], b=[$2], c=[$3])
+- LogicalFilter(condition=[>($1, 1)])
+- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 1000:INTERVAL SECOND)])
+- LogicalProject(id=[$0], a=[$1], b=[+($1, 1)], c=[$2], ts=[TO_TIMESTAMP($2)])
+- LogicalTableScan(table=[[default_catalog, default_database, src]])
LogicalProject(EXPR$0=[TUMBLE_START($0)], EXPR$1=[$1])
+- LogicalAggregate(group=[{0}], EXPR$1=[COUNT()])
+- LogicalProject($f0=[$TUMBLE(PROCTIME(), 10000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, src]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c], where=[>(a, 1)], changelogMode=[I,UB,UA,D])
+- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
+- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[UA,D])
+- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[UA,D])
+- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D])
Calc(select=[w$start AS EXPR$0, EXPR$1], changelogMode=[I])
+- GroupWindowAggregate(window=[TumblingGroupWindow('w$, $f2, 10000)], properties=[w$start, w$end, w$proctime], select=[COUNT(*) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, proctime('w$) AS w$proctime], changelogMode=[I])
+- Exchange(distribution=[single], changelogMode=[I,UB,UA])
+- TableSourceScan(table=[[default_catalog, default_database, src, project=[]]], fields=[], changelogMode=[I,UB,UA])
]]>
</Resource>
</TestCase>
Expand Down
Loading