Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-43440][PYTHON][CONNECT] Support registration of an Arrow-optimized Python UDF #41125

Closed

Conversation

xinrong-meng
Copy link
Member

@xinrong-meng xinrong-meng commented May 11, 2023

What changes were proposed in this pull request?

The PR proposes to provide support for the registration of an Arrow-optimized Python UDF in both vanilla PySpark and Spark Connect.

Why are the changes needed?

Currently, when users register an Arrow-optimized Python UDF, it will be registered as a pickled Python UDF and thus, executed without Arrow optimization.

We should support Arrow-optimized Python UDFs registration and execute them with Arrow optimization.

Does this PR introduce any user-facing change?

Yes. No API changes, but result differences are expected in some cases.

Previously, a registered Arrow-optimized Python UDF will be executed without Arrow optimization.
Now, it will be executed with Arrow optimization, as shown below.

>>> df = spark.range(2)
>>> df.createOrReplaceTempView("df")
>>> from pyspark.sql.functions import udf
>>> @udf(useArrow=True)
... def f(x):
...     return str(x)
... 

>>> spark.udf.register('str_f', f)
<pyspark.sql.udf.UserDefinedFunction object at 0x7fa1980c16a0>

>>> spark.sql("select str_f(id) from df").explain()  # Executed with Arrow optimization
== Physical Plan ==
*(2) Project [pythonUDF0#32 AS f(id)#30]
+- ArrowEvalPython [f(id#27L)#29], [pythonUDF0#32], 101
   +- *(1) Range (0, 2, step=1, splits=16)

Enabling or disabling Arrow optimization can produce result differences in some cases - we are working on minimizing the result differences though.

How was this patch tested?

Unit test.

SPARK-40307

return_udf = f
)
if f.evalType == PythonEvalType.SQL_ARROW_BATCHED_UDF:
register_udf = _create_arrow_py_udf(source_udf)._unwrapped
Copy link
Member Author

@xinrong-meng xinrong-meng May 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the magic line :).

@xinrong-meng xinrong-meng marked this pull request as ready for review May 11, 2023 18:42
@HyukjinKwon
Copy link
Member

Merged to master.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants