Skip to content

Commit

Permalink
[FLINK-21302][table-planner-blink] Fix NPE when use row_number() in o…
Browse files Browse the repository at this point in the history
…verAgg

This closes apache#14880
  • Loading branch information
beyond1920 authored and zhaoxing committed Apr 22, 2021
1 parent 02c8c07 commit eb92421
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ object AggregateUtil extends Enumeration {
.zipWithIndex
.map { case (call, index) =>
val argIndexes = call.getAggregation match {
case _: SqlRankFunction => orderKeyIndexes
case _: SqlRankFunction => if (orderKeyIndexes != null) orderKeyIndexes else Array[Int]()
case _ => call.getArgList.map(_.intValue()).toArray
}
transformToAggregateInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,36 @@ class OverAggregateITCase(mode: StateBackendMode) extends StreamingWithStateTest
env.getCheckpointConfig.enableUnalignedCheckpoints(false)
}

@Test
def testRowNumberOnOver(): Unit = {
val t = failingDataSource(TestData.tupleData5)
.toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
tEnv.registerTable("MyTable", t)
val sqlQuery = "SELECT a, ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime()) FROM MyTable"

val sink = new TestingAppendSink
tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
env.execute()

val expected = List(
"1,1",
"2,1",
"2,2",
"3,1",
"3,2",
"3,3",
"4,1",
"4,2",
"4,3",
"4,4",
"5,1",
"5,2",
"5,3",
"5,4",
"5,5")
assertEquals(expected, sink.getAppendResults)
}

@Test
def testProcTimeBoundedPartitionedRowsOver(): Unit = {
val t = failingDataSource(TestData.tupleData5)
Expand Down

0 comments on commit eb92421

Please sign in to comment.