diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala index 21180743a56..dd01fc0ba2f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala @@ -289,6 +289,10 @@ object DeltaOptions extends DeltaLogging { */ val STREAMING_SOURCE_TRACKING_ID = "streamingSourceTrackingId" + /** + * An option to control if delta will write partition columns to data files + */ + val WRITE_PARTITION_COLUMNS = "writePartitionColumns" val validOptionKeys : Set[String] = Set( REPLACE_WHERE_OPTION, @@ -323,7 +327,8 @@ object DeltaOptions extends DeltaLogging { "checkpointLocation", "path", VERSION_AS_OF, - TIMESTAMP_AS_OF + TIMESTAMP_AS_OF, + WRITE_PARTITION_COLUMNS ) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/files/DeltaFileFormatWriter.scala b/spark/src/main/scala/org/apache/spark/sql/delta/files/DeltaFileFormatWriter.scala new file mode 100644 index 00000000000..2a73b8b852a --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/files/DeltaFileFormatWriter.scala @@ -0,0 +1,479 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.files + +import java.util.{Date, UUID} + +import org.apache.spark.sql.delta.DeltaOptions +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileAlreadyExistsException, Path} +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils} +import org.apache.spark.shuffle.FetchFailedException +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} +import org.apache.spark.sql.connector.write.WriterCommitMessage +import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter} +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.FileFormatWriter._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.{SerializableConfiguration, Utils} + +/** + * A helper object for writing FileFormat data out to a location. + * Logic is copied from FileFormatWriter from Spark 3.5 with added functionality to write partition + * values to data files. Specifically L123-126, L132, and L140 where it adds option + * WRITE_PARTITION_COLUMNS + */ +object DeltaFileFormatWriter extends Logging { + + /** + * A variable used in tests to check whether the output ordering of the query matches the + * required ordering of the write command. + */ + private var outputOrderingMatched: Boolean = false + + /** + * A variable used in tests to check the final executed plan. + */ + private var executedPlan: Option[SparkPlan] = None + + // scalastyle:off argcount + /** + * Basic work flow of this command is: + * 1. Driver side setup, including output committer initialization and data source specific + * preparation work for the write job to be issued. + * 2. Issues a write job consists of one or more executor side tasks, each of which writes all + * rows within an RDD partition. + * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task; If any + * exception is thrown during task commitment, also aborts that task. + * 4. If all tasks are committed, commit the job, otherwise aborts the job; If any exception is + * thrown during job commitment, also aborts the job. + * 5. If the job is successfully committed, perform post-commit operations such as + * processing statistics. + * @return The set of all partition paths that were updated during this write job. + */ + def write( + sparkSession: SparkSession, + plan: SparkPlan, + fileFormat: FileFormat, + committer: FileCommitProtocol, + outputSpec: OutputSpec, + hadoopConf: Configuration, + partitionColumns: Seq[Attribute], + bucketSpec: Option[BucketSpec], + statsTrackers: Seq[WriteJobStatsTracker], + options: Map[String, String], + numStaticPartitionCols: Int = 0): Set[String] = { + require(partitionColumns.size >= numStaticPartitionCols) + + val job = Job.getInstance(hadoopConf) + job.setOutputKeyClass(classOf[Void]) + job.setOutputValueClass(classOf[InternalRow]) + FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath)) + + val partitionSet = AttributeSet(partitionColumns) + // cleanup the internal metadata information of + // the file source metadata attribute if any before write out + val finalOutputSpec = outputSpec.copy( + outputColumns = outputSpec.outputColumns + .map(FileSourceMetadataAttribute.cleanupFileSourceMetadataInformation) + ) + val dataColumns = finalOutputSpec.outputColumns.filterNot(partitionSet.contains) + + val writerBucketSpec = V1WritesUtils.getWriterBucketSpec(bucketSpec, dataColumns, options) + val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, dataColumns) + + val caseInsensitiveOptions = CaseInsensitiveMap(options) + + val dataSchema = dataColumns.toStructType + DataSourceUtils.verifySchema(fileFormat, dataSchema) + DataSourceUtils.checkFieldNames(fileFormat, dataSchema) + // Note: prepareWrite has side effect. It sets "job". + + val outputDataColumns = + if (caseInsensitiveOptions.get(DeltaOptions.WRITE_PARTITION_COLUMNS).contains("true")) { + dataColumns ++ partitionColumns + } else dataColumns + + val outputWriterFactory = + fileFormat.prepareWrite( + sparkSession, + job, + caseInsensitiveOptions, + outputDataColumns.toStructType + ) + + val description = new WriteJobDescription( + uuid = UUID.randomUUID.toString, + serializableHadoopConf = new SerializableConfiguration(job.getConfiguration), + outputWriterFactory = outputWriterFactory, + allColumns = finalOutputSpec.outputColumns, + dataColumns = outputDataColumns, + partitionColumns = partitionColumns, + bucketSpec = writerBucketSpec, + path = finalOutputSpec.outputPath, + customPartitionLocations = finalOutputSpec.customPartitionLocations, + maxRecordsPerFile = caseInsensitiveOptions + .get("maxRecordsPerFile") + .map(_.toLong) + .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile), + timeZoneId = caseInsensitiveOptions + .get(DateTimeUtils.TIMEZONE_OPTION) + .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone), + statsTrackers = statsTrackers + ) + + // We should first sort by dynamic partition columns, then bucket id, and finally sorting + // columns. + val requiredOrdering = partitionColumns.drop(numStaticPartitionCols) ++ + writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns + val writeFilesOpt = V1WritesUtils.getWriteFilesOpt(plan) + + // SPARK-40588: when planned writing is disabled and AQE is enabled, + // plan contains an AdaptiveSparkPlanExec, which does not know + // its final plan's ordering, so we have to materialize that plan first + // it is fine to use plan further down as the final plan is cached in that plan + def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match { + case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan + case p: SparkPlan => p.withNewChildren(p.children.map(materializeAdaptiveSparkPlan)) + } + + // the sort order doesn't matter + val actualOrdering = writeFilesOpt + .map(_.child) + .getOrElse(materializeAdaptiveSparkPlan(plan)) + .outputOrdering + val orderingMatched = V1WritesUtils.isOrderingMatched(requiredOrdering, actualOrdering) + + SQLExecution.checkSQLExecutionId(sparkSession) + + // propagate the description UUID into the jobs, so that committers + // get an ID guaranteed to be unique. + job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid) + + // When `PLANNED_WRITE_ENABLED` is true, the optimizer rule V1Writes will add logical sort + // operator based on the required ordering of the V1 write command. So the output + // ordering of the physical plan should always match the required ordering. Here + // we set the variable to verify this behavior in tests. + // There are two cases where FileFormatWriter still needs to add physical sort: + // 1) When the planned write config is disabled. + // 2) When the concurrent writers are enabled (in this case the required ordering of a + // V1 write command will be empty). + if (Utils.isTesting) outputOrderingMatched = orderingMatched + + if (writeFilesOpt.isDefined) { + // build `WriteFilesSpec` for `WriteFiles` + val concurrentOutputWriterSpecFunc = (plan: SparkPlan) => { + val sortPlan = createSortPlan(plan, requiredOrdering, outputSpec) + createConcurrentOutputWriterSpec(sparkSession, sortPlan, sortColumns) + } + val writeSpec = WriteFilesSpec( + description = description, + committer = committer, + concurrentOutputWriterSpecFunc = concurrentOutputWriterSpecFunc + ) + executeWrite(sparkSession, plan, writeSpec, job) + } else { + executeWrite( + sparkSession, + plan, + job, + description, + committer, + outputSpec, + requiredOrdering, + partitionColumns, + sortColumns, + orderingMatched + ) + } + } + // scalastyle:on argcount + + private def executeWrite( + sparkSession: SparkSession, + plan: SparkPlan, + job: Job, + description: WriteJobDescription, + committer: FileCommitProtocol, + outputSpec: OutputSpec, + requiredOrdering: Seq[Expression], + partitionColumns: Seq[Attribute], + sortColumns: Seq[Attribute], + orderingMatched: Boolean): Set[String] = { + val projectList = V1WritesUtils.convertEmptyToNull(plan.output, partitionColumns) + val empty2NullPlan = if (projectList.nonEmpty) ProjectExec(projectList, plan) else plan + + writeAndCommit(job, description, committer) { + val (planToExecute, concurrentOutputWriterSpec) = if (orderingMatched) { + (empty2NullPlan, None) + } else { + val sortPlan = createSortPlan(empty2NullPlan, requiredOrdering, outputSpec) + val concurrentOutputWriterSpec = + createConcurrentOutputWriterSpec(sparkSession, sortPlan, sortColumns) + if (concurrentOutputWriterSpec.isDefined) { + (empty2NullPlan, concurrentOutputWriterSpec) + } else { + (sortPlan, concurrentOutputWriterSpec) + } + } + + // In testing, this is the only way to get hold of the actually executed plan written to file + if (Utils.isTesting) executedPlan = Some(planToExecute) + + val rdd = planToExecute.execute() + + // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single + // partition rdd to make sure we at least set up one write task to write the metadata. + val rddWithNonEmptyPartitions = if (rdd.partitions.length == 0) { + sparkSession.sparkContext.parallelize(Array.empty[InternalRow], 1) + } else { + rdd + } + + val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date()) + val ret = new Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length) + sparkSession.sparkContext.runJob( + rddWithNonEmptyPartitions, + (taskContext: TaskContext, iter: Iterator[InternalRow]) => { + executeTask( + description = description, + jobTrackerID = jobTrackerID, + sparkStageId = taskContext.stageId(), + sparkPartitionId = taskContext.partitionId(), + sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE, + committer, + iterator = iter, + concurrentOutputWriterSpec = concurrentOutputWriterSpec + ) + }, + rddWithNonEmptyPartitions.partitions.indices, + (index, res: WriteTaskResult) => { + committer.onTaskCommit(res.commitMsg) + ret(index) = res + } + ) + ret + } + } + + private def writeAndCommit( + job: Job, + description: WriteJobDescription, + committer: FileCommitProtocol)(f: => Array[WriteTaskResult]): Set[String] = { + // This call shouldn't be put into the `try` block below because it only initializes and + // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called. + committer.setupJob(job) + try { + val ret = f + val commitMsgs = ret.map(_.commitMsg) + + logInfo(s"Start to commit write Job ${description.uuid}.") + val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, commitMsgs) } + logInfo(s"Write Job ${description.uuid} committed. Elapsed time: $duration ms.") + + processStats(description.statsTrackers, ret.map(_.summary.stats), duration) + logInfo(s"Finished processing stats for write job ${description.uuid}.") + + // return a set of all the partition paths that were updated during this job + ret.map(_.summary.updatedPartitions).reduceOption(_ ++ _).getOrElse(Set.empty) + } catch { + case cause: Throwable => + logError(s"Aborting job ${description.uuid}.", cause) + committer.abortJob(job) + throw cause + } + } + + /** + * Write files using [[SparkPlan.executeWrite]] + */ + private def executeWrite( + session: SparkSession, + planForWrites: SparkPlan, + writeFilesSpec: WriteFilesSpec, + job: Job): Set[String] = { + val committer = writeFilesSpec.committer + val description = writeFilesSpec.description + + // In testing, this is the only way to get hold of the actually executed plan written to file + if (Utils.isTesting) executedPlan = Some(planForWrites) + + writeAndCommit(job, description, committer) { + val rdd = planForWrites.executeWrite(writeFilesSpec) + val ret = new Array[WriteTaskResult](rdd.partitions.length) + session.sparkContext.runJob( + rdd, + (context: TaskContext, iter: Iterator[WriterCommitMessage]) => { + assert(iter.hasNext) + val commitMessage = iter.next() + assert(!iter.hasNext) + commitMessage + }, + rdd.partitions.indices, + (index, res: WriterCommitMessage) => { + assert(res.isInstanceOf[WriteTaskResult]) + val writeTaskResult = res.asInstanceOf[WriteTaskResult] + committer.onTaskCommit(writeTaskResult.commitMsg) + ret(index) = writeTaskResult + } + ) + ret + } + } + + private def createSortPlan( + plan: SparkPlan, + requiredOrdering: Seq[Expression], + outputSpec: OutputSpec): SortExec = { + // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and + // the physical plan may have different attribute ids due to optimizer removing some + // aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch. + val orderingExpr = + bindReferences(requiredOrdering.map(SortOrder(_, Ascending)), outputSpec.outputColumns) + SortExec(orderingExpr, global = false, child = plan) + } + + private def createConcurrentOutputWriterSpec( + sparkSession: SparkSession, + sortPlan: SortExec, + sortColumns: Seq[Attribute]): Option[ConcurrentOutputWriterSpec] = { + val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWriters + val concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmpty + if (concurrentWritersEnabled) { + Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter())) + } else { + None + } + } + + /** Writes data out in a single Spark task. */ + private def executeTask( + description: WriteJobDescription, + jobTrackerID: String, + sparkStageId: Int, + sparkPartitionId: Int, + sparkAttemptNumber: Int, + committer: FileCommitProtocol, + iterator: Iterator[InternalRow], + concurrentOutputWriterSpec: Option[ConcurrentOutputWriterSpec]): WriteTaskResult = { + + val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, sparkStageId) + val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId) + val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber) + + // Set up the attempt context required to use in the output committer. + val taskAttemptContext: TaskAttemptContext = { + // Set up the configuration object + val hadoopConf = description.serializableHadoopConf.value + hadoopConf.set("mapreduce.job.id", jobId.toString) + hadoopConf.set("mapreduce.task.id", taskAttemptId.getTaskID.toString) + hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString) + hadoopConf.setBoolean("mapreduce.task.ismap", true) + hadoopConf.setInt("mapreduce.task.partition", 0) + + new TaskAttemptContextImpl(hadoopConf, taskAttemptId) + } + + committer.setupTask(taskAttemptContext) + + val dataWriter = + if (sparkPartitionId != 0 && !iterator.hasNext) { + // In case of empty job, leave first partition to save meta for file format like parquet. + new EmptyDirectoryDataWriter(description, taskAttemptContext, committer) + } else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) { + new SingleDirectoryDataWriter(description, taskAttemptContext, committer) + } else { + concurrentOutputWriterSpec match { + case Some(spec) => + new DynamicPartitionDataConcurrentWriter( + description, + taskAttemptContext, + committer, + spec + ) + case _ => + new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer) + } + } + + try { + Utils.tryWithSafeFinallyAndFailureCallbacks(block = { + // Execute the task to write rows out and commit the task. + dataWriter.writeWithIterator(iterator) + dataWriter.commit() + })(catchBlock = { + // If there is an error, abort the task + dataWriter.abort() + logError(s"Job $jobId aborted.") + }, finallyBlock = { + dataWriter.close() + }) + } catch { + case e: FetchFailedException => + throw e + case f: FileAlreadyExistsException if SQLConf.get.fastFailFileFormatOutput => + // If any output file to write already exists, it does not make sense to re-run this task. + // We throw the exception and let Executor throw ExceptionFailure to abort the job. + throw new TaskOutputFileAlreadyExistException(f) + case t: Throwable => + throw QueryExecutionErrors.taskFailedWhileWritingRowsError(description.path, t) + } + } + + /** + * For every registered [[WriteJobStatsTracker]], call `processStats()` on it, passing it + * the corresponding [[WriteTaskStats]] from all executors. + */ + private def processStats( + statsTrackers: Seq[WriteJobStatsTracker], + statsPerTask: Seq[Seq[WriteTaskStats]], + jobCommitDuration: Long): Unit = { + + val numStatsTrackers = statsTrackers.length + assert( + statsPerTask.forall(_.length == numStatsTrackers), + s"""Every WriteTask should have produced one `WriteTaskStats` object for every tracker. + |There are $numStatsTrackers statsTrackers, but some task returned + |${statsPerTask.find(_.length != numStatsTrackers).get.length} results instead. + """.stripMargin + ) + + val statsPerTracker = if (statsPerTask.nonEmpty) { + statsPerTask.transpose + } else { + statsTrackers.map(_ => Seq.empty) + } + + statsTrackers.zip(statsPerTracker).foreach { + case (statsTracker, stats) => statsTracker.processStats(stats, jobCommitDuration) + } + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala b/spark/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala index 5a5c6b5b68b..33506412c42 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala @@ -415,19 +415,21 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl statsTrackers.append(basicWriteJobStatsTracker) } + // Iceberg spec requires partition columns in data files + val writePartitionColumns = IcebergCompatV1.isEnabled(metadata) // Retain only a minimal selection of Spark writer options to avoid any potential // compatibility issues - val options = writeOptions match { + val options = (writeOptions match { case None => Map.empty[String, String] case Some(writeOptions) => writeOptions.options.filterKeys { key => key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) || key.equalsIgnoreCase(DeltaOptions.COMPRESSION) }.toMap - } + }) + (DeltaOptions.WRITE_PARTITION_COLUMNS -> writePartitionColumns.toString) try { - FileFormatWriter.write( + DeltaFileFormatWriter.write( sparkSession = spark, plan = physicalPlan, fileFormat = deltaLog.fileFormat(protocol, metadata), // TODO support changing formats.