[SPARK-3343] [SQL] Add serde support for CTAS #2570

Closed · wants to merge 3 commits
Changes from all commits
@@ -114,11 +114,13 @@ case class InsertIntoTable(
}
}

case class CreateTableAsSelect(
case class CreateTableAsSelect[T](
databaseName: Option[String],
tableName: String,
child: LogicalPlan) extends UnaryNode {
override def output = child.output
child: LogicalPlan,
allowExisting: Boolean,
desc: Option[T] = None) extends UnaryNode {
override def output = Seq.empty[Attribute]
override lazy val resolved = (databaseName != None && childrenResolved)
}
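A quick sketch of how the generalized node is meant to be used; the relation and table names below are illustrative placeholders, not code from this diff. Catalyst stays agnostic about what the extra table description is, while the Hive module can instantiate `T` with Hive's `CreateTableDesc`.

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.CreateTableAsSelect

// Child query for the CTAS; "src" is a placeholder table name.
val query = UnresolvedRelation(None, "src", None)

// T is left abstract in catalyst; this sketch just uses a String, whereas the
// Hive module would pass a Hive CreateTableDesc instead.
val ctas = CreateTableAsSelect(None, "ctas_test", query, allowExisting = false,
  desc = Some("serde/storage description goes here"))

// With no database name filled in yet, the node reports itself as unresolved.
assert(!ctas.resolved)
```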

@@ -54,7 +54,7 @@ private[sql] trait SchemaRDDLike {
@transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match {
// For various commands (like DDL) and queries with side effects, we force query optimization to
// happen right away to let these side effects take place eagerly.
case _: Command | _: InsertIntoTable | _: CreateTableAsSelect |_: WriteToFile =>
case _: Command | _: InsertIntoTable | _: CreateTableAsSelect[_] |_: WriteToFile =>
LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
case _ =>
baseLogicalPlan
@@ -123,7 +123,7 @@ private[sql] trait SchemaRDDLike {
*/
@Experimental
def saveAsTable(tableName: String): Unit =
sqlContext.executePlan(CreateTableAsSelect(None, tableName, logicalPlan)).toRdd
sqlContext.executePlan(CreateTableAsSelect(None, tableName, logicalPlan, false)).toRdd

/** Returns the schema as a string in the tree format.
*
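As a usage-level reminder, saveAsTable now routes through the extended node with allowExisting = false, so a pre-existing table of the same name is reported rather than silently ignored. A hedged sketch, assuming a local HiveContext and an existing source table named `src`:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("ctas-sketch").setMaster("local"))
val hiveContext = new HiveContext(sc)

// Materialize a query result as a metastore table; the target name is a
// placeholder. Because allowExisting is false, running this twice with the
// same name is expected to fail on the second call rather than be skipped.
val result = hiveContext.sql("SELECT key, value FROM src WHERE key < 10")
result.saveAsTable("src_top_keys")
```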
19 changes: 19 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -21,6 +21,25 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.util._

class QueryTest extends PlanTest {
/**
* Runs the plan and makes sure the answer either contains all of the keywords
* or contains none of them.
* @param rdd the [[SchemaRDD]] to be executed
* @param exists true to make sure all of the keywords appear in the output,
*               false to make sure none of the keywords appear in the output
* @param keywords the keywords to check for in the output
*/
def checkExistence(rdd: SchemaRDD, exists: Boolean, keywords: String*) {
val outputs = rdd.collect().map(_.mkString).mkString
for (key <- keywords) {
if (exists) {
assert(outputs.contains(key), s"Failed for $rdd ($key doesn't exist in result)")
} else {
assert(!outputs.contains(key), s"Failed for $rdd ($key exists in result)")
}
}
}

/**
* Runs the plan and makes sure the answer matches the expected result.
* @param rdd the [[SchemaRDD]] to be executed
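A hypothetical use of the new helper, in the spirit of the SerDe-related CTAS tests this PR targets; the query and keywords are assumptions for illustration, not the PR's actual test cases:

```scala
// Inside a suite that mixes in QueryTest and has a `sql` helper in scope:
// check that the table's metadata mentions the requested SerDe...
checkExistence(sql("DESC EXTENDED ctas_with_serde"), true,
  "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe")

// ...and that a plain CTAS table does not.
checkExistence(sql("DESC EXTENDED ctas_plain"), false,
  "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe")
```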
@@ -229,7 +229,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {

// Needs constant object inspectors
"udf_round",
"udf7"
"udf7",

// Sort with Limit clause causes failure.
"ctas",
"ctas_hadoop20"
) ++ HiveShim.compatibilityBlackList

/**
@@ -17,17 +17,27 @@

package org.apache.spark.sql.hive

import java.io.IOException
import java.util.{List => JList}

import scala.util.parsing.combinator.RegexParsers

import org.apache.hadoop.hive.metastore.api.{FieldSchema, SerDeInfo, StorageDescriptor, Partition => TPartition, Table => TTable}
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.serde2.Deserializer
import org.apache.hadoop.util.ReflectionUtils
import org.apache.hadoop.fs.Path

import org.apache.hadoop.hive.metastore.TableType
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException}
import org.apache.hadoop.hive.ql.plan.{TableDesc, CreateTableDesc}
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe

import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.analysis.Catalog
import org.apache.spark.sql.catalyst.analysis.{Catalog, OverrideCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
@@ -66,37 +76,164 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
table.getTTable, partitions.map(part => part.getTPartition))(hive)
}

/**
* Creates a table with the specified database, table name, table description and schema.
* @param databaseName database name
* @param tableName table name
* @param schema schema of the new table; if not specified, the schema
*               declared in desc (the CreateTableDesc) is used instead
* @param allowExisting if true, ignore AlreadyExistsException
* @param desc CreateTableDesc object which contains the SerDe info. Currently
*             we support most of its features, except bucketing.
*/
def createTable(
databaseName: String,
tableName: String,
schema: Seq[Attribute],
allowExisting: Boolean = false): Unit = {
allowExisting: Boolean = false,
desc: Option[CreateTableDesc] = None) {
val hconf = hive.hiveconf

val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
val table = new Table(dbName, tblName)
val hiveSchema =
val tbl = new Table(dbName, tblName)

val crtTbl: CreateTableDesc = desc.getOrElse(null)

// We should respect the passed in schema, unless it's not set
val hiveSchema: JList[FieldSchema] = if (schema == null || schema.isEmpty) {
crtTbl.getCols
} else {
schema.map(attr => new FieldSchema(attr.name, toMetastoreType(attr.dataType), ""))
table.setFields(hiveSchema)

val sd = new StorageDescriptor()
table.getTTable.setSd(sd)
sd.setCols(hiveSchema)

// TODO: THESE ARE ALL DEFAULTS, WE NEED TO PARSE / UNDERSTAND the output specs.
sd.setCompressed(false)
sd.setParameters(Map[String, String]())
sd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat")
sd.setOutputFormat("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")
val serDeInfo = new SerDeInfo()
serDeInfo.setName(tblName)
serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
serDeInfo.setParameters(Map[String, String]())
sd.setSerdeInfo(serDeInfo)
}
tbl.setFields(hiveSchema)

// Most of this code is similar to Hive's DDLTask.createTable().
if (crtTbl != null && crtTbl.getTblProps() != null) {
tbl.getTTable().getParameters().putAll(crtTbl.getTblProps())
}

if (crtTbl != null && crtTbl.getPartCols() != null) {
tbl.setPartCols(crtTbl.getPartCols())
}

if (crtTbl != null && crtTbl.getStorageHandler() != null) {
tbl.setProperty(
org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE,
crtTbl.getStorageHandler())
}

/*
* We use LazySimpleSerDe by default.
*
* If the user didn't specify a SerDe, and any of the columns are not simple
* types, we will have to use DynamicSerDe instead.
*/
if (crtTbl == null || crtTbl.getSerName() == null) {
val storageHandler = tbl.getStorageHandler()
if (storageHandler == null) {
logInfo(s"Default to LazySimpleSerDe for table $dbName.$tblName")
tbl.setSerializationLib(classOf[LazySimpleSerDe].getName())

import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.io.Text

tbl.setInputFormatClass(classOf[TextInputFormat])
tbl.setOutputFormatClass(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]])
tbl.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
} else {
val serDeClassName = storageHandler.getSerDeClass().getName()
logInfo(s"Use StorageHandler-supplied $serDeClassName for table $dbName.$tblName")
tbl.setSerializationLib(serDeClassName)
}
} else {
// let's validate that the serde exists
val serdeName = crtTbl.getSerName()
try {
val d = ReflectionUtils.newInstance(hconf.getClassByName(serdeName), hconf)
if (d != null) {
logDebug(s"Found class for $serdeName")
}
} catch {
case e: SerDeException => throw new HiveException("Cannot validate serde: " + serdeName, e)
}
tbl.setSerializationLib(serdeName)
}

if (crtTbl != null && crtTbl.getFieldDelim() != null) {
tbl.setSerdeParam(serdeConstants.FIELD_DELIM, crtTbl.getFieldDelim())
tbl.setSerdeParam(serdeConstants.SERIALIZATION_FORMAT, crtTbl.getFieldDelim())
}
if (crtTbl != null && crtTbl.getFieldEscape() != null) {
tbl.setSerdeParam(serdeConstants.ESCAPE_CHAR, crtTbl.getFieldEscape())
}

if (crtTbl != null && crtTbl.getCollItemDelim() != null) {
tbl.setSerdeParam(serdeConstants.COLLECTION_DELIM, crtTbl.getCollItemDelim())
}
if (crtTbl != null && crtTbl.getMapKeyDelim() != null) {
tbl.setSerdeParam(serdeConstants.MAPKEY_DELIM, crtTbl.getMapKeyDelim())
}
if (crtTbl != null && crtTbl.getLineDelim() != null) {
tbl.setSerdeParam(serdeConstants.LINE_DELIM, crtTbl.getLineDelim())
}

if (crtTbl != null && crtTbl.getSerdeProps() != null) {
val iter = crtTbl.getSerdeProps().entrySet().iterator()
while (iter.hasNext()) {
val m = iter.next()
tbl.setSerdeParam(m.getKey(), m.getValue())
}
}

if (crtTbl != null && crtTbl.getComment() != null) {
tbl.setProperty("comment", crtTbl.getComment())
}

if (crtTbl != null && crtTbl.getLocation() != null) {
HiveShim.setLocation(tbl, crtTbl)
}

if (crtTbl != null && crtTbl.getSkewedColNames() != null) {
tbl.setSkewedColNames(crtTbl.getSkewedColNames())
}
if (crtTbl != null && crtTbl.getSkewedColValues() != null) {
tbl.setSkewedColValues(crtTbl.getSkewedColValues())
}

if (crtTbl != null) {
tbl.setStoredAsSubDirectories(crtTbl.isStoredAsSubDirectories())
tbl.setInputFormatClass(crtTbl.getInputFormat())
tbl.setOutputFormatClass(crtTbl.getOutputFormat())
}

tbl.getTTable().getSd().setInputFormat(tbl.getInputFormatClass().getName())
tbl.getTTable().getSd().setOutputFormat(tbl.getOutputFormatClass().getName())

if (crtTbl != null && crtTbl.isExternal()) {
tbl.setProperty("EXTERNAL", "TRUE")
tbl.setTableType(TableType.EXTERNAL_TABLE)
}

// set owner
try {
tbl.setOwner(hive.hiveconf.getUser)
} catch {
case e: IOException => throw new HiveException("Unable to get current user", e)
}

// set create time
tbl.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])

// TODO add bucket support
// TODO: set more info once Hive is upgraded

// create the table
synchronized {
try client.createTable(table) catch {
case e: org.apache.hadoop.hive.ql.metadata.HiveException
if e.getCause.isInstanceOf[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] &&
allowExisting => // Do nothing.
try client.createTable(tbl, allowExisting) catch {
case e: org.apache.hadoop.hive.metastore.api.AlreadyExistsException
if allowExisting => // Do nothing
case e: Throwable => throw e
}
}
}
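End to end, the point of this method is that a CTAS statement carrying explicit SerDe/storage clauses can now be materialized through the metastore. A hedged sketch of the kind of statement it supports; the table and source names are illustrative, and a HiveContext named `hiveContext` is assumed to be in scope:

```scala
// Create a table whose storage format and SerDe come from the statement itself,
// populated from the SELECT; `src` is a placeholder source table.
hiveContext.sql("""
  CREATE TABLE ctas_rcfile
  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
  STORED AS RCFILE
  AS SELECT key, value FROM src
""")
```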
@@ -110,11 +247,11 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
// Wait until children are resolved.
case p: LogicalPlan if !p.childrenResolved => p

case CreateTableAsSelect(db, tableName, child) =>
case CreateTableAsSelect(db, tableName, child, allowExisting, extra) =>
val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)

CreateTableAsSelect(Some(databaseName), tableName, child)
CreateTableAsSelect(Some(databaseName), tableName, child, allowExisting, extra)
}
}
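The rule above only fills in the missing database name so the node can become resolved; everything else on the node is carried through untouched. A small sketch of that before/after shape, where the values are placeholders and the real rule reads the current database from the session state:

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.CreateTableAsSelect

// Placeholder child query and table description.
val query  = UnresolvedRelation(None, "src", None)
val parsed = CreateTableAsSelect(None, "ctas_test", query, allowExisting = false,
  desc = Some("table description placeholder"))

// What the rule produces: the same node, with the current database filled in.
val qualified = parsed.copy(databaseName = Some("default"))
```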

15 changes: 5 additions & 10 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -447,14 +447,14 @@ private[hive] object HiveQl {
}

case Token("TOK_CREATETABLE", children)
if children.collect { case t@Token("TOK_QUERY", _) => t }.nonEmpty =>
// TODO: Parse other clauses.
if children.collect { case t @ Token("TOK_QUERY", _) => t }.nonEmpty =>
// Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
val (
Some(tableNameParts) ::
_ /* likeTable */ ::
Some(query) +:
notImplemented) =
Some(query) ::
allowExisting +:
ignores) =
getClauses(
Seq(
"TOK_TABNAME",
@@ -478,14 +478,9 @@ private[hive] object HiveQl {
"TOK_TABLELOCATION",
"TOK_TABLEPROPERTIES"),
children)
if (notImplemented.exists(token => !token.isEmpty)) {
throw new NotImplementedError(
s"Unhandled clauses: ${notImplemented.flatten.map(dumpTree(_)).mkString("\n")}")
}

val (db, tableName) = extractDbNameTableName(tableNameParts)

CreateTableAsSelect(db, tableName, nodeToPlan(query))
CreateTableAsSelect(db, tableName, nodeToPlan(query), allowExisting != None, Some(node))

// If it's not a "CREATE TABLE AS" like the above, just pass it back to Hive as a native command.
case Token("TOK_CREATETABLE", _) => NativePlaceholder
@@ -17,6 +17,8 @@

package org.apache.spark.sql.hive

import org.apache.hadoop.hive.ql.parse.ASTNode

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
@@ -160,17 +162,14 @@ private[hive] trait HiveStrategies {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil

case logical.CreateTableAsSelect(database, tableName, child) =>
val query = planLater(child)
case logical.CreateTableAsSelect(
Some(database), tableName, child, allowExisting, Some(extra: ASTNode)) =>
Contributor:
Will the database always be specified? If so maybe it shouldn't be an Option.

Contributor (Author):
Yes, it's always specified here. We just extract the real value in the pattern match so we can use the variable `database` directly later on; otherwise, we would need to call `database.get`.

CreateTableAsSelect(
database.get,
database,
tableName,
query,
InsertIntoHiveTable(_: MetastoreRelation,
Map(),
query,
overwrite = true)(hiveContext)) :: Nil
child,
allowExisting,
extra) :: Nil
case _ => Nil
}
}
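On the Option question in the thread above, extracting in the pattern is just a convenience; a tiny standalone illustration:

```scala
val maybeDb: Option[String] = Some("default")

maybeDb match {
  // Matching Some(database) binds the inner String directly, so the body can
  // use `database` without calling .get.
  case Some(database) => println(s"creating table in $database")
  case None           => println("no database specified")
}
```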