CdcAddFileIndex, ChangeDataFeedTableFeature and Table Properties
jaceklaskowski committed Dec 16, 2023
1 parent e0697f0 commit affe799
Showing 9 changed files with 209 additions and 15 deletions.
3 changes: 3 additions & 0 deletions docs/SupportsRowIndexFilters.md
@@ -0,0 +1,3 @@
# SupportsRowIndexFilters

`SupportsRowIndexFilters` is...FIXME
10 changes: 5 additions & 5 deletions docs/TahoeFileIndex.md
@@ -6,7 +6,7 @@ The aim of `TahoeFileIndex` (and `FileIndex` in general) is to reduce usage of v

## Contract

### <span id="matchingFiles"> matchingFiles
### Matching Files { #matchingFiles }

```scala
matchingFiles(
@@ -34,7 +34,7 @@ Used for [listing data files](#listFiles)
??? note "Abstract Class"
`TahoeFileIndex` is an abstract class and cannot be created directly. It is created indirectly for the [concrete TahoeFileIndexes](#implementations).

## <span id="rootPaths"> Root Paths
## Root Paths { #rootPaths }

```scala
rootPaths: Seq[Path]
@@ -44,7 +44,7 @@ rootPaths: Seq[Path]

`rootPaths` is part of the `FileIndex` ([Spark SQL]({{ book.spark_sql }}/datasources/FileIndex/#rootPaths)) abstraction.

## <span id="listFiles"> Listing Files
## Listing Files { #listFiles }

```scala
listFiles(
@@ -56,7 +56,7 @@ listFiles(

`listFiles` is part of the `FileIndex` ([Spark SQL]({{ book.spark_sql }}/datasources/FileIndex/#listFiles)) abstraction.

## <span id="partitionSchema"> Partitions
## Partitions { #partitionSchema }

```scala
partitionSchema: StructType
@@ -76,7 +76,7 @@ tableVersion: Long

`tableVersion` is used when `TahoeFileIndex` is requested for the [human-friendly textual representation](#toString).

## <span id="toString"> Textual Representation
## Textual Representation { #toString }

```scala
toString: String
14 changes: 7 additions & 7 deletions docs/change-data-feed/CDCReader.md
@@ -88,7 +88,7 @@ Used when:
* `MergeIntoCommand` is requested to [writeAllChanges](../commands/merge/MergeIntoCommand.md#writeAllChanges) (to [matchedClauseOutput](../commands/merge/MergeIntoCommand.md#matchedClauseOutput) and [notMatchedClauseOutput](../commands/merge/MergeIntoCommand.md#notMatchedClauseOutput))
* `UpdateCommand` is requested to [withUpdatedColumns](../commands/update/UpdateCommand.md#withUpdatedColumns)
* `WriteIntoDelta` is requested to [write](../commands/WriteIntoDelta.md#write)
* `CdcAddFileIndex` is requested to [matchingFiles](CdcAddFileIndex.md#matchingFiles)
* `CdcAddFileIndex` is requested for the [matching files](CdcAddFileIndex.md#matchingFiles)
* `TahoeRemoveFileIndex` is requested to [matchingFiles](TahoeRemoveFileIndex.md#matchingFiles)
* `TransactionalWrite` is requested to [performCDCPartition](../TransactionalWrite.md#performCDCPartition)
* `SchemaUtils` utility is used to [normalizeColumnNames](../SchemaUtils.md#normalizeColumnNames)
@@ -172,7 +172,7 @@ cdcReadSchema(
* `CDCReader` utility is used to [getCDCRelation](#getCDCRelation) and [scanIndex](#scanIndex)
* `DeltaRelation` utility is used to [fromV2Relation](../DeltaRelation.md#fromV2Relation)
* `OptimisticTransactionImpl` is requested to [performCdcMetadataCheck](../OptimisticTransactionImpl.md#performCdcMetadataCheck)
* `CdcAddFileIndex` is requested for the [partitionSchema](CdcAddFileIndex.md#partitionSchema)
* `CdcAddFileIndex` is requested for the [partitions](CdcAddFileIndex.md#partitionSchema)
* `TahoeRemoveFileIndex` is requested for the [partitionSchema](TahoeRemoveFileIndex.md#partitionSchema)
* `DeltaDataSource` is requested for the [sourceSchema](../delta/DeltaDataSource.md#sourceSchema)
* `DeltaSourceBase` is requested for the [schema](../delta/DeltaSourceBase.md#schema)
@@ -226,7 +226,7 @@ isCDCEnabledOnTable(
metadata: Metadata): Boolean
```

`isCDCEnabledOnTable` [metadataRequiresFeatureToBeEnabled](ChangeDataFeedTableFeature.md#metadataRequiresFeatureToBeEnabled).
`isCDCEnabledOnTable` is an alias of [metadataRequiresFeatureToBeEnabled](ChangeDataFeedTableFeature.md#metadataRequiresFeatureToBeEnabled).

---

@@ -237,10 +237,10 @@ isCDCEnabledOnTable(
* `CDCReader` is requested to [changesToDF](#changesToDF)
* `TransactionalWrite` is requested to [performCDCPartition](../TransactionalWrite.md#performCDCPartition)

## <span id="CDC_TYPE_INSERT"><span id="insert"> insert Change Type
## <span id="insert"> insert Change Type { #CDC_TYPE_INSERT }

`CDCReader` defines `insert` value for the value of the [_change_type](#CDC_TYPE_COLUMN_NAME) column in the following:
`CDCReader` uses `insert` as the value of the [_change_type](#CDC_TYPE_COLUMN_NAME) column in the following:

* [notMatchedClauseOutput](../commands/merge/MergeIntoCommand.md#notMatchedClauseOutput) with [cdcEnabled](../commands/merge/MergeIntoCommand.md#cdcEnabled) (when [writeAllChanges](../commands/merge/MergeIntoCommand.md#writeAllChanges))
* `WriteIntoDelta` is requested to [write data out](../commands/WriteIntoDelta.md#write) (with [isCDCEnabledOnTable](#isCDCEnabledOnTable))
* `CdcAddFileIndex` is requested to [matchingFiles](CdcAddFileIndex.md#matchingFiles)
* `MergeOutputGeneration` is requested to [generateAllActionExprs](../commands/merge/MergeOutputGeneration.md#generateAllActionExprs) and [generateCdcAndOutputRows](../commands/merge/MergeOutputGeneration.md#generateCdcAndOutputRows)
* `CdcAddFileIndex` is requested for the [matching files](CdcAddFileIndex.md#matchingFiles)
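
The `insert` change type above is one of the values of the `_change_type` metadata column that surfaces when reading a table's change feed. A sketch (assuming a CDF-enabled delta table at `/tmp/delta/t1`):

```scala
// Sketch only: read the change feed of a CDF-enabled delta table.
// _change_type carries values such as insert, delete,
// update_preimage and update_postimage.
val changes = spark.read
  .format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .load("/tmp/delta/t1")
changes.select("_change_type", "_commit_version").show(truncate = false)
```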
74 changes: 73 additions & 1 deletion docs/change-data-feed/CdcAddFileIndex.md
@@ -1,3 +1,75 @@
# CdcAddFileIndex

`CdcAddFileIndex` is...FIXME
`CdcAddFileIndex` is a [TahoeBatchFileIndex](../TahoeBatchFileIndex.md) with the following:

Property | Value
---------|------
[Action Type](../TahoeBatchFileIndex.md#actionType) | `cdcRead`
[addFiles](../TahoeBatchFileIndex.md#addFiles) | The [AddFile](../AddFile.md)s of the given [CDCDataSpecs](#filesByVersion)

`CdcAddFileIndex` is used by [CDCReaderImpl](CDCReaderImpl.md) to [scanIndex](CDCReaderImpl.md#scanIndex).

## Creating Instance

`CdcAddFileIndex` takes the following to be created:

* <span id="spark"> `SparkSession`
* <span id="filesByVersion"> [AddFile](../AddFile.md)s by Version (`Seq[CDCDataSpec[AddFile]]`)
* <span id="deltaLog"> [DeltaLog](../DeltaLog.md)
* <span id="path"> `Path`
* <span id="snapshot"> [SnapshotDescriptor](../SnapshotDescriptor.md)
* [Row Index Filters](#rowIndexFilters)

`CdcAddFileIndex` is created when:

* `CDCReaderImpl` is requested for the [DataFrame with deleted and added rows](CDCReaderImpl.md#getDeletedAndAddedRows) and to [processDeletionVectorActions](CDCReaderImpl.md#processDeletionVectorActions)

### Row Index Filters { #rowIndexFilters }

??? note "SupportsRowIndexFilters"

```scala
rowIndexFilters: Option[Map[String, RowIndexFilterType]] = None
```

`rowIndexFilters` is part of the [SupportsRowIndexFilters](../SupportsRowIndexFilters.md#rowIndexFilters) abstraction.

`CdcAddFileIndex` is given Row Index Filters when [created](#creating-instance).

## Input Files { #inputFiles }

??? note "FileIndex"

```scala
inputFiles: Array[String]
```

`inputFiles` is part of the `FileIndex` ([Spark SQL]({{ book.spark_sql }}/connectors/FileIndex#inputFiles)) abstraction.

`inputFiles`...FIXME

## Matching Files { #matchingFiles }

??? note "TahoeFileIndex"

```scala
matchingFiles(
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[AddFile]
```

`matchingFiles` is part of the [TahoeFileIndex](../TahoeFileIndex.md#matchingFiles) abstraction.

`matchingFiles`...FIXME

## Partitions { #partitionSchema }

??? note "FileIndex"

```scala
partitionSchema: StructType
```

`partitionSchema` is part of the `FileIndex` ([Spark SQL]({{ book.spark_sql }}/connectors/FileIndex#partitionSchema)) abstraction.

`partitionSchema` builds a [cdcReadSchema](CDCReader.md#cdcReadSchema) for the [partitions](../Metadata.md#partitionSchema) of (the [Metadata](../SnapshotDescriptor.md#metadata) of) the given [SnapshotDescriptor](#snapshot).
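
As a sketch (an assumption about the shape, not the actual code), `cdcReadSchema` amounts to extending a schema with the CDF metadata columns, so the partition schema here is the table's partition schema plus those columns:

```scala
import org.apache.spark.sql.types._

// Sketch only: the CDF read schema is the given schema extended with
// the CDF metadata columns.
def cdcReadSchema(schema: StructType): StructType =
  schema
    .add("_change_type", StringType)
    .add("_commit_version", LongType)
    .add("_commit_timestamp", TimestampType)
```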
24 changes: 24 additions & 0 deletions docs/change-data-feed/ChangeDataFeedTableFeature.md
@@ -0,0 +1,24 @@
# ChangeDataFeedTableFeature

`ChangeDataFeedTableFeature` is a [LegacyWriterFeature](../table-features/LegacyWriterFeature.md) with the following properties:

Property | Value
---------|------
[Name](../table-features/LegacyWriterFeature.md#name) | `changeDataFeed`
[Minimum writer protocol version](../table-features/LegacyWriterFeature.md#minWriterVersion) | `4`

`ChangeDataFeedTableFeature` is a [FeatureAutomaticallyEnabledByMetadata](../table-features/FeatureAutomaticallyEnabledByMetadata.md) that uses [delta.enableChangeDataFeed](../DeltaConfigs.md#enableChangeDataFeed) table property to control [Change Data Feed](index.md) feature.

## metadataRequiresFeatureToBeEnabled { #metadataRequiresFeatureToBeEnabled }

??? note "FeatureAutomaticallyEnabledByMetadata"

```scala
metadataRequiresFeatureToBeEnabled(
metadata: Metadata,
spark: SparkSession): Boolean
```

`metadataRequiresFeatureToBeEnabled` is part of the [FeatureAutomaticallyEnabledByMetadata](../table-features/FeatureAutomaticallyEnabledByMetadata.md#metadataRequiresFeatureToBeEnabled) abstraction.

`metadataRequiresFeatureToBeEnabled` returns the value of the [delta.enableChangeDataFeed](../DeltaConfigs.md#enableChangeDataFeed) table property in (the [configuration](../Metadata.md#configuration) of) the given [Metadata](../Metadata.md).
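
In other words, a minimal sketch (assuming the table property is backed by `DeltaConfigs.CHANGE_DATA_FEED`):

```scala
// Sketch only: the feature is required by the given metadata exactly
// when delta.enableChangeDataFeed is enabled in the table configuration.
override def metadataRequiresFeatureToBeEnabled(
    metadata: Metadata,
    spark: SparkSession): Boolean =
  DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(metadata)
```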
4 changes: 2 additions & 2 deletions docs/commands/merge/MergeOutputGeneration.md
@@ -15,10 +15,10 @@ generatePrecomputedConditionsAndDF(
clauses: Seq[DeltaMergeIntoClause]): (DataFrame, Seq[DeltaMergeIntoClause])
```

`generatePrecomputedConditionsAndDF` [rewrites conditional clauses](#rewriteCondition) of all the given [DeltaMergeIntoClause](DeltaMergeIntoClause.md)s
`generatePrecomputedConditionsAndDF` [rewrites conditional clauses](#rewriteCondition) of all the given [DeltaMergeIntoClause](DeltaMergeIntoClause.md)s.

??? note "rewriteCondition"
[rewriteCondition](#rewriteCondition) populates an internal `preComputedClauseConditions` registry of pairs of a generated column name and a rewritten condition for every conditional clause (i.e., [DeltaMergeIntoClause](DeltaMergeIntoClause.md) with a [condition](DeltaMergeIntoClause.md#condition)).
[rewriteCondition](#rewriteCondition) populates an internal `preComputedClauseConditions` registry of pairs of a generated column name (e.g., `_update_condition_0_`) and a rewritten condition for every conditional clause (i.e., [DeltaMergeIntoClause](DeltaMergeIntoClause.md)s with a [condition](DeltaMergeIntoClause.md#condition)).

`generatePrecomputedConditionsAndDF` adds the generated columns (of the conditional clauses) to the given `sourceDF` (to precompute clause conditions).

4 changes: 4 additions & 0 deletions docs/table-properties/.pages
@@ -0,0 +1,4 @@
title: Table Properties
nav:
- index.md
- ...
90 changes: 90 additions & 0 deletions docs/table-properties/index.md
@@ -0,0 +1,90 @@
# Table Properties

Delta Lake uses [DeltaConfigs](../DeltaConfigs.md) to manage the table properties of delta tables.

Table properties start with the `delta.` prefix.

Table properties can be set on delta tables using the [ALTER TABLE SET TBLPROPERTIES](../commands/alter/AlterTableSetPropertiesDeltaCommand.md) or [CREATE TABLE](../commands/CreateDeltaTableCommand.md) SQL commands.

```sql
ALTER TABLE delta_demo
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
```

```sql
CREATE TABLE delta_demo (id INT, name STRING, age INT)
USING delta
TBLPROPERTIES (delta.enableChangeDataFeed = true)
```
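
The same table can also be created programmatically with the `DeltaTable` builder API (a sketch; builder method names may vary across Delta Lake versions):

```scala
import io.delta.tables.DeltaTable

// Sketch only: create delta_demo with Change Data Feed enabled.
DeltaTable.create(spark)
  .tableName("delta_demo")
  .addColumn("id", "INT")
  .addColumn("name", "STRING")
  .addColumn("age", "INT")
  .property("delta.enableChangeDataFeed", "true")
  .execute()
```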

Use the `SHOW TBLPROPERTIES` SQL command to review the table properties of a delta table.

```sql
SHOW TBLPROPERTIES delta_demo;
```

## SHOW TBLPROPERTIES

Table properties can be displayed using the `SHOW TBLPROPERTIES` SQL command:

```sql
SHOW TBLPROPERTIES <table_name>
[(comma-separated properties)]
```

---

```scala
sql("SHOW TBLPROPERTIES delta.`/tmp/delta/t1`").show(truncate = false)
```

```text
+----------------------+-----+
|key |value|
+----------------------+-----+
|delta.minReaderVersion|1 |
|delta.minWriterVersion|2 |
+----------------------+-----+
```

```scala
sql("SHOW TBLPROPERTIES delta.`/tmp/delta/t1` (delta.minReaderVersion)").show(truncate = false)
```

```text
+----------------------+-----+
|key |value|
+----------------------+-----+
|delta.minReaderVersion|1 |
+----------------------+-----+
```

## ALTER TABLE SET TBLPROPERTIES

Table properties can be set or unset using the `ALTER TABLE` SQL command:

```sql
ALTER TABLE <table_name> SET TBLPROPERTIES (<key>=<value>)
```

```sql
ALTER TABLE table1 UNSET TBLPROPERTIES [IF EXISTS] ('key1', 'key2', ...);
```

---

```scala
sql("ALTER TABLE delta.`/tmp/delta/t1` SET TBLPROPERTIES (delta.enableExpiredLogCleanup=true)")
```

```scala
sql("SHOW TBLPROPERTIES delta.`/tmp/delta/t1` (delta.enableExpiredLogCleanup)").show(truncate = false)
```

```text
+-----------------------------+-----+
|key |value|
+-----------------------------+-----+
|delta.enableExpiredLogCleanup|true |
+-----------------------------+-----+
```
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -202,6 +202,7 @@ nav:
- FileFormat:
- DeltaFileFormat: DeltaFileFormat.md
- DeltaParquetFileFormat: DeltaParquetFileFormat.md
- SupportsRowIndexFilters.md
- Transaction Log (DeltaLog):
- DeltaLog: DeltaLog.md
- Operation: Operation.md
