diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowJoin.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowJoin.java new file mode 100644 index 0000000000000..b9e569ed10e0c --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowJoin.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.streaming.api.transformations.TwoInputTransformation; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.plan.logical.WindowAttachedWindowingStrategy; +import org.apache.flink.table.planner.plan.logical.WindowingStrategy; +import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; +import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator; +import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec; +import org.apache.flink.table.planner.plan.utils.JoinUtil; +import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; +import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; +import org.apache.flink.table.runtime.operators.join.window.WindowJoinOperator; +import org.apache.flink.table.runtime.operators.join.window.WindowJoinOperatorBuilder; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.runtime.util.TimeWindowUtil; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.time.ZoneId; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** {@link StreamExecNode} for WindowJoin. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class StreamExecWindowJoin extends ExecNodeBase + implements StreamExecNode, SingleTransformationTranslator { + public static final String FIELD_NAME_JOIN_SPEC = "joinSpec"; + public static final String FIELD_NAME_LEFT_WINDOWING = "leftWindowing"; + public static final String FIELD_NAME_RIGHT_WINDOWING = "rightWindowing"; + + @JsonProperty(FIELD_NAME_JOIN_SPEC) + private final JoinSpec joinSpec; + + @JsonProperty(FIELD_NAME_LEFT_WINDOWING) + private final WindowingStrategy leftWindowing; + + @JsonProperty(FIELD_NAME_RIGHT_WINDOWING) + private final WindowingStrategy rightWindowing; + + public StreamExecWindowJoin( + JoinSpec joinSpec, + WindowingStrategy leftWindowing, + WindowingStrategy rightWindowing, + InputProperty leftInputProperty, + InputProperty rightInputProperty, + RowType outputType, + String description) { + this( + joinSpec, + leftWindowing, + rightWindowing, + getNewNodeId(), + Lists.newArrayList(leftInputProperty, rightInputProperty), + outputType, + description); + } + + @JsonCreator + public StreamExecWindowJoin( + @JsonProperty(FIELD_NAME_JOIN_SPEC) JoinSpec joinSpec, + @JsonProperty(FIELD_NAME_LEFT_WINDOWING) WindowingStrategy leftWindowing, + @JsonProperty(FIELD_NAME_RIGHT_WINDOWING) WindowingStrategy rightWindowing, + @JsonProperty(FIELD_NAME_ID) int id, + @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List inputProperties, + @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType, + @JsonProperty(FIELD_NAME_DESCRIPTION) String description) { + super(id, inputProperties, outputType, description); + checkArgument(inputProperties.size() == 2); + this.joinSpec = checkNotNull(joinSpec); + validate(leftWindowing); + validate(rightWindowing); + this.leftWindowing = leftWindowing; + this.rightWindowing = rightWindowing; + } + + private void validate(WindowingStrategy windowing) { + // validate window strategy + if (!windowing.isRowtime()) { + throw new TableException("Processing time Window Join is not supported yet."); + } + + if (!(windowing instanceof WindowAttachedWindowingStrategy)) { + throw new TableException(windowing.getClass().getName() + " is not supported yet."); + } + } + + @Override + @SuppressWarnings("unchecked") + protected Transformation translateToPlanInternal(PlannerBase planner) { + int leftWindowEndIndex = ((WindowAttachedWindowingStrategy) leftWindowing).getWindowEnd(); + int rightWindowEndIndex = ((WindowAttachedWindowingStrategy) rightWindowing).getWindowEnd(); + final ExecEdge leftInputEdge = getInputEdges().get(0); + final ExecEdge rightInputEdge = getInputEdges().get(1); + + final Transformation leftTransform = + (Transformation) leftInputEdge.translateToPlan(planner); + final Transformation rightTransform = + (Transformation) rightInputEdge.translateToPlan(planner); + + final RowType leftType = (RowType) leftInputEdge.getOutputType(); + final RowType rightType = (RowType) rightInputEdge.getOutputType(); + JoinUtil.validateJoinSpec(joinSpec, leftType, rightType, true); + + final int[] leftJoinKey = joinSpec.getLeftKeys(); + final int[] rightJoinKey = joinSpec.getRightKeys(); + + final InternalTypeInfo leftTypeInfo = InternalTypeInfo.of(leftType); + final InternalTypeInfo rightTypeInfo = InternalTypeInfo.of(rightType); + + final TableConfig tableConfig = planner.getTableConfig(); + GeneratedJoinCondition generatedCondition = + JoinUtil.generateConditionFunction(tableConfig, joinSpec, leftType, rightType); + + ZoneId shiftTimeZone = + TimeWindowUtil.getShiftTimeZone(leftWindowing.getTimeAttributeType(), tableConfig); + WindowJoinOperator operator = + WindowJoinOperatorBuilder.builder() + .leftSerializer(leftTypeInfo.toRowSerializer()) + .rightSerializer(rightTypeInfo.toRowSerializer()) + .generatedJoinCondition(generatedCondition) + .leftWindowEndIndex(leftWindowEndIndex) + .rightWindowEndIndex(rightWindowEndIndex) + .filterNullKeys(joinSpec.getFilterNulls()) + .joinType(joinSpec.getJoinType()) + .withShiftTimezone(shiftTimeZone) + .build(); + + final RowType returnType = (RowType) getOutputType(); + final TwoInputTransformation transform = + new TwoInputTransformation<>( + leftTransform, + rightTransform, + getDescription(), + operator, + InternalTypeInfo.of(returnType), + leftTransform.getParallelism()); + + // set KeyType and Selector for state + RowDataKeySelector leftSelect = + KeySelectorUtil.getRowDataSelector(leftJoinKey, leftTypeInfo); + RowDataKeySelector rightSelect = + KeySelectorUtil.getRowDataSelector(rightJoinKey, rightTypeInfo); + transform.setStateKeySelectors(leftSelect, rightSelect); + transform.setStateKeyType(leftSelect.getProducedType()); + return transform; + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowJoin.scala index b94cbe2004d85..d33ce44a79660 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowJoin.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowJoin.scala @@ -18,8 +18,10 @@ package org.apache.flink.table.planner.plan.nodes.physical.stream import org.apache.flink.table.api.TableException +import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.logical.WindowingStrategy -import org.apache.flink.table.planner.plan.nodes.exec.ExecNode +import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty} +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowJoin import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalJoin import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall import org.apache.flink.table.planner.plan.utils.RelExplainUtil.preferExpressionFormat @@ -102,6 +104,13 @@ class StreamPhysicalWindowJoin( } override def translateToExecNode(): ExecNode[_] = { - ??? + new StreamExecWindowJoin( + joinSpec, + leftWindowing, + rightWindowing, + InputProperty.DEFAULT, + InputProperty.DEFAULT, + FlinkTypeFactory.toLogicalRowType(getRowType), + getRelDetailedDescription) } } diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowJoinJsonPlanTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowJoinJsonPlanTest.java new file mode 100644 index 0000000000000..58a36f01df5ba --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowJoinJsonPlanTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.planner.utils.StreamTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.Before; +import org.junit.Test; + +/** Test json serialization/deserialization for window join. */ +public class WindowJoinJsonPlanTest extends TableTestBase { + + private StreamTableTestUtil util; + private TableEnvironment tEnv; + + @Before + public void setup() { + util = streamTestUtil(TableConfig.getDefault()); + tEnv = util.getTableEnv(); + + String srcTable1Ddl = + "CREATE TABLE MyTable (\n" + + " a INT,\n" + + " b BIGINT,\n" + + " c VARCHAR,\n" + + " `rowtime` AS TO_TIMESTAMP(c),\n" + + " proctime as PROCTIME(),\n" + + " WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n" + + ") WITH (\n" + + " 'connector' = 'values')\n"; + tEnv.executeSql(srcTable1Ddl); + + String srcTable2Ddl = + "CREATE TABLE MyTable2 (\n" + + " a INT,\n" + + " b BIGINT,\n" + + " c VARCHAR,\n" + + " `rowtime` AS TO_TIMESTAMP(c),\n" + + " proctime as PROCTIME(),\n" + + " WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n" + + ") WITH (\n" + + " 'connector' = 'values')\n"; + tEnv.executeSql(srcTable2Ddl); + } + + @Test + public void testEventTimeTumbleWindow() { + String sinkTableDdl = + "CREATE TABLE MySink (\n" + + " l_a INT,\n" + + " window_start TIMESTAMP(3),\n" + + " window_end TIMESTAMP(3),\n" + + " l_cnt BIGINT,\n" + + " l_uv BIGINT,\n" + + " r_a INT,\n" + + " r_cnt BIGINT,\n" + + " r_uv BIGINT\n" + + ") WITH (\n" + + " 'connector' = 'values')\n"; + tEnv.executeSql(sinkTableDdl); + util.verifyJsonPlan( + "insert into MySink select\n" + + " L.a,\n" + + " L.window_start,\n" + + " L.window_end,\n" + + " L.cnt,\n" + + " L.uv,\n" + + " R.a,\n" + + " R.cnt,\n" + + " R.uv\n" + + "FROM (\n" + + " SELECT\n" + + " a,\n" + + " window_start,\n" + + " window_end,\n" + + " count(*) as cnt,\n" + + " count(distinct c) AS uv\n" + + " FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n" + + " GROUP BY a, window_start, window_end, window_time\n" + + ") L\n" + + "JOIN (\n" + + " SELECT\n" + + " a,\n" + + " window_start,\n" + + " window_end,\n" + + " count(*) as cnt,\n" + + " count(distinct c) AS uv\n" + + " FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n" + + " GROUP BY a, window_start, window_end, window_time\n" + + ") R\n" + + "ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a"); + } +} diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowJoinJsonITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowJoinJsonITCase.java new file mode 100644 index 0000000000000..d8badd917fcb8 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowJoinJsonITCase.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.runtime.stream.jsonplan; + +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.planner.runtime.utils.TestData; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.planner.utils.JsonPlanTestBase; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +/** Test for window join json plan. */ +public class WindowJoinJsonITCase extends JsonPlanTestBase { + + @Before + public void setup() throws Exception { + super.setup(); + createTestValuesSourceTable( + "MyTable", + JavaScalaConversionUtil.toJava(TestData.windowDataWithTimestamp()), + new String[] { + "ts STRING", + "`int` INT", + "`double` DOUBLE", + "`float` FLOAT", + "`bigdec` DECIMAL(10, 2)", + "`string` STRING", + "`name` STRING", + "`rowtime` AS TO_TIMESTAMP(`ts`)", + "WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND", + }, + new HashMap() { + { + put("enable-watermark-push-down", "true"); + put("failing-source", "true"); + } + }); + + createTestValuesSourceTable( + "MyTable2", + JavaScalaConversionUtil.toJava(TestData.windowData2WithTimestamp()), + new String[] { + "ts STRING", + "`int` INT", + "`double` DOUBLE", + "`float` FLOAT", + "`bigdec` DECIMAL(10, 2)", + "`string` STRING", + "`name` STRING", + "`rowtime` AS TO_TIMESTAMP(`ts`)", + "WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND", + }, + new HashMap() { + { + put("enable-watermark-push-down", "true"); + put("failing-source", "true"); + } + }); + } + + @Test + public void testEventTimeTumbleWindow() throws Exception { + createTestValuesSinkTable( + "MySink", + "name STRING", + "window_start TIMESTAMP(3)", + "window_end TIMESTAMP(3)", + "uv1 BIGINT", + "uv2 BIGINT"); + String jsonPlan = + tableEnv.getJsonPlan( + "insert into MySink select\n" + + " L.name,\n" + + " L.window_start,\n" + + " L.window_end,\n" + + " uv1,\n" + + " uv2\n" + + "FROM(\n" + + " SELECT\n" + + " name,\n" + + " window_start,\n" + + " window_end,\n" + + " COUNT(DISTINCT `string`) as uv1\n" + + " FROM TABLE(\n" + + " TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n" + + " GROUP BY name, window_start, window_end\n" + + " ) L\n" + + "JOIN (\n" + + " SELECT\n" + + " name,\n" + + " window_start,\n" + + " window_end,\n" + + " COUNT(DISTINCT `string`) as uv2\n" + + " FROM TABLE(\n" + + " TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n" + + " GROUP BY name, window_start, window_end\n" + + " ) R\n" + + "ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.name = R.name"); + tableEnv.executeJsonPlan(jsonPlan).await(); + + List result = TestValuesTableFactory.getResults("MySink"); + assertResult( + Arrays.asList( + "+I[b, 2020-10-10T00:00:05, 2020-10-10T00:00:10, 2, 2]", + "+I[b, 2020-10-10T00:00:15, 2020-10-10T00:00:20, 1, 1]", + "+I[b, 2020-10-10T00:00:30, 2020-10-10T00:00:35, 1, 1]"), + result); + } +} diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowJoinJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowJoinJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out new file mode 100644 index 0000000000000..5e9924aa89881 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowJoinJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out @@ -0,0 +1,1657 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan", + "scanTableSource" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MyTable" + }, + "catalogTable" : { + "schema.watermark.0.strategy.expr" : "`rowtime` - INTERVAL '1' SECOND", + "schema.4.expr" : "PROCTIME()", + "schema.0.data-type" : "INT", + "schema.2.name" : "c", + "schema.1.name" : "b", + "schema.4.name" : "proctime", + "schema.1.data-type" : "BIGINT", + "schema.3.data-type" : "TIMESTAMP(3)", + "schema.2.data-type" : "VARCHAR(2147483647)", + "schema.3.name" : "rowtime", + "connector" : "values", + "schema.watermark.0.rowtime" : "rowtime", + "schema.watermark.0.strategy.data-type" : "TIMESTAMP(3)", + "schema.3.expr" : "TO_TIMESTAMP(`c`)", + "schema.4.data-type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL", + "schema.0.name" : "a" + } + }, + "id" : 1, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "b" : "BIGINT" + }, { + "c" : "VARCHAR(2147483647)" + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "VARCHAR", + "nullable" : true, + "precision" : 2147483647 + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "TO_TIMESTAMP", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "VARCHAR", + "nullable" : true, + "precision" : 2147483647 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "PROCTIME", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ ], + "type" : { + "timestampKind" : "PROCTIME", + "typeName" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false + } + } ], + "condition" : null, + "id" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "b" : "BIGINT" + }, { + "c" : "VARCHAR(2147483647)" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "Calc(select=[a, b, c, TO_TIMESTAMP(c) AS rowtime, PROCTIME() AS proctime])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner", + "watermarkExpr" : { + "kind" : "REX_CALL", + "operator" : { + "name" : "-", + "kind" : "MINUS", + "syntax" : "SPECIAL" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "LITERAL", + "value" : 1000, + "type" : { + "typeName" : "INTERVAL_SECOND", + "nullable" : false, + "precision" : 2, + "scale" : 6 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, + "rowtimeFieldIndex" : 3, + "id" : 3, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "b" : "BIGINT" + }, { + "c" : "VARCHAR(2147483647)" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "VARCHAR", + "nullable" : true, + "precision" : 2147483647 + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "timestampKind" : "ROWTIME", + "typeName" : "TIMESTAMP", + "nullable" : true + } + } ], + "condition" : null, + "id" : 4, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "VARCHAR(2147483647)" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "Calc(select=[a, c, rowtime])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWindowAggregate", + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "cnt", + "aggFunction" : { + "name" : "COUNT", + "kind" : "COUNT", + "syntax" : "FUNCTION_STAR" + }, + "argList" : [ ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + }, { + "name" : "uv", + "aggFunction" : { + "name" : "COUNT", + "kind" : "COUNT", + "syntax" : "FUNCTION_STAR" + }, + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : true, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + } ], + "windowing" : { + "strategy" : "TimeAttribute", + "window" : { + "type" : "TumblingWindow", + "size" : "PT15M" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + }, + "timeAttributeIndex" : 2, + "isRowtime" : true + }, + "id" : 5, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "count1$0" : "BIGINT" + }, { + "count$1" : "BIGINT" + }, { + "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2FwYWNoZS9mbGluay90YWJsZS90eXBlcy9sb2dpY2FsL0xvZ2ljYWxUeXBlO3hwdnIAK29yZy5hcGFjaGUuZmxpbmsudGFibGUuYXBpLmRhdGF2aWV3Lk1hcFZpZXcAAAAAAAAAAAAAAHhwc3IAM29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5TdHJ1Y3R1cmVkVHlwZQAAAAAAAAABAgAFWgAOaXNJbnN0YW50aWFibGVMAAphdHRyaWJ1dGVzcQB+AAFMAAtjb21wYXJpc2lvbnQAS0xvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvU3RydWN0dXJlZFR5cGUkU3RydWN0dXJlZENvbXBhcmlzaW9uO0wAE2ltcGxlbWVudGF0aW9uQ2xhc3NxAH4AA0wACXN1cGVyVHlwZXQANUxvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvU3RydWN0dXJlZFR5cGU7eHIANG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5Vc2VyRGVmaW5lZFR5cGUAAAAAAAAAAQIAA1oAB2lzRmluYWxMAAtkZXNjcmlwdGlvbnQAEkxqYXZhL2xhbmcvU3RyaW5nO0wAEG9iamVjdElkZW50aWZpZXJ0ADFMb3JnL2FwYWNoZS9mbGluay90YWJsZS9jYXRhbG9nL09iamVjdElkZW50aWZpZXI7eHIAMG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5Mb2dpY2FsVHlwZQAAAAAAAAABAgACWgAKaXNOdWxsYWJsZUwACHR5cGVSb290dAA2TG9yZy9hcGFjaGUvZmxpbmsvdGFibGUvdHlwZXMvbG9naWNhbC9Mb2dpY2FsVHlwZVJvb3Q7eHABfnIANG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5Mb2dpY2FsVHlwZVJvb3QAAAAAAAAAABIAAHhyAA5qYXZhLmxhbmcuRW51bQAAAAAAAAAAEgAAeHB0AA9TVFJVQ1RVUkVEX1RZUEUBcHABc3IAJmphdmEudXRpbC5Db2xsZWN0aW9ucyRVbm1vZGlmaWFibGVMaXN0/A8lMbXsjhACAAFMAARsaXN0cQB+AAF4cgAsamF2YS51dGlsLkNvbGxlY3Rpb25zJFVubW9kaWZpYWJsZUNvbGxlY3Rpb24ZQgCAy173HgIAAUwAAWN0ABZMamF2YS91dGlsL0NvbGxlY3Rpb247eHBzcgATamF2YS51dGlsLkFycmF5TGlzdHiB0h2Zx2GdAwABSQAEc2l6ZXhwAAAAAXcEAAAAAXNyAEdvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuU3RydWN0dXJlZFR5cGUkU3RydWN0dXJlZEF0dHJpYnV0ZQAAAAAAAAABAgADTAALZGVzY3JpcHRpb25xAH4ADEwABG5hbWVxAH4ADEwABHR5cGVxAH4ABHhwcHQAA21hcHNyACxvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTWFwVHlwZQAAAAAAAAABAgACTAAHa2V5VHlwZXEAfgAETAAJdmFsdWVUeXBlcQB+AAR4cQB+AA4BfnEAfgARdAADTUFQc3IAMG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5WYXJDaGFyVHlwZQAAAAAAAAABAgABSQAGbGVuZ3RoeHEAfgAOAX5xAH4AEXQAB1ZBUkNIQVJ/////c3IAL29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5CaWdJbnRUeXBlAAAAAAAAAAECAAB4cQB+AA4AfnEAfgARdAAGQklHSU5UeHEAfgAafnIASW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5TdHJ1Y3R1cmVkVHlwZSRTdHJ1Y3R1cmVkQ29tcGFyaXNpb24AAAAAAAAAABIAAHhxAH4AEnQABE5PTkVxAH4AB3BzcQB+ABkAAAABdwQAAAABc3IALW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuS2V5VmFsdWVEYXRhVHlwZY4kybjNPKCeAgACTAALa2V5RGF0YVR5cGV0ACdMb3JnL2FwYWNoZS9mbGluay90YWJsZS90eXBlcy9EYXRhVHlwZTtMAA12YWx1ZURhdGFUeXBlcQB+AC94cQB+AAJ2cgANamF2YS51dGlsLk1hcAAAAAAAAAAAAAAAeHBxAH4AH3NyACtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLkF0b21pY0RhdGFUeXBlGohTKfp6IzICAAB4cQB+AAJ2cgAmb3JnLmFwYWNoZS5mbGluay50YWJsZS5kYXRhLlN0cmluZ0RhdGEAAAAAAAAAAAAAAHhwcQB+ACNzcQB+ADN2cgAOamF2YS5sYW5nLkxvbmc7i+SQzI8j3wIAAUoABXZhbHVleHIAEGphdmEubGFuZy5OdW1iZXKGrJUdC5TgiwIAAHhwcQB+ACd4AAAUV/0AAAABAAAAAQBUb3JnLmFwYWNoZS5mbGluay50YWJsZS5ydW50aW1lLnR5cGV1dGlscy5Sb3dEYXRhU2VyaWFsaXplciRSb3dEYXRhU2VyaWFsaXplclNuYXBzaG90AAAAAwAAAAGs7QAFc3IALG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5NYXBUeXBlAAAAAAAAAAECAAJMAAdrZXlUeXBldAAyTG9yZy9hcGFjaGUvZmxpbmsvdGFibGUvdHlwZXMvbG9naWNhbC9Mb2dpY2FsVHlwZTtMAAl2YWx1ZVR5cGVxAH4AAXhyADBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGUAAAAAAAAAAQIAAloACmlzTnVsbGFibGVMAAh0eXBlUm9vdHQANkxvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvTG9naWNhbFR5cGVSb290O3hwAX5yADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGVSb290AAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAADTUFQc3IAMG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5WYXJDaGFyVHlwZQAAAAAAAAABAgABSQAGbGVuZ3RoeHEAfgACAX5xAH4ABXQAB1ZBUkNIQVJ/////c3IAL29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5CaWdJbnRUeXBlAAAAAAAAAAECAAB4cQB+AAIAfnEAfgAFdAAGQklHSU5UABRX/QAAAAEAAAABAFRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLk1hcERhdGFTZXJpYWxpemVyJE1hcERhdGFTZXJpYWxpemVyU25hcHNob3QAAAADrO0ABXNyADBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuVmFyQ2hhclR5cGUAAAAAAAAAAQIAAUkABmxlbmd0aHhyADBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGUAAAAAAAAAAQIAAloACmlzTnVsbGFibGVMAAh0eXBlUm9vdHQANkxvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvTG9naWNhbFR5cGVSb290O3hwAX5yADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGVSb290AAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAAHVkFSQ0hBUn////+s7QAFc3IAL29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5CaWdJbnRUeXBlAAAAAAAAAAECAAB4cgAwb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5sb2dpY2FsLkxvZ2ljYWxUeXBlAAAAAAAAAAECAAJaAAppc051bGxhYmxlTAAIdHlwZVJvb3R0ADZMb3JnL2FwYWNoZS9mbGluay90YWJsZS90eXBlcy9sb2dpY2FsL0xvZ2ljYWxUeXBlUm9vdDt4cAB+cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5sb2dpY2FsLkxvZ2ljYWxUeXBlUm9vdAAAAAAAAAAAEgAAeHIADmphdmEubGFuZy5FbnVtAAAAAAAAAAASAAB4cHQABkJJR0lOVKztAAVzcgA9b3JnLmFwYWNoZS5mbGluay50YWJsZS5ydW50aW1lLnR5cGV1dGlscy5TdHJpbmdEYXRhU2VyaWFsaXplcgAAAAAAAAABAgAAeHIAQm9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5UeXBlU2VyaWFsaXplclNpbmdsZXRvbnmph6rHLndFAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZVNlcmlhbGl6ZXIAAAAAAAAAAQIAAHhwrO0ABXNyADlvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuTG9uZ1NlcmlhbGl6ZXIAAAAAAAAAAQIAAHhyAEJvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuVHlwZVNlcmlhbGl6ZXJTaW5nbGV0b255qYeqxy53RQIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLlR5cGVTZXJpYWxpemVyAAAAAAAAAAECAAB4cA==')" + }, { + "$slice_end" : "BIGINT" + } ] + }, + "description" : "LocalWindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, COUNT(*) AS count1$0, COUNT(distinct$0 c) AS count$1, DISTINCT(c) AS distinct$0, slice_end('w$) AS $slice_end])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange", + "id" : 6, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "count1$0" : "BIGINT" + }, { + "count$1" : "BIGINT" + }, { + "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2FwYWNoZS9mbGluay90YWJsZS90eXBlcy9sb2dpY2FsL0xvZ2ljYWxUeXBlO3hwdnIAK29yZy5hcGFjaGUuZmxpbmsudGFibGUuYXBpLmRhdGF2aWV3Lk1hcFZpZXcAAAAAAAAAAAAAAHhwc3IAM29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5TdHJ1Y3R1cmVkVHlwZQAAAAAAAAABAgAFWgAOaXNJbnN0YW50aWFibGVMAAphdHRyaWJ1dGVzcQB+AAFMAAtjb21wYXJpc2lvbnQAS0xvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvU3RydWN0dXJlZFR5cGUkU3RydWN0dXJlZENvbXBhcmlzaW9uO0wAE2ltcGxlbWVudGF0aW9uQ2xhc3NxAH4AA0wACXN1cGVyVHlwZXQANUxvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvU3RydWN0dXJlZFR5cGU7eHIANG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5Vc2VyRGVmaW5lZFR5cGUAAAAAAAAAAQIAA1oAB2lzRmluYWxMAAtkZXNjcmlwdGlvbnQAEkxqYXZhL2xhbmcvU3RyaW5nO0wAEG9iamVjdElkZW50aWZpZXJ0ADFMb3JnL2FwYWNoZS9mbGluay90YWJsZS9jYXRhbG9nL09iamVjdElkZW50aWZpZXI7eHIAMG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5Mb2dpY2FsVHlwZQAAAAAAAAABAgACWgAKaXNOdWxsYWJsZUwACHR5cGVSb290dAA2TG9yZy9hcGFjaGUvZmxpbmsvdGFibGUvdHlwZXMvbG9naWNhbC9Mb2dpY2FsVHlwZVJvb3Q7eHABfnIANG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5Mb2dpY2FsVHlwZVJvb3QAAAAAAAAAABIAAHhyAA5qYXZhLmxhbmcuRW51bQAAAAAAAAAAEgAAeHB0AA9TVFJVQ1RVUkVEX1RZUEUBcHABc3IAJmphdmEudXRpbC5Db2xsZWN0aW9ucyRVbm1vZGlmaWFibGVMaXN0/A8lMbXsjhACAAFMAARsaXN0cQB+AAF4cgAsamF2YS51dGlsLkNvbGxlY3Rpb25zJFVubW9kaWZpYWJsZUNvbGxlY3Rpb24ZQgCAy173HgIAAUwAAWN0ABZMamF2YS91dGlsL0NvbGxlY3Rpb247eHBzcgATamF2YS51dGlsLkFycmF5TGlzdHiB0h2Zx2GdAwABSQAEc2l6ZXhwAAAAAXcEAAAAAXNyAEdvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuU3RydWN0dXJlZFR5cGUkU3RydWN0dXJlZEF0dHJpYnV0ZQAAAAAAAAABAgADTAALZGVzY3JpcHRpb25xAH4ADEwABG5hbWVxAH4ADEwABHR5cGVxAH4ABHhwcHQAA21hcHNyACxvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTWFwVHlwZQAAAAAAAAABAgACTAAHa2V5VHlwZXEAfgAETAAJdmFsdWVUeXBlcQB+AAR4cQB+AA4BfnEAfgARdAADTUFQc3IAMG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5WYXJDaGFyVHlwZQAAAAAAAAABAgABSQAGbGVuZ3RoeHEAfgAOAX5xAH4AEXQAB1ZBUkNIQVJ/////c3IAL29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5CaWdJbnRUeXBlAAAAAAAAAAECAAB4cQB+AA4AfnEAfgARdAAGQklHSU5UeHEAfgAafnIASW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5TdHJ1Y3R1cmVkVHlwZSRTdHJ1Y3R1cmVkQ29tcGFyaXNpb24AAAAAAAAAABIAAHhxAH4AEnQABE5PTkVxAH4AB3BzcQB+ABkAAAABdwQAAAABc3IALW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuS2V5VmFsdWVEYXRhVHlwZY4kybjNPKCeAgACTAALa2V5RGF0YVR5cGV0ACdMb3JnL2FwYWNoZS9mbGluay90YWJsZS90eXBlcy9EYXRhVHlwZTtMAA12YWx1ZURhdGFUeXBlcQB+AC94cQB+AAJ2cgANamF2YS51dGlsLk1hcAAAAAAAAAAAAAAAeHBxAH4AH3NyACtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLkF0b21pY0RhdGFUeXBlGohTKfp6IzICAAB4cQB+AAJ2cgAmb3JnLmFwYWNoZS5mbGluay50YWJsZS5kYXRhLlN0cmluZ0RhdGEAAAAAAAAAAAAAAHhwcQB+ACNzcQB+ADN2cgAOamF2YS5sYW5nLkxvbmc7i+SQzI8j3wIAAUoABXZhbHVleHIAEGphdmEubGFuZy5OdW1iZXKGrJUdC5TgiwIAAHhwcQB+ACd4AAAUV/0AAAABAAAAAQBUb3JnLmFwYWNoZS5mbGluay50YWJsZS5ydW50aW1lLnR5cGV1dGlscy5Sb3dEYXRhU2VyaWFsaXplciRSb3dEYXRhU2VyaWFsaXplclNuYXBzaG90AAAAAwAAAAGs7QAFc3IALG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5NYXBUeXBlAAAAAAAAAAECAAJMAAdrZXlUeXBldAAyTG9yZy9hcGFjaGUvZmxpbmsvdGFibGUvdHlwZXMvbG9naWNhbC9Mb2dpY2FsVHlwZTtMAAl2YWx1ZVR5cGVxAH4AAXhyADBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGUAAAAAAAAAAQIAAloACmlzTnVsbGFibGVMAAh0eXBlUm9vdHQANkxvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvTG9naWNhbFR5cGVSb290O3hwAX5yADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGVSb290AAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAADTUFQc3IAMG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5WYXJDaGFyVHlwZQAAAAAAAAABAgABSQAGbGVuZ3RoeHEAfgACAX5xAH4ABXQAB1ZBUkNIQVJ/////c3IAL29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5CaWdJbnRUeXBlAAAAAAAAAAECAAB4cQB+AAIAfnEAfgAFdAAGQklHSU5UABRX/QAAAAEAAAABAFRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLk1hcERhdGFTZXJpYWxpemVyJE1hcERhdGFTZXJpYWxpemVyU25hcHNob3QAAAADrO0ABXNyADBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuVmFyQ2hhclR5cGUAAAAAAAAAAQIAAUkABmxlbmd0aHhyADBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGUAAAAAAAAAAQIAAloACmlzTnVsbGFibGVMAAh0eXBlUm9vdHQANkxvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvTG9naWNhbFR5cGVSb290O3hwAX5yADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGVSb290AAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAAHVkFSQ0hBUn////+s7QAFc3IAL29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5CaWdJbnRUeXBlAAAAAAAAAAECAAB4cgAwb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5sb2dpY2FsLkxvZ2ljYWxUeXBlAAAAAAAAAAECAAJaAAppc051bGxhYmxlTAAIdHlwZVJvb3R0ADZMb3JnL2FwYWNoZS9mbGluay90YWJsZS90eXBlcy9sb2dpY2FsL0xvZ2ljYWxUeXBlUm9vdDt4cAB+cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5sb2dpY2FsLkxvZ2ljYWxUeXBlUm9vdAAAAAAAAAAAEgAAeHIADmphdmEubGFuZy5FbnVtAAAAAAAAAAASAAB4cHQABkJJR0lOVKztAAVzcgA9b3JnLmFwYWNoZS5mbGluay50YWJsZS5ydW50aW1lLnR5cGV1dGlscy5TdHJpbmdEYXRhU2VyaWFsaXplcgAAAAAAAAABAgAAeHIAQm9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5UeXBlU2VyaWFsaXplclNpbmdsZXRvbnmph6rHLndFAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZVNlcmlhbGl6ZXIAAAAAAAAAAQIAAHhwrO0ABXNyADlvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuTG9uZ1NlcmlhbGl6ZXIAAAAAAAAAAQIAAHhyAEJvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuVHlwZVNlcmlhbGl6ZXJTaW5nbGV0b255qYeqxy53RQIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLlR5cGVTZXJpYWxpemVyAAAAAAAAAAECAAB4cA==')" + }, { + "$slice_end" : "BIGINT" + } ] + }, + "description" : "Exchange(distribution=[hash[a]])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalWindowAggregate", + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "cnt", + "aggFunction" : { + "name" : "COUNT", + "kind" : "COUNT", + "syntax" : "FUNCTION_STAR" + }, + "argList" : [ ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + }, { + "name" : "uv", + "aggFunction" : { + "name" : "COUNT", + "kind" : "COUNT", + "syntax" : "FUNCTION_STAR" + }, + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : true, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + } ], + "windowing" : { + "strategy" : "SliceAttached", + "window" : { + "type" : "TumblingWindow", + "size" : "PT15M" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + }, + "sliceEnd" : 4, + "isRowtime" : true + }, + "namedWindowProperties" : [ { + "name" : "window_start", + "property" : { + "kind" : "WindowStart", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } + } + }, { + "name" : "window_end", + "property" : { + "kind" : "WindowEnd", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } + } + }, { + "name" : "window_time", + "property" : { + "kind" : "Rowtime", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } + } + } ], + "id" : 7, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "localAggInputRowType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "VARCHAR(2147483647)" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "cnt" : "BIGINT NOT NULL" + }, { + "uv" : "BIGINT NOT NULL" + }, { + "window_start" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "window_end" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "window_time" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "GlobalWindowAggregate(groupBy=[a], window=[TUMBLE(slice_end=[$slice_end], size=[15 min])], select=[a, COUNT(count1$0) AS cnt, COUNT(distinct$0 count$1) AS uv, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : false, + "precision" : 3 + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : false, + "precision" : 3 + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + } ], + "condition" : null, + "id" : 8, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "window_start" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "window_end" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "cnt" : "BIGINT NOT NULL" + }, { + "uv" : "BIGINT NOT NULL" + } ] + }, + "description" : "Calc(select=[a, window_start, window_end, cnt, uv])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange", + "id" : 9, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "window_start" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "window_end" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "cnt" : "BIGINT NOT NULL" + }, { + "uv" : "BIGINT NOT NULL" + } ] + }, + "description" : "Exchange(distribution=[hash[a]])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan", + "scanTableSource" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MyTable2" + }, + "catalogTable" : { + "schema.watermark.0.strategy.expr" : "`rowtime` - INTERVAL '1' SECOND", + "schema.4.expr" : "PROCTIME()", + "schema.0.data-type" : "INT", + "schema.2.name" : "c", + "schema.1.name" : "b", + "schema.4.name" : "proctime", + "schema.1.data-type" : "BIGINT", + "schema.3.data-type" : "TIMESTAMP(3)", + "schema.2.data-type" : "VARCHAR(2147483647)", + "schema.3.name" : "rowtime", + "connector" : "values", + "schema.watermark.0.rowtime" : "rowtime", + "schema.watermark.0.strategy.data-type" : "TIMESTAMP(3)", + "schema.3.expr" : "TO_TIMESTAMP(`c`)", + "schema.4.data-type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL", + "schema.0.name" : "a" + } + }, + "id" : 10, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "b" : "BIGINT" + }, { + "c" : "VARCHAR(2147483647)" + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "VARCHAR", + "nullable" : true, + "precision" : 2147483647 + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "TO_TIMESTAMP", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "VARCHAR", + "nullable" : true, + "precision" : 2147483647 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "PROCTIME", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ ], + "type" : { + "timestampKind" : "PROCTIME", + "typeName" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false + } + } ], + "condition" : null, + "id" : 11, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "b" : "BIGINT" + }, { + "c" : "VARCHAR(2147483647)" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "Calc(select=[a, b, c, TO_TIMESTAMP(c) AS rowtime, PROCTIME() AS proctime])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner", + "watermarkExpr" : { + "kind" : "REX_CALL", + "operator" : { + "name" : "-", + "kind" : "MINUS", + "syntax" : "SPECIAL" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "LITERAL", + "value" : 1000, + "type" : { + "typeName" : "INTERVAL_SECOND", + "nullable" : false, + "precision" : 2, + "scale" : 6 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, + "rowtimeFieldIndex" : 3, + "id" : 12, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "b" : "BIGINT" + }, { + "c" : "VARCHAR(2147483647)" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "VARCHAR", + "nullable" : true, + "precision" : 2147483647 + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "timestampKind" : "ROWTIME", + "typeName" : "TIMESTAMP", + "nullable" : true + } + } ], + "condition" : null, + "id" : 13, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "VARCHAR(2147483647)" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "Calc(select=[a, c, rowtime])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWindowAggregate", + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "cnt", + "aggFunction" : { + "name" : "COUNT", + "kind" : "COUNT", + "syntax" : "FUNCTION_STAR" + }, + "argList" : [ ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + }, { + "name" : "uv", + "aggFunction" : { + "name" : "COUNT", + "kind" : "COUNT", + "syntax" : "FUNCTION_STAR" + }, + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : true, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + } ], + "windowing" : { + "strategy" : "TimeAttribute", + "window" : { + "type" : "TumblingWindow", + "size" : "PT15M" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + }, + "timeAttributeIndex" : 2, + "isRowtime" : true + }, + "id" : 14, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "count1$0" : "BIGINT" + }, { + "count$1" : "BIGINT" + }, { + "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2FwYWNoZS9mbGluay90YWJsZS90eXBlcy9sb2dpY2FsL0xvZ2ljYWxUeXBlO3hwdnIAK29yZy5hcGFjaGUuZmxpbmsudGFibGUuYXBpLmRhdGF2aWV3Lk1hcFZpZXcAAAAAAAAAAAAAAHhwc3IAM29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5TdHJ1Y3R1cmVkVHlwZQAAAAAAAAABAgAFWgAOaXNJbnN0YW50aWFibGVMAAphdHRyaWJ1dGVzcQB+AAFMAAtjb21wYXJpc2lvbnQAS0xvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvU3RydWN0dXJlZFR5cGUkU3RydWN0dXJlZENvbXBhcmlzaW9uO0wAE2ltcGxlbWVudGF0aW9uQ2xhc3NxAH4AA0wACXN1cGVyVHlwZXQANUxvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvU3RydWN0dXJlZFR5cGU7eHIANG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5Vc2VyRGVmaW5lZFR5cGUAAAAAAAAAAQIAA1oAB2lzRmluYWxMAAtkZXNjcmlwdGlvbnQAEkxqYXZhL2xhbmcvU3RyaW5nO0wAEG9iamVjdElkZW50aWZpZXJ0ADFMb3JnL2FwYWNoZS9mbGluay90YWJsZS9jYXRhbG9nL09iamVjdElkZW50aWZpZXI7eHIAMG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5Mb2dpY2FsVHlwZQAAAAAAAAABAgACWgAKaXNOdWxsYWJsZUwACHR5cGVSb290dAA2TG9yZy9hcGFjaGUvZmxpbmsvdGFibGUvdHlwZXMvbG9naWNhbC9Mb2dpY2FsVHlwZVJvb3Q7eHABfnIANG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5Mb2dpY2FsVHlwZVJvb3QAAAAAAAAAABIAAHhyAA5qYXZhLmxhbmcuRW51bQAAAAAAAAAAEgAAeHB0AA9TVFJVQ1RVUkVEX1RZUEUBcHABc3IAJmphdmEudXRpbC5Db2xsZWN0aW9ucyRVbm1vZGlmaWFibGVMaXN0/A8lMbXsjhACAAFMAARsaXN0cQB+AAF4cgAsamF2YS51dGlsLkNvbGxlY3Rpb25zJFVubW9kaWZpYWJsZUNvbGxlY3Rpb24ZQgCAy173HgIAAUwAAWN0ABZMamF2YS91dGlsL0NvbGxlY3Rpb247eHBzcgATamF2YS51dGlsLkFycmF5TGlzdHiB0h2Zx2GdAwABSQAEc2l6ZXhwAAAAAXcEAAAAAXNyAEdvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuU3RydWN0dXJlZFR5cGUkU3RydWN0dXJlZEF0dHJpYnV0ZQAAAAAAAAABAgADTAALZGVzY3JpcHRpb25xAH4ADEwABG5hbWVxAH4ADEwABHR5cGVxAH4ABHhwcHQAA21hcHNyACxvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTWFwVHlwZQAAAAAAAAABAgACTAAHa2V5VHlwZXEAfgAETAAJdmFsdWVUeXBlcQB+AAR4cQB+AA4BfnEAfgARdAADTUFQc3IAMG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5WYXJDaGFyVHlwZQAAAAAAAAABAgABSQAGbGVuZ3RoeHEAfgAOAX5xAH4AEXQAB1ZBUkNIQVJ/////c3IAL29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5CaWdJbnRUeXBlAAAAAAAAAAECAAB4cQB+AA4AfnEAfgARdAAGQklHSU5UeHEAfgAafnIASW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5TdHJ1Y3R1cmVkVHlwZSRTdHJ1Y3R1cmVkQ29tcGFyaXNpb24AAAAAAAAAABIAAHhxAH4AEnQABE5PTkVxAH4AB3BzcQB+ABkAAAABdwQAAAABc3IALW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuS2V5VmFsdWVEYXRhVHlwZY4kybjNPKCeAgACTAALa2V5RGF0YVR5cGV0ACdMb3JnL2FwYWNoZS9mbGluay90YWJsZS90eXBlcy9EYXRhVHlwZTtMAA12YWx1ZURhdGFUeXBlcQB+AC94cQB+AAJ2cgANamF2YS51dGlsLk1hcAAAAAAAAAAAAAAAeHBxAH4AH3NyACtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLkF0b21pY0RhdGFUeXBlGohTKfp6IzICAAB4cQB+AAJ2cgAmb3JnLmFwYWNoZS5mbGluay50YWJsZS5kYXRhLlN0cmluZ0RhdGEAAAAAAAAAAAAAAHhwcQB+ACNzcQB+ADN2cgAOamF2YS5sYW5nLkxvbmc7i+SQzI8j3wIAAUoABXZhbHVleHIAEGphdmEubGFuZy5OdW1iZXKGrJUdC5TgiwIAAHhwcQB+ACd4AAAUV/0AAAABAAAAAQBUb3JnLmFwYWNoZS5mbGluay50YWJsZS5ydW50aW1lLnR5cGV1dGlscy5Sb3dEYXRhU2VyaWFsaXplciRSb3dEYXRhU2VyaWFsaXplclNuYXBzaG90AAAAAwAAAAGs7QAFc3IALG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5NYXBUeXBlAAAAAAAAAAECAAJMAAdrZXlUeXBldAAyTG9yZy9hcGFjaGUvZmxpbmsvdGFibGUvdHlwZXMvbG9naWNhbC9Mb2dpY2FsVHlwZTtMAAl2YWx1ZVR5cGVxAH4AAXhyADBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGUAAAAAAAAAAQIAAloACmlzTnVsbGFibGVMAAh0eXBlUm9vdHQANkxvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvTG9naWNhbFR5cGVSb290O3hwAX5yADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGVSb290AAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAADTUFQc3IAMG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5WYXJDaGFyVHlwZQAAAAAAAAABAgABSQAGbGVuZ3RoeHEAfgACAX5xAH4ABXQAB1ZBUkNIQVJ/////c3IAL29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5CaWdJbnRUeXBlAAAAAAAAAAECAAB4cQB+AAIAfnEAfgAFdAAGQklHSU5UABRX/QAAAAEAAAABAFRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLk1hcERhdGFTZXJpYWxpemVyJE1hcERhdGFTZXJpYWxpemVyU25hcHNob3QAAAADrO0ABXNyADBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuVmFyQ2hhclR5cGUAAAAAAAAAAQIAAUkABmxlbmd0aHhyADBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGUAAAAAAAAAAQIAAloACmlzTnVsbGFibGVMAAh0eXBlUm9vdHQANkxvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvTG9naWNhbFR5cGVSb290O3hwAX5yADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGVSb290AAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAAHVkFSQ0hBUn////+s7QAFc3IAL29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5CaWdJbnRUeXBlAAAAAAAAAAECAAB4cgAwb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5sb2dpY2FsLkxvZ2ljYWxUeXBlAAAAAAAAAAECAAJaAAppc051bGxhYmxlTAAIdHlwZVJvb3R0ADZMb3JnL2FwYWNoZS9mbGluay90YWJsZS90eXBlcy9sb2dpY2FsL0xvZ2ljYWxUeXBlUm9vdDt4cAB+cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5sb2dpY2FsLkxvZ2ljYWxUeXBlUm9vdAAAAAAAAAAAEgAAeHIADmphdmEubGFuZy5FbnVtAAAAAAAAAAASAAB4cHQABkJJR0lOVKztAAVzcgA9b3JnLmFwYWNoZS5mbGluay50YWJsZS5ydW50aW1lLnR5cGV1dGlscy5TdHJpbmdEYXRhU2VyaWFsaXplcgAAAAAAAAABAgAAeHIAQm9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5UeXBlU2VyaWFsaXplclNpbmdsZXRvbnmph6rHLndFAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZVNlcmlhbGl6ZXIAAAAAAAAAAQIAAHhwrO0ABXNyADlvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuTG9uZ1NlcmlhbGl6ZXIAAAAAAAAAAQIAAHhyAEJvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuVHlwZVNlcmlhbGl6ZXJTaW5nbGV0b255qYeqxy53RQIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLlR5cGVTZXJpYWxpemVyAAAAAAAAAAECAAB4cA==')" + }, { + "$slice_end" : "BIGINT" + } ] + }, + "description" : "LocalWindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, COUNT(*) AS count1$0, COUNT(distinct$0 c) AS count$1, DISTINCT(c) AS distinct$0, slice_end('w$) AS $slice_end])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange", + "id" : 15, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "count1$0" : "BIGINT" + }, { + "count$1" : "BIGINT" + }, { + "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2FwYWNoZS9mbGluay90YWJsZS90eXBlcy9sb2dpY2FsL0xvZ2ljYWxUeXBlO3hwdnIAK29yZy5hcGFjaGUuZmxpbmsudGFibGUuYXBpLmRhdGF2aWV3Lk1hcFZpZXcAAAAAAAAAAAAAAHhwc3IAM29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5TdHJ1Y3R1cmVkVHlwZQAAAAAAAAABAgAFWgAOaXNJbnN0YW50aWFibGVMAAphdHRyaWJ1dGVzcQB+AAFMAAtjb21wYXJpc2lvbnQAS0xvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvU3RydWN0dXJlZFR5cGUkU3RydWN0dXJlZENvbXBhcmlzaW9uO0wAE2ltcGxlbWVudGF0aW9uQ2xhc3NxAH4AA0wACXN1cGVyVHlwZXQANUxvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvU3RydWN0dXJlZFR5cGU7eHIANG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5Vc2VyRGVmaW5lZFR5cGUAAAAAAAAAAQIAA1oAB2lzRmluYWxMAAtkZXNjcmlwdGlvbnQAEkxqYXZhL2xhbmcvU3RyaW5nO0wAEG9iamVjdElkZW50aWZpZXJ0ADFMb3JnL2FwYWNoZS9mbGluay90YWJsZS9jYXRhbG9nL09iamVjdElkZW50aWZpZXI7eHIAMG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5Mb2dpY2FsVHlwZQAAAAAAAAABAgACWgAKaXNOdWxsYWJsZUwACHR5cGVSb290dAA2TG9yZy9hcGFjaGUvZmxpbmsvdGFibGUvdHlwZXMvbG9naWNhbC9Mb2dpY2FsVHlwZVJvb3Q7eHABfnIANG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5Mb2dpY2FsVHlwZVJvb3QAAAAAAAAAABIAAHhyAA5qYXZhLmxhbmcuRW51bQAAAAAAAAAAEgAAeHB0AA9TVFJVQ1RVUkVEX1RZUEUBcHABc3IAJmphdmEudXRpbC5Db2xsZWN0aW9ucyRVbm1vZGlmaWFibGVMaXN0/A8lMbXsjhACAAFMAARsaXN0cQB+AAF4cgAsamF2YS51dGlsLkNvbGxlY3Rpb25zJFVubW9kaWZpYWJsZUNvbGxlY3Rpb24ZQgCAy173HgIAAUwAAWN0ABZMamF2YS91dGlsL0NvbGxlY3Rpb247eHBzcgATamF2YS51dGlsLkFycmF5TGlzdHiB0h2Zx2GdAwABSQAEc2l6ZXhwAAAAAXcEAAAAAXNyAEdvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuU3RydWN0dXJlZFR5cGUkU3RydWN0dXJlZEF0dHJpYnV0ZQAAAAAAAAABAgADTAALZGVzY3JpcHRpb25xAH4ADEwABG5hbWVxAH4ADEwABHR5cGVxAH4ABHhwcHQAA21hcHNyACxvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTWFwVHlwZQAAAAAAAAABAgACTAAHa2V5VHlwZXEAfgAETAAJdmFsdWVUeXBlcQB+AAR4cQB+AA4BfnEAfgARdAADTUFQc3IAMG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5WYXJDaGFyVHlwZQAAAAAAAAABAgABSQAGbGVuZ3RoeHEAfgAOAX5xAH4AEXQAB1ZBUkNIQVJ/////c3IAL29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5CaWdJbnRUeXBlAAAAAAAAAAECAAB4cQB+AA4AfnEAfgARdAAGQklHSU5UeHEAfgAafnIASW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5TdHJ1Y3R1cmVkVHlwZSRTdHJ1Y3R1cmVkQ29tcGFyaXNpb24AAAAAAAAAABIAAHhxAH4AEnQABE5PTkVxAH4AB3BzcQB+ABkAAAABdwQAAAABc3IALW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuS2V5VmFsdWVEYXRhVHlwZY4kybjNPKCeAgACTAALa2V5RGF0YVR5cGV0ACdMb3JnL2FwYWNoZS9mbGluay90YWJsZS90eXBlcy9EYXRhVHlwZTtMAA12YWx1ZURhdGFUeXBlcQB+AC94cQB+AAJ2cgANamF2YS51dGlsLk1hcAAAAAAAAAAAAAAAeHBxAH4AH3NyACtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLkF0b21pY0RhdGFUeXBlGohTKfp6IzICAAB4cQB+AAJ2cgAmb3JnLmFwYWNoZS5mbGluay50YWJsZS5kYXRhLlN0cmluZ0RhdGEAAAAAAAAAAAAAAHhwcQB+ACNzcQB+ADN2cgAOamF2YS5sYW5nLkxvbmc7i+SQzI8j3wIAAUoABXZhbHVleHIAEGphdmEubGFuZy5OdW1iZXKGrJUdC5TgiwIAAHhwcQB+ACd4AAAUV/0AAAABAAAAAQBUb3JnLmFwYWNoZS5mbGluay50YWJsZS5ydW50aW1lLnR5cGV1dGlscy5Sb3dEYXRhU2VyaWFsaXplciRSb3dEYXRhU2VyaWFsaXplclNuYXBzaG90AAAAAwAAAAGs7QAFc3IALG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5NYXBUeXBlAAAAAAAAAAECAAJMAAdrZXlUeXBldAAyTG9yZy9hcGFjaGUvZmxpbmsvdGFibGUvdHlwZXMvbG9naWNhbC9Mb2dpY2FsVHlwZTtMAAl2YWx1ZVR5cGVxAH4AAXhyADBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGUAAAAAAAAAAQIAAloACmlzTnVsbGFibGVMAAh0eXBlUm9vdHQANkxvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvTG9naWNhbFR5cGVSb290O3hwAX5yADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGVSb290AAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAADTUFQc3IAMG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5WYXJDaGFyVHlwZQAAAAAAAAABAgABSQAGbGVuZ3RoeHEAfgACAX5xAH4ABXQAB1ZBUkNIQVJ/////c3IAL29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5CaWdJbnRUeXBlAAAAAAAAAAECAAB4cQB+AAIAfnEAfgAFdAAGQklHSU5UABRX/QAAAAEAAAABAFRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLk1hcERhdGFTZXJpYWxpemVyJE1hcERhdGFTZXJpYWxpemVyU25hcHNob3QAAAADrO0ABXNyADBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuVmFyQ2hhclR5cGUAAAAAAAAAAQIAAUkABmxlbmd0aHhyADBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGUAAAAAAAAAAQIAAloACmlzTnVsbGFibGVMAAh0eXBlUm9vdHQANkxvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvTG9naWNhbFR5cGVSb290O3hwAX5yADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGVSb290AAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAAHVkFSQ0hBUn////+s7QAFc3IAL29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5CaWdJbnRUeXBlAAAAAAAAAAECAAB4cgAwb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5sb2dpY2FsLkxvZ2ljYWxUeXBlAAAAAAAAAAECAAJaAAppc051bGxhYmxlTAAIdHlwZVJvb3R0ADZMb3JnL2FwYWNoZS9mbGluay90YWJsZS90eXBlcy9sb2dpY2FsL0xvZ2ljYWxUeXBlUm9vdDt4cAB+cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5sb2dpY2FsLkxvZ2ljYWxUeXBlUm9vdAAAAAAAAAAAEgAAeHIADmphdmEubGFuZy5FbnVtAAAAAAAAAAASAAB4cHQABkJJR0lOVKztAAVzcgA9b3JnLmFwYWNoZS5mbGluay50YWJsZS5ydW50aW1lLnR5cGV1dGlscy5TdHJpbmdEYXRhU2VyaWFsaXplcgAAAAAAAAABAgAAeHIAQm9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5UeXBlU2VyaWFsaXplclNpbmdsZXRvbnmph6rHLndFAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZVNlcmlhbGl6ZXIAAAAAAAAAAQIAAHhwrO0ABXNyADlvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuTG9uZ1NlcmlhbGl6ZXIAAAAAAAAAAQIAAHhyAEJvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuVHlwZVNlcmlhbGl6ZXJTaW5nbGV0b255qYeqxy53RQIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLlR5cGVTZXJpYWxpemVyAAAAAAAAAAECAAB4cA==')" + }, { + "$slice_end" : "BIGINT" + } ] + }, + "description" : "Exchange(distribution=[hash[a]])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalWindowAggregate", + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "cnt", + "aggFunction" : { + "name" : "COUNT", + "kind" : "COUNT", + "syntax" : "FUNCTION_STAR" + }, + "argList" : [ ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + }, { + "name" : "uv", + "aggFunction" : { + "name" : "COUNT", + "kind" : "COUNT", + "syntax" : "FUNCTION_STAR" + }, + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : true, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + } ], + "windowing" : { + "strategy" : "SliceAttached", + "window" : { + "type" : "TumblingWindow", + "size" : "PT15M" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + }, + "sliceEnd" : 4, + "isRowtime" : true + }, + "namedWindowProperties" : [ { + "name" : "window_start", + "property" : { + "kind" : "WindowStart", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } + } + }, { + "name" : "window_end", + "property" : { + "kind" : "WindowEnd", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } + } + }, { + "name" : "window_time", + "property" : { + "kind" : "Rowtime", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } + } + } ], + "id" : 16, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "localAggInputRowType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "VARCHAR(2147483647)" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "cnt" : "BIGINT NOT NULL" + }, { + "uv" : "BIGINT NOT NULL" + }, { + "window_start" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "window_end" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "window_time" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "GlobalWindowAggregate(groupBy=[a], window=[TUMBLE(slice_end=[$slice_end], size=[15 min])], select=[a, COUNT(count1$0) AS cnt, COUNT(distinct$0 count$1) AS uv, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : false, + "precision" : 3 + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : false, + "precision" : 3 + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + } ], + "condition" : null, + "id" : 17, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "window_start" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "window_end" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "cnt" : "BIGINT NOT NULL" + }, { + "uv" : "BIGINT NOT NULL" + } ] + }, + "description" : "Calc(select=[a, window_start, window_end, cnt, uv])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange", + "id" : 18, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "window_start" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "window_end" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "cnt" : "BIGINT NOT NULL" + }, { + "uv" : "BIGINT NOT NULL" + } ] + }, + "description" : "Exchange(distribution=[hash[a]])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowJoin", + "joinSpec" : { + "joinType" : "INNER", + "leftKeys" : [ 0 ], + "rightKeys" : [ 0 ], + "filterNulls" : [ true ], + "nonEquiCondition" : null + }, + "leftWindowing" : { + "strategy" : "WindowAttached", + "window" : { + "type" : "TumblingWindow", + "size" : "PT15M" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + }, + "windowStart" : 1, + "windowEnd" : 2, + "isRowtime" : true + }, + "rightWindowing" : { + "strategy" : "WindowAttached", + "window" : { + "type" : "TumblingWindow", + "size" : "PT15M" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + }, + "windowStart" : 1, + "windowEnd" : 2, + "isRowtime" : true + }, + "id" : 19, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + }, { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "window_start" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "window_end" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "cnt" : "BIGINT NOT NULL" + }, { + "uv" : "BIGINT NOT NULL" + }, { + "a0" : "INT" + }, { + "window_start0" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "window_end0" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "cnt0" : "BIGINT NOT NULL" + }, { + "uv0" : "BIGINT NOT NULL" + } ] + }, + "description" : "WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], joinType=[InnerJoin], where=[(a = a0)], select=[a, window_start, window_end, cnt, uv, a0, window_start0, window_end0, cnt0, uv0])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : false, + "precision" : 3 + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : false, + "precision" : 3 + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 8, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 9, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + } ], + "condition" : null, + "id" : 20, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "window_start" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "window_end" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "cnt" : "BIGINT NOT NULL" + }, { + "uv" : "BIGINT NOT NULL" + }, { + "a0" : "INT" + }, { + "cnt0" : "BIGINT NOT NULL" + }, { + "uv0" : "BIGINT NOT NULL" + } ] + }, + "description" : "Calc(select=[a, window_start, window_end, cnt, uv, a0, cnt0, uv0])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink", + "dynamicTableSink" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MySink" + }, + "catalogTable" : { + "schema.5.name" : "r_a", + "schema.7.data-type" : "BIGINT", + "schema.0.data-type" : "INT", + "schema.2.name" : "window_end", + "schema.1.name" : "window_start", + "schema.6.data-type" : "BIGINT", + "schema.4.name" : "l_uv", + "schema.1.data-type" : "TIMESTAMP(3)", + "schema.3.data-type" : "BIGINT", + "schema.2.data-type" : "TIMESTAMP(3)", + "schema.3.name" : "l_cnt", + "schema.7.name" : "r_uv", + "connector" : "values", + "schema.6.name" : "r_cnt", + "schema.5.data-type" : "INT", + "schema.4.data-type" : "BIGINT", + "schema.0.name" : "l_a" + } + }, + "inputChangelogMode" : [ "INSERT" ], + "id" : 21, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "window_start" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "window_end" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "cnt" : "BIGINT NOT NULL" + }, { + "uv" : "BIGINT NOT NULL" + }, { + "a0" : "INT" + }, { + "cnt0" : "BIGINT NOT NULL" + }, { + "uv0" : "BIGINT NOT NULL" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, window_start, window_end, cnt, uv, a0, cnt0, uv0])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 7, + "target" : 8, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 8, + "target" : 9, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 10, + "target" : 11, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 11, + "target" : 12, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 12, + "target" : 13, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 13, + "target" : 14, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 14, + "target" : 15, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 15, + "target" : 16, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 16, + "target" : 17, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 17, + "target" : 18, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 9, + "target" : 19, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 18, + "target" : 19, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 19, + "target" : 20, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 20, + "target" : 21, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowJoinITCase.scala new file mode 100644 index 0000000000000..15794b28eaeff --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowJoinITCase.scala @@ -0,0 +1,607 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.runtime.stream.sql + +import org.apache.flink.api.common.restartstrategy.RestartStrategies +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.CheckpointingMode +import org.apache.flink.table.api.bridge.scala._ +import org.apache.flink.table.planner.factories.TestValuesTableFactory +import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode} +import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, StreamingWithStateTestBase, TestData, TestingAppendSink} +import org.apache.flink.types.Row +import org.junit.Assert.assertEquals +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.junit.{Before, Test} +import java.time.ZoneId +import java.util + +import scala.collection.JavaConversions._ + +@RunWith(classOf[Parameterized]) +class WindowJoinITCase(mode: StateBackendMode, useTimestampLtz: Boolean) + extends StreamingWithStateTestBase(mode) { + + val SHANGHAI_ZONE = ZoneId.of("Asia/Shanghai") + + @Before + override def before(): Unit = { + super.before() + // enable checkpoint, we are using failing source to force have a complete checkpoint + // and cover restore path + env.enableCheckpointing(100, CheckpointingMode.EXACTLY_ONCE) + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)) + FailingCollectionSource.reset() + + val dataId1 = TestValuesTableFactory.registerData(TestData.windowDataWithTimestamp) + val dataIdWithLtz = TestValuesTableFactory.registerData(TestData.windowDataWithLtzInShanghai) + tEnv.executeSql( + s""" + |CREATE TABLE T1 ( + | `ts` ${if (useTimestampLtz) "BIGINT" else "STRING"}, + | `int` INT, + | `double` DOUBLE, + | `float` FLOAT, + | `bigdec` DECIMAL(10, 2), + | `string` STRING, + | `name` STRING, + | `rowtime` AS + | ${if (useTimestampLtz) "TO_TIMESTAMP_LTZ(`ts`, 3)" else "TO_TIMESTAMP(`ts`)"}, + | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND + |) WITH ( + | 'connector' = 'values', + | 'data-id' = '${ if (useTimestampLtz) dataIdWithLtz else dataId1}', + | 'failing-source' = 'true' + |) + |""".stripMargin) + + val dataId2 = TestValuesTableFactory.registerData(TestData.windowData2WithTimestamp) + val dataIdWithLtz2 = TestValuesTableFactory.registerData(TestData.windowData2WithLtzInShanghai) + + tEnv.executeSql( + s""" + |CREATE TABLE T2 ( + | `ts` ${if (useTimestampLtz) "BIGINT" else "STRING"}, + | `int` INT, + | `double` DOUBLE, + | `float` FLOAT, + | `bigdec` DECIMAL(10, 2), + | `string` STRING, + | `name` STRING, + | `rowtime` AS + | ${if (useTimestampLtz) "TO_TIMESTAMP_LTZ(`ts`, 3)" else "TO_TIMESTAMP(`ts`)"}, + | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND + |) WITH ( + | 'connector' = 'values', + | 'data-id' = '${ if (useTimestampLtz) dataIdWithLtz2 else dataId2}', + | 'failing-source' = 'true' + |) + |""".stripMargin) + tEnv.getConfig.setLocalTimeZone(SHANGHAI_ZONE) + } + + @Test + def testInnerJoin(): Unit = { + val sql = + """ + |SELECT L.`name`, L.window_start, L.window_end, uv1, uv2 + |FROM ( + |SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv1 + |FROM TABLE( + | TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + |GROUP BY `name`, window_start, window_end + |) L + |JOIN ( + |SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv2 + |FROM TABLE( + | TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + |GROUP BY `name`, window_start, window_end + |) R + |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.`name` = R.`name` + |""".stripMargin + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink) + env.execute() + + val expected = Seq( + "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", + "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", + "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1") + assertEquals(expected.sorted.mkString("\n"), sink.getAppendResults.sorted.mkString("\n")) + } + + @Test + def testInnerJoinWithIsNotDistinctFrom(): Unit = { + val sql = + """ + |SELECT L.`name`, L.window_start, L.window_end, uv1, uv2 + |FROM ( + |SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv1 + |FROM TABLE( + | TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + |GROUP BY `name`, window_start, window_end + |) L + |JOIN ( + |SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv2 + |FROM TABLE( + | TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + |GROUP BY `name`, window_start, window_end + |) R + |ON L.window_start = R.window_start AND L.window_end = R.window_end AND + |L.`name` IS NOT DISTINCT from R.`name` + |""".stripMargin + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink) + env.execute() + + val expected = Seq( + "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", + "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", + "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1", + "null,2020-10-10T00:00:30,2020-10-10T00:00:35,0,0") + assertEquals(expected.sorted.mkString("\n"), sink.getAppendResults.sorted.mkString("\n")) + } + + @Test + def testSemiJoinExists(): Unit = { + val sql = + """ + |SELECT * FROM ( + | SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv1 + | FROM TABLE( + | TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + | GROUP BY `name`, window_start, window_end + |) L WHERE EXISTS ( + |SELECT * FROM( + | SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv2 + | FROM TABLE( + | TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + | GROUP BY `name`, window_start, window_end + |) R + | WHERE L.window_start = R.window_start AND L.window_end = R.window_end + | AND L.`name` = R.`name`) + |""".stripMargin + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink) + env.execute() + + val expected = Seq( + "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2", + "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1", + "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1") + assertEquals(expected.sorted.mkString("\n"), sink.getAppendResults.sorted.mkString("\n")) + } + + @Test + def testSemiJoinIN(): Unit = { + val sql = + """ + |SELECT * FROM ( + | SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv1 + | FROM TABLE( + | TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + | GROUP BY `name`, window_start, window_end + |) L WHERE L.`name` IN ( + |SELECT `name` FROM( + | SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv2 + | FROM TABLE( + | TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + | GROUP BY `name`, window_start, window_end + |) R + | WHERE L.window_start = R.window_start AND L.window_end = R.window_end) + |""".stripMargin + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink) + env.execute() + + val expected = Seq( + "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2", + "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1", + "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1") + assertEquals(expected.sorted.mkString("\n"), sink.getAppendResults.sorted.mkString("\n")) + } + + @Test + def testAntiJoinNotExists(): Unit = { + val sql = + """ + |SELECT * FROM ( + | SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv1 + | FROM TABLE( + | TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + | GROUP BY `name`, window_start, window_end + |) L WHERE NOT EXISTS ( + |SELECT * FROM( + | SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv2 + | FROM TABLE( + | TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + | GROUP BY `name`, window_start, window_end + |) R + | WHERE L.window_start = R.window_start AND L.window_end = R.window_end + | AND L.`name` = R.`name`) + |""".stripMargin + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink) + env.execute() + + val expected = Seq( + "a,2020-10-10T00:00,2020-10-10T00:00:05,2", + "a,2020-10-10T00:00:05,2020-10-10T00:00:10,1", + "null,2020-10-10T00:00:30,2020-10-10T00:00:35,0") + assertEquals(expected.sorted.mkString("\n"), sink.getAppendResults.sorted.mkString("\n")) + } + + @Test + def testAntiJoinNotIN(): Unit = { + val sql = + """ + |SELECT * FROM ( + | SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv1 + | FROM TABLE( + | TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + | GROUP BY `name`, window_start, window_end + |) L WHERE L.`name` NOT IN ( + |SELECT `name` FROM( + | SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv2 + | FROM TABLE( + | TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + | GROUP BY `name`, window_start, window_end + |) R + | WHERE L.window_start = R.window_start AND L.window_end = R.window_end) + |""".stripMargin + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink) + env.execute() + + val expected = Seq( + "a,2020-10-10T00:00,2020-10-10T00:00:05,2", + "a,2020-10-10T00:00:05,2020-10-10T00:00:10,1") + assertEquals(expected.sorted.mkString("\n"), sink.getAppendResults.sorted.mkString("\n")) + } + + @Test + def testLeftJoin(): Unit = { + val sql = + """ + |SELECT L.`name`, L.window_start, L.window_end, uv1, uv2 + |FROM ( + |SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv1 + |FROM TABLE( + | TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + |GROUP BY `name`, window_start, window_end + |) L + |LEFT JOIN ( + |SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv2 + |FROM TABLE( + | TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + |GROUP BY `name`, window_start, window_end + |) R + |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.`name` = R.`name` + |""".stripMargin + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink) + env.execute() + + val expected = Seq( + "a,2020-10-10T00:00,2020-10-10T00:00:05,2,null", + "a,2020-10-10T00:00:05,2020-10-10T00:00:10,1,null", + "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", + "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", + "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1", + "null,2020-10-10T00:00:30,2020-10-10T00:00:35,0,null") + assertEquals(expected.sorted.mkString("\n"), sink.getAppendResults.sorted.mkString("\n")) + } + + @Test + def testLeftJoinWithIsNotDistinctFrom(): Unit = { + val sql = + """ + |SELECT L.`name`, L.window_start, L.window_end, uv1, uv2 + |FROM ( + |SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv1 + |FROM TABLE( + | TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + |GROUP BY `name`, window_start, window_end + |) L + |LEFT JOIN ( + |SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv2 + |FROM TABLE( + | TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + |GROUP BY `name`, window_start, window_end + |) R + |ON L.window_start = R.window_start AND L.window_end = R.window_end AND + | L.`name` IS NOT DISTINCT from R.`name` + |""".stripMargin + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink) + env.execute() + + val expected = Seq( + "a,2020-10-10T00:00,2020-10-10T00:00:05,2,null", + "a,2020-10-10T00:00:05,2020-10-10T00:00:10,1,null", + "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", + "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", + "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1", + "null,2020-10-10T00:00:30,2020-10-10T00:00:35,0,0") + assertEquals(expected.sorted.mkString("\n"), sink.getAppendResults.sorted.mkString("\n")) + } + + @Test + def testRightJoin(): Unit = { + val sql = + """ + |SELECT L.`name`, R.window_start, R.window_end, uv1, uv2 + |FROM ( + |SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv1 + |FROM TABLE( + | TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + |GROUP BY `name`, window_start, window_end + |) L + |RIGHT JOIN ( + |SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv2 + |FROM TABLE( + | TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + |GROUP BY `name`, window_start, window_end + |) R + |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.`name` = R.`name` + |""".stripMargin + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink) + env.execute() + + val expected = Seq( + "null,2020-10-10T00:00,2020-10-10T00:00:05,null,2", + "null,2020-10-10T00:00:05,2020-10-10T00:00:10,null,1", + "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", + "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", + "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1", + "null,2020-10-10T00:00:30,2020-10-10T00:00:35,null,0") + assertEquals(expected.sorted.mkString("\n"), sink.getAppendResults.sorted.mkString("\n")) + } + + @Test + def testRightJoinWithIsNotDistinctFrom(): Unit = { + val sql = + """ + |SELECT L.`name`, R.window_start, R.window_end, uv1, uv2 + |FROM ( + |SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv1 + |FROM TABLE( + | TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + |GROUP BY `name`, window_start, window_end + |) L + |RIGHT JOIN ( + |SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv2 + |FROM TABLE( + | TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + |GROUP BY `name`, window_start, window_end + |) R + |ON L.window_start = R.window_start AND L.window_end = R.window_end AND + | L.`name` IS NOT DISTINCT from R.`name` + |""".stripMargin + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink) + env.execute() + + val expected = Seq( + "null,2020-10-10T00:00,2020-10-10T00:00:05,null,2", + "null,2020-10-10T00:00:05,2020-10-10T00:00:10,null,1", + "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", + "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", + "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1", + "null,2020-10-10T00:00:30,2020-10-10T00:00:35,0,0") + assertEquals(expected.sorted.mkString("\n"), sink.getAppendResults.sorted.mkString("\n")) + } + + @Test + def testOuterJoin(): Unit = { + val sql = + """ + |SELECT L.`name`, L.window_start, L.window_end, R.`name`, R.window_start, R.window_end, + |uv1, uv2 + |FROM ( + |SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv1 + |FROM TABLE( + | TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + |GROUP BY `name`, window_start, window_end + |) L + |FULL OUTER JOIN ( + |SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv2 + |FROM TABLE( + | TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + |GROUP BY `name`, window_start, window_end + |) R + |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.`name` = R.`name` + |""".stripMargin + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink) + env.execute() + + val expected = Seq( + "a,2020-10-10T00:00,2020-10-10T00:00:05,null,null,null,2,null", + "a,2020-10-10T00:00:05,2020-10-10T00:00:10,null,null,null,1,null", + "b,2020-10-10T00:00:05,2020-10-10T00:00:10,b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", + "b,2020-10-10T00:00:15,2020-10-10T00:00:20,b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", + "b,2020-10-10T00:00:30,2020-10-10T00:00:35,b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1", + "null,2020-10-10T00:00:30,2020-10-10T00:00:35,null,null,null,0,null", + "null,null,null,a1,2020-10-10T00:00,2020-10-10T00:00:05,null,2", + "null,null,null,a1,2020-10-10T00:00:05,2020-10-10T00:00:10,null,1", + "null,null,null,null,2020-10-10T00:00:30,2020-10-10T00:00:35,null,0") + assertEquals(expected.sorted.mkString("\n"), sink.getAppendResults.sorted.mkString("\n")) + } + + @Test + def testOuterJoinWithIsNotDistinctFrom(): Unit = { + val sql = + """ + |SELECT L.`name`, L.window_start, L.window_end, R.`name`, R.window_start, R.window_end, + |uv1, uv2 + |FROM ( + |SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv1 + |FROM TABLE( + | TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + |GROUP BY `name`, window_start, window_end + |) L + |FULL OUTER JOIN ( + |SELECT + | `name`, + | window_start, + | window_end, + | COUNT(DISTINCT `string`) as uv2 + |FROM TABLE( + | TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + |GROUP BY `name`, window_start, window_end + |) R + |ON L.window_start = R.window_start AND L.window_end = R.window_end AND + | L.`name` IS NOT DISTINCT from R.`name` + |""".stripMargin + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink) + env.execute() + + val expected = Seq( + "a,2020-10-10T00:00,2020-10-10T00:00:05,null,null,null,2,null", + "a,2020-10-10T00:00:05,2020-10-10T00:00:10,null,null,null,1,null", + "b,2020-10-10T00:00:05,2020-10-10T00:00:10,b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", + "b,2020-10-10T00:00:15,2020-10-10T00:00:20,b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", + "b,2020-10-10T00:00:30,2020-10-10T00:00:35,b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1", + "null,2020-10-10T00:00:30,2020-10-10T00:00:35,null,2020-10-10T00:00:30," + + "2020-10-10T00:00:35,0,0", + "null,null,null,a1,2020-10-10T00:00,2020-10-10T00:00:05,null,2", + "null,null,null,a1,2020-10-10T00:00:05,2020-10-10T00:00:10,null,1") + assertEquals(expected.sorted.mkString("\n"), sink.getAppendResults.sorted.mkString("\n")) + } +} + +object WindowJoinITCase { + + @Parameterized.Parameters(name = "StateBackend={0}, UseTimestampLtz = {1}") + def parameters(): util.Collection[Array[java.lang.Object]] = { + Seq[Array[AnyRef]]( + Array(HEAP_BACKEND, java.lang.Boolean.TRUE), + Array(HEAP_BACKEND, java.lang.Boolean.FALSE), + Array(ROCKSDB_BACKEND, java.lang.Boolean.TRUE), + Array(ROCKSDB_BACKEND, java.lang.Boolean.FALSE)) + } +} diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala index 0ee5c85cd7121..8c1321682c16b 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala @@ -614,7 +614,24 @@ object TestData { row("2020-10-10 00:00:32", 7, 7d, 7f, new JBigDecimal("7.77"), null, null), row("2020-10-10 00:00:34", 1, 3d, 3f, new JBigDecimal("3.33"), "Comment#3", "b")) + val windowData2WithTimestamp: Seq[Row] = List( + row("2020-10-10 00:00:01", 1, 1d, 1f, new JBigDecimal("1.11"), "Hi", "a1"), + row("2020-10-10 00:00:02", 2, 2d, 2f, new JBigDecimal("2.22"), "Comment#1", "a1"), + row("2020-10-10 00:00:03", 2, 2d, 2f, new JBigDecimal("2.22"), "Comment#1", "a1"), + row("2020-10-10 00:00:04", 5, 5d, 5f, new JBigDecimal("5.55"), null, "a1"), + + row("2020-10-10 00:00:07", 3, 3d, 3f, null, "Hello", "b"), + row("2020-10-10 00:00:06", 6, 6d, 6f, new JBigDecimal("6.66"), "Hi", "b"), // out of order + row("2020-10-10 00:00:08", 3, null, 3f, new JBigDecimal("3.33"), "Comment#2", "a1"), + row("2020-10-10 00:00:04", 5, 5d, null, new JBigDecimal("5.55"), "Hi", "a1"), // late event + + row("2020-10-10 00:00:16", 4, 4d, 4f, new JBigDecimal("4.44"), "Hi", "b"), + + row("2020-10-10 00:00:32", 7, 7d, 7f, new JBigDecimal("7.77"), null, null), + row("2020-10-10 00:00:34", 1, 3d, 3f, new JBigDecimal("3.33"), "Comment#3", "b")) + val shanghaiZone = ZoneId.of("Asia/Shanghai") + val windowDataWithLtzInShanghai: Seq[Row] = List( row(toEpochMills("2020-10-10T00:00:01", shanghaiZone), 1, 1d, 1f, new JBigDecimal("1.11"), "Hi", "a"), @@ -639,6 +656,30 @@ object TestData { row(toEpochMills("2020-10-10T00:00:34", shanghaiZone), 1, 3d, 3f, new JBigDecimal("3.33"), "Comment#3", "b")) + val windowData2WithLtzInShanghai: Seq[Row] = List( + row(toEpochMills("2020-10-10T00:00:01", shanghaiZone), + 1, 1d, 1f, new JBigDecimal("1.11"), "Hi", "a1"), + row(toEpochMills("2020-10-10T00:00:02", shanghaiZone), + 2, 2d, 2f, new JBigDecimal("2.22"), "Comment#1", "a1"), + row(toEpochMills("2020-10-10T00:00:03", shanghaiZone), + 2, 2d, 2f, new JBigDecimal("2.22"), "Comment#1", "a1"), + row(toEpochMills("2020-10-10T00:00:04", shanghaiZone), + 5, 5d, 5f, new JBigDecimal("5.55"), null, "a1"), + row(toEpochMills("2020-10-10T00:00:07", shanghaiZone), + 3, 3d, 3f, null, "Hello", "b"), + row(toEpochMills("2020-10-10T00:00:06", shanghaiZone), + 6, 6d, 6f, new JBigDecimal("6.66"), "Hi", "b"), // out of order + row(toEpochMills("2020-10-10T00:00:08", shanghaiZone), + 3, null, 3f, new JBigDecimal("3.33"), "Comment#2", "a1"), + row(toEpochMills("2020-10-10T00:00:04", shanghaiZone), + 5, 5d, null, new JBigDecimal("5.55"), "Hi", "a1"), // late event + row(toEpochMills("2020-10-10T00:00:16", shanghaiZone), + 4, 4d, 4f, new JBigDecimal("4.44"), "Hi", "b"), + row(toEpochMills("2020-10-10T00:00:32", shanghaiZone), + 7, 7d, 7f, new JBigDecimal("7.77"), null, null), + row(toEpochMills("2020-10-10T00:00:34", shanghaiZone), + 1, 3d, 3f, new JBigDecimal("3.33"), "Comment#3", "b")) + val timestampData: Seq[Row] = List( row("1970-01-01 00:00:00.001", 1, 1d, 1f, new JBigDecimal("1"), "Hi", "a"), row("1970-01-01 00:00:00.002", 2, 2d, 2f, new JBigDecimal("2"), "Hallo", "a"),