From 14aa3b641fd0c7f3a6feb6869508703b113b6ce6 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Tue, 26 Sep 2017 15:04:42 +0900
Subject: [PATCH] Revert rename.

---
 .../org/apache/spark/api/python/PythonRDD.scala       |  2 +-
 .../org/apache/spark/api/python/PythonRunner.scala    | 12 ++++++------
 .../sql/execution/python/ArrowEvalPythonExec.scala    |  2 +-
 .../python/ArrowStreamPythonUDFRunner.scala           |  2 +-
 .../sql/execution/python/BatchEvalPythonExec.scala    |  2 +-
 .../spark/sql/execution/python/PythonUDFRunner.scala  |  2 +-
 6 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 319bd6ebee267..f6293c0dc5091 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -59,7 +59,7 @@ private[spark] class PythonRDD(
   val asJavaRDD: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
 
   override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
-    val runner = PythonCommandRunner(func, bufferSize, reuseWorker)
+    val runner = PythonRunner(func, bufferSize, reuseWorker)
     runner.compute(firstParent.iterator(split, context), split.index, context)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index a9c5f2669a9ec..ddf37ecf14153 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -45,7 +45,7 @@ private[spark] object PythonEvalType {
  * funcs is a list of independent Python functions, each one of them is a list of chained Python
  * functions (from bottom to top).
  */
-private[spark] abstract class PythonRunner[IN, OUT](
+private[spark] abstract class BasePythonRunner[IN, OUT](
     funcs: Seq[ChainedPythonFunctions],
     bufferSize: Int,
     reuseWorker: Boolean,
@@ -346,21 +346,21 @@
   }
 }
 
-private[spark] object PythonCommandRunner {
+private[spark] object PythonRunner {
 
-  def apply(func: PythonFunction, bufferSize: Int, reuseWorker: Boolean): PythonCommandRunner = {
-    new PythonCommandRunner(Seq(ChainedPythonFunctions(Seq(func))), bufferSize, reuseWorker)
+  def apply(func: PythonFunction, bufferSize: Int, reuseWorker: Boolean): PythonRunner = {
+    new PythonRunner(Seq(ChainedPythonFunctions(Seq(func))), bufferSize, reuseWorker)
   }
 }
 
 /**
  * A helper class to run Python mapPartition in Spark.
  */
-private[spark] class PythonCommandRunner(
+private[spark] class PythonRunner(
     funcs: Seq[ChainedPythonFunctions],
     bufferSize: Int,
     reuseWorker: Boolean)
-  extends PythonRunner[Array[Byte], Array[Byte]](
+  extends BasePythonRunner[Array[Byte], Array[Byte]](
     funcs, bufferSize, reuseWorker, PythonEvalType.NON_UDF, Array(Array(0))) {
 
   protected override def newWriterThread(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
index d3d04b1ddc950..6c9ebd26299d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.python
 
 import org.apache.spark.TaskContext
-import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonRunner}
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowStreamPythonUDFRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowStreamPythonUDFRunner.scala
index d19e14f091cc9..bf124fdea8013 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowStreamPythonUDFRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowStreamPythonUDFRunner.scala
@@ -45,7 +45,7 @@ class ArrowStreamPythonUDFRunner(
     evalType: Int,
     argOffsets: Array[Array[Int]],
     schema: StructType)
-  extends PythonRunner[InternalRow, ColumnarBatch](
+  extends BasePythonRunner[InternalRow, ColumnarBatch](
     funcs, bufferSize, reuseWorker, evalType, argOffsets) {
 
   protected override def newWriterThread(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
index 5bdf816000c2c..26ee25f633ea4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
 import net.razorvine.pickle.{Pickler, Unpickler}
 
 import org.apache.spark.TaskContext
-import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonRunner}
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
index 9e46a9b1e5d5c..2bb18deb07e14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
@@ -33,7 +33,7 @@ class PythonUDFRunner(
     reuseWorker: Boolean,
     evalType: Int,
     argOffsets: Array[Array[Int]])
-  extends PythonRunner[Array[Byte], Array[Byte]](
+  extends BasePythonRunner[Array[Byte], Array[Byte]](
    funcs, bufferSize, reuseWorker, evalType, argOffsets) {

   protected override def newWriterThread(
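
The following is a minimal, self-contained Scala sketch of the class shape this revert restores, not the actual Spark implementation: the parameterized abstract base keeps the BasePythonRunner name, the concrete byte-in/byte-out runner used by PythonRDD.compute gets the PythonRunner name back, and the companion apply factory wraps a single function into the Seq[ChainedPythonFunctions] shape. The PythonFunction, ChainedPythonFunctions, and PythonEvalType stand-ins are stubs, and the compute signature is simplified for illustration; the real methods also take a TaskContext and manage Python worker processes with writer/reader threads.

// Stand-ins for the real Spark types referenced by the patch (stubs).
case class PythonFunction(command: Array[Byte])
case class ChainedPythonFunctions(funcs: Seq[PythonFunction])

object PythonEvalType {
  val NON_UDF = 0 // plain RDD API, no SQL UDF semantics
}

// The renamed base class: parameterized on input/output element types so
// that SQL runners (e.g. InternalRow -> ColumnarBatch) can share the worker
// protocol with the plain byte-stream RDD runner.
abstract class BasePythonRunner[IN, OUT](
    funcs: Seq[ChainedPythonFunctions],
    bufferSize: Int,
    reuseWorker: Boolean,
    evalType: Int,
    argOffsets: Array[Array[Int]]) {

  // Abstract here; in Spark this launches a Python worker and spawns
  // writer/reader threads, while subclasses define the data shape.
  def compute(inputIterator: Iterator[IN], partitionIndex: Int): Iterator[OUT]
}

// The concrete runner that gets its old name back: plain bytes in, bytes
// out, used by PythonRDD.compute for the non-UDF RDD API.
class PythonRunner(
    funcs: Seq[ChainedPythonFunctions],
    bufferSize: Int,
    reuseWorker: Boolean)
  extends BasePythonRunner[Array[Byte], Array[Byte]](
    funcs, bufferSize, reuseWorker, PythonEvalType.NON_UDF, Array(Array(0))) {

  // Stub: the real implementation round-trips each element through a
  // Python worker process; here we just pass the input through.
  override def compute(
      inputIterator: Iterator[Array[Byte]],
      partitionIndex: Int): Iterator[Array[Byte]] = inputIterator
}

// Companion apply mirroring the reverted factory: wraps a single function
// in the nested Seq[ChainedPythonFunctions] the base class expects.
object PythonRunner {
  def apply(func: PythonFunction, bufferSize: Int, reuseWorker: Boolean): PythonRunner =
    new PythonRunner(Seq(ChainedPythonFunctions(Seq(func))), bufferSize, reuseWorker)
}

object Example extends App {
  // Same call shape as the line restored in PythonRDD.compute.
  val runner = PythonRunner(PythonFunction(Array[Byte](1, 2, 3)), bufferSize = 65536, reuseWorker = true)
  val out = runner.compute(Iterator(Array[Byte](42)), partitionIndex = 0)
  println(out.map(_.length).toList) // List(1)
}

The design point the revert preserves: callers such as PythonRDD keep the short PythonRunner name for the common case, while the generic machinery moves under the Base prefix instead of forcing every call site to adopt a new PythonCommandRunner name.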