From 6ec9b47a122e18088181cf0d0aefd562d0519292 Mon Sep 17 00:00:00 2001 From: Jacek Laskowski Date: Sun, 29 Sep 2024 22:51:09 +0200 Subject: [PATCH] Uniform Iceberg --- docs/DeltaFileFormatWriter.md | 26 ++++++++++++++++++++++++ docs/DeltaOptimizedWriterExec.md | 7 +++++++ docs/TransactionalWrite.md | 35 ++++++++++++++++++++++++-------- docs/uniform/IcebergCompat.md | 26 ++++++++++++++++++++++++ mkdocs.yml | 2 ++ 5 files changed, 88 insertions(+), 8 deletions(-) create mode 100644 docs/DeltaFileFormatWriter.md create mode 100644 docs/DeltaOptimizedWriterExec.md create mode 100644 docs/uniform/IcebergCompat.md diff --git a/docs/DeltaFileFormatWriter.md b/docs/DeltaFileFormatWriter.md new file mode 100644 index 0000000000..913d4f8923 --- /dev/null +++ b/docs/DeltaFileFormatWriter.md @@ -0,0 +1,26 @@ +# DeltaFileFormatWriter + +## Write Data Out { #write } + +```scala +write( + sparkSession: SparkSession, + plan: SparkPlan, + fileFormat: FileFormat, + committer: FileCommitProtocol, + outputSpec: OutputSpec, + hadoopConf: Configuration, + partitionColumns: Seq[Attribute], + bucketSpec: Option[BucketSpec], + statsTrackers: Seq[WriteJobStatsTracker], + options: Map[String, String], + numStaticPartitionCols: Int = 0): Set[String] +``` + +`write`...FIXME + +--- + +`write` is used when: + +* `TransactionalWrite` is requested to [write data out](TransactionalWrite.md#writeFiles) diff --git a/docs/DeltaOptimizedWriterExec.md b/docs/DeltaOptimizedWriterExec.md new file mode 100644 index 0000000000..4f1663b182 --- /dev/null +++ b/docs/DeltaOptimizedWriterExec.md @@ -0,0 +1,7 @@ +--- +title: DeltaOptimizedWriterExec +--- + +# DeltaOptimizedWriterExec Physical Operator + +`DeltaOptimizedWriterExec` is...FIXME diff --git a/docs/TransactionalWrite.md b/docs/TransactionalWrite.md index 1ae69959c9..f22e762345 100644 --- a/docs/TransactionalWrite.md +++ b/docs/TransactionalWrite.md @@ -70,7 +70,7 @@ hasWritten: Boolean = false `hasWritten` is initially `false` and changes to `true` after [data is written out](#writeFiles). -## Writing Data Out { #writeFiles } +## Write Data Out { #writeFiles } ```scala writeFiles( @@ -81,6 +81,11 @@ writeFiles( writeFiles( inputData: Dataset[_], writeOptions: Option[DeltaOptions], + additionalConstraints: Seq[Constraint]): Seq[FileAction] // (4)! +writeFiles( + inputData: Dataset[_], + writeOptions: Option[DeltaOptions], + isOptimize: Boolean, additionalConstraints: Seq[Constraint]): Seq[FileAction] writeFiles( data: Dataset[_], @@ -90,6 +95,7 @@ writeFiles( 1. Uses no [Constraint](constraints/Constraint.md)s 2. Uses no write-related [DeltaOptions](spark-connector/DeltaOptions.md) 3. Uses no [Constraint](constraints/Constraint.md)s +4. `isOptimize` disabled `writeFiles` writes the given `data` (as a `Dataset`) to a [delta table](#deltaLog) and returns [AddFile](AddFile.md)s with [AddCDCFile](AddCDCFile.md)s (from the [DelayedCommitProtocol](#writeFiles-committer)). @@ -103,6 +109,7 @@ writeFiles( * `OptimizeExecutor` is requested to [runOptimizeBinJob](commands/optimize/OptimizeExecutor.md#runOptimizeBinJob) * `UpdateCommand` is requested to [rewriteFiles](commands/update/UpdateCommand.md#rewriteFiles) * `DeltaSink` is requested to [add a streaming micro-batch](spark-connector/DeltaSink.md#addBatch) +* `RemoveColumnMappingCommand` is requested to [write data out](commands/alter/RemoveColumnMappingCommand.md#writeData) --- @@ -169,7 +176,16 @@ Even though it is so early, `writeFiles` turns the [hasWritten](#hasWritten) fla !!! note The `DeltaInvariantCheckerExec` physical operator is later used as the physical plan to for the [data to be written out](#writeFiles-FileFormatWriter). -### Step 6.3 BasicWriteJobStatsTracker { #writeFiles-statsTrackers } +### Step 6.3 DeltaOptimizedWriterExec { #writeFiles-DeltaOptimizedWriterExec } + +`writeFiles` creates a [DeltaOptimizedWriterExec](DeltaOptimizedWriterExec.md) physical operator as the parent of the [DeltaInvariantCheckerExec](#writeFiles-physicalPlan) unary physical operator when all of the following hold true: + +1. `isOptimize` is disabled (`false`) +1. [shouldOptimizeWrite](#shouldOptimizeWrite) + +Otherwise, `writeFiles` leaves the [DeltaInvariantCheckerExec](#writeFiles-physicalPlan) unary physical operator intact. + +### Step 6.4 BasicWriteJobStatsTracker { #writeFiles-statsTrackers } `writeFiles` may or may not create a `BasicWriteJobStatsTracker` ([Spark SQL]({{ book.spark_sql }}/connectors/BasicWriteJobStatsTracker/)) based on [history.metricsEnabled](configuration-properties/index.md#history.metricsEnabled) configuration property. @@ -183,23 +199,26 @@ Metric Name | UI Description `numParts` | number of dynamic part `jobCommitTime` | job commit time -### Step 6.4 Write Options { #writeFiles-options } +### Step 6.5 Write Options { #writeFiles-options } `writeFiles` makes sure (_filters out_) that there are only the following [write options](spark-connector/DeltaOptions.md) used (from the given `writeOptions`), if specified: * [compression](spark-connector/DeltaOptions.md#COMPRESSION) * [maxRecordsPerFile](spark-connector/DeltaOptions.md#MAX_RECORDS_PER_FILE) -### Step 6.5 FileFormatWriter { #writeFiles-FileFormatWriter } +`writeFiles` adds one Uniform (Iceberg compatibility-specific) option: + +Option | Value +-|- + [writePartitionColumns](spark-connector/DeltaOptions.md#WRITE_PARTITION_COLUMNS) | [isAnyEnabled](uniform/IcebergCompat.md#isAnyEnabled) -As the very last step within the scope of the [new execution ID](#writeFiles-deltaTransactionalWrite), `writeFiles` writes out the data (using [Spark SQL]({{ book.spark_sql }}/connectors/FileFormatWriter/#write) infrastructure). +### Step 6.6 FileFormatWriter { #writeFiles-FileFormatWriter } -??? tip "Logging" - Enable `ALL` logging level for [org.apache.spark.sql.execution.datasources.FileFormatWriter]({{ book.spark_sql }}/connectors/FileFormatWriter#logging) logger to see what happens inside. +As the very last step within the scope of the [new execution ID](#writeFiles-deltaTransactionalWrite), `writeFiles` [writes out the data](DeltaFileFormatWriter.md#write). `writeFiles` uses the following (among the others): -* [DeltaInvariantCheckerExec](#writeFiles-physicalPlan) as the physical plan +* [DeltaInvariantCheckerExec](#writeFiles-physicalPlan) (possibly with [DeltaOptimizedWriterExec](#writeFiles-DeltaOptimizedWriterExec) parent) as the physical plan * The [partitioning columns](#writeFiles-partitioningColumns) * No bucketing * [DeltaJobStatisticsTracker](#writeFiles-optionalStatsTracker) and [BasicWriteJobStatsTracker](#writeFiles-statsTrackers) diff --git a/docs/uniform/IcebergCompat.md b/docs/uniform/IcebergCompat.md new file mode 100644 index 0000000000..fe0e323129 --- /dev/null +++ b/docs/uniform/IcebergCompat.md @@ -0,0 +1,26 @@ +# IcebergCompat + +## knownVersions { #knownVersions } + +`IcebergCompat` defines a collection of Iceberg Compatibility-related table properties and the versions: + + Table Property | ID +-|- + [delta.enableIcebergCompatV1](../table-properties/DeltaConfigs.md#ICEBERG_COMPAT_V1_ENABLED) | 1 + [delta.enableIcebergCompatV2](../table-properties/DeltaConfigs.md#ICEBERG_COMPAT_V2_ENABLED) | 2 + +## isAnyEnabled { #isAnyEnabled } + +```scala +isAnyEnabled( + metadata: Metadata): Boolean +``` + +`isAnyEnabled` checks if any version of the Iceberg Compatibility (table properties) is enabled (`true`) in the given [Metadata](../Metadata.md). + +--- + +`isAnyEnabled` is used when: + +* `UniversalFormat` is requested to [enforceIcebergInvariantsAndDependencies](UniversalFormat.md#enforceIcebergInvariantsAndDependencies) +* `TransactionalWrite` is requested to [write data out](../TransactionalWrite.md#writeFiles) diff --git a/mkdocs.yml b/mkdocs.yml index 5c1cfc31f5..1182ac63c8 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -327,8 +327,10 @@ nav: - ColumnWithDefaultExprUtils: ColumnWithDefaultExprUtils.md - ConflictChecker: ConflictChecker.md - CurrentTransactionInfo: CurrentTransactionInfo.md + - DeltaFileFormatWriter: DeltaFileFormatWriter.md - DeltaFileOperations: DeltaFileOperations.md - DeltaJobStatisticsTracker: DeltaJobStatisticsTracker.md + - DeltaOptimizedWriterExec: DeltaOptimizedWriterExec.md - DeltaRelation: DeltaRelation.md - DeltaTableOperations: DeltaTableOperations.md - DeltaTableUtils: DeltaTableUtils.md