Skip to content

Commit

Permalink
first version
Browse files Browse the repository at this point in the history
Signed-off-by: Jiao Mingye <mxdzs0612@gmail.com>
  • Loading branch information
mxdzs0612 committed Feb 25, 2025
1 parent bb97981 commit 22cb578
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public org.apache.paimon.table.Table getNativeTable() {
return paimonNativeTable;
}

/**
 * Replaces the native Paimon table handle held by this table.
 * Intended for the table-refresh path only.
 *
 * @param paimonNativeTable the native Paimon table to hold from now on
 */
public void setPaimonNativeTable(org.apache.paimon.table.Table paimonNativeTable) {
    this.paimonNativeTable = paimonNativeTable;
}

@Override
public String getUUID() {
return String.join(".", catalogName, databaseName, tableName, Long.toString(createTime));
Expand Down
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2469,6 +2469,12 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static long iceberg_metadata_cache_max_entry_size = 8388608L;

/**
 * Whether refreshing a Paimon table also preheats its manifest files by planning a scan
 * over the table (or over the partitions named in the refresh request).
 * Disabled by default.
 */
@ConfField(mutable = true)
public static boolean enable_paimon_refresh_manifest_files = false;

/**
* fe will call es api to get es index shard info every es_state_sync_interval_secs
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,19 @@
import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class ConnectorTableMetadataProcessor extends FrontendDaemon {
Expand All @@ -53,6 +57,7 @@ public class ConnectorTableMetadataProcessor extends FrontendDaemon {

private final ExecutorService refreshRemoteFileExecutor;
private final Map<String, IcebergCatalog> cachingIcebergCatalogs = new ConcurrentHashMap<>();
private final Map<String, Catalog> paimonCatalogs = new ConcurrentHashMap<>();

public void registerTableInfo(BaseTableInfo tableInfo) {
registeredTableInfos.add(tableInfo);
Expand Down Expand Up @@ -80,6 +85,16 @@ public void unRegisterCachingIcebergCatalog(String catalogName) {
cachingIcebergCatalogs.remove(catalogName);
}

/**
 * Registers a Paimon catalog so that the background refresh visits it.
 * A later registration under the same name replaces the earlier one.
 *
 * @param catalogName   name the catalog is registered under
 * @param paimonCatalog native Paimon catalog instance to refresh
 */
public void registerPaimonCatalog(String catalogName, Catalog paimonCatalog) {
    LOG.info("register to caching paimon catalog on {} in the ConnectorTableMetadataProcessor", catalogName);
    this.paimonCatalogs.put(catalogName, paimonCatalog);
}

/**
 * Removes a Paimon catalog from the set visited by the background refresh.
 * Does nothing besides logging when the name is not registered.
 *
 * @param catalogName name the catalog was registered under
 */
public void unRegisterPaimonCatalog(String catalogName) {
    LOG.info("unregister to caching paimon catalog on {} in the ConnectorTableMetadataProcessor", catalogName);
    this.paimonCatalogs.remove(catalogName);
}

public ConnectorTableMetadataProcessor() {
super(ConnectorTableMetadataProcessor.class.getName(), Config.background_refresh_metadata_interval_millis);
refreshRemoteFileExecutor = Executors.newFixedThreadPool(Config.background_refresh_file_metadata_concurrency,
Expand All @@ -97,6 +112,7 @@ protected void runAfterCatalogReady() {
if (Config.enable_background_refresh_connector_metadata) {
refreshCatalogTable();
refreshIcebergCachingCatalog();
refreshPaimonCatalog();
}
}

Expand Down Expand Up @@ -153,6 +169,33 @@ private void refreshIcebergCachingCatalog() {
}
}

/**
 * Loads every table of every registered Paimon catalog once, using the shared
 * refresh executor.
 *
 * <p>Tables of one database are submitted together and awaited afterwards, so they are
 * loaded concurrently up to the executor's pool size. A failure on one table, database
 * or catalog is logged and does not stop the refresh of the others. If the calling
 * thread is interrupted, its interrupt flag is restored and the refresh round ends.
 */
private void refreshPaimonCatalog() {
    // Iterate over a snapshot: catalogs may be (un)registered while we are refreshing.
    List<String> catalogNames = Lists.newArrayList(paimonCatalogs.keySet());
    for (String catalogName : catalogNames) {
        Catalog paimonCatalog = paimonCatalogs.get(catalogName);
        if (paimonCatalog == null) {
            LOG.error("Failed to get paimonCatalog by catalog {}.", catalogName);
            continue;
        }
        LOG.info("Start to refresh paimon catalog {}", catalogName);
        try {
            for (String dbName : paimonCatalog.listDatabases()) {
                refreshPaimonDatabase(catalogName, paimonCatalog, dbName);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the daemon loop and stop this round.
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while refreshing paimon catalog {}", catalogName, e);
            return;
        } catch (RuntimeException e) {
            // One broken catalog must not prevent the remaining catalogs from being refreshed.
            LOG.warn("Failed to refresh paimon catalog {}", catalogName, e);
            continue;
        }
        LOG.info("Finish to refresh paimon catalog {}", catalogName);
    }
}

/**
 * Submits one load task per table of {@code dbName} and waits for all of them.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting;
 *                              tasks that have not finished are cancelled first
 */
private void refreshPaimonDatabase(String catalogName, Catalog paimonCatalog, String dbName)
        throws InterruptedException {
    List<Future<?>> futures = Lists.newArrayList();
    try {
        for (String tblName : paimonCatalog.listTables(dbName)) {
            futures.add(refreshRemoteFileExecutor.submit(()
                    -> paimonCatalog.getTable(new Identifier(dbName, tblName))));
        }
    } catch (Catalog.DatabaseNotExistException e) {
        // The database was dropped after listDatabases(); nothing to refresh for it.
        LOG.warn("Paimon database {}.{} does not exist, skip refreshing it", catalogName, dbName, e);
    }
    try {
        for (Future<?> future : futures) {
            try {
                future.get();
            } catch (ExecutionException e) {
                LOG.warn("Failed to refresh a table of paimon database {}.{}",
                        catalogName, dbName, e.getCause());
            }
        }
    } catch (InterruptedException e) {
        for (Future<?> future : futures) {
            future.cancel(true);
        }
        throw e;
    }
}

private void refreshRegisteredTable() {
MetadataMgr metadataMgr = GlobalStateMgr.getCurrentState().getMetadataMgr();
List<BaseTableInfo> registeredTableInfoList = Lists.newArrayList(registeredTableInfos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.starrocks.credential.aliyun.AliyunCloudCredential;
import com.starrocks.credential.aws.AwsCloudConfiguration;
import com.starrocks.credential.aws.AwsCloudCredential;
import com.starrocks.server.GlobalStateMgr;
import org.apache.hadoop.conf.Configuration;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
Expand Down Expand Up @@ -91,6 +92,16 @@ public PaimonConnector(ConnectorContext context) {
paimonOptions.setString(WAREHOUSE.key(), warehousePath);
}
initFsOption(cloudConfiguration);

// cache expire time, set to 2h
this.paimonOptions.set("cache.expiration-interval", "7200s");
// max num of cached partitions of a Paimon catalog
this.paimonOptions.set("cache.partition.max-num", "1000");
// max size of cached manifest files, 10m means cache all since files usually no more than 8m
this.paimonOptions.set("cache.manifest.small-file-threshold", "10m");
// max size of memory manifest cache uses
this.paimonOptions.set("cache.manifest.small-file-memory", "1g");

String keyPrefix = "paimon.option.";
Set<String> optionKeys = properties.keySet().stream().filter(k -> k.startsWith(keyPrefix)).collect(Collectors.toSet());
for (String k : optionKeys) {
Expand Down Expand Up @@ -139,6 +150,8 @@ public Catalog getPaimonNativeCatalog() {
Configuration configuration = new Configuration();
hdfsEnvironment.getCloudConfiguration().applyToConfiguration(configuration);
this.paimonNativeCatalog = CatalogFactory.createCatalog(CatalogContext.create(getPaimonOptions(), configuration));
GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor()
.registerPaimonCatalog(catalogName, this.paimonNativeCatalog);
}
return paimonNativeCatalog;
}
Expand All @@ -147,4 +160,9 @@ public Catalog getPaimonNativeCatalog() {
public ConnectorMetadata getMetadata() {
    // Each call builds a new metadata object around the connector's native catalog.
    Catalog nativeCatalog = getPaimonNativeCatalog();
    return new PaimonMetadata(catalogName, hdfsEnvironment, nativeCatalog, connectorProperties);
}

/**
 * Unregisters this connector's catalog from the connector table metadata processor.
 */
@Override
public void shutdown() {
    GlobalStateMgr.getCurrentState()
            .getConnectorTableMetadataProcessor()
            .unRegisterPaimonCatalog(catalogName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.starrocks.catalog.PartitionKey;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Type;
import com.starrocks.common.Config;
import com.starrocks.connector.ColumnTypeConverter;
import com.starrocks.connector.ConnectorMetadatRequestContext;
import com.starrocks.connector.ConnectorMetadata;
Expand All @@ -45,16 +46,17 @@
import com.starrocks.sql.optimizer.statistics.Statistics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.catalog.CachingCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.system.PartitionsTable;
import org.apache.paimon.table.system.SchemasTable;
import org.apache.paimon.table.system.SnapshotsTable;
import org.apache.paimon.types.DataField;
Expand All @@ -63,8 +65,11 @@
import org.apache.paimon.types.DateType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.PartitionPathUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -80,7 +85,7 @@ public class PaimonMetadata implements ConnectorMetadata {
private final Map<Identifier, Table> tables = new ConcurrentHashMap<>();
private final Map<String, Database> databases = new ConcurrentHashMap<>();
private final Map<PredicateSearchKey, PaimonSplitsInfo> paimonSplits = new ConcurrentHashMap<>();
private final Map<String, Long> partitionInfos = new ConcurrentHashMap<>();
private final Map<String, Partition> partitionInfos = new ConcurrentHashMap<>();
private final ConnectorProperties properties;

public PaimonMetadata(String catalogName, HdfsEnvironment hdfsEnvironment, Catalog paimonNativeCatalog,
Expand Down Expand Up @@ -130,54 +135,54 @@ private void updatePartitionInfo(String databaseName, String tableName) {
partitionColumnTypes.add(dataTableRowType.getTypeAt(dataTableRowType.getFieldIndex(partitionColumnName)));
}

Identifier partitionTableIdentifier = new Identifier(databaseName, String.format("%s%s", tableName, "$partitions"));
RecordReaderIterator<InternalRow> iterator = null;
try {
PartitionsTable table = (PartitionsTable) paimonNativeCatalog.getTable(partitionTableIdentifier);
RowType partitionTableRowType = table.rowType();
DataType lastUpdateTimeType = partitionTableRowType.getTypeAt(partitionTableRowType
.getFieldIndex("last_update_time"));
int[] projected = new int[] {0, 4};
RecordReader<InternalRow> recordReader = table.newReadBuilder().withProjection(projected)
.newRead().createReader(table.newScan().plan());
iterator = new RecordReaderIterator<>(recordReader);
while (iterator.hasNext()) {
InternalRow rowData = iterator.next();
String partition = rowData.getString(0).toString();
org.apache.paimon.data.Timestamp lastUpdateTime = rowData.getTimestamp(1,
DataTypeChecks.getPrecision(lastUpdateTimeType));
String[] partitionValues = partition.replace("[", "").replace("]", "")
.split(",");
if (partitionValues.length != partitionColumnNames.size()) {
String errorMsg = String.format("The length of partitionValues %s is not equal to " +
"the partitionColumnNames %s.", partitionValues.length, partitionColumnNames.size());
throw new IllegalArgumentException(errorMsg);
}
StringBuilder sb = new StringBuilder();
for (int i = 0; i < partitionValues.length; i++) {
String column = partitionColumnNames.get(i);
String value = partitionValues[i].trim();
if (partitionColumnTypes.get(i) instanceof DateType) {
value = DateTimeUtils.formatDate(Integer.parseInt(value));
}
sb.append(column).append("=").append(value);
sb.append("/");
}
sb.deleteCharAt(sb.length() - 1);
String partitionName = sb.toString();
this.partitionInfos.put(partitionName, lastUpdateTime.getMillisecond());
List<org.apache.paimon.partition.Partition> partitions = paimonNativeCatalog.listPartitions(identifier);
for (org.apache.paimon.partition.Partition partition : partitions) {
String partitionPath = PartitionPathUtils
.generatePartitionPath(partition.spec(), dataTableRowType);
String[] partitionValues = Arrays.stream(partitionPath.split("/"))
.map(part -> part.split("=")[1])
.toArray(String[]::new);
Partition srPartition = getPartition(partition.recordCount(),
partition.fileSizeInBytes(), partition.fileCount(),
partitionColumnNames, partitionColumnTypes, partitionValues,
Timestamp.fromEpochMillis(partition.lastFileCreationTime()));
this.partitionInfos.put(srPartition.getPartitionName(), srPartition);
}
} catch (Exception e) {
return;
} catch (Catalog.TableNotExistException e) {
LOG.error("Failed to update partition info of paimon table {}.{}.", databaseName, tableName, e);
} finally {
if (iterator != null) {
try {
iterator.close();
} catch (Exception e) {
LOG.error("Failed to update partition info of paimon table {}.{}.", databaseName, tableName, e);
}
}
}

/**
 * Builds the partition descriptor for one Paimon partition.
 *
 * <p>The partition name has the form {@code col1=value1/col2=value2}. Values of
 * {@link DateType} columns are parsed as an integer and rendered with
 * {@link DateTimeUtils#formatDate(int)}.
 *
 * @param recordCount          record count of the partition (currently not stored)
 * @param fileSizeInBytes      total file size of the partition (currently not stored)
 * @param fileCount            file count of the partition (currently not stored)
 * @param partitionColumnNames partition column names, in partition order
 * @param partitionColumnTypes partition column types, parallel to {@code partitionColumnNames}
 * @param partitionValues      raw partition values, parallel to {@code partitionColumnNames}
 * @param lastUpdateTime       last update time of the partition
 * @return the partition descriptor carrying the partition name and its last update time in milliseconds
 * @throws IllegalArgumentException if the number of values differs from the number of partition columns
 */
private Partition getPartition(Long recordCount,
                               Long fileSizeInBytes,
                               Long fileCount,
                               List<String> partitionColumnNames,
                               List<DataType> partitionColumnTypes,
                               String[] partitionValues,
                               org.apache.paimon.data.Timestamp lastUpdateTime) {
    if (partitionValues.length != partitionColumnNames.size()) {
        String errorMsg = String.format("The length of partitionValues %s is not equal to " +
                "the partitionColumnNames %s.", partitionValues.length, partitionColumnNames.size());
        throw new IllegalArgumentException(errorMsg);
    }

    List<String> nameParts = new ArrayList<>(partitionValues.length);
    for (int i = 0; i < partitionValues.length; i++) {
        String value = partitionValues[i].trim();
        if (partitionColumnTypes.get(i) instanceof DateType) {
            value = DateTimeUtils.formatDate(Integer.parseInt(value));
        }
        nameParts.add(partitionColumnNames.get(i) + "=" + value);
    }
    // String.join yields "" for zero columns instead of failing on an empty builder.
    String partitionName = String.join("/", nameParts);

    // The second constructor argument is the partition's last update time in milliseconds;
    // passing the file count here would corrupt the modification time seen by callers.
    return new Partition(partitionName, lastUpdateTime.getMillisecond());
}

@Override
Expand Down Expand Up @@ -348,7 +353,7 @@ public long getTableCreateTime(String dbName, String tblName) {
DataType updateTimeType = rowType.getTypeAt(rowType.getFieldIndex("update_time"));
int[] projected = new int[] {0, 6};
PredicateBuilder predicateBuilder = new PredicateBuilder(rowType);
Predicate equal = predicateBuilder.equal(predicateBuilder.indexOf("schema_id"), 0);
Predicate equal = predicateBuilder.equal(predicateBuilder.indexOf("schema_id"), 0L);
RecordReader<InternalRow> recordReader = table.newReadBuilder().withProjection(projected)
.withFilter(equal).newRead().createReader(table.newScan().plan());
iterator = new RecordReaderIterator<>(recordReader);
Expand Down Expand Up @@ -429,11 +434,57 @@ public List<PartitionInfo> getPartitions(Table table, List<String> partitionName
this.updatePartitionInfo(paimonTable.getCatalogDBName(), paimonTable.getCatalogTableName());
}
if (this.partitionInfos.get(partitionName) != null) {
result.add(new Partition(partitionName, this.partitionInfos.get(partitionName)));
result.add(this.partitionInfos.get(partitionName));
} else {
LOG.warn("Cannot find the paimon partition info: {}", partitionName);
}
}
return result;
}

/**
 * Refreshes the cached metadata of a Paimon table.
 *
 * <p>The table is invalidated in the native catalog, reloaded, and the reloaded native
 * table is installed on {@code table}. Partition information is then refreshed for the
 * whole table. When {@code Config.enable_paimon_refresh_manifest_files} is set, manifest
 * files are additionally preheated by planning a scan. Any failure is logged and the
 * table is not put back into the local table cache.
 *
 * @param srDbName             database name used to build the Paimon identifier
 * @param table                table to refresh; expected to be a {@code PaimonTable}
 * @param partitionNames       partitions to preheat, in {@code col=value/col=value} form;
 *                             {@code null} or empty means the whole table
 * @param onlyCachedPartitions not used by this implementation
 */
@Override
public void refreshTable(String srDbName, Table table, List<String> partitionNames, boolean onlyCachedPartitions) {
    String tableName = table.getCatalogTableName();
    Identifier identifier = new Identifier(srDbName, tableName);
    paimonNativeCatalog.invalidateTable(identifier);
    try {
        PaimonTable paimonTable = (PaimonTable) table;
        paimonTable.setPaimonNativeTable(paimonNativeCatalog.getTable(identifier));
        // todo: refreshing an exact partition is not supported yet, so the partition
        // information of the whole table is refreshed regardless of partitionNames.
        this.refreshPartitionInfo(identifier);
        // Preheat manifest files, disabled by default
        if (Config.enable_paimon_refresh_manifest_files) {
            preheatPaimonManifestFiles(paimonTable, partitionNames);
        }
        tables.put(identifier, table);
    } catch (Exception e) {
        LOG.error("Failed to refresh table {}.{}.{}.", catalogName, srDbName, tableName, e);
    }
}

/**
 * Plans a scan so that the manifest files it touches get loaded: one scan over the whole
 * table when no partitions are given, otherwise one partition-filtered scan per partition.
 */
private static void preheatPaimonManifestFiles(PaimonTable paimonTable, List<String> partitionNames) {
    org.apache.paimon.table.Table nativeTable = paimonTable.getNativeTable();
    if (partitionNames == null || partitionNames.isEmpty()) {
        nativeTable.newReadBuilder().newScan().plan();
        return;
    }
    // A partition spec describes exactly one partition, so each partition gets its own scan.
    for (String partitionName : partitionNames) {
        nativeTable.newReadBuilder()
                .withPartitionFilter(parsePaimonPartitionSpec(partitionName))
                .newScan()
                .plan();
    }
}

/**
 * Converts a partition name of the form {@code col1=value1/col2=value2} into a
 * column-to-value partition spec.
 *
 * @throws IllegalArgumentException if a segment has no column name before {@code '='}
 */
private static Map<String, String> parsePaimonPartitionSpec(String partitionName) {
    Map<String, String> partitionSpec = new HashMap<>();
    for (String segment : partitionName.split("/")) {
        int separator = segment.indexOf('=');
        if (separator <= 0) {
            throw new IllegalArgumentException("Invalid paimon partition name: " + partitionName);
        }
        partitionSpec.put(segment.substring(0, separator), segment.substring(separator + 1));
    }
    return partitionSpec;
}

/**
 * Invalidates the table in the native catalog and asks it to refresh the table's
 * partitions. Only a {@link CachingCatalog} is refreshed; for any other catalog a
 * warning is logged and nothing else happens.
 *
 * @param identifier identifier of the table whose partitions are refreshed
 * @throws RuntimeException wrapping {@link Catalog.TableNotExistException} when the table does not exist
 */
private void refreshPartitionInfo(Identifier identifier) {
    if (!(paimonNativeCatalog instanceof CachingCatalog)) {
        LOG.warn("Current catalog {} does not support cache.", catalogName);
        return;
    }
    CachingCatalog cachingCatalog = (CachingCatalog) paimonNativeCatalog;
    try {
        paimonNativeCatalog.invalidateTable(identifier);
        cachingCatalog.refreshPartitions(identifier);
    } catch (Catalog.TableNotExistException notFound) {
        throw new RuntimeException(notFound);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2504,7 +2504,7 @@ public Future<TStatus> refreshOtherFesTable(TNetworkAddress thriftAddress, Table

private boolean supportRefreshTableType(Table table) {
return table.isHiveTable() || table.isHudiTable() || table.isHiveView() || table.isIcebergTable()
|| table.isJDBCTable() || table.isDeltalakeTable();
|| table.isJDBCTable() || table.isDeltalakeTable() || table.isPaimonTable();
}

public void refreshExternalTable(TableName tableName, List<String> partitions) {
Expand Down

0 comments on commit 22cb578

Please sign in to comment.