diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkDownloadDataOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkDownloadDataOperation.scala
index 8c93053a20f93..64e2d65fe544e 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkDownloadDataOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkDownloadDataOperation.scala
@@ -387,12 +387,14 @@ private[hive] class SparkDownloadDataOperation(
       (output, result.schema)
     }
 
-    result.select(castCols: _*).write
-      .options(writeOptions)
-      .option("header", "false")
-      .format(outputFormat)
-      .mode(SaveMode.Overwrite)
-      .save(step1Path.toString)
+    withCommitAlgorithmV2 {
+      result.select(castCols: _*).write
+        .options(writeOptions)
+        .option("header", "false")
+        .format(outputFormat)
+        .mode(SaveMode.Overwrite)
+        .save(step1Path.toString)
+    }
     val contentSummary = fs.getContentSummary(step1Path)
     val dataSize = contentSummary.getLength
     val fileCount = contentSummary.getFileCount
@@ -419,18 +421,20 @@ private[hive] class SparkDownloadDataOperation(
     }
 
     if (!isSortable && coalesceNum > 0) {
-      sqlContext.read
-        .schema(readSchema)
-        .format(outputFormat)
-        .options(writeOptions)
-        .option("header", "false")
-        .load(step1Path.toString)
-        .coalesce(coalesceNum)
-        .write
-        .options(writeOptions)
-        .format(outputFormat)
-        .mode(SaveMode.Overwrite)
-        .save(step2Path.toString)
+      withCommitAlgorithmV2 {
+        sqlContext.read
+          .schema(readSchema)
+          .format(outputFormat)
+          .options(writeOptions)
+          .option("header", "false")
+          .load(step1Path.toString)
+          .coalesce(coalesceNum)
+          .write
+          .options(writeOptions)
+          .format(outputFormat)
+          .mode(SaveMode.Overwrite)
+          .save(step2Path.toString)
+      }
 
       step2Path
     } else {
@@ -483,11 +487,13 @@ private[hive] class SparkDownloadDataOperation(
       result.repartition()
     }
 
-    writePlan.write
-      .options(writeOptions)
-      .format(outputFormat)
-      .mode(SaveMode.Overwrite)
-      .save(outputPath.toString)
+    withCommitAlgorithmV2 {
+      writePlan.write
+        .options(writeOptions)
+        .format(outputFormat)
+        .mode(SaveMode.Overwrite)
+        .save(outputPath.toString)
+    }
 
     val contentSummary = fs.getContentSummary(outputPath)
     val dataSize = contentSummary.getLength
@@ -663,6 +669,28 @@ private[hive] class SparkDownloadDataOperation(
     }
     sqlContext.sparkContext.closeJobGroup(statementId)
   }
+
+  private def withCommitAlgorithmV2[T](f: => T): T = {
+    val originalFileOutputCommitterAlgorithm = sqlContext.sessionState.conf.getConfString(
+      org.apache.hadoop.mapreduce.lib.output.
+        FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, "1")
+    if (!originalFileOutputCommitterAlgorithm.equals("2")) {
+      sqlContext.sessionState.conf.setConfString(
+        org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.
+          FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, "2")
+      logInfo("Set file output committer algorithm as version 2 for download")
+    }
+    val res = f
+    if (!originalFileOutputCommitterAlgorithm.equals("2")) {
+      sqlContext.sessionState.conf.setConfString(
+        org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.
+          FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        originalFileOutputCommitterAlgorithm)
+      logInfo(s"Set file output committer algorithm " +
+        s"back to version $originalFileOutputCommitterAlgorithm")
+    }
+    res
+  }
 }
 
 object SparkDownloadDataOperation {
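
A note on the new helper: withCommitAlgorithmV2 restores the original committer algorithm only when f returns normally; if the wrapped write throws, the session is left with algorithm version 2. Below is a minimal standalone sketch of the same save-set-restore pattern hardened with try/finally. CommitAlgorithmV2Sketch and its use of the public RuntimeConfig API (spark.conf) instead of the private[sql] sessionState are illustrative assumptions, not part of this patch.

import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.spark.sql.SparkSession

object CommitAlgorithmV2Sketch {
  private val Key = FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION

  // Force the Hadoop FileOutputCommitter to algorithm version 2 while `f`
  // runs, then restore whatever the session had before. try/finally makes
  // the restore unconditional, so a failed write cannot leak the setting.
  def withCommitAlgorithmV2[T](spark: SparkSession)(f: => T): T = {
    val original = spark.conf.getOption(Key)   // None if the key was unset
    spark.conf.set(Key, "2")
    try f
    finally original match {
      case Some(v) => spark.conf.set(Key, v)   // put the old value back
      case None    => spark.conf.unset(Key)    // key was never set; clear it
    }
  }
}

Hypothetical usage with an existing DataFrame df:

  CommitAlgorithmV2Sketch.withCommitAlgorithmV2(spark) {
    df.write.mode(SaveMode.Overwrite).parquet("/tmp/download")
  }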