[SPARK-45584][SQL] Fix subquery execution failure with TakeOrderedAndProjectExec

This PR fixes a bug that occurs when `TakeOrderedAndProjectExec` contains subqueries. Its `executeCollect` method does not wait for subqueries to finish, which can cause an `IllegalArgumentException` even for a simple query.
For example, this query:
```
WITH t2 AS (
  SELECT * FROM t1 ORDER BY id
)
SELECT *, (SELECT COUNT(*) FROM t2) FROM t2 LIMIT 10
```
fails with this error:
```
 java.lang.IllegalArgumentException: requirement failed: Subquery subquery#242, [id=#109] has not finished
```
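The ordering problem can be sketched with a toy model that does not depend on Spark. Everything below (`ToySubquery`, `ToyTakeOrderedAndProject`, `executeCollectUnwrapped`, `executeCollectWrapped`) is a hypothetical stand-in invented for illustration. The real `executeQuery` in Spark does more than this sketch, which only models the idea that subquery results must be ready before the body runs.

```scala
// Toy model only: these classes are hypothetical stand-ins, not Spark's real ones.
final class ToySubquery(compute: () => Long) {
  private var result: Option[Long] = None

  // Runs the subquery and caches its value.
  def updateResult(): Unit = {
    result = Some(compute())
  }

  // Reading the value before it is ready trips `require`,
  // which throws IllegalArgumentException("requirement failed: ...").
  def eval(): Long = {
    require(result.isDefined, "Subquery has not finished")
    result.get
  }
}

final class ToyTakeOrderedAndProject(rows: Seq[Int], limit: Int, subquery: ToySubquery) {
  // Stand-in for the wait-for-subqueries step that runs before the body.
  private def executeQuery[T](body: => T): T = {
    subquery.updateResult()
    body
  }

  // Sort, apply the limit, and project the subquery value onto every row.
  private def collect(): Seq[(Int, Long)] =
    rows.sorted.take(limit).map(id => (id, subquery.eval()))

  // Shape of the code before the fix: the body runs directly.
  def executeCollectUnwrapped(): Seq[(Int, Long)] = collect()

  // Shape of the code after the fix: the body is wrapped in executeQuery.
  def executeCollectWrapped(): Seq[(Int, Long)] = executeQuery { collect() }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val data = Seq(3, 1, 2)
    def newNode() =
      new ToyTakeOrderedAndProject(data, 10, new ToySubquery(() => data.size.toLong))

    // The unwrapped variant fails because the subquery value is not ready.
    println(scala.util.Try(newNode().executeCollectUnwrapped()).isFailure)
    // The wrapped variant returns every row paired with the count.
    println(newNode().executeCollectWrapped())
  }
}
```

In this sketch the only difference between the two variants is the `executeQuery { ... }` wrapper, which mirrors the one-line nature of the fix.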

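Why are the changes needed? To fix the bug described above.

Does this PR introduce any user-facing change? No.

How was this patch tested? New unit test.

Was this patch authored or co-authored using generative AI tooling? No.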

Closes #43419 from allisonwang-db/spark-45584-subquery-failure.

Authored-by: allisonwang-db <allison.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 8fd915f)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
allisonwang-db authored and cloud-fan committed Oct 20, 2023
1 parent feb48dc commit f47b63c
Showing 2 changed files with 25 additions and 1 deletion.
@@ -282,7 +282,7 @@ case class TakeOrderedAndProjectExec(
     projectList.map(_.toAttribute)
   }

-  override def executeCollect(): Array[InternalRow] = {
+  override def executeCollect(): Array[InternalRow] = executeQuery {
     val orderingSatisfies = SortOrder.orderingSatisfies(child.outputOrdering, sortOrder)
     val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
     val limited = if (orderingSatisfies) {
24 changes: 24 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -2712,4 +2712,28 @@ class SubquerySuite extends QueryTest
         expected)
     }
   }
+
+  test("SPARK-45584: subquery execution should not fail with ORDER BY and LIMIT") {
+    withTable("t1") {
+      sql(
+        """
+          |CREATE TABLE t1 USING PARQUET
+          |AS SELECT * FROM VALUES
+          |(1, "a"),
+          |(2, "a"),
+          |(3, "a") t(id, value)
+          |""".stripMargin)
+      val df = sql(
+        """
+          |WITH t2 AS (
+          |  SELECT * FROM t1 ORDER BY id
+          |)
+          |SELECT *, (SELECT COUNT(*) FROM t2) FROM t2 LIMIT 10
+          |""".stripMargin)
+      // This should not fail with IllegalArgumentException.
+      checkAnswer(
+        df,
+        Row(1, "a", 3) :: Row(2, "a", 3) :: Row(3, "a", 3) :: Nil)
+    }
+  }
 }
