[SPARK-35378][SQL][FOLLOW-UP] Fix incorrect return type in CommandResultExec.executeCollect()

### What changes were proposed in this pull request?

This PR is a follow-up for #32513 and fixes an issue introduced by that patch.

`CommandResultExec` is supposed to return `UnsafeRow` records from all of its `executeXYZ` methods, but `executeCollect` was left out, which causes errors like this one:
```
Error in SQL statement: ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow
cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
```

We need to return `unsafeRows` instead of `rows` in `executeCollect` similar to other methods in the class.
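The failure mode can be shown without Spark at all: a method declared to return the general row type, whose callers unconditionally downcast each element to the specialized type. The names `Row`, `GenericRowLike`, and `UnsafeRowLike` below are hypothetical stand-ins for Spark's `InternalRow`/`GenericInternalRow`/`UnsafeRow` hierarchy; this is a minimal sketch of the bug's shape, not Spark code.

```scala
// Minimal sketch (plain Scala, no Spark dependency). Row, GenericRowLike
// and UnsafeRowLike are hypothetical stand-ins for Spark's InternalRow,
// GenericInternalRow and UnsafeRow.
sealed trait Row
final class GenericRowLike extends Row
final class UnsafeRowLike extends Row

object CastDemo {
  // Declared as Array[Row], but downstream code (like the collect path
  // in the report above) casts each element to UnsafeRowLike.
  def executeCollect(fixed: Boolean): Array[Row] =
    if (fixed) Array(new UnsafeRowLike) else Array(new GenericRowLike)

  def main(args: Array[String]): Unit = {
    // The fixed path downcasts fine.
    val ok = executeCollect(fixed = true)(0).asInstanceOf[UnsafeRowLike]
    println(s"fixed path returns: ${ok.getClass.getSimpleName}")

    // The buggy path throws ClassCastException, matching the error above.
    try {
      executeCollect(fixed = false)(0).asInstanceOf[UnsafeRowLike]
    } catch {
      case _: ClassCastException => println("buggy path: ClassCastException")
    }
  }
}
```

The fix in this patch follows the same idea: have `executeCollect` produce the already-converted `unsafeRows` so the declared element type and the actual runtime type agree with what callers expect.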

### Why are the changes needed?

Fixes a bug in `CommandResultExec`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I added a unit test to check the return type of all commands.

Closes #36632 from sadikovi/fix-command-exec.

Authored-by: Ivan Sadikov <ivan.sadikov@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
sadikovi authored and cloud-fan committed May 23, 2022
1 parent 82f2a98 commit a0decfc
Showing 2 changed files with 11 additions and 2 deletions.
```diff
@@ -76,8 +76,8 @@ case class CommandResultExec(
   }

   override def executeCollect(): Array[InternalRow] = {
-    longMetric("numOutputRows").add(rows.size)
-    rows.toArray
+    longMetric("numOutputRows").add(unsafeRows.size)
+    unsafeRows
   }

   override def executeTake(limit: Int): Array[InternalRow] = {
```
```diff
@@ -20,6 +20,7 @@ import scala.io.Source

 import org.apache.spark.sql.{AnalysisException, FastOperator}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedNamespace
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan, OneRowRelation, Project, ShowTables, SubqueryAlias}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
@@ -262,6 +263,14 @@ class QueryExecutionSuite extends SharedSparkSession {
     assert(cmdResultExec.commandPhysicalPlan.isInstanceOf[ShowTablesExec])
   }

+  test("SPARK-35378: Return UnsafeRow in CommandResultExecCheck execute methods") {
+    val plan = spark.sql("SHOW FUNCTIONS").queryExecution.executedPlan
+    assert(plan.isInstanceOf[CommandResultExec])
+    plan.executeCollect().foreach { row => assert(row.isInstanceOf[UnsafeRow]) }
+    plan.executeTake(10).foreach { row => assert(row.isInstanceOf[UnsafeRow]) }
+    plan.executeTail(10).foreach { row => assert(row.isInstanceOf[UnsafeRow]) }
+  }
+
   test("SPARK-38198: check specify maxFields when call toFile method") {
     withTempDir { dir =>
       val path = dir.getCanonicalPath + "/plans.txt"
```
