Skip to content

Commit

Permalink
[SPARK-45584][SQL] Fix subquery execution failure with TakeOrderedAnd…
Browse files Browse the repository at this point in the history
…ProjectExec

### What changes were proposed in this pull request?

This PR fixes a bug when there are subqueries in `TakeOrderedAndProjectExec`. The executeCollect method does not wait for subqueries to finish and it can result in IllegalArgumentException when executing a simple query.
For example this query:
```
WITH t2 AS (
  SELECT * FROM t1 ORDER BY id
)
SELECT *, (SELECT COUNT(*) FROM t2) FROM t2 LIMIT 10
```
will fail with this error
```
 java.lang.IllegalArgumentException: requirement failed: Subquery subquery#242, [id=#109] has not finished
```

### Why are the changes needed?

To fix a bug.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

New unit test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43419 from allisonwang-db/spark-45584-subquery-failure.

Authored-by: allisonwang-db <allison.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
allisonwang-db authored and cloud-fan committed Oct 20, 2023
1 parent 63bfc10 commit 8fd915f
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ case class TakeOrderedAndProjectExec(
projectList.map(_.toAttribute)
}

override def executeCollect(): Array[InternalRow] = {
override def executeCollect(): Array[InternalRow] = executeQuery {
val orderingSatisfies = SortOrder.orderingSatisfies(child.outputOrdering, sortOrder)
val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
val limited = if (orderingSatisfies) {
Expand Down
24 changes: 24 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2711,4 +2711,28 @@ class SubquerySuite extends QueryTest
checkAnswer(df, Row(1, "foo", 1, "foo"))
}
}

test("SPARK-45584: subquery execution should not fail with ORDER BY and LIMIT") {
withTable("t1") {
sql(
"""
|CREATE TABLE t1 USING PARQUET
|AS SELECT * FROM VALUES
|(1, "a"),
|(2, "a"),
|(3, "a") t(id, value)
|""".stripMargin)
val df = sql(
"""
|WITH t2 AS (
| SELECT * FROM t1 ORDER BY id
|)
|SELECT *, (SELECT COUNT(*) FROM t2) FROM t2 LIMIT 10
|""".stripMargin)
// This should not fail with IllegalArgumentException.
checkAnswer(
df,
Row(1, "a", 3) :: Row(2, "a", 3) :: Row(3, "a", 3) :: Nil)
}
}
}

0 comments on commit 8fd915f

Please sign in to comment.