Skip to content

Commit

Permalink
Revert rename.
Browse files Browse the repository at this point in the history
  • Loading branch information
ueshin committed Sep 26, 2017
1 parent e62d619 commit 14aa3b6
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private[spark] class PythonRDD(
val asJavaRDD: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)

override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
val runner = PythonCommandRunner(func, bufferSize, reuseWorker)
val runner = PythonRunner(func, bufferSize, reuseWorker)
runner.compute(firstParent.iterator(split, context), split.index, context)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ private[spark] object PythonEvalType {
* funcs is a list of independent Python functions, each one of them is a list of chained Python
* functions (from bottom to top).
*/
private[spark] abstract class PythonRunner[IN, OUT](
private[spark] abstract class BasePythonRunner[IN, OUT](
funcs: Seq[ChainedPythonFunctions],
bufferSize: Int,
reuseWorker: Boolean,
Expand Down Expand Up @@ -346,21 +346,21 @@ private[spark] abstract class PythonRunner[IN, OUT](
}
}

private[spark] object PythonCommandRunner {
private[spark] object PythonRunner {

def apply(func: PythonFunction, bufferSize: Int, reuseWorker: Boolean): PythonCommandRunner = {
new PythonCommandRunner(Seq(ChainedPythonFunctions(Seq(func))), bufferSize, reuseWorker)
def apply(func: PythonFunction, bufferSize: Int, reuseWorker: Boolean): PythonRunner = {
new PythonRunner(Seq(ChainedPythonFunctions(Seq(func))), bufferSize, reuseWorker)
}
}

/**
* A helper class to run Python mapPartition in Spark.
*/
private[spark] class PythonCommandRunner(
private[spark] class PythonRunner(
funcs: Seq[ChainedPythonFunctions],
bufferSize: Int,
reuseWorker: Boolean)
extends PythonRunner[Array[Byte], Array[Byte]](
extends BasePythonRunner[Array[Byte], Array[Byte]](
funcs, bufferSize, reuseWorker, PythonEvalType.NON_UDF, Array(Array(0))) {

protected override def newWriterThread(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.python

import org.apache.spark.TaskContext
import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonRunner}
import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkPlan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class ArrowStreamPythonUDFRunner(
evalType: Int,
argOffsets: Array[Array[Int]],
schema: StructType)
extends PythonRunner[InternalRow, ColumnarBatch](
extends BasePythonRunner[InternalRow, ColumnarBatch](
funcs, bufferSize, reuseWorker, evalType, argOffsets) {

protected override def newWriterThread(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
import net.razorvine.pickle.{Pickler, Unpickler}

import org.apache.spark.TaskContext
import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonRunner}
import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkPlan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class PythonUDFRunner(
reuseWorker: Boolean,
evalType: Int,
argOffsets: Array[Array[Int]])
extends PythonRunner[Array[Byte], Array[Byte]](
extends BasePythonRunner[Array[Byte], Array[Byte]](
funcs, bufferSize, reuseWorker, evalType, argOffsets) {

protected override def newWriterThread(
Expand Down

0 comments on commit 14aa3b6

Please sign in to comment.