From 9fcb8001b9956441051aa1800cdc2df052f7a2c7 Mon Sep 17 00:00:00 2001 From: huangxingbo Date: Mon, 17 May 2021 11:06:36 +0800 Subject: [PATCH] add filter test --- .../stream/PythonCorrelateJsonPlanTest.java | 17 + .../testJoinWithFilter.out | 431 ++++++++++++++++++ 2 files changed, 448 insertions(+) create mode 100644 flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java index c725d7b2130af0..7b2739ee46e62d 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java @@ -72,4 +72,21 @@ public void testPythonTableFunction() { + "LEFT JOIN LATERAL TABLE(TableFunc(a * a, pyFunc(a, b))) AS T(x, y) ON TRUE"; util.verifyJsonPlan(sqlQuery); } + + @Test + public void testJoinWithFilter() { + String sinkTableDdl = + "CREATE TABLE MySink (\n" + + " a int,\n" + + " b int\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'table-sink-class' = 'DEFAULT')"; + tEnv.executeSql(sinkTableDdl); + + String sqlQuery = + "INSERT INTO MySink SELECT x, y FROM MyTable, " + + "LATERAL TABLE(TableFunc(a * a, pyFunc(a, b))) AS T(x, y) WHERE x = a and y + 1 = y * y"; + util.verifyJsonPlan(sqlQuery); + } } diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out new file mode 100644 index 00000000000000..fe5899b938e72a --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out @@ -0,0 +1,431 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan", + "scanTableSource" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MyTable" + }, + "catalogTable" : { + "schema.3.data-type" : "TIMESTAMP(3)", + "schema.2.data-type" : "INT", + "schema.3.name" : "d", + "connector" : "values", + "schema.0.data-type" : "INT", + "schema.2.name" : "c", + "schema.1.name" : "b", + "bounded" : "false", + "schema.0.name" : "a", + "schema.1.data-type" : "INT" + } + }, + "id" : 1, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "b" : "INT" + }, { + "c" : "INT" + }, { + "d" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])", + "inputProperties" : [ ] + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "*", + "kind" : "TIMES", + "syntax" : "BINARY" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "condition" : null, + "id" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "b" : "INT" + }, { + "c" : "INT" + }, { + "d" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "f0" : "INT" + } ] + }, + "description" : "Calc(select=[a, b, c, d, (a * a) AS f0])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate", + "joinType" : "INNER", + "functionCall" : { + "kind" : "REX_CALL", + "operator" : { + "name" : "TableFunc", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "pyFunc", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION", + "displayName" : "pyFunc", + "functionKind" : "SCALAR", + "instance" : "rO0ABXNyAGBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRTY2FsYXJGdW5jdGlvbnMkUHl0aG9uU2NhbGFyRnVuY3Rpb275pBZGRJT8qAIAAUwABG5hbWV0ABJMamF2YS9sYW5nL1N0cmluZzt4cgAvb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuU2NhbGFyRnVuY3Rpb26383IwrjqOqQIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9uWWgLCLtDDxYCAAB4cHQABnB5RnVuYw" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "type" : { + "structKind" : "FULLY_QUALIFIED", + "nullable" : false, + "fields" : [ { + "typeName" : "INTEGER", + "nullable" : true, + "fieldName" : "f0" + }, { + "typeName" : "INTEGER", + "nullable" : true, + "fieldName" : "f1" + } ] + } + }, + "id" : 3, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "b" : "INT" + }, { + "c" : "INT" + }, { + "d" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "f0" : "INT" + }, { + "f00" : "INT" + }, { + "f1" : "INT" + } ] + }, + "description" : "PythonCorrelate(invocation=[TableFunc($4, pyFunc($0, $1))], correlate=[table(TableFunc(f0,pyFunc(a, b)))], select=[a,b,c,d,f0,f00,f1], rowType=[RecordType(INTEGER a, INTEGER b, INTEGER c, TIMESTAMP(3) d, INTEGER f0, INTEGER f00, INTEGER f1)], joinType=[INNER])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "condition" : { + "kind" : "REX_CALL", + "operator" : { + "name" : "AND", + "kind" : "AND", + "syntax" : "BINARY" + }, + "operands" : [ { + "kind" : "REX_CALL", + "operator" : { + "name" : "=", + "kind" : "EQUALS", + "syntax" : "BINARY" + }, + "operands" : [ { + "kind" : "REX_CALL", + "operator" : { + "name" : "+", + "kind" : "PLUS", + "syntax" : "BINARY" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "LITERAL", + "value" : "1", + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "*", + "kind" : "TIMES", + "syntax" : "BINARY" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "type" : { + "typeName" : "BOOLEAN", + "nullable" : true + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "=", + "kind" : "EQUALS", + "syntax" : "BINARY" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "type" : { + "typeName" : "BOOLEAN", + "nullable" : true + } + } ], + "type" : { + "typeName" : "BOOLEAN", + "nullable" : true + } + }, + "id" : 4, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "x" : "INT" + }, { + "y" : "INT" + } ] + }, + "description" : "Calc(select=[f00 AS x, f1 AS y], where=[(((f1 + 1) = (f1 * f1)) AND (f00 = a))])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink", + "dynamicTableSink" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MySink" + }, + "catalogTable" : { + "table-sink-class" : "DEFAULT", + "connector" : "values", + "schema.0.data-type" : "INT", + "schema.1.name" : "b", + "schema.0.name" : "a", + "schema.1.data-type" : "INT" + } + }, + "inputChangelogMode" : [ "INSERT" ], + "id" : 5, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "x" : "INT" + }, { + "y" : "INT" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[x, y])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file