[Spark] Introduce InCommitTimestamp feature and write monotonically increasing timestamps in CommitInfo #2596
Force-pushed from a8b7d16 to 0805347.
It seems like this PR needs to implement the version/timestamp enablement table properties as well. Even if they're not used yet, we can unit test their validity, including proper handling of physical conflicts when newly enabling the feature.
spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala
val currentTransactionCommitInfo = currentTransactionInfo.commitInfo.getOrElse {
  throw new IllegalStateException("CommitInfo must be present in the current transaction" +
    s" as it has ${InCommitTimestampTableFeature.name} feature enabled.")
}
val currentTransactionTimestamp = currentTransactionCommitInfo.inCommitTimestamp.getOrElse {
  throw new IllegalStateException(
    "CommitInfo.commitTimestamp must be present in the current transaction" +
      s" as it has ${InCommitTimestampTableFeature.name} feature enabled.")
}
This doesn't seem fundamentally different from what CommitInfo.getRequiredInCommitTimestamp does.
Why is a missing timestamp in the current commit different from (or worse than) a missing timestamp in the winning commit below, such that the former throws an internal error while the latter throws an error-class error?
The way I was thinking about it was:
- Winning Commit is missing timestamp: It was written by a non-compliant writer --> This is potentially a user error.
- Current txn is missing timestamp: This is worse in the sense that it should never actually happen. It is inconsistent behaviour by the current writer.
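The distinction drawn above can be sketched roughly as follows. This is only an illustration: `CommitInfoSketch`, `MissingWinningTimestampException`, and `TimestampChecks` are hypothetical names, and the real code uses Delta's own error classes rather than the stand-in exception shown here.

```scala
// Hypothetical types for illustration; not the Delta Lake API.
final case class CommitInfoSketch(inCommitTimestamp: Option[Long])

// Stand-in for a user-facing, error-class style exception.
final class MissingWinningTimestampException(version: Long)
  extends RuntimeException(
    s"Commit $version has no inCommitTimestamp; it may come from a non-compliant writer.")

object TimestampChecks {
  // Winning commit: a missing timestamp points at some other writer,
  // so it is reported as a (potential) user error.
  def winningTimestamp(info: Option[CommitInfoSketch], version: Long): Long =
    info.flatMap(_.inCommitTimestamp)
      .getOrElse(throw new MissingWinningTimestampException(version))

  // Current transaction: a missing timestamp means this writer is inconsistent,
  // so it is reported as an internal error.
  def currentTimestamp(info: Option[CommitInfoSketch]): Long =
    info.flatMap(_.inCommitTimestamp).getOrElse(
      throw new IllegalStateException(
        "inCommitTimestamp must be present in the current transaction"))
}
```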
@@ -547,6 +548,48 @@ private[delta] class ConflictChecker(
    currentTransactionInfo = currentTransactionInfo.copy(actions = newActions)
  }

  /** Ensure that commitInfo.commitTimestamp is monotonically increasing. */
- /** Ensure that commitInfo.commitTimestamp is monotonically increasing. */
+ /**
+  * Adjust the current transaction's commit timestamp to account for the winning
+  * transaction's commit timestamp. If this transaction newly enabled ICT, also update
+  * the table properties to reflect the adjusted enablement version and timestamp.
+  */
// The Metadata() of InitialSnapshot can enable ICT by default, so we make sure
// that readSnapshot is not InitialSnapshot (version != -1).
val wasICTEnabledInReadSnapshot = currentTransactionInfo.readSnapshot.version != -1 &&
  DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(
    currentTransactionInfo.readSnapshot.metadata)
// If the current transaction has ICT enabled but the read snapshot did not, we can
// be sure that the current transaction enabled ICT. Had the winning transaction enabled ICT,
// the current transaction would have already been aborted since ICT-enablement is a
// metadata change operation (see [[checkNoMetadataUpdates]]).
val didCurrentTransactionEnableICT = isICTCurrentlyEnabled && !wasICTEnabledInReadSnapshot
val winningCommitTimestamp = if (didCurrentTransactionEnableICT) {
  // Since the current transaction enabled inCommitTimestamps, we should use the file
  // timestamp from the winning transaction as its commit timestamp.
  winningCommitFileStatus.getModificationTime
} else {
  // Get the inCommitTimestamp from the winning transaction.
  CommitInfo.getRequiredInCommitTimestamp(winningCommitSummary.commitInfo, winningCommitVersion)
We already know isICTCurrentlyEnabled=true because of the early return.
Also, why do we care at all about the read transaction here? Right now, none of the logic actually cares who enabled ICT -- we just need to extract the correct timestamp from the winning transaction, so we can adjust ours.
That said... we need to set two table properties upon enabling the feature, and they'll need to be adjusted in this code. At that point we will care whether the feature is newly-enabled or not, and I guess once we have that, we can simplify things a bit by reusing that knowledge?
// If the read snapshot did not have ICT enabled, then the current transaction must
// have enabled it. Any winning transaction that enabled it after our read snapshot
// would have caused a metadata conflict abort (see [[checkNoMetadataUpdates]]), so we
// know the winning transaction's ICT enablement status must match the read snapshot.
//
// WARNING: The Metadata() of InitialSnapshot can enable ICT by default, so we make sure
// that readSnapshot is not InitialSnapshot (version != -1).
val wasICTEnabledInReadSnapshot = currentTransactionInfo.readSnapshot.version != -1 &&
  DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(
    currentTransactionInfo.readSnapshot.metadata)
val winningCommitTimestamp = if (wasICTEnabledInReadSnapshot) {
  CommitInfo.getRequiredInCommitTimestamp(winningCommitSummary.commitInfo, winningCommitVersion)
} else {
  winningCommitFileStatus.getModificationTime
}
val updatedCommitTimestamp = ...
val updatedCommitInfo = ...
// Adjust the enablement version and timestamp table properties if ICT is newly enabled.
if (!wasICTEnabledInReadSnapshot) {
  ... set enablement version and timestamp table properties ...
}
> Also, why do we care at all about the read transaction here? Right now, none of the logic actually cares who enabled ICT -- we just need to extract the correct timestamp from the winning transaction, so we can adjust ours.
That is true, we just need to know whether the winning transaction has ICT enabled and get the right timestamp accordingly. However, the way to do that will effectively be the same as checking whether the current transaction enabled ICT.
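A minimal sketch of the timestamp selection and adjustment discussed in this thread. All names here (`IctConflictSketch`, `WinningCommit`, `adjustedTimestamp`) are hypothetical, and the "winner plus one millisecond" rule is an assumption about how a strictly increasing timestamp would be enforced; the elided `updatedCommitTimestamp = ...` in the suggestion above may differ.

```scala
object IctConflictSketch {
  // Hypothetical summary of the winning commit: its ICT (if any) and its file mtime.
  final case class WinningCommit(inCommitTimestamp: Option[Long], fileModificationTime: Long)

  // If ICT was already enabled in the read snapshot, the winner must carry an ICT.
  // Otherwise the current transaction is the one enabling ICT, so fall back to
  // the winning commit file's modification time.
  def winningCommitTimestamp(wasIctEnabledInReadSnapshot: Boolean, winner: WinningCommit): Long =
    if (wasIctEnabledInReadSnapshot) {
      winner.inCommitTimestamp.getOrElse(
        throw new IllegalStateException("winning commit is missing inCommitTimestamp"))
    } else {
      winner.fileModificationTime
    }

  // Assumed rule: the adjusted timestamp must be strictly greater than the winner's.
  def adjustedTimestamp(proposed: Long, winningTimestamp: Long): Long =
    math.max(proposed, winningTimestamp + 1)
}
```

The adjustment leaves a proposed timestamp alone when it is already ahead of the winner, and bumps it just past the winner otherwise.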
* Throws an exception if either `commitInfoOpt` is not defined or `inCommitTimestamp` is not
* defined in `commitInfoOpt`.
- * Throws an exception if either `commitInfoOpt` is not defined or `inCommitTimestamp` is not
- * defined in `commitInfoOpt`.
+ * Throws an exception if `commitInfoOpt` is empty or contains an empty `inCommitTimestamp`.
  }

  val winningCommitTimestamp =
    if (InCommitTimestampUtils.didCurrentTransactionEnableICT(currentTransactionInfo)) {
I don't see a definition for InCommitTimestampUtils... perhaps a file was not added to the PR?
Yes, just added it.
  winningCommitFileStatus.getModificationTime
} else {
  // Get the inCommitTimestamp from the winning transaction.
  CommitInfo.getRequiredInCommitTimestamp(winningCommitSummary.commitInfo, winningCommitVersion)
The similar naming between CommitInfo.getRequiredInCommitTimestamp vs. InCommitTimestampUtils.getRequiredCommitInfoAndCommitTimestamp will likely confuse people, since they don't behave the same. How would somebody know which one to call?
I have refactored the code a bit and removed InCommitTimestampUtils.getRequiredCommitInfoAndCommitTimestamp.
getUserMetadata(op),
time = clock.getTimeMillis(),
operation = op.name,
inCommitTimestamp = generateInCommitTimestampForFirstCommitAttempt(),
We have two getTimeMillis calls in the same constructor here. Shouldn't we use the same value for both?
Also, a third clock call at L1134 below.
commitAttemptStartTime = clock.getTimeMillis()
Seems helpful for the attempt start time to match the proposed ICT?
Maybe just move this code above L1097 and then everything can use the commitAttemptStartTime?
Updated all such callsites so that they use the same value.
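The idea of reading the clock once and reusing the value can be sketched as below. `ClockSketch`, `AttemptInfo`, and `AttemptInfoBuilder` are hypothetical stand-ins for illustration; the actual fields and call sites in the PR are only partly visible in this thread.

```scala
// Hypothetical clock abstraction, mirroring the getTimeMillis call seen in the diff.
trait ClockSketch { def getTimeMillis(): Long }

// Hypothetical holder for the three values the reviewers want to keep in sync.
final case class AttemptInfo(time: Long, inCommitTimestamp: Long, commitAttemptStartTime: Long)

object AttemptInfoBuilder {
  // Read the clock exactly once, then reuse that value everywhere, so the commit
  // time, the proposed ICT, and the attempt start time cannot drift apart.
  def fromClock(clock: ClockSketch): AttemptInfo = {
    val now = clock.getTimeMillis()
    AttemptInfo(time = now, inCommitTimestamp = now, commitAttemptStartTime = now)
  }
}
```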
operation = op.name,
generateInCommitTimestampForFirstCommitAttempt(),
To be safe, I'm pretty sure commitLarge needs the ability to enable ICT as well (e.g. in case somebody disabled ICT, but a RESTORE TABLE transitions back to enabled state). If so, we need to call updateInCommitTimestampEnablementInfo here as well. It would be incorrect to restore the original (old) enablement properties, because ICT was not necessarily enabled for the intervening commits.
> I'm pretty sure commitLarge needs the ability to enable ICT as well
Yes, I completely missed this. I have updated this and added a new test for commitLarge.
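A rough sketch of why the enablement properties must be rewritten rather than restored. The property keys and the `IctEnablementSketch` object are assumptions made for this illustration and should be checked against the Delta protocol; the InitialSnapshot special case mentioned elsewhere in this review is deliberately left out.

```scala
object IctEnablementSketch {
  // Assumed property keys; verify against the Delta protocol before relying on them.
  val EnabledKey = "delta.enableInCommitTimestamps"
  val EnablementVersionKey = "delta.inCommitTimestampEnablementVersion"
  val EnablementTimestampKey = "delta.inCommitTimestampEnablementTimestamp"

  private def isEnabled(conf: Map[String, String]): Boolean =
    conf.get(EnabledKey).exists(_.equalsIgnoreCase("true"))

  // If this commit turns ICT on (disabled in the read snapshot, enabled in the new
  // metadata), overwrite the enablement properties with this commit's version and
  // timestamp, even if the new metadata carries older values (e.g. from a RESTORE).
  def withEnablementInfo(
      readConf: Map[String, String],
      newConf: Map[String, String],
      commitVersion: Long,
      inCommitTimestamp: Long): Map[String, String] =
    if (isEnabled(newConf) && !isEnabled(readConf)) {
      newConf +
        (EnablementVersionKey -> commitVersion.toString) +
        (EnablementTimestampKey -> inCommitTimestamp.toString)
    } else {
      newConf
    }
}
```

In the RESTORE case, `newConf` would contain the old snapshot's enablement version and timestamp; the sketch replaces them because ICT was not necessarily enabled for the commits in between.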
updatedCurrentTransactionInfo = InCommitTimestampUtils
  .updateInCommitTimestampEnablementInfo(updatedCurrentTransactionInfo, commitVersion)
I don't love how the logic to update ICT info is scattered in so many parts of the code. It would be really nice if we could find a simpler way.
For example -- can we move this inside doCommit? It has access to everything we need, and then we wouldn't need to worry about duplicating that code inside the conflict checker, because the retry path also calls doCommit after resolving conflicts. The commit info update is anyway far separated from this code, so we wouldn't give up any locality.
Downside is, doCommit doesn't currently transform the actions, so it's an unexpected place for such logic. But this spot is no better in that regard.
It would be tempting to move the enablement check into updateMetadataInternal, where other table feature enablement checks already live, but it won't have the proposed commit timestamp yet.
Moved this one level up in the call stack, to commitImpl.
Force-pushed from 9b3c8d7 to c276dbc.
  tags = if (tags.nonEmpty) Some(tags) else None,
  txnId = Some(txnId))

val firstAttemptVersion = getFirstAttemptVersion
val updatedMetadataOpt = commitInfo.inCommitTimestamp.flatMap { inCommitTimestamp =>
  InCommitTimestampUtils.getUpdatedMetadataWithICTEnablementInfo(
The part where we update the enablement tracking properties has been moved to this place from doCommitIteratively.
  }
}

test("commitLarge should correctly set the enablement tracking properties") {
Added a new test for commitLarge.
inCommitTimestampOpt.getOrElse {
  val commitInfoOpt = DeltaHistoryManager.getCommitInfoOpt(
    deltaLog.store, deltaLog.logPath, version, deltaLog.newDeltaHadoopConf())
  CommitInfo.getRequiredInCommitTimestamp(commitInfoOpt, version)
This method can throw an exception when the commitInfo doesn't have inCommitTimestamp or when commitInfo itself is not present?
In such a scenario, snapshot.timestamp will fail. Is this intentional?
This is intentional. Such a thing happening is indicative of a writer not following the protocol. In my opinion, we should surface this issue as soon as we see it to prevent further breakage.
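The fail-fast behavior described here might look roughly like the sketch below. `SnapshotTimestampSketch` and the `readCommitInfoTimestamp` parameter are hypothetical; the latter stands in for the IO that reads the commit file, and the exception type is only a placeholder.

```scala
object SnapshotTimestampSketch {
  // `cached` models a timestamp already known to the snapshot; `readCommitInfoTimestamp`
  // is a hypothetical stand-in for reading CommitInfo from the commit file.
  def timestamp(
      cached: Option[Long],
      version: Long,
      readCommitInfoTimestamp: Long => Option[Long]): Long =
    cached.getOrElse {
      // No silent fallback: a missing timestamp signals a writer that did not follow
      // the protocol, so surface it immediately instead of guessing a value.
      readCommitInfoTimestamp(version).getOrElse(
        throw new IllegalStateException(
          s"Commit $version has no inCommitTimestamp although the feature is enabled"))
    }
}
```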
LGTM. There are several annoying almost-redundancies in the code, but there are just enough differences mixed in that I couldn't find any good way to factor them out better.
// If the read snapshot did not have ICT enabled, then the current transaction must
// have enabled it.
// In case of a conflict, the winning transaction that enabled it after
// our read snapshot would have caused a metadata conflict abort
// (see [[ConflictChecker.checkNoMetadataUpdates]]), so we know
// that the winning transaction's ICT enablement status must match the read snapshot.
I don't think this comment quite matches the current code/use, where we don't already know the current transaction enabled ICT?
- // If the read snapshot did not have ICT enabled, then the current transaction must
- // have enabled it.
- // In case of a conflict, the winning transaction that enabled it after
- // our read snapshot would have caused a metadata conflict abort
- // (see [[ConflictChecker.checkNoMetadataUpdates]]), so we know
- // that the winning transaction's ICT enablement status must match the read snapshot.
+ // If ICT is currently enabled, and the read snapshot did not have ICT enabled,
+ // then the current transaction must have enabled it.
+ // In case of a conflict, any winning transaction that enabled it after
+ // our read snapshot would have caused a metadata conflict abort
+ // (see [[ConflictChecker.checkNoMetadataUpdates]]), so we know that
+ // all winning transactions' ICT enablement status must match the read snapshot.
Force-pushed from 0238d88 to f6086cb.
Force-pushed from f6086cb to 0255f85.
Force-pushed from 0255f85 to 19a1000.
Which Delta project/connector is this regarding?
Description
Follow-up for #2532.
Adds a new writer feature called inCommitTimestamp. When this feature is enabled, the writer makes sure that it writes commitTimestamp in CommitInfo, which contains a monotonically increasing timestamp.
This PR is an initial implementation and does not handle timestamp retrieval efficiently. It does not try to populate the inCommitTimestamp in Snapshot even in places where it is already available; instead, Snapshot has to perform an IO to read the timestamp.
How was this patch tested?
Added a new suite called InCommitTimestampSuite.
Does this PR introduce any user-facing changes?
No