Skip to content

Commit

Permalink
fix.
Browse files Browse the repository at this point in the history
  • Loading branch information
gatorsmile committed Aug 21, 2017
1 parent bd5ae26 commit 7251be9
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -129,6 +130,13 @@ class SessionCatalog(
if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
}

/**
 * Reports whether the configured catalog implementation is Hive.
 *
 * The CATALOG_IMPLEMENTATION setting is lower-cased with Locale.ROOT before being compared
 * against "hive", so the check does not depend on the case of the configured value.
 */
private def isUsingHiveMetastore: Boolean = {
  val catalogImpl = conf.getConf(CATALOG_IMPLEMENTATION)
  catalogImpl.toLowerCase(Locale.ROOT) == "hive"
}

private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
val cacheSize = conf.tableRelationCacheSize
CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]()
Expand Down Expand Up @@ -1094,27 +1102,24 @@ class SessionCatalog(
// ----------------------------------------------------------------

/**
* Construct a [[FunctionBuilder]] based on the provided class that represents a function.
*
* This performs reflection to decide what type of [[Expression]] to return in the builder.
* Constructs a [[FunctionBuilder]] based on the provided class that represents a function.
*/
protected def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
val clazz = Utils.classForName(functionClassName)
(children: Seq[Expression]) => {
try {
makeFunctionExpression(name, Utils.classForName(functionClassName), children).getOrElse {
throw new UnsupportedOperationException("Use sqlContext.udf.register(...) instead.")
val extraMsg =
if (!isUsingHiveMetastore) "Use sparkSession.udf.register(...) instead." else ""
throw new AnalysisException(
s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}'. $extraMsg")
}
} catch {
case NonFatal(exception) =>
val e = exception match {
// Since we are using shim, the exceptions thrown by the underlying method of
// Method.invoke() are wrapped by InvocationTargetException
case i: InvocationTargetException => i.getCause
case o => o
}
case ae: AnalysisException =>
throw ae
case NonFatal(e) =>
val analysisException =
new AnalysisException(s"No handler for UDAF '${clazz.getCanonicalName}': $e")
new AnalysisException(s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}': $e")
analysisException.setStackTrace(e.getStackTrace)
throw analysisException
}
Expand All @@ -1123,6 +1128,8 @@ class SessionCatalog(

/**
* Construct a [[FunctionBuilder]] based on the provided class that represents a function.
*
* This performs reflection to decide what type of [[Expression]] to return in the builder.
*/
protected def makeFunctionExpression(
name: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
Expand All @@ -37,7 +36,6 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DecimalType, DoubleType}
import org.apache.spark.util.Utils


private[sql] class HiveSessionCatalog(
Expand All @@ -58,25 +56,6 @@ private[sql] class HiveSessionCatalog(
parser,
functionResourceLoader) {

/**
 * Builds a [[FunctionBuilder]] for the function implemented by `functionClassName`.
 *
 * The class is resolved once, when the builder is created, and the same resolved class is
 * reused both for constructing the expression and for the error messages. Previously the
 * class lookup was repeated on every invocation of the returned builder.
 *
 * @param name the function name passed through to makeFunctionExpression
 * @param functionClassName fully qualified name of the class implementing the function
 * @return a builder that turns the child expressions into the function's expression, or
 *         throws an AnalysisException when no expression can be produced
 */
override def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
  val clazz = Utils.classForName(functionClassName)
  (children: Seq[Expression]) => {
    try {
      makeFunctionExpression(name, clazz, children).getOrElse {
        throw new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}'")
      }
    } catch {
      // Already an analysis error: rethrow untouched so its message is not wrapped again.
      case ae: AnalysisException =>
        throw ae
      // Any other non-fatal failure is reported as an analysis error, keeping the original
      // stack trace so the root cause stays visible.
      case NonFatal(e) =>
        val analysisException =
          new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}': $e")
        analysisException.setStackTrace(e.getStackTrace)
        throw analysisException
    }
  }
}

/**
* Construct a [[FunctionBuilder]] based on the provided class that represents a function.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,59 +404,34 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
}

test("Hive UDFs with insufficient number of input arguments should trigger an analysis error") {
Seq((1, 2)).toDF("a", "b").createOrReplaceTempView("testUDF")
withTempView("testUDF") {
Seq((1, 2)).toDF("a", "b").createOrReplaceTempView("testUDF")

// Creates a temporary function `funcName` backed by `className`, calls it with no arguments
// against the testUDF view, and asserts that the resulting AnalysisException message
// contains the "No handler for UDF/UDAF/UDTF" text for that class.
def testErrorMsgForFunc(funcName: String, className: String): Unit = {
  withUserDefinedFunction(funcName -> true) {
    sql(s"CREATE TEMPORARY FUNCTION $funcName AS '$className'")
    val thrown = intercept[AnalysisException] {
      sql(s"SELECT $funcName() FROM testUDF")
    }
    val expectedFragment = s"No handler for UDF/UDAF/UDTF '$className'"
    assert(thrown.getMessage.contains(expectedFragment))
  }
}

{
// HiveSimpleUDF
sql(s"CREATE TEMPORARY FUNCTION testUDFTwoListList AS '${classOf[UDFTwoListList].getName}'")
val message = intercept[AnalysisException] {
sql("SELECT testUDFTwoListList() FROM testUDF")
}.getMessage
assert(message.contains("No handler for Hive UDF"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFTwoListList")
}
testErrorMsgForFunc("testUDFTwoListList", classOf[UDFTwoListList].getName)

{
// HiveGenericUDF
sql(s"CREATE TEMPORARY FUNCTION testUDFAnd AS '${classOf[GenericUDFOPAnd].getName}'")
val message = intercept[AnalysisException] {
sql("SELECT testUDFAnd() FROM testUDF")
}.getMessage
assert(message.contains("No handler for Hive UDF"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFAnd")
}
testErrorMsgForFunc("testUDFAnd", classOf[GenericUDFOPAnd].getName)

{
// Hive UDAF
sql(s"CREATE TEMPORARY FUNCTION testUDAFPercentile AS '${classOf[UDAFPercentile].getName}'")
val message = intercept[AnalysisException] {
sql("SELECT testUDAFPercentile(a) FROM testUDF GROUP BY b")
}.getMessage
assert(message.contains("No handler for Hive UDF"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDAFPercentile")
}
testErrorMsgForFunc("testUDAFPercentile", classOf[UDAFPercentile].getName)

{
// AbstractGenericUDAFResolver
sql(s"CREATE TEMPORARY FUNCTION testUDAFAverage AS '${classOf[GenericUDAFAverage].getName}'")
val message = intercept[AnalysisException] {
sql("SELECT testUDAFAverage() FROM testUDF GROUP BY b")
}.getMessage
assert(message.contains("No handler for Hive UDF"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDAFAverage")
}
testErrorMsgForFunc("testUDAFAverage", classOf[GenericUDAFAverage].getName)

{
// Hive UDTF
sql(s"CREATE TEMPORARY FUNCTION testUDTFExplode AS '${classOf[GenericUDTFExplode].getName}'")
val message = intercept[AnalysisException] {
sql("SELECT testUDTFExplode() FROM testUDF")
}.getMessage
assert(message.contains("No handler for Hive UDF"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDTFExplode")
// Hive UDTF
testErrorMsgForFunc("testUDTFExplode", classOf[GenericUDTFExplode].getName)
}

spark.catalog.dropTempView("testUDF")
}

test("Hive UDF in group by") {
Expand Down

0 comments on commit 7251be9

Please sign in to comment.