diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveStatisticsUtil.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveStatisticsUtil.java
index f05c524f0012d..b138a899c94cf 100644
--- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveStatisticsUtil.java
+++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveStatisticsUtil.java
@@ -28,11 +28,13 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.Set;
 
 import static com.facebook.presto.common.type.BigintType.BIGINT;
 import static com.facebook.presto.hive.metastore.Statistics.fromComputedStatistics;
 import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT;
 import static com.google.common.base.Verify.verify;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
 
 public final class HiveStatisticsUtil
 {
@@ -61,9 +63,14 @@ public static PartitionStatistics createPartitionStatistics(
             ConnectorSession session,
             Map columnTypes,
             ComputedStatistics computedStatistics,
+            Set supportedColumnStatistics,
             DateTimeZone timeZone)
     {
-        Map computedColumnStatistics = computedStatistics.getColumnStatistics();
+        Map computedColumnStatistics = computedStatistics.getColumnStatistics()
+                .entrySet()
+                .stream()
+                .filter((entry) -> supportedColumnStatistics.contains(entry.getKey()))
+                .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
 
         Block rowCountBlock = Optional.ofNullable(computedStatistics.getTableStatistics().get(ROW_COUNT))
                 .orElseThrow(() -> new VerifyException("rowCount not present"));
@@ -73,6 +80,15 @@ public static PartitionStatistics createPartitionStatistics(
         return createPartitionStatistics(session, rowCountOnlyBasicStatistics, columnTypes, computedColumnStatistics, timeZone);
     }
 
+    public static PartitionStatistics createPartitionStatistics(
+            ConnectorSession session,
+            Map columnTypes,
+            ComputedStatistics computedStatistics,
+            DateTimeZone timeZone)
+    {
+        return createPartitionStatistics(session, columnTypes, computedStatistics, computedStatistics.getColumnStatistics().keySet(), timeZone);
+    }
+
     public static Map getColumnStatistics(Map, ComputedStatistics> statistics, List partitionValues)
     {
         return Optional.ofNullable(statistics.get(partitionValues))
@@ -81,10 +97,11 @@ public static Map getColumnStatistics(Map hiveColumnStatistics = getHiveSupportedColumnStatistics(session, icebergTable, tableMetadata);
+        Set supportedStatistics = ImmutableSet.builder()
+                .addAll(hiveColumnStatistics)
+                // iceberg table-supported statistics
+                .addAll(super.getStatisticsCollectionMetadata(session, tableMetadata).getColumnStatistics())
+                .build();
+        Set tableStatistics = ImmutableSet.of(ROW_COUNT);
+        return new TableStatisticsMetadata(supportedStatistics, tableStatistics, emptyList());
+    }
+
+    private Set getHiveSupportedColumnStatistics(ConnectorSession session, org.apache.iceberg.Table table, ConnectorTableMetadata tableMetadata)
+    {
         MetricsConfig metricsConfig = MetricsConfig.forTable(table);
-        Set columnStatistics = tableMetadata.getColumns().stream()
-                .filter(column -> !column.isHidden() && metricsConfig.columnMode(column.getName()) != None.get())
+        return tableMetadata.getColumns().stream()
+                .filter(column -> !column.isHidden())
+                .filter(column -> metricsConfig.columnMode(column.getName()) != None.get())
+                .flatMap(meta -> {
                     try {
                         return metastore.getSupportedColumnStatistics(getMetastoreContext(session), meta.getType())
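Illustrative note (not part of the patch): the new createPartitionStatistics overload keeps only the computed column statistics whose keys appear in the caller-supplied supported set, so statistics the target metastore cannot store are dropped before conversion. A minimal, self-contained sketch of that filtering step, using plain JDK types in place of Presto's ColumnStatisticMetadata/Block (all names below are hypothetical):

    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class SupportedStatsFilter
    {
        // Keep only the entries whose key appears in the supported set, mirroring the
        // stream().filter(...).collect(toImmutableMap(...)) pattern used in the patch.
        static Map<String, Long> keepSupported(Map<String, Long> computed, Set<String> supported)
        {
            return computed.entrySet().stream()
                    .filter(entry -> supported.contains(entry.getKey()))
                    .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
        }

        public static void main(String[] args)
        {
            Map<String, Long> computed = Map.of("ndv", 42L, "histogram", 7L);
            // In this toy example only "ndv" is supported; "histogram" is filtered out.
            System.out.println(keepSupported(computed, Set.of("ndv")));
        }
    }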
@@ -494,9 +507,6 @@ public TableStatisticsMetadata getStatisticsCollectionMetadata(ConnectorSession
                     }
                 })
                 .collect(toImmutableSet());
-
-        Set tableStatistics = ImmutableSet.of(ROW_COUNT);
-        return new TableStatisticsMetadata(columnStatistics, tableStatistics, emptyList());
     }
 
     @Override
@@ -525,11 +535,31 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
         Map, ComputedStatistics> computedStatisticsMap = createComputedStatisticsToPartitionMap(computedStatistics, partitionColumnNames, columnTypes);
 
         // commit analyze to unpartitioned table
-        PartitionStatistics tableStatistics = createPartitionStatistics(session, columnTypes, computedStatisticsMap.get(ImmutableList.of()), timeZone);
+        ConnectorTableMetadata metadata = getTableMetadata(session, tableHandle);
+        org.apache.iceberg.Table icebergTable = getIcebergTable(session, icebergTableHandle.getSchemaTableName());
+        Set hiveSupportedStatistics = getHiveSupportedColumnStatistics(session, icebergTable, metadata);
+        PartitionStatistics tableStatistics = createPartitionStatistics(
+                session,
+                columnTypes,
+                computedStatisticsMap.get(ImmutableList.of()),
+                hiveSupportedStatistics,
+                timeZone);
         metastore.updateTableStatistics(metastoreContext, table.getDatabaseName(), table.getTableName(), oldStats -> updatePartitionStatistics(oldStats, tableStatistics));
+
+        Set icebergSupportedStatistics = super.getStatisticsCollectionMetadata(session, metadata).getColumnStatistics();
+        Collection icebergComputedStatistics = computedStatistics.stream().map(stat -> {
+            ComputedStatistics.Builder builder = ComputedStatistics.builder(stat.getGroupingColumns(), stat.getGroupingValues());
+            stat.getTableStatistics()
+                    .forEach(builder::addTableStatistic);
+            stat.getColumnStatistics().entrySet().stream()
+                    .filter(entry -> icebergSupportedStatistics.contains(entry.getKey()))
+                    .forEach(entry -> builder.addColumnStatistic(entry.getKey(), entry.getValue()));
+            return builder.build();
+        }).collect(toImmutableList());
+        super.finishStatisticsCollection(session, tableHandle, icebergComputedStatistics);
     }
 
     @Override
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/StatisticsUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/StatisticsUtil.java
index a0587ccd8b069..f19ae11b07956 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/StatisticsUtil.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/StatisticsUtil.java
@@ -103,6 +103,7 @@ public static TableStatistics mergeHiveStatistics(TableStatistics icebergStatist
                     .setRange(icebergColumnStats.getRange())
                     .setNullsFraction(icebergColumnStats.getNullsFraction())
                     .setDistinctValuesCount(icebergColumnStats.getDistinctValuesCount())
+                    .setHistogram(icebergColumnStats.getHistogram())
                     .setRange(icebergColumnStats.getRange());
             if (hiveColumnStats != null) {
                 // NDVs
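Illustrative note (not part of the patch): finishStatisticsCollection now routes one set of computed statistics to two sinks — the Hive-supported subset is written to the metastore and an Iceberg-supported subset is rebuilt into new ComputedStatistics for super.finishStatisticsCollection(). A rough, self-contained sketch of that routing with plain JDK maps (all names hypothetical):

    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class SplitStatsBySink
    {
        // For each sink, keep only the computed statistics that sink supports, mirroring how
        // the patch sends one filtered subset to the Hive metastore and another to Iceberg.
        static Map<String, Map<String, Long>> split(Map<String, Long> computed, Map<String, Set<String>> supportedBySink)
        {
            return supportedBySink.entrySet().stream()
                    .collect(Collectors.toMap(
                            Map.Entry::getKey,
                            sink -> computed.entrySet().stream()
                                    .filter(stat -> sink.getValue().contains(stat.getKey()))
                                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));
        }

        public static void main(String[] args)
        {
            Map<String, Long> computed = Map.of("ndv", 42L, "histogram", 7L, "nulls", 3L);
            Map<String, Set<String>> sinks = Map.of(
                    "hive", Set.of("ndv", "nulls"),
                    "iceberg", Set.of("ndv", "histogram", "nulls"));
            System.out.println(split(computed, sinks));
        }
    }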
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java
index 11be7a0e9857c..0d12c203b80f8 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java
@@ -215,7 +215,7 @@ public static DistributedQueryRunner createIcebergQueryRunner(
             queryRunner.createCatalog("jmx", "jmx");
         }
 
-        if (catalogType == HIVE.name()) {
+        if (catalogType.equals(HIVE.name())) {
            ExtendedHiveMetastore metastore = getFileHiveMetastore(icebergDataDirectory);
            if (!metastore.getDatabase(METASTORE_CONTEXT, "tpch").isPresent()) {
                queryRunner.execute("CREATE SCHEMA tpch");
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java
index 98fd3e94c2f45..be90d5fc0dd02 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java
@@ -20,11 +20,27 @@
 import com.facebook.presto.common.predicate.TupleDomain;
 import com.facebook.presto.common.predicate.ValueSet;
 import com.facebook.presto.common.transaction.TransactionId;
+import com.facebook.presto.hive.HdfsConfiguration;
+import com.facebook.presto.hive.HdfsConfigurationInitializer;
+import com.facebook.presto.hive.HdfsEnvironment;
+import com.facebook.presto.hive.HiveClientConfig;
+import com.facebook.presto.hive.HiveHdfsConfiguration;
+import com.facebook.presto.hive.MetastoreClientConfig;
+import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
+import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
+import com.facebook.presto.hive.metastore.file.FileHiveMetastore;
+import com.facebook.presto.iceberg.CatalogType;
 import com.facebook.presto.iceberg.IcebergColumnHandle;
+import com.facebook.presto.iceberg.IcebergHiveTableOperationsConfig;
 import com.facebook.presto.iceberg.IcebergMetadataColumn;
+import com.facebook.presto.iceberg.IcebergUtil;
+import com.facebook.presto.metadata.CatalogManager;
 import com.facebook.presto.metadata.Metadata;
 import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorId;
 import com.facebook.presto.spi.Constraint;
+import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.spi.SchemaTableName;
 import com.facebook.presto.spi.TableHandle;
 import com.facebook.presto.spi.analyzer.MetadataResolver;
 import com.facebook.presto.spi.plan.TableScanNode;
@@ -39,16 +55,22 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.UpdateStatistics;
 import org.intellij.lang.annotations.Language;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import java.io.File;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -56,9 +78,14 @@
 import static com.facebook.presto.common.type.DoubleType.DOUBLE;
 import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY;
 import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR;
+import static com.facebook.presto.hive.metastore.InMemoryCachingHiveMetastore.memoizeMetastore;
+import static com.facebook.presto.iceberg.CatalogType.HIVE;
+import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
+import static com.facebook.presto.iceberg.IcebergQueryRunner.TEST_DATA_DIRECTORY;
 import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
 import static com.facebook.presto.iceberg.IcebergSessionProperties.HIVE_METASTORE_STATISTICS_MERGE_STRATEGY;
 import static com.facebook.presto.iceberg.IcebergSessionProperties.PUSHDOWN_FILTER_ENABLED;
+import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
 import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES;
 import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES;
 import static com.facebook.presto.testing.assertions.Assert.assertEquals;
@@ -159,6 +186,7 @@ public void testStatsWithPartitionedTableAnalyzed()
         assertQuerySucceeds("CREATE TABLE statsWithPartitionAnalyze WITH (partitioning = ARRAY['orderdate']) as SELECT * FROM statsNoPartitionAnalyze");
         assertQuerySucceeds("ANALYZE statsNoPartitionAnalyze");
         assertQuerySucceeds("ANALYZE statsWithPartitionAnalyze");
+        deleteTableStatistics("statsWithPartitionAnalyze");
         Metadata meta = getQueryRunner().getMetadata();
         TransactionId txid = getQueryRunner().getTransactionManager().beginTransaction(false);
         Session session = getSession().beginTransactionId(txid, getQueryRunner().getTransactionManager(), new AllowAllAccessControl());
@@ -295,12 +323,17 @@ public void testHiveStatisticsMergeFlags()
     {
         assertQuerySucceeds("CREATE TABLE mergeFlagsStats (i int, v varchar)");
         assertQuerySucceeds("INSERT INTO mergeFlagsStats VALUES (0, '1'), (1, '22'), (2, '333'), (NULL, 'aaaaa'), (4, NULL)");
-        assertQuerySucceeds("ANALYZE mergeFlagsStats"); // stats stored in
+        assertQuerySucceeds("ANALYZE mergeFlagsStats");
+
+        // invalidate puffin files so only hive stats can be returned
+        deleteTableStatistics("mergeFlagsStats");
+
         // Test stats without merging doesn't return NDVs or data size
         Session session = Session.builder(getSession())
                 .setCatalogSessionProperty("iceberg", HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, "")
                 .build();
         TableStatistics stats = getTableStatistics(session, "mergeFlagsStats");
+
         Map columnStatistics = getColumnNameMap(stats);
         assertEquals(columnStatistics.get("i").getDistinctValuesCount(), Estimate.unknown());
         assertEquals(columnStatistics.get("i").getDataSize(), Estimate.unknown());
@@ -468,6 +501,64 @@ static void assertStatValue(StatsSchema column, MaterializedResult result, Set new RuntimeException("Catalog directory does not exist: " + getCatalogDirectory(HIVE))),
+                "test");
+        return memoizeMetastore(fileHiveMetastore, false, 1000, 0);
+    }
+
+    protected static HdfsEnvironment getHdfsEnvironment()
+    {
+        HiveClientConfig hiveClientConfig = new HiveClientConfig();
+        MetastoreClientConfig metastoreClientConfig = new MetastoreClientConfig();
+        HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hiveClientConfig, metastoreClientConfig),
+                ImmutableSet.of(),
+                hiveClientConfig);
+        return new HdfsEnvironment(hdfsConfiguration, metastoreClientConfig, new NoHdfsAuthentication());
+    }
+
+    protected File getCatalogDirectory(CatalogType catalogType)
+    {
+        Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
+        switch (catalogType) {
+            case HIVE:
+                return dataDirectory
+                        .resolve(TEST_DATA_DIRECTORY)
+                        .resolve(HIVE.name())
+                        .toFile();
+            case HADOOP:
+            case NESSIE:
+                return dataDirectory.toFile();
+        }
+
+        throw new PrestoException(NOT_SUPPORTED, "Unsupported Presto Iceberg catalog type " + catalogType);
+    }
+
     private static Map getColumnNameMap(TableStatistics statistics)
     {
         return statistics.getColumnStatistics().entrySet().stream().collect(Collectors.toMap(e ->
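Illustrative note (not part of the patch): the helper deleteTableStatistics added to this test is partially garbled above; per the comment in testHiveStatisticsMergeFlags, its purpose is to drop the table's Puffin statistics files so only Hive metastore statistics remain visible. A rough sketch of that idea against Iceberg's public Table API follows — the exact calls (statisticsFiles(), updateStatistics(), removeStatistics(), commit()) and the already-loaded Table are assumptions, not a quote of the patch:

    import org.apache.iceberg.StatisticsFile;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.UpdateStatistics;

    public class DropPuffinStats
    {
        // Remove every statistics (Puffin) file registered on the table so that later
        // statistics lookups fall back to whatever the Hive metastore has stored.
        static void dropAllStatistics(Table table)
        {
            UpdateStatistics update = table.updateStatistics();
            for (StatisticsFile file : table.statisticsFiles()) {
                update.removeStatistics(file.snapshotId());
            }
            update.commit();
        }
    }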
diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java
index 190271f3e39ae..ceb0186ffb413 100644
--- a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java
+++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java
@@ -62,6 +62,7 @@
 import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
 import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled;
+import static com.facebook.presto.SystemSessionProperties.shouldOptimizerUseHistograms;
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
 import static com.facebook.presto.sql.TemporaryTableUtil.assignPartitioningVariables;
 import static com.facebook.presto.sql.TemporaryTableUtil.assignTemporaryTableColumnNames;
@@ -121,7 +122,7 @@ public BasePlanFragmenter(
         this.idAllocator = requireNonNull(idAllocator, "idAllocator is null");
         this.variableAllocator = requireNonNull(variableAllocator, "variableAllocator is null");
         this.outputTableWriterNodeIds = ImmutableSet.copyOf(requireNonNull(outputTableWriterNodeIds, "outputTableWriterNodeIds is null"));
-        this.statisticsAggregationPlanner = new StatisticsAggregationPlanner(variableAllocator, metadata.getFunctionAndTypeManager(), session);
+        this.statisticsAggregationPlanner = new StatisticsAggregationPlanner(variableAllocator, metadata.getFunctionAndTypeManager(), session, shouldOptimizerUseHistograms(session));
     }
 
     public SubPlan buildRootFragment(PlanNode root, FragmentProperties properties)
diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java
index 9d0d1aaaf1804..2b68e642dec97 100644
--- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java
+++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java
@@ -86,6 +86,7 @@
 import java.util.Optional;
 import java.util.Set;
 
+import static com.facebook.presto.SystemSessionProperties.shouldOptimizerUseHistograms;
 import static com.facebook.presto.common.type.BigintType.BIGINT;
 import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
 import static com.facebook.presto.metadata.MetadataUtil.getConnectorIdOrThrow;
@@ -134,7 +135,7 @@ public LogicalPlanner(
         this.idAllocator = requireNonNull(idAllocator, "idAllocator is null");
         this.metadata = requireNonNull(metadata, "metadata is null");
         this.variableAllocator = requireNonNull(variableAllocator, "variableAllocator is null");
-        this.statisticsAggregationPlanner = new StatisticsAggregationPlanner(this.variableAllocator, metadata.getFunctionAndTypeManager(), session);
+        this.statisticsAggregationPlanner = new StatisticsAggregationPlanner(this.variableAllocator, metadata.getFunctionAndTypeManager(), session, shouldOptimizerUseHistograms(session));
         this.sqlParser = requireNonNull(sqlParser, "sqlParser is null");
     }
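Illustrative note (not part of the patch): both planners now evaluate shouldOptimizerUseHistograms(session) once and inject the resulting boolean into StatisticsAggregationPlanner, instead of consulting the session at every use site. A minimal sketch of that wiring with stand-in types (the Session interface and the property name below are assumptions for illustration):

    public class PlannerWiring
    {
        // Stand-in for Presto's Session: resolves a boolean session property by name.
        interface Session
        {
            boolean getBooleanProperty(String name);
        }

        // Stand-in for StatisticsAggregationPlanner: the flag is fixed at construction time.
        static final class StatsPlanner
        {
            private final boolean useHistograms;

            StatsPlanner(boolean useHistograms)
            {
                this.useHistograms = useHistograms;
            }

            boolean shouldPlanHistogram()
            {
                return useHistograms;
            }
        }

        static StatsPlanner createPlanner(Session session)
        {
            // Mirrors shouldOptimizerUseHistograms(session) being read once and passed in.
            return new StatsPlanner(session.getBooleanProperty("optimizer_use_histograms"));
        }

        public static void main(String[] args)
        {
            Session session = name -> false; // histograms disabled in this toy session
            System.out.println(createPlanner(session).shouldPlanHistogram());
        }
    }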
diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/StatisticsAggregationPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/StatisticsAggregationPlanner.java
index b84a5e23d7b51..01da076c68078 100644
--- a/presto-main/src/main/java/com/facebook/presto/sql/planner/StatisticsAggregationPlanner.java
+++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/StatisticsAggregationPlanner.java
@@ -58,15 +58,17 @@ public class StatisticsAggregationPlanner
 {
     private final VariableAllocator variableAllocator;
     private final FunctionAndTypeResolver functionAndTypeResolver;
+    private final boolean useHistograms;
     private final Session session;
     private final FunctionAndTypeManager functionAndTypeManager;
 
-    public StatisticsAggregationPlanner(VariableAllocator variableAllocator, FunctionAndTypeManager functionAndTypeManager, Session session)
+    public StatisticsAggregationPlanner(VariableAllocator variableAllocator, FunctionAndTypeManager functionAndTypeManager, Session session, boolean useHistograms)
     {
         this.variableAllocator = requireNonNull(variableAllocator, "variableAllocator is null");
         this.session = requireNonNull(session, "session is null");
         this.functionAndTypeManager = requireNonNull(functionAndTypeManager, "functionAndTypeManager is null");
         this.functionAndTypeResolver = functionAndTypeManager.getFunctionAndTypeResolver();
+        this.useHistograms = useHistograms;
     }
 
     public TableStatisticAggregation createStatisticsAggregation(TableStatisticsMetadata statisticsMetadata, Map columnToVariableMap)
@@ -105,6 +107,9 @@ public TableStatisticAggregation createStatisticsAggregation(TableStatisticsMeta
         }
 
         for (ColumnStatisticMetadata columnStatisticMetadata : statisticsMetadata.getColumnStatistics()) {
+            if (!useHistograms && columnStatisticMetadata.getStatisticType() == ColumnStatisticType.HISTOGRAM) {
+                continue;
+            }
             String columnName = columnStatisticMetadata.getColumnName();
             ColumnStatisticType statisticType = columnStatisticMetadata.getStatisticType();
             VariableReferenceExpression inputVariable = columnToVariableMap.get(columnName);
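Illustrative note (not part of the patch): with the flag injected, createStatisticsAggregation simply skips HISTOGRAM descriptors when histograms are disabled, so no histogram aggregation is planned for any column. A tiny self-contained sketch of the same gating loop (hypothetical types, not Presto's):

    import java.util.ArrayList;
    import java.util.List;

    public class HistogramGating
    {
        enum ColumnStatisticType { MIN_VALUE, MAX_VALUE, NUMBER_OF_DISTINCT_VALUES, HISTOGRAM }

        record ColumnStatistic(String column, ColumnStatisticType type) {}

        // Mirrors the loop in createStatisticsAggregation: when histograms are disabled,
        // HISTOGRAM descriptors are skipped and everything else is kept.
        static List<ColumnStatistic> planned(List<ColumnStatistic> requested, boolean useHistograms)
        {
            List<ColumnStatistic> result = new ArrayList<>();
            for (ColumnStatistic stat : requested) {
                if (!useHistograms && stat.type() == ColumnStatisticType.HISTOGRAM) {
                    continue;
                }
                result.add(stat);
            }
            return result;
        }

        public static void main(String[] args)
        {
            List<ColumnStatistic> requested = List.of(
                    new ColumnStatistic("c1", ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES),
                    new ColumnStatistic("c1", ColumnStatisticType.HISTOGRAM));
            System.out.println(planned(requested, false)); // only the NDV descriptor remains
        }
    }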