From 0b86bba902854c84cf8dd669482e9d7444b2624d Mon Sep 17 00:00:00 2001 From: yanz Date: Sat, 22 Feb 2025 14:23:50 -0800 Subject: [PATCH] [BugFix] fix iceberg table snapshot id does not match when refreshing partition Signed-off-by: yanz --- .../connector/iceberg/CachingIcebergCatalog.java | 15 ++++++++++----- .../connector/iceberg/IcebergCatalog.java | 7 +++++++ 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/CachingIcebergCatalog.java b/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/CachingIcebergCatalog.java index 9bc2452accf070..7d6c2f915277c8 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/CachingIcebergCatalog.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/CachingIcebergCatalog.java @@ -271,29 +271,34 @@ public synchronized void refreshTable(String dbName, String tableName, ExecutorS invalidateCache(icebergTableName); return; } + if (!currentLocation.equals(updateLocation)) { LOG.info("Refresh iceberg caching catalog table {}.{} from {} to {}", dbName, tableName, currentLocation, updateLocation); - long baseSnapshotId = currentOps.current().currentSnapshot().snapshotId(); - refreshTable(updateTable, baseSnapshotId, dbName, tableName, executorService); + refreshTable(currentTable, updateTable, dbName, tableName, executorService); LOG.info("Finished to refresh iceberg table {}.{}", dbName, tableName); } } } - private void refreshTable(BaseTable updatedTable, long baseSnapshotId, + private void refreshTable(BaseTable currentTable, BaseTable updatedTable, String dbName, String tableName, ExecutorService executorService) { + long baseSnapshotId = currentTable.currentSnapshot().snapshotId(); long updatedSnapshotId = updatedTable.currentSnapshot().snapshotId(); IcebergTableName baseIcebergTableName = new IcebergTableName(dbName, tableName, baseSnapshotId); IcebergTableName updatedIcebergTableName = new IcebergTableName(dbName, tableName, updatedSnapshotId); long latestRefreshTime = tableLatestRefreshTime.computeIfAbsent(new IcebergTableName(dbName, tableName), ignore -> -1L); - partitionCache.invalidate(baseIcebergTableName); - partitionCache.getUnchecked(updatedIcebergTableName); + // update tables before refresh partition cache + // so when refreshing partition cache, `getTables` can return the latest one. + // another way to fix is to call `delegate.getTables` when refreshing partition cache. synchronized (this) { tables.put(updatedIcebergTableName, updatedTable); } + partitionCache.invalidate(baseIcebergTableName); + partitionCache.getUnchecked(updatedIcebergTableName); + TableMetadata updatedTableMetadata = updatedTable.operations().current(); List manifestFiles = updatedTable.currentSnapshot().dataManifests(updatedTable.io()).stream() .filter(f -> updatedTableMetadata.snapshot(f.snapshotId()) != null) diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergCatalog.java b/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergCatalog.java index e979463cba7a28..a30c8ea3f2872e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergCatalog.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergCatalog.java @@ -14,6 +14,7 @@ package com.starrocks.connector.iceberg; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; import com.starrocks.catalog.Database; @@ -244,7 +245,13 @@ default Map getPartitions(IcebergTable icebergTable, long sna PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils. createMetadataTableInstance(nativeTable, MetadataTableType.PARTITIONS); TableScan tableScan = partitionsTable.newScan(); + // NOTE: if there is an exception raise because of snapshot id is not the latest one, it's expected + // using partition metadata table scan is more efficient than doing file scan, but limitation is + // it only supports the latest snapshot id. if (snapshotId != -1) { + Preconditions.checkArgument(nativeTable.currentSnapshot().snapshotId() == snapshotId, + "Ignore this error if snapshot id does not match. Iceberg partition metadata table only supports latest " + + "snapshot. current = " + nativeTable.currentSnapshot().snapshotId() + ", expect = " + snapshotId); tableScan = tableScan.useSnapshot(snapshotId); } if (executorService != null) {