Skip to content

Commit

Permalink
remove metastorerelation
Browse files Browse the repository at this point in the history
  • Loading branch information
gatorsmile committed Jul 6, 2016
1 parent 07d9c53 commit ccc26c9
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -223,22 +223,18 @@ case class InsertIntoHiveTable(
jobConf,
fileSinkConf,
dynamicPartColNames,
child.output,
table)
child.output)
} else {
new SparkHiveWriterContainer(
jobConf,
fileSinkConf,
child.output,
table)
child.output)
}

@transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass
saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)

val outputPath = FileOutputFormat.getOutputPath(jobConf)
// Have to construct the format of dbname.tablename.
val qualifiedTableName = s"${table.databaseName}.${table.tableName}"
// TODO: Correctly set holdDDLTime.
// In most of the time, we should have holdDDLTime = false.
// holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
Expand All @@ -260,7 +256,7 @@ case class InsertIntoHiveTable(
client.synchronized {
client.loadDynamicPartitions(
outputPath.toString,
qualifiedTableName,
table.catalogTable.qualifiedName,
orderedPartitionSpec,
overwrite,
numDynamicPartitions,
Expand All @@ -274,13 +270,13 @@ case class InsertIntoHiveTable(
// scalastyle:on
val oldPart =
client.getPartitionOption(
client.getTable(table.databaseName, table.tableName),
table.catalogTable,
partitionSpec)

if (oldPart.isEmpty || !ifNotExists) {
client.loadPartition(
outputPath.toString,
qualifiedTableName,
table.catalogTable.qualifiedName,
orderedPartitionSpec,
overwrite,
holdDDLTime,
Expand All @@ -291,7 +287,7 @@ case class InsertIntoHiveTable(
} else {
client.loadTable(
outputPath.toString, // TODO: URI
qualifiedTableName,
table.catalogTable.qualifiedName,
overwrite,
holdDDLTime)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
private[hive] class SparkHiveWriterContainer(
@transient private val jobConf: JobConf,
fileSinkConf: FileSinkDesc,
inputSchema: Seq[Attribute],
table: MetastoreRelation)
inputSchema: Seq[Attribute])
extends Logging
with HiveInspectors
with Serializable {
Expand Down Expand Up @@ -217,9 +216,8 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
jobConf: JobConf,
fileSinkConf: FileSinkDesc,
dynamicPartColNames: Array[String],
inputSchema: Seq[Attribute],
table: MetastoreRelation)
extends SparkHiveWriterContainer(jobConf, fileSinkConf, inputSchema, table) {
inputSchema: Seq[Attribute])
extends SparkHiveWriterContainer(jobConf, fileSinkConf, inputSchema) {

import SparkHiveDynamicPartitionWriterContainer._

Expand Down

0 comments on commit ccc26c9

Please sign in to comment.