Skip to content

Commit

Permalink
readChangeFeed option for batch and streaming queries
Browse files Browse the repository at this point in the history
  • Loading branch information
jaceklaskowski committed Dec 26, 2023
1 parent e48068b commit 86a6990
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 74 deletions.
3 changes: 3 additions & 0 deletions docs/change-data-feed/CDCReaderImpl.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

`CDCReaderImpl` is an marker abstraction of [Change Data Feed-aware Readers](#implementations).

!!! note "Fun Fact"
Despite the suffix (`Impl`), `CDCReaderImpl` is a trait not an implementation (_class_).

## Implementations

* [CDCReader](CDCReader.md)
Expand Down
5 changes: 2 additions & 3 deletions docs/delta/.pages
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
title: Delta Connector
nav:
- index.md
- Options:
- options.md
- ... | *Option*.md
- ... | [A-N]*.md
- options.md
- ...
19 changes: 4 additions & 15 deletions docs/delta/DeltaDataSource.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,23 +292,12 @@ parsePathIdentifier(

`parsePathIdentifier`...FIXME

---

`parsePathIdentifier` is used when:

* `DeltaTableV2` is requested for [metadata](../DeltaTableV2.md#rootPath) (for a non-catalog table)

## <span id="CDC_ENABLED_KEY"><span id="readChangeFeed"> readChangeFeed

`DeltaDataSource` utility defines `readChangeFeed` value to indicate [CDC-aware table scan](../change-data-feed/CDCReaderImpl.md#isCDCRead) (when it is used as an read option and `true`).

`readChangeFeed` is used alongside the following CDC options:

* [startingVersion](#CDC_START_VERSION_KEY)
* [startingTimestamp](#CDC_START_TIMESTAMP_KEY)
* [endingVersion](#CDC_END_VERSION_KEY)
* [endingTimestamp](#CDC_END_TIMESTAMP_KEY)

---

`readChangeFeed` is used when:
## <span id="CDC_ENABLED_KEY"> readChangeFeed { #readChangeFeed }

* `DeltaDataSource` is requested to [create a BaseRelation](#RelationProvider-createRelation)
[readChangeFeed](options.md#readChangeFeed)
36 changes: 21 additions & 15 deletions docs/delta/DeltaReadOptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

`DeltaReadOptions` is an extension of the [DeltaOptionParser](DeltaOptionParser.md) abstraction with the values of the read options of [DeltaOptions](DeltaOptions.md).

## <span id="excludeRegex"> excludeRegex
## excludeRegex { #excludeRegex }

```scala
excludeRegex: Option[Regex]
Expand All @@ -14,7 +14,7 @@ excludeRegex: Option[Regex]

* `DeltaSource` is requested for the [excludeRegex](DeltaSource.md#excludeRegex)

## <span id="failOnDataLoss"><span id="FAIL_ON_DATA_LOSS_OPTION"> failOnDataLoss
## <span id="FAIL_ON_DATA_LOSS_OPTION"> failOnDataLoss { #failOnDataLoss }

```scala
failOnDataLoss: Boolean
Expand All @@ -29,7 +29,7 @@ failOnDataLoss: Boolean
* `DeltaSource` is requested to [getFileChanges](DeltaSource.md#getFileChanges)
* `DeltaSourceCDCSupport` is requested to [getFileChangesForCDC](../change-data-feed/DeltaSourceCDCSupport.md#getFileChangesForCDC)

## <span id="ignoreChanges"> ignoreChanges
## ignoreChanges { #ignoreChanges }

```scala
ignoreChanges: Boolean
Expand All @@ -41,7 +41,7 @@ ignoreChanges: Boolean

* FIXME

## <span id="ignoreDeletes"> ignoreDeletes
## ignoreDeletes { #ignoreDeletes }

```scala
ignoreDeletes: Boolean
Expand All @@ -53,7 +53,7 @@ ignoreDeletes: Boolean

* FIXME

## <span id="ignoreFileDeletion"> ignoreFileDeletion
## ignoreFileDeletion { #ignoreFileDeletion }

```scala
ignoreFileDeletion: Boolean
Expand All @@ -65,7 +65,7 @@ ignoreFileDeletion: Boolean

* FIXME

## <span id="maxBytesPerTrigger"> maxBytesPerTrigger
## maxBytesPerTrigger { #maxBytesPerTrigger }

```scala
maxBytesPerTrigger: Option[Long]
Expand All @@ -77,7 +77,7 @@ maxBytesPerTrigger: Option[Long]

* FIXME

## <span id="maxFilesPerTrigger"> maxFilesPerTrigger
## maxFilesPerTrigger { #maxFilesPerTrigger }

```scala
maxFilesPerTrigger: Option[Int]
Expand All @@ -89,23 +89,30 @@ maxFilesPerTrigger: Option[Int]

* FIXME

## <span id="readChangeFeed"> readChangeFeed
## readChangeFeed { #readChangeFeed }

```scala
readChangeFeed: Boolean
```

`readChangeFeed` uses the [options](DeltaOptionParser.md#options) for the value of [readChangeFeed](options.md#CDC_READ_OPTION) option (if available or falls back to the legacy [readChangeData](options.md#readChangeData)).
`readChangeFeed` uses the [options](DeltaOptionParser.md#options) for the boolean value of [readChangeFeed](options.md#CDC_READ_OPTION) option (if available or falls back to the legacy [readChangeData](options.md#readChangeData)).

`readChangeFeed` is used when:
!!! note "DeltaDataSource"
Also known as [CDC_ENABLED_KEY](DeltaDataSource.md#CDC_ENABLED_KEY).

* `DeltaSourceBase` is requested for the [read schema](DeltaSourceBase.md#schema), to [getFileChangesWithRateLimit](DeltaSourceBase.md#getFileChangesWithRateLimit) (indirectly for `DeltaSource` to determine the [latest offset](DeltaSource.md#latestOffset)) and [getFileChangesAndCreateDataFrame](DeltaSourceBase.md#getFileChangesAndCreateDataFrame) (indirectly for the `DeltaSource` to [get a streaming micro-batch dataframe](DeltaSource.md#getBatch))
---

## <span id="startingTimestamp"> startingTimestamp
`readChangeFeed` is used when `DeltaSourceBase` is requested for the following:

* [checkReadIncompatibleSchemaChanges](DeltaSourceBase.md#checkReadIncompatibleSchemaChanges)
* [getFileChangesAndCreateDataFrame](DeltaSourceBase.md#getFileChangesAndCreateDataFrame) (for the `DeltaSource` to [get a streaming micro-batch dataframe](DeltaSource.md#getBatch))
* [getFileChangesWithRateLimit](DeltaSourceBase.md#getFileChangesWithRateLimit) (for `DeltaSource` to determine the [latest offset](DeltaSource.md#latestOffset))
* The [read schema](DeltaSourceBase.md#schema)

## startingTimestamp { #startingTimestamp }

```scala
startingTimestamp: Option[String]

```

`startingTimestamp`...FIXME
Expand All @@ -114,11 +121,10 @@ startingTimestamp: Option[String]

* FIXME

## <span id="startingVersion"> startingVersion
## startingVersion { #startingVersion }

```scala
startingVersion: Option[DeltaStartingVersion]

```

`startingVersion`...FIXME
Expand Down
2 changes: 1 addition & 1 deletion docs/delta/DeltaWriteOptionsImpl.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
`DeltaWriteOptionsImpl` is an [extension](#contract) of the [DeltaOptionParser](DeltaOptionParser.md) abstraction.

!!! note "Fun Fact"
Despite the suffix (`Impl`), `DeltaWriteOptionsImpl` is not an implementation (_class_) but a trait.
Despite the suffix (`Impl`), `DeltaWriteOptionsImpl` is a trait not an implementation (_class_).

## Auto Schema Merging { #canMergeSchema }

Expand Down
50 changes: 49 additions & 1 deletion docs/delta/index.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,51 @@
# Delta Connector

**Delta Connector** is an extension of Spark SQL (based on [Connector API]({{ book.spark_sql }}/connector)).
**Delta Connector** is an extension of Spark SQL (based on [Connector API]({{ book.spark_sql }}/connector)) to support batch and streaming queries over delta tables.

```scala
val rawDeltaTable = spark
.read
.format("delta")
.load("raw_delta_table")
```

```scala
myDeltaTable
.write
.format("delta")
.save("bronze_delta_table")
```

## Options

Delta Connector defines [options](options.md) for [reading](DeltaReadOptions.md) and [writing](DeltaWriteOptionsImpl.md) delta tables.

The options can be defined using `option` method of the following:

* For batch queries, `DataFrameReader` ([Spark SQL]({{ book.spark_sql }}/DataFrameReader)) and `DataFrameWriter` ([Spark SQL]({{ book.spark_sql }}/DataFrameWriter))
* For streaming queries, `DataStreamReader` ([Spark Structured Streaming]({{ book.structured_streaming }}/DataStreamReader)) and `DataStreamWriter` ([Spark Structured Streaming]({{ book.structured_streaming }}/DataStreamWriter))
* SQL queries

The options are available at runtime as [DeltaOptions](DeltaOptions.md).

```scala
import org.apache.spark.sql.delta.DeltaOptions
```

```scala
assert(DeltaOptions.OVERWRITE_SCHEMA_OPTION == "overwriteSchema")
```

```scala
val options = new DeltaOptions(Map.empty[String, String], spark.sessionState.conf)
assert(options.failOnDataLoss, "failOnDataLoss should be enabled by default")
```

```scala
val options = new DeltaOptions(
Map(DeltaOptions.OVERWRITE_SCHEMA_OPTION -> true.toString),
spark.sessionState.conf)
assert(
options.canOverwriteSchema,
s"${DeltaOptions.OVERWRITE_SCHEMA_OPTION} should be enabled")
```
58 changes: 20 additions & 38 deletions docs/delta/options.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,6 @@ title: Options

# Options

Delta Lake comes with options to fine-tune its uses. They can be defined using `option` method of the following:

* `DataFrameReader` ([Spark SQL]({{ book.spark_sql }}/DataFrameReader)) and `DataFrameWriter` ([Spark SQL]({{ book.spark_sql }}/DataFrameWriter)) for batch queries
* `DataStreamReader` ([Spark Structured Streaming]({{ book.structured_streaming }}/DataStreamReader)) and `DataStreamWriter` ([Spark Structured Streaming]({{ book.structured_streaming }}/DataStreamWriter)) for streaming queries
* SQL queries

## <span id="DeltaOptions"> Accessing Options

The options are available at runtime as [DeltaOptions](DeltaOptions.md).

```scala
import org.apache.spark.sql.delta.DeltaOptions
```

```scala
assert(DeltaOptions.OVERWRITE_SCHEMA_OPTION == "overwriteSchema")
```

```scala
val options = new DeltaOptions(Map.empty[String, String], spark.sessionState.conf)
assert(options.failOnDataLoss, "failOnDataLoss should be enabled by default")
```

```scala
val options = new DeltaOptions(
Map(DeltaOptions.OVERWRITE_SCHEMA_OPTION -> true.toString),
spark.sessionState.conf)
assert(
options.canOverwriteSchema,
s"${DeltaOptions.OVERWRITE_SCHEMA_OPTION} should be enabled")
```

## <span id="checkpointLocation"> checkpointLocation

Checkpoint directory for storing checkpoint data of streaming queries ([Spark Structured Streaming]({{ book.structured_streaming }}/configuration-properties/#spark.sql.streaming.checkpointLocation)).
Expand Down Expand Up @@ -127,17 +95,31 @@ Default: (undefined)
!!! note
Can also be specified using `load` method of `DataFrameReader` and `DataStreamReader`.

## <span id="queryName"> queryName
## queryName { #queryName }

## <span id="CDC_READ_OPTION"> readChangeFeed { #readChangeFeed }

## <span id="CDC_READ_OPTION"><span id="readChangeFeed"> readChangeFeed
Enables [Change Data Feed](../change-data-feed/index.md) while reading delta tables ([CDC-aware table scans](../change-data-feed/CDCReaderImpl.md#isCDCRead))

Enables [Change Data Feed](../change-data-feed/index.md) while reading a delta table (_CDC read_)
Use [DeltaOptions.readChangeFeed](DeltaReadOptions.md#readChangeFeed) for the value

!!! note
Use the following options to fine-tune [Change Data Feed](../change-data-feed/index.md)-aware queries:

* [startingVersion](#CDC_START_VERSION_KEY)
* [startingTimestamp](#CDC_START_TIMESTAMP_KEY)
* [endingVersion](#CDC_END_VERSION_KEY)
* [endingTimestamp](#CDC_END_TIMESTAMP_KEY)

---

Use [DeltaOptions.readChangeFeed](DeltaReadOptions.md#readChangeFeed) to access the value
`readChangeFeed` is used when:

Requires either [startingVersion](#startingVersion) or [startingTimestamp](#startingTimestamp) option
* `CDCStatementBase` is requested to `getOptions`
* `CDCReaderImpl` is requested to [isCDCRead](../change-data-feed/CDCReaderImpl.md#isCDCRead)
* `DeltaDataSource` is requested to [create a BaseRelation](#RelationProvider-createRelation)

## <span id="REPLACE_WHERE_OPTION"><span id="replaceWhere"> replaceWhere
## <span id="REPLACE_WHERE_OPTION"> replaceWhere { #replaceWhere }

Partition predicates (unless [replaceWhere.dataColumns.enabled](../configuration-properties/DeltaSQLConf.md#replaceWhere.dataColumns.enabled) is enabled to allow for arbitrary non-partition data predicates)

Expand Down
2 changes: 1 addition & 1 deletion mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ nav:
- ... | configuration-properties/**.md
- ... | data-skipping/**.md
- ... | deletion-vectors/**.md
- ... | delta/**.md
- developer-api.md
- Generated Columns:
- generated-columns/index.md
Expand Down Expand Up @@ -253,7 +254,6 @@ nav:
- TahoeLogFileIndex: TahoeLogFileIndex.md
- DeltaLogFileIndex: DeltaLogFileIndex.md
- WriteIntoDeltaBuilder: WriteIntoDeltaBuilder.md
- ... | delta/**.md
- Developer API:
- DeltaTable: DeltaTable.md
- DeltaTableBuilder: DeltaTableBuilder.md
Expand Down

0 comments on commit 86a6990

Please sign in to comment.