diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonMetadata.java b/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonMetadata.java index ee38b78268c8df..061a83784b588a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonMetadata.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonMetadata.java @@ -22,6 +22,8 @@ import com.starrocks.catalog.PartitionKey; import com.starrocks.catalog.Table; import com.starrocks.catalog.Type; +import com.starrocks.common.profile.Timer; +import com.starrocks.common.profile.Tracers; import com.starrocks.connector.ColumnTypeConverter; import com.starrocks.connector.ConnectorMetadatRequestContext; import com.starrocks.connector.ConnectorMetadata; @@ -44,13 +46,20 @@ import com.starrocks.sql.optimizer.statistics.Statistics; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.paimon.catalog.CachingCatalog; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.metrics.Gauge; +import org.apache.paimon.metrics.Metric; +import org.apache.paimon.operation.metrics.ScanMetrics; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.reader.RecordReaderIterator; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.system.PartitionsTable; @@ -67,6 +76,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import static com.starrocks.connector.ConnectorTableId.CONNECTOR_ID_GENERATOR; @@ -257,7 +267,11 @@ public List getRemoteFiles(Table table, GetRemoteFilesParams par int[] projected = params.getFieldNames().stream().mapToInt(name -> (paimonTable.getFieldNames().indexOf(name))).toArray(); List predicates = extractPredicates(paimonTable, params.getPredicate()); - List splits = readBuilder.withFilter(predicates).withProjection(projected).newScan().plan().splits(); + InnerTableScan scan = (InnerTableScan) readBuilder.withFilter(predicates).withProjection(projected).newScan(); + PaimonMetricRegistry paimonMetricRegistry = new PaimonMetricRegistry(); + List splits = scan.withMetricsRegistry(paimonMetricRegistry).plan().splits(); + traceScanMetrics(paimonMetricRegistry, splits, table.getCatalogTableName(), predicates); + PaimonSplitsInfo paimonSplitsInfo = new PaimonSplitsInfo(predicates, splits); paimonSplits.put(filter, paimonSplitsInfo); List remoteFileDescs = ImmutableList.of( @@ -272,6 +286,55 @@ public List getRemoteFiles(Table table, GetRemoteFilesParams par return Lists.newArrayList(remoteFileInfo); } + private void traceScanMetrics(PaimonMetricRegistry metricRegistry, + List splits, + String tableName, + List predicates) { + // Don't need scan metrics when selecting system table, in which metric group is null. + if (metricRegistry.getMetricGroup() == null) { + return; + } + String prefix = "Paimon.plan."; + + if (paimonNativeCatalog instanceof CachingCatalog) { + CachingCatalog.CacheSizes cacheSizes = ((CachingCatalog) paimonNativeCatalog).estimatedCacheSizes(); + Tracers.record(prefix + "total.cachedDatabaseNumInCatalog", String.valueOf(cacheSizes.databaseCacheSize())); + Tracers.record(prefix + "total.cachedTableNumInCatalog", String.valueOf(cacheSizes.tableCacheSize())); + Tracers.record(prefix + "total.cachedManifestNumInCatalog", String.valueOf(cacheSizes.manifestCacheSize())); + Tracers.record(prefix + "total.cachedManifestBytesInCatalog", cacheSizes.manifestCacheBytes() + " B"); + Tracers.record(prefix + "total.cachedPartitionNumInCatalog", String.valueOf(cacheSizes.partitionCacheSize())); + } + + for (int i = 0; i < predicates.size(); i++) { + Tracers.record(prefix + tableName + ".filter." + i, predicates.get(i).toString()); + } + + Map metrics = metricRegistry.getMetrics(); + long manifestFileReadTime = (long) ((Gauge) metrics.get(ScanMetrics.LAST_SCAN_DURATION)).getValue(); + long scannedManifestFileNum = (long) ((Gauge) metrics.get(ScanMetrics.LAST_SCANNED_MANIFESTS)).getValue(); + long skippedDataFilesNum = (long) ((Gauge) metrics.get(ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES)).getValue(); + long resultedDataFilesNum = (long) ((Gauge) metrics.get(ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES)).getValue(); + long manifestNumReadFromCache = (long) ((Gauge) metrics.get(ScanMetrics.MANIFEST_HIT_CACHE)).getValue(); + long manifestNumReadFromRemote = (long) ((Gauge) metrics.get(ScanMetrics.MANIFEST_MISSED_CACHE)).getValue(); + + Tracers.record(prefix + tableName + "." + "manifestFileReadTime", manifestFileReadTime + "ms"); + Tracers.record(prefix + tableName + "." + "scannedManifestFileNum", String.valueOf(scannedManifestFileNum)); + Tracers.record(prefix + tableName + "." + "skippedDataFilesNum", String.valueOf(skippedDataFilesNum)); + Tracers.record(prefix + tableName + "." + "resultedDataFilesNum", String.valueOf(resultedDataFilesNum)); + Tracers.record(prefix + tableName + "." + "manifestNumReadFromCache", + String.valueOf(manifestNumReadFromCache)); + Tracers.record(prefix + tableName + "." + "manifestNumReadFromRemote", + String.valueOf(manifestNumReadFromRemote)); + Tracers.record(prefix + "total.resultSplitsNum", String.valueOf(splits.size())); + + AtomicLong resultedTableFilesSize = new AtomicLong(0); + for (Split split : splits) { + List dataFileMetas = ((DataSplit) split).dataFiles(); + dataFileMetas.forEach(dataFileMeta -> resultedTableFilesSize.addAndGet(dataFileMeta.fileSize())); + } + Tracers.record(prefix + tableName + "." + "resultedDataFilesSize", resultedTableFilesSize.get() + " B"); + } + @Override public Statistics getTableStatistics(OptimizerContext session, Table table, @@ -280,29 +343,31 @@ public Statistics getTableStatistics(OptimizerContext session, ScalarOperator predicate, long limit, TableVersionRange versionRange) { - if (!properties.enableGetTableStatsFromExternalMetadata()) { - return StatisticsUtils.buildDefaultStatistics(columns.keySet()); - } + try (Timer ignored = Tracers.watchScope("GetPaimonTableStatistics")) { + if (!properties.enableGetTableStatsFromExternalMetadata()) { + return StatisticsUtils.buildDefaultStatistics(columns.keySet()); + } - Statistics.Builder builder = Statistics.builder(); - for (ColumnRefOperator columnRefOperator : columns.keySet()) { - builder.addColumnStatistic(columnRefOperator, ColumnStatistic.unknown()); - } + Statistics.Builder builder = Statistics.builder(); + for (ColumnRefOperator columnRefOperator : columns.keySet()) { + builder.addColumnStatistic(columnRefOperator, ColumnStatistic.unknown()); + } - List fieldNames = columns.keySet().stream().map(ColumnRefOperator::getName).collect(Collectors.toList()); - GetRemoteFilesParams params = - GetRemoteFilesParams.newBuilder().setPredicate(predicate).setFieldNames(fieldNames).setLimit(limit).build(); - List fileInfos = GlobalStateMgr.getCurrentState().getMetadataMgr().getRemoteFiles(table, params); - PaimonRemoteFileDesc remoteFileDesc = (PaimonRemoteFileDesc) fileInfos.get(0).getFiles().get(0); - List splits = remoteFileDesc.getPaimonSplitsInfo().getPaimonSplits(); - long rowCount = getRowCount(splits); - if (rowCount == 0) { - builder.setOutputRowCount(1); - } else { - builder.setOutputRowCount(rowCount); - } + List fieldNames = columns.keySet().stream().map(ColumnRefOperator::getName).collect(Collectors.toList()); + GetRemoteFilesParams params = + GetRemoteFilesParams.newBuilder().setPredicate(predicate).setFieldNames(fieldNames).setLimit(limit).build(); + List fileInfos = GlobalStateMgr.getCurrentState().getMetadataMgr().getRemoteFiles(table, params); + PaimonRemoteFileDesc remoteFileDesc = (PaimonRemoteFileDesc) fileInfos.get(0).getFiles().get(0); + List splits = remoteFileDesc.getPaimonSplitsInfo().getPaimonSplits(); + long rowCount = getRowCount(splits); + if (rowCount == 0) { + builder.setOutputRowCount(1); + } else { + builder.setOutputRowCount(rowCount); + } - return builder.build(); + return builder.build(); + } } public static long getRowCount(List splits) { diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonMetricRegistry.java b/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonMetricRegistry.java new file mode 100644 index 00000000000000..3078b251ab2546 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonMetricRegistry.java @@ -0,0 +1,41 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.connector.paimon; + +import org.apache.paimon.metrics.Metric; +import org.apache.paimon.metrics.MetricGroup; +import org.apache.paimon.metrics.MetricGroupImpl; +import org.apache.paimon.metrics.MetricRegistry; + +import java.util.Map; + +public class PaimonMetricRegistry extends MetricRegistry { + private MetricGroup metricGroup; + + @Override + protected MetricGroup createMetricGroup(String groupName, Map variables) { + MetricGroup metricGroup = new MetricGroupImpl(groupName, variables); + this.metricGroup = metricGroup; + return metricGroup; + } + + public MetricGroup getMetricGroup() { + return metricGroup; + } + + public Map getMetrics() { + return metricGroup.getMetrics(); + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/PaimonScanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/PaimonScanNode.java index 365e71898b1146..67063644ff37bb 100644 --- a/fe/fe-core/src/main/java/com/starrocks/planner/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/com/starrocks/planner/PaimonScanNode.java @@ -23,6 +23,8 @@ import com.starrocks.catalog.Column; import com.starrocks.catalog.PaimonTable; import com.starrocks.catalog.Type; +import com.starrocks.common.profile.Timer; +import com.starrocks.common.profile.Tracers; import com.starrocks.connector.CatalogConnector; import com.starrocks.connector.ConnectorMetadatRequestContext; import com.starrocks.connector.GetRemoteFilesParams; @@ -129,8 +131,11 @@ public void setupScanRangeLocations(TupleDescriptor tupleDescriptor, ScalarOpera tupleDescriptor.getSlots().stream().map(s -> s.getColumn().getName()).collect(Collectors.toList()); GetRemoteFilesParams params = GetRemoteFilesParams.newBuilder().setPredicate(predicate).setFieldNames(fieldNames).build(); - List fileInfos = - GlobalStateMgr.getCurrentState().getMetadataMgr().getRemoteFiles(paimonTable, params); + List fileInfos; + try (Timer ignored = Tracers.watchScope(paimonTable.getCatalogTableName() + ".getPaimonRemoteFileInfos")) { + fileInfos = GlobalStateMgr.getCurrentState().getMetadataMgr().getRemoteFiles(paimonTable, params); + } + PaimonRemoteFileDesc remoteFileDesc = (PaimonRemoteFileDesc) fileInfos.get(0).getFiles().get(0); PaimonSplitsInfo splitsInfo = remoteFileDesc.getPaimonSplitsInfo(); String predicateInfo = encodeObjectToString(splitsInfo.getPredicate()); diff --git a/fe/fe-core/src/test/java/com/starrocks/connector/paimon/PaimonMetadataTest.java b/fe/fe-core/src/test/java/com/starrocks/connector/paimon/PaimonMetadataTest.java index e6acd969928c76..4a913bc761ced7 100644 --- a/fe/fe-core/src/test/java/com/starrocks/connector/paimon/PaimonMetadataTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/connector/paimon/PaimonMetadataTest.java @@ -53,6 +53,7 @@ import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableScan; @@ -276,7 +277,8 @@ public InternalRow next() { @Test public void testGetRemoteFiles(@Mocked FileStoreTable paimonNativeTable, - @Mocked ReadBuilder readBuilder) + @Mocked ReadBuilder readBuilder, + @Mocked InnerTableScan scan) throws Catalog.TableNotExistException { new MockUp() { @Mock @@ -292,6 +294,8 @@ public long getTableCreateTime(String dbName, String tblName) { result = readBuilder; readBuilder.withFilter((List) any).withProjection((int[]) any).newScan().plan().splits(); result = splits; + readBuilder.newScan(); + result = scan; } }; PaimonTable paimonTable = (PaimonTable) metadata.getTable("db1", "tbl1");