From 8f6b09a7813cec22480a23f0301c5d5988090d02 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Fri, 24 Oct 2014 14:52:26 -0700
Subject: [PATCH] Update test harness to work with both Hive 12 and 13.

---
 dev/run-tests                                      |   2 +-
 project/SparkBuild.scala                           |   6 +-
 .../catalyst/analysis/HiveTypeCoercion.scala       |  13 +++
 .../catalyst/expressions/complexTypes.scala        |  25 +++++
 .../org/apache/spark/sql/SQLContext.scala          |   1 -
 .../execution/HiveCompatibilitySuite.scala         | 102 ++++++++++++------
 .../apache/spark/sql/hive/HiveContext.scala        |   3 +-
 .../spark/sql/hive/HiveInspectors.scala            |   1 -
 .../spark/sql/hive/HiveMetastoreCatalog.scala      |   1 -
 .../org/apache/spark/sql/hive/HiveQl.scala         |  21 ++--
 .../apache/spark/sql/hive/TableReader.scala        |   1 -
 .../org/apache/spark/sql/hive/TestHive.scala       |   7 +-
 .../sql/hive/api/java/JavaHiveContext.scala        |   6 +-
 .../execution/DescribeHiveTableCommand.scala       |   2 +-
 sql/hive/src/test/resources/log4j.properties       |   6 ++
 .../spark/sql/hive/StatisticsSuite.scala           |   2 -
 .../sql/hive/api/java/JavaHiveQLSuite.scala        |   6 +-
 .../hive/execution/HiveComparisonTest.scala        |  19 +++-
 .../sql/hive/execution/HiveQuerySuite.scala        |  16 +--
 .../sql/hive/{Shim.scala => Shim12.scala}          |  15 ++-
 .../sql/hive/{Shim.scala => Shim13.scala}          |   5 +-
 21 files changed, 184 insertions(+), 76 deletions(-)
 rename sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/{Shim.scala => Shim12.scala} (92%)
 rename sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/{Shim.scala => Shim13.scala} (98%)

diff --git a/dev/run-tests b/dev/run-tests
index 7d06c86eb4b41..f55497ae2bfbd 100755
--- a/dev/run-tests
+++ b/dev/run-tests
@@ -167,7 +167,7 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS
 # If the Spark SQL tests are enabled, run the tests with the Hive profiles enabled.
 # This must be a single argument, as it is.
 if [ -n "$_RUN_SQL_TESTS" ]; then
