[Refactor] move getPartitions API into icebergCatalog #53007

Merged · 6 commits · Nov 21, 2024

Changes from all commits
@@ -16,8 +16,9 @@

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.starrocks.catalog.Database;
import com.starrocks.common.Config;
import com.starrocks.common.MetaNotFoundException;
@@ -30,23 +31,17 @@
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StarRocksIcebergTableScan;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.view.View;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -58,7 +53,6 @@
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

import static com.starrocks.connector.PartitionUtil.convertIcebergPartitionToPartitionName;
import static java.util.concurrent.TimeUnit.SECONDS;

public class CachingIcebergCatalog implements IcebergCatalog {
@@ -70,7 +64,6 @@ public class CachingIcebergCatalog implements IcebergCatalog {
private final String catalogName;
private final IcebergCatalog delegate;
private final Cache<IcebergTableName, Table> tables;
private final Cache<IcebergTableName, List<String>> partitionNames;
private final Cache<String, Database> databases;
private final ExecutorService backgroundExecutor;

@@ -80,6 +73,8 @@ public class CachingIcebergCatalog implements IcebergCatalog {
private final Map<IcebergTableName, Long> tableLatestAccessTime = new ConcurrentHashMap<>();
private final Map<IcebergTableName, Long> tableLatestRefreshTime = new ConcurrentHashMap<>();

private final LoadingCache<IcebergTableName, Map<String, Partition>> partitionCache;

public CachingIcebergCatalog(String catalogName, IcebergCatalog delegate, IcebergCatalogProperties icebergProperties,
ExecutorService executorService) {
this.catalogName = catalogName;
@@ -90,8 +85,15 @@ public CachingIcebergCatalog(String catalogName, IcebergCatalog delegate, IcebergCatalogProperties icebergProperties,
enableCache ? DEFAULT_CACHE_NUM : NEVER_CACHE).build();
this.tables = newCacheBuilder(icebergProperties.getIcebergTableCacheTtlSec(),
enableCache ? DEFAULT_CACHE_NUM : NEVER_CACHE).build();
this.partitionNames = newCacheBuilder(icebergProperties.getIcebergMetaCacheTtlSec(),
enableCache ? DEFAULT_CACHE_NUM : NEVER_CACHE).build();
this.partitionCache = newCacheBuilder(icebergProperties.getIcebergMetaCacheTtlSec(),
enableCache ? DEFAULT_CACHE_NUM : NEVER_CACHE).build(
CacheLoader.asyncReloading(new CacheLoader<>() {
@Override
public Map<String, Partition> load(IcebergTableName key) throws Exception {
// use default executor service.
return delegate.getPartitions(key.dbName, key.tableName, key.snapshotId, null);
}
}, executorService));
this.dataFileCache = enableCache ?
newCacheBuilder(
icebergProperties.getIcebergMetaCacheTtlSec(), icebergProperties.getIcebergManifestCacheMaxNum()).build()
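
Note on the new cache above: `partitionCache` is a Guava `LoadingCache` built with `CacheLoader.asyncReloading`, so reloads of already-cached entries run on the supplied executor while readers keep getting the previous value. A minimal, self-contained sketch of that pattern — not the StarRocks code; `expensiveLookup` is an illustrative stand-in for `delegate.getPartitions(...)`, and `refreshAfterWrite` is used here only to make the async reload observable:

```java
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncReloadingSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        LoadingCache<String, Integer> cache = CacheBuilder.newBuilder()
                .maximumSize(100_000)
                // After 1s, the next read schedules a reload on `pool` and
                // returns the stale value immediately instead of blocking.
                .refreshAfterWrite(1, TimeUnit.SECONDS)
                .build(CacheLoader.asyncReloading(new CacheLoader<String, Integer>() {
                    @Override
                    public Integer load(String key) {
                        return expensiveLookup(key); // first load is synchronous
                    }
                }, pool));

        System.out.println(cache.getUnchecked("t1")); // miss: loads on this thread
        Thread.sleep(1100);
        System.out.println(cache.getUnchecked("t1")); // stale value; reload runs on pool
        pool.shutdown();
    }

    // Illustrative stand-in for the delegate's partition listing.
    private static Integer expensiveLookup(String key) {
        return key.length();
    }
}
```
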
@@ -196,56 +198,10 @@ public View getView(String dbName, String viewName) {
}

@Override
public List<String> listPartitionNames(String dbName, String tableName, long snapshotId, ExecutorService executorService) {
IcebergTableName icebergTableName = new IcebergTableName(dbName, tableName, snapshotId);
if (partitionNames.asMap().containsKey(icebergTableName)) {
return partitionNames.getIfPresent(icebergTableName);
} else {
org.apache.iceberg.Table icebergTable = delegate.getTable(dbName, tableName);
List<String> partitionNames = Lists.newArrayList();

if (icebergTable.specs().values().stream().allMatch(PartitionSpec::isUnpartitioned)) {
return partitionNames;
}

if (snapshotId == -1) {
if (icebergTable.currentSnapshot() == null) {
return partitionNames;
} else {
snapshotId = icebergTable.currentSnapshot().snapshotId();
}
}

partitionNames = listPartitionNamesWithSnapshotId(icebergTable, dbName, tableName, snapshotId, executorService);
this.partitionNames.put(icebergTableName, partitionNames);
return partitionNames;
}
}

private List<String> listPartitionNamesWithSnapshotId(
Table table, String dbName, String tableName, long snapshotId, ExecutorService executorService) {
Set<String> partitionNames = Sets.newHashSet();
StarRocksIcebergTableScanContext scanContext = new StarRocksIcebergTableScanContext(
catalogName, dbName, tableName, PlanMode.LOCAL);
scanContext.setOnlyReadCache(true);
TableScan tableScan = getTableScan(table, scanContext)
.planWith(executorService)
.useSnapshot(snapshotId);

try (CloseableIterable<FileScanTask> fileScanTaskIterable = tableScan.planFiles();
CloseableIterator<FileScanTask> fileScanTaskIterator = fileScanTaskIterable.iterator()) {

while (fileScanTaskIterator.hasNext()) {
FileScanTask scanTask = fileScanTaskIterator.next();
StructLike partition = scanTask.file().partition();
partitionNames.add(convertIcebergPartitionToPartitionName(scanTask.spec(), partition));
}
} catch (IOException e) {
throw new StarRocksConnectorException(String.format("Failed to list iceberg partition names %s.%s",
dbName, tableName), e);
}

return new ArrayList<>(partitionNames);
public Map<String, Partition> getPartitions(String dbName, String tableName, long snapshotId,
ExecutorService executorService) {
IcebergTableName key = new IcebergTableName(dbName, tableName, snapshotId);
return partitionCache.getUnchecked(key);
}

@Override
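
Note on `getUnchecked` in the new `getPartitions`: unlike `get`, it declares no checked `ExecutionException`; if the loader throws, callers see Guava's `UncheckedExecutionException` wrapping the original cause. A small standalone sketch of that behavior (not StarRocks code):

```java
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.UncheckedExecutionException;

public class GetUncheckedSketch {
    public static void main(String[] args) {
        LoadingCache<String, String> cache = CacheBuilder.newBuilder()
                .build(new CacheLoader<String, String>() {
                    @Override
                    public String load(String key) {
                        if (key.isEmpty()) {
                            throw new IllegalArgumentException("empty key");
                        }
                        return key.toUpperCase(); // stand-in for a partition scan
                    }
                });

        System.out.println(cache.getUnchecked("orders")); // loaded once, then cached

        try {
            cache.getUnchecked("");
        } catch (UncheckedExecutionException e) {
            // Loader failures surface unchecked; the original cause is preserved.
            System.out.println("load failed: " + e.getCause().getMessage());
        }
    }
}
```
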
@@ -257,7 +213,7 @@ public void deleteUncommittedDataFiles(List<String> fileLocations) {
public synchronized void refreshTable(String dbName, String tableName, ExecutorService executorService) {
IcebergTableName icebergTableName = new IcebergTableName(dbName, tableName);
if (tables.getIfPresent(icebergTableName) == null) {
partitionNames.invalidate(icebergTableName);
partitionCache.invalidate(icebergTableName);
} else {
BaseTable currentTable = (BaseTable) tables.getIfPresent(icebergTableName);
BaseTable updateTable = (BaseTable) delegate.getTable(dbName, tableName);
@@ -302,14 +258,10 @@ private void refreshTable(BaseTable updatedTable, long baseSnapshotId,
IcebergTableName updatedIcebergTableName = new IcebergTableName(dbName, tableName, updatedSnapshotId);
long latestRefreshTime = tableLatestRefreshTime.computeIfAbsent(new IcebergTableName(dbName, tableName), ignore -> -1L);

List<String> updatedPartitionNames = updatedTable.spec().isPartitioned() ?
listPartitionNamesWithSnapshotId(updatedTable, dbName, tableName, updatedSnapshotId, executorService) :
new ArrayList<>();

partitionCache.invalidate(baseIcebergTableName);
partitionCache.getUnchecked(updatedIcebergTableName);
synchronized (this) {
partitionNames.put(updatedIcebergTableName, updatedPartitionNames);
tables.put(updatedIcebergTableName, updatedTable);
partitionNames.invalidate(baseIcebergTableName);
}

TableMetadata updatedTableMetadata = updatedTable.operations().current();
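
Note on the refresh path above: the entry keyed by the old snapshot is invalidated and the new key is touched immediately, so the expensive partition listing is paid by the refresh thread rather than the first query against the new snapshot. A toy sketch of the same invalidate-then-warm pattern (all names hypothetical):

```java
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class WarmupSketch {
    public static void main(String[] args) {
        LoadingCache<Long, String> partitionsBySnapshot = CacheBuilder.newBuilder()
                .build(new CacheLoader<Long, String>() {
                    @Override
                    public String load(Long snapshotId) {
                        // Stand-in for the delegate's partition listing.
                        return "partitions@" + snapshotId;
                    }
                });

        long oldSnapshot = 1L;
        long newSnapshot = 2L;
        partitionsBySnapshot.getUnchecked(oldSnapshot); // populated by some query

        // Refresh: drop the stale entry, then touch the new key so the load
        // cost lands on this (background) thread, not on the next query.
        partitionsBySnapshot.invalidate(oldSnapshot);
        partitionsBySnapshot.getUnchecked(newSnapshot);

        System.out.println(partitionsBySnapshot.getUnchecked(newSnapshot)); // warm hit
    }
}
```
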
@@ -346,7 +298,7 @@ public void refreshCatalog() {
Config.background_refresh_metadata_time_secs_since_last_access_secs) {
return;
}

refreshTable(identifier.dbName, identifier.tableName, backgroundExecutor);
} catch (Exception e) {
LOG.warn("refresh {}.{} metadata cache failed, msg : ", identifier.dbName, identifier.tableName, e);
@@ -356,12 +308,12 @@ }
}

public void invalidateCacheWithoutTable(IcebergTableName icebergTableName) {
partitionNames.invalidate(icebergTableName);
partitionCache.invalidate(icebergTableName);
}

public void invalidateCache(IcebergTableName icebergTableName) {
tables.invalidate(icebergTableName);
partitionNames.invalidate(icebergTableName);
partitionCache.invalidate(icebergTableName);
}

@Override
@@ -428,20 +380,29 @@ public String toString() {
}
}

private List<List<String>> getAllCachedPartitionNames() {
List<List<String>> ans = new ArrayList<>();
for (Map<String, Partition> kv : partitionCache.asMap().values()) {
ans.add(new ArrayList<>(kv.keySet()));
}
return ans;
}

@Override
public List<Pair<List<Object>, Long>> getSamples() {
Pair<List<Object>, Long> dbSamples = Pair.create(databases.asMap().values()
.stream()
.limit(MEMORY_META_SAMPLES)
.collect(Collectors.toList()),
.stream()
.limit(MEMORY_META_SAMPLES)
.collect(Collectors.toList()),
databases.size());

List<Object> partitions = partitionNames.asMap().values()
List<List<String>> partitionNames = getAllCachedPartitionNames();
List<Object> partitions = partitionNames
.stream()
.flatMap(List::stream)
.limit(MEMORY_FILE_SAMPLES)
.collect(Collectors.toList());
long partitionTotal = partitionNames.asMap().values()
long partitionTotal = partitionNames
.stream()
.mapToLong(List::size)
.sum();
@@ -473,13 +434,14 @@ public List<Pair<List<Object>, Long>> getSamples() {
@Override
public Map<String, Long> estimateCount() {
Map<String, Long> counter = new HashMap<>();
List<List<String>> partitionNames = getAllCachedPartitionNames();
counter.put("Database", databases.size());
counter.put("Table", tables.size());
counter.put("PartitionNames", partitionNames.asMap().values()
counter.put("PartitionNames", partitionNames
.stream()
.mapToLong(List::size)
.sum());
counter.put("ManifestOfDataFile", dataFileCache.asMap().values()
counter.put("ManifestOfDataFile", dataFileCache.asMap().values()
.stream()
.mapToLong(Set::size)
.sum());