From 950777091eb7b88a8d71a689d4a7e23aab10126f Mon Sep 17 00:00:00 2001 From: Jacek Laskowski Date: Mon, 30 Sep 2024 20:38:45 +0200 Subject: [PATCH] TransactionalWrite and WriteIntoDeltaLikes --- docs/TransactionalWrite.md | 7 ++-- docs/commands/WriteIntoDelta.md | 33 +++++++++++++++++-- docs/commands/WriteIntoDeltaLike.md | 36 ++++++++++++++++++++- docs/commands/merge/MergeIntoCommandBase.md | 8 ++--- 4 files changed, 73 insertions(+), 11 deletions(-) diff --git a/docs/TransactionalWrite.md b/docs/TransactionalWrite.md index 33e0322f6..1d6f7fed4 100644 --- a/docs/TransactionalWrite.md +++ b/docs/TransactionalWrite.md @@ -103,14 +103,13 @@ writeFiles( `writeFiles` is used when: -* `WriteIntoDelta` is requested to [writeAndReturnCommitData](commands/WriteIntoDelta.md#writeAndReturnCommitData) * `DeleteCommand` is requested to [rewriteFiles](commands/delete/DeleteCommand.md#rewriteFiles) -* `MergeIntoCommandBase` is requested to [writeFiles](commands/merge/MergeIntoCommandBase.md#writeFiles) -* `UpdateCommand` is requested to [rewriteFiles](commands/update/UpdateCommand.md#rewriteFiles) -* `WriteIntoDelta` is requested to [writeAndReturnCommitData](commands/WriteIntoDelta.md#writeAndReturnCommitData), [writeFiles](commands/WriteIntoDelta.md#writeFiles) * `DeltaSink` is requested to [add a streaming micro-batch](spark-connector/DeltaSink.md#addBatch) +* `MergeIntoCommandBase` is requested to [write data out](commands/merge/MergeIntoCommandBase.md#writeFiles) * `OptimizeExecutor` is requested to [runOptimizeBinJob](commands/optimize/OptimizeExecutor.md#runOptimizeBinJob) * `RemoveColumnMappingCommand` is requested to [write data out](commands/alter/RemoveColumnMappingCommand.md#writeData) +* `UpdateCommand` is requested to [rewriteFiles](commands/update/UpdateCommand.md#rewriteFiles) +* `WriteIntoDelta` is requested to [writeAndReturnCommitData](commands/WriteIntoDelta.md#writeAndReturnCommitData) (and [writeFiles](commands/WriteIntoDelta.md#writeFiles)) --- diff --git a/docs/commands/WriteIntoDelta.md b/docs/commands/WriteIntoDelta.md index db0e3f991..352023e1d 100644 --- a/docs/commands/WriteIntoDelta.md +++ b/docs/commands/WriteIntoDelta.md @@ -150,7 +150,7 @@ val writeCmd = WriteIntoDelta( writeCmd.run(spark) ``` -## canOverwriteSchema +## canOverwriteSchema { #canOverwriteSchema } ??? note "ImplicitMetadataOperation" @@ -166,7 +166,7 @@ writeCmd.run(spark) 1. This `WriteIntoDelta` is [overwrite](#isOverwriteOperation) operation 1. [replaceWhere](../spark-connector/DeltaWriteOptions.md#replaceWhere) option is not defined (in the [DeltaOptions](#options)) -## isOverwriteOperation +## isOverwriteOperation { #isOverwriteOperation } ```scala isOverwriteOperation: Boolean @@ -174,6 +174,35 @@ isOverwriteOperation: Boolean `isOverwriteOperation` is `true` for the [SaveMode](#mode) to be `SaveMode.Overwrite`. +--- + `isOverwriteOperation` is used when: * `WriteIntoDelta` is requested for the [canOverwriteSchema](#canOverwriteSchema) and to [write](#write) + +## writeAndReturnCommitData { #writeAndReturnCommitData } + +??? note "WriteIntoDeltaLike" + + ```scala + writeAndReturnCommitData( + txn: OptimisticTransaction, + sparkSession: SparkSession, + clusterBySpecOpt: Option[ClusterBySpec] = None, + isTableReplace: Boolean = false): TaggedCommitData[Action] + ``` + + `writeAndReturnCommitData` is part of the [WriteIntoDeltaLike](WriteIntoDeltaLike.md#writeAndReturnCommitData) abstraction. + +`writeAndReturnCommitData`...FIXME + +### writeFiles { #writeFiles } + +```scala +writeFiles( + txn: OptimisticTransaction, + data: DataFrame, + options: DeltaOptions): Seq[FileAction] +``` + +`writeFiles`...FIXME diff --git a/docs/commands/WriteIntoDeltaLike.md b/docs/commands/WriteIntoDeltaLike.md index e59d9efd7..f8cb062be 100644 --- a/docs/commands/WriteIntoDeltaLike.md +++ b/docs/commands/WriteIntoDeltaLike.md @@ -1,3 +1,37 @@ # WriteIntoDeltaLike -`WriteIntoDeltaLike` is...FIXME +`WriteIntoDeltaLike` is an [abstraction](#contract) of [commands](#implementations) that can write data out into delta tables. + +## Contract (Subset) + +### writeAndReturnCommitData { #writeAndReturnCommitData } + +```scala +writeAndReturnCommitData( + txn: OptimisticTransaction, + sparkSession: SparkSession, + clusterBySpecOpt: Option[ClusterBySpec] = None, + isTableReplace: Boolean = false): TaggedCommitData[Action] +``` + +Used when: + +* `CreateDeltaTableCommand` is requested to [handleCreateTableAsSelect](create-table/CreateDeltaTableCommand.md#handleCreateTableAsSelect) +* `WriteIntoDelta` is [executed](WriteIntoDelta.md#run) +* `WriteIntoDeltaLike` is requested to [write data out](#write) + +## Implementations + +* [WriteIntoDelta](WriteIntoDelta.md) + +## Write Data Out { #write } + +```scala +write( + txn: OptimisticTransaction, + sparkSession: SparkSession, + clusterBySpecOpt: Option[ClusterBySpec] = None, + isTableReplace: Boolean = false): Seq[Action] +``` + +`write` [writeAndReturnCommitData](#writeAndReturnCommitData) and returns the [Action](../Action.md)s. diff --git a/docs/commands/merge/MergeIntoCommandBase.md b/docs/commands/merge/MergeIntoCommandBase.md index c04900153..6f114120f 100644 --- a/docs/commands/merge/MergeIntoCommandBase.md +++ b/docs/commands/merge/MergeIntoCommandBase.md @@ -464,14 +464,14 @@ Usage | Metric Name | valueToReturn ------|-------------|-------------- `ClassicMergeExecutor` to [find files to rewrite](ClassicMergeExecutor.md#findTouchedFiles) | [numSourceRows](#numSourceRows) | `true` `ClassicMergeExecutor` to [write out merge changes](ClassicMergeExecutor.md#writeAllChanges) | [numSourceRowsInSecondScan](#numSourceRowsInSecondScan) | `true` - | [numTargetRowsCopied](#numTargetRowsCopied) | `false` +   | [numTargetRowsCopied](#numTargetRowsCopied) | `false` `InsertOnlyMergeExecutor` to [write out inserts](InsertOnlyMergeExecutor.md#writeOnlyInserts) | [numSourceRows](#numSourceRows) or [numSourceRowsInSecondScan](#numSourceRowsInSecondScan) | `true` `InsertOnlyMergeExecutor` to [generateInsertsOnlyOutputCols](InsertOnlyMergeExecutor.md#generateInsertsOnlyOutputCols) | [numTargetRowsInserted](#numTargetRowsInserted) | `false` `MergeOutputGeneration` to [generateAllActionExprs](MergeOutputGeneration.md#generateAllActionExprs) | [numTargetRowsUpdated](#numTargetRowsUpdated) | `false` - | [numTargetRowsDeleted](#numTargetRowsDeleted) | `true` - | [numTargetRowsInserted](#numTargetRowsInserted) | `false` +   | [numTargetRowsDeleted](#numTargetRowsDeleted) | `true` +   | [numTargetRowsInserted](#numTargetRowsInserted) | `false` -## Writing Data(Frame) Out to Delta Table { #writeFiles } +## Write Data(Frame) Out to Delta Table { #writeFiles } ```scala writeFiles(