-  SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.12.0"
+  SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive"
 fi

 if [ -n "$_SQL_TESTS_ONLY" ]; then
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 705937e3016e2..ea04473854007 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -253,7 +253,11 @@ object Hive {
       |import org.apache.spark.sql.hive._
       |import org.apache.spark.sql.hive.test.TestHive._
       |import org.apache.spark.sql.parquet.ParquetTestData""".stripMargin,
-    cleanupCommands in console := "sparkContext.stop()"
+    cleanupCommands in console := "sparkContext.stop()",
+    // Some of our log4j jars make it impossible to submit jobs from this JVM to Hive Map/Reduce
+    // in order to generate golden files. This is only required for developers who are adding
+    // new query tests.
+    fullClasspath in Test := (fullClasspath in Test).value.filterNot { f => f.toString.contains("jcl-over") }
   )
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index 7c480de107e7f..2b69c02b28285 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -52,6 +52,8 @@ object HiveTypeCoercion {
  */
 trait HiveTypeCoercion {

+  import HiveTypeCoercion._
+
   val typeCoercionRules =
     PropagateTypes ::
     ConvertNaNs ::
@@ -340,6 +342,13 @@ trait HiveTypeCoercion {
       // Skip nodes who's children have not been resolved yet.
      case e if !e.childrenResolved => e

+      case a @ CreateArray(children) if !a.resolved =>
+        val commonType = a.childTypes.reduce(
+          (a,b) =>
+            findTightestCommonType(a,b).getOrElse(StringType))
+        CreateArray(
+          children.map(c => if (c.dataType == commonType) c else Cast(c, commonType)))
+
       // Promote SUM, SUM DISTINCT and AVERAGE to largest types to prevent overflows.
       case s @ Sum(e @ DecimalType()) => s // Decimal is already the biggest.
       case Sum(e @ IntegralType()) if e.dataType != LongType => Sum(Cast(e, LongType))
@@ -356,6 +365,10 @@ trait HiveTypeCoercion {
         Average(Cast(e, LongType))
       case Average(e @ FractionalType()) if e.dataType != DoubleType =>
         Average(Cast(e, DoubleType))
+
+      // Hive lets you do aggregation of timestamps... for some reason
+      case Sum(e @ TimestampType()) => Sum(Cast(e, DoubleType))
+      case Average(e @ TimestampType()) => Average(Cast(e, DoubleType))
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
index dafd745ec96c6..3b7d2bedec8b0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
@@ -101,3 +101,28 @@ case class GetField(child: Expression, fieldName: String) extends UnaryExpressio

   override def toString = s"$child.$fieldName"
 }
+
+/**
+ * Returns an Array containing the evaluation of all children expressions.
+ */
+case class CreateArray(children: Seq[Expression]) extends Expression {
+  override type EvaluatedType = Any
+
+  lazy val childTypes = children.map(_.dataType).distinct
+
+  override lazy val resolved =
+    childrenResolved && childTypes.size <= 1
+
+  override def dataType: DataType = {
+    assert(resolved, s"Invalid dataType of mixed ArrayType ${childTypes.mkString(",")}")
+    ArrayType(childTypes.headOption.getOrElse(NullType))
+  }
+
+  override def nullable: Boolean = false
+
+  override def eval(input: Row): Any = {
+    children.map(_.eval(input))
+  }
+
+  override def toString = s"Array(${children.mkString(",")})"
+}
\ No newline at end of file
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 590dbf3cb893d..c4f4ef01d78df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.optimizer.{Optimizer, DefaultOptimizer}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.types.DataType
-import org.apache.spark.sql.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.{SparkStrategies, _}
 import org.apache.spark.sql.json._
 import org.apache.spark.sql.parquet.ParquetRelation
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 463888551a359..e207a14a46904 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -23,6 +23,7 @@ import java.util.{Locale, TimeZone}

 import org.scalatest.BeforeAndAfter
import org.apache.spark.sql.SQLConf +import org.apache.spark.sql.hive.HiveShim import org.apache.spark.sql.hive.test.TestHive /** @@ -135,6 +136,9 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "stats20", "alter_merge_stats", "columnstats.*", + "annotate_stats.*", + "database_drop", + "index_serde", // Hive seems to think 1.0 > NaN = true && 1.0 < NaN = false... which is wrong. @@ -211,8 +215,20 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "describe_comment_indent", // Limit clause without a ordering, which causes failure. - "orc_predicate_pushdown" - ) + "orc_predicate_pushdown", + + // Requires precision decimal support: + "decimal_1", + "udf_pmod", + "udf_when", + "udf_case", + "udf_to_double", + "udf_to_float", + + // Needs constant object inspectors + "udf_round", + "udf7" + ) ++ HiveShim.compatibilityBlackList /** * The set of tests that are believed to be working in catalyst. Tests not on whiteList or @@ -220,23 +236,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { */ override def whiteList = Seq( "add_part_exist", - "dynamic_partition_skip_default", - "infer_bucket_sort_dyn_part", - "load_dyn_part1", - "load_dyn_part2", - "load_dyn_part3", - "load_dyn_part4", - "load_dyn_part5", - "load_dyn_part6", - "load_dyn_part7", - "load_dyn_part8", - "load_dyn_part9", - "load_dyn_part10", - "load_dyn_part11", - "load_dyn_part12", - "load_dyn_part13", - "load_dyn_part14", - "load_dyn_part14_win", "add_part_multiple", "add_partition_no_whitelist", "add_partition_with_whitelist", @@ -256,6 +255,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "alter_varchar2", "alter_view_as_select", "ambiguous_col", + "annotate_stats_join", + "annotate_stats_limit", + "annotate_stats_part", + "annotate_stats_table", + "annotate_stats_union", "auto_join0", "auto_join1", "auto_join10", @@ -299,6 +303,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "auto_sortmerge_join_13", "auto_sortmerge_join_14", "auto_sortmerge_join_15", + "auto_sortmerge_join_16", "auto_sortmerge_join_2", "auto_sortmerge_join_3", "auto_sortmerge_join_4", @@ -340,7 +345,10 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "create_skewed_table1", "create_struct_table", "cross_join", + "cross_product_check_1", + "cross_product_check_2", "ct_case_insensitive", + "database_drop", "database_location", "database_properties", "date_2", @@ -360,8 +368,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "diff_part_input_formats", "disable_file_format_check", "disallow_incompatible_type_change_off", + "distinct_stats", + "drop_database_removes_partition_dirs", "drop_function", "drop_index", + "drop_index_removes_partition_dirs", "drop_multi_partitions", "drop_partitions_filter", "drop_partitions_filter2", @@ -369,23 +380,30 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "drop_partitions_ignore_protection", "drop_table", "drop_table2", + "drop_table_removes_partition_dirs", "drop_view", + "dynamic_partition_skip_default", "escape_clusterby1", "escape_distributeby1", "escape_orderby1", "escape_sortby1", + "explain_rearrange", "fetch_aggregation", + "fileformat_mix", "fileformat_sequencefile", "fileformat_text", "filter_join_breaktask", "filter_join_breaktask2", "groupby1", "groupby11", + "groupby12", + "groupby1_limit", "groupby1_map", "groupby1_map_nomap", "groupby1_map_skew", "groupby1_noskew", 
"groupby2", + "groupby2_limit", "groupby2_map", "groupby2_map_skew", "groupby2_noskew", @@ -406,6 +424,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "groupby7_map_multi_single_reducer", "groupby7_map_skew", "groupby7_noskew", + "groupby7_noskew_multi_single_reducer", "groupby8", "groupby8_map", "groupby8_map_skew", @@ -432,6 +451,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "groupby_sort_test_1", "having", "implicit_cast1", + "index_serde", + "infer_bucket_sort_dyn_part", "innerjoin", "inoutdriver", "input", @@ -502,7 +523,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "join17", "join18", "join19", - "join_1to1", "join2", "join20", "join21", @@ -534,6 +554,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "join7", "join8", "join9", + "join_1to1", "join_array", "join_casesensitive", "join_empty", @@ -557,7 +578,21 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "literal_double", "literal_ints", "literal_string", + "load_dyn_part1", + "load_dyn_part10", + "load_dyn_part11", + "load_dyn_part12", + "load_dyn_part13", + "load_dyn_part14", + "load_dyn_part14_win", + "load_dyn_part2", + "load_dyn_part3", + "load_dyn_part4", + "load_dyn_part5", + "load_dyn_part6", "load_dyn_part7", + "load_dyn_part8", + "load_dyn_part9", "load_file_with_space_in_the_name", "loadpart1", "louter_join_ppr", @@ -578,13 +613,13 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "merge1", "merge2", "mergejoins", - "multigroupby_singlemr", + "multiMapJoin1", + "multiMapJoin2", "multi_insert_gby", "multi_insert_gby3", "multi_insert_lateral_view", "multi_join_union", - "multiMapJoin1", - "multiMapJoin2", + "multigroupby_singlemr", "noalias_subq1", "nomore_ambiguous_table_col", "nonblock_op_deduplicate", @@ -607,10 +642,10 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "outer_join_ppr", "parallel", "parenthesis_star_by", - "partcols1", "part_inherit_tbl_props", "part_inherit_tbl_props_empty", "part_inherit_tbl_props_with_star", + "partcols1", "partition_date", "partition_schema1", "partition_serde_format", @@ -641,7 +676,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "ppd_outer_join5", "ppd_random", "ppd_repeated_alias", - "ppd_transform", "ppd_udf_col", "ppd_union", "ppr_allchildsarenull", @@ -674,15 +708,15 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "serde_regex", "serde_reported_schema", "set_variable_sub", - "show_create_table_partitioned", - "show_create_table_delimited", + "show_columns", "show_create_table_alter", - "show_create_table_view", - "show_create_table_serde", "show_create_table_db_table", + "show_create_table_delimited", "show_create_table_does_not_exist", "show_create_table_index", - "show_columns", + "show_create_table_partitioned", + "show_create_table_serde", + "show_create_table_view", "show_describe_func_quotes", "show_functions", "show_partitions", @@ -738,12 +772,14 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "udaf_covar_pop", "udaf_covar_samp", "udaf_histogram_numeric", - "udf_10_trims", "udf2", "udf6", "udf7", "udf8", "udf9", + "udf_10_trims", + "udf_E", + "udf_PI", "udf_abs", "udf_acos", "udf_add", @@ -774,14 +810,13 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "udf_cos", "udf_count", "udf_date_add", - "udf_datediff", 
"udf_date_sub", + "udf_datediff", "udf_day", "udf_dayofmonth", "udf_degrees", "udf_div", "udf_double", - "udf_E", "udf_elt", "udf_equal", "udf_exp", @@ -826,7 +861,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "udf_nvl", "udf_or", "udf_parse_url", - "udf_PI", "udf_pmod", "udf_positive", "udf_pow", diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 34ed57b001637..fad4091d48a89 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -46,7 +46,6 @@ import org.apache.spark.sql.execution.ExtractPythonUdfs import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.{Command => PhysicalCommand} import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand -import org.apache.spark.sql.hive.HiveShim /** * DEPRECATED: Use HiveContext instead. @@ -230,7 +229,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { * set() or a SET command inside sql() will be set in the SQLConf *as well as* * in the HiveConf. */ - @transient protected[hive] lazy val hiveconf = new HiveConf(classOf[SessionState]) + @transient lazy val hiveconf = new HiveConf(classOf[SessionState]) @transient protected[hive] lazy val sessionState = { val ss = new SessionState(hiveconf) setConf(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index deaa1a2a154f2..fad7373a2fa39 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -26,7 +26,6 @@ import org.apache.hadoop.{io => hadoopIo} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.types import org.apache.spark.sql.catalyst.types._ -import org.apache.spark.sql.hive.HiveShim /* Implicit conversions */ import scala.collection.JavaConversions._ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 904bb48691e35..04c48c385966e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.types._ -import org.apache.spark.sql.hive.HiveShim import org.apache.spark.util.Utils /* Implicit conversions */ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index ffcb6b505b9c6..54c619722ee12 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -251,6 +251,8 @@ private[hive] object HiveQl { s""" |Unsupported language features in query: $sql |${dumpTree(getAst(sql))} + |$e + |${e.getStackTrace.head} """.stripMargin) } } @@ -329,6 +331,7 @@ private[hive] object HiveQl { case Token("TOK_SMALLINT", Nil) => ShortType case Token("TOK_BOOLEAN", Nil) => BooleanType case 
Token("TOK_STRING", Nil) => StringType + case Token("TOK_VARCHAR", Token(_, Nil) :: Nil) => StringType case Token("TOK_FLOAT", Nil) => FloatType case Token("TOK_DOUBLE", Nil) => DoubleType case Token("TOK_DATE", Nil) => DateType @@ -854,9 +857,11 @@ private[hive] object HiveQl { HiveParser.Number, HiveParser.TinyintLiteral, HiveParser.SmallintLiteral, - HiveParser.BigintLiteral) + HiveParser.BigintLiteral, + HiveParser.DecimalLiteral) /* Case insensitive matches */ + val ARRAY = "(?i)ARRAY".r val COUNT = "(?i)COUNT".r val AVG = "(?i)AVG".r val SUM = "(?i)SUM".r @@ -917,7 +922,9 @@ private[hive] object HiveQl { /* Casts */ case Token("TOK_FUNCTION", Token("TOK_STRING", Nil) :: arg :: Nil) => Cast(nodeToExpr(arg), StringType) - case Token("TOK_FUNCTION", Token("TOK_VARCHAR", Nil) :: arg :: Nil) => + case Token("TOK_FUNCTION", Token("TOK_VARCHAR", _) :: arg :: Nil) => + Cast(nodeToExpr(arg), StringType) + case Token("TOK_FUNCTION", Token("TOK_CHAR", _) :: arg :: Nil) => Cast(nodeToExpr(arg), StringType) case Token("TOK_FUNCTION", Token("TOK_INT", Nil) :: arg :: Nil) => Cast(nodeToExpr(arg), IntegerType) @@ -1009,6 +1016,8 @@ private[hive] object HiveQl { GetItem(nodeToExpr(child), nodeToExpr(ordinal)) /* Other functions */ + case Token("TOK_FUNCTION", Token(ARRAY(), Nil) :: children) => + CreateArray(children.map(nodeToExpr)) case Token("TOK_FUNCTION", Token(RAND(), Nil) :: Nil) => Rand case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: Nil) => Substring(nodeToExpr(string), nodeToExpr(pos), Literal(Integer.MAX_VALUE, IntegerType)) @@ -1042,10 +1051,10 @@ private[hive] object HiveQl { } else if (ast.getText.endsWith("Y")) { // Literal tinyint. v = Literal(ast.getText.substring(0, ast.getText.length() - 1).toByte, ByteType) - } else if (ast.getText.endsWith("BD")) { + } else if (ast.getText.endsWith("BD") || ast.getText.endsWith("D")) { // Literal decimal - val strVal = ast.getText.substring(0, ast.getText.length() - 2) - BigDecimal(strVal) + val strVal = ast.getText.stripSuffix("D").stripSuffix("B") + v = Literal(BigDecimal(strVal)) } else { v = Literal(ast.getText.toDouble, DoubleType) v = Literal(ast.getText.toLong, LongType) @@ -1056,7 +1065,7 @@ private[hive] object HiveQl { } if (v == null) { - sys.error(s"Failed to parse number ${ast.getText}") + sys.error(s"Failed to parse number '${ast.getText}'.") } else { v } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index e45eb57b3debf..9ff7ab5a124c1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -34,7 +34,6 @@ import org.apache.spark.SerializableWritable import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD} import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.hive.HiveShim /** * A trait for subclasses that handle table scans. 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index c6ff4ea6de594..bb79ad5538046 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -20,6 +20,9 @@ package org.apache.spark.sql.hive.test import java.io.File import java.util.{Set => JavaSet} +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.ql.session.SessionState + import scala.collection.mutable import scala.language.implicitConversions @@ -119,7 +122,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { if (cmd.toUpperCase contains "LOAD DATA") { val testDataLocation = hiveDevHome.map(_.getCanonicalPath).getOrElse(inRepoTests.getCanonicalPath) - cmd.replaceAll("\\.\\.", testDataLocation) + cmd.replaceAll("\\.\\./\\.\\./", testDataLocation + "/") } else { cmd } @@ -417,6 +420,8 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { FunctionRegistry.unregisterTemporaryUDF(udfName) } + // Some tests corrupt this value on purpose, which breaks the RESET call below. + hiveconf.set("fs.default.name", new File(".").toURI.toString) // It is important that we RESET first as broken hooks that might have been set could break // other sql exec here. runSqlHive("RESET") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala index a201d2349a2ef..1817c7832490e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala @@ -19,15 +19,15 @@ package org.apache.spark.sql.hive.api.java import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.api.java.{JavaSQLContext, JavaSchemaRDD} -import org.apache.spark.sql.SQLConf +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.hive.{HiveContext, HiveQl} /** * The entry point for executing Spark SQL queries from a Java program. */ -class JavaHiveContext(sparkContext: JavaSparkContext) extends JavaSQLContext(sparkContext) { +class JavaHiveContext(sqlContext: SQLContext) extends JavaSQLContext(sqlContext) { - override val sqlContext = new HiveContext(sparkContext) + def this(sparkContext: JavaSparkContext) = this(new HiveContext(sparkContext)) override def sql(sqlText: String): JavaSchemaRDD = { // TODO: Create a framework for registering parsers instead of just hardcoding if statements. 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala index fbd375639692f..5d98834c6fb33 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala @@ -45,7 +45,7 @@ case class DescribeHiveTableCommand( lazy val hiveString: Seq[String] = sideEffectResult.map { case Row(name: String, dataType: String, comment) => Seq(name, dataType, - Option(comment.asInstanceOf[String]).getOrElse(HiveShim.getEmptyCommentsFieldValue)) + Option(comment.asInstanceOf[String]).getOrElse("")) .map(s => String.format(s"%-20s", s)) .mkString("\t") } diff --git a/sql/hive/src/test/resources/log4j.properties b/sql/hive/src/test/resources/log4j.properties index 9fdb526d945e0..5bc08062d30eb 100644 --- a/sql/hive/src/test/resources/log4j.properties +++ b/sql/hive/src/test/resources/log4j.properties @@ -42,6 +42,12 @@ log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF log4j.additivity.org.apache.hadoop.hive.metastore.RetryingHMSHandler=false log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF +log4j.additivity.hive.log=false +log4j.logger.hive.log=OFF + +log4j.additivity.parquet.hadoop.ParquetRecordReader=false +log4j.logger.parquet.hadoop.ParquetRecordReader=OFF + log4j.additivity.hive.ql.metadata.Hive=false log4j.logger.hive.ql.metadata.Hive=OFF diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index aaefe84ce81ea..a90fc023e67d8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -21,11 +21,9 @@ import org.scalatest.BeforeAndAfterAll import scala.reflect.ClassTag - import org.apache.spark.sql.{SQLConf, QueryTest} import org.apache.spark.sql.catalyst.plans.logical.NativeCommand import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, ShuffledHashJoin} -import org.apache.spark.sql.hive.HiveShim import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala index 46b11b582b26d..ca78dfba4fa38 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala @@ -22,7 +22,7 @@ import scala.util.Try import org.scalatest.FunSuite import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.sql.api.java.JavaSchemaRDD +import org.apache.spark.sql.api.java.{JavaSQLContext, JavaSchemaRDD} import org.apache.spark.sql.execution.ExplainCommand import org.apache.spark.sql.hive.test.TestHive @@ -33,9 +33,7 @@ class JavaHiveQLSuite extends FunSuite { lazy val javaCtx = new JavaSparkContext(TestHive.sparkContext) // There is a little trickery here to avoid instantiating two HiveContexts in the same JVM - lazy val javaHiveCtx = new JavaHiveContext(javaCtx) { - override val sqlContext = TestHive - } + lazy val javaHiveCtx = new JavaHiveContext(TestHive) test("SELECT * FROM src") { assert( diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 79cc7a3fcc7d6..44eb4cfa59335 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -142,14 +142,25 @@ abstract class HiveComparisonTest
       // Hack: Hive simply prints the result of a SET command to screen,
       // and does not return it as a query answer.
       case _: SetCommand => Seq("0")
+      case LogicalNativeCommand(c) if c.toLowerCase.contains("desc") =>
+        answer
+          .filterNot(nonDeterministicLine)
+          .map(_.replaceAll("from deserializer", ""))
+          .map(_.replaceAll("None", ""))
+          .map(_.trim)
+          .filterNot(_ == "")
       case _: LogicalNativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
       case _: ExplainCommand => answer
       case _: DescribeCommand =>
         // Filter out non-deterministic lines and lines which do not have actual results but
         // can introduce problems because of the way Hive formats these lines.
         // Then, remove empty lines. Do not sort the results.
-        answer.filterNot(
-          r => nonDeterministicLine(r) || ignoredLine(r)).map(_.trim).filterNot(_ == "")
+        answer
+          .filterNot(r => nonDeterministicLine(r) || ignoredLine(r))
+          .map(_.replaceAll("from deserializer", ""))
+          .map(_.replaceAll("None", ""))
+          .map(_.trim)
+          .filterNot(_ == "")
       case plan => if (isSorted(plan)) answer else answer.sorted
     }
     orderedAnswer.map(cleanPaths)
@@ -164,6 +175,7 @@ abstract class HiveComparisonTest
     "last_modified_by",
     "last_modified_time",
     "Owner:",
+    "COLUMN_STATS_ACCURATE",
     // The following are hive specific schema parameters which we do not need to match exactly.
     "numFiles",
     "numRows",
@@ -237,6 +249,7 @@ abstract class HiveComparisonTest
           // the system to return the wrong answer. Since we have no intention of mirroring their
           // previously broken behavior we simply filter out changes to this setting.
           .filterNot(_ contains "hive.outerjoin.supports.filters")
+          .filterNot(_ contains "hive.exec.post.hooks")

       if (allQueries != queryList)
         logWarning(s"Simplifications made on unsupported operations for test $testCaseName")
@@ -345,7 +358,7 @@ abstract class HiveComparisonTest
       (queryList, hiveResults, catalystResults).zipped.foreach {
         case (query, hive, (hiveQuery, catalyst)) =>
           // Check that the results match unless its an EXPLAIN query.
- val preparedHive = prepareAnswer(hiveQuery,hive) + val preparedHive = prepareAnswer(hiveQuery, hive) if ((!hiveQuery.logical.isInstanceOf[ExplainCommand]) && preparedHive != catalyst) { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 5de20175d9f57..322a25bb20837 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -508,19 +508,19 @@ class HiveQuerySuite extends HiveComparisonTest { // Describe a partition is a native command assertResult( Array( - Array("key", "int", HiveShim.getEmptyCommentsFieldValue), - Array("value", "string", HiveShim.getEmptyCommentsFieldValue), - Array("dt", "string", HiveShim.getEmptyCommentsFieldValue), - Array("", "", ""), - Array("# Partition Information", "", ""), + Array("key", "int"), + Array("value", "string"), + Array("dt", "string"), + Array(""), + Array("# Partition Information"), Array("# col_name", "data_type", "comment"), - Array("", "", ""), - Array("dt", "string", HiveShim.getEmptyCommentsFieldValue)) + Array(""), + Array("dt", "string")) ) { sql("DESCRIBE test_describe_commands1 PARTITION (dt='2008-06-08')") .select('result) .collect() - .map(_.getString(0).split("\t").map(_.trim)) + .map(_.getString(0).replaceAll("None", "").trim.split("\t").map(_.trim)) } // Describe a registered temporary table. diff --git a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim.scala b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala similarity index 92% rename from sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim.scala rename to sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala index 6dde636965afd..922881798158d 100644 --- a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim.scala +++ b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala @@ -58,9 +58,6 @@ private[hive] object HiveShim { def createDefaultDBIfNeeded(context: HiveContext) = { } - /** The string used to denote an empty comments field in the schema. */ - def getEmptyCommentsFieldValue = "None" - def getCommandProcessor(cmd: Array[String], conf: HiveConf) = { CommandProcessorFactory.get(cmd(0), conf) } @@ -82,6 +79,18 @@ private[hive] object HiveShim { def getAllPartitionsOf(client: Hive, tbl: Table) = client.getAllPartitionsForPruner(tbl) + def compatibilityBlackList = Seq( + "database_location", + "database_properties", + "decimal_.*", + "drop_partitions_filter2", + "show_.*", + "serde_regex", + "udf_to_date", + "udaf_collect_set", + "udf_concat" + ) + } class ShimFileSinkDesc(var dir: String, var tableInfo: TableDesc, var compressed: Boolean) diff --git a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala similarity index 98% rename from sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala rename to sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala index 8678c0c475db4..b8d893d8c1319 100644 --- a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala +++ b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala @@ -72,9 +72,6 @@ private[hive] object HiveShim { context.runSqlHive("USE default") } - /* The string used to denote an empty comments field in the schema. 
 */
-  def getEmptyCommentsFieldValue = ""
-
   def getCommandProcessor(cmd: Array[String], conf: HiveConf) = {
     CommandProcessorFactory.get(cmd, conf)
   }
@@ -122,6 +119,8 @@ private[hive] object HiveShim {

   def getAllPartitionsOf(client: Hive, tbl: Table) = client.getAllPartitionsOf(tbl)

+  def compatibilityBlackList = Seq()
+
   /*
    * Bug introdiced in hive-0.13. FileSinkDesc is serializable, but its member path is not.
    * Fix it through wrapper.