Skip to content

Commit

Permalink
[Kernel] Add Snapshot::getTimestamp public API (#3791)
Browse files Browse the repository at this point in the history
#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [X] Kernel
- [ ] Other (fill in here)

## Description

Add a new `Snapshot::getTimestamp` public API.

## How was this patch tested?

N/A. Trivial.

## Does this PR introduce _any_ user-facing changes?

Yes. Adds a new `Snapshot::getTimestamp` public API.
  • Loading branch information
scottsand-db authored Dec 4, 2024
1 parent 09e7523 commit a52578b
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 43 deletions.
9 changes: 9 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ public interface Snapshot {
* @return a list of partition column names, or an empty list if the table is not partitioned.
*/
List<String> getPartitionColumnNames(Engine engine);

/**
* Get the timestamp (in milliseconds since the Unix epoch) of the latest commit in this snapshot.
*
* @param engine the engine to use for IO operations
* @return the timestamp of the latest commit
*/
long getTimestamp(Engine engine);

/**
* Get the schema of the table at this snapshot.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,44 @@ public SnapshotImpl(
this.inCommitTimestampOpt = Optional.empty();
}

/////////////////
// Public APIs //
/////////////////

@Override
public long getVersion(Engine engine) {
return version;
}

/**
* Get the timestamp (in milliseconds since the Unix epoch) of the latest commit in this Snapshot.
* If the table does not yet exist (i.e. this Snapshot is being used to create the new table),
* this method returns -1. Note that this -1 value will never be exposed to users - either they
* get a valid snapshot for a table or they get an exception.
*
* <p>When InCommitTimestampTableFeature is enabled, the timestamp is retrieved from the
* CommitInfo of the latest commit in this Snapshot, which can result in an IO operation.
*
* <p>For non-ICT tables, this is the same as the file modification time of the latest commit in
* this Snapshot.
*/
@Override
public long getTimestamp(Engine engine) {
if (IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata)) {
if (!inCommitTimestampOpt.isPresent()) {
Optional<CommitInfo> commitInfoOpt =
CommitInfo.getCommitInfoOpt(engine, logPath, logSegment.version);
inCommitTimestampOpt =
Optional.of(
CommitInfo.getRequiredInCommitTimestamp(
commitInfoOpt, String.valueOf(logSegment.version), dataPath));
}
return inCommitTimestampOpt.get();
} else {
return logSegment.lastCommitTimestamp;
}
}

@Override
public StructType getSchema(Engine engine) {
return getMetadata().getSchema();
Expand All @@ -79,8 +112,16 @@ public ScanBuilder getScanBuilder(Engine engine) {
return new ScanBuilderImpl(dataPath, protocol, metadata, getSchema(engine), logReplay, engine);
}

public Metadata getMetadata() {
return metadata;
///////////////////
// Internal APIs //
///////////////////

public Path getLogPath() {
return logPath;
}

public Path getDataPath() {
return dataPath;
}

public Protocol getProtocol() {
Expand All @@ -102,6 +143,14 @@ public Map<String, DomainMetadata> getDomainMetadataMap() {
return logReplay.getDomainMetadataMap();
}

public Metadata getMetadata() {
return metadata;
}

public LogSegment getLogSegment() {
return logSegment;
}

public CreateCheckpointIterator getCreateCheckpointIterator(Engine engine) {
long minFileRetentionTimestampMillis =
System.currentTimeMillis() - TOMBSTONE_RETENTION.fromMetadata(metadata);
Expand All @@ -122,47 +171,6 @@ public Optional<Long> getLatestTransactionVersion(Engine engine, String applicat
return logReplay.getLatestTransactionIdentifier(engine, applicationId);
}

public LogSegment getLogSegment() {
return logSegment;
}

public Path getLogPath() {
return logPath;
}

public Path getDataPath() {
return dataPath;
}

/**
* Returns the timestamp of the latest commit of this snapshot. For an uninitialized snapshot,
* this returns -1.
*
* <p>When InCommitTimestampTableFeature is enabled, the timestamp is retrieved from the
* CommitInfo of the latest commit which can result in an IO operation.
*
* <p>For non-ICT tables, this is the same as the file modification time of the latest commit in
* the snapshot.
*
* @param engine the engine to use for IO operations
* @return the timestamp of the latest commit
*/
public long getTimestamp(Engine engine) {
if (IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata)) {
if (!inCommitTimestampOpt.isPresent()) {
Optional<CommitInfo> commitInfoOpt =
CommitInfo.getCommitInfoOpt(engine, logPath, logSegment.version);
inCommitTimestampOpt =
Optional.of(
CommitInfo.getRequiredInCommitTimestamp(
commitInfoOpt, String.valueOf(logSegment.version), dataPath));
}
return inCommitTimestampOpt.get();
} else {
return logSegment.lastCommitTimestamp;
}
}

/**
* Returns the commit coordinator client handler based on the table metadata in this snapshot.
*
Expand Down

0 comments on commit a52578b

Please sign in to comment.