diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/PaimonTable.java b/fe/fe-core/src/main/java/com/starrocks/catalog/PaimonTable.java
index 05c73aec514a0e..fb0798212fc5c1 100644
--- a/fe/fe-core/src/main/java/com/starrocks/catalog/PaimonTable.java
+++ b/fe/fe-core/src/main/java/com/starrocks/catalog/PaimonTable.java
@@ -78,6 +78,11 @@ public org.apache.paimon.table.Table getNativeTable() {
         return paimonNativeTable;
     }
 
+    // For refresh table only
+    public void setPaimonNativeTable(org.apache.paimon.table.Table paimonNativeTable) {
+        this.paimonNativeTable = paimonNativeTable;
+    }
+
     @Override
     public String getUUID() {
         return String.join(".", catalogName, databaseName, tableName, Long.toString(createTime));
diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java
index af8c975fc4d396..926a25d11192b1 100644
--- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java
+++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java
@@ -2469,6 +2469,12 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static long iceberg_metadata_cache_max_entry_size = 8388608L;
 
+    /**
+     * whether refreshing a paimon table also plans a scan to preheat its manifest files, default false
+     */
+    @ConfField(mutable = true)
+    public static boolean enable_paimon_refresh_manifest_files = false;
+
     /**
      * fe will call es api to get es index shard info every es_state_sync_interval_secs
      */
diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorTableMetadataProcessor.java b/fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorTableMetadataProcessor.java
index 647a7c945ce780..0f96c1046a6623 100644
--- a/fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorTableMetadataProcessor.java
+++ b/fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorTableMetadataProcessor.java
@@ -32,6 +32,8 @@
 import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
 
 import java.util.List;
 import java.util.Map;
@@ -39,8 +41,10 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 public class ConnectorTableMetadataProcessor extends FrontendDaemon {
@@ -53,6 +57,7 @@ public class ConnectorTableMetadataProcessor extends FrontendDaemon {
 
     private final ExecutorService refreshRemoteFileExecutor;
     private final Map cachingIcebergCatalogs = new ConcurrentHashMap<>();
+    private final Map<String, Catalog> paimonCatalogs = new ConcurrentHashMap<>();
 
     public void registerTableInfo(BaseTableInfo tableInfo) {
         registeredTableInfos.add(tableInfo);
@@ -80,6 +85,16 @@ public void unRegisterCachingIcebergCatalog(String catalogName) {
         cachingIcebergCatalogs.remove(catalogName);
     }
 
+    public void registerPaimonCatalog(String catalogName, Catalog paimonCatalog) {
+        LOG.info("register to caching paimon catalog on {} in the ConnectorTableMetadataProcessor", catalogName);
+        paimonCatalogs.put(catalogName, paimonCatalog);
+    }
+
+    public void unRegisterPaimonCatalog(String catalogName) {
+        LOG.info("unregister to caching paimon catalog on {} in the ConnectorTableMetadataProcessor", catalogName);
+        paimonCatalogs.remove(catalogName);
+    }
+
     public ConnectorTableMetadataProcessor() {
         super(ConnectorTableMetadataProcessor.class.getName(), Config.background_refresh_metadata_interval_millis);
         refreshRemoteFileExecutor = Executors.newFixedThreadPool(Config.background_refresh_file_metadata_concurrency,
@@ -97,6 +112,7 @@ protected void runAfterCatalogReady() {
         if (Config.enable_background_refresh_connector_metadata) {
             refreshCatalogTable();
             refreshIcebergCachingCatalog();
+            refreshPaimonCatalog();
         }
     }
 
@@ -153,6 +169,51 @@ private void refreshIcebergCachingCatalog() {
         }
     }
 
+    /**
+     * Loads every table of each registered paimon catalog through the shared refresh executor.
+     * <p>
+     * All tables of one database are submitted before waiting, so the loads can run concurrently on the
+     * executor. A failure is logged and skipped instead of rethrown: rethrowing would abort the refresh
+     * of every remaining database and catalog in this cycle.
+     */
+    private void refreshPaimonCatalog() {
+        List<String> catalogNames = Lists.newArrayList(paimonCatalogs.keySet());
+        for (String catalogName : catalogNames) {
+            Catalog paimonCatalog = paimonCatalogs.get(catalogName);
+            if (paimonCatalog == null) {
+                LOG.error("Failed to get paimonCatalog by catalog {}.", catalogName);
+                continue;
+            }
+            LOG.info("Start to refresh paimon catalog {}", catalogName);
+            for (String dbName : paimonCatalog.listDatabases()) {
+                List<Future<?>> futures = Lists.newArrayList();
+                try {
+                    for (String tblName : paimonCatalog.listTables(dbName)) {
+                        futures.add(refreshRemoteFileExecutor.submit(()
+                                -> paimonCatalog.getTable(new Identifier(dbName, tblName))));
+                    }
+                } catch (Catalog.DatabaseNotExistException e) {
+                    // The database disappeared after listDatabases() returned; there is nothing to load.
+                    LOG.warn("Paimon database {}.{} does not exist, skip refreshing it.", catalogName, dbName, e);
+                    continue;
+                }
+                for (Future<?> future : futures) {
+                    try {
+                        future.get();
+                    } catch (ExecutionException e) {
+                        LOG.warn("Failed to refresh a table of paimon database {}.{}.", catalogName, dbName, e);
+                    } catch (InterruptedException e) {
+                        // Restore the interrupt flag and stop so the caller can observe the interruption.
+                        Thread.currentThread().interrupt();
+                        LOG.warn("Interrupted while refreshing paimon catalog {}.", catalogName, e);
+                        return;
+                    }
+                }
+            }
+            LOG.info("Finish to refresh paimon catalog {}", catalogName);
+        }
+    }
+
     private void refreshRegisteredTable() {
         MetadataMgr metadataMgr = GlobalStateMgr.getCurrentState().getMetadataMgr();
         List registeredTableInfoList = Lists.newArrayList(registeredTableInfos);
diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonConnector.java b/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonConnector.java
index f4ee8cfaefc2ff..0bced9a1ce289b 100644
--- a/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonConnector.java
+++ b/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonConnector.java
@@ -29,6 +29,7 @@
 import com.starrocks.credential.aliyun.AliyunCloudCredential;
 import com.starrocks.credential.aws.AwsCloudConfiguration;
 import com.starrocks.credential.aws.AwsCloudCredential;
+import com.starrocks.server.GlobalStateMgr;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
@@ -91,6 +92,16 @@ public PaimonConnector(ConnectorContext context) {
             paimonOptions.setString(WAREHOUSE.key(), warehousePath);
         }
         initFsOption(cloudConfiguration);
+
+        // cache expire time, set to 2h
+        this.paimonOptions.set("cache.expiration-interval", "7200s");
+        // max num of cached partitions of a Paimon catalog
+        this.paimonOptions.set("cache.partition.max-num", "1000");
+        // max size of cached manifest files, 10m means cache all since files usually no more than 8m
+        this.paimonOptions.set("cache.manifest.small-file-threshold", "10m");
+        // max size of memory manifest cache uses
+        this.paimonOptions.set("cache.manifest.small-file-memory", "1g");
+
         String keyPrefix = "paimon.option.";
         Set optionKeys = properties.keySet().stream().filter(k -> k.startsWith(keyPrefix)).collect(Collectors.toSet());
         for (String k : optionKeys) {
@@ -139,6 +150,8 @@ public Catalog getPaimonNativeCatalog() {
             Configuration configuration = new Configuration();
             hdfsEnvironment.getCloudConfiguration().applyToConfiguration(configuration);
             this.paimonNativeCatalog = CatalogFactory.createCatalog(CatalogContext.create(getPaimonOptions(), configuration));
+            GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor()
+                    .registerPaimonCatalog(catalogName, this.paimonNativeCatalog);
         }
         return paimonNativeCatalog;
     }
@@ -147,4 +160,9 @@ public Catalog getPaimonNativeCatalog() {
     public ConnectorMetadata getMetadata() {
         return new PaimonMetadata(catalogName, hdfsEnvironment, getPaimonNativeCatalog(), connectorProperties);
     }
+
+    @Override
+    public void shutdown() {
+        GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterPaimonCatalog(catalogName);
+    }
 }
diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonMetadata.java b/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonMetadata.java
index 8b26c5299360d4..a4753e72fc23ce 100644
--- a/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonMetadata.java
+++ b/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonMetadata.java
@@ -22,6 +22,7 @@
 import com.starrocks.catalog.PartitionKey;
 import com.starrocks.catalog.Table;
 import com.starrocks.catalog.Type;
+import com.starrocks.common.Config;
 import com.starrocks.connector.ColumnTypeConverter;
 import com.starrocks.connector.ConnectorMetadatRequestContext;
 import com.starrocks.connector.ConnectorMetadata;
@@ -45,16 +46,17 @@
 import com.starrocks.sql.optimizer.statistics.Statistics;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.paimon.catalog.CachingCatalog;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.system.PartitionsTable;
 import org.apache.paimon.table.system.SchemasTable;
 import org.apache.paimon.table.system.SnapshotsTable;
 import org.apache.paimon.types.DataField;
@@ -63,8 +65,11 @@
 import org.apache.paimon.types.DateType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.PartitionPathUtils;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -80,7 +85,7 @@ public class PaimonMetadata implements ConnectorMetadata {
     private final Map tables = new ConcurrentHashMap<>();
     private final Map databases = new ConcurrentHashMap<>();
     private final Map paimonSplits = new ConcurrentHashMap<>();
-    private final Map<String, Long> partitionInfos = new ConcurrentHashMap<>();
+    private final Map<String, Partition> partitionInfos = new ConcurrentHashMap<>();
     private final ConnectorProperties properties;
 
     public PaimonMetadata(String catalogName, HdfsEnvironment hdfsEnvironment, Catalog paimonNativeCatalog,
@@ -130,54 +135,63 @@ private void updatePartitionInfo(String databaseName, String tableName) {
             partitionColumnTypes.add(dataTableRowType.getTypeAt(dataTableRowType.getFieldIndex(partitionColumnName)));
         }
 
-        Identifier partitionTableIdentifier = new Identifier(databaseName, String.format("%s%s", tableName, "$partitions"));
-        RecordReaderIterator<InternalRow> iterator = null;
         try {
-            PartitionsTable table = (PartitionsTable) paimonNativeCatalog.getTable(partitionTableIdentifier);
-            RowType partitionTableRowType = table.rowType();
-            DataType lastUpdateTimeType = partitionTableRowType.getTypeAt(partitionTableRowType
-                    .getFieldIndex("last_update_time"));
-            int[] projected = new int[] {0, 4};
-            RecordReader<InternalRow> recordReader = table.newReadBuilder().withProjection(projected)
-                    .newRead().createReader(table.newScan().plan());
-            iterator = new RecordReaderIterator<>(recordReader);
-            while (iterator.hasNext()) {
-                InternalRow rowData = iterator.next();
-                String partition = rowData.getString(0).toString();
-                org.apache.paimon.data.Timestamp lastUpdateTime = rowData.getTimestamp(1,
-                        DataTypeChecks.getPrecision(lastUpdateTimeType));
-                String[] partitionValues = partition.replace("[", "").replace("]", "")
-                        .split(",");
-                if (partitionValues.length != partitionColumnNames.size()) {
-                    String errorMsg = String.format("The length of partitionValues %s is not equal to " +
-                            "the partitionColumnNames %s.", partitionValues.length, partitionColumnNames.size());
-                    throw new IllegalArgumentException(errorMsg);
-                }
-                StringBuilder sb = new StringBuilder();
-                for (int i = 0; i < partitionValues.length; i++) {
-                    String column = partitionColumnNames.get(i);
-                    String value = partitionValues[i].trim();
-                    if (partitionColumnTypes.get(i) instanceof DateType) {
-                        value = DateTimeUtils.formatDate(Integer.parseInt(value));
-                    }
-                    sb.append(column).append("=").append(value);
-                    sb.append("/");
-                }
-                sb.deleteCharAt(sb.length() - 1);
-                String partitionName = sb.toString();
-                this.partitionInfos.put(partitionName, lastUpdateTime.getMillisecond());
+            List<org.apache.paimon.partition.Partition> partitions = paimonNativeCatalog.listPartitions(identifier);
+            for (org.apache.paimon.partition.Partition partition : partitions) {
+                String partitionPath = PartitionPathUtils
+                        .generatePartitionPath(partition.spec(), dataTableRowType);
+                String[] partitionValues = Arrays.stream(partitionPath.split("/"))
+                        .map(part -> part.split("=")[1])
+                        .toArray(String[]::new);
+                Partition srPartition = getPartition(partition.recordCount(),
+                        partition.fileSizeInBytes(), partition.fileCount(),
+                        partitionColumnNames, partitionColumnTypes, partitionValues,
+                        Timestamp.fromEpochMillis(partition.lastFileCreationTime()));
+                this.partitionInfos.put(srPartition.getPartitionName(), srPartition);
             }
-        } catch (Exception e) {
+            return;
+        } catch (Catalog.TableNotExistException e) {
             LOG.error("Failed to update partition info of paimon table {}.{}.", databaseName, tableName, e);
-        } finally {
-            if (iterator != null) {
-                try {
-                    iterator.close();
-                } catch (Exception e) {
-                    LOG.error("Failed to update partition info of paimon table {}.{}.", databaseName, tableName, e);
-                }
+        }
+    }
+
+    /**
+     * Builds the StarRocks partition descriptor of one paimon partition.
+     * <p>
+     * The partition name has the form {@code col1=val1/col2=val2}; values of DATE columns are parsed as
+     * integers and rendered with {@code DateTimeUtils.formatDate}. The partition carries
+     * {@code lastUpdateTime} in milliseconds; the three size/count arguments are accepted but not used yet.
+     *
+     * @throws IllegalArgumentException if the number of values differs from the number of partition columns
+     */
+    private Partition getPartition(Long recordCount,
+                                   Long fileSizeInBytes,
+                                   Long fileCount,
+                                   List<String> partitionColumnNames,
+                                   List<DataType> partitionColumnTypes,
+                                   String[] partitionValues,
+                                   org.apache.paimon.data.Timestamp lastUpdateTime) {
+        if (partitionValues.length != partitionColumnNames.size()) {
+            String errorMsg = String.format("The length of partitionValues %s is not equal to " +
+                    "the partitionColumnNames %s.", partitionValues.length, partitionColumnNames.size());
+            throw new IllegalArgumentException(errorMsg);
+        }
+
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < partitionValues.length; i++) {
+            String column = partitionColumnNames.get(i);
+            String value = partitionValues[i].trim();
+            if (partitionColumnTypes.get(i) instanceof DateType) {
+                value = DateTimeUtils.formatDate(Integer.parseInt(value));
             }
+            sb.append(column).append("=").append(value);
+            sb.append("/");
         }
+        sb.deleteCharAt(sb.length() - 1);
+        String partitionName = sb.toString();
+
+        // Pass the last update time (ms), as the replaced code did; fileCount is not a timestamp.
+        return new Partition(partitionName, lastUpdateTime.getMillisecond());
     }
 
     @Override
@@ -348,7 +362,7 @@ public long getTableCreateTime(String dbName, String tblName) {
             DataType updateTimeType = rowType.getTypeAt(rowType.getFieldIndex("update_time"));
             int[] projected = new int[] {0, 6};
             PredicateBuilder predicateBuilder = new PredicateBuilder(rowType);
-            Predicate equal = predicateBuilder.equal(predicateBuilder.indexOf("schema_id"), 0);
+            Predicate equal = predicateBuilder.equal(predicateBuilder.indexOf("schema_id"), 0L);
             RecordReader recordReader = table.newReadBuilder().withProjection(projected)
                     .withFilter(equal).newRead().createReader(table.newScan().plan());
             iterator = new RecordReaderIterator<>(recordReader);
@@ -429,11 +443,76 @@ public List getPartitions(Table table, List partitionName
                 this.updatePartitionInfo(paimonTable.getCatalogDBName(), paimonTable.getCatalogTableName());
             }
             if (this.partitionInfos.get(partitionName) != null) {
-                result.add(new Partition(partitionName, this.partitionInfos.get(partitionName)));
+                result.add(this.partitionInfos.get(partitionName));
             } else {
                 LOG.warn("Cannot find the paimon partition info: {}", partitionName);
             }
         }
         return result;
     }
+
+    /**
+     * Invalidates the cached metadata of a paimon table, reloads its native table and partition list and,
+     * when {@code Config.enable_paimon_refresh_manifest_files} is set, plans a scan to preheat manifest files.
+     * Failures are logged and not propagated.
+     */
+    @Override
+    public void refreshTable(String srDbName, Table table, List<String> partitionNames, boolean onlyCachedPartitions) {
+        String tableName = table.getCatalogTableName();
+        Identifier identifier = new Identifier(srDbName, tableName);
+        paimonNativeCatalog.invalidateTable(identifier);
+        try {
+            PaimonTable paimonTable = (PaimonTable) table;
+            paimonTable.setPaimonNativeTable(paimonNativeCatalog.getTable(identifier));
+            // todo: refreshing an exact partition is not supported, so the whole partition list is refreshed
+            this.refreshPartitionInfo(identifier);
+            // Preheat manifest files, disabled by default
+            if (Config.enable_paimon_refresh_manifest_files) {
+                if (partitionNames == null || partitionNames.isEmpty()) {
+                    paimonTable.getNativeTable().newReadBuilder().newScan().plan();
+                } else {
+                    // A partition spec describes a single partition, so plan one scan per requested partition.
+                    for (String partitionName : partitionNames) {
+                        paimonTable.getNativeTable().newReadBuilder()
+                                .withPartitionFilter(toPartitionSpec(partitionName)).newScan().plan();
+                    }
+                }
+            }
+            tables.put(identifier, table);
+        } catch (Exception e) {
+            LOG.error("Failed to refresh table {}.{}.{}.", catalogName, srDbName, tableName, e);
+        }
+    }
+
+    /**
+     * Converts a partition name of the form {@code col1=val1/col2=val2} into a column-to-value spec.
+     * NOTE(review): values are passed through as they appear in the name (DATE columns are formatted
+     * dates there) -- confirm that ReadBuilder#withPartitionFilter parses them for every partition type.
+     *
+     * @throws IllegalArgumentException if a segment of the name is not a {@code column=value} pair
+     */
+    private static Map<String, String> toPartitionSpec(String partitionName) {
+        Map<String, String> partitionSpec = new HashMap<>();
+        for (String part : partitionName.split("/")) {
+            int separator = part.indexOf('=');
+            if (separator <= 0) {
+                throw new IllegalArgumentException("Invalid paimon partition name: " + partitionName);
+            }
+            partitionSpec.put(part.substring(0, separator), part.substring(separator + 1));
+        }
+        return partitionSpec;
+    }
+
+    private void refreshPartitionInfo(Identifier identifier) {
+        if (paimonNativeCatalog instanceof CachingCatalog) {
+            try {
+                paimonNativeCatalog.invalidateTable(identifier);
+                ((CachingCatalog) paimonNativeCatalog).refreshPartitions(identifier);
+            } catch (Catalog.TableNotExistException e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            LOG.warn("Current catalog {} does not support cache.", catalogName);
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java
index 562b470fa0345b..0c8faad84db32f 100644
--- a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java
+++ b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java
@@ -2504,7 +2504,7 @@ public Future refreshOtherFesTable(TNetworkAddress thriftAddress, Table
 
     private boolean supportRefreshTableType(Table table) {
         return table.isHiveTable() || table.isHudiTable() || table.isHiveView() || table.isIcebergTable()
-                || table.isJDBCTable() || table.isDeltalakeTable();
+                || table.isJDBCTable() || table.isDeltalakeTable() || table.isPaimonTable();
     }
 
     public void refreshExternalTable(TableName tableName, List partitions) {