From 83513484bc064b73cb5855e8c7aa8f244fcb4119 Mon Sep 17 00:00:00 2001 From: Paddy Xu Date: Fri, 10 Mar 2023 10:40:19 +0100 Subject: [PATCH] Return empty CDC result when no commit is in range This PR improves timestamp handling for CDC reads, so that a range with no commit in between can return an empty DF instead of throwing an exception: ``` version: 4 5 ---------|-------------------------------------------------|-------- ^ start timestamp ^ end timestamp ``` Before: fail with `end version 4 is older than the start version 5`. After: succeed and return an empty DF. GitOrigin-RevId: c048b00df27b18c7072c205481340007e8bba6f7 --- .../apache/spark/sql/delta/DeltaTable.scala | 2 +- .../sql/delta/commands/cdc/CDCReader.scala | 111 +++++++++++------- .../spark/sql/delta/DeltaCDCSuite.scala | 107 +++++++++++++++++ 3 files changed, 176 insertions(+), 44 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala index 858ecaa07a8..457b77247ff 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala @@ -377,7 +377,7 @@ object DeltaTableUtils extends PredicateHelper /** * Given a time travel node, resolve which version it is corresponding to for the given table and - * return the resolved version as well as the access type, i.e. by version or timestamp. + * return the resolved version as well as the access type, i.e. by `version` or `timestamp`. 
*/ def resolveTimeTravelVersion( conf: SQLConf, diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala index 0d1faeb1384..9a54d46d6b4 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala @@ -156,42 +156,33 @@ trait CDCReaderImpl extends DeltaLogging { import org.apache.spark.sql.delta.commands.cdc.CDCReader._ /** - * Given timestamp or version this method returns the corresponding version for that timestamp - * or the version itself. + * Given timestamp or version, this method returns the corresponding version for that timestamp + * or the version itself, as well as how the return version is obtained: by `version` or + * `timestamp`. */ - def getVersionForCDC( + private def getVersionForCDC( spark: SparkSession, deltaLog: DeltaLog, conf: SQLConf, options: CaseInsensitiveStringMap, versionKey: String, - timestampKey: String): Option[Long] = { + timestampKey: String): Option[ResolvedCDFVersion] = { if (options.containsKey(versionKey)) { - Some(options.get(versionKey).toLong) + Some(ResolvedCDFVersion(options.get(versionKey).toLong, timestamp = None)) } else if (options.containsKey(timestampKey)) { val ts = options.get(timestampKey) val spec = DeltaTimeTravelSpec(Some(Literal(ts)), None, Some("cdcReader")) + val timestamp = spec.getTimestamp(spark.sessionState.conf) val allowOutOfRange = conf.getConf(DeltaSQLConf.DELTA_CDF_ALLOW_OUT_OF_RANGE_TIMESTAMP) - if (timestampKey == DeltaDataSource.CDC_START_TIMESTAMP_KEY) { + val resolvedVersion = if (timestampKey == DeltaDataSource.CDC_START_TIMESTAMP_KEY) { // For the starting timestamp we need to find a version after the provided timestamp // we can use the same semantics as streaming. 
- val resolvedVersion = DeltaSource.getStartingVersionFromTimestamp( - spark, - deltaLog, - spec.getTimestamp(spark.sessionState.conf), - allowOutOfRange - ) - Some(resolvedVersion) + DeltaSource.getStartingVersionFromTimestamp(spark, deltaLog, timestamp, allowOutOfRange) } else { // For ending timestamp the version should be before the provided timestamp. - val resolvedVersion = DeltaTableUtils.resolveTimeTravelVersion( - conf, - deltaLog, - spec, - allowOutOfRange - ) - Some(resolvedVersion._1) + DeltaTableUtils.resolveTimeTravelVersion(conf, deltaLog, spec, allowOutOfRange)._1 } + Some(ResolvedCDFVersion(resolvedVersion, Some(timestamp))) } else { None } @@ -240,10 +231,7 @@ trait CDCReaderImpl extends DeltaLogging { conf, options, DeltaDataSource.CDC_START_VERSION_KEY, - DeltaDataSource.CDC_START_TIMESTAMP_KEY - ) - - if (startingVersion.isEmpty) { + DeltaDataSource.CDC_START_TIMESTAMP_KEY).getOrElse { throw DeltaErrors.noStartVersionForCDC() } @@ -259,29 +247,30 @@ trait CDCReaderImpl extends DeltaLogging { s"cannot be used with time travel options.") } + def emptyCDFRelation() = { + new DeltaCDFRelation( + SnapshotWithSchemaMode(snapshotToUse, schemaMode), + spark.sqlContext, + startingVersion = None, + endingVersion = None) { + override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = + sqlContext.sparkSession.sparkContext.emptyRDD[Row] + } + } + // add a version check here that is cheap instead of after trying to list a large version // that doesn't exist - if (startingVersion.get > snapshotToUse.version) { + if (startingVersion.version > snapshotToUse.version) { val allowOutOfRange = conf.getConf(DeltaSQLConf.DELTA_CDF_ALLOW_OUT_OF_RANGE_TIMESTAMP) // LS-129: return an empty relation if start version passed in is beyond latest commit version if (allowOutOfRange) { - return new DeltaCDFRelation( - SnapshotWithSchemaMode(snapshotToUse, schemaMode), - spark.sqlContext, - None, - None) { - override def buildScan( - 
requiredColumns: Array[String], - filters: Array[Filter]): RDD[Row] = { - sqlContext.sparkSession.sparkContext.emptyRDD[Row] - } - } + return emptyCDFRelation() } throw DeltaErrors.startVersionAfterLatestVersion( - startingVersion.get, snapshotToUse.version) + startingVersion.version, snapshotToUse.version) } - val endingVersion = getVersionForCDC( + val endingVersionOpt = getVersionForCDC( spark, snapshotToUse.deltaLog, conf, @@ -290,17 +279,42 @@ trait CDCReaderImpl extends DeltaLogging { DeltaDataSource.CDC_END_TIMESTAMP_KEY ) - if (endingVersion.exists(_ < startingVersion.get)) { - throw DeltaErrors.endBeforeStartVersionInCDC(startingVersion.get, endingVersion.get) + // Given two timestamps, there is a case when both of them lie closely between two versions: + // version: 4 5 + // ---------|-------------------------------------------------|-------- + // ^ start timestamp ^ end timestamp + // In this case the starting version will be 5 and ending version will be 4. We must not + // throw `endBeforeStartVersionInCDC` but return empty result. + endingVersionOpt.foreach { endingVersion => + if (startingVersion.resolvedByTimestamp && endingVersion.resolvedByTimestamp) { + // The next `if` is true when the end timestamp is earlier than the start timestamp. + // Throw early; with no commit in between, it would otherwise look like an empty range. 
+ if (startingVersion.timestamp.get.after(endingVersion.timestamp.get)) { + throw DeltaErrors.endBeforeStartVersionInCDC( + startingVersion.version, + endingVersion.version) + } + if (endingVersion.version == startingVersion.version - 1) { + return emptyCDFRelation() + } + } + } + + if (endingVersionOpt.exists(_.version < startingVersion.version)) { + throw DeltaErrors.endBeforeStartVersionInCDC( + startingVersion.version, + endingVersionOpt.get.version) } - logInfo(s"startingVersion: $startingVersion, endingVersion: $endingVersion") + logInfo( + s"startingVersion: ${startingVersion.version}, " + + s"endingVersion: ${endingVersionOpt.map(_.version)}") DeltaCDFRelation( SnapshotWithSchemaMode(snapshotToUse, schemaMode), spark.sqlContext, - startingVersion, - endingVersion) + Some(startingVersion.version), + endingVersionOpt.map(_.version)) } /** @@ -720,4 +734,15 @@ trait CDCReaderImpl extends DeltaLogging { * @param numBytes the total size of the AddFile + RemoveFile + AddCDCFiles that are in the df */ case class CDCVersionDiffInfo(fileChangeDf: DataFrame, numFiles: Long, numBytes: Long) + + /** + * Represents a Delta log version, and how the version is determined. + * @param version the determined version. + * @param timestamp the requested timestamp this version was resolved from. Set only when the + * version is determined by timestamp. + */ + private case class ResolvedCDFVersion(version: Long, timestamp: Option[Timestamp]) { + /** Whether this version is resolved by timestamp. 
*/ + def resolvedByTimestamp: Boolean = timestamp.isDefined + } } diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala index 22038a2845f..1bdfb2b627e 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala @@ -312,6 +312,113 @@ abstract class DeltaCDCSuiteBase } } + test("version from timestamp - before the first version") { + withTempDir { tempDir => + createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath)) + val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath) + + modifyDeltaTimestamp(deltaLog, 0, 4000) + modifyDeltaTimestamp(deltaLog, 1, 8000) + modifyDeltaTimestamp(deltaLog, 2, 12000) + + val ts0 = dateFormat.format(new Date(1000)) + val ts1 = dateFormat.format(new Date(3000)) + intercept[AnalysisException] { + cdcRead( + new TablePath(tempDir.getAbsolutePath), + StartingTimestamp(ts0), + EndingTimestamp(ts1)) + .collect() + }.getMessage.contains("before the earliest version") + } + } + + test("version from timestamp - between two valid versions") { + withTempDir { tempDir => + createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath)) + val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath) + + modifyDeltaTimestamp(deltaLog, 0, 0) + modifyDeltaTimestamp(deltaLog, 1, 4000) + modifyDeltaTimestamp(deltaLog, 2, 8000) + + val ts0 = dateFormat.format(new Date(1000)) + val ts1 = dateFormat.format(new Date(3000)) + val readDf = cdcRead( + new TablePath(tempDir.getAbsolutePath), StartingTimestamp(ts0), EndingTimestamp(ts1)) + checkCDCAnswer( + DeltaLog.forTable(spark, tempDir), + readDf, + spark.range(0) + .withColumn("_change_type", lit("insert")) + .withColumn("_commit_version", (col("id") / 10).cast(LongType))) + } + } + + test("version from timestamp - one version in between") { + withTempDir { tempDir => + createTblWithThreeVersions(path 
= Some(tempDir.getAbsolutePath)) + val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath) + + modifyDeltaTimestamp(deltaLog, 0, 0) + modifyDeltaTimestamp(deltaLog, 1, 4000) + modifyDeltaTimestamp(deltaLog, 2, 8000) + + val ts0 = dateFormat.format(new Date(3000)) + val ts1 = dateFormat.format(new Date(5000)) + val readDf = cdcRead( + new TablePath(tempDir.getAbsolutePath), StartingTimestamp(ts0), EndingTimestamp(ts1)) + checkCDCAnswer( + DeltaLog.forTable(spark, tempDir), + readDf, + spark.range(10, 20) + .withColumn("_change_type", lit("insert")) + .withColumn("_commit_version", (col("id") / 10).cast(LongType))) + } + } + + test("version from timestamp - end before start") { + withTempDir { tempDir => + createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath)) + val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath) + + modifyDeltaTimestamp(deltaLog, 0, 0) + modifyDeltaTimestamp(deltaLog, 1, 4000) + modifyDeltaTimestamp(deltaLog, 2, 8000) + + val ts0 = dateFormat.format(new Date(3000)) + val ts1 = dateFormat.format(new Date(1000)) + intercept[DeltaIllegalArgumentException] { + cdcRead( + new TablePath(tempDir.getAbsolutePath), + StartingTimestamp(ts0), + EndingTimestamp(ts1)) + .collect() + }.getErrorClass === "DELTA_INVALID_CDC_RANGE" + } + } + + test("version from timestamp - end before start with one version in between") { + withTempDir { tempDir => + createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath)) + val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath) + + modifyDeltaTimestamp(deltaLog, 0, 0) + modifyDeltaTimestamp(deltaLog, 1, 4000) + modifyDeltaTimestamp(deltaLog, 2, 8000) + + val ts0 = dateFormat.format(new Date(5000)) + val ts1 = dateFormat.format(new Date(3000)) + intercept[DeltaIllegalArgumentException] { + cdcRead( + new TablePath(tempDir.getAbsolutePath), + StartingTimestamp(ts0), + EndingTimestamp(ts1)) + .collect() + }.getErrorClass === "DELTA_INVALID_CDC_RANGE" + } + } + test("start version and 
end version are the same") { val tblName = "tbl" withTable(tblName) {