DeltaTableV2 and CDF-Aware Relation
jaceklaskowski committed Dec 23, 2023
1 parent 76ea4c0 commit 57bd8e7
Showing 3 changed files with 51 additions and 46 deletions.
26 changes: 25 additions & 1 deletion docs/DeltaTableV2.md
@@ -57,7 +57,31 @@ cdcOptions: CaseInsensitiveStringMap

* `DeltaTableV2` is requested for a [BaseRelation](#toBaseRelation)

## <span id="options"> Options
## CDF-Aware Relation { #cdcRelation }

```scala
cdcRelation: Option[BaseRelation]
```

??? note "Lazy Value"
    `cdcRelation` is a Scala **lazy value**: its initializer is executed only once (when the value is accessed for the first time), and the computed value never changes afterwards.

    Learn more in the [Scala Language Specification]({{ scala.spec }}/05-classes-and-objects.html#lazy).
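
The once-only initialization of a lazy value can be seen in a minimal, self-contained sketch. `LazyValueDemo`, `initCount` and `relation` are hypothetical names used for illustration only and are not part of Delta Lake.

```scala
object LazyValueDemo {
  // Counts how many times the initializer below runs (illustration only).
  var initCount = 0

  // The right-hand side is not evaluated when the object is created,
  // only when `relation` is accessed for the first time.
  lazy val relation: Option[String] = {
    initCount += 1
    Some("relation")
  }
}
```

Accessing `LazyValueDemo.relation` any number of times leaves `initCount` at `1`, since the computed value is cached after the first access.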

With [CDF-aware read](change-data-feed/CDCReader.md#isCDCRead), `cdcRelation` returns a [CDF-aware relation](change-data-feed/CDCReader.md#getCDCRelation) for the following:

* [initialSnapshot](#initialSnapshot)
* [timeTravelSpec](#timeTravelSpec)

Otherwise, `cdcRelation` returns `None` (an _undefined_ value).
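
The rule above (a defined relation for a CDF-aware read, `None` otherwise) can be sketched with a simplified model. `Relation` and `TableSketch` are hypothetical stand-ins rather than Delta Lake types, and checking a `readChangeFeed` option is an assumption about what makes a read CDF-aware; the actual check lives in `CDCReader.isCDCRead`.

```scala
// Hypothetical stand-in for Spark's BaseRelation (illustration only).
final case class Relation(description: String)

// Hypothetical model of a table that is given read options.
final class TableSketch(options: Map[String, String]) {
  // Assumption: a read is CDF-aware when readChangeFeed is "true".
  private def isCDCRead: Boolean =
    options.get("readChangeFeed").exists(_.equalsIgnoreCase("true"))

  // Some(relation) for a CDF-aware read, None otherwise.
  lazy val cdcRelation: Option[Relation] =
    if (isCDCRead) Some(Relation("CDF-aware relation")) else None
}
```

Modelling the result as an `Option` lets callers (such as the table schema and the relation in this sketch's real counterpart) fall back to the regular path when the value is `None`.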

---

`cdcRelation` is used when:

* `DeltaTableV2` is requested for the [table schema](#tableSchema) and the [relation](#toBaseRelation)

## Options { #options }

`DeltaTableV2` can be given options (as a `Map[String, String]`). Options are empty by default.

44 changes: 0 additions & 44 deletions docs/change-data-feed/CDCReader.md
@@ -2,50 +2,6 @@

`CDCReader` utility is the key class for CDF and CDC in Delta Lake (per [this comment](https://github.com/delta-io/delta/commit/d90f90b6656648e170835f92152b69f77346dfcf)).

## <span id="getCDCRelation"> getCDCRelation

```scala
getCDCRelation(
  spark: SparkSession,
  deltaLog: DeltaLog,
  snapshotToUse: Snapshot,
  partitionFilters: Seq[Expression],
  conf: SQLConf,
  options: CaseInsensitiveStringMap): BaseRelation
```

!!! note
    `partitionFilters` argument is not used.

`getCDCRelation` calls [getVersionForCDC](#getVersionForCDC) (with the [startingVersion](../delta/DeltaDataSource.md#CDC_START_VERSION_KEY) and [startingTimestamp](../delta/DeltaDataSource.md#CDC_START_TIMESTAMP_KEY) for the version and timestamp keys, respectively).

`getCDCRelation`...FIXME

`getCDCRelation` is used when:

* `DeltaLog` is requested to [create a relation](../DeltaLog.md#createRelation)

### <span id="getVersionForCDC"> Resolving Version

```scala
getVersionForCDC(
  spark: SparkSession,
  deltaLog: DeltaLog,
  conf: SQLConf,
  options: CaseInsensitiveStringMap,
  versionKey: String,
  timestampKey: String): Option[Long]
```

`getVersionForCDC` uses the given `options` map to get the value of the given `versionKey` key, if available.

!!! note "When `versionKey` and `timestampKey` are specified"
    `versionKey` and `timestampKey` are specified in the given `options` argument that is passed down through [getCDCRelation](#getCDCRelation) unmodified when `DeltaLog` is requested to [create a relation](../DeltaLog.md#createRelation) with non-empty `cdcOptions`.

Otherwise, `getVersionForCDC` uses the given `options` map to get the value of the given `timestampKey` key, if available. `getVersionForCDC`...FIXME

If neither the given `versionKey` nor the `timestampKey` key is available in the `options` map, `getVersionForCDC` returns `None` (_undefined value_).

## <span id="_change_data"> _change_data Directory { #CDC_LOCATION }

`CDCReader` uses `_change_data` as the name of the directory (under the data directory) where data changes of a delta table are written out (using [DelayedCommitProtocol](../DelayedCommitProtocol.md#newTaskTempFile)).
27 changes: 26 additions & 1 deletion docs/change-data-feed/CDCReaderImpl.md
@@ -11,13 +11,38 @@ getCDCRelation(
options: CaseInsensitiveStringMap): BaseRelation
```

`getCDCRelation` calls [getVersionForCDC](#getVersionForCDC) with the following:

* [startingVersion](../delta/DeltaDataSource.md#startingVersion) for the version key
* [startingTimestamp](../delta/DeltaDataSource.md#startingTimestamp) for the timestamp key

`getCDCRelation`...FIXME

---

`getCDCRelation` is used when:

* `DeltaLog` is requested to [create a BaseRelation](../DeltaLog.md#createRelation)
* `DeltaTableV2` is requested for the [CDF-aware relation](../DeltaTableV2.md#cdcRelation)

### Resolving Version { #getVersionForCDC }

```scala
getVersionForCDC(
  spark: SparkSession,
  deltaLog: DeltaLog,
  conf: SQLConf,
  options: CaseInsensitiveStringMap,
  versionKey: String,
  timestampKey: String): Option[ResolvedCDFVersion]
```

!!! note "FIXME Review Me"

`getVersionForCDC` uses the given `options` map to get the value of the given `versionKey` key, if available.

Otherwise, `getVersionForCDC` uses the given `options` map to get the value of the given `timestampKey` key, if available. `getVersionForCDC`...FIXME

If neither the given `versionKey` nor the `timestampKey` key is available in the `options` map, `getVersionForCDC` returns `None` (_undefined value_).
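
The lookup order described above (version key first, then timestamp key, otherwise `None`) can be sketched as follows. This is a simplified model under stated assumptions, not the Delta Lake implementation: `VersionResolutionSketch`, `Resolved`, `FromVersion`, `FromTimestamp` and `resolve` are hypothetical names, a plain case-sensitive `Map` stands in for `CaseInsensitiveStringMap`, and the step that turns a timestamp into a table version is deliberately left out.

```scala
object VersionResolutionSketch {
  // Hypothetical result types; the real method returns Option[ResolvedCDFVersion].
  sealed trait Resolved
  final case class FromVersion(version: Long) extends Resolved
  final case class FromTimestamp(timestamp: String) extends Resolved

  // The version key takes precedence over the timestamp key.
  // Note: toLong throws NumberFormatException for a non-numeric version.
  def resolve(
      options: Map[String, String],
      versionKey: String,
      timestampKey: String): Option[Resolved] =
    (options.get(versionKey), options.get(timestampKey)) match {
      case (Some(v), _)     => Some(FromVersion(v.toLong))
      case (None, Some(ts)) => Some(FromTimestamp(ts))
      case (None, None)     => None
    }
}
```

With `startingVersion` and `startingTimestamp` as the two keys, an options map that carries both resolves from the version, and an empty map resolves to `None`.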

## changesToBatchDF { #changesToBatchDF }
