[Kernel] Support getting snapshots by timestamp (time-travel) #2662
import io.delta.kernel.internal.util.Tuple2;
import static io.delta.kernel.internal.fs.Path.getName;

public final class DeltaHistoryManager {
It seemed like some of the test refactoring was complicating this PR, so I separated it out into #2663.
@@ -66,4 +66,15 @@ Snapshot getLatestSnapshot(TableClient tableClient)
     */
    Snapshot getSnapshotAtVersion(TableClient tableClient, long versionId)
        throws TableNotFoundException;

    /**
     * Get the snapshot at the given {@code timestamp}.
Can you be a bit more specific?
- timestamp T-1: Table Version 10 created
- timestamp T: (no version created)
- timestamp T+1: Table Version 11 created
If we query for the snapshot at timestamp T, what do we return?
+1. Need clarity on whether this returns the latest snapshot at or before the timestamp, or the first snapshot at or after it.
Updated the docs to be more clear
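To make the two candidate semantics from this thread concrete, here is a minimal sketch. It is not the Kernel implementation: it assumes the commits are already loaded into a sorted map from commit timestamp to version, and the names `TimestampLookupSketch`, `addCommit`, `versionAtOrBefore`, and `versionAtOrAfter` are hypothetical.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical illustration only; not part of the Kernel API.
final class TimestampLookupSketch {
  // Commit timestamp (ms since epoch) -> table version.
  private final NavigableMap<Long, Long> versionByTimestamp = new TreeMap<>();

  void addCommit(long timestampMillis, long version) {
    versionByTimestamp.put(timestampMillis, version);
  }

  // Latest version whose commit timestamp is <= the given timestamp.
  OptionalLong versionAtOrBefore(long timestampMillis) {
    Map.Entry<Long, Long> e = versionByTimestamp.floorEntry(timestampMillis);
    return e == null ? OptionalLong.empty() : OptionalLong.of(e.getValue());
  }

  // Earliest version whose commit timestamp is >= the given timestamp.
  OptionalLong versionAtOrAfter(long timestampMillis) {
    Map.Entry<Long, Long> e = versionByTimestamp.ceilingEntry(timestampMillis);
    return e == null ? OptionalLong.empty() : OptionalLong.of(e.getValue());
  }
}
```

With version 10 committed at T-1 and version 11 at T+1, a lookup at T gives 10 under at-or-before and 11 under at-or-after. A timestamp earlier than the first commit has no at-or-before answer, and one later than the last commit has no at-or-after answer, so the docs need to say what happens in those cases too.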
    tablePath,
    providedTimestamp,
    commitTimestamp);
// TODO format the timestamps?
I would print both, the millis and a human readable one. See how delta-spark formats timestamps (if they are ever in millis format). I'd expect you can print them in the local time zone.
I think this is a good thing to discuss when we revisit exceptions. I agree formatting them somehow makes sense but I'm not sure it's clear whether it should be in the local timezone or UTC and what we might expect the connectors to do with the exception. I'll add them in UTC for now.
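One way to print both the raw millis and a human-readable UTC form is sketched below. This is only an illustration of the idea discussed here, not the formatting the PR ended up with; the class `TimestampFormatSketch` and the method `describe` are hypothetical names. It relies on `Instant.toString()`, which renders ISO-8601 in UTC.

```java
import java.time.Instant;

// Hypothetical helper; not part of the Kernel code base.
final class TimestampFormatSketch {
  private TimestampFormatSketch() {}

  // Renders e.g. "0 (1970-01-01T00:00:00Z)": raw millis followed by ISO-8601 UTC.
  static String describe(long millisSinceEpochUTC) {
    return String.format(
        "%d (%s)", millisSinceEpochUTC, Instant.ofEpochMilli(millisSinceEpochUTC));
  }
}
```

Keeping the raw millis in the message lets a connector match it against the value it passed in, while the UTC string avoids any dependence on the JVM's default time zone.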
 * unix epoch
 * @return an instance of {@link Snapshot}
 */
Snapshot getSnapshotAtTimestamp(TableClient tableClient, long millisSinceEpochUTC)
We will eventually need two kinds of time-travel semantics, one for batch and one for streaming:
- getVersionBeforeOrAtTimestamp
- getVersionAtOrAfterTimestamp
We should think a bit more about this public API (method name) ... so that adding the other method doesn't cause confusion
In Standalone DeltaLog, the above-mentioned methods return a version instead of a snapshot.
Looking at how Flink uses these APIs:
- I can't find getVersionBeforeOrAtTimestamp used anywhere.
- getVersionAtOrAfterTimestamp is used to get the version, immediately followed by getSnapshotAtVersion(version returned by getVersionAtOrAfterTimestamp).
What if we provide two APIs:
- getSnapshotBeforeOrAtTimestamp
- getSnapshotAtOrAfterTimestamp
Are there any cases where we just need the version?
@vkorukanti was there an ask from delta-sharing to support getting the version in this scenario, or is just the snapshot sufficient?
We can rename this method to getSnapshotBeforeOrAtTimestamp? I'm wondering if that's a bit hard to parse for the simple batch time-travel case.
Looking at the Delta Sharing protocol, it seems like sharing needs atOrAfter. @linzhou-db Please confirm.
It also seems like this requires versionAtOrAfter to get the version (to avoid unnecessarily loading the snapshot metadata/protocol).
Yes, Delta Sharing does need both the snapshot and the version at or after a timestamp.
Created #2679 to revisit this API + add additional functionality
Minor comments.
while (commits.hasNext()) {
  Commit newElem = commits.next();
  assert(prevVersion < newElem.version); // Verify commits are ordered
  if (prevTimestamp >= newElem.timestamp) {
Is this what delta-spark also does? If yes, should we have a disclaimer in the API docs saying that the timestamps should be valid? If the commit files are copied over (or any other operation changes the timestamps of the commit files), the API is only a best effort to return the snapshot at the given timestamp.
Spark (and Standalone) does this. I couldn't find a good test in delta-spark for this scenario, though. I think if commit files are copied over and their timestamps change, time travel by timestamp is fully expected to be broken. I'm guessing this is aimed more at commit versions with the same timestamp (since precision is only ms).
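For readers following the thread, the adjustment under discussion can be sketched as below. The diff excerpt only shows the `prevTimestamp >= newElem.timestamp` check, so the body here is an assumption: it bumps the offending commit to one millisecond after its predecessor, which is my understanding of what delta-spark does. `MonotonizeSketch` and its nested `Commit` are hypothetical stand-ins, not the Kernel classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; the real Commit type and loop live in DeltaHistoryManager.
final class MonotonizeSketch {
  static final class Commit {
    final long version;
    final long timestamp; // ms since epoch

    Commit(long version, long timestamp) {
      this.version = version;
      this.timestamp = timestamp;
    }
  }

  // Returns commits whose timestamps strictly increase with version.
  // Assumption: a non-increasing timestamp is replaced by (previous timestamp + 1 ms).
  static List<Commit> monotonize(List<Commit> commits) {
    List<Commit> out = new ArrayList<>();
    for (Commit c : commits) {
      if (out.isEmpty()) {
        out.add(c);
        continue;
      }
      Commit prev = out.get(out.size() - 1);
      if (prev.version >= c.version) {
        throw new IllegalArgumentException("commits must be ordered by version");
      }
      long ts = prev.timestamp >= c.timestamp ? prev.timestamp + 1 : c.timestamp;
      out.add(new Commit(c.version, ts));
    }
    return out;
  }
}
```

Under this assumption, two commits written in the same millisecond end up one millisecond apart, so a timestamp lookup stays unambiguous. It does not repair timestamps that were rewritten wholesale by copying the log files.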
throw new RuntimeException(
    String.format("No recreatable commits found at %s", logPath));
We may need to create an InvalidTableStateException to capture all these runtime exceptions.
Agreed, let's add this to the exception discussion.
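As a starting point for that discussion, a dedicated exception could look roughly like the sketch below. Only the name `InvalidTableStateException` comes from the comment above; the constructor shape, the `tablePath` field, and the message format are assumptions, and no such class exists in this PR.

```java
// Hypothetical sketch: the name is from the review comment, the shape is assumed.
class InvalidTableStateException extends RuntimeException {
  private final String tablePath;

  InvalidTableStateException(String tablePath, String message) {
    super(String.format("Invalid table state at %s: %s", tablePath, message));
    this.tablePath = tablePath;
  }

  // Lets a connector recover the table location without parsing the message.
  String getTablePath() {
    return tablePath;
  }
}
```

Keeping it unchecked would match the `RuntimeException` thrown in the diff today, so call sites would not need new `throws` clauses.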
Which Delta project/connector is this regarding?
Description
Resolves #2276
Adds support for reading the snapshot of the table at a specific timestamp using Table::getSnapshotAtTimestamp.
How was this patch tested?
Adds unit tests.