From 22a70a7cf3b7e6e86d30d42d0a5d4034c4c2716c Mon Sep 17 00:00:00 2001 From: Zac Blanco Date: Mon, 22 Apr 2024 15:15:16 -0400 Subject: [PATCH] [Iceberg] Collect superset of supported statistics The set of ColumnStatisticsMetadata defined by the Hive and non-Hive connectors are not equivalent. However, it is possible to collect the superset of the relevant metadata and use it for ANALYZE. The returned statistics just need to be filtered out to contain only the relevant column statistics. This may include duplicate calculations for some statistics. For example, with distinct values Iceberg puffin files can store the result of sketch_theta for distinct values, but the code path for storing the statistic in the HMS requires a direct value from approx_distinct. Thus, ANALYZE may compute a value twice. --- .../presto/hive/HiveStatisticsUtil.java | 25 ++++- .../presto/iceberg/IcebergHiveMetadata.java | 44 +++++++-- .../presto/iceberg/util/StatisticsUtil.java | 1 + .../presto/iceberg/IcebergQueryRunner.java | 2 +- .../hive/TestIcebergHiveStatistics.java | 93 ++++++++++++++++++- .../sql/planner/BasePlanFragmenter.java | 3 +- .../presto/sql/planner/LogicalPlanner.java | 3 +- .../planner/StatisticsAggregationPlanner.java | 7 +- 8 files changed, 162 insertions(+), 16 deletions(-) diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveStatisticsUtil.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveStatisticsUtil.java index f05c524f0012d..b138a899c94cf 100644 --- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveStatisticsUtil.java +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveStatisticsUtil.java @@ -28,11 +28,13 @@ import java.util.Map; import java.util.Optional; import java.util.OptionalLong; +import java.util.Set; import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.hive.metastore.Statistics.fromComputedStatistics; import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT; import static com.google.common.base.Verify.verify; +import static com.google.common.collect.ImmutableMap.toImmutableMap; public final class HiveStatisticsUtil { @@ -61,9 +63,14 @@ public static PartitionStatistics createPartitionStatistics( ConnectorSession session, Map columnTypes, ComputedStatistics computedStatistics, + Set supportedColumnStatistics, DateTimeZone timeZone) { - Map computedColumnStatistics = computedStatistics.getColumnStatistics(); + Map computedColumnStatistics = computedStatistics.getColumnStatistics() + .entrySet() + .stream() + .filter((entry) -> supportedColumnStatistics.contains(entry.getKey())) + .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); Block rowCountBlock = Optional.ofNullable(computedStatistics.getTableStatistics().get(ROW_COUNT)) .orElseThrow(() -> new VerifyException("rowCount not present")); @@ -73,6 +80,15 @@ public static PartitionStatistics createPartitionStatistics( return createPartitionStatistics(session, rowCountOnlyBasicStatistics, columnTypes, computedColumnStatistics, timeZone); } + public static PartitionStatistics createPartitionStatistics( + ConnectorSession session, + Map columnTypes, + ComputedStatistics computedStatistics, + DateTimeZone timeZone) + { + return createPartitionStatistics(session, columnTypes, computedStatistics, computedStatistics.getColumnStatistics().keySet(), timeZone); + } + public static Map getColumnStatistics(Map, ComputedStatistics> statistics, List partitionValues) { return Optional.ofNullable(statistics.get(partitionValues)) @@ -81,10 +97,11 @@ public static Map getColumnStatistics(Map hiveColumnStatistics = getHiveSupportedColumnStatistics(session, icebergTable, tableMetadata); + Set supportedStatistics = ImmutableSet.builder() + .addAll(hiveColumnStatistics) + // iceberg table-supported statistics + .addAll(super.getStatisticsCollectionMetadata(session, tableMetadata).getColumnStatistics()) + .build(); + Set tableStatistics = ImmutableSet.of(ROW_COUNT); + return new TableStatisticsMetadata(supportedStatistics, tableStatistics, emptyList()); + } + + private Set getHiveSupportedColumnStatistics(ConnectorSession session, org.apache.iceberg.Table table, ConnectorTableMetadata tableMetadata) + { MetricsConfig metricsConfig = MetricsConfig.forTable(table); - Set columnStatistics = tableMetadata.getColumns().stream() - .filter(column -> !column.isHidden() && metricsConfig.columnMode(column.getName()) != None.get()) + return tableMetadata.getColumns().stream() + .filter(column -> !column.isHidden()) + .filter(column -> metricsConfig.columnMode(column.getName()) != None.get()) .flatMap(meta -> { try { return metastore.getSupportedColumnStatistics(getMetastoreContext(session), meta.getType()) @@ -494,9 +507,6 @@ public TableStatisticsMetadata getStatisticsCollectionMetadata(ConnectorSession } }) .collect(toImmutableSet()); - - Set tableStatistics = ImmutableSet.of(ROW_COUNT); - return new TableStatisticsMetadata(columnStatistics, tableStatistics, emptyList()); } @Override @@ -525,11 +535,31 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH Map, ComputedStatistics> computedStatisticsMap = createComputedStatisticsToPartitionMap(computedStatistics, partitionColumnNames, columnTypes); // commit analyze to unpartitioned table - PartitionStatistics tableStatistics = createPartitionStatistics(session, columnTypes, computedStatisticsMap.get(ImmutableList.of()), timeZone); + ConnectorTableMetadata metadata = getTableMetadata(session, tableHandle); + org.apache.iceberg.Table icebergTable = getIcebergTable(session, icebergTableHandle.getSchemaTableName()); + Set hiveSupportedStatistics = getHiveSupportedColumnStatistics(session, icebergTable, metadata); + PartitionStatistics tableStatistics = createPartitionStatistics( + session, + columnTypes, + computedStatisticsMap.get(ImmutableList.of()), + hiveSupportedStatistics, + timeZone); metastore.updateTableStatistics(metastoreContext, table.getDatabaseName(), table.getTableName(), oldStats -> updatePartitionStatistics(oldStats, tableStatistics)); + + Set icebergSupportedStatistics = super.getStatisticsCollectionMetadata(session, metadata).getColumnStatistics(); + Collection icebergComputedStatistics = computedStatistics.stream().map(stat -> { + ComputedStatistics.Builder builder = ComputedStatistics.builder(stat.getGroupingColumns(), stat.getGroupingValues()); + stat.getTableStatistics() + .forEach(builder::addTableStatistic); + stat.getColumnStatistics().entrySet().stream() + .filter(entry -> icebergSupportedStatistics.contains(entry.getKey())) + .forEach(entry -> builder.addColumnStatistic(entry.getKey(), entry.getValue())); + return builder.build(); + }).collect(toImmutableList()); + super.finishStatisticsCollection(session, tableHandle, icebergComputedStatistics); } @Override diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/StatisticsUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/StatisticsUtil.java index a0587ccd8b069..f19ae11b07956 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/StatisticsUtil.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/StatisticsUtil.java @@ -103,6 +103,7 @@ public static TableStatistics mergeHiveStatistics(TableStatistics icebergStatist .setRange(icebergColumnStats.getRange()) .setNullsFraction(icebergColumnStats.getNullsFraction()) .setDistinctValuesCount(icebergColumnStats.getDistinctValuesCount()) + .setHistogram(icebergColumnStats.getHistogram()) .setRange(icebergColumnStats.getRange()); if (hiveColumnStats != null) { // NDVs diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java index 11be7a0e9857c..0d12c203b80f8 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java @@ -215,7 +215,7 @@ public static DistributedQueryRunner createIcebergQueryRunner( queryRunner.createCatalog("jmx", "jmx"); } - if (catalogType == HIVE.name()) { + if (catalogType.equals(HIVE.name())) { ExtendedHiveMetastore metastore = getFileHiveMetastore(icebergDataDirectory); if (!metastore.getDatabase(METASTORE_CONTEXT, "tpch").isPresent()) { queryRunner.execute("CREATE SCHEMA tpch"); diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java index 98fd3e94c2f45..be90d5fc0dd02 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java @@ -20,11 +20,27 @@ import com.facebook.presto.common.predicate.TupleDomain; import com.facebook.presto.common.predicate.ValueSet; import com.facebook.presto.common.transaction.TransactionId; +import com.facebook.presto.hive.HdfsConfiguration; +import com.facebook.presto.hive.HdfsConfigurationInitializer; +import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.hive.HiveClientConfig; +import com.facebook.presto.hive.HiveHdfsConfiguration; +import com.facebook.presto.hive.MetastoreClientConfig; +import com.facebook.presto.hive.authentication.NoHdfsAuthentication; +import com.facebook.presto.hive.metastore.ExtendedHiveMetastore; +import com.facebook.presto.hive.metastore.file.FileHiveMetastore; +import com.facebook.presto.iceberg.CatalogType; import com.facebook.presto.iceberg.IcebergColumnHandle; +import com.facebook.presto.iceberg.IcebergHiveTableOperationsConfig; import com.facebook.presto.iceberg.IcebergMetadataColumn; +import com.facebook.presto.iceberg.IcebergUtil; +import com.facebook.presto.metadata.CatalogManager; import com.facebook.presto.metadata.Metadata; import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.Constraint; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.TableHandle; import com.facebook.presto.spi.analyzer.MetadataResolver; import com.facebook.presto.spi.plan.TableScanNode; @@ -39,16 +55,22 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.iceberg.StatisticsFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.UpdateStatistics; import org.intellij.lang.annotations.Language; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.io.File; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -56,9 +78,14 @@ import static com.facebook.presto.common.type.DoubleType.DOUBLE; import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY; import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR; +import static com.facebook.presto.hive.metastore.InMemoryCachingHiveMetastore.memoizeMetastore; +import static com.facebook.presto.iceberg.CatalogType.HIVE; +import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; +import static com.facebook.presto.iceberg.IcebergQueryRunner.TEST_DATA_DIRECTORY; import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner; import static com.facebook.presto.iceberg.IcebergSessionProperties.HIVE_METASTORE_STATISTICS_MERGE_STRATEGY; import static com.facebook.presto.iceberg.IcebergSessionProperties.PUSHDOWN_FILTER_ENABLED; +import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES; import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES; import static com.facebook.presto.testing.assertions.Assert.assertEquals; @@ -159,6 +186,7 @@ public void testStatsWithPartitionedTableAnalyzed() assertQuerySucceeds("CREATE TABLE statsWithPartitionAnalyze WITH (partitioning = ARRAY['orderdate']) as SELECT * FROM statsNoPartitionAnalyze"); assertQuerySucceeds("ANALYZE statsNoPartitionAnalyze"); assertQuerySucceeds("ANALYZE statsWithPartitionAnalyze"); + deleteTableStatistics("statsWithPartitionAnalyze"); Metadata meta = getQueryRunner().getMetadata(); TransactionId txid = getQueryRunner().getTransactionManager().beginTransaction(false); Session session = getSession().beginTransactionId(txid, getQueryRunner().getTransactionManager(), new AllowAllAccessControl()); @@ -295,12 +323,17 @@ public void testHiveStatisticsMergeFlags() { assertQuerySucceeds("CREATE TABLE mergeFlagsStats (i int, v varchar)"); assertQuerySucceeds("INSERT INTO mergeFlagsStats VALUES (0, '1'), (1, '22'), (2, '333'), (NULL, 'aaaaa'), (4, NULL)"); - assertQuerySucceeds("ANALYZE mergeFlagsStats"); // stats stored in + assertQuerySucceeds("ANALYZE mergeFlagsStats"); + + // invalidate puffin files so only hive stats can be returned + deleteTableStatistics("mergeFlagsStats"); + // Test stats without merging doesn't return NDVs or data size Session session = Session.builder(getSession()) .setCatalogSessionProperty("iceberg", HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, "") .build(); TableStatistics stats = getTableStatistics(session, "mergeFlagsStats"); + Map columnStatistics = getColumnNameMap(stats); assertEquals(columnStatistics.get("i").getDistinctValuesCount(), Estimate.unknown()); assertEquals(columnStatistics.get("i").getDataSize(), Estimate.unknown()); @@ -468,6 +501,64 @@ static void assertStatValue(StatsSchema column, MaterializedResult result, Set new RuntimeException("Catalog directory does not exist: " + getCatalogDirectory(HIVE))), + "test"); + return memoizeMetastore(fileHiveMetastore, false, 1000, 0); + } + + protected static HdfsEnvironment getHdfsEnvironment() + { + HiveClientConfig hiveClientConfig = new HiveClientConfig(); + MetastoreClientConfig metastoreClientConfig = new MetastoreClientConfig(); + HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hiveClientConfig, metastoreClientConfig), + ImmutableSet.of(), + hiveClientConfig); + return new HdfsEnvironment(hdfsConfiguration, metastoreClientConfig, new NoHdfsAuthentication()); + } + + protected File getCatalogDirectory(CatalogType catalogType) + { + Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory(); + switch (catalogType) { + case HIVE: + return dataDirectory + .resolve(TEST_DATA_DIRECTORY) + .resolve(HIVE.name()) + .toFile(); + case HADOOP: + case NESSIE: + return dataDirectory.toFile(); + } + + throw new PrestoException(NOT_SUPPORTED, "Unsupported Presto Iceberg catalog type " + catalogType); + } + private static Map getColumnNameMap(TableStatistics statistics) { return statistics.getColumnStatistics().entrySet().stream().collect(Collectors.toMap(e -> diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java index 190271f3e39ae..ceb0186ffb413 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java @@ -62,6 +62,7 @@ import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput; import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled; +import static com.facebook.presto.SystemSessionProperties.shouldOptimizerUseHistograms; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.sql.TemporaryTableUtil.assignPartitioningVariables; import static com.facebook.presto.sql.TemporaryTableUtil.assignTemporaryTableColumnNames; @@ -121,7 +122,7 @@ public BasePlanFragmenter( this.idAllocator = requireNonNull(idAllocator, "idAllocator is null"); this.variableAllocator = requireNonNull(variableAllocator, "variableAllocator is null"); this.outputTableWriterNodeIds = ImmutableSet.copyOf(requireNonNull(outputTableWriterNodeIds, "outputTableWriterNodeIds is null")); - this.statisticsAggregationPlanner = new StatisticsAggregationPlanner(variableAllocator, metadata.getFunctionAndTypeManager(), session); + this.statisticsAggregationPlanner = new StatisticsAggregationPlanner(variableAllocator, metadata.getFunctionAndTypeManager(), session, shouldOptimizerUseHistograms(session)); } public SubPlan buildRootFragment(PlanNode root, FragmentProperties properties) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java index 9d0d1aaaf1804..2b68e642dec97 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java @@ -86,6 +86,7 @@ import java.util.Optional; import java.util.Set; +import static com.facebook.presto.SystemSessionProperties.shouldOptimizerUseHistograms; import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.common.type.VarbinaryType.VARBINARY; import static com.facebook.presto.metadata.MetadataUtil.getConnectorIdOrThrow; @@ -134,7 +135,7 @@ public LogicalPlanner( this.idAllocator = requireNonNull(idAllocator, "idAllocator is null"); this.metadata = requireNonNull(metadata, "metadata is null"); this.variableAllocator = requireNonNull(variableAllocator, "variableAllocator is null"); - this.statisticsAggregationPlanner = new StatisticsAggregationPlanner(this.variableAllocator, metadata.getFunctionAndTypeManager(), session); + this.statisticsAggregationPlanner = new StatisticsAggregationPlanner(this.variableAllocator, metadata.getFunctionAndTypeManager(), session, shouldOptimizerUseHistograms(session)); this.sqlParser = requireNonNull(sqlParser, "sqlParser is null"); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/StatisticsAggregationPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/StatisticsAggregationPlanner.java index b84a5e23d7b51..01da076c68078 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/StatisticsAggregationPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/StatisticsAggregationPlanner.java @@ -58,15 +58,17 @@ public class StatisticsAggregationPlanner { private final VariableAllocator variableAllocator; private final FunctionAndTypeResolver functionAndTypeResolver; + private final boolean useHistograms; private final Session session; private final FunctionAndTypeManager functionAndTypeManager; - public StatisticsAggregationPlanner(VariableAllocator variableAllocator, FunctionAndTypeManager functionAndTypeManager, Session session) + public StatisticsAggregationPlanner(VariableAllocator variableAllocator, FunctionAndTypeManager functionAndTypeManager, Session session, boolean useHistograms) { this.variableAllocator = requireNonNull(variableAllocator, "variableAllocator is null"); this.session = requireNonNull(session, "session is null"); this.functionAndTypeManager = requireNonNull(functionAndTypeManager, "functionAndTypeManager is null"); this.functionAndTypeResolver = functionAndTypeManager.getFunctionAndTypeResolver(); + this.useHistograms = useHistograms; } public TableStatisticAggregation createStatisticsAggregation(TableStatisticsMetadata statisticsMetadata, Map columnToVariableMap) @@ -105,6 +107,9 @@ public TableStatisticAggregation createStatisticsAggregation(TableStatisticsMeta } for (ColumnStatisticMetadata columnStatisticMetadata : statisticsMetadata.getColumnStatistics()) { + if (!useHistograms && columnStatisticMetadata.getStatisticType() == ColumnStatisticType.HISTOGRAM) { + continue; + } String columnName = columnStatisticMetadata.getColumnName(); ColumnStatisticType statisticType = columnStatisticMetadata.getStatisticType(); VariableReferenceExpression inputVariable = columnToVariableMap.get(columnName);