Skip to content

Commit

Permalink
[CARMEL-6280] Data download query stuck in job commit phase (#1084)
Browse files Browse the repository at this point in the history
* [CARMEL-6280] Data download query stuck in job commit phase

* Add more.
  • Loading branch information
fenzhu authored and GitHub Enterprise committed Oct 14, 2022
1 parent aa62055 commit e0ff530
Showing 1 changed file with 51 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -387,12 +387,14 @@ private[hive] class SparkDownloadDataOperation(
(output, result.schema)
}

result.select(castCols: _*).write
.options(writeOptions)
.option("header", "false")
.format(outputFormat)
.mode(SaveMode.Overwrite)
.save(step1Path.toString)
withCommitAlgorithmV2 {
result.select(castCols: _*).write
.options(writeOptions)
.option("header", "false")
.format(outputFormat)
.mode(SaveMode.Overwrite)
.save(step1Path.toString)
}
val contentSummary = fs.getContentSummary(step1Path)
val dataSize = contentSummary.getLength
val fileCount = contentSummary.getFileCount
Expand All @@ -419,18 +421,20 @@ private[hive] class SparkDownloadDataOperation(
}

if (!isSortable && coalesceNum > 0) {
sqlContext.read
.schema(readSchema)
.format(outputFormat)
.options(writeOptions)
.option("header", "false")
.load(step1Path.toString)
.coalesce(coalesceNum)
.write
.options(writeOptions)
.format(outputFormat)
.mode(SaveMode.Overwrite)
.save(step2Path.toString)
withCommitAlgorithmV2 {
sqlContext.read
.schema(readSchema)
.format(outputFormat)
.options(writeOptions)
.option("header", "false")
.load(step1Path.toString)
.coalesce(coalesceNum)
.write
.options(writeOptions)
.format(outputFormat)
.mode(SaveMode.Overwrite)
.save(step2Path.toString)
}

step2Path
} else {
Expand Down Expand Up @@ -483,11 +487,13 @@ private[hive] class SparkDownloadDataOperation(
result.repartition()
}

writePlan.write
.options(writeOptions)
.format(outputFormat)
.mode(SaveMode.Overwrite)
.save(outputPath.toString)
withCommitAlgorithmV2 {
writePlan.write
.options(writeOptions)
.format(outputFormat)
.mode(SaveMode.Overwrite)
.save(outputPath.toString)
}

val contentSummary = fs.getContentSummary(outputPath)
val dataSize = contentSummary.getLength
Expand Down Expand Up @@ -663,6 +669,28 @@ private[hive] class SparkDownloadDataOperation(
}
sqlContext.sparkContext.closeJobGroup(statementId)
}

/**
 * Evaluates `f` while the session's file output committer algorithm version is
 * forced to "2", then puts the previous value back.
 *
 * The previous value is restored in a `finally` block, so a failure inside `f`
 * (for example a failed write job) cannot leave the session permanently switched
 * to the version 2 committer.
 *
 * NOTE(review): when the key was not set before the call, the value written back
 * is the explicit default "1" used for the lookup, not an unset key. Confirm this
 * matches the cluster's effective default.
 *
 * @param f the block to run with committer algorithm version 2 in effect
 * @tparam T result type of `f`
 * @return whatever `f` returns; any exception thrown by `f` propagates unchanged
 */
private def withCommitAlgorithmV2[T](f: => T): T = {
  val algorithmVersionKey = org.apache.hadoop.mapreduce.lib.output.
    FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION
  val conf = sqlContext.sessionState.conf
  val originalFileOutputCommitterAlgorithm = conf.getConfString(algorithmVersionKey, "1")
  // Only touch the conf (and only restore it later) when it is not already "2".
  val needsOverride = originalFileOutputCommitterAlgorithm != "2"
  if (needsOverride) {
    conf.setConfString(algorithmVersionKey, "2")
    logInfo("Set file output committer algorithm as version 2 for download")
  }
  try {
    f
  } finally {
    // Runs on both success and failure so the override never outlives this call.
    if (needsOverride) {
      conf.setConfString(algorithmVersionKey, originalFileOutputCommitterAlgorithm)
      logInfo(s"Set file output committer algorithm " +
        s"back to version $originalFileOutputCommitterAlgorithm")
    }
  }
}
}

object SparkDownloadDataOperation {
Expand Down

0 comments on commit e0ff530

Please sign in to comment.