# [Spark] Allow stale reads when commit coordinator has not been registered (#3454)


#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Snapshot construction is now allowed even if the table has a configured commit coordinator whose implementation has not been registered in the current environment. Such reads rely only on backfilled commits and can therefore be stale. Writes are still blocked in such cases.

The change is gated behind a new flag, `coordinatedCommits.ignoreMissingCoordinatorImplementation` (default: `true`).
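
A minimal sketch of the resulting behavior (the table path is hypothetical, and the full conf key assumes the standard `spark.databricks.delta.` prefix that `DeltaSQLConf` keys carry):

```scala
// Default: tolerate a missing coordinator implementation on the read path.
spark.conf.set(
  "spark.databricks.delta.coordinatedCommits.ignoreMissingCoordinatorImplementation",
  "true")

// Read succeeds even though the table's coordinator is not registered.
// The snapshot is built from backfilled commits only, so it may be stale.
spark.read.format("delta").load("/tmp/cc-table").show()

// Writes remain blocked: this fails with
// DELTA_UNSUPPORTED_WRITES_WITHOUT_COORDINATOR.
spark.range(10).write.format("delta").mode("append").save("/tmp/cc-table")
```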

## How was this patch tested?

Added a new test in CoordinatedCommitsSuite.

## Does this PR introduce _any_ user-facing changes?

No
dhruvarya-db authored Aug 7, 2024
1 parent f8d7d76 commit 100cc4d
Showing 11 changed files with 243 additions and 34 deletions.
#### spark/src/main/resources/error/delta-error-classes.json (6 additions, 0 deletions)

```diff
@@ -2778,6 +2778,12 @@
     ],
     "sqlState" : "42807"
   },
+  "DELTA_UNSUPPORTED_WRITES_WITHOUT_COORDINATOR" : {
+    "message" : [
+      "You are trying to perform writes on a table which has been registered with the commit coordinator <coordinatorName>. However, no implementation of this coordinator is available in the current environment and writes without coordinators are not allowed."
+    ],
+    "sqlState" : "0AKDC"
+  },
   "DELTA_UPDATE_SCHEMA_MISMATCH_EXPRESSION" : {
     "message" : [
       "Cannot cast <fromCatalog> to <toCatalog>. All nested columns must match."
```

```diff
@@ -3429,6 +3429,12 @@ trait DeltaErrorsBase
         .getOrElse(Seq.empty)
         .mkString(", ")))
   }
+
+  def unsupportedWritesWithMissingCoordinators(coordinatorName: String): Throwable = {
+    new DeltaUnsupportedOperationException(
+      errorClass = "DELTA_UNSUPPORTED_WRITES_WITHOUT_COORDINATOR",
+      messageParameters = Array(coordinatorName))
+  }
 }
 
 object DeltaErrors extends DeltaErrorsBase
```

```diff
@@ -448,9 +448,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
   // The commit-coordinator of a table shouldn't change. If it is changed by a concurrent commit,
   // then it will be detected as a conflict and the transaction will anyway fail.
   private[delta] val readSnapshotTableCommitCoordinatorClientOpt:
-      Option[TableCommitCoordinatorClient] = {
-    snapshot.tableCommitCoordinatorClientOpt
-  }
+      Option[TableCommitCoordinatorClient] = snapshot.getTableCommitCoordinatorForWrites
 
   /**
    * Generates a timestamp which is greater than the commit timestamp
@@ -1613,16 +1611,17 @@ trait OptimisticTransactionImpl extends TransactionalWrite
   }
 
   def createCoordinatedCommitsStats(): CoordinatedCommitsStats = {
-    val (coordinatedCommitsType, metadataToUse) = snapshot.tableCommitCoordinatorClientOpt match {
-      case Some(_) if metadata.coordinatedCommitsCoordinatorName.isEmpty => // CC -> FS
-        (CoordinatedCommitType.CC_TO_FS_DOWNGRADE_COMMIT, snapshot.metadata)
-      case None if metadata.coordinatedCommitsCoordinatorName.isDefined => // FS -> CC
-        (CoordinatedCommitType.FS_TO_CC_UPGRADE_COMMIT, metadata)
-      case Some(_) => // CC commit
-        (CoordinatedCommitType.CC_COMMIT, snapshot.metadata)
-      case None => // FS commit
-        (CoordinatedCommitType.FS_COMMIT, snapshot.metadata)
-    }
+    val (coordinatedCommitsType, metadataToUse) =
+      readSnapshotTableCommitCoordinatorClientOpt match {
+        case Some(_) if metadata.coordinatedCommitsCoordinatorName.isEmpty => // CC -> FS
+          (CoordinatedCommitType.CC_TO_FS_DOWNGRADE_COMMIT, snapshot.metadata)
+        case None if metadata.coordinatedCommitsCoordinatorName.isDefined => // FS -> CC
+          (CoordinatedCommitType.FS_TO_CC_UPGRADE_COMMIT, metadata)
+        case Some(_) => // CC commit
+          (CoordinatedCommitType.CC_COMMIT, snapshot.metadata)
+        case None => // FS commit
+          (CoordinatedCommitType.FS_COMMIT, snapshot.metadata)
+      }
     CoordinatedCommitsStats(
       coordinatedCommitsType.toString,
       metadataToUse.coordinatedCommitsCoordinatorName.getOrElse(""),
@@ -1669,8 +1668,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
       CoordinatedCommitsUtils.getCoordinatedCommitsConfs(snapshot.metadata)
     var newCoordinatedCommitsTableConf: Option[Map[String, String]] = None
     if (finalMetadata.configuration != snapshot.metadata.configuration || snapshot.version == -1L) {
-      val newCommitCoordinatorClientOpt =
-        CoordinatedCommitsUtils.getCommitCoordinatorClient(spark, finalMetadata, finalProtocol)
+      val newCommitCoordinatorClientOpt = CoordinatedCommitsUtils.getCommitCoordinatorClient(
+        spark, deltaLog, finalMetadata, finalProtocol, failIfImplUnavailable = true)
       (newCommitCoordinatorClientOpt, readSnapshotTableCommitCoordinatorClientOpt) match {
         case (Some(newCommitCoordinatorClient), None) =>
           // FS -> CC conversion
```

```diff
@@ -260,7 +260,7 @@ case class CoordinatedCommitsPreDowngradeCommand(table: DeltaTableV2)
     var postDisablementUnbackfilledCommitsPresent = false
     if (exceptionOpt.isEmpty) {
       val snapshotAfterDisabling = table.deltaLog.update()
-      assert(snapshotAfterDisabling.tableCommitCoordinatorClientOpt.isEmpty)
+      assert(snapshotAfterDisabling.getTableCommitCoordinatorForWrites.isEmpty)
       postDisablementUnbackfilledCommitsPresent =
         CoordinatedCommitsUtils.unbackfilledCommitsPresent(snapshotAfterDisabling)
       if (postDisablementUnbackfilledCommitsPresent) {
```
#### spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala (44 additions, 7 deletions)

```diff
@@ -21,7 +21,7 @@ import scala.collection.mutable
 
 import org.apache.spark.sql.delta.actions._
 import org.apache.spark.sql.delta.actions.Action.logSchema
-import org.apache.spark.sql.delta.coordinatedcommits.{CoordinatedCommitsUtils, TableCommitCoordinatorClient}
+import org.apache.spark.sql.delta.coordinatedcommits.{CommitCoordinatorClient, CoordinatedCommitsUsageLogs, CoordinatedCommitsUtils, TableCommitCoordinatorClient}
 import org.apache.spark.sql.delta.logging.DeltaLogKeys
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.schema.SchemaUtils
@@ -232,14 +232,47 @@ class Snapshot(
 
   /**
    * [[CommitCoordinatorClient]] for the given delta table as of this snapshot.
-   * - This must be present when coordinated commits is enabled.
+   * - This should not be None when a coordinator has been configured for this table. However, if
+   *   the configured coordinator implementation has not been registered, this will be None. In
+   *   such cases, the user will see potentially stale reads for the table. For strict enforcement
+   *   of coordinated commits, the user can set the configuration
+   *   [[DeltaSQLConf.COORDINATED_COMMITS_IGNORE_MISSING_COORDINATOR_IMPLEMENTATION]] to false.
    * - This must be None when coordinated commits is disabled.
    */
   val tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient] = {
-    initializeTableCommitCoordinator()
-  }
-  protected def initializeTableCommitCoordinator(): Option[TableCommitCoordinatorClient] = {
-    CoordinatedCommitsUtils.getTableCommitCoordinator(spark, this)
+    val failIfImplUnavailable =
+      !spark.conf.get(DeltaSQLConf.COORDINATED_COMMITS_IGNORE_MISSING_COORDINATOR_IMPLEMENTATION)
+    CoordinatedCommitsUtils.getTableCommitCoordinator(
+      spark,
+      deltaLog,
+      this,
+      failIfImplUnavailable
+    )
   }
+
+  /**
+   * Returns the [[TableCommitCoordinatorClient]] that should be used for any type of mutation
+   * operation on the table. This includes data writes, backfills, etc.
+   * This method will throw an error if the configured coordinator could not be instantiated.
+   * @return [[TableCommitCoordinatorClient]] if the table is configured for coordinated commits,
+   *         None if the table is not configured for coordinated commits.
+   */
+  def getTableCommitCoordinatorForWrites: Option[TableCommitCoordinatorClient] = {
+    val coordinatorOpt = tableCommitCoordinatorClientOpt
+    val coordinatorName =
+      DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.fromMetaData(metadata)
+    if (coordinatorName.isDefined && coordinatorOpt.isEmpty) {
+      recordDeltaEvent(
+        deltaLog,
+        CoordinatedCommitsUsageLogs.COMMIT_COORDINATOR_MISSING_IMPLEMENTATION_WRITE,
+        data = Map(
+          "commitCoordinatorName" -> coordinatorName.get,
+          "readVersion" -> version.toString
+        )
+      )
+      throw DeltaErrors.unsupportedWritesWithMissingCoordinators(coordinatorName.get)
+    }
+    coordinatorOpt
+  }
 
   /** Number of columns to collect stats on for data skipping */
@@ -486,7 +519,7 @@ class Snapshot(
    * if the delta file for the current version is not found after backfilling.
    */
   def ensureCommitFilesBackfilled(): Unit = {
-    val tableCommitCoordinatorClient = tableCommitCoordinatorClientOpt.getOrElse {
+    val tableCommitCoordinatorClient = getTableCommitCoordinatorForWrites.getOrElse {
       return
     }
     val minUnbackfilledVersion = DeltaCommitFileProvider(this).minUnbackfilledVersion
@@ -601,6 +634,10 @@ class InitialSnapshot(
   override protected lazy val getInCommitTimestampOpt: Option[Long] = None
 
   // The [[InitialSnapshot]] is not backed by any external commit-coordinator.
-  override def initializeTableCommitCoordinator(): Option[TableCommitCoordinatorClient] = None
+  override val tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient] = None
+
+  // Commit 0 cannot be performed through a commit coordinator.
+  override def getTableCommitCoordinatorForWrites: Option[TableCommitCoordinatorClient] = None
 
   override def timestamp: Long = -1L
```
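
A short sketch of the read/write split this introduces (the `DeltaLog` path is hypothetical):

```scala
val deltaLog = DeltaLog.forTable(spark, "/tmp/cc-table")
val snapshot = deltaLog.update()

// Read path: may be None for a coordinated-commits table when the coordinator
// implementation is missing and the ignore flag (the default) is enabled.
val readCoordinator = snapshot.tableCommitCoordinatorClientOpt

// Write path: throws DELTA_UNSUPPORTED_WRITES_WITHOUT_COORDINATOR (and emits
// the COMMIT_COORDINATOR_MISSING_IMPLEMENTATION_WRITE usage log) rather than
// silently returning None when the configured coordinator is unavailable.
val writeCoordinator = snapshot.getTableCommitCoordinatorForWrites
```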

```diff
@@ -66,16 +66,34 @@ object CommitCoordinatorProvider {
     }
   }
 
-  /** Returns a [[CommitCoordinatorClient]] for the given `name`, `conf`, and `spark` */
+  /**
+   * Returns a [[CommitCoordinatorClient]] for the given `name`, `conf`, and `spark`.
+   * If the commit-coordinator with the given name is not registered, an exception is thrown.
+   */
   def getCommitCoordinatorClient(
       name: String,
       conf: Map[String, String],
       spark: SparkSession): CommitCoordinatorClient = synchronized {
-    nameToBuilderMapping.get(name).map(_.build(spark, conf)).getOrElse {
+    getCommitCoordinatorClientOpt(name, conf, spark).getOrElse {
       throw new IllegalArgumentException(s"Unknown commit-coordinator: $name")
     }
   }
+
+  /**
+   * Returns a [[CommitCoordinatorClient]] for the given `name`, `conf`, and `spark`.
+   * Returns None if the commit-coordinator with the given name is not registered.
+   */
+  def getCommitCoordinatorClientOpt(
+      name: String,
+      conf: Map[String, String],
+      spark: SparkSession): Option[CommitCoordinatorClient] = synchronized {
+    nameToBuilderMapping.get(name).map(_.build(spark, conf))
+  }
+
+  def getRegisteredCoordinatorNames: Seq[String] = synchronized {
+    nameToBuilderMapping.keys.toSeq
+  }
 
   // Visible only for UTs
   private[delta] def clearNonDefaultBuilders(): Unit = synchronized {
     val initialCommitCoordinatorNames = initialCommitCoordinatorBuilders.map(_.getName).toSet
```
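
A sketch of the two lookup paths ("my-coordinator" is a hypothetical coordinator name, and `spark` is an active `SparkSession`):

```scala
val conf = Map.empty[String, String]

// Throwing variant, used on the commit/write path: raises
// IllegalArgumentException("Unknown commit-coordinator: my-coordinator")
// if no builder with that name was registered.
val client =
  CommitCoordinatorProvider.getCommitCoordinatorClient("my-coordinator", conf, spark)

// Non-throwing variant, used when stale reads are tolerated:
// returns None instead of throwing when the builder is missing.
val clientOpt =
  CommitCoordinatorProvider.getCommitCoordinatorClientOpt("my-coordinator", conf, spark)
```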

```diff
@@ -43,4 +43,13 @@ object CoordinatedCommitsUsageLogs {
   // (i.e. getCommits) shows an unexpected gap.
   val FS_COMMIT_COORDINATOR_LISTING_UNEXPECTED_GAPS =
     s"$PREFIX.listDeltaAndCheckpointFiles.unexpectedGapsInResults"
+
+  // Usage log emitted when a requested Commit Coordinator implementation is missing.
+  val COMMIT_COORDINATOR_MISSING_IMPLEMENTATION =
+    s"$PREFIX.commitCoordinator.missingImplementation"
+
+  // Usage log emitted when a client attempts to write to a CC table even though the
+  // commit coordinator implementation is missing.
+  val COMMIT_COORDINATOR_MISSING_IMPLEMENTATION_WRITE =
+    s"$PREFIX.commitCoordinator.missingImplementationWrite"
 }
```

```diff
@@ -23,7 +23,7 @@ import org.apache.spark.sql.delta.{CoordinatedCommitsTableFeature, DeltaConfig,
 import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
 import org.apache.spark.sql.delta.logging.DeltaLogKeys
 import org.apache.spark.sql.delta.metering.DeltaLogging
-import org.apache.spark.sql.delta.util.FileNames
+import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
 import org.apache.spark.sql.delta.util.FileNames.{DeltaFile, UnbackfilledDeltaFile}
 import io.delta.storage.LogStore
 import io.delta.storage.commit.{CommitCoordinatorClient, GetCommitsResponse => JGetCommitsResponse}
@@ -165,20 +165,52 @@ object CoordinatedCommitsUtils extends DeltaLogging
 
   def getCommitCoordinatorClient(
       spark: SparkSession,
+      deltaLog: DeltaLog, // Used for logging
       metadata: Metadata,
-      protocol: Protocol): Option[CommitCoordinatorClient] = {
-    metadata.coordinatedCommitsCoordinatorName.map { commitCoordinatorStr =>
+      protocol: Protocol,
+      failIfImplUnavailable: Boolean): Option[CommitCoordinatorClient] = {
+    metadata.coordinatedCommitsCoordinatorName.flatMap { commitCoordinatorStr =>
       assert(protocol.isFeatureSupported(CoordinatedCommitsTableFeature))
-      CommitCoordinatorProvider.getCommitCoordinatorClient(
-        commitCoordinatorStr, metadata.coordinatedCommitsCoordinatorConf, spark)
+      val coordinatorConf = metadata.coordinatedCommitsCoordinatorConf
+      val coordinatorOpt = CommitCoordinatorProvider.getCommitCoordinatorClientOpt(
+        commitCoordinatorStr, coordinatorConf, spark)
+      if (coordinatorOpt.isEmpty) {
+        recordDeltaEvent(
+          deltaLog,
+          CoordinatedCommitsUsageLogs.COMMIT_COORDINATOR_MISSING_IMPLEMENTATION,
+          data = Map(
+            "commitCoordinatorName" -> commitCoordinatorStr,
+            "registeredCommitCoordinators" ->
+              CommitCoordinatorProvider.getRegisteredCoordinatorNames.mkString(", "),
+            "commitCoordinatorConf" -> JsonUtils.toJson(coordinatorConf),
+            "failIfImplUnavailable" -> failIfImplUnavailable.toString
+          )
+        )
+        if (failIfImplUnavailable) {
+          throw new IllegalArgumentException(
+            s"Unknown commit-coordinator: $commitCoordinatorStr")
+        }
+      }
+      coordinatorOpt
     }
   }
 
+  /**
+   * Get the table commit coordinator client from the provided snapshot descriptor.
+   * Returns None if this is not a coordinated-commits table. Also returns None when
+   * `failIfImplUnavailable` is false and the commit-coordinator implementation is not available.
+   */
   def getTableCommitCoordinator(
       spark: SparkSession,
-      snapshotDescriptor: SnapshotDescriptor): Option[TableCommitCoordinatorClient] = {
+      deltaLog: DeltaLog, // Used for logging
+      snapshotDescriptor: SnapshotDescriptor,
+      failIfImplUnavailable: Boolean): Option[TableCommitCoordinatorClient] = {
     getCommitCoordinatorClient(
-      spark, snapshotDescriptor.metadata, snapshotDescriptor.protocol).map {
+      spark,
+      deltaLog,
+      snapshotDescriptor.metadata,
+      snapshotDescriptor.protocol,
+      failIfImplUnavailable).map {
       commitCoordinator =>
         TableCommitCoordinatorClient(
           commitCoordinator,
@@ -271,7 +303,7 @@ object CoordinatedCommitsUtils extends DeltaLogging
    * be a gap in the backfilled commit sequence.
    */
   def backfillWhenCoordinatedCommitsDisabled(snapshot: Snapshot): Unit = {
-    if (snapshot.tableCommitCoordinatorClientOpt.nonEmpty) {
+    if (snapshot.getTableCommitCoordinatorForWrites.nonEmpty) {
       // Coordinated commits is enabled on the table. Don't backfill as backfills are managed by
       // commit-coordinators.
       return
```
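
A sketch of the new `failIfImplUnavailable` knob (`deltaLog` and `snapshot` are whatever the caller already holds):

```scala
// Strict: commit paths pass failIfImplUnavailable = true, so an unregistered
// coordinator surfaces as an IllegalArgumentException.
val strictOpt = CoordinatedCommitsUtils.getCommitCoordinatorClient(
  spark, deltaLog, snapshot.metadata, snapshot.protocol, failIfImplUnavailable = true)

// Lenient: snapshot construction passes false, so the same situation only emits
// the COMMIT_COORDINATOR_MISSING_IMPLEMENTATION usage log and yields None.
val lenientOpt = CoordinatedCommitsUtils.getCommitCoordinatorClient(
  spark, deltaLog, snapshot.metadata, snapshot.protocol, failIfImplUnavailable = false)
```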

```diff
@@ -554,6 +554,15 @@ trait DeltaSQLConfBase
       .checkValue(_ > 0, "threadPoolSize must be positive")
       .createWithDefault(5)
 
+  val COORDINATED_COMMITS_IGNORE_MISSING_COORDINATOR_IMPLEMENTATION =
+    buildConf("coordinatedCommits.ignoreMissingCoordinatorImplementation")
+      .internal()
+      .doc("When enabled, reads will not fail if the commit coordinator implementation " +
+        "is missing. Writes will still fail and reads will just rely on backfilled commits. " +
+        "This also means that reads can be stale.")
+      .booleanConf
+      .createWithDefault(true)
+
   val PARQUET_OUTPUT_TIMESTAMP_TYPE =
     buildConf("parquet.outputTimestampType")
       .doc(
```
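
For strict enforcement, a user can turn the flag off (a sketch; the full key again assumes the standard `spark.databricks.delta.` prefix):

```scala
// With the flag off, reads of a coordinated-commits table whose coordinator
// implementation is unregistered fail during snapshot construction instead
// of falling back to potentially stale backfilled commits.
spark.conf.set(
  "spark.databricks.delta.coordinatedCommits.ignoreMissingCoordinatorImplementation",
  "false")
```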

```diff
@@ -3204,6 +3204,19 @@ trait DeltaErrorsSuiteBase
         startWith = true
       )
     }
+    {
+      val e = intercept[DeltaUnsupportedOperationException] {
+        throw DeltaErrors.unsupportedWritesWithMissingCoordinators("test")
+      }
+      checkErrorMessage(
+        e,
+        Some("DELTA_UNSUPPORTED_WRITES_WITHOUT_COORDINATOR"),
+        Some("0AKDC"),
+        Some("You are trying to perform writes on a table which has been registered with " +
+          "the commit coordinator test"),
+        startWith = true
+      )
+    }
   }
 }
```