Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Fix Iceberg table snapshot ID mismatch when refreshing partitions #56179

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -271,29 +271,34 @@ public synchronized void refreshTable(String dbName, String tableName, ExecutorS
invalidateCache(icebergTableName);
return;
}

if (!currentLocation.equals(updateLocation)) {
LOG.info("Refresh iceberg caching catalog table {}.{} from {} to {}",
dbName, tableName, currentLocation, updateLocation);
long baseSnapshotId = currentOps.current().currentSnapshot().snapshotId();
refreshTable(updateTable, baseSnapshotId, dbName, tableName, executorService);
refreshTable(currentTable, updateTable, dbName, tableName, executorService);
LOG.info("Finished to refresh iceberg table {}.{}", dbName, tableName);
}
}
}

private void refreshTable(BaseTable updatedTable, long baseSnapshotId,
private void refreshTable(BaseTable currentTable, BaseTable updatedTable,
String dbName, String tableName, ExecutorService executorService) {
long baseSnapshotId = currentTable.currentSnapshot().snapshotId();
long updatedSnapshotId = updatedTable.currentSnapshot().snapshotId();
IcebergTableName baseIcebergTableName = new IcebergTableName(dbName, tableName, baseSnapshotId);
IcebergTableName updatedIcebergTableName = new IcebergTableName(dbName, tableName, updatedSnapshotId);
long latestRefreshTime = tableLatestRefreshTime.computeIfAbsent(new IcebergTableName(dbName, tableName), ignore -> -1L);

partitionCache.invalidate(baseIcebergTableName);
partitionCache.getUnchecked(updatedIcebergTableName);
// Update `tables` before refreshing the partition cache, so that while the
// partition cache is being refreshed, `getTables` can return the latest table.
// An alternative fix is to call `delegate.getTables` when refreshing the partition cache.
synchronized (this) {
tables.put(updatedIcebergTableName, updatedTable);
}

partitionCache.invalidate(baseIcebergTableName);
partitionCache.getUnchecked(updatedIcebergTableName);

TableMetadata updatedTableMetadata = updatedTable.operations().current();
List<ManifestFile> manifestFiles = updatedTable.currentSnapshot().dataManifests(updatedTable.io()).stream()
.filter(f -> updatedTableMetadata.snapshot(f.snapshotId()) != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.starrocks.connector.iceberg;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.starrocks.catalog.Database;
Expand Down Expand Up @@ -244,7 +245,13 @@ default Map<String, Partition> getPartitions(IcebergTable icebergTable, long sna
PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils.
createMetadataTableInstance(nativeTable, MetadataTableType.PARTITIONS);
TableScan tableScan = partitionsTable.newScan();
// NOTE: an exception raised here because the snapshot id is not the latest one is expected.
// Scanning the partition metadata table is more efficient than doing a file scan, but its
// limitation is that it only supports the latest snapshot id.
if (snapshotId != -1) {
Preconditions.checkArgument(nativeTable.currentSnapshot().snapshotId() == snapshotId,
"Ignore this error if snapshot id does not match. Iceberg partition metadata table only supports latest " +
"snapshot. current = " + nativeTable.currentSnapshot().snapshotId() + ", expect = " + snapshotId);
tableScan = tableScan.useSnapshot(snapshotId);
}
if (executorService != null) {
Expand Down
Loading