[SPARK-21617][SQL] Store correct table metadata when altering schema in Hive metastore.

For Hive tables, the current "replace the schema" code is the correct path, except that an
exception on that path should surface as an error rather than trigger a retry through a
different code path.

For data source tables, Spark may write table metadata that is not Hive-compatible; for that
to work with Hive 2.1, the detection of data source tables needs to be fixed in the Hive
client so that it also covers the raw tables used by code such as `alterTableSchema`.

Tested with existing and added unit tests (plus internal tests with a 2.1 metastore).

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes apache#18849 from vanzin/SPARK-21617.

(cherry picked from commit 84b5b16)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
Marcelo Vanzin authored and MatthewRBruce committed Jul 31, 2018
1 parent 6af9f83 commit 77d2d87
Showing 4 changed files with 171 additions and 28 deletions.
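For orientation, the scenario this change addresses can be sketched as follows; this is an illustrative example, not part of the diff below. It assumes a Hive-enabled SparkSession named `spark`, and the table name t1 and its columns are hypothetical.

// Hypothetical end-to-end example, mirroring what the new Hive_2_1_DDLSuite exercises.
spark.sql("CREATE TABLE t1 (c1 INT) USING json")   // a data source table stored in the Hive metastore
spark.sql("ALTER TABLE t1 ADD COLUMNS (c2 INT)")   // handled by HiveExternalCatalog.alterTableSchema
assert(spark.table("t1").schema.fieldNames.sameElements(Array("c1", "c2")))
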
@@ -2281,18 +2281,9 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}.getMessage
assert(e.contains("Found duplicate column(s)"))
} else {
- if (isUsingHiveMetastore) {
- // hive catalog will still complains that c1 is duplicate column name because hive
- // identifiers are case insensitive.
- val e = intercept[AnalysisException] {
- sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
- }.getMessage
- assert(e.contains("HiveException"))
- } else {
- sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
- assert(spark.table("t1").schema
- .equals(new StructType().add("c1", IntegerType).add("C1", StringType)))
- }
+ sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+ assert(spark.table("t1").schema ==
+ new StructType().add("c1", IntegerType).add("C1", StringType))
}
}
}
@@ -114,7 +114,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* should interpret these special data source properties and restore the original table metadata
* before returning it.
*/
- private def getRawTable(db: String, table: String): CatalogTable = withClient {
+ private[hive] def getRawTable(db: String, table: String): CatalogTable = withClient {
client.getTable(db, table)
}

@@ -386,6 +386,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* can be used as table properties later.
*/
private def tableMetaToTableProps(table: CatalogTable): mutable.Map[String, String] = {
+ tableMetaToTableProps(table, table.schema)
+ }
+
+ private def tableMetaToTableProps(
+ table: CatalogTable,
+ schema: StructType): mutable.Map[String, String] = {
val partitionColumns = table.partitionColumnNames
val bucketSpec = table.bucketSpec

@@ -394,7 +400,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// property. In this case, we split the JSON string and store each part as a separate table
// property.
val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
- val schemaJsonString = table.schema.json
+ val schemaJsonString = schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
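
As context for the property handling above, the splitting scheme described in the comment can be sketched roughly as follows. This is a standalone approximation rather than code from the commit; the spark.sql.sources.schema.* property keys and the threshold value 4000 are assumptions based on Spark's usual constants and defaults.

import scala.collection.mutable
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Rough sketch: split the schema's JSON representation into numbered table properties.
val schema = new StructType().add("c1", IntegerType).add("c2", StringType)
val threshold = 4000                                 // assumed SCHEMA_STRING_LENGTH_THRESHOLD default
val properties = mutable.Map[String, String]()
val parts = schema.json.grouped(threshold).toSeq
properties.put("spark.sql.sources.schema.numParts", parts.size.toString)  // DATASOURCE_SCHEMA_NUMPARTS
parts.zipWithIndex.foreach { case (part, index) =>
  properties.put(s"spark.sql.sources.schema.part.$index", part)           // one chunk per property
}
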
@@ -627,20 +633,29 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
override def alterTableSchema(db: String, table: String, schema: StructType): Unit = withClient {
requireTableExists(db, table)
val rawTable = getRawTable(db, table)
- val withNewSchema = rawTable.copy(schema = schema)
- verifyColumnNames(withNewSchema)
- // Add table metadata such as table schema, partition columns, etc. to table properties.
- val updatedTable = withNewSchema.copy(
- properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema))
- try {
- client.alterTable(updatedTable)
- } catch {
- case NonFatal(e) =>
- val warningMessage =
- s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " +
- "compatible way. Updating Hive metastore in Spark SQL specific format."
- logWarning(warningMessage, e)
- client.alterTable(updatedTable.copy(schema = updatedTable.partitionSchema))
+ val updatedProperties = rawTable.properties ++ tableMetaToTableProps(rawTable, schema)
+ val withNewSchema = rawTable.copy(properties = updatedProperties, schema = schema)
+ verifyColumnNames(withNewSchema)
+
+ if (isDatasourceTable(rawTable)) {
+ // For data source tables, first try to write it with the schema set; if that does not work,
+ // try again with updated properties and the partition schema. This is a simplified version of
+ // what createDataSourceTable() does, and may leave the table in a state unreadable by Hive
+ // (for example, the schema does not match the data source schema, or does not match the
+ // storage descriptor).
+ try {
+ client.alterTable(withNewSchema)
+ } catch {
+ case NonFatal(e) =>
+ val warningMessage =
+ s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " +
+ "compatible way. Updating Hive metastore in Spark SQL specific format."
+ logWarning(warningMessage, e)
+ client.alterTable(withNewSchema.copy(schema = rawTable.partitionSchema))
+ }
+ } else {
+ client.alterTable(withNewSchema)
+ }
}
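
For reference, the call path exercised by this method looks roughly like the following; it mirrors the new Hive_2_1_DDLSuite added further down. The value catalog is assumed to be a HiveExternalCatalog wired to a metastore, and default.t1 is a hypothetical existing table.

import org.apache.spark.sql.types.{IntegerType, StructType}

// Sketch only: alterTableSchema first attempts a Hive-compatible alter and, for data source
// tables, falls back to the Spark SQL specific format (schema kept in table properties).
val newSchema = new StructType().add("c1", IntegerType).add("c2", IntegerType)
catalog.alterTableSchema("default", "t1", newSchema)
assert(catalog.getTable("default", "t1").schema.fieldNames.sameElements(Array("c1", "c2")))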

@@ -1246,4 +1261,14 @@ object HiveExternalCatalog {
getColumnNamesByType(metadata.properties, "sort", "sorting columns"))
}
}

+ /**
+ * Detects a data source table. This checks both the table provider and the table properties,
+ * unlike DDLUtils which just checks the former.
+ */
+ private[spark] def isDatasourceTable(table: CatalogTable): Boolean = {
+ val provider = table.provider.orElse(table.properties.get(DATASOURCE_PROVIDER))
+ provider.isDefined && provider != Some(DDLUtils.HIVE_PROVIDER)
+ }

}
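
To illustrate the distinction the new helper draws (a hypothetical example, not part of the commit): a raw table read straight from the metastore may not have its provider field restored yet, but it still carries the data source provider in its table properties, which is the shape alterTableSchema sees.

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.types.{IntegerType, StructType}

// Hypothetical raw table: provider is unset, but the spark.sql.sources.provider property is present.
val rawTable = CatalogTable(
  identifier = TableIdentifier("t1", Some("default")),
  tableType = CatalogTableType.MANAGED,
  storage = CatalogStorageFormat.empty,
  schema = new StructType().add("c1", IntegerType),
  provider = None,
  properties = Map("spark.sql.sources.provider" -> "json"))
// DDLUtils.isDatasourceTable(rawTable) is false here (it only looks at `provider`), while
// HiveExternalCatalog.isDatasourceTable(rawTable) is true because it also checks the property.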
@@ -47,6 +47,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.command.DDLUtils
+ import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.client.HiveClientImpl._
import org.apache.spark.sql.types._
import org.apache.spark.util.{CircularBuffer, Utils}
@@ -838,7 +839,7 @@ private[hive] object HiveClientImpl {
}
// after SPARK-19279, it is not allowed to create a hive table with an empty schema,
// so here we should not add a default col schema
- if (schema.isEmpty && DDLUtils.isDatasourceTable(table)) {
+ if (schema.isEmpty && HiveExternalCatalog.isDatasourceTable(table)) {
// This is a hack to preserve existing behavior. Before Spark 2.0, we do not
// set a default serde here (this was done in Hive), and so if the user provides
// an empty schema Hive would automatically populate the schema with a single
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.execution

import scala.language.existentials

import org.apache.hadoop.conf.Configuration
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.types._
import org.apache.spark.tags.ExtendedHiveTest
import org.apache.spark.util.Utils

/**
* A separate set of DDL tests that uses Hive 2.1 libraries, which behave a little differently
* from the built-in ones.
*/
@ExtendedHiveTest
class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton with BeforeAndAfterEach
with BeforeAndAfterAll {

// Create a custom HiveExternalCatalog instance with the desired configuration. We cannot
// use SparkSession here since there's already an active one managed by the TestHive object.
private var catalog = {
val warehouse = Utils.createTempDir()
val metastore = Utils.createTempDir()
metastore.delete()
val sparkConf = new SparkConf()
.set(SparkLauncher.SPARK_MASTER, "local")
.set(WAREHOUSE_PATH.key, warehouse.toURI().toString())
.set(CATALOG_IMPLEMENTATION.key, "hive")
.set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1")
.set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")

val hadoopConf = new Configuration()
hadoopConf.set("hive.metastore.warehouse.dir", warehouse.toURI().toString())
hadoopConf.set("javax.jdo.option.ConnectionURL",
s"jdbc:derby:;databaseName=${metastore.getAbsolutePath()};create=true")
// These options are needed since the defaults in Hive 2.1 cause exceptions with an
// empty metastore db.
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
hadoopConf.set("hive.metastore.schema.verification", "false")

new HiveExternalCatalog(sparkConf, hadoopConf)
}

override def afterEach: Unit = {
catalog.listTables("default").foreach { t =>
catalog.dropTable("default", t, true, false)
}
spark.sessionState.catalog.reset()
}

override def afterAll(): Unit = {
catalog = null
}

test("SPARK-21617: ALTER TABLE for non-compatible DataSource tables") {
testAlterTable(
"t1",
"CREATE TABLE t1 (c1 int) USING json",
StructType(Array(StructField("c1", IntegerType), StructField("c2", IntegerType))),
hiveCompatible = false)
}

test("SPARK-21617: ALTER TABLE for Hive-compatible DataSource tables") {
testAlterTable(
"t1",
"CREATE TABLE t1 (c1 int) USING parquet",
StructType(Array(StructField("c1", IntegerType), StructField("c2", IntegerType))))
}

test("SPARK-21617: ALTER TABLE for Hive tables") {
testAlterTable(
"t1",
"CREATE TABLE t1 (c1 int) STORED AS parquet",
StructType(Array(StructField("c1", IntegerType), StructField("c2", IntegerType))))
}

test("SPARK-21617: ALTER TABLE with incompatible schema on Hive-compatible table") {
val exception = intercept[AnalysisException] {
testAlterTable(
"t1",
"CREATE TABLE t1 (c1 string) USING parquet",
StructType(Array(StructField("c2", IntegerType))))
}
assert(exception.getMessage().contains("types incompatible with the existing columns"))
}

private def testAlterTable(
tableName: String,
createTableStmt: String,
updatedSchema: StructType,
hiveCompatible: Boolean = true): Unit = {
spark.sql(createTableStmt)
val oldTable = spark.sessionState.catalog.externalCatalog.getTable("default", tableName)
catalog.createTable(oldTable, true)
catalog.alterTableSchema("default", tableName, updatedSchema)

val updatedTable = catalog.getTable("default", tableName)
assert(updatedTable.schema.fieldNames === updatedSchema.fieldNames)
}

}
