Skip to content

Commit

Permalink
[FLINK-21302][table-planner-blink] Fix NPE when use row_number() in o…
Browse files Browse the repository at this point in the history
…ver agg.
  • Loading branch information
beyond1920 committed Feb 5, 2021
1 parent fce75b5 commit 958d56c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,12 @@ class DeclarativeAggCodeGen(
}.map(_.accept(rexNodeGen)).map(exprCodegen.generateExpression).map(_.nullTerm)
}

private val argIndexes = aggInfo.argIndexes
private val argIndexes = if (aggInfo.argIndexes != null) {
aggInfo.argIndexes
} else {
Array()
}

private val argTypes = {
val types = inputTypes ++ constants.map(t => FlinkTypeFactory.toLogicalType(t.getType))
argIndexes.map(types(_))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,36 @@ class OverAggregateITCase(mode: StateBackendMode) extends StreamingWithStateTest
env.getCheckpointConfig.enableUnalignedCheckpoints(false)
}

@Test
def testRowNumberOnOver(): Unit = {
val t = failingDataSource(TestData.tupleData5)
.toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
tEnv.registerTable("MyTable", t)
val sqlQuery = "SELECT a, ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime()) FROM MyTable"

val sink = new TestingAppendSink
tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
env.execute()

val expected = List(
"1,1",
"2,1",
"2,2",
"3,1",
"3,2",
"3,3",
"4,1",
"4,2",
"4,3",
"4,4",
"5,1",
"5,2",
"5,3",
"5,4",
"5,5")
assertEquals(expected, sink.getAppendResults)
}

@Test
def testProcTimeBoundedPartitionedRowsOver(): Unit = {
val t = failingDataSource(TestData.tupleData5)
Expand Down

0 comments on commit 958d56c

Please sign in to comment.