Creating SetTransaction Action, Liquid Clustering and the DMLs
jaceklaskowski committed Feb 10, 2024
1 parent 0991019 commit 155bf40
Showing 11 changed files with 164 additions and 8 deletions.
20 changes: 19 additions & 1 deletion docs/ImplicitMetadataOperation.md

`normalizePartitionColumns`...FIXME

### mergeSchema { #mergeSchema }

```scala
mergeSchema(...)
```

`mergeSchema`...FIXME

## New Domain Metadata { #getNewDomainMetadata }

```scala
getNewDomainMetadata(
txn: OptimisticTransaction,
canUpdateMetadata: Boolean,
isReplacingTable: Boolean,
clusterBySpecOpt: Option[ClusterBySpec] = None): Seq[DomainMetadata]
```

`getNewDomainMetadata`...FIXME

---

`getNewDomainMetadata` is used when:

* `WriteIntoDelta` command is requested to [write data out](commands/WriteIntoDelta.md#write)

## Logging

`ImplicitMetadataOperation` is an abstract class and logging is configured using the logger of the [implementations](#implementations).
26 changes: 26 additions & 0 deletions docs/commands/DeltaCommand.md
* `MergeIntoCommand` is requested to [run merge](merge/MergeIntoCommand.md#runMerge)
* `UpdateCommand` is requested to [run](update/UpdateCommand.md#run) (and [performUpdate](update/UpdateCommand.md#performUpdate))

## Creating SetTransaction Action { #createSetTransaction }

```scala
createSetTransaction(
sparkSession: SparkSession,
deltaLog: DeltaLog,
options: Option[DeltaOptions] = None): Option[SetTransaction]
```

??? note "`options` Argument"
`options` is undefined (`None`) by default and is only defined when `WriteIntoDelta` is requested to [write data out](WriteIntoDelta.md#write).

`createSetTransaction` creates (and returns) a new [SetTransaction](../SetTransaction.md) when the [transaction version and application ID](#getTxnVersionAndAppId) are both available (in either the given `SparkSession` or [DeltaOptions](../spark-connector/DeltaOptions.md)).

??? note "spark.databricks.delta.write.txnVersion.autoReset.enabled"
    With [spark.databricks.delta.write.txnVersion.autoReset.enabled](../configuration-properties/index.md#spark.databricks.delta.write.txnVersion.autoReset.enabled) enabled, `createSetTransaction` additionally resets (_unsets_) the transaction version in the session configuration.
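
The core decision can be sketched in plain Scala. This is a hypothetical, simplified model (the names mirror Delta's, but the types are illustrative), assuming write options take precedence over the session configuration:

```scala
// Hypothetical, simplified model of createSetTransaction (not Delta's actual
// types): a SetTransaction action is produced only when both an application ID
// and a transaction version resolve, from the write options first and the
// session configuration second.
final case class SetTransaction(appId: String, version: Long, lastUpdated: Option[Long])

def createSetTransaction(
    optionAppId: Option[String],
    optionVersion: Option[Long],
    confAppId: Option[String],
    confVersion: Option[Long]): Option[SetTransaction] =
  for {
    appId   <- optionAppId.orElse(confAppId)
    version <- optionVersion.orElse(confVersion)
  } yield SetTransaction(appId, version, lastUpdated = Some(System.currentTimeMillis()))
```

In user code, the two values are typically supplied as the `txnAppId` and `txnVersion` options of a Delta write (or the corresponding `spark.databricks.delta.write.*` session properties).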

---

`createSetTransaction` is used when:

* `DeleteCommand` is requested to [performDelete](delete/DeleteCommand.md#performDelete)
* `MergeIntoCommand` is requested to [commitAndRecordStats](merge/MergeIntoCommand.md#commitAndRecordStats)
* `UpdateCommand` is requested to [performUpdate](update/UpdateCommand.md#performUpdate)
* `WriteIntoDelta` is requested to [write data out](WriteIntoDelta.md#write)

## Logging

`DeltaCommand` is an abstract class and logging is configured using the logger of the [implementations](#implementations).
39 changes: 39 additions & 0 deletions docs/commands/DeltaDynamicPartitionOverwriteCommand.md
# DeltaDynamicPartitionOverwriteCommand

`DeltaDynamicPartitionOverwriteCommand` is a `RunnableCommand` ([Spark SQL]({{ book.spark_sql }}/logical-operators/RunnableCommand/)) and a `V2WriteCommand` ([Spark SQL]({{ book.spark_sql }}/logical-operators/V2WriteCommand/)) for dynamic partition overwrite using [WriteIntoDelta](WriteIntoDelta.md).

`DeltaDynamicPartitionOverwriteCommand` sets the [partitionOverwriteMode](../spark-connector/options.md#partitionOverwriteMode) option to `DYNAMIC` before the [write](#run).

!!! note "Workaround"
    `DeltaDynamicPartitionOverwriteCommand` is a workaround for Spark SQL not supporting a V1 fallback for dynamic partition overwrite.
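
The option injection itself is tiny and can be sketched in plain Scala (illustrative, not Delta's actual code):

```scala
// Illustrative sketch: before delegating to WriteIntoDelta, the command
// forces the partitionOverwriteMode write option to DYNAMIC, whatever the
// incoming write options say.
def withDynamicPartitionOverwrite(writeOptions: Map[String, String]): Map[String, String] =
  writeOptions + ("partitionOverwriteMode" -> "DYNAMIC")
```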

## Creating Instance

`DeltaDynamicPartitionOverwriteCommand` takes the following to be created:

* <span id="table"> Delta Table (`NamedRelation`)
* <span id="deltaTable"> [DeltaTableV2](../DeltaTableV2.md)
* <span id="query"> Logical Query Plan (`LogicalPlan`)
* <span id="writeOptions"> Write options (`Map[String, String]`)
* <span id="isByName"> `isByName` flag
* <span id="analyzedQuery"> Analyzed Logical Query Plan (`Option[LogicalPlan]`)

`DeltaDynamicPartitionOverwriteCommand` is created when:

* `DeltaAnalysis` logical resolution rule is requested to [resolve an OverwritePartitionsDynamic on a delta table](../DeltaAnalysis.md#apply) (`INSERT OVERWRITE` with dynamic partition overwrite)

??? note "OverwritePartitionsDynamic Unary Logical Command"
Learn more about `OverwritePartitionsDynamic` unary logical command in [The Internals of Spark SQL]({{ book.spark_sql }}/logical-operators/OverwritePartitionsDynamic/).

## Executing Command { #run }

??? note "RunnableCommand"

```scala
run(
sparkSession: SparkSession): Seq[Row]
```

`run` is part of the `RunnableCommand` ([Spark SQL]({{ book.spark_sql }}/logical-operators/RunnableCommand/#run)) abstraction.

`run`...FIXME
4 changes: 2 additions & 2 deletions docs/commands/WriteIntoDelta.md
# WriteIntoDelta Command

`WriteIntoDelta` is a [delta command](DeltaCommand.md) that can write [data(frame)](#data) out transactionally into a [delta table](#deltaLog) (either an existing one or one about to be created).

`WriteIntoDelta` is a `LeafRunnableCommand` ([Spark SQL]({{ book.spark_sql }}/logical-operators/LeafRunnableCommand)) logical operator.


`WriteIntoDelta` is created when:

* [DeltaDynamicPartitionOverwriteCommand](DeltaDynamicPartitionOverwriteCommand.md) is executed
* `DeltaLog` is requested to [create an insertable HadoopFsRelation](../DeltaLog.md#createRelation) (when `DeltaDataSource` is requested to create a relation as a [CreatableRelationProvider](../spark-connector/DeltaDataSource.md#CreatableRelationProvider) or a [RelationProvider](../spark-connector/DeltaDataSource.md#RelationProvider))
* `DeltaCatalog` is requested to [create a delta table](../DeltaCatalog.md#createDeltaTable)
* `WriteIntoDeltaBuilder` is requested to [build a V1Write](../WriteIntoDeltaBuilder.md#build)
4 changes: 4 additions & 0 deletions docs/configuration-properties/DeltaSQLConf.md
## streaming.unsafeReadOnIncompatibleColumnMappingSchemaChanges.enabled { #DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES }

[spark.databricks.delta.streaming.unsafeReadOnIncompatibleColumnMappingSchemaChanges.enabled](index.md#streaming.unsafeReadOnIncompatibleColumnMappingSchemaChanges.enabled)

## write.txnVersion.autoReset.enabled { #DELTA_IDEMPOTENT_DML_AUTO_RESET_ENABLED }

[spark.databricks.delta.write.txnVersion.autoReset.enabled](index.md#write.txnVersion.autoReset.enabled)
45 changes: 44 additions & 1 deletion docs/configuration-properties/index.md

### <span id="optimize.minFileSize"><span id="DELTA_OPTIMIZE_MIN_FILE_SIZE"> optimize.minFileSize

**spark.databricks.delta.optimize.minFileSize**

(internal) Files which are smaller than this threshold (in bytes) will be grouped together and rewritten as larger files by the [OPTIMIZE](../sql/index.md#OPTIMIZE) command.

Default: `1024 * 1024 * 1024` (`1GB`)


Enabling may result in hitting rate limits on some storage backends. When enabled, parallelization is controlled by the default number of shuffle partitions.

### <span id="write.txnAppId"><span id="DELTA_IDEMPOTENT_DML_TXN_APP_ID"> write.txnAppId { #spark.databricks.delta.write.txnAppId }

**spark.databricks.delta.write.txnAppId**

**(internal)** The user-defined application ID a write will be committed with.
If specified, [spark.databricks.delta.write.txnVersion](#spark.databricks.delta.write.txnVersion) has to be set, too.

Default: (undefined)

Used when:

* `DeltaCommand` is requested for the [txnVersion and txnAppId](../commands/DeltaCommand.md#getTxnVersionAndAppId)

### <span id="write.txnVersion"><span id="DELTA_IDEMPOTENT_DML_TXN_VERSION"> write.txnVersion { #spark.databricks.delta.write.txnVersion }

**spark.databricks.delta.write.txnVersion**

**(internal)** The user-defined transaction version a write will be committed with.
If specified, [spark.databricks.delta.write.txnAppId](#spark.databricks.delta.write.txnAppId) has to be set, too.
To ensure idempotency, `txnVersion`s across different writes need to be monotonically increasing.

Default: (undefined)

Used when:

* `DeltaCommand` is requested to [create a SetTransaction action](../commands/DeltaCommand.md#createSetTransaction), for the [txnVersion and txnAppId](../commands/DeltaCommand.md#getTxnVersionAndAppId) and [hasBeenExecuted](../commands/DeltaCommand.md#hasBeenExecuted)
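
The monotonicity requirement is what makes the skip-check work: a write counts as already executed when the table has recorded, for the same `txnAppId`, a version at or above the requested one. A minimal sketch of that check (illustrative, not Delta's actual code):

```scala
// Illustrative sketch of the idempotency check behind hasBeenExecuted:
// latestRecordedVersion is the last SetTransaction version the table knows
// for this application ID, if any; the write is skipped when it is at or
// above the requested version.
def hasBeenExecuted(latestRecordedVersion: Option[Long], requestedVersion: Long): Boolean =
  latestRecordedVersion.exists(_ >= requestedVersion)
```

A version that is reused (or decreases) therefore makes a genuinely new write look already executed, which is why versions must increase monotonically.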

### <span id="write.txnVersion.autoReset.enabled"><span id="DELTA_IDEMPOTENT_DML_AUTO_RESET_ENABLED"> write.txnVersion.autoReset.enabled

**spark.databricks.delta.write.txnVersion.autoReset.enabled**

**(internal)** When enabled, automatically resets [spark.databricks.delta.write.txnVersion](#spark.databricks.delta.write.txnVersion) after every write.

If `txnAppId` and `txnVersion` both [come from the session configuration](../commands/DeltaCommand.md#getTxnVersionAndAppId) (based on [write.txnAppId](#write.txnAppId) and [write.txnVersion](#write.txnVersion), respectively), [spark.databricks.delta.write.txnVersion](#spark.databricks.delta.write.txnVersion) is reset (_unset_) after the current transaction is skipped, as a safety measure to prevent data loss should the user forget to manually update `txnVersion`.

Default: `false`

Used when:

* `DeltaCommand` is requested to [create a SetTransaction action](../commands/DeltaCommand.md#createSetTransaction) and [hasBeenExecuted](../commands/DeltaCommand.md#hasBeenExecuted)
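
A hedged sketch of the reset step, assuming the session configuration is modeled as a mutable map (Delta operates on the Spark session conf instead):

```scala
import scala.collection.mutable

// Illustrative sketch of the auto-reset safety measure (not Delta's actual
// code): when the transaction version came from the session configuration
// (rather than from write options), it is unset after the write, so a
// forgotten manual update cannot silently skip future writes.
val TxnVersionKey = "spark.databricks.delta.write.txnVersion"

def maybeResetTxnVersion(
    sessionConf: mutable.Map[String, String],
    versionCameFromSessionConf: Boolean,
    autoResetEnabled: Boolean): Unit =
  if (autoResetEnabled && versionCameFromSessionConf)
    sessionConf.remove(TxnVersionKey)
```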

## spark.delta.logStore.class { #spark.delta.logStore.class }

The fully-qualified class name of a [LogStore](../storage/LogStore.md)
24 changes: 20 additions & 4 deletions docs/liquid-clustering/ClusteredTableUtilsBase.md

`removeClusteringColumnsProperty` is used when:

* `CreateDeltaTableCommand` is requested for the [provided metadata](../commands/create-table/CreateDeltaTableCommand.md#getProvidedMetadata)

## Domain Metadata { #getDomainMetadataOptional }

```scala
getDomainMetadataOptional(
clusterBySpecOpt: Option[ClusterBySpec],
txn: OptimisticTransaction): Option[DomainMetadata]
```


`getDomainMetadataOptional` [validates the clustering columns in the stats schema](#validateClusteringColumnsInStatsSchema) for the given [ClusterBySpec](ClusterBySpec.md), if specified.

`getDomainMetadataOptional` then [creates a DomainMetadata](#createDomainMetadata) with the [column names](ClusterBySpec.md#columnNames) of the given [ClusterBySpec](ClusterBySpec.md).

---

`getDomainMetadataOptional` is used when:

* `CreateDeltaTableCommand` is requested to [handleCreateTable](../commands/create-table/CreateDeltaTableCommand.md#handleCreateTable), [handleCreateTableAsSelect](../commands/create-table/CreateDeltaTableCommand.md#handleCreateTableAsSelect)

### Creating DomainMetadata { #createDomainMetadata }

```scala
createDomainMetadata(
clusteringColumns: Seq[ClusteringColumn]): DomainMetadata
```

`createDomainMetadata` [creates a ClusteringMetadataDomain](ClusteringMetadataDomain.md#fromClusteringColumns) from the given `ClusteringColumn`s and [converts it to a DomainMetadata](JsonMetadataDomain.md#toDomainMetadata).
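
A simplified model of the conversion, with illustrative case classes standing in for Delta's actual `ClusteringColumn` and `DomainMetadata` types, and an assumed `delta.clustering` domain name and JSON layout:

```scala
// Hypothetical, simplified model (not Delta's actual types): clustering
// columns are serialized into a JSON configuration and wrapped in a
// DomainMetadata action under the clustering metadata domain.
final case class ClusteringColumn(physicalName: Seq[String])
final case class DomainMetadata(domain: String, configuration: String, removed: Boolean)

def createDomainMetadata(clusteringColumns: Seq[ClusteringColumn]): DomainMetadata = {
  // Serialize each column's physical name path as a JSON array of strings,
  // e.g. Seq("nested", "b") => ["nested","b"]
  val cols = clusteringColumns
    .map(_.physicalName.map(p => "\"" + p + "\"").mkString("[", ",", "]"))
    .mkString("[", ",", "]")
  DomainMetadata("delta.clustering", "{\"clusteringColumns\":" + cols + "}", removed = false)
}
```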

## getClusterBySpecOptional { #getClusterBySpecOptional }

```scala
getClusterBySpecOptional(...)
```
`getClusterBySpecOptional` is used when:

* `CreateDeltaTableCommand` is requested to [handleCreateTableAsSelect](../commands/create-table/CreateDeltaTableCommand.md#handleCreateTableAsSelect)
* `ClusteredTableUtilsBase` is requested for the [domain metadata](#getDomainMetadataOptional)

## getClusteringColumnsAsProperty { #getClusteringColumnsAsProperty }

3 changes: 3 additions & 0 deletions docs/liquid-clustering/ClusteringMetadataDomain.md
# ClusteringMetadataDomain

`ClusteringMetadataDomain` is...FIXME
3 changes: 3 additions & 0 deletions docs/liquid-clustering/JsonMetadataDomain.md
# JsonMetadataDomain

`JsonMetadataDomain` is...FIXME
3 changes: 3 additions & 0 deletions docs/liquid-clustering/JsonMetadataDomainUtils.md
# JsonMetadataDomainUtils

`JsonMetadataDomainUtils` is...FIXME
1 change: 1 addition & 0 deletions mkdocs.yml
- create-table-like/index.md
- Delete:
- ... | flat | commands/delete/**.md
- DeltaDynamicPartitionOverwriteCommand: commands/DeltaDynamicPartitionOverwriteCommand.md
- DESCRIBE DETAIL:
- ... | flat | commands/describe-detail/**.md
- DESCRIBE HISTORY:
