Skip to content

Commit

Permalink
[SPARK-20430][SQL] Initialise RangeExec parameters in a driver side
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?
This pr initialised `RangeExec` parameters in a driver side.
In the current master, a query below throws `NullPointerException`;
```
sql("SET spark.sql.codegen.wholeStage=false")
sql("SELECT * FROM range(1)").show

17/04/20 17:11:05 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
        at org.apache.spark.sql.execution.SparkPlan.sparkContext(SparkPlan.scala:54)
        at org.apache.spark.sql.execution.RangeExec.numSlices(basicPhysicalOperators.scala:343)
        at org.apache.spark.sql.execution.RangeExec$$anonfun$20.apply(basicPhysicalOperators.scala:506)
        at org.apache.spark.sql.execution.RangeExec$$anonfun$20.apply(basicPhysicalOperators.scala:505)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:320)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
```

## How was this patch tested?
Added a test in `DataFrameRangeSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #17717 from maropu/SPARK-20430.
  • Loading branch information
maropu authored and gatorsmile committed Apr 22, 2017
1 parent 05a4514 commit b3c572a
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -331,11 +331,11 @@ case class SampleExec(
case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
extends LeafExecNode with CodegenSupport {

def start: Long = range.start
def end: Long = range.end
def step: Long = range.step
def numSlices: Int = range.numSlices.getOrElse(sparkContext.defaultParallelism)
def numElements: BigInt = range.numElements
val start: Long = range.start
val end: Long = range.end
val step: Long = range.step
val numSlices: Int = range.numSlices.getOrElse(sparkContext.defaultParallelism)
val numElements: BigInt = range.numElements

override val output: Seq[Attribute] = range.output

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
}
}
}

test("SPARK-20430 Initialize Range parameters in a driver side") {
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
checkAnswer(sql("SELECT * FROM range(3)"), Row(0) :: Row(1) :: Row(2) :: Nil)
}
}
}

object DataFrameRangeSuite {
Expand Down

0 comments on commit b3c572a

Please sign in to comment.