[SPARK-21499] [SQL] Support creating persistent function for Spark UDAF(UserDefinedAggregateFunction) #18700
Conversation
Test build #79829 has finished for PR 18700 at commit

retest this please

Test build #80553 has finished for PR 18700 at commit

Test build #80575 has finished for PR 18700 at commit

Test build #80577 has finished for PR 18700 at commit
(children: Seq[Expression]) => {
  try {
    val clsForUDAF =
      Utils.classForName("org.apache.spark.sql.expressions.UserDefinedAggregateFunction")
Shall we move the UDAF interface to Catalyst?
/**
* The base class for implementing user-defined aggregate functions (UDAF).
*
* @since 1.5.0
*/
@InterfaceStability.Stable
abstract class UserDefinedAggregateFunction
This interface has been marked as stable. Can we still move it, or should we make a trait in Catalyst?
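For context, the stable interface under discussion is implemented by subclassing it. A minimal sketch (the class and column names here are illustrative, not from the PR):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Illustrative UDAF that sums LongType inputs, ignoring nulls.
class LongSum extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = new StructType().add("value", LongType)
  override def bufferSchema: StructType = new StructType().add("sum", LongType)
  override def dataType: DataType = LongType
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
  }
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) buffer(0) = buffer.getLong(0) + input.getLong(0)
  }
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
  }
  override def evaluate(buffer: Row): Any = buffer.getLong(0)
}
```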
@@ -1096,8 +1099,42 @@ class SessionCatalog(
 * This performs reflection to decide what type of [[Expression]] to return in the builder.
 */
protected def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
This will be overridden by HiveSessionCatalog. Does that mean we cannot register a Spark UDAF if Hive support is enabled?
The changes here are for HiveSessionCatalog. Also, we have a test case in HiveUDAFSuite.scala to verify it.
Test build #80924 has finished for PR 18700 at commit
// When we instantiate hive UDF wrapper class, we may throw exception if the input
// expressions don't satisfy the hive UDF, such as type mismatch, input number
// mismatch, etc. Here we catch the exception and throw AnalysisException instead.
override def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
Do we still need to override this?
This is for issuing different exceptions.
Test build #80933 has finished for PR 18700 at commit
val clazz = Utils.classForName(functionClassName)
(children: Seq[Expression]) => {
  try {
    makeFunctionExpression(name, Utils.classForName(functionClassName), children).getOrElse {
Utils.classForName(functionClassName) -> clazz (the class is already loaded into clazz above, so it can be reused instead of being loaded again)
protected def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
  // TODO: at least support UDAFs here
  throw new UnsupportedOperationException("Use sqlContext.udf.register(...) instead.")
protected def makeFunctionExpression(
It seems we need to catch exceptions from this method anyway. How about we just make this method return Expression and document that it can throw an exception if the given class is not supported? Then HiveSessionCatalog can define its own exception message.
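The suggestion above could be sketched roughly as follows (illustrative only; the doc comment and body paraphrase the proposal, not the final merged code):

```scala
/**
 * Constructs an [[Expression]] for the provided function class.
 * Throws AnalysisException if the class is not a supported function type,
 * so subclasses such as HiveSessionCatalog can raise their own message.
 */
protected def makeFunctionExpression(
    name: String,
    clazz: Class[_],
    input: Seq[Expression]): Expression = {
  // Sketch: only Spark UDAFs are handled at this level; anything else
  // is reported as unsupported.
  if (classOf[UserDefinedAggregateFunction].isAssignableFrom(clazz)) {
    val udaf = clazz.getConstructor().newInstance()
      .asInstanceOf[UserDefinedAggregateFunction]
    ScalaUDAF(input, udaf)
  } else {
    throw new AnalysisException(
      s"No handler for UDF/UDAF '${clazz.getCanonicalName}'")
  }
}
```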
Test build #80974 has finished for PR 18700 at commit
/**
 * Checks whether the Hive metastore is being used
 */
private def isUsingHiveMetastore: Boolean = {
Do we still need this?
}
/**
 * Constructs a [[FunctionBuilder]] based on the provided class that represents a function.
This method returns an Expression, not a FunctionBuilder.
LGTM
Test build #80992 has finished for PR 18700 at commit

Test build #80995 has finished for PR 18700 at commit

Test build #80997 has finished for PR 18700 at commit

Test build #80996 has finished for PR 18700 at commit

The latest

Thanks! Merging to master.
What changes were proposed in this pull request?
This PR enables users to create a persistent Scala UDAF (one that extends UserDefinedAggregateFunction).
Before this PR, a Spark UDAF could only be registered through the API spark.udf.register(...).
How was this patch tested?
Added test cases.
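To illustrate the difference between the two registration paths (the class name, function name, and table are hypothetical, not from the PR):

```scala
// Before this PR: session-scoped registration only; the function
// disappears when the session ends. LongSum is a hypothetical UDAF class.
spark.udf.register("long_sum", new LongSum)

// With this PR: a persistent function recorded in the catalog,
// creatable from SQL and usable across sessions.
spark.sql("CREATE FUNCTION long_sum AS 'com.example.LongSum'")
spark.sql("SELECT long_sum(value) FROM events").show()
```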