Skip to content

Commit

Permalink
[CARMEL-6358] Change to file commit algorithm V2 in CTAS command for …
Browse files Browse the repository at this point in the history
…Hive table (#1125)
  • Loading branch information
fenzhu authored and GitHub Enterprise committed Nov 11, 2022
1 parent 9c158d8 commit 8a94223
Showing 1 changed file with 19 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.execution

import scala.util.control.NonFatal

import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}
Expand Down Expand Up @@ -73,8 +75,14 @@ trait CreateHiveTableAsSelectBase extends DataWritingCommand {
sparkSession.sessionState.catalog.validateTableLocation(tableDesc)
catalog.createTable(
tableDesc.copy(schema = tableSchema), ignoreIfExists = false)

val originalFileOutputCommitterAlgorithm = sparkSession.sessionState.conf.getConfString(
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, "1")
try {
if (!originalFileOutputCommitterAlgorithm.equals("2")) {
sparkSession.sessionState.conf.setConfString(
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, "2")
logInfo("Set file output committer algorithm as version 2 when CTAS for Hive table")
}
// Read back the metadata of the table which was created just now.
val createdTableMeta = catalog.getTableMetadata(tableDesc.identifier)
val command = getWritingCommand(catalog, createdTableMeta, tableExists = false)
Expand All @@ -87,6 +95,16 @@ trait CreateHiveTableAsSelectBase extends DataWritingCommand {
// drop the created table.
catalog.dropTable(tableIdentifier, ignoreIfNotExists = true, purge = false)
throw e
} finally {
val currentFileOutputCommitterAlgorithm = sparkSession.sessionState.conf.getConfString(
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, "1")
if (!currentFileOutputCommitterAlgorithm.equals(originalFileOutputCommitterAlgorithm)) {
sparkSession.sessionState.conf.setConfString(
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
originalFileOutputCommitterAlgorithm)
logInfo(s"Set file output committer algorithm " +
s"back to version $originalFileOutputCommitterAlgorithm")
}
}
}

Expand Down

0 comments on commit 8a94223

Please sign in to comment.