Skip to content

Commit

Permalink
[Enhancement] Enhance observability in profile for paimon queries
Browse files Browse the repository at this point in the history
Signed-off-by: Jiao Mingye <mxdzs0612@gmail.com>
  • Loading branch information
mxdzs0612 committed Feb 11, 2025
1 parent 6b04504 commit 5c4689a
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.starrocks.catalog.PartitionKey;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Type;
import com.starrocks.common.profile.Timer;
import com.starrocks.common.profile.Tracers;
import com.starrocks.connector.ColumnTypeConverter;
import com.starrocks.connector.ConnectorMetadatRequestContext;
import com.starrocks.connector.ConnectorMetadata;
Expand All @@ -44,13 +46,20 @@
import com.starrocks.sql.optimizer.statistics.Statistics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.catalog.CachingCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.metrics.Gauge;
import org.apache.paimon.metrics.Metric;
import org.apache.paimon.operation.metrics.ScanMetrics;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.system.PartitionsTable;
Expand All @@ -67,6 +76,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static com.starrocks.connector.ConnectorTableId.CONNECTOR_ID_GENERATOR;
Expand Down Expand Up @@ -257,7 +267,11 @@ public List<RemoteFileInfo> getRemoteFiles(Table table, GetRemoteFilesParams par
int[] projected =
params.getFieldNames().stream().mapToInt(name -> (paimonTable.getFieldNames().indexOf(name))).toArray();
List<Predicate> predicates = extractPredicates(paimonTable, params.getPredicate());
List<Split> splits = readBuilder.withFilter(predicates).withProjection(projected).newScan().plan().splits();
InnerTableScan scan = (InnerTableScan) readBuilder.withFilter(predicates).withProjection(projected).newScan();
PaimonMetricRegistry paimonMetricRegistry = new PaimonMetricRegistry();
List<Split> splits = scan.withMetricsRegistry(paimonMetricRegistry).plan().splits();
traceScanMetrics(paimonMetricRegistry, splits, table.getCatalogTableName(), predicates);

PaimonSplitsInfo paimonSplitsInfo = new PaimonSplitsInfo(predicates, splits);
paimonSplits.put(filter, paimonSplitsInfo);
List<RemoteFileDesc> remoteFileDescs = ImmutableList.of(
Expand All @@ -272,6 +286,55 @@ public List<RemoteFileInfo> getRemoteFiles(Table table, GetRemoteFilesParams par
return Lists.newArrayList(remoteFileInfo);
}

/**
 * Records Paimon scan-planning information for one table through {@link Tracers}.
 *
 * <p>Recorded entries, all under the {@code "Paimon.plan."} prefix: catalog cache sizes (only when the
 * native catalog is a {@link CachingCatalog}), one entry per pushed-down predicate, the scan gauges
 * published by Paimon's {@link ScanMetrics}, the number of result splits and the total size of the data
 * files referenced by those splits.
 *
 * <p>This method only feeds the profile, so it is written to be tolerant: a gauge that is missing or has
 * no value is skipped, and splits that are not {@link DataSplit}s are ignored when summing file sizes.
 *
 * @param metricRegistry registry that was attached to the scan before planning
 * @param splits         splits produced by the scan plan
 * @param tableName      table name used as part of the trace keys
 * @param predicates     predicates that were pushed to the scan
 */
private void traceScanMetrics(PaimonMetricRegistry metricRegistry,
                              List<Split> splits,
                              String tableName,
                              List<Predicate> predicates) {
    // Don't need scan metrics when selecting system table, in which metric group is null.
    if (metricRegistry.getMetricGroup() == null) {
        return;
    }
    String prefix = "Paimon.plan.";
    String tablePrefix = prefix + tableName + ".";

    if (paimonNativeCatalog instanceof CachingCatalog) {
        CachingCatalog.CacheSizes cacheSizes = ((CachingCatalog) paimonNativeCatalog).estimatedCacheSizes();
        Tracers.record(prefix + "total.cachedDatabaseNumInCatalog", String.valueOf(cacheSizes.databaseCacheSize()));
        Tracers.record(prefix + "total.cachedTableNumInCatalog", String.valueOf(cacheSizes.tableCacheSize()));
        Tracers.record(prefix + "total.cachedManifestNumInCatalog", String.valueOf(cacheSizes.manifestCacheSize()));
        Tracers.record(prefix + "total.cachedManifestBytesInCatalog", cacheSizes.manifestCacheBytes() + " B");
        Tracers.record(prefix + "total.cachedPartitionNumInCatalog", String.valueOf(cacheSizes.partitionCacheSize()));
    }

    for (int i = 0; i < predicates.size(); i++) {
        Tracers.record(tablePrefix + "filter." + i, predicates.get(i).toString());
    }

    Map<String, Metric> metrics = metricRegistry.getMetrics();
    recordGaugeIfPresent(metrics, ScanMetrics.LAST_SCAN_DURATION, tablePrefix + "manifestFileReadTime", "ms");
    recordGaugeIfPresent(metrics, ScanMetrics.LAST_SCANNED_MANIFESTS, tablePrefix + "scannedManifestFileNum", "");
    recordGaugeIfPresent(metrics, ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES, tablePrefix + "skippedDataFilesNum", "");
    recordGaugeIfPresent(metrics, ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES, tablePrefix + "resultedDataFilesNum", "");
    recordGaugeIfPresent(metrics, ScanMetrics.MANIFEST_HIT_CACHE, tablePrefix + "manifestNumReadFromCache", "");
    recordGaugeIfPresent(metrics, ScanMetrics.MANIFEST_MISSED_CACHE, tablePrefix + "manifestNumReadFromRemote", "");
    Tracers.record(prefix + "total.resultSplitsNum", String.valueOf(splits.size()));

    AtomicLong resultedTableFilesSize = new AtomicLong(0);
    for (Split split : splits) {
        // Only data splits expose their data files; skip anything else instead of failing the cast.
        if (!(split instanceof DataSplit)) {
            continue;
        }
        List<DataFileMeta> dataFileMetas = ((DataSplit) split).dataFiles();
        dataFileMetas.forEach(dataFileMeta -> resultedTableFilesSize.addAndGet(dataFileMeta.fileSize()));
    }
    Tracers.record(tablePrefix + "resultedDataFilesSize", resultedTableFilesSize.get() + " B");
}

/**
 * Records the current value of a gauge under {@code traceName}, followed by {@code unitSuffix}.
 * Does nothing when the metric is not registered, is not a {@link Gauge}, or has a null value.
 */
private static void recordGaugeIfPresent(Map<String, Metric> metrics,
                                         String metricName,
                                         String traceName,
                                         String unitSuffix) {
    Metric metric = metrics.get(metricName);
    if (!(metric instanceof Gauge)) {
        return;
    }
    Object value = ((Gauge<?>) metric).getValue();
    if (value != null) {
        Tracers.record(traceName, value + unitSuffix);
    }
}

@Override
public Statistics getTableStatistics(OptimizerContext session,
Table table,
Expand All @@ -280,29 +343,31 @@ public Statistics getTableStatistics(OptimizerContext session,
ScalarOperator predicate,
long limit,
TableVersionRange versionRange) {
if (!properties.enableGetTableStatsFromExternalMetadata()) {
return StatisticsUtils.buildDefaultStatistics(columns.keySet());
}
try (Timer ignored = Tracers.watchScope("GetPaimonTableStatistics")) {
if (!properties.enableGetTableStatsFromExternalMetadata()) {
return StatisticsUtils.buildDefaultStatistics(columns.keySet());
}

Statistics.Builder builder = Statistics.builder();
for (ColumnRefOperator columnRefOperator : columns.keySet()) {
builder.addColumnStatistic(columnRefOperator, ColumnStatistic.unknown());
}
Statistics.Builder builder = Statistics.builder();
for (ColumnRefOperator columnRefOperator : columns.keySet()) {
builder.addColumnStatistic(columnRefOperator, ColumnStatistic.unknown());
}

List<String> fieldNames = columns.keySet().stream().map(ColumnRefOperator::getName).collect(Collectors.toList());
GetRemoteFilesParams params =
GetRemoteFilesParams.newBuilder().setPredicate(predicate).setFieldNames(fieldNames).setLimit(limit).build();
List<RemoteFileInfo> fileInfos = GlobalStateMgr.getCurrentState().getMetadataMgr().getRemoteFiles(table, params);
PaimonRemoteFileDesc remoteFileDesc = (PaimonRemoteFileDesc) fileInfos.get(0).getFiles().get(0);
List<Split> splits = remoteFileDesc.getPaimonSplitsInfo().getPaimonSplits();
long rowCount = getRowCount(splits);
if (rowCount == 0) {
builder.setOutputRowCount(1);
} else {
builder.setOutputRowCount(rowCount);
}
List<String> fieldNames = columns.keySet().stream().map(ColumnRefOperator::getName).collect(Collectors.toList());
GetRemoteFilesParams params =
GetRemoteFilesParams.newBuilder().setPredicate(predicate).setFieldNames(fieldNames).setLimit(limit).build();
List<RemoteFileInfo> fileInfos = GlobalStateMgr.getCurrentState().getMetadataMgr().getRemoteFiles(table, params);
PaimonRemoteFileDesc remoteFileDesc = (PaimonRemoteFileDesc) fileInfos.get(0).getFiles().get(0);
List<Split> splits = remoteFileDesc.getPaimonSplitsInfo().getPaimonSplits();
long rowCount = getRowCount(splits);
if (rowCount == 0) {
builder.setOutputRowCount(1);
} else {
builder.setOutputRowCount(rowCount);
}

return builder.build();
return builder.build();
}
}

public static long getRowCount(List<? extends Split> splits) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.connector.paimon;

import org.apache.paimon.metrics.Metric;
import org.apache.paimon.metrics.MetricGroup;
import org.apache.paimon.metrics.MetricGroupImpl;
import org.apache.paimon.metrics.MetricRegistry;

import java.util.Collections;
import java.util.Map;

/**
 * A {@link MetricRegistry} that keeps a reference to the metric group it creates, so that the
 * metrics registered in that group can be read back afterwards.
 *
 * <p>Only the most recently created group is retained: each call to
 * {@link #createMetricGroup(String, Map)} replaces the previous reference.
 */
public class PaimonMetricRegistry extends MetricRegistry {
    // Stays null until createMetricGroup has been invoked at least once.
    private MetricGroup metricGroup;

    /**
     * Creates a {@link MetricGroupImpl} for the given name and variables and remembers it.
     *
     * @param groupName name of the metric group
     * @param variables variables passed through to the metric group
     * @return the newly created metric group
     */
    @Override
    protected MetricGroup createMetricGroup(String groupName, Map<String, String> variables) {
        this.metricGroup = new MetricGroupImpl(groupName, variables);
        return this.metricGroup;
    }

    /**
     * @return the last metric group created by this registry, or {@code null} if none was created
     */
    public MetricGroup getMetricGroup() {
        return metricGroup;
    }

    /**
     * @return the metrics of the last created metric group, or an empty map if no group was created
     */
    public Map<String, Metric> getMetrics() {
        // Guard against the "no group created yet" state instead of throwing a NullPointerException.
        return metricGroup == null ? Collections.emptyMap() : metricGroup.getMetrics();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.starrocks.catalog.Column;
import com.starrocks.catalog.PaimonTable;
import com.starrocks.catalog.Type;
import com.starrocks.common.profile.Timer;
import com.starrocks.common.profile.Tracers;
import com.starrocks.connector.CatalogConnector;
import com.starrocks.connector.ConnectorMetadatRequestContext;
import com.starrocks.connector.GetRemoteFilesParams;
Expand Down Expand Up @@ -129,8 +131,11 @@ public void setupScanRangeLocations(TupleDescriptor tupleDescriptor, ScalarOpera
tupleDescriptor.getSlots().stream().map(s -> s.getColumn().getName()).collect(Collectors.toList());
GetRemoteFilesParams params =
GetRemoteFilesParams.newBuilder().setPredicate(predicate).setFieldNames(fieldNames).build();
List<RemoteFileInfo> fileInfos =
GlobalStateMgr.getCurrentState().getMetadataMgr().getRemoteFiles(paimonTable, params);
List<RemoteFileInfo> fileInfos;
try (Timer ignored = Tracers.watchScope(paimonTable.getCatalogTableName() + ".getPaimonRemoteFileInfos")) {
fileInfos = GlobalStateMgr.getCurrentState().getMetadataMgr().getRemoteFiles(paimonTable, params);
}

PaimonRemoteFileDesc remoteFileDesc = (PaimonRemoteFileDesc) fileInfos.get(0).getFiles().get(0);
PaimonSplitsInfo splitsInfo = remoteFileDesc.getPaimonSplitsInfo();
String predicateInfo = encodeObjectToString(splitsInfo.getPredicate());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableScan;
Expand Down Expand Up @@ -276,7 +277,8 @@ public InternalRow next() {

@Test
public void testGetRemoteFiles(@Mocked FileStoreTable paimonNativeTable,
@Mocked ReadBuilder readBuilder)
@Mocked ReadBuilder readBuilder,
@Mocked InnerTableScan scan)
throws Catalog.TableNotExistException {
new MockUp<PaimonMetadata>() {
@Mock
Expand All @@ -292,6 +294,8 @@ public long getTableCreateTime(String dbName, String tblName) {
result = readBuilder;
readBuilder.withFilter((List<Predicate>) any).withProjection((int[]) any).newScan().plan().splits();
result = splits;
readBuilder.newScan();
result = scan;
}
};
PaimonTable paimonTable = (PaimonTable) metadata.getTable("db1", "tbl1");
Expand Down

0 comments on commit 5c4689a

Please sign in to comment.