From 6d441dcdc68dae886e375794a55658f70cd18d9d Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Sat, 15 Jun 2019 08:29:20 -0700
Subject: [PATCH] [SPARK-26412][PYSPARK][SQL] Allow Pandas UDF to take an
 iterator of pd.Series or an iterator of tuple of pd.Series

## What changes were proposed in this pull request?

Allow Pandas UDF to take an iterator of pd.Series or an iterator of tuple of pd.Series.

Note that the UDF's input is always a single iterator:

* if the UDF takes a single column as input, each element of the iterator is a pd.Series corresponding to one batch of that column's values
* if the UDF takes multiple columns as input, each element of the iterator is a tuple of `pd.Series`, one per input column, in the same order as the inputs

For example:

```
@pandas_udf("int", PandasUDFType.SCALAR_ITER)
def the_udf(iterator):
    for col1_batch, col2_batch in iterator:
        yield col1_batch + col2_batch

df.select(the_udf("col1", "col2"))
```

The UDF above adds col1 and col2.

I haven't added unit tests yet, but manual tests show it works fine, so it is ready for a first-pass review.

We can test several typical cases:

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import udf
from pyspark.taskcontext import TaskContext

df = spark.createDataFrame([(1, 20), (3, 40)], ["a", "b"])

@pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi1(it):
    pid = TaskContext.get().partitionId()
    print("DBG: fi1: do init stuff, partitionId=" + str(pid))
    for batch in it:
        yield batch + 100
    print("DBG: fi1: do close stuff, partitionId=" + str(pid))

@pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi2(it):
    pid = TaskContext.get().partitionId()
    print("DBG: fi2: do init stuff, partitionId=" + str(pid))
    for batch in it:
        yield batch + 10000
    print("DBG: fi2: do close stuff, partitionId=" + str(pid))

@pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi3(it):
    pid = TaskContext.get().partitionId()
    print("DBG: fi3: do init stuff, partitionId=" + str(pid))
    for x, y in it:
        yield x + y * 10 + 100000
    print("DBG: fi3: do close stuff, partitionId=" + str(pid))

@pandas_udf("int", PandasUDFType.SCALAR)
def fp1(x):
    return x + 1000

@udf("int")
def fu1(x):
    return x + 10

# Test selecting "pandas iter udf / pandas udf / sql udf" expressions at the same time.
# Note that in this case `fi1("a"), fi2("b"), fi3("a", "b")` generate only one plan,
# while `fu1("a")` and `fp1("a")` generate two separate plans.
df.select(fi1("a"), fi2("b"), fi3("a", "b"), fu1("a"), fp1("a")).show()

# Test chaining two pandas iter UDFs together.
# Note that in this case `fi2(fi1("a"))` generates only one plan.
# Also note the init/close call order, shown in the debug output below:
# DBG: fi2: do init stuff, partitionId=0
# DBG: fi1: do init stuff, partitionId=0
# DBG: fi1: do close stuff, partitionId=0
# DBG: fi2: do close stuff, partitionId=0
df.select(fi2(fi1("a"))).show()

# Test a more complex chain.
# Note that in this case `fi1("a"), fi2("a")` generate one plan,
# and `fi3(fi1_output, fi2_output)` generates another plan.
df.select(fi3(fi1("a"), fi2("a"))).show()
```

## How was this patch tested?

To be added.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #24643 from WeichenXu123/pandas_udf_iter.
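As an aside for reviewers, a minimal sketch of the per-partition init/teardown pattern that motivates the iterator form (the `load_model`, `model.predict`, and `model.close` helpers below are hypothetical placeholders, not part of this patch):

```
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("double", PandasUDFType.SCALAR_ITER)
def predict(batch_iter):
    model = load_model()  # hypothetical: expensive setup, runs once per partition
    try:
        # the UDF takes a single input column, so each element is one pd.Series batch
        for features in batch_iter:
            yield pd.Series(model.predict(features))
    finally:
        model.close()  # hypothetical: teardown, runs after the last batch
```

With a plain SCALAR pandas UDF the function is invoked once per batch, so there is no natural place to hang this kind of per-partition setup and cleanup.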
Lead-authored-by: WeichenXu Co-authored-by: Xiangrui Meng Signed-off-by: Xiangrui Meng --- .../spark/api/python/PythonRunner.scala | 2 + python/pyspark/rdd.py | 1 + python/pyspark/sql/functions.py | 3 + .../sql/tests/test_pandas_udf_scalar.py | 882 ++++++++++++------ python/pyspark/sql/udf.py | 13 +- python/pyspark/worker.py | 93 +- .../sql/catalyst/expressions/PythonUDF.scala | 3 +- .../logical/pythonLogicalOperators.scala | 3 +- .../spark/sql/execution/SparkStrategies.scala | 4 +- .../python/ArrowEvalPythonExec.scala | 5 +- .../execution/python/ExtractPythonUDFs.scala | 44 +- 11 files changed, 703 insertions(+), 350 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 7b02545031e1c..e0e35dfbab96b 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -46,6 +46,7 @@ private[spark] object PythonEvalType { val SQL_GROUPED_MAP_PANDAS_UDF = 201 val SQL_GROUPED_AGG_PANDAS_UDF = 202 val SQL_WINDOW_AGG_PANDAS_UDF = 203 + val SQL_SCALAR_PANDAS_ITER_UDF = 204 def toString(pythonEvalType: Int): String = pythonEvalType match { case NON_UDF => "NON_UDF" @@ -54,6 +55,7 @@ private[spark] object PythonEvalType { case SQL_GROUPED_MAP_PANDAS_UDF => "SQL_GROUPED_MAP_PANDAS_UDF" case SQL_GROUPED_AGG_PANDAS_UDF => "SQL_GROUPED_AGG_PANDAS_UDF" case SQL_WINDOW_AGG_PANDAS_UDF => "SQL_WINDOW_AGG_PANDAS_UDF" + case SQL_SCALAR_PANDAS_ITER_UDF => "SQL_SCALAR_PANDAS_ITER_UDF" } } diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index f0682e71a1780..395abc841827d 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -73,6 +73,7 @@ class PythonEvalType(object): SQL_GROUPED_MAP_PANDAS_UDF = 201 SQL_GROUPED_AGG_PANDAS_UDF = 202 SQL_WINDOW_AGG_PANDAS_UDF = 203 + SQL_SCALAR_PANDAS_ITER_UDF = 204 def portable_hash(x): diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 613822b7edf2d..69730638dc061 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2796,6 +2796,8 @@ class PandasUDFType(object): """ SCALAR = PythonEvalType.SQL_SCALAR_PANDAS_UDF + SCALAR_ITER = PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF + GROUPED_MAP = PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF GROUPED_AGG = PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF @@ -3178,6 +3180,7 @@ def pandas_udf(f=None, returnType=None, functionType=None): raise ValueError("Invalid returnType: returnType can not be None") if eval_type not in [PythonEvalType.SQL_SCALAR_PANDAS_UDF, + PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF]: raise ValueError("Invalid functionType: " diff --git a/python/pyspark/sql/tests/test_pandas_udf_scalar.py b/python/pyspark/sql/tests/test_pandas_udf_scalar.py index b219624e5801b..d2145b85f6cec 100644 --- a/python/pyspark/sql/tests/test_pandas_udf_scalar.py +++ b/python/pyspark/sql/tests/test_pandas_udf_scalar.py @@ -29,9 +29,11 @@ from datetime import date, datetime from decimal import Decimal +from pyspark import TaskContext from pyspark.rdd import PythonEvalType from pyspark.sql import Column -from pyspark.sql.functions import array, col, expr, lit, sum, struct, udf, pandas_udf +from pyspark.sql.functions import array, col, expr, lit, sum, struct, udf, pandas_udf, \ + PandasUDFType from pyspark.sql.types import Row from pyspark.sql.types import * from pyspark.sql.utils import 
AnalysisException @@ -83,6 +85,18 @@ def random_udf(v): random_udf = random_udf.asNondeterministic() return random_udf + @property + def nondeterministic_vectorized_iter_udf(self): + import numpy as np + + @pandas_udf('double', PandasUDFType.SCALAR_ITER) + def random_udf(it): + for v in it: + yield pd.Series(np.random.random(len(v))) + + random_udf = random_udf.asNondeterministic() + return random_udf + def test_pandas_udf_tokenize(self): tokenize = pandas_udf(lambda s: s.apply(lambda str: str.split(' ')), ArrayType(StringType())) @@ -110,19 +124,20 @@ def test_vectorized_udf_basic(self): col('id').cast('boolean').alias('bool'), array(col('id')).alias('array_long')) f = lambda x: x - str_f = pandas_udf(f, StringType()) - int_f = pandas_udf(f, IntegerType()) - long_f = pandas_udf(f, LongType()) - float_f = pandas_udf(f, FloatType()) - double_f = pandas_udf(f, DoubleType()) - decimal_f = pandas_udf(f, DecimalType()) - bool_f = pandas_udf(f, BooleanType()) - array_long_f = pandas_udf(f, ArrayType(LongType())) - res = df.select(str_f(col('str')), int_f(col('int')), - long_f(col('long')), float_f(col('float')), - double_f(col('double')), decimal_f('decimal'), - bool_f(col('bool')), array_long_f('array_long')) - self.assertEquals(df.collect(), res.collect()) + for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]: + str_f = pandas_udf(f, StringType(), udf_type) + int_f = pandas_udf(f, IntegerType(), udf_type) + long_f = pandas_udf(f, LongType(), udf_type) + float_f = pandas_udf(f, FloatType(), udf_type) + double_f = pandas_udf(f, DoubleType(), udf_type) + decimal_f = pandas_udf(f, DecimalType(), udf_type) + bool_f = pandas_udf(f, BooleanType(), udf_type) + array_long_f = pandas_udf(f, ArrayType(LongType()), udf_type) + res = df.select(str_f(col('str')), int_f(col('int')), + long_f(col('long')), float_f(col('float')), + double_f(col('double')), decimal_f('decimal'), + bool_f(col('bool')), array_long_f('array_long')) + self.assertEquals(df.collect(), res.collect()) def test_register_nondeterministic_vectorized_udf_basic(self): random_pandas_udf = pandas_udf( @@ -136,84 +151,115 @@ def test_register_nondeterministic_vectorized_udf_basic(self): [row] = self.spark.sql("SELECT randomPandasUDF(1)").collect() self.assertEqual(row[0], 7) + def random_iter_udf(it): + for i in it: + yield random.randint(6, 6) + i + random_pandas_iter_udf = pandas_udf( + random_iter_udf, IntegerType(), PandasUDFType.SCALAR_ITER).asNondeterministic() + self.assertEqual(random_pandas_iter_udf.deterministic, False) + self.assertEqual(random_pandas_iter_udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF) + nondeterministic_pandas_iter_udf = self.spark.catalog.registerFunction( + "randomPandasIterUDF", random_pandas_iter_udf) + self.assertEqual(nondeterministic_pandas_iter_udf.deterministic, False) + self.assertEqual(nondeterministic_pandas_iter_udf.evalType, + PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF) + [row] = self.spark.sql("SELECT randomPandasIterUDF(1)").collect() + self.assertEqual(row[0], 7) + def test_vectorized_udf_null_boolean(self): data = [(True,), (True,), (None,), (False,)] schema = StructType().add("bool", BooleanType()) df = self.spark.createDataFrame(data, schema) - bool_f = pandas_udf(lambda x: x, BooleanType()) - res = df.select(bool_f(col('bool'))) - self.assertEquals(df.collect(), res.collect()) + for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]: + bool_f = pandas_udf(lambda x: x, BooleanType(), udf_type) + res = df.select(bool_f(col('bool'))) + 
self.assertEquals(df.collect(), res.collect()) def test_vectorized_udf_null_byte(self): data = [(None,), (2,), (3,), (4,)] schema = StructType().add("byte", ByteType()) df = self.spark.createDataFrame(data, schema) - byte_f = pandas_udf(lambda x: x, ByteType()) - res = df.select(byte_f(col('byte'))) - self.assertEquals(df.collect(), res.collect()) + for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]: + byte_f = pandas_udf(lambda x: x, ByteType(), udf_type) + res = df.select(byte_f(col('byte'))) + self.assertEquals(df.collect(), res.collect()) def test_vectorized_udf_null_short(self): data = [(None,), (2,), (3,), (4,)] schema = StructType().add("short", ShortType()) df = self.spark.createDataFrame(data, schema) - short_f = pandas_udf(lambda x: x, ShortType()) - res = df.select(short_f(col('short'))) - self.assertEquals(df.collect(), res.collect()) + for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]: + short_f = pandas_udf(lambda x: x, ShortType(), udf_type) + res = df.select(short_f(col('short'))) + self.assertEquals(df.collect(), res.collect()) def test_vectorized_udf_null_int(self): data = [(None,), (2,), (3,), (4,)] schema = StructType().add("int", IntegerType()) df = self.spark.createDataFrame(data, schema) - int_f = pandas_udf(lambda x: x, IntegerType()) - res = df.select(int_f(col('int'))) - self.assertEquals(df.collect(), res.collect()) + for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]: + int_f = pandas_udf(lambda x: x, IntegerType(), udf_type) + res = df.select(int_f(col('int'))) + self.assertEquals(df.collect(), res.collect()) def test_vectorized_udf_null_long(self): data = [(None,), (2,), (3,), (4,)] schema = StructType().add("long", LongType()) df = self.spark.createDataFrame(data, schema) - long_f = pandas_udf(lambda x: x, LongType()) - res = df.select(long_f(col('long'))) - self.assertEquals(df.collect(), res.collect()) + for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]: + long_f = pandas_udf(lambda x: x, LongType(), udf_type) + res = df.select(long_f(col('long'))) + self.assertEquals(df.collect(), res.collect()) def test_vectorized_udf_null_float(self): data = [(3.0,), (5.0,), (-1.0,), (None,)] schema = StructType().add("float", FloatType()) df = self.spark.createDataFrame(data, schema) - float_f = pandas_udf(lambda x: x, FloatType()) - res = df.select(float_f(col('float'))) - self.assertEquals(df.collect(), res.collect()) + for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]: + float_f = pandas_udf(lambda x: x, FloatType(), udf_type) + res = df.select(float_f(col('float'))) + self.assertEquals(df.collect(), res.collect()) def test_vectorized_udf_null_double(self): data = [(3.0,), (5.0,), (-1.0,), (None,)] schema = StructType().add("double", DoubleType()) df = self.spark.createDataFrame(data, schema) - double_f = pandas_udf(lambda x: x, DoubleType()) - res = df.select(double_f(col('double'))) - self.assertEquals(df.collect(), res.collect()) + for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]: + double_f = pandas_udf(lambda x: x, DoubleType(), udf_type) + res = df.select(double_f(col('double'))) + self.assertEquals(df.collect(), res.collect()) def test_vectorized_udf_null_decimal(self): data = [(Decimal(3.0),), (Decimal(5.0),), (Decimal(-1.0),), (None,)] schema = StructType().add("decimal", DecimalType(38, 18)) df = self.spark.createDataFrame(data, schema) - decimal_f = pandas_udf(lambda x: x, DecimalType(38, 18)) - res = df.select(decimal_f(col('decimal'))) - 
self.assertEquals(df.collect(), res.collect()) + for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]: + decimal_f = pandas_udf(lambda x: x, DecimalType(38, 18), udf_type) + res = df.select(decimal_f(col('decimal'))) + self.assertEquals(df.collect(), res.collect()) def test_vectorized_udf_null_string(self): data = [("foo",), (None,), ("bar",), ("bar",)] schema = StructType().add("str", StringType()) df = self.spark.createDataFrame(data, schema) - str_f = pandas_udf(lambda x: x, StringType()) - res = df.select(str_f(col('str'))) - self.assertEquals(df.collect(), res.collect()) + for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]: + str_f = pandas_udf(lambda x: x, StringType(), udf_type) + res = df.select(str_f(col('str'))) + self.assertEquals(df.collect(), res.collect()) def test_vectorized_udf_string_in_udf(self): df = self.spark.range(10) - str_f = pandas_udf(lambda x: pd.Series(map(str, x)), StringType()) - actual = df.select(str_f(col('id'))) - expected = df.select(col('id').cast('string')) - self.assertEquals(expected.collect(), actual.collect()) + scalar_f = lambda x: pd.Series(map(str, x)) + + def iter_f(it): + for i in it: + yield scalar_f(i) + + for f, udf_type in [(scalar_f, PandasUDFType.SCALAR), (iter_f, PandasUDFType.SCALAR_ITER)]: + str_f = pandas_udf(f, StringType(), udf_type) + actual = df.select(str_f(col('id'))) + expected = df.select(col('id').cast('string')) + self.assertEquals(expected.collect(), actual.collect()) def test_vectorized_udf_datatype_string(self): df = self.spark.range(10).select( @@ -225,42 +271,46 @@ def test_vectorized_udf_datatype_string(self): col('id').cast('decimal').alias('decimal'), col('id').cast('boolean').alias('bool')) f = lambda x: x - str_f = pandas_udf(f, 'string') - int_f = pandas_udf(f, 'integer') - long_f = pandas_udf(f, 'long') - float_f = pandas_udf(f, 'float') - double_f = pandas_udf(f, 'double') - decimal_f = pandas_udf(f, 'decimal(38, 18)') - bool_f = pandas_udf(f, 'boolean') - res = df.select(str_f(col('str')), int_f(col('int')), - long_f(col('long')), float_f(col('float')), - double_f(col('double')), decimal_f('decimal'), - bool_f(col('bool'))) - self.assertEquals(df.collect(), res.collect()) + for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]: + str_f = pandas_udf(f, 'string', udf_type) + int_f = pandas_udf(f, 'integer', udf_type) + long_f = pandas_udf(f, 'long', udf_type) + float_f = pandas_udf(f, 'float', udf_type) + double_f = pandas_udf(f, 'double', udf_type) + decimal_f = pandas_udf(f, 'decimal(38, 18)', udf_type) + bool_f = pandas_udf(f, 'boolean', udf_type) + res = df.select(str_f(col('str')), int_f(col('int')), + long_f(col('long')), float_f(col('float')), + double_f(col('double')), decimal_f('decimal'), + bool_f(col('bool'))) + self.assertEquals(df.collect(), res.collect()) def test_vectorized_udf_null_binary(self): data = [(bytearray(b"a"),), (None,), (bytearray(b"bb"),), (bytearray(b"ccc"),)] schema = StructType().add("binary", BinaryType()) df = self.spark.createDataFrame(data, schema) - str_f = pandas_udf(lambda x: x, BinaryType()) - res = df.select(str_f(col('binary'))) - self.assertEquals(df.collect(), res.collect()) + for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]: + str_f = pandas_udf(lambda x: x, BinaryType(), udf_type) + res = df.select(str_f(col('binary'))) + self.assertEquals(df.collect(), res.collect()) def test_vectorized_udf_array_type(self): data = [([1, 2],), ([3, 4],)] array_schema = StructType([StructField("array", 
ArrayType(IntegerType()))]) df = self.spark.createDataFrame(data, schema=array_schema) - array_f = pandas_udf(lambda x: x, ArrayType(IntegerType())) - result = df.select(array_f(col('array'))) - self.assertEquals(df.collect(), result.collect()) + for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]: + array_f = pandas_udf(lambda x: x, ArrayType(IntegerType()), udf_type) + result = df.select(array_f(col('array'))) + self.assertEquals(df.collect(), result.collect()) def test_vectorized_udf_null_array(self): data = [([1, 2],), (None,), (None,), ([3, 4],), (None,)] array_schema = StructType([StructField("array", ArrayType(IntegerType()))]) df = self.spark.createDataFrame(data, schema=array_schema) - array_f = pandas_udf(lambda x: x, ArrayType(IntegerType())) - result = df.select(array_f(col('array'))) - self.assertEquals(df.collect(), result.collect()) + for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]: + array_f = pandas_udf(lambda x: x, ArrayType(IntegerType()), udf_type) + result = df.select(array_f(col('array'))) + self.assertEquals(df.collect(), result.collect()) def test_vectorized_udf_struct_type(self): df = self.spark.range(10) @@ -268,24 +318,30 @@ def test_vectorized_udf_struct_type(self): StructField('id', LongType()), StructField('str', StringType())]) - def func(id): + def scalar_func(id): return pd.DataFrame({'id': id, 'str': id.apply(unicode)}) - f = pandas_udf(func, returnType=return_type) + def iter_func(it): + for id in it: + yield scalar_func(id) - expected = df.select(struct(col('id'), col('id').cast('string').alias('str')) - .alias('struct')).collect() + for func, udf_type in [(scalar_func, PandasUDFType.SCALAR), + (iter_func, PandasUDFType.SCALAR_ITER)]: + f = pandas_udf(func, returnType=return_type, functionType=udf_type) - actual = df.select(f(col('id')).alias('struct')).collect() - self.assertEqual(expected, actual) + expected = df.select(struct(col('id'), col('id').cast('string').alias('str')) + .alias('struct')).collect() - g = pandas_udf(func, 'id: long, str: string') - actual = df.select(g(col('id')).alias('struct')).collect() - self.assertEqual(expected, actual) + actual = df.select(f(col('id')).alias('struct')).collect() + self.assertEqual(expected, actual) - struct_f = pandas_udf(lambda x: x, return_type) - actual = df.select(struct_f(struct(col('id'), col('id').cast('string').alias('str')))) - self.assertEqual(expected, actual.collect()) + g = pandas_udf(func, 'id: long, str: string', functionType=udf_type) + actual = df.select(g(col('id')).alias('struct')).collect() + self.assertEqual(expected, actual) + + struct_f = pandas_udf(lambda x: x, return_type, functionType=udf_type) + actual = df.select(struct_f(struct(col('id'), col('id').cast('string').alias('str')))) + self.assertEqual(expected, actual.collect()) def test_vectorized_udf_struct_complex(self): df = self.spark.range(10) @@ -293,17 +349,24 @@ def test_vectorized_udf_struct_complex(self): StructField('ts', TimestampType()), StructField('arr', ArrayType(LongType()))]) - @pandas_udf(returnType=return_type) - def f(id): + def _scalar_f(id): return pd.DataFrame({'ts': id.apply(lambda i: pd.Timestamp(i)), 'arr': id.apply(lambda i: [i, i + 1])}) - actual = df.withColumn('f', f(col('id'))).collect() - for i, row in enumerate(actual): - id, f = row - self.assertEqual(i, id) - self.assertEqual(pd.Timestamp(i).to_pydatetime(), f[0]) - self.assertListEqual([i, i + 1], f[1]) + scalar_f = pandas_udf(_scalar_f, returnType=return_type) + + @pandas_udf(returnType=return_type, 
functionType=PandasUDFType.SCALAR_ITER) + def iter_f(it): + for id in it: + yield _scalar_f(id) + + for f, udf_type in [(scalar_f, PandasUDFType.SCALAR), (iter_f, PandasUDFType.SCALAR_ITER)]: + actual = df.withColumn('f', f(col('id'))).collect() + for i, row in enumerate(actual): + id, f = row + self.assertEqual(i, id) + self.assertEqual(pd.Timestamp(i).to_pydatetime(), f[0]) + self.assertListEqual([i, i + 1], f[1]) def test_vectorized_udf_nested_struct(self): nested_type = StructType([ @@ -314,30 +377,56 @@ def test_vectorized_udf_nested_struct(self): ])) ]) - with QuietTest(self.sc): - with self.assertRaisesRegexp( - Exception, - 'Invalid returnType with scalar Pandas UDFs'): - pandas_udf(lambda x: x, returnType=nested_type) + for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]: + with QuietTest(self.sc): + with self.assertRaisesRegexp( + Exception, + 'Invalid returnType with scalar Pandas UDFs'): + pandas_udf(lambda x: x, returnType=nested_type, functionType=udf_type) def test_vectorized_udf_complex(self): df = self.spark.range(10).select( col('id').cast('int').alias('a'), col('id').cast('int').alias('b'), col('id').cast('double').alias('c')) - add = pandas_udf(lambda x, y: x + y, IntegerType()) - power2 = pandas_udf(lambda x: 2 ** x, IntegerType()) - mul = pandas_udf(lambda x, y: x * y, DoubleType()) - res = df.select(add(col('a'), col('b')), power2(col('a')), mul(col('b'), col('c'))) - expected = df.select(expr('a + b'), expr('power(2, a)'), expr('b * c')) - self.assertEquals(expected.collect(), res.collect()) + scalar_add = pandas_udf(lambda x, y: x + y, IntegerType()) + scalar_power2 = pandas_udf(lambda x: 2 ** x, IntegerType()) + scalar_mul = pandas_udf(lambda x, y: x * y, DoubleType()) + + @pandas_udf(IntegerType(), PandasUDFType.SCALAR_ITER) + def iter_add(it): + for x, y in it: + yield x + y + + @pandas_udf(IntegerType(), PandasUDFType.SCALAR_ITER) + def iter_power2(it): + for x in it: + yield 2 ** x + + @pandas_udf(DoubleType(), PandasUDFType.SCALAR_ITER) + def iter_mul(it): + for x, y in it: + yield x * y + + for add, power2, mul in [(scalar_add, scalar_power2, scalar_mul), + (iter_add, iter_power2, iter_mul)]: + res = df.select(add(col('a'), col('b')), power2(col('a')), mul(col('b'), col('c'))) + expected = df.select(expr('a + b'), expr('power(2, a)'), expr('b * c')) + self.assertEquals(expected.collect(), res.collect()) def test_vectorized_udf_exception(self): df = self.spark.range(10) - raise_exception = pandas_udf(lambda x: x * (1 / 0), LongType()) - with QuietTest(self.sc): - with self.assertRaisesRegexp(Exception, 'division( or modulo)? by zero'): - df.select(raise_exception(col('id'))).collect() + scalar_raise_exception = pandas_udf(lambda x: x * (1 / 0), LongType()) + + @pandas_udf(LongType(), PandasUDFType.SCALAR_ITER) + def iter_raise_exception(it): + for x in it: + yield x * (1 / 0) + + for raise_exception in [scalar_raise_exception, iter_raise_exception]: + with QuietTest(self.sc): + with self.assertRaisesRegexp(Exception, 'division( or modulo)? 
by zero'): + df.select(raise_exception(col('id'))).collect() def test_vectorized_udf_invalid_length(self): df = self.spark.range(10) @@ -348,12 +437,46 @@ def test_vectorized_udf_invalid_length(self): 'Result vector from pandas_udf was not the required length'): df.select(raise_exception(col('id'))).collect() + @pandas_udf(LongType(), PandasUDFType.SCALAR_ITER) + def iter_udf_wong_output_size(it): + for _ in it: + yield pd.Series(1) + + with QuietTest(self.sc): + with self.assertRaisesRegexp( + Exception, + "The number of output rows of pandas iterator UDF should be " + "the same with input rows"): + df.select(iter_udf_wong_output_size(col('id'))).collect() + + @pandas_udf(LongType(), PandasUDFType.SCALAR_ITER) + def iter_udf_not_reading_all_input(it): + for batch in it: + batch_len = len(batch) + yield pd.Series([1] * batch_len) + break + + with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 3}): + df1 = self.spark.range(10).repartition(1) + with QuietTest(self.sc): + with self.assertRaisesRegexp( + Exception, + "SQL_SCALAR_PANDAS_ITER_UDF should exhaust the input iterator"): + df1.select(iter_udf_not_reading_all_input(col('id'))).collect() + def test_vectorized_udf_chained(self): df = self.spark.range(10) - f = pandas_udf(lambda x: x + 1, LongType()) - g = pandas_udf(lambda x: x - 1, LongType()) - res = df.select(g(f(col('id')))) - self.assertEquals(df.collect(), res.collect()) + scalar_f = pandas_udf(lambda x: x + 1, LongType()) + scalar_g = pandas_udf(lambda x: x - 1, LongType()) + + iter_f = pandas_udf(lambda it: map(lambda x: x + 1, it), LongType(), + PandasUDFType.SCALAR_ITER) + iter_g = pandas_udf(lambda it: map(lambda x: x - 1, it), LongType(), + PandasUDFType.SCALAR_ITER) + + for f, g in [(scalar_f, scalar_g), (iter_f, iter_g)]: + res = df.select(g(f(col('id')))) + self.assertEquals(df.collect(), res.collect()) def test_vectorized_udf_chained_struct_type(self): df = self.spark.range(10) @@ -362,76 +485,110 @@ def test_vectorized_udf_chained_struct_type(self): StructField('str', StringType())]) @pandas_udf(return_type) - def f(id): + def scalar_f(id): return pd.DataFrame({'id': id, 'str': id.apply(unicode)}) - g = pandas_udf(lambda x: x, return_type) + scalar_g = pandas_udf(lambda x: x, return_type) + + @pandas_udf(return_type, PandasUDFType.SCALAR_ITER) + def iter_f(it): + for id in it: + yield pd.DataFrame({'id': id, 'str': id.apply(unicode)}) + + iter_g = pandas_udf(lambda x: x, return_type, PandasUDFType.SCALAR_ITER) expected = df.select(struct(col('id'), col('id').cast('string').alias('str')) .alias('struct')).collect() - actual = df.select(g(f(col('id'))).alias('struct')).collect() - self.assertEqual(expected, actual) + for f, g in [(scalar_f, scalar_g), (iter_f, iter_g)]: + actual = df.select(g(f(col('id'))).alias('struct')).collect() + self.assertEqual(expected, actual) def test_vectorized_udf_wrong_return_type(self): with QuietTest(self.sc): - with self.assertRaisesRegexp( - NotImplementedError, - 'Invalid returnType.*scalar Pandas UDF.*MapType'): - pandas_udf(lambda x: x * 1.0, MapType(LongType(), LongType())) + for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]: + with self.assertRaisesRegexp( + NotImplementedError, + 'Invalid returnType.*scalar Pandas UDF.*MapType'): + pandas_udf(lambda x: x, MapType(LongType(), LongType()), udf_type) def test_vectorized_udf_return_scalar(self): df = self.spark.range(10) - f = pandas_udf(lambda x: 1.0, DoubleType()) - with QuietTest(self.sc): - with self.assertRaisesRegexp(Exception, 
'Return.*type.*Series'): - df.select(f(col('id'))).collect() + scalar_f = pandas_udf(lambda x: 1.0, DoubleType()) + iter_f = pandas_udf(lambda it: map(lambda x: 1.0, it), DoubleType(), + PandasUDFType.SCALAR_ITER) + for f in [scalar_f, iter_f]: + with QuietTest(self.sc): + with self.assertRaisesRegexp(Exception, 'Return.*type.*Series'): + df.select(f(col('id'))).collect() def test_vectorized_udf_decorator(self): df = self.spark.range(10) @pandas_udf(returnType=LongType()) - def identity(x): + def scalar_identity(x): return x - res = df.select(identity(col('id'))) - self.assertEquals(df.collect(), res.collect()) + + @pandas_udf(returnType=LongType(), functionType=PandasUDFType.SCALAR_ITER) + def iter_identity(x): + return x + + for identity in [scalar_identity, iter_identity]: + res = df.select(identity(col('id'))) + self.assertEquals(df.collect(), res.collect()) def test_vectorized_udf_empty_partition(self): df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2)) - f = pandas_udf(lambda x: x, LongType()) - res = df.select(f(col('id'))) - self.assertEquals(df.collect(), res.collect()) + for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]: + f = pandas_udf(lambda x: x, LongType(), udf_type) + res = df.select(f(col('id'))) + self.assertEquals(df.collect(), res.collect()) def test_vectorized_udf_struct_with_empty_partition(self): df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))\ .withColumn('name', lit('John Doe')) @pandas_udf("first string, last string") - def split_expand(n): + def scalar_split_expand(n): return n.str.split(expand=True) - result = df.select(split_expand('name')).collect() - self.assertEqual(1, len(result)) - row = result[0] - self.assertEqual('John', row[0]['first']) - self.assertEqual('Doe', row[0]['last']) + @pandas_udf("first string, last string", PandasUDFType.SCALAR_ITER) + def iter_split_expand(it): + for n in it: + yield n.str.split(expand=True) + + for split_expand in [scalar_split_expand, iter_split_expand]: + result = df.select(split_expand('name')).collect() + self.assertEqual(1, len(result)) + row = result[0] + self.assertEqual('John', row[0]['first']) + self.assertEqual('Doe', row[0]['last']) def test_vectorized_udf_varargs(self): df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2)) - f = pandas_udf(lambda *v: v[0], LongType()) - res = df.select(f(col('id'))) - self.assertEquals(df.collect(), res.collect()) + scalar_f = pandas_udf(lambda *v: v[0], LongType()) + + @pandas_udf(LongType(), PandasUDFType.SCALAR_ITER) + def iter_f(it): + for v in it: + yield v[0] + + for f in [scalar_f, iter_f]: + res = df.select(f(col('id'), col('id'))) + self.assertEquals(df.collect(), res.collect()) def test_vectorized_udf_unsupported_types(self): with QuietTest(self.sc): - with self.assertRaisesRegexp( - NotImplementedError, - 'Invalid returnType.*scalar Pandas UDF.*MapType'): - pandas_udf(lambda x: x, MapType(StringType(), IntegerType())) - with self.assertRaisesRegexp( - NotImplementedError, - 'Invalid returnType.*scalar Pandas UDF.*ArrayType.StructType'): - pandas_udf(lambda x: x, ArrayType(StructType([StructField('a', IntegerType())]))) + for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]: + with self.assertRaisesRegexp( + NotImplementedError, + 'Invalid returnType.*scalar Pandas UDF.*MapType'): + pandas_udf(lambda x: x, MapType(StringType(), IntegerType()), udf_type) + with self.assertRaisesRegexp( + NotImplementedError, + 'Invalid returnType.*scalar Pandas UDF.*ArrayType.StructType'): + 
pandas_udf(lambda x: x, + ArrayType(StructType([StructField('a', IntegerType())])), udf_type) def test_vectorized_udf_dates(self): schema = StructType().add("idx", LongType()).add("date", DateType()) @@ -442,11 +599,7 @@ def test_vectorized_udf_dates(self): (4, date(2262, 4, 12),)] df = self.spark.createDataFrame(data, schema=schema) - date_copy = pandas_udf(lambda t: t, returnType=DateType()) - df = df.withColumn("date_copy", date_copy(col("date"))) - - @pandas_udf(returnType=StringType()) - def check_data(idx, date, date_copy): + def scalar_check_data(idx, date, date_copy): msgs = [] is_equal = date.isnull() for i in range(len(idx)): @@ -459,14 +612,26 @@ def check_data(idx, date, date_copy): % (date[i], idx[i], data[idx[i]][1])) return pd.Series(msgs) - result = df.withColumn("check_data", - check_data(col("idx"), col("date"), col("date_copy"))).collect() + def iter_check_data(it): + for idx, date, date_copy in it: + yield scalar_check_data(idx, date, date_copy) - self.assertEquals(len(data), len(result)) - for i in range(len(result)): - self.assertEquals(data[i][1], result[i][1]) # "date" col - self.assertEquals(data[i][1], result[i][2]) # "date_copy" col - self.assertIsNone(result[i][3]) # "check_data" col + pandas_scalar_check_data = pandas_udf(scalar_check_data, StringType()) + pandas_iter_check_data = pandas_udf(iter_check_data, StringType(), + PandasUDFType.SCALAR_ITER) + + for check_data, udf_type in [(pandas_scalar_check_data, PandasUDFType.SCALAR), + (pandas_iter_check_data, PandasUDFType.SCALAR_ITER)]: + date_copy = pandas_udf(lambda t: t, returnType=DateType(), functionType=udf_type) + df = df.withColumn("date_copy", date_copy(col("date"))) + result = df.withColumn("check_data", + check_data(col("idx"), col("date"), col("date_copy"))).collect() + + self.assertEquals(len(data), len(result)) + for i in range(len(result)): + self.assertEquals(data[i][1], result[i][1]) # "date" col + self.assertEquals(data[i][1], result[i][2]) # "date_copy" col + self.assertIsNone(result[i][3]) # "check_data" col def test_vectorized_udf_timestamps(self): schema = StructType([ @@ -479,12 +644,7 @@ def test_vectorized_udf_timestamps(self): df = self.spark.createDataFrame(data, schema=schema) - # Check that a timestamp passed through a pandas_udf will not be altered by timezone calc - f_timestamp_copy = pandas_udf(lambda t: t, returnType=TimestampType()) - df = df.withColumn("timestamp_copy", f_timestamp_copy(col("timestamp"))) - - @pandas_udf(returnType=StringType()) - def check_data(idx, timestamp, timestamp_copy): + def scalar_check_data(idx, timestamp, timestamp_copy): msgs = [] is_equal = timestamp.isnull() # use this array to check values are equal for i in range(len(idx)): @@ -498,42 +658,71 @@ def check_data(idx, timestamp, timestamp_copy): % (timestamp[i], idx[i], data[idx[i]][1])) return pd.Series(msgs) - result = df.withColumn("check_data", check_data(col("idx"), col("timestamp"), - col("timestamp_copy"))).collect() - # Check that collection values are correct - self.assertEquals(len(data), len(result)) - for i in range(len(result)): - self.assertEquals(data[i][1], result[i][1]) # "timestamp" col - self.assertEquals(data[i][1], result[i][2]) # "timestamp_copy" col - self.assertIsNone(result[i][3]) # "check_data" col + def iter_check_data(it): + for idx, timestamp, timestamp_copy in it: + yield scalar_check_data(idx, timestamp, timestamp_copy) + + pandas_scalar_check_data = pandas_udf(scalar_check_data, StringType()) + pandas_iter_check_data = pandas_udf(iter_check_data, StringType(), + 
PandasUDFType.SCALAR_ITER) + + for check_data, udf_type in [(pandas_scalar_check_data, PandasUDFType.SCALAR), + (pandas_iter_check_data, PandasUDFType.SCALAR_ITER)]: + # Check that a timestamp passed through a pandas_udf will not be altered by timezone + # calc + f_timestamp_copy = pandas_udf(lambda t: t, + returnType=TimestampType(), functionType=udf_type) + df = df.withColumn("timestamp_copy", f_timestamp_copy(col("timestamp"))) + result = df.withColumn("check_data", check_data(col("idx"), col("timestamp"), + col("timestamp_copy"))).collect() + # Check that collection values are correct + self.assertEquals(len(data), len(result)) + for i in range(len(result)): + self.assertEquals(data[i][1], result[i][1]) # "timestamp" col + self.assertEquals(data[i][1], result[i][2]) # "timestamp_copy" col + self.assertIsNone(result[i][3]) # "check_data" col def test_vectorized_udf_return_timestamp_tz(self): df = self.spark.range(10) @pandas_udf(returnType=TimestampType()) - def gen_timestamps(id): + def scalar_gen_timestamps(id): ts = [pd.Timestamp(i, unit='D', tz='America/Los_Angeles') for i in id] return pd.Series(ts) - result = df.withColumn("ts", gen_timestamps(col("id"))).collect() - spark_ts_t = TimestampType() - for r in result: - i, ts = r - ts_tz = pd.Timestamp(i, unit='D', tz='America/Los_Angeles').to_pydatetime() - expected = spark_ts_t.fromInternal(spark_ts_t.toInternal(ts_tz)) - self.assertEquals(expected, ts) + @pandas_udf(returnType=TimestampType(), functionType=PandasUDFType.SCALAR_ITER) + def iter_gen_timestamps(it): + for id in it: + ts = [pd.Timestamp(i, unit='D', tz='America/Los_Angeles') for i in id] + yield pd.Series(ts) + + for gen_timestamps in [scalar_gen_timestamps, iter_gen_timestamps]: + result = df.withColumn("ts", gen_timestamps(col("id"))).collect() + spark_ts_t = TimestampType() + for r in result: + i, ts = r + ts_tz = pd.Timestamp(i, unit='D', tz='America/Los_Angeles').to_pydatetime() + expected = spark_ts_t.fromInternal(spark_ts_t.toInternal(ts_tz)) + self.assertEquals(expected, ts) def test_vectorized_udf_check_config(self): with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 3}): df = self.spark.range(10, numPartitions=1) @pandas_udf(returnType=LongType()) - def check_records_per_batch(x): + def scalar_check_records_per_batch(x): return pd.Series(x.size).repeat(x.size) - result = df.select(check_records_per_batch(col("id"))).collect() - for (r,) in result: - self.assertTrue(r <= 3) + @pandas_udf(returnType=LongType(), functionType=PandasUDFType.SCALAR_ITER) + def iter_check_records_per_batch(it): + for x in it: + yield pd.Series(x.size).repeat(x.size) + + for check_records_per_batch in [scalar_check_records_per_batch, + iter_check_records_per_batch]: + result = df.select(check_records_per_batch(col("id"))).collect() + for (r,) in result: + self.assertTrue(r <= 3) def test_vectorized_udf_timestamps_respect_session_timezone(self): schema = StructType([ @@ -545,69 +734,121 @@ def test_vectorized_udf_timestamps_respect_session_timezone(self): (4, datetime(2100, 3, 3, 3, 3, 3))] df = self.spark.createDataFrame(data, schema=schema) - f_timestamp_copy = pandas_udf(lambda ts: ts, TimestampType()) - internal_value = pandas_udf( + scalar_internal_value = pandas_udf( lambda ts: ts.apply(lambda ts: ts.value if ts is not pd.NaT else None), LongType()) - timezone = "America/New_York" - with self.sql_conf({ - "spark.sql.execution.pandas.respectSessionTimeZone": False, - "spark.sql.session.timeZone": timezone}): - df_la = df.withColumn("tscopy", 
f_timestamp_copy(col("timestamp"))) \ - .withColumn("internal_value", internal_value(col("timestamp"))) - result_la = df_la.select(col("idx"), col("internal_value")).collect() - # Correct result_la by adjusting 3 hours difference between Los Angeles and New York - diff = 3 * 60 * 60 * 1000 * 1000 * 1000 - result_la_corrected = \ - df_la.select(col("idx"), col("tscopy"), col("internal_value") + diff).collect() - - with self.sql_conf({ - "spark.sql.execution.pandas.respectSessionTimeZone": True, - "spark.sql.session.timeZone": timezone}): - df_ny = df.withColumn("tscopy", f_timestamp_copy(col("timestamp"))) \ - .withColumn("internal_value", internal_value(col("timestamp"))) - result_ny = df_ny.select(col("idx"), col("tscopy"), col("internal_value")).collect() - - self.assertNotEqual(result_ny, result_la) - self.assertEqual(result_ny, result_la_corrected) + @pandas_udf(LongType(), PandasUDFType.SCALAR_ITER) + def iter_internal_value(it): + for ts in it: + yield ts.apply(lambda ts: ts.value if ts is not pd.NaT else None) + + for internal_value, udf_type in [(scalar_internal_value, PandasUDFType.SCALAR), + (iter_internal_value, PandasUDFType.SCALAR_ITER)]: + f_timestamp_copy = pandas_udf(lambda ts: ts, TimestampType(), udf_type) + timezone = "America/New_York" + with self.sql_conf({ + "spark.sql.execution.pandas.respectSessionTimeZone": False, + "spark.sql.session.timeZone": timezone}): + df_la = df.withColumn("tscopy", f_timestamp_copy(col("timestamp"))) \ + .withColumn("internal_value", internal_value(col("timestamp"))) + result_la = df_la.select(col("idx"), col("internal_value")).collect() + # Correct result_la by adjusting 3 hours difference between Los Angeles and New York + diff = 3 * 60 * 60 * 1000 * 1000 * 1000 + result_la_corrected = \ + df_la.select(col("idx"), col("tscopy"), col("internal_value") + diff).collect() + + with self.sql_conf({ + "spark.sql.execution.pandas.respectSessionTimeZone": True, + "spark.sql.session.timeZone": timezone}): + df_ny = df.withColumn("tscopy", f_timestamp_copy(col("timestamp"))) \ + .withColumn("internal_value", internal_value(col("timestamp"))) + result_ny = df_ny.select(col("idx"), col("tscopy"), col("internal_value")).collect() + + self.assertNotEqual(result_ny, result_la) + self.assertEqual(result_ny, result_la_corrected) def test_nondeterministic_vectorized_udf(self): # Test that nondeterministic UDFs are evaluated only once in chained UDF evaluations @pandas_udf('double') - def plus_ten(v): + def scalar_plus_ten(v): return v + 10 - random_udf = self.nondeterministic_vectorized_udf - df = self.spark.range(10).withColumn('rand', random_udf(col('id'))) - result1 = df.withColumn('plus_ten(rand)', plus_ten(df['rand'])).toPandas() + @pandas_udf('double', PandasUDFType.SCALAR_ITER) + def iter_plus_ten(it): + for v in it: + yield v + 10 - self.assertEqual(random_udf.deterministic, False) - self.assertTrue(result1['plus_ten(rand)'].equals(result1['rand'] + 10)) + for plus_ten in [scalar_plus_ten, iter_plus_ten]: + random_udf = self.nondeterministic_vectorized_udf + + df = self.spark.range(10).withColumn('rand', random_udf(col('id'))) + result1 = df.withColumn('plus_ten(rand)', plus_ten(df['rand'])).toPandas() + + self.assertEqual(random_udf.deterministic, False) + self.assertTrue(result1['plus_ten(rand)'].equals(result1['rand'] + 10)) def test_nondeterministic_vectorized_udf_in_aggregate(self): df = self.spark.range(10) - random_udf = self.nondeterministic_vectorized_udf - - with QuietTest(self.sc): - with self.assertRaisesRegexp(AnalysisException, 
'nondeterministic'): - df.groupby(df.id).agg(sum(random_udf(df.id))).collect() - with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'): - df.agg(sum(random_udf(df.id))).collect() + for random_udf in [self.nondeterministic_vectorized_udf, + self.nondeterministic_vectorized_iter_udf]: + with QuietTest(self.sc): + with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'): + df.groupby(df.id).agg(sum(random_udf(df.id))).collect() + with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'): + df.agg(sum(random_udf(df.id))).collect() def test_register_vectorized_udf_basic(self): df = self.spark.range(10).select( col('id').cast('int').alias('a'), col('id').cast('int').alias('b')) - original_add = pandas_udf(lambda x, y: x + y, IntegerType()) - self.assertEqual(original_add.deterministic, True) - self.assertEqual(original_add.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF) - new_add = self.spark.catalog.registerFunction("add1", original_add) - res1 = df.select(new_add(col('a'), col('b'))) - res2 = self.spark.sql( - "SELECT add1(t.a, t.b) FROM (SELECT id as a, id as b FROM range(10)) t") - expected = df.select(expr('a + b')) - self.assertEquals(expected.collect(), res1.collect()) - self.assertEquals(expected.collect(), res2.collect()) + scalar_original_add = pandas_udf(lambda x, y: x + y, IntegerType()) + self.assertEqual(scalar_original_add.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF) + + @pandas_udf(IntegerType(), PandasUDFType.SCALAR_ITER) + def iter_original_add(it): + for x, y in it: + yield x + y + + self.assertEqual(iter_original_add.evalType, PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF) + + for original_add in [scalar_original_add, iter_original_add]: + self.assertEqual(original_add.deterministic, True) + new_add = self.spark.catalog.registerFunction("add1", original_add) + res1 = df.select(new_add(col('a'), col('b'))) + res2 = self.spark.sql( + "SELECT add1(t.a, t.b) FROM (SELECT id as a, id as b FROM range(10)) t") + expected = df.select(expr('a + b')) + self.assertEquals(expected.collect(), res1.collect()) + self.assertEquals(expected.collect(), res2.collect()) + + def test_scalar_iter_udf_init(self): + import numpy as np + + @pandas_udf('int', PandasUDFType.SCALAR_ITER) + def rng(batch_iter): + context = TaskContext.get() + part = context.partitionId() + np.random.seed(part) + for batch in batch_iter: + yield pd.Series(np.random.randint(100, size=len(batch))) + with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 2}): + df = self.spark.range(10, numPartitions=2).select(rng(col("id").alias("v"))) + result1 = df.collect() + result2 = df.collect() + self.assertEqual(result1, result2, + "SCALAR ITER UDF can initialize state and produce deterministic RNG") + + def test_scalar_iter_udf_close(self): + @pandas_udf('int', PandasUDFType.SCALAR_ITER) + def test_close(batch_iter): + try: + for batch in batch_iter: + yield batch + finally: + raise RuntimeError("reached finally block") + with QuietTest(self.sc): + with self.assertRaisesRegexp(Exception, "reached finally block"): + self.spark.range(1).select(test_close(col("id"))).collect() # Regression test for SPARK-23314 def test_timestamp_dst(self): @@ -616,9 +857,11 @@ def test_timestamp_dst(self): datetime(2015, 11, 1, 1, 30), datetime(2015, 11, 1, 2, 30)] df = self.spark.createDataFrame(dt, 'timestamp').toDF('time') - foo_udf = pandas_udf(lambda x: x, 'timestamp') - result = df.withColumn('time', foo_udf(df.time)) - self.assertEquals(df.collect(), result.collect()) + + for udf_type in 
[PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]: + foo_udf = pandas_udf(lambda x: x, 'timestamp', udf_type) + result = df.withColumn('time', foo_udf(df.time)) + self.assertEquals(df.collect(), result.collect()) @unittest.skipIf(sys.version_info[:2] < (3, 5), "Type hints are supported from Python 3.5.") def test_type_annotation(self): @@ -648,76 +891,39 @@ def f1(x): return x + 1 @pandas_udf('int') - def f2(x): + def f2_scalar(x): assert type(x) == pd.Series return x + 10 + @pandas_udf('int', PandasUDFType.SCALAR_ITER) + def f2_iter(it): + for x in it: + assert type(x) == pd.Series + yield x + 10 + @udf('int') def f3(x): assert type(x) == int return x + 100 @pandas_udf('int') - def f4(x): + def f4_scalar(x): assert type(x) == pd.Series return x + 1000 - # Test single expression with chained UDFs - df_chained_1 = df.withColumn('f2_f1', f2(f1(df['v']))) - df_chained_2 = df.withColumn('f3_f2_f1', f3(f2(f1(df['v'])))) - df_chained_3 = df.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(df['v']))))) - df_chained_4 = df.withColumn('f4_f2_f1', f4(f2(f1(df['v'])))) - df_chained_5 = df.withColumn('f4_f3_f1', f4(f3(f1(df['v'])))) - - expected_chained_1 = df.withColumn('f2_f1', df['v'] + 11) - expected_chained_2 = df.withColumn('f3_f2_f1', df['v'] + 111) - expected_chained_3 = df.withColumn('f4_f3_f2_f1', df['v'] + 1111) - expected_chained_4 = df.withColumn('f4_f2_f1', df['v'] + 1011) - expected_chained_5 = df.withColumn('f4_f3_f1', df['v'] + 1101) - - self.assertEquals(expected_chained_1.collect(), df_chained_1.collect()) - self.assertEquals(expected_chained_2.collect(), df_chained_2.collect()) - self.assertEquals(expected_chained_3.collect(), df_chained_3.collect()) - self.assertEquals(expected_chained_4.collect(), df_chained_4.collect()) - self.assertEquals(expected_chained_5.collect(), df_chained_5.collect()) - - # Test multiple mixed UDF expressions in a single projection - df_multi_1 = df \ - .withColumn('f1', f1(col('v'))) \ - .withColumn('f2', f2(col('v'))) \ - .withColumn('f3', f3(col('v'))) \ - .withColumn('f4', f4(col('v'))) \ - .withColumn('f2_f1', f2(col('f1'))) \ - .withColumn('f3_f1', f3(col('f1'))) \ - .withColumn('f4_f1', f4(col('f1'))) \ - .withColumn('f3_f2', f3(col('f2'))) \ - .withColumn('f4_f2', f4(col('f2'))) \ - .withColumn('f4_f3', f4(col('f3'))) \ - .withColumn('f3_f2_f1', f3(col('f2_f1'))) \ - .withColumn('f4_f2_f1', f4(col('f2_f1'))) \ - .withColumn('f4_f3_f1', f4(col('f3_f1'))) \ - .withColumn('f4_f3_f2', f4(col('f3_f2'))) \ - .withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1'))) - - # Test mixed udfs in a single expression - df_multi_2 = df \ - .withColumn('f1', f1(col('v'))) \ - .withColumn('f2', f2(col('v'))) \ - .withColumn('f3', f3(col('v'))) \ - .withColumn('f4', f4(col('v'))) \ - .withColumn('f2_f1', f2(f1(col('v')))) \ - .withColumn('f3_f1', f3(f1(col('v')))) \ - .withColumn('f4_f1', f4(f1(col('v')))) \ - .withColumn('f3_f2', f3(f2(col('v')))) \ - .withColumn('f4_f2', f4(f2(col('v')))) \ - .withColumn('f4_f3', f4(f3(col('v')))) \ - .withColumn('f3_f2_f1', f3(f2(f1(col('v'))))) \ - .withColumn('f4_f2_f1', f4(f2(f1(col('v'))))) \ - .withColumn('f4_f3_f1', f4(f3(f1(col('v'))))) \ - .withColumn('f4_f3_f2', f4(f3(f2(col('v'))))) \ - .withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v')))))) - - expected = df \ + @pandas_udf('int', PandasUDFType.SCALAR_ITER) + def f4_iter(it): + for x in it: + assert type(x) == pd.Series + yield x + 1000 + + expected_chained_1 = df.withColumn('f2_f1', df['v'] + 11).collect() + expected_chained_2 = df.withColumn('f3_f2_f1', df['v'] + 111).collect() + 
expected_chained_3 = df.withColumn('f4_f3_f2_f1', df['v'] + 1111).collect() + expected_chained_4 = df.withColumn('f4_f2_f1', df['v'] + 1011).collect() + expected_chained_5 = df.withColumn('f4_f3_f1', df['v'] + 1101).collect() + + expected_multi = df \ .withColumn('f1', df['v'] + 1) \ .withColumn('f2', df['v'] + 10) \ .withColumn('f3', df['v'] + 100) \ @@ -732,10 +938,62 @@ def f4(x): .withColumn('f4_f2_f1', df['v'] + 1011) \ .withColumn('f4_f3_f1', df['v'] + 1101) \ .withColumn('f4_f3_f2', df['v'] + 1110) \ - .withColumn('f4_f3_f2_f1', df['v'] + 1111) - - self.assertEquals(expected.collect(), df_multi_1.collect()) - self.assertEquals(expected.collect(), df_multi_2.collect()) + .withColumn('f4_f3_f2_f1', df['v'] + 1111) \ + .collect() + + for f2, f4 in [(f2_scalar, f4_scalar), (f2_scalar, f4_iter), + (f2_iter, f4_scalar), (f2_iter, f4_iter)]: + # Test single expression with chained UDFs + df_chained_1 = df.withColumn('f2_f1', f2(f1(df['v']))) + df_chained_2 = df.withColumn('f3_f2_f1', f3(f2(f1(df['v'])))) + df_chained_3 = df.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(df['v']))))) + df_chained_4 = df.withColumn('f4_f2_f1', f4(f2(f1(df['v'])))) + df_chained_5 = df.withColumn('f4_f3_f1', f4(f3(f1(df['v'])))) + + self.assertEquals(expected_chained_1, df_chained_1.collect()) + self.assertEquals(expected_chained_2, df_chained_2.collect()) + self.assertEquals(expected_chained_3, df_chained_3.collect()) + self.assertEquals(expected_chained_4, df_chained_4.collect()) + self.assertEquals(expected_chained_5, df_chained_5.collect()) + + # Test multiple mixed UDF expressions in a single projection + df_multi_1 = df \ + .withColumn('f1', f1(col('v'))) \ + .withColumn('f2', f2(col('v'))) \ + .withColumn('f3', f3(col('v'))) \ + .withColumn('f4', f4(col('v'))) \ + .withColumn('f2_f1', f2(col('f1'))) \ + .withColumn('f3_f1', f3(col('f1'))) \ + .withColumn('f4_f1', f4(col('f1'))) \ + .withColumn('f3_f2', f3(col('f2'))) \ + .withColumn('f4_f2', f4(col('f2'))) \ + .withColumn('f4_f3', f4(col('f3'))) \ + .withColumn('f3_f2_f1', f3(col('f2_f1'))) \ + .withColumn('f4_f2_f1', f4(col('f2_f1'))) \ + .withColumn('f4_f3_f1', f4(col('f3_f1'))) \ + .withColumn('f4_f3_f2', f4(col('f3_f2'))) \ + .withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1'))) + + # Test mixed udfs in a single expression + df_multi_2 = df \ + .withColumn('f1', f1(col('v'))) \ + .withColumn('f2', f2(col('v'))) \ + .withColumn('f3', f3(col('v'))) \ + .withColumn('f4', f4(col('v'))) \ + .withColumn('f2_f1', f2(f1(col('v')))) \ + .withColumn('f3_f1', f3(f1(col('v')))) \ + .withColumn('f4_f1', f4(f1(col('v')))) \ + .withColumn('f3_f2', f3(f2(col('v')))) \ + .withColumn('f4_f2', f4(f2(col('v')))) \ + .withColumn('f4_f3', f4(f3(col('v')))) \ + .withColumn('f3_f2_f1', f3(f2(f1(col('v'))))) \ + .withColumn('f4_f2_f1', f4(f2(f1(col('v'))))) \ + .withColumn('f4_f3_f1', f4(f3(f1(col('v'))))) \ + .withColumn('f4_f3_f2', f4(f3(f2(col('v'))))) \ + .withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v')))))) + + self.assertEquals(expected_multi, df_multi_1.collect()) + self.assertEquals(expected_multi, df_multi_2.collect()) def test_mixed_udf_and_sql(self): df = self.spark.range(0, 1).toDF('v') @@ -752,25 +1010,15 @@ def f2(x): return x + 10 @pandas_udf('int') - def f3(x): + def f3s(x): assert type(x) == pd.Series return x + 100 - df1 = df.withColumn('f1', f1(df['v'])) \ - .withColumn('f2', f2(df['v'])) \ - .withColumn('f3', f3(df['v'])) \ - .withColumn('f1_f2', f1(f2(df['v']))) \ - .withColumn('f1_f3', f1(f3(df['v']))) \ - .withColumn('f2_f1', f2(f1(df['v']))) \ - .withColumn('f2_f3', 
f2(f3(df['v']))) \ - .withColumn('f3_f1', f3(f1(df['v']))) \ - .withColumn('f3_f2', f3(f2(df['v']))) \ - .withColumn('f1_f2_f3', f1(f2(f3(df['v'])))) \ - .withColumn('f1_f3_f2', f1(f3(f2(df['v'])))) \ - .withColumn('f2_f1_f3', f2(f1(f3(df['v'])))) \ - .withColumn('f2_f3_f1', f2(f3(f1(df['v'])))) \ - .withColumn('f3_f1_f2', f3(f1(f2(df['v'])))) \ - .withColumn('f3_f2_f1', f3(f2(f1(df['v'])))) + @pandas_udf('int', PandasUDFType.SCALAR_ITER) + def f3i(it): + for x in it: + assert type(x) == pd.Series + yield x + 100 expected = df.withColumn('f1', df['v'] + 1) \ .withColumn('f2', df['v'] + 10) \ @@ -786,9 +1034,27 @@ def f3(x): .withColumn('f2_f1_f3', df['v'] + 111) \ .withColumn('f2_f3_f1', df['v'] + 111) \ .withColumn('f3_f1_f2', df['v'] + 111) \ - .withColumn('f3_f2_f1', df['v'] + 111) - - self.assertEquals(expected.collect(), df1.collect()) + .withColumn('f3_f2_f1', df['v'] + 111) \ + .collect() + + for f3 in [f3s, f3i]: + df1 = df.withColumn('f1', f1(df['v'])) \ + .withColumn('f2', f2(df['v'])) \ + .withColumn('f3', f3(df['v'])) \ + .withColumn('f1_f2', f1(f2(df['v']))) \ + .withColumn('f1_f3', f1(f3(df['v']))) \ + .withColumn('f2_f1', f2(f1(df['v']))) \ + .withColumn('f2_f3', f2(f3(df['v']))) \ + .withColumn('f3_f1', f3(f1(df['v']))) \ + .withColumn('f3_f2', f3(f2(df['v']))) \ + .withColumn('f1_f2_f3', f1(f2(f3(df['v'])))) \ + .withColumn('f1_f3_f2', f1(f3(f2(df['v'])))) \ + .withColumn('f2_f1_f3', f2(f1(f3(df['v'])))) \ + .withColumn('f2_f3_f1', f2(f3(f1(df['v'])))) \ + .withColumn('f3_f1_f2', f3(f1(f2(df['v'])))) \ + .withColumn('f3_f2_f1', f3(f2(f1(df['v'])))) + + self.assertEquals(expected, df1.collect()) # SPARK-24721 @unittest.skipIf(not test_compiled, test_not_compiled_message) diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 275abe9c85d1e..84be2d24d9fbc 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -40,6 +40,7 @@ def _wrap_function(sc, func, returnType): def _create_udf(f, returnType, evalType): if evalType in (PythonEvalType.SQL_SCALAR_PANDAS_UDF, + PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF): @@ -48,7 +49,9 @@ def _create_udf(f, returnType, evalType): argspec = _get_argspec(f) - if evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF and len(argspec.args) == 0 and \ + if (evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF or + evalType == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF) and \ + len(argspec.args) == 0 and \ argspec.varargs is None: raise ValueError( "Invalid function: 0-arg pandas_udfs are not supported. " @@ -113,7 +116,8 @@ def returnType(self): else: self._returnType_placeholder = _parse_datatype_string(self._returnType) - if self.evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF: + if self.evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF or \ + self.evalType == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF: try: to_arrow_type(self._returnType_placeholder) except TypeError: @@ -323,10 +327,11 @@ def register(self, name, f, returnType=None): "a user-defined function, but got %s." 
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 16257bef6b320..ee46bb649d1fe 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -86,21 +86,29 @@ def wrap_udf(f, return_type):
     return lambda *a: f(*a)


-def wrap_scalar_pandas_udf(f, return_type):
+def wrap_scalar_pandas_udf(f, return_type, eval_type):
     arrow_return_type = to_arrow_type(return_type)

-    def verify_result_length(*a):
-        result = f(*a)
+    def verify_result_type(result):
         if not hasattr(result, "__len__"):
             pd_type = "Pandas.DataFrame" if type(return_type) == StructType else "Pandas.Series"
             raise TypeError("Return type of the user-defined function should be "
                             "{}, but is {}".format(pd_type, type(result)))
-        if len(result) != len(a[0]):
+        return result
+
+    def verify_result_length(result, length):
+        if len(result) != length:
             raise RuntimeError("Result vector from pandas_udf was not the required length: "
-                               "expected %d, got %d" % (len(a[0]), len(result)))
+                               "expected %d, got %d" % (length, len(result)))
         return result

-    return lambda *a: (verify_result_length(*a), arrow_return_type)
+    if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
+        return lambda *a: (verify_result_length(
+            verify_result_type(f(*a)), len(a[0])), arrow_return_type)
+    else:
+        # The result length verification is done at the end of a partition.
+        return lambda *iterator: map(lambda res: (res, arrow_return_type),
+                                     map(verify_result_type, f(*iterator)))


 def wrap_grouped_map_pandas_udf(f, return_type, argspec):
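For context on the hunk above: in the SCALAR_ITER branch the wrapper stays lazy, pairing every batch the user's generator yields with the Arrow return type and deferring the row-count check to the caller. A standalone sketch of that idea, with made-up names (`fake_arrow_type`, `user_udf`) and no dependency on the worker internals:

```python
import pandas as pd

fake_arrow_type = "int32"  # stand-in for the real pyarrow type object

def user_udf(iterator):
    # A user-defined SCALAR_ITER body: one pd.Series in, one pd.Series out per batch.
    for batch in iterator:
        yield batch + 100

def wrap_iter_udf(f, arrow_type):
    def verify_result_type(result):
        # Each yielded value must have a length (Series/DataFrame), as in the real wrapper.
        if not hasattr(result, "__len__"):
            raise TypeError("expected a pandas result, got %s" % type(result))
        return result
    # Lazy: the user's generator is consumed batch by batch as the map object is iterated.
    return lambda iterator: map(lambda res: (verify_result_type(res), arrow_type), f(iterator))

wrapped = wrap_iter_udf(user_udf, fake_arrow_type)
for result, arrow_type in wrapped(iter([pd.Series([1, 2]), pd.Series([3])])):
    print(list(result), arrow_type)  # [101, 102] int32, then [103] int32
```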
@@ -201,23 +209,28 @@ def wrapped(begin_index, end_index, *series):

 def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index):
     num_arg = read_int(infile)
     arg_offsets = [read_int(infile) for i in range(num_arg)]
-    row_func = None
+    chained_func = None
     for i in range(read_int(infile)):
         f, return_type = read_command(pickleSer, infile)
-        if row_func is None:
-            row_func = f
+        if chained_func is None:
+            chained_func = f
         else:
-            row_func = chain(row_func, f)
+            chained_func = chain(chained_func, f)

-    # make sure StopIteration's raised in the user code are not ignored
-    # when they are processed in a for loop, raise them as RuntimeError's instead
-    func = fail_on_stopiteration(row_func)
+    if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF:
+        func = chained_func
+    else:
+        # make sure StopIteration's raised in the user code are not ignored
+        # when they are processed in a for loop, raise them as RuntimeError's instead
+        func = fail_on_stopiteration(chained_func)

     # the last returnType will be the return type of UDF
     if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
-        return arg_offsets, wrap_scalar_pandas_udf(func, return_type)
+        return arg_offsets, wrap_scalar_pandas_udf(func, return_type, eval_type)
+    elif eval_type == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF:
+        return arg_offsets, wrap_scalar_pandas_udf(func, return_type, eval_type)
     elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
-        argspec = _get_argspec(row_func)  # signature was lost when wrapping it
+        argspec = _get_argspec(chained_func)  # signature was lost when wrapping it
         return arg_offsets, wrap_grouped_map_pandas_udf(func, return_type, argspec)
     elif eval_type == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF:
         return arg_offsets, wrap_grouped_agg_pandas_udf(func, return_type)
@@ -233,6 +246,7 @@ def read_udfs(pickleSer, infile, eval_type):
     runner_conf = {}

     if eval_type in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+                     PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
                      PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
                      PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
                      PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF):
@@ -255,13 +269,60 @@ def read_udfs(pickleSer, infile, eval_type):

         # Scalar Pandas UDF handles struct type arguments as pandas DataFrames instead of
         # pandas Series. See SPARK-27240.
-        df_for_struct = eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF
+        df_for_struct = (eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF or
+                         eval_type == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF)
         ser = ArrowStreamPandasUDFSerializer(timezone, safecheck, assign_cols_by_name,
                                              df_for_struct)
     else:
         ser = BatchedSerializer(PickleSerializer(), 100)

     num_udfs = read_int(infile)
+
+    if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF:
+        assert num_udfs == 1, "One SQL_SCALAR_PANDAS_ITER_UDF expected here."
+
+        arg_offsets, udf = read_single_udf(
+            pickleSer, infile, eval_type, runner_conf, udf_index=0)
+
+        def func(_, iterator):
+            num_input_rows = [0]
+
+            def map_batch(batch):
+                udf_args = [batch[offset] for offset in arg_offsets]
+                num_input_rows[0] += len(udf_args[0])
+                if len(udf_args) == 1:
+                    return udf_args[0]
+                else:
+                    return tuple(udf_args)
+
+            iterator = map(map_batch, iterator)
+            result_iter = udf(iterator)
+
+            num_output_rows = 0
+            for result_batch, result_type in result_iter:
+                num_output_rows += len(result_batch)
+                assert num_output_rows <= num_input_rows[0], \
+                    "Pandas SCALAR_ITER UDF produced more rows than input rows."
+                yield (result_batch, result_type)
+            try:
+                if sys.version >= '3':
+                    iterator.__next__()
+                else:
+                    iterator.next()
+            except StopIteration:
+                pass
+            else:
+                raise RuntimeError("SQL_SCALAR_PANDAS_ITER_UDF should exhaust the input iterator.")
+
+            if num_output_rows != num_input_rows[0]:
+                raise RuntimeError("The number of output rows of pandas iterator UDF should be "
+                                   "the same as the number of input rows; the number of input "
+                                   "rows is %d but the number of output rows is %d." %
+                                   (num_input_rows[0], num_output_rows))
+
+        # profiling is not supported for UDF
+        return func, None, ser, ser
+
     udfs = {}
     call_udf = []
     mapper_str = ""
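For readers tracing the `read_udfs` change above: the SCALAR_ITER path counts input rows lazily while feeding batches to the UDF, checks that the UDF exhausted its input iterator, and finally compares input and output row counts. A toy with the same bookkeeping shape (assumed helper names; this is not the pyspark worker code):

```python
import pandas as pd

def run_iter_udf(udf, batches):
    num_input_rows = [0]

    def count_rows(batch):
        # Rows are counted lazily, exactly when a batch is handed to the UDF.
        num_input_rows[0] += len(batch)
        return batch

    it = map(count_rows, iter(batches))
    num_output_rows = 0
    for out in udf(it):
        num_output_rows += len(out)
        yield out

    # The UDF must have pulled every input batch ...
    sentinel = object()
    if next(it, sentinel) is not sentinel:
        raise RuntimeError("the iterator UDF must exhaust its input iterator")
    # ... and produced exactly as many rows as it consumed.
    if num_output_rows != num_input_rows[0]:
        raise RuntimeError("expected %d output rows, got %d"
                           % (num_input_rows[0], num_output_rows))

def add_one(iterator):
    for s in iterator:
        yield s + 1

for out in run_iter_udf(add_one, [pd.Series([1, 2]), pd.Series([3])]):
    print(list(out))  # [2, 3], then [4]
```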
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
index 2d823552dba2d..690969e1d359d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
@@ -27,7 +27,8 @@ import org.apache.spark.sql.types.DataType
 object PythonUDF {
   private[this] val SCALAR_TYPES = Set(
     PythonEvalType.SQL_BATCHED_UDF,
-    PythonEvalType.SQL_SCALAR_PANDAS_UDF
+    PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+    PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF
   )

   def isScalarPythonUDF(e: Expression): Boolean = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
index 2df30a1a53ad7..d87bbf72e8aa6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
@@ -64,4 +64,5 @@ case class BatchEvalPython(
 case class ArrowEvalPython(
     udfs: Seq[PythonUDF],
     resultAttrs: Seq[Attribute],
-    child: LogicalPlan) extends BaseEvalPython
+    child: LogicalPlan,
+    evalType: Int) extends BaseEvalPython
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index faf2fdd7dbcce..0c4775eb769bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -623,8 +623,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    */
  object PythonEvals extends Strategy {
    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case ArrowEvalPython(udfs, output, child) =>
-        ArrowEvalPythonExec(udfs, output, planLater(child)) :: Nil
+      case ArrowEvalPython(udfs, output, child, evalType) =>
+        ArrowEvalPythonExec(udfs, output, planLater(child), evalType) :: Nil
      case BatchEvalPython(udfs, output, child) =>
        BatchEvalPythonExec(udfs, output, planLater(child)) :: Nil
      case _ =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
index 73a43afdf3498..e714554f108ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
@@ -59,7 +59,8 @@ private[spark] class BatchIterator[T](iter: Iterator[T], batchSize: Int)
 /**
  * A physical plan that evaluates a [[PythonUDF]].
  */
-case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan)
+case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan,
+    evalType: Int)
   extends EvalPythonExec(udfs, resultAttrs, child) {

   private val batchSize = conf.arrowMaxRecordsPerBatch
@@ -80,7 +81,7 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute]
     val columnarBatchIter = new ArrowPythonRunner(
       funcs,
-      PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+      evalType,
       argOffsets,
       schema,
       sessionLocalTimeZone,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
index 7f59d74b0cd41..58fe7d5cbc0f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
@@ -111,19 +111,27 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
   }

   private def collectEvaluableUDFsFromExpressions(expressions: Seq[Expression]): Seq[PythonUDF] = {
-    // Eval type checker is set once when we find the first evaluable UDF and its value
-    // shouldn't change later.
-    // Used to check if subsequent UDFs are of the same type as the first UDF. (since we can only
+    // If the first UDF is SQL_SCALAR_PANDAS_ITER_UDF, then return only this UDF,
+    // otherwise check if subsequent UDFs are of the same type as the first UDF. (since we can only
     // extract UDFs of the same eval type)
-    var evalTypeChecker: Option[EvalTypeChecker] = None
+
+    var firstVisitedScalarUDFEvalType: Option[Int] = None
+
+    def canChainUDF(evalType: Int): Boolean = {
+      if (evalType == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF) {
+        false
+      } else {
+        evalType == firstVisitedScalarUDFEvalType.get
+      }
+    }

     def collectEvaluableUDFs(expr: Expression): Seq[PythonUDF] = expr match {
       case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf)
-        && evalTypeChecker.isEmpty =>
-        evalTypeChecker = Some((otherEvalType: EvalType) => otherEvalType == udf.evalType)
+        && firstVisitedScalarUDFEvalType.isEmpty =>
+        firstVisitedScalarUDFEvalType = Some(udf.evalType)
         Seq(udf)
       case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf)
-        && evalTypeChecker.get(udf.evalType) =>
+        && canChainUDF(udf.evalType) =>
         Seq(udf)
       case e => e.children.flatMap(collectEvaluableUDFs)
     }
@@ -175,16 +183,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
           AttributeReference(s"pythonUDF$i", u.dataType)()
         }

-        val evaluation = validUdfs.partition(
-          _.evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF
-        ) match {
-          case (vectorizedUdfs, plainUdfs) if plainUdfs.isEmpty =>
-            ArrowEvalPython(vectorizedUdfs, resultAttrs, child)
-          case (vectorizedUdfs, plainUdfs) if vectorizedUdfs.isEmpty =>
-            BatchEvalPython(plainUdfs, resultAttrs, child)
+        val evalTypes = validUdfs.map(_.evalType).toSet
+        if (evalTypes.size != 1) {
+          throw new AnalysisException(
+            s"Expected UDFs to have the same evalType but got different evalTypes: " +
+            s"${evalTypes.mkString(",")}")
+        }
+        val evalType = evalTypes.head
+        val evaluation = evalType match {
+          case PythonEvalType.SQL_BATCHED_UDF =>
+            BatchEvalPython(validUdfs, resultAttrs, child)
+          case PythonEvalType.SQL_SCALAR_PANDAS_UDF | PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF =>
+            ArrowEvalPython(validUdfs, resultAttrs, child, evalType)
           case _ =>
-            throw new AnalysisException(
-              "Expected either Scalar Pandas UDFs or Batched UDFs but got both")
+            throw new AnalysisException("Unexpected UDF evalType")
         }

         attributeMap ++= validUdfs.zip(resultAttrs)