Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-34388][SQL] Propagate the registered UDF name to ScalaUDF, ScalaUDAF and ScalaAggregator #31500

Closed
wants to merge 4 commits into from

Conversation

imback82
Copy link
Contributor

@imback82 imback82 commented Feb 6, 2021

What changes were proposed in this pull request?

This PR proposes to propagate the name used for registering UDFs to ScalaUDF, ScalaUDAF and ScaalAggregator.

Note that PythonUDF gets the name correctly:

register_udf = UserDefinedFunction(f, returnType=returnType, name=name,
evalType=PythonEvalType.SQL_BATCHED_UDF)

, and same for Hive UDFs:
udfExpr = Some(HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), input))

Why are the changes needed?

This PR can help in the following scenarios:

  1. Better EXPLAIN output
  2. By adding def name: String to UserDefinedExpression, we can match an expression by UserDefinedExpression and look up the catalog, an use case needed for [SPARK-34152][SQL] Make CreateViewStatement.child to be LogicalPlan's children so that it's resolved in analyze phase #31273.

Does this PR introduce any user-facing change?

The EXPLAIN output involving udfs will be changed to use the name used for UDF registration.

For example, for the following:

sql("CREATE TEMPORARY FUNCTION test_udf AS 'org.apache.spark.examples.sql.Spark33084'")
sql("SELECT test_udf(col1) FROM VALUES (1), (2), (3)").explain(true)

The output of the optimized plan will change from:

Aggregate [spark33084(cast(col1#223 as bigint), org.apache.spark.examples.sql.Spark33084@6906be0f, 1, 1) AS spark33084(col1)#237]
+- LocalRelation [col1#223]

to

Aggregate [test_udf(cast(col1#223 as bigint), org.apache.spark.examples.sql.Spark33084@7a62d697, 1, 1, Some(test_udf)) AS test_udf(col1)#237]
+- LocalRelation [col1#223]

How was this patch tested?

Added new tests.

@github-actions github-actions bot added the SQL label Feb 6, 2021
@SparkQA
Copy link

SparkQA commented Feb 7, 2021

Test build #134963 has finished for PR 31500 at commit a68b977.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Feb 7, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39560/

@SparkQA
Copy link

SparkQA commented Feb 7, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39560/

@SparkQA
Copy link

SparkQA commented Feb 7, 2021

Test build #134977 has finished for PR 31500 at commit 56c3001.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@imback82
Copy link
Contributor Author

imback82 commented Feb 7, 2021

@cloud-fan this is from #31273 (comment)

@@ -1088,4 +1088,6 @@ trait ComplexTypeMergingExpression extends Expression {
* Common base trait for user-defined functions, including UDF/UDAF/UDTF of different languages
* and Hive function wrappers.
*/
trait UserDefinedExpression
trait UserDefinedExpression {
def name: String
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe default to using the class name or something?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an internal trait, seems OK to require it.

@cloud-fan
Copy link
Contributor

thanks, merging to master!

imback82 added a commit to imback82/spark-4 that referenced this pull request Feb 8, 2021
…laUDAF and ScalaAggregator

### What changes were proposed in this pull request?

This PR proposes to propagate the name used for registering UDFs to `ScalaUDF`, `ScalaUDAF` and `ScaalAggregator`.

Note that `PythonUDF` gets the name correctly: https://github.com/apache/spark/blob/466c045bfac20b6ce19f5a3732e76a5de4eb4e4a/python/pyspark/sql/udf.py#L358-L359
, and same for Hive UDFs:
https://github.com/apache/spark/blob/466c045bfac20b6ce19f5a3732e76a5de4eb4e4a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala#L67
### Why are the changes needed?

This PR can help in the following scenarios:
1) Better EXPLAIN output
2) By adding  `def name: String` to `UserDefinedExpression`, we can match an expression by `UserDefinedExpression` and look up the catalog, an use case needed for apache#31273.

### Does this PR introduce _any_ user-facing change?

The EXPLAIN output involving udfs will be changed to use the name used for UDF registration.

For example, for the following:
```
sql("CREATE TEMPORARY FUNCTION test_udf AS 'org.apache.spark.examples.sql.Spark33084'")
sql("SELECT test_udf(col1) FROM VALUES (1), (2), (3)").explain(true)
```
The output of the optimized plan will change from:
```
Aggregate [spark33084(cast(col1#223 as bigint), org.apache.spark.examples.sql.Spark330846906be0f, 1, 1) AS spark33084(col1)apache#237]
+- LocalRelation [col1#223]
```
to
```
Aggregate [test_udf(cast(col1#223 as bigint), org.apache.spark.examples.sql.Spark330847a62d697, 1, 1, Some(test_udf)) AS test_udf(col1)apache#237]
+- LocalRelation [col1#223]
```

### How was this patch tested?

Added new tests.

Closes apache#31500 from imback82/udaf_name.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@maropu
Copy link
Member

maropu commented Feb 10, 2021

It seems this PR already has been merged, so I'll close this.

@maropu maropu closed this Feb 10, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants