Skip to content

Commit

Permalink
[BugFix] fix iceberg table snapshot id does not match when refreshing…
Browse files Browse the repository at this point in the history
… partitions (backport #56179) (#56184)

Co-authored-by: RyanZ <dirtysalt1987@gmail.com>
  • Loading branch information
mergify[bot] and dirtysalt authored Feb 24, 2025
1 parent bf3124a commit 5ffef6f
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,29 +266,34 @@ public synchronized void refreshTable(String dbName, String tableName, ExecutorS
invalidateCache(icebergTableName);
return;
}

if (!currentLocation.equals(updateLocation)) {
LOG.info("Refresh iceberg caching catalog table {}.{} from {} to {}",
dbName, tableName, currentLocation, updateLocation);
long baseSnapshotId = currentOps.current().currentSnapshot().snapshotId();
refreshTable(updateTable, baseSnapshotId, dbName, tableName, executorService);
refreshTable(currentTable, updateTable, dbName, tableName, executorService);
LOG.info("Finished to refresh iceberg table {}.{}", dbName, tableName);
}
}
}

private void refreshTable(BaseTable updatedTable, long baseSnapshotId,
private void refreshTable(BaseTable currentTable, BaseTable updatedTable,
String dbName, String tableName, ExecutorService executorService) {
long baseSnapshotId = currentTable.currentSnapshot().snapshotId();
long updatedSnapshotId = updatedTable.currentSnapshot().snapshotId();
IcebergTableName baseIcebergTableName = new IcebergTableName(dbName, tableName, baseSnapshotId);
IcebergTableName updatedIcebergTableName = new IcebergTableName(dbName, tableName, updatedSnapshotId);
long latestRefreshTime = tableLatestRefreshTime.computeIfAbsent(new IcebergTableName(dbName, tableName), ignore -> -1L);

partitionCache.invalidate(baseIcebergTableName);
partitionCache.getUnchecked(updatedIcebergTableName);
// update tables before refresh partition cache
// so when refreshing partition cache, `getTables` can return the latest one.
// another way to fix is to call `delegate.getTables` when refreshing partition cache.
synchronized (this) {
tables.put(updatedIcebergTableName, updatedTable);
}

partitionCache.invalidate(baseIcebergTableName);
partitionCache.getUnchecked(updatedIcebergTableName);

TableMetadata updatedTableMetadata = updatedTable.operations().current();
List<ManifestFile> manifestFiles = updatedTable.currentSnapshot().dataManifests(updatedTable.io()).stream()
.filter(f -> updatedTableMetadata.snapshot(f.snapshotId()) != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.starrocks.connector.iceberg;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.starrocks.catalog.Database;
Expand Down Expand Up @@ -193,7 +194,13 @@ default Map<String, Partition> getPartitions(IcebergTable icebergTable, long sna
PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils.
createMetadataTableInstance(nativeTable, MetadataTableType.PARTITIONS);
TableScan tableScan = partitionsTable.newScan();
// NOTE: if there is an exception raise because of snapshot id is not the latest one, it's expected
// using partition metadata table scan is more efficient than doing file scan, but limitation is
// it only supports the latest snapshot id.
if (snapshotId != -1) {
Preconditions.checkArgument(nativeTable.currentSnapshot().snapshotId() == snapshotId,
"Ignore this error if snapshot id does not match. Iceberg partition metadata table only supports latest " +
"snapshot. current = " + nativeTable.currentSnapshot().snapshotId() + ", expect = " + snapshotId);
tableScan = tableScan.useSnapshot(snapshotId);
}
if (executorService != null) {
Expand Down

0 comments on commit 5ffef6f

Please sign in to comment.