[SPARK-26412][PYSPARK][SQL] Allow Pandas UDF to take an iterator of pd.Series or an iterator of tuple of pd.Series

## What changes were proposed in this pull request?

Allow Pandas UDF to take an iterator of pd.Series or an iterator of tuples of pd.Series.
Note that the UDF input is always a single iterator:
* if the UDF takes a single column as input, the iterator's elements are pd.Series (each corresponding to one batch of that column's values)
* if the UDF takes multiple columns as input, the iterator's elements are tuples of `pd.Series`, one per input column, in the same order as the inputs. For example:
```
pandas_udf("int", PandasUDFType.SCALAR_ITER)
def the_udf(iterator):
    for col1_batch, col2_batch in iterator:
        yield col1_batch + col2_batch

df.select(the_udf("col1", "col2"))
```
The UDF above adds col1 and col2 element-wise.
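For comparison, a minimal single-column sketch (hypothetical UDF name) would receive each batch as a plain pd.Series rather than a tuple:
```
@pandas_udf("int", PandasUDFType.SCALAR_ITER)
def plus_one(iterator):
    # each element of `iterator` is a pd.Series holding one batch of col1 values
    for batch in iterator:
        yield batch + 1

df.select(plus_one("col1"))
```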

I haven't added unit tests yet, but manual tests show it works fine, so it is ready for a first-pass review.
We can test several typical cases:

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import udf
from pyspark.taskcontext import TaskContext

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([(1, 20), (3, 40)], ["a", "b"])

pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi1(it):
    pid = TaskContext.get().partitionId()
    print("DBG: fi1: do init stuff, partitionId=" + str(pid))
    for batch in it:
        yield batch + 100
    print("DBG: fi1: do close stuff, partitionId=" + str(pid))

pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi2(it):
    pid = TaskContext.get().partitionId()
    print("DBG: fi2: do init stuff, partitionId=" + str(pid))
    for batch in it:
        yield batch + 10000
    print("DBG: fi2: do close stuff, partitionId=" + str(pid))

pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi3(it):
    pid = TaskContext.get().partitionId()
    print("DBG: fi3: do init stuff, partitionId=" + str(pid))
    for x, y in it:
        yield x + y * 10 + 100000
    print("DBG: fi3: do close stuff, partitionId=" + str(pid))

pandas_udf("int", PandasUDFType.SCALAR)
def fp1(x):
    return x + 1000

udf("int")
def fu1(x):
    return x + 10

# test select "pandas iter udf/pandas udf/sql udf" expressions at the same time.
# Note this case the `fi1("a"), fi2("b"), fi3("a", "b")` will generate only one plan,
# and `fu1("a")`, `fp1("a")` will generate another two separate plans.
df.select(fi1("a"), fi2("b"), fi3("a", "b"), fu1("a"), fp1("a")).show()

# Test chaining two pandas iter UDFs together.
# Note that in this case `fi2(fi1("a"))` generates only one plan.
# Also note the init/close call order, shown in the debug output below:
#     DBG: fi2: do init stuff, partitionId=0
#     DBG: fi1: do init stuff, partitionId=0
#     DBG: fi1: do close stuff, partitionId=0
#     DBG: fi2: do close stuff, partitionId=0
df.select(fi2(fi1("a"))).show()

# Test a more complex chain.
# Note that in this case `fi1("a"), fi2("a")` generate one plan,
# and `fi3(fi1_output, fi2_output)` generates another plan.
df.select(fi3(fi1("a"), fi2("a"))).show()
```

## How was this patch tested?

To be added.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #24643 from WeichenXu123/pandas_udf_iter.

Lead-authored-by: WeichenXu <weichen.xu@databricks.com>
Co-authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
WeichenXu123 and mengxr committed Jun 15, 2019
1 parent a950570 commit 6d441dc
Showing 11 changed files with 703 additions and 350 deletions.
@@ -46,6 +46,7 @@ private[spark] object PythonEvalType {
val SQL_GROUPED_MAP_PANDAS_UDF = 201
val SQL_GROUPED_AGG_PANDAS_UDF = 202
val SQL_WINDOW_AGG_PANDAS_UDF = 203
val SQL_SCALAR_PANDAS_ITER_UDF = 204

def toString(pythonEvalType: Int): String = pythonEvalType match {
case NON_UDF => "NON_UDF"
@@ -54,6 +55,7 @@ private[spark] object PythonEvalType {
case SQL_GROUPED_MAP_PANDAS_UDF => "SQL_GROUPED_MAP_PANDAS_UDF"
case SQL_GROUPED_AGG_PANDAS_UDF => "SQL_GROUPED_AGG_PANDAS_UDF"
case SQL_WINDOW_AGG_PANDAS_UDF => "SQL_WINDOW_AGG_PANDAS_UDF"
case SQL_SCALAR_PANDAS_ITER_UDF => "SQL_SCALAR_PANDAS_ITER_UDF"
}
}

1 change: 1 addition & 0 deletions python/pyspark/rdd.py
@@ -73,6 +73,7 @@ class PythonEvalType(object):
SQL_GROUPED_MAP_PANDAS_UDF = 201
SQL_GROUPED_AGG_PANDAS_UDF = 202
SQL_WINDOW_AGG_PANDAS_UDF = 203
SQL_SCALAR_PANDAS_ITER_UDF = 204


def portable_hash(x):
3 changes: 3 additions & 0 deletions python/pyspark/sql/functions.py
@@ -2796,6 +2796,8 @@ class PandasUDFType(object):
"""
SCALAR = PythonEvalType.SQL_SCALAR_PANDAS_UDF

SCALAR_ITER = PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF

GROUPED_MAP = PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF

GROUPED_AGG = PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
@@ -3178,6 +3180,7 @@ def pandas_udf(f=None, returnType=None, functionType=None):
raise ValueError("Invalid returnType: returnType can not be None")

if eval_type not in [PythonEvalType.SQL_SCALAR_PANDAS_UDF,
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF]:
raise ValueError("Invalid functionType: "
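With `SCALAR_ITER` exposed on `PandasUDFType` as above, the new eval type can also be passed to `pandas_udf` in its functional (non-decorator) form. A minimal sketch, assuming the `df` from the example in the description and a hypothetical function name:
```
from pyspark.sql.functions import pandas_udf, PandasUDFType

def times_two(iterator):
    # yields one result batch per input batch
    for batch in iterator:
        yield batch * 2

times_two_udf = pandas_udf(times_two, "long", PandasUDFType.SCALAR_ITER)
df.select(times_two_udf("a")).show()
```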
882 changes: 574 additions & 308 deletions python/pyspark/sql/tests/test_pandas_udf_scalar.py

Large diffs are not rendered by default.

13 changes: 9 additions & 4 deletions python/pyspark/sql/udf.py
@@ -40,6 +40,7 @@ def _wrap_function(sc, func, returnType):
def _create_udf(f, returnType, evalType):

if evalType in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF):

@@ -48,7 +49,9 @@ def _create_udf(f, returnType, evalType):

argspec = _get_argspec(f)

if evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF and len(argspec.args) == 0 and \
if (evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF or
evalType == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF) and \
len(argspec.args) == 0 and \
argspec.varargs is None:
raise ValueError(
"Invalid function: 0-arg pandas_udfs are not supported. "
@@ -113,7 +116,8 @@ def returnType(self):
else:
self._returnType_placeholder = _parse_datatype_string(self._returnType)

if self.evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
if self.evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF or \
self.evalType == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF:
try:
to_arrow_type(self._returnType_placeholder)
except TypeError:
@@ -323,10 +327,11 @@ def register(self, name, f, returnType=None):
"a user-defined function, but got %s." % returnType)
if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
PythonEvalType.SQL_SCALAR_PANDAS_UDF,
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF]:
raise ValueError(
"Invalid f: f must be SQL_BATCHED_UDF, SQL_SCALAR_PANDAS_UDF or "
"SQL_GROUPED_AGG_PANDAS_UDF")
"Invalid f: f must be SQL_BATCHED_UDF, SQL_SCALAR_PANDAS_UDF, "
"SQL_SCALAR_PANDAS_ITER_UDF, or SQL_GROUPED_AGG_PANDAS_UDF")
register_udf = UserDefinedFunction(f.func, returnType=f.returnType, name=name,
evalType=f.evalType,
deterministic=f.deterministic)
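Since `register` now accepts `SQL_SCALAR_PANDAS_ITER_UDF`, an iterator UDF can also be registered for use from SQL. A minimal sketch, assuming the `spark`, `df`, and `fi1` definitions from the description above (the registered name is hypothetical):
```
df.createOrReplaceTempView("t")
spark.udf.register("add_100", fi1)
spark.sql("SELECT add_100(a) FROM t").show()
```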
93 changes: 77 additions & 16 deletions python/pyspark/worker.py
@@ -86,21 +86,29 @@ def wrap_udf(f, return_type):
return lambda *a: f(*a)


def wrap_scalar_pandas_udf(f, return_type):
def wrap_scalar_pandas_udf(f, return_type, eval_type):
arrow_return_type = to_arrow_type(return_type)

def verify_result_length(*a):
result = f(*a)
def verify_result_type(result):
if not hasattr(result, "__len__"):
pd_type = "Pandas.DataFrame" if type(return_type) == StructType else "Pandas.Series"
raise TypeError("Return type of the user-defined function should be "
"{}, but is {}".format(pd_type, type(result)))
if len(result) != len(a[0]):
return result

def verify_result_length(result, length):
if len(result) != length:
raise RuntimeError("Result vector from pandas_udf was not the required length: "
"expected %d, got %d" % (len(a[0]), len(result)))
"expected %d, got %d" % (length, len(result)))
return result

return lambda *a: (verify_result_length(*a), arrow_return_type)
if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
return lambda *a: (verify_result_length(
verify_result_type(f(*a)), len(a[0])), arrow_return_type)
else:
# The result length verification is done at the end of a partition.
return lambda *iterator: map(lambda res: (res, arrow_return_type),
map(verify_result_type, f(*iterator)))


def wrap_grouped_map_pandas_udf(f, return_type, argspec):
@@ -201,23 +209,28 @@ def wrapped(begin_index, end_index, *series):
def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index):
num_arg = read_int(infile)
arg_offsets = [read_int(infile) for i in range(num_arg)]
row_func = None
chained_func = None
for i in range(read_int(infile)):
f, return_type = read_command(pickleSer, infile)
if row_func is None:
row_func = f
if chained_func is None:
chained_func = f
else:
row_func = chain(row_func, f)
chained_func = chain(chained_func, f)

# make sure StopIteration's raised in the user code are not ignored
# when they are processed in a for loop, raise them as RuntimeError's instead
func = fail_on_stopiteration(row_func)
if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF:
func = chained_func
else:
# make sure StopIteration's raised in the user code are not ignored
# when they are processed in a for loop, raise them as RuntimeError's instead
func = fail_on_stopiteration(chained_func)

# the last returnType will be the return type of UDF
if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
return arg_offsets, wrap_scalar_pandas_udf(func, return_type)
return arg_offsets, wrap_scalar_pandas_udf(func, return_type, eval_type)
elif eval_type == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF:
return arg_offsets, wrap_scalar_pandas_udf(func, return_type, eval_type)
elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
argspec = _get_argspec(row_func) # signature was lost when wrapping it
argspec = _get_argspec(chained_func) # signature was lost when wrapping it
return arg_offsets, wrap_grouped_map_pandas_udf(func, return_type, argspec)
elif eval_type == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF:
return arg_offsets, wrap_grouped_agg_pandas_udf(func, return_type)
@@ -233,6 +246,7 @@ def read_udfs(pickleSer, infile, eval_type):
runner_conf = {}

if eval_type in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF):
@@ -255,13 +269,60 @@

# Scalar Pandas UDF handles struct type arguments as pandas DataFrames instead of
# pandas Series. See SPARK-27240.
df_for_struct = eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF
df_for_struct = (eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF or
eval_type == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF)
ser = ArrowStreamPandasUDFSerializer(timezone, safecheck, assign_cols_by_name,
df_for_struct)
else:
ser = BatchedSerializer(PickleSerializer(), 100)

num_udfs = read_int(infile)

if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF:
assert num_udfs == 1, "One SQL_SCALAR_PANDAS_ITER_UDF expected here."

arg_offsets, udf = read_single_udf(
pickleSer, infile, eval_type, runner_conf, udf_index=0)

def func(_, iterator):
num_input_rows = [0]

def map_batch(batch):
udf_args = [batch[offset] for offset in arg_offsets]
num_input_rows[0] += len(udf_args[0])
if len(udf_args) == 1:
return udf_args[0]
else:
return tuple(udf_args)

iterator = map(map_batch, iterator)
result_iter = udf(iterator)

num_output_rows = 0
for result_batch, result_type in result_iter:
num_output_rows += len(result_batch)
assert num_output_rows <= num_input_rows[0], \
"Pandas SCALAR_ITER UDF outputted more rows than input rows."
yield (result_batch, result_type)
try:
if sys.version >= '3':
iterator.__next__()
else:
iterator.next()
except StopIteration:
pass
else:
raise RuntimeError("SQL_SCALAR_PANDAS_ITER_UDF should exhaust the input iterator.")

if num_output_rows != num_input_rows[0]:
raise RuntimeError("The number of output rows of pandas iterator UDF should be "
"the same with input rows. The input rows number is %d but the "
"output rows number is %d." %
(num_input_rows[0], num_output_rows))

# profiling is not supported for UDF
return func, None, ser, ser

udfs = {}
call_udf = []
mapper_str = ""
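The SCALAR_ITER branch of `read_udfs` above feeds a single iterator of batches to the UDF, tracks how many rows went in and came out, and fails if the UDF does not consume every batch or does not return exactly as many rows as it received. Below is a simplified, standalone sketch of that bookkeeping (not the actual pyspark worker code; names and helper structure are illustrative):
```
import pandas as pd

def run_scalar_iter_udf(udf, batches, arg_offsets):
    """Simulate the worker-side row-count checks for a SCALAR_ITER UDF."""
    counts = {"in": 0, "out": 0}

    def arg_iterator():
        for batch in batches:
            args = [batch[offset] for offset in arg_offsets]
            counts["in"] += len(args[0])
            yield args[0] if len(args) == 1 else tuple(args)

    for result in udf(arg_iterator()):
        counts["out"] += len(result)
        # a well-behaved UDF never gets ahead of its input
        assert counts["out"] <= counts["in"]
        yield result

    # the real worker also verifies the input iterator was fully consumed
    if counts["out"] != counts["in"]:
        raise RuntimeError("expected %d output rows, got %d"
                           % (counts["in"], counts["out"]))

# usage: two batches of a single column "a"
batches = [pd.DataFrame({"a": [1, 2, 3]}), pd.DataFrame({"a": [4, 5]})]

def add_one(it):
    for series in it:
        yield series + 1

print([r.tolist() for r in run_scalar_iter_udf(add_one, batches, ["a"])])
```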
@@ -27,7 +27,8 @@ import org.apache.spark.sql.types.DataType
object PythonUDF {
private[this] val SCALAR_TYPES = Set(
PythonEvalType.SQL_BATCHED_UDF,
PythonEvalType.SQL_SCALAR_PANDAS_UDF
PythonEvalType.SQL_SCALAR_PANDAS_UDF,
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF
)

def isScalarPythonUDF(e: Expression): Boolean = {
@@ -64,4 +64,5 @@ case class BatchEvalPython(
case class ArrowEvalPython(
udfs: Seq[PythonUDF],
resultAttrs: Seq[Attribute],
child: LogicalPlan) extends BaseEvalPython
child: LogicalPlan,
evalType: Int) extends BaseEvalPython
@@ -623,8 +623,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
*/
object PythonEvals extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case ArrowEvalPython(udfs, output, child) =>
ArrowEvalPythonExec(udfs, output, planLater(child)) :: Nil
case ArrowEvalPython(udfs, output, child, evalType) =>
ArrowEvalPythonExec(udfs, output, planLater(child), evalType) :: Nil
case BatchEvalPython(udfs, output, child) =>
BatchEvalPythonExec(udfs, output, planLater(child)) :: Nil
case _ =>
@@ -59,7 +59,8 @@ private[spark] class BatchIterator[T](iter: Iterator[T], batchSize: Int)
/**
* A physical plan that evaluates a [[PythonUDF]].
*/
case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan)
case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan,
evalType: Int)
extends EvalPythonExec(udfs, resultAttrs, child) {

private val batchSize = conf.arrowMaxRecordsPerBatch
@@ -80,7 +81,7 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute]

val columnarBatchIter = new ArrowPythonRunner(
funcs,
PythonEvalType.SQL_SCALAR_PANDAS_UDF,
evalType,
argOffsets,
schema,
sessionLocalTimeZone,
@@ -111,19 +111,27 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
}

private def collectEvaluableUDFsFromExpressions(expressions: Seq[Expression]): Seq[PythonUDF] = {
// Eval type checker is set once when we find the first evaluable UDF and its value
// shouldn't change later.
// Used to check if subsequent UDFs are of the same type as the first UDF. (since we can only
// If the first UDF is SQL_SCALAR_PANDAS_ITER_UDF, then only return this UDF,
// otherwise check if subsequent UDFs are of the same type as the first UDF. (since we can only
// extract UDFs of the same eval type)
var evalTypeChecker: Option[EvalTypeChecker] = None

var firstVisitedScalarUDFEvalType: Option[Int] = None

def canChainUDF(evalType: Int): Boolean = {
if (evalType == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF) {
false
} else {
evalType == firstVisitedScalarUDFEvalType.get
}
}

def collectEvaluableUDFs(expr: Expression): Seq[PythonUDF] = expr match {
case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf)
&& evalTypeChecker.isEmpty =>
evalTypeChecker = Some((otherEvalType: EvalType) => otherEvalType == udf.evalType)
&& firstVisitedScalarUDFEvalType.isEmpty =>
firstVisitedScalarUDFEvalType = Some(udf.evalType)
Seq(udf)
case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf)
&& evalTypeChecker.get(udf.evalType) =>
&& canChainUDF(udf.evalType) =>
Seq(udf)
case e => e.children.flatMap(collectEvaluableUDFs)
}
@@ -175,16 +183,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
AttributeReference(s"pythonUDF$i", u.dataType)()
}

val evaluation = validUdfs.partition(
_.evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF
) match {
case (vectorizedUdfs, plainUdfs) if plainUdfs.isEmpty =>
ArrowEvalPython(vectorizedUdfs, resultAttrs, child)
case (vectorizedUdfs, plainUdfs) if vectorizedUdfs.isEmpty =>
BatchEvalPython(plainUdfs, resultAttrs, child)
val evalTypes = validUdfs.map(_.evalType).toSet
if (evalTypes.size != 1) {
throw new AnalysisException(
s"Expected udfs have the same evalType but got different evalTypes: " +
s"${evalTypes.mkString(",")}")
}
val evalType = evalTypes.head
val evaluation = evalType match {
case PythonEvalType.SQL_BATCHED_UDF =>
BatchEvalPython(validUdfs, resultAttrs, child)
case PythonEvalType.SQL_SCALAR_PANDAS_UDF | PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF =>
ArrowEvalPython(validUdfs, resultAttrs, child, evalType)
case _ =>
throw new AnalysisException(
"Expected either Scalar Pandas UDFs or Batched UDFs but got both")
throw new AnalysisException("Unexcepted UDF evalType")
}

attributeMap ++= validUdfs.zip(resultAttrs)
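The rewritten extraction rule above dispatches on a single eval type per `ArrowEvalPython` / `BatchEvalPython` node, so SCALAR_ITER UDFs never share a node with UDFs of another eval type. One way to observe this, assuming the `df`, `fi1`, `fi2`, and `fp1` definitions from the description:
```
# The chained iterator UDFs and the ordinary scalar pandas UDF should land in
# separate ArrowEvalPython nodes in the physical plan (exact plan text may vary).
df.select(fi2(fi1("a")), fp1("a")).explain()
```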
