Skip to content

Commit

Permalink
[FLINK-20487][table-planner-blink] Remove restriction on StreamPhysic…
Browse files Browse the repository at this point in the history
…alGroupWindowAggregate which only supports insert-only input node
  • Loading branch information
beyond1920 committed May 18, 2021
1 parent 5ff7f73 commit 1178237
Show file tree
Hide file tree
Showing 8 changed files with 424 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
createNewNode(agg, children, providedTrait, requiredTrait, requester)

case window: StreamPhysicalGroupWindowAggregateBase =>
// WindowAggregate and WindowTableAggregate support insert-only in input
val children = visitChildren(window, ModifyKindSetTrait.INSERT_ONLY)
// WindowAggregate and WindowTableAggregate support all changes in input
val children = visitChildren(window, ModifyKindSetTrait.ALL_CHANGES)
val builder = ModifyKindSet.newBuilder()
.addContainedKind(ModifyKind.INSERT)
if (window.emitStrategy.produceUpdates) {
Expand Down Expand Up @@ -470,20 +470,20 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti

case _: StreamPhysicalGroupAggregate | _: StreamPhysicalGroupTableAggregate |
_: StreamPhysicalLimit | _: StreamPhysicalPythonGroupAggregate |
_: StreamPhysicalPythonGroupTableAggregate =>
// Aggregate, TableAggregate and Limit requires update_before if there are updates
_: StreamPhysicalPythonGroupTableAggregate |
_: StreamPhysicalGroupWindowAggregateBase =>
// Aggregate, TableAggregate, Limit and GroupWindowAggregate requires update_before if
// there are updates
val requiredChildTrait = beforeAfterOrNone(getModifyKindSet(rel.getInput(0)))
val children = visitChildren(rel, requiredChildTrait)
// use requiredTrait as providedTrait, because they should support all kinds of UpdateKind
createNewNode(rel, children, requiredTrait)

case _: StreamPhysicalGroupWindowAggregate | _: StreamPhysicalGroupWindowTableAggregate |
_: StreamPhysicalWindowAggregate | _: StreamPhysicalWindowRank |
case _: StreamPhysicalWindowAggregate | _: StreamPhysicalWindowRank |
_: StreamPhysicalDeduplicate | _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
_: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin |
_: StreamPhysicalPythonGroupWindowAggregate | _: StreamPhysicalPythonOverAggregate |
_: StreamPhysicalWindowJoin =>
// WindowAggregate, WindowAggregate, WindowTableAggregate, Deduplicate, TemporalSort, CEP,
_: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin =>
// WindowAggregate, WindowTableAggregate, Deduplicate, TemporalSort, CEP,
// OverAggregate, and IntervalJoin, WindowJoin require nothing about UpdateKind.
val children = visitChildren(rel, UpdateKindTrait.NONE)
createNewNode(rel, children, requiredTrait)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,39 @@ Calc(select=[a, 3:BIGINT AS $1])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[a, b, rowtime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
<TestCase name="testLastRowWithWindowOnRowtime">
<Resource name="explain">
<![CDATA[== Abstract Syntax Tree ==
LogicalProject(b=[$0], EXPR$1=[$2], EXPR$2=[TUMBLE_START($1)])
+- LogicalAggregate(group=[{0, 1}], EXPR$1=[SUM($2)])
+- LogicalProject(b=[$1], $f1=[$TUMBLE($2, 4:INTERVAL SECOND)], a=[$0])
+- LogicalFilter(condition=[=($3, 1)])
+- LogicalProject(a=[$0], b=[$1], ts=[$2], rowNum=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 DESC NULLS LAST)])
+- LogicalWatermarkAssigner(rowtime=[ts], watermark=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, T, source: [CollectionTableSource(a, b, ts)]]])
== Optimized Physical Plan ==
Calc(select=[b, EXPR$1, w$start AS EXPR$2])
+- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, ts, 4)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, SUM(a) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[b, ts, a])
+- Deduplicate(keep=[LastRow], key=[a], order=[ROWTIME])
+- Exchange(distribution=[hash[a]])
+- WatermarkAssigner(rowtime=[ts], watermark=[ts])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [CollectionTableSource(a, b, ts)]]], fields=[a, b, ts])
== Optimized Execution Plan ==
Calc(select=[b, EXPR$1, w$start AS EXPR$2])
+- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, ts, 4)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, SUM(a) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[b, ts, a])
+- Deduplicate(keep=[LastRow], key=[a], order=[ROWTIME])
+- Exchange(distribution=[hash[a]])
+- WatermarkAssigner(rowtime=[ts], watermark=[ts])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [CollectionTableSource(a, b, ts)]]], fields=[a, b, ts])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,30 @@ Union(all=[true], union=[b, ts, a], changelogMode=[I,UA,D])
+- GroupAggregate(groupBy=[a], select=[a, MAX(ts) AS t, MAX(b) AS b], changelogMode=[I,UA])
+- Exchange(distribution=[hash[a]], changelogMode=[I])
+- TableSourceScan(table=[[default_catalog, default_database, append_src]], fields=[ts, a, b], changelogMode=[I])
]]>
</Resource>
</TestCase>
<TestCase name="testUpsertSourceWithComputedColumnAndWatermark">
<Resource name="sql">
<![CDATA[SELECT a, b, c FROM src WHERE a > 1]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$1], b=[$2], c=[$3])
+- LogicalFilter(condition=[>($1, 1)])
+- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 1000:INTERVAL SECOND)])
+- LogicalProject(id=[$0], a=[$1], b=[+($1, 1)], c=[$2], ts=[TO_TIMESTAMP($2)])
+- LogicalTableScan(table=[[default_catalog, default_database, src]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c], where=[>(a, 1)], changelogMode=[I,UB,UA,D])
+- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
+- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[UA,D])
+- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[UA,D])
+- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D])
]]>
</Resource>
</TestCase>
Expand All @@ -618,27 +642,28 @@ Calc(select=[id, Reinterpret(TO_TIMESTAMP(c)) AS ts], changelogMode=[I,UA,D])
]]>
</Resource>
</TestCase>
<TestCase name="testUpsertSourceWithComputedColumnAndWatermark">
<TestCase name="testWindowAggregateOnChangelogSource">
<Resource name="sql">
<![CDATA[SELECT a, b, c FROM src WHERE a > 1]]>
<![CDATA[
SELECT TUMBLE_START(ts, INTERVAL '10' SECOND), COUNT(*)
FROM src
GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$1], b=[$2], c=[$3])
+- LogicalFilter(condition=[>($1, 1)])
+- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 1000:INTERVAL SECOND)])
+- LogicalProject(id=[$0], a=[$1], b=[+($1, 1)], c=[$2], ts=[TO_TIMESTAMP($2)])
+- LogicalTableScan(table=[[default_catalog, default_database, src]])
LogicalProject(EXPR$0=[TUMBLE_START($0)], EXPR$1=[$1])
+- LogicalAggregate(group=[{0}], EXPR$1=[COUNT()])
+- LogicalProject($f0=[$TUMBLE(PROCTIME(), 10000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, src]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c], where=[>(a, 1)], changelogMode=[I,UB,UA,D])
+- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
+- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[UA,D])
+- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[UA,D])
+- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D])
Calc(select=[w$start AS EXPR$0, EXPR$1], changelogMode=[I])
+- GroupWindowAggregate(window=[TumblingGroupWindow('w$, $f2, 10000)], properties=[w$start, w$end, w$proctime], select=[COUNT(*) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, proctime('w$) AS w$proctime], changelogMode=[I])
+- Exchange(distribution=[single], changelogMode=[I,UB,UA])
+- TableSourceScan(table=[[default_catalog, default_database, src, project=[]]], fields=[], changelogMode=[I,UB,UA])
]]>
</Resource>
</TestCase>
Expand Down
Loading

0 comments on commit 1178237

Please sign in to comment.