[SPARK-16389] [SQL] Remove MetastoreRelation from SparkHiveWriterContainer and SparkHiveDynamicPartitionWriterContainer #14062

Closed · wants to merge 1 commit
@@ -223,22 +223,18 @@ case class InsertIntoHiveTable(
         jobConf,
         fileSinkConf,
         dynamicPartColNames,
-        child.output,
-        table)
+        child.output)
     } else {
       new SparkHiveWriterContainer(
         jobConf,
         fileSinkConf,
-        child.output,
-        table)
+        child.output)
     }

     @transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass
     saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)

     val outputPath = FileOutputFormat.getOutputPath(jobConf)
-    // Have to construct the format of dbname.tablename.
-    val qualifiedTableName = s"${table.databaseName}.${table.tableName}"
     // TODO: Correctly set holdDDLTime.
     // In most of the time, we should have holdDDLTime = false.
     // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
@@ -260,7 +256,7 @@ case class InsertIntoHiveTable(
       client.synchronized {
         client.loadDynamicPartitions(
           outputPath.toString,
-          qualifiedTableName,
+          table.catalogTable.qualifiedName,
           orderedPartitionSpec,
           overwrite,
           numDynamicPartitions,
@@ -274,13 +270,13 @@ case class InsertIntoHiveTable(
         // scalastyle:on
         val oldPart =
           client.getPartitionOption(
-            client.getTable(table.databaseName, table.tableName),
+            table.catalogTable,
             partitionSpec)

         if (oldPart.isEmpty || !ifNotExists) {
           client.loadPartition(
[Inline review comment] @gatorsmile (Member, Author), Jul 6, 2016:

> This is the unnecessary getTable call
             outputPath.toString,
-            qualifiedTableName,
+            table.catalogTable.qualifiedName,
             orderedPartitionSpec,
             overwrite,
             holdDDLTime,
@@ -291,7 +287,7 @@
         } else {
           client.loadTable(
             outputPath.toString, // TODO: URI
-            qualifiedTableName,
+            table.catalogTable.qualifiedName,
             overwrite,
             holdDDLTime)
         }
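
The inline comment above flags the redundant metastore round-trip that this file's changes eliminate: the qualified table name was rebuilt by hand and the table re-fetched via `client.getTable` even though the plan already carried a resolved `CatalogTable`. A minimal, self-contained sketch of the before/after pattern follows; `CatalogTable` and `MetastoreClient` here are simplified stand-ins with abbreviated signatures, not Spark's actual classes.

```scala
object MetastoreLookupSketch {
  // Toy stand-in; plays the role of CatalogTable.qualifiedName in the new code path.
  case class CatalogTable(database: String, table: String) {
    def qualifiedName: String = s"$database.$table"
  }

  // Toy stand-in for the Hive client; signatures are simplified.
  trait MetastoreClient {
    def getTable(db: String, table: String): CatalogTable // costs a metastore round-trip
    def getPartitionOption(t: CatalogTable, spec: Map[String, String]): Option[String]
  }

  // Before: the name was hand-built ("dbname.tablename") and the table
  // re-fetched from the metastore even though it was already resolved.
  def oldPath(client: MetastoreClient, db: String, tbl: String,
      spec: Map[String, String]): (String, Option[String]) = {
    val qualifiedTableName = s"$db.$tbl"
    val part = client.getPartitionOption(client.getTable(db, tbl), spec) // the unnecessary getTable call
    (qualifiedTableName, part)
  }

  // After: the resolved CatalogTable is used directly; no extra lookup,
  // no hand-built string.
  def newPath(client: MetastoreClient, table: CatalogTable,
      spec: Map[String, String]): (String, Option[String]) =
    (table.qualifiedName, client.getPartitionOption(table, spec))
}
```
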
@@ -53,8 +53,7 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 private[hive] class SparkHiveWriterContainer(
     @transient private val jobConf: JobConf,
     fileSinkConf: FileSinkDesc,
-    inputSchema: Seq[Attribute],
-    table: MetastoreRelation)
+    inputSchema: Seq[Attribute])
   extends Logging
   with HiveInspectors
   with Serializable {
@@ -217,9 +216,8 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
     jobConf: JobConf,
     fileSinkConf: FileSinkDesc,
     dynamicPartColNames: Array[String],
-    inputSchema: Seq[Attribute],
-    table: MetastoreRelation)
-  extends SparkHiveWriterContainer(jobConf, fileSinkConf, inputSchema, table) {
+    inputSchema: Seq[Attribute])
+  extends SparkHiveWriterContainer(jobConf, fileSinkConf, inputSchema) {

   import SparkHiveDynamicPartitionWriterContainer._
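
The PR itself does not state this rationale, but one practical consequence is visible in the hunks above: `SparkHiveWriterContainer` mixes in `Serializable` and marks its `jobConf` as `@transient`, i.e., instances are serialized and shipped to tasks. Dropping the `table: MetastoreRelation` field means the analysis-side relation no longer has to travel with the writer. A toy, runnable sketch of that shape, under those assumptions (the classes below are simplified models, not Spark's):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object WriterContainerSerializationSketch {
  case class Attribute(name: String) // case classes are Serializable by default

  // Post-PR shape: only executor-safe state remains; jobConf is transient
  // just like the real field, so it is skipped during serialization.
  class WriterContainerSketch(
      @transient private val jobConf: AnyRef,
      val inputSchema: Seq[Attribute])
    extends Serializable

  def main(args: Array[String]): Unit = {
    val container =
      new WriterContainerSketch(new Object, Seq(Attribute("key"), Attribute("value")))
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    // Would throw NotSerializableException if a non-transient,
    // non-serializable field (like a retained relation) remained.
    out.writeObject(container)
    out.close()
    println("writer container serialized OK")
  }
}
```
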