[SPARK-22106][PYSPARK][SQL] Disable 0-parameter pandas_udf and add doctests #19325
Conversation
@cloud-fan @ueshin I'm not sure if you are ok with merging this soon, but while adding the doctests I found there were problems with using the decorator and with empty partitions. I fixed those here and added new tests, but if it would be better I can try to separate those into another PR?
This is a followup to #18659
Adding the patch to enable 0-parameter pandas_udf if it is requested in the future.
Test build #82093 has finished for PR 19325 at commit
retest this please
Test build #82096 has finished for PR 19325 at commit
    # TODO: doctest
    >>> from pyspark.sql.types import IntegerType, StringType
    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    >>> @pandas_udf(returnType=StringType())
Have we installed pyarrow on Jenkins? The failed test complains ImportError: No module named pyarrow.
We could just do # doctest: +SKIP maybe.
Hmm, I thought that the Jenkins environment for unit tests would be the same for doctests and have pyarrow installed. @holdenk or @shaneknapp do you know if that is the case?
adding @JoshRosen too.
the doc building node (amp-jenkins-worker-01) doesn't have arrow installed for the default conda python 2.7 environment. for the python 3 environment, we're running arrow 0.4.0.
i looked at the script and it seems to be agnostic to python 2 vs 3... once i know which version of python we'll be running i can make sure that the version of arrow installed is correct.
Cool, thanks @shaneknapp!
Hm.. but shouldn't we skip those doctests anyway, since pyarrow is not a hard dependency?
That's true, I see that toPandas() also skips doctests. I'll skip this now and can always enable it later if we decide differently. @shaneknapp, looks like we will hold off on this, so no need to do anything to Jenkins I believe; sorry to bug you.
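As a side note, here is a minimal, self-contained sketch of how the +SKIP directive behaves (nothing here comes from the patch; the module name is made up purely to demonstrate the doctest mechanism):

import doctest

def example():
    """
    >>> 1 + 1
    2
    >>> import some_missing_module  # doctest: +SKIP
    """

# The skipped line stays visible in the docs but is never executed, so a
# missing optional dependency cannot fail the test run.
doctest.run_docstring_examples(example, {}, verbose=False)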
Looks pretty good. I left some comments.
python/pyspark/serializers.py (outdated)
@@ -246,15 +243,9 @@ def cast_series(s, t):
    def loads(self, obj):
        """
        Deserialize an ArrowRecordBatch to an Arrow table and return as a list of pandas.Series
Ugh, this bugged me. The . at the end is missing.
Ha, no problem.. It should have a period, I just deleted it by accident.
python/pyspark/sql/tests.py (outdated)
        res = df.select(f0())
        self.assertEquals(df.select(lit(1)).collect(), res.collect())
        with QuietTest(self.sc):
            with self.assertRaisesRegexp(Exception, '0-parameter pandas_udfs.*not.*supported'):
I believe we could catch a narrower one, NotImplementedError.
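For illustration, a sketch of what catching the narrower exception in the test could look like (the test name is hypothetical, and this assumes the 0-parameter check raises NotImplementedError when the udf is defined rather than a generic Exception at execution time):

# Sketch only, not the patch itself.
def test_vectorized_udf_zero_parameter(self):
    from pyspark.sql.functions import pandas_udf
    from pyspark.sql.types import LongType
    error_str = '0-parameter pandas_udfs.*not.*supported'
    with QuietTest(self.sc):
        with self.assertRaisesRegexp(NotImplementedError, error_str):
            pandas_udf(lambda: 1, LongType())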
python/pyspark/sql/tests.py (outdated)
@@ -3308,12 +3306,12 @@ def test_vectorized_udf_invalid_length(self):
        from pyspark.sql.functions import pandas_udf, col
        import pandas as pd
        df = self.spark.range(10)
-       raise_exception = pandas_udf(lambda: pd.Series(1), LongType())
+       raise_exception = pandas_udf(lambda i: pd.Series(1), LongType())
Maybe lambda _: ... instead.
python/pyspark/sql/tests.py (outdated)
@@ -3308,12 +3306,12 @@ def test_vectorized_udf_invalid_length(self):
        from pyspark.sql.functions import pandas_udf, col
        import pandas as pd
        df = self.spark.range(10)
-       raise_exception = pandas_udf(lambda: pd.Series(1), LongType())
+       raise_exception = pandas_udf(lambda i: pd.Series(1), LongType())
        with QuietTest(self.sc):
            with self.assertRaisesRegexp(
                    Exception,
Here too, while we are at it, let's catch a narrower exception type. Looks like RuntimeError.
This one is actually a Py4jJavaError, so I don't think we can recognize the RuntimeError.
python/pyspark/sql/functions.py (outdated)
        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
    else:
        return _create_udf(f, returnType=returnType, vectorized=True)
    if not inspect.getargspec(wrapped_udf.func).args:
This is totally a personal preference based on my own experience: I usually avoid using an if not something expression, because it obscures the expected type; for example, something could be None, 0, or a 0-length list or tuple, since it coerces the value to a bool. I usually write is not None or len(..) > 0 instead.
I am fine as is too (because I think it's a personal preference) but just wanted to leave a side note (and change it if this could persuade you too).
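For illustration, a tiny sketch of the ambiguity being described and the more explicit alternative (purely stylistic and not taken from the patch, but using the same getargspec call):

import inspect

def takes_no_args(f):
    args = inspect.getargspec(f).args
    # 'if not args:' would also be True for None, 0 or '', which hides that
    # args is expected to be a list; an explicit length check keeps the
    # intended type obvious to the reader.
    return len(args) == 0

print(takes_no_args(lambda: 1))    # True
print(takes_no_args(lambda x: x))  # False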
Yeah, it probably would be a good idea to be explicit here since it's not obvious what type getargspec returns.
python/pyspark/worker.py (outdated)
-       result = f(*a[:-1], **kwargs)
-       if len(result) != kwargs["length"]:
+       result = f(*a)
+       if len(result) != len(a[0]):
I guess we are not guaranteed to have __len__ in result, e.g., pandas_udf(lambda x: 1, LongType()). Probably checking for this attribute should be done ahead of time, while we are here.
Should we verify the returned value is a pandas.Series?
Good point. We should probably have a test that returns a scalar value too. I'm not sure we should limit the return type so much. As long as pyarrow can consume it, then it should be ok; it can also take a numpy array, which might be useful. Otherwise it should raise a clear exception. Maybe checking that it has __len__ is good enough?
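A rough sketch of the kind of guard being discussed; the helper name and messages below are hypothetical and not the actual worker.py code:

# Accept anything Arrow can consume (pandas.Series, numpy arrays, ...) as long
# as it has a length matching the input batch, and fail clearly for scalars
# such as the result of pandas_udf(lambda x: 1, LongType()).
def verify_result_length(result, expected_len):
    if not hasattr(result, '__len__'):
        raise TypeError("Return value of pandas_udf should be array-like "
                        "(e.g. pandas.Series), got %s" % type(result))
    if len(result) != expected_len:
        raise RuntimeError("Result vector from pandas_udf was not the required "
                           "length: expected %d, got %d"
                           % (expected_len, len(result)))
    return result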
python/pyspark/sql/functions.py (outdated)
        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
    else:
        return _create_udf(f, returnType=returnType, vectorized=True)
    if not inspect.getargspec(wrapped_udf.func).args:
It looks like wrapped_udf.func could be _udf within _create_udf, which takes a single argument, for example:

@pandas_udf(returnType=LongType())
def add_one():
    return 1
I tried a rough idea to solve this:
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2124,11 +2124,14 @@ class UserDefinedFunction(object):
         return wrapper

-def _create_udf(f, returnType, vectorized):
+def _create_udf(f, returnType, vectorized, checker=None):
     def _udf(f, returnType=StringType(), vectorized=vectorized):
         udf_obj = UserDefinedFunction(f, returnType, vectorized=vectorized)
-        return udf_obj._wrapped()
+        wrapped = udf_obj._wrapped()
+        if checker is not None:
+            checker(wrapped)
+        return wrapped

     # decorator @udf, @udf(), @udf(dataType()), or similar with @pandas_udf
     if f is None or isinstance(f, (str, DataType)):
@@ -2201,10 +2204,14 @@ def pandas_udf(f=None, returnType=StringType()):
     | 8| JOHN DOE| 22|
     +----------+--------------+------------+
     """
-    wrapped_udf = _create_udf(f, returnType=returnType, vectorized=True)
-    import inspect
-    if not inspect.getargspec(wrapped_udf.func).args:
-        raise NotImplementedError("0-parameter pandas_udfs are not currently supported")
+
+    def checker(wrapped):
+        import inspect
+        if not inspect.getargspec(wrapped.func).args:
+            raise NotImplementedError("0-parameter pandas_udfs are not currently supported")
+
+    wrapped_udf = _create_udf(f, returnType=returnType, vectorized=True, checker=checker)
+
     return wrapped_udf
I'll look into this some more
I see now, decorators with arguments are handled a little differently.. How about we just inspect in _udf if the vectorized flag is set? Similar to what you have above, but I don't think we need the checker arg.
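A rough sketch of that idea, with the check inside _udf guarded by the vectorized flag (simplified and hypothetical; the real _create_udf handles more decorator forms and wraps f into a UserDefinedFunction):

import inspect
from pyspark.sql.types import StringType

def _udf(f, returnType=StringType(), vectorized=False):
    # Reject 0-parameter functions up front when the vectorized (pandas) flag
    # is set, covering both pandas_udf(f, type) and the decorator-with-arguments
    # form, since both eventually go through _udf.
    if vectorized and len(inspect.getargspec(f).args) == 0:
        raise NotImplementedError(
            "0-parameter pandas_udfs are not currently supported")
    return f  # placeholder for the UserDefinedFunction wrapping in _create_udf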
I am willing to merge this one soon.
    def test_vectorized_udf_empty_partition(self):
        from pyspark.sql.functions import pandas_udf, col
        df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))
Maybe I'm missing something, but what is this test intended to test?
Oh, I see. One partition is empty and it is related to the added stuff in ArrowEvalPythonExec.
Yeah, an empty partition leads to an empty iterator, so this is to make sure it can handle that.
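For context, a sketch of what such a test might look like end to end (details may differ from the one actually added to tests.py; Row is assumed to be imported at module level as usual):

    def test_vectorized_udf_empty_partition(self):
        from pyspark.sql.functions import pandas_udf, col
        from pyspark.sql.types import LongType
        # Two partitions but only one row guarantees one empty partition,
        # which produces an empty iterator on the worker side.
        df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))
        f = pandas_udf(lambda x: x, LongType())
        res = df.select(f(col('id')))
        self.assertEquals(df.collect(), res.collect())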
        .map { case (attr, i) => attr.withName(s"_$i") })
      assert(schemaOut.equals(outputRowIterator.schema),
        s"Invalid schema from pandas_udf: expected $schemaOut, got ${outputRowIterator.schema}")
      if (outputIterator.nonEmpty) {
After outputIterator is consumed by ArrowConverters, can nonEmpty still return a meaningful value?
I guess we should use outputRowIterator. Btw, how about using hasNext instead of nonEmpty?
Ooops! That was a typo and you're right, it should have been outputRowIterator.
Mostly looks pretty good. The only main question I have is about the empty partition issue.
      val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
        .map { case (attr, i) => attr.withName(s"_$i") })
      assert(schemaOut.equals(outputRowIterator.schema),
        s"Invalid schema from pandas_udf: expected $schemaOut, got ${outputRowIterator.schema}")
Looks like we don't have a test against this case. We should add a test for invalid schema.
Yeah, I tried to make one, but since we are now casting the returned Series in ArrowPandasSerializer.dumps with astype, I have not found a case that triggers it. I think it would still be good to keep this, just in case there is some way it could happen; and if we upgrade to Arrow 0.7 then we won't need the astype logic and this will be used instead.
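For reference, a simplified stand-in for the cast being referred to (the real ArrowPandasSerializer.dumps does more, e.g. building Arrow record batches; this only shows why a declared-type mismatch is hard to trigger from Python):

import numpy as np
import pandas as pd

def cast_series(s, t):
    # The returned value is coerced to a Series of the declared return type
    # before the data is handed to Arrow.
    if not isinstance(s, pd.Series):
        s = pd.Series(s)
    return s.astype(t, copy=False)

print(cast_series(pd.Series([1.0, 2.0]), np.int64).dtype)  # int64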
Thanks @HyukjinKwon @viirya and @ueshin, all good ideas! I'll push an update, but it seems like we need to figure out what's going on with the Jenkins doctests, so I'll try to get some help there.
…or, address comments from PR
LGTM except #19325 (comment)
Test build #82159 has finished for PR 19325 at commit
LGTM, pending Jenkins.
LGTM too
    ...
    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    ...     .show() # doctest: +SKIP
Seems we don't skip it actually?
Looks like we actually do :). Let me test this one for sure locally before merging it (I have pypy installed locally, which does not have pyarrow or pandas).
Yeah. It is. :)
I just double checked it passes:
./run-tests --python-executables=pypy --modules pyspark-sql
...
Will test against the following Python executables: ['pypy']
Will test the following Python modules: ['pyspark-sql']
Starting test(pypy): pyspark.sql.functions
...
Finished test(pypy): pyspark.sql.functions (74s)
...
Also, checked without # doctest: +SKIP:
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 63e9a830bbc..3265ecc974b 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2199,7 +2199,7 @@ def pandas_udf(f=None, returnType=StringType()):
...
>>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
>>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
- ... .show() # doctest: +SKIP
+ ... .show()
+----------+--------------+------------+
|slen(name)|to_upper(name)|add_one(age)|
+----------+--------------+------------+
./run-tests --python-executables=pypy --modules pyspark-sql
...
Will test against the following Python executables: ['pypy']
Will test the following Python modules: ['pyspark-sql']
...
Starting test(pypy): pyspark.sql.functions
...
Failed example:
df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \
.show()
Exception raised:
Traceback (most recent call last):
File "/usr/local/Cellar/pypy/5.8.0/libexec/lib-python/2.7/doctest.py", line 1315, in __run
compileflags, 1) in test.globs
File "<doctest pyspark.sql.functions.pandas_udf[5]>", line 1, in <module>
df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \
File "/.../spark/python/pyspark/sql/dataframe.py", line 347, in show
print(self._jdf.showString(n, 20, vertical))
File "/.../spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/.../spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/.../spark/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o1373.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 93.0 failed 1 times, most recent failure: Lost task 0.0 in stage 93.0 (TID 1093, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 190, in main
func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 112, in read_udfs
arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type)
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 102, in read_single_udf
return arg_offsets, wrap_pandas_udf(row_func, return_type)
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 77, in wrap_pandas_udf
arrow_return_type = toArrowType(return_type)
File "/.../spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1603, in toArrowType
import pyarrow as pa
ImportError: No module named pyarrow
Test build #82161 has finished for PR 19325 at commit
Merged to master.
    ...
    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    ...     .show() # doctest: +SKIP
(D'oh, not a big deal but two spaces before inline comments..)
Thanks @HyukjinKwon! Sorry, I didn't notice this :( I'll make a note to fix that spacing on a related change.
What changes were proposed in this pull request?
This change disables the use of 0-parameter pandas_udfs because the API is overly complex and awkward, and the functionality can easily be worked around by using an index column as an input argument. It also adds doctests for pandas_udfs, which revealed bugs in handling empty partitions and in using the pandas_udf decorator.
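A hedged sketch of the workaround mentioned above: instead of a 0-parameter pandas_udf, pass an existing column (such as the id column) and use only its length to size the output (the UDF and column names here are illustrative, not part of the patch):

import pandas as pd
from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import DoubleType

# Rather than pandas_udf(lambda: ..., DoubleType()) with no inputs, take an
# input column and produce one value per row of the batch.
@pandas_udf(returnType=DoubleType())
def rand_per_row(id_col):
    import numpy as np
    return pd.Series(np.random.rand(len(id_col)))

# df = spark.range(10)
# df.select(rand_per_row(col("id"))).show()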
How was this patch tested?
Reworked the existing 0-parameter test to verify an error is raised, added a doctest for pandas_udf, and added new tests for empty partitions and decorator usage.