diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst index ec640a5f6ab84..67257daa5ddec 100644 --- a/presto-docs/src/main/sphinx/connector/iceberg.rst +++ b/presto-docs/src/main/sphinx/connector/iceberg.rst @@ -225,10 +225,13 @@ Property Name Description ``iceberg.enable-parquet-dereference-pushdown`` Enable parquet dereference pushdown. ``true`` -``iceberg.hive-statistics-merge-strategy`` Determines how to merge statistics that are stored in the ``NONE`` - Hive Metastore. The available values are ``NONE``, - ``USE_NULLS_FRACTION_AND_NDV``, ``USE_NULLS_FRACTIONS`` - and, ``USE_NDV`` +``iceberg.hive-statistics-merge-strategy`` Comma separated list of statistics to use from the + Hive Metastore to override Iceberg table statistics. + The available values are ``NUMBER_OF_DISTINCT_VALUES`` + and ``TOTAL_SIZE_IN_BYTES``. + + **Note**: Only valid when the Iceberg connector is + configured with Hive. ``iceberg.statistic-snapshot-record-difference-weight`` The amount that the difference in total record count matters when calculating the closest snapshot when picking @@ -306,6 +309,8 @@ Property Name Description ============================================= ====================================================================== ``iceberg.delete_as_join_rewrite_enabled`` Overrides the behavior of the connector property ``iceberg.delete-as-join-rewrite-enabled`` in the current session. +``iceberg.hive_statistics_merge_strategy`` Overrides the behavior of the connector property + ``iceberg.hive-statistics-merge-strategy`` in the current session. ============================================= ====================================================================== Caching Support @@ -1172,7 +1177,7 @@ each Iceberg data type to the corresponding Presto data type, and from each Pres The following tables detail the specific type maps between PrestoDB and Iceberg. 
Iceberg to PrestoDB type mapping -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Map of Iceberg types to the relevant PrestoDB types: @@ -1215,7 +1220,7 @@ Map of Iceberg types to the relevant PrestoDB types: No other types are supported. PrestoDB to Iceberg type mapping -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Map of PrestoDB types to the relevant Iceberg types: diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java index 8bf6686c9ec70..9b370b4d3576c 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java @@ -16,7 +16,7 @@ import com.facebook.airlift.configuration.Config; import com.facebook.airlift.configuration.ConfigDescription; import com.facebook.presto.hive.HiveCompressionCodec; -import com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy; +import com.facebook.presto.spi.statistics.ColumnStatisticType; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import org.apache.iceberg.hadoop.HadoopFileIO; @@ -26,11 +26,13 @@ import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; +import java.util.EnumSet; import java.util.List; import static com.facebook.presto.hive.HiveCompressionCodec.GZIP; import static com.facebook.presto.iceberg.CatalogType.HIVE; import static com.facebook.presto.iceberg.IcebergFileFormat.PARQUET; +import static com.facebook.presto.iceberg.util.StatisticsUtil.decodeMergeFlags; import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT; import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT; import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT; @@ -51,12 +53,13 @@ public class 
IcebergConfig private boolean pushdownFilterEnabled; private boolean deleteAsJoinRewriteEnabled = true; - private HiveStatisticsMergeStrategy hiveStatisticsMergeStrategy = HiveStatisticsMergeStrategy.NONE; + private EnumSet hiveStatisticsMergeFlags = EnumSet.noneOf(ColumnStatisticType.class); private String fileIOImpl = HadoopFileIO.class.getName(); private boolean manifestCachingEnabled; private long maxManifestCacheSize = IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT; private long manifestCacheExpireDuration = IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT; private long manifestCacheMaxContentLength = IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT; + @NotNull public FileFormat getFileFormat() { @@ -195,16 +198,16 @@ public boolean isMergeOnReadModeEnabled() } @Config("iceberg.hive-statistics-merge-strategy") - @ConfigDescription("determines how to merge statistics that are stored in the Hive Metastore") - public IcebergConfig setHiveStatisticsMergeStrategy(HiveStatisticsMergeStrategy mergeStrategy) + @ConfigDescription("Comma separated list of statistics to use from the Hive metastore to override iceberg table statistics") + public IcebergConfig setHiveStatisticsMergeFlags(String mergeFlags) { - this.hiveStatisticsMergeStrategy = mergeStrategy; + this.hiveStatisticsMergeFlags = decodeMergeFlags(mergeFlags); return this; } - public HiveStatisticsMergeStrategy getHiveStatisticsMergeStrategy() + public EnumSet getHiveStatisticsMergeFlags() { - return hiveStatisticsMergeStrategy; + return hiveStatisticsMergeFlags; } @Config("iceberg.statistic-snapshot-record-difference-weight") diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java index 113ccc931da7e..a7e718188f870 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java @@ 
-34,7 +34,6 @@ import com.facebook.presto.hive.metastore.PrestoTableType; import com.facebook.presto.hive.metastore.PrincipalPrivileges; import com.facebook.presto.hive.metastore.Table; -import com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorNewTableLayout; import com.facebook.presto.spi.ConnectorOutputTableHandle; @@ -57,6 +56,7 @@ import com.facebook.presto.spi.relation.VariableReferenceExpression; import com.facebook.presto.spi.security.PrestoPrincipal; import com.facebook.presto.spi.statistics.ColumnStatisticMetadata; +import com.facebook.presto.spi.statistics.ColumnStatisticType; import com.facebook.presto.spi.statistics.ColumnStatistics; import com.facebook.presto.spi.statistics.ComputedStatistics; import com.facebook.presto.spi.statistics.Estimate; @@ -82,6 +82,7 @@ import java.io.IOException; import java.time.ZoneId; import java.util.Collection; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -442,7 +443,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab IcebergTableHandle handle = (IcebergTableHandle) tableHandle; org.apache.iceberg.Table icebergTable = getIcebergTable(session, handle.getSchemaTableName()); TableStatistics icebergStatistics = TableStatisticsMaker.getTableStatistics(session, typeManager, constraint, handle, icebergTable, columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList())); - HiveStatisticsMergeStrategy mergeStrategy = getHiveStatisticsMergeStrategy(session); + EnumSet mergeFlags = getHiveStatisticsMergeStrategy(session); return tableLayoutHandle.map(IcebergTableLayoutHandle.class::cast).map(layoutHandle -> { TupleDomain domainPredicate = layoutHandle.getDomainPredicate() .transform(subfield -> isEntireColumn(subfield) ? 
subfield.getRootName() : null) @@ -453,9 +454,13 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab IcebergColumnHandle columnHandle = (IcebergColumnHandle) icebergLayout; return new VariableReferenceExpression(Optional.empty(), columnHandle.getName(), columnHandle.getType()); }); - RowExpression translatedPredicate = rowExpressionService.getDomainTranslator().toPredicate(predicate); - PartitionStatistics hiveStatistics = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), handle.getIcebergTableName().getTableName()); - TableStatistics mergedStatistics = mergeHiveStatistics(icebergStatistics, hiveStatistics, mergeStrategy, icebergTable.spec()); + TableStatistics mergedStatistics = Optional.of(mergeFlags) + .filter(set -> !set.isEmpty()) + .map(flags -> { + PartitionStatistics hiveStatistics = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), handle.getIcebergTableName().getTableName()); + return mergeHiveStatistics(icebergStatistics, hiveStatistics, mergeFlags, icebergTable.spec()); + }) + .orElse(icebergStatistics); TableStatistics.Builder filteredStatsBuilder = TableStatistics.builder() .setRowCount(mergedStatistics.getRowCount()); double totalSize = 0; @@ -469,6 +474,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab } } } + RowExpression translatedPredicate = rowExpressionService.getDomainTranslator().toPredicate(predicate); filteredStatsBuilder.setTotalSize(Estimate.of(totalSize)); return filterStatsCalculatorService.filterStats( filteredStatsBuilder.build(), @@ -481,9 +487,9 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab IcebergColumnHandle::getName, IcebergColumnHandle::getType))); }).orElseGet(() -> { - if (!mergeStrategy.equals(HiveStatisticsMergeStrategy.NONE)) { - PartitionStatistics hiveStats = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), 
handle.getIcebergTableName().getTableName()); - return mergeHiveStatistics(icebergStatistics, hiveStats, mergeStrategy, icebergTable.spec()); + if (!mergeFlags.isEmpty()) { // only query the metastore when a merge was actually requested + PartitionStatistics hiveStats = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), handle.getIcebergTableName().getTableName()); + return mergeHiveStatistics(icebergStatistics, hiveStats, mergeFlags, icebergTable.spec()); } return icebergStatistics; }); @@ -500,9 +506,16 @@ public TableStatisticsMetadata getStatisticsCollectionMetadata(ConnectorSession { Set columnStatistics = tableMetadata.getColumns().stream() .filter(column -> !column.isHidden()) - .flatMap(meta -> metastore.getSupportedColumnStatistics(getMetastoreContext(session), meta.getType()) - .stream() - .map(statType -> statType.getColumnStatisticMetadata(meta.getName()))) + .flatMap(meta -> { + try { + return metastore.getSupportedColumnStatistics(getMetastoreContext(session), meta.getType()) + .stream() + .map(statType -> statType.getColumnStatisticMetadata(meta.getName())); + } + catch (IllegalArgumentException e) { + return ImmutableSet.of().stream(); + } + }) .collect(toImmutableSet()); Set tableStatistics = ImmutableSet.of(ROW_COUNT); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java index fe4c51aa75492..2261a33fd4dfc 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java @@ -18,9 +18,10 @@ import com.facebook.presto.hive.OrcFileWriterConfig; import com.facebook.presto.hive.ParquetFileWriterConfig; import com.facebook.presto.iceberg.nessie.NessieConfig; -import com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy; +import com.facebook.presto.iceberg.util.StatisticsUtil; import 
com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.session.PropertyMetadata; +import com.facebook.presto.spi.statistics.ColumnStatisticType; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import io.airlift.units.DataSize; @@ -28,14 +29,18 @@ import javax.inject.Inject; +import java.util.EnumSet; import java.util.List; import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType; +import static com.facebook.presto.iceberg.util.StatisticsUtil.decodeMergeFlags; import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty; import static com.facebook.presto.spi.session.PropertyMetadata.doubleProperty; import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty; import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES; public final class IcebergSessionProperties { @@ -156,14 +161,14 @@ public IcebergSessionProperties( false), new PropertyMetadata<>( HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, - "choose how to include statistics from the Hive Metastore when calculating table stats. Valid values are: " - + Joiner.on(", ").join(HiveStatisticsMergeStrategy.values()), + "Flags to choose which statistics from the Hive Metastore are used when calculating table stats. 
Valid values are: " + + Joiner.on(", ").join(NUMBER_OF_DISTINCT_VALUES, TOTAL_SIZE_IN_BYTES), VARCHAR, - HiveStatisticsMergeStrategy.class, - icebergConfig.getHiveStatisticsMergeStrategy(), + EnumSet.class, + icebergConfig.getHiveStatisticsMergeFlags(), false, - val -> HiveStatisticsMergeStrategy.valueOf((String) val), - HiveStatisticsMergeStrategy::name), + val -> decodeMergeFlags((String) val), + StatisticsUtil::encodeMergeFlags), booleanProperty( PUSHDOWN_FILTER_ENABLED, "Experimental: Enable Filter Pushdown for Iceberg. This is only supported with Native Worker.", @@ -272,9 +277,9 @@ public static boolean isMergeOnReadModeEnabled(ConnectorSession session) return session.getProperty(MERGE_ON_READ_MODE_ENABLED, Boolean.class); } - public static HiveStatisticsMergeStrategy getHiveStatisticsMergeStrategy(ConnectorSession session) + public static EnumSet getHiveStatisticsMergeStrategy(ConnectorSession session) { - return session.getProperty(HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, HiveStatisticsMergeStrategy.class); + return session.getProperty(HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, EnumSet.class); } public static boolean isPushdownFilterEnabled(ConnectorSession session) diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java index a7c26a6694456..20d84e3e1681e 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java @@ -14,6 +14,7 @@ package com.facebook.presto.iceberg; import com.facebook.airlift.log.Logger; +import com.facebook.presto.common.block.Block; import com.facebook.presto.common.predicate.TupleDomain; import com.facebook.presto.common.type.FixedWidthType; import com.facebook.presto.common.type.TypeManager; @@ -22,6 +23,7 @@ import com.facebook.presto.spi.Constraint; import 
com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.statistics.ColumnStatisticMetadata; +import com.facebook.presto.spi.statistics.ColumnStatisticType; import com.facebook.presto.spi.statistics.ColumnStatistics; import com.facebook.presto.spi.statistics.ComputedStatistics; import com.facebook.presto.spi.statistics.DoubleRange; @@ -44,13 +46,17 @@ import org.apache.iceberg.TableScan; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.puffin.Blob; +import org.apache.iceberg.puffin.BlobMetadata; import org.apache.iceberg.puffin.Puffin; +import org.apache.iceberg.puffin.PuffinReader; import org.apache.iceberg.puffin.PuffinWriter; import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; import java.io.IOException; import java.io.UncheckedIOException; @@ -68,6 +74,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.common.type.DateType.DATE; import static com.facebook.presto.common.type.TimestampType.TIMESTAMP; import static com.facebook.presto.common.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE; @@ -80,8 +87,8 @@ import static com.facebook.presto.iceberg.IcebergSessionProperties.getStatisticSnapshotRecordDifferenceWeight; import static com.facebook.presto.iceberg.IcebergUtil.getIdentityPartitions; import static com.facebook.presto.iceberg.Partition.toMap; -import static com.facebook.presto.iceberg.TypeConverter.toPrestoType; import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES; import static 
com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterables.getOnlyElement; import static java.lang.Long.parseLong; @@ -95,7 +102,9 @@ public class TableStatisticsMaker { private static final Logger log = Logger.get(TableStatisticsMaker.class); private static final String ICEBERG_THETA_SKETCH_BLOB_TYPE_ID = "apache-datasketches-theta-v1"; + private static final String ICEBERG_DATA_SIZE_BLOB_TYPE_ID = "presto-sum-data-size-bytes-v1"; private static final String ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY = "ndv"; + private static final String ICEBERG_DATA_SIZE_BLOB_PROPERTY_KEY = "data_size"; private final Table icebergTable; private final ConnectorSession session; private final TypeManager typeManager; @@ -107,6 +116,16 @@ private TableStatisticsMaker(Table icebergTable, ConnectorSession session, TypeM this.typeManager = typeManager; } + private static final Map puffinStatWriters = ImmutableMap.builder() + .put(NUMBER_OF_DISTINCT_VALUES, TableStatisticsMaker::generateNDVBlob) + .put(TOTAL_SIZE_IN_BYTES, TableStatisticsMaker::generateStatSizeBlob) + .build(); + + private static final Map puffinStatReaders = ImmutableMap.builder() + .put(ICEBERG_THETA_SKETCH_BLOB_TYPE_ID, TableStatisticsMaker::readNDVBlob) + .put(ICEBERG_DATA_SIZE_BLOB_TYPE_ID, TableStatisticsMaker::readDataSizeBlob) + .build(); + public static TableStatistics getTableStatistics(ConnectorSession session, TypeManager typeManager, Constraint constraint, IcebergTableHandle tableHandle, Table icebergTable, List columns) { return new TableStatisticsMaker(icebergTable, session, typeManager).makeTableStatistics(tableHandle, constraint, columns); @@ -164,7 +183,7 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Cons result.setRowCount(Estimate.of(recordCount)); result.setTotalSize(Estimate.of(summary.getSize())); Map tableStats = getClosestStatisticsFileForSnapshot(tableHandle) - 
.map(TableStatisticsMaker::loadStatisticsFile).orElseGet(Collections::emptyMap); + .map(this::loadStatisticsFile).orElseGet(Collections::emptyMap); for (IcebergColumnHandle columnHandle : selectedColumns) { int fieldId = columnHandle.getId(); ColumnStatistics.Builder columnBuilder = tableStats.getOrDefault(fieldId, ColumnStatistics.builder()); @@ -172,12 +191,6 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Cons if (nullCount != null) { columnBuilder.setNullsFraction(Estimate.of(nullCount / recordCount)); } - if (summary.getColumnSizes() != null) { - Long columnSize = summary.getColumnSizes().get(fieldId); - if (columnSize != null) { - columnBuilder.setDataSize(Estimate.of(columnSize)); - } - } Object min = summary.getMinValues().get(fieldId); Object max = summary.getMaxValues().get(fieldId); if (min instanceof Number && max instanceof Number) { @@ -239,7 +252,6 @@ private Partition getSummaryFromFiles(CloseableIterable> files, toMap(idToTypeMapping, contentFile.upperBounds()), contentFile.nullValueCounts(), new HashMap<>()); - updateColumnSizes(summary, contentFile.columnSizes()); } else { summary.incrementFileCount(); @@ -248,7 +260,6 @@ private Partition getSummaryFromFiles(CloseableIterable> files, updateSummaryMin(summary, partitionFields, toMap(idToTypeMapping, contentFile.lowerBounds()), contentFile.nullValueCounts(), contentFile.recordCount()); updateSummaryMax(summary, partitionFields, toMap(idToTypeMapping, contentFile.upperBounds()), contentFile.nullValueCounts(), contentFile.recordCount()); summary.updateNullCount(contentFile.nullValueCounts()); - updateColumnSizes(summary, contentFile.columnSizes()); } } } @@ -282,24 +293,10 @@ private void writeTableStatistics(NodeVersion nodeVersion, IcebergTableHandle ta .flatMap(map -> map.entrySet().stream()) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) .forEach((key, value) -> { - if (!key.getStatisticType().equals(NUMBER_OF_DISTINCT_VALUES)) { - return; - } - 
Optional id = Optional.ofNullable(icebergTable.schema().findField(key.getColumnName())).map(Types.NestedField::fieldId); - if (!id.isPresent()) { - log.warn("failed to find column name %s in schema of table %s when writing distinct value statistics", key.getColumnName(), icebergTable.name()); - throw new PrestoException(ICEBERG_INVALID_METADATA, format("failed to find column name %s in schema of table %s when writing distinct value statistics", key.getColumnName(), icebergTable.name())); - } - ByteBuffer raw = VARBINARY.getSlice(value, 0).toByteBuffer(); - CompactSketch sketch = CompactSketch.wrap(Memory.wrap(raw, ByteOrder.nativeOrder())); - writer.add(new Blob( - ICEBERG_THETA_SKETCH_BLOB_TYPE_ID, - ImmutableList.of(id.get()), - snapshot.snapshotId(), - snapshot.sequenceNumber(), - raw, - null, - ImmutableMap.of(ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY, Long.toString((long) sketch.getEstimate())))); + Optional.ofNullable(puffinStatWriters.get(key.getStatisticType())) + .ifPresent(generator -> { + writer.add(generator.generate(key, value, icebergTable, snapshot)); + }); }); writer.finish(); icebergTable.updateStatistics().setStatistics( @@ -321,27 +318,87 @@ private void writeTableStatistics(NodeVersion nodeVersion, IcebergTableHandle ta } } - public void updateColumnSizes(Partition summary, Map addedColumnSizes) + @FunctionalInterface + private interface PuffinBlobGenerator { - Map columnSizes = summary.getColumnSizes(); - if (!summary.hasValidColumnMetrics() || columnSizes == null || addedColumnSizes == null) { - return; - } - for (Types.NestedField column : summary.getNonPartitionPrimitiveColumns()) { - int id = column.fieldId(); - com.facebook.presto.common.type.Type type = toPrestoType(column.type(), typeManager); - // allow the optimizer to infer the size of fixed-width types - // since it can be calculated accurately without collecting stats. 
- if (type instanceof FixedWidthType) { - continue; - } - columnSizes.compute(id, (key, value) -> { - if (value == null) { - value = 0L; - } - return value + addedColumnSizes.getOrDefault(id, 0L); - }); - } + Blob generate(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot); + } + + @FunctionalInterface + private interface PuffinBlobReader + { + /** + * Reads the stats from the blob and then updates the stats builder argument. + */ + void read(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder stats); + } + + private static Blob generateNDVBlob(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot) + { + int id = getFieldId(metadata, icebergTable); + ByteBuffer raw = VARBINARY.getSlice(value, 0).toByteBuffer(); + CompactSketch sketch = CompactSketch.wrap(Memory.wrap(raw, ByteOrder.nativeOrder())); + return new Blob( + ICEBERG_THETA_SKETCH_BLOB_TYPE_ID, + ImmutableList.of(id), + snapshot.snapshotId(), + snapshot.sequenceNumber(), + raw, + null, + ImmutableMap.of(ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY, Long.toString((long) sketch.getEstimate()))); + } + + private static Blob generateStatSizeBlob(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot) + { + int id = getFieldId(metadata, icebergTable); + long size = BIGINT.getLong(value, 0); + return new Blob( + ICEBERG_DATA_SIZE_BLOB_TYPE_ID, + ImmutableList.of(id), + snapshot.snapshotId(), + snapshot.sequenceNumber(), + ByteBuffer.allocate(0), // empty bytebuffer since the value is just stored on the blob properties + null, + ImmutableMap.of(ICEBERG_DATA_SIZE_BLOB_PROPERTY_KEY, Long.toString(size))); + } + + private static void readNDVBlob(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder statistics) + { + Optional.ofNullable(metadata.properties().get(ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY)) + .ifPresent(ndvProp -> { + try { + long ndv = parseLong(ndvProp); + 
statistics.setDistinctValuesCount(Estimate.of(ndv)); + } + catch (NumberFormatException e) { + statistics.setDistinctValuesCount(Estimate.unknown()); + log.warn("bad long value when parsing NDVs for statistics file blob %s. bad value: %s", metadata.type(), ndvProp); + } + }); + } + + private static void readDataSizeBlob(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder statistics) + { + Optional.ofNullable(metadata.properties().get(ICEBERG_DATA_SIZE_BLOB_PROPERTY_KEY)) + .ifPresent(sizeProp -> { + try { + long size = parseLong(sizeProp); + statistics.setDataSize(Estimate.of(size)); + } + catch (NumberFormatException e) { + statistics.setDataSize(Estimate.unknown()); + log.warn("bad long value when parsing data size from statistics file blob %s. bad value: %s", metadata.type(), sizeProp); + } + }); + } + + private static int getFieldId(ColumnStatisticMetadata metadata, Table icebergTable) + { + return Optional.ofNullable(icebergTable.schema().findField(metadata.getColumnName())).map(Types.NestedField::fieldId) + .orElseThrow(() -> { + log.warn("failed to find column name %s in schema of table %s", metadata.getColumnName(), icebergTable.name()); + return new PrestoException(ICEBERG_INVALID_METADATA, format("failed to find column name %s in schema of table %s", metadata.getColumnName(), icebergTable.name())); + }); } private void updateSummaryMin(Partition summary, List partitionFields, Map lowerBounds, Map nullCounts, long recordCount) @@ -424,29 +481,35 @@ private Optional getClosestStatisticsFileForSnapshot(IcebergTabl /** * Builds a map of field ID to ColumnStatistics for a particular {@link StatisticsFile}. 
- * - * @return */ - private static Map loadStatisticsFile(StatisticsFile file) + private Map loadStatisticsFile(StatisticsFile file) { - ImmutableMap.Builder result = ImmutableMap.builder(); - file.blobMetadata().forEach(blob -> { - Integer field = getOnlyElement(blob.fields()); - ColumnStatistics.Builder colStats = ColumnStatistics.builder(); - Optional.ofNullable(blob.properties().get(ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY)) - .ifPresent(ndvProp -> { - try { - long ndv = parseLong(ndvProp); - colStats.setDistinctValuesCount(Estimate.of(ndv)); - } - catch (NumberFormatException e) { - colStats.setDistinctValuesCount(Estimate.unknown()); - log.warn("bad long value when parsing statistics file %s, bad value: %d", file.path(), ndvProp); - } - }); - result.put(field, colStats); - }); - return result.build(); + Map result = new HashMap<>(); + try (FileIO io = icebergTable.io()) { + InputFile inputFile = io.newInputFile(file.path()); + try (PuffinReader reader = Puffin.read(inputFile).build()) { + for (Pair data : reader.readAll(reader.fileMetadata().blobs())) { + BlobMetadata metadata = data.first(); + ByteBuffer blob = data.second(); + Integer field = getOnlyElement(metadata.inputFields()); + Optional.ofNullable(puffinStatReaders.get(metadata.type())) + .ifPresent(statReader -> { + result.compute(field, (key, value) -> { + if (value == null) { + value = ColumnStatistics.builder(); + } + statReader.read(metadata, blob, value); + return value; + }); + }); + } + } + catch (IOException e) { + throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "failed to read statistics file at " + file.path(), e); + } + } + + return ImmutableMap.copyOf(result); } public static List getSupportedColumnStatistics(String columnName, com.facebook.presto.common.type.Type type) @@ -459,6 +522,10 @@ public static List getSupportedColumnStatistics(String supportedStatistics.add(NUMBER_OF_DISTINCT_VALUES.getColumnStatisticMetadataWithCustomFunction(columnName, "sketch_theta")); } + if (!(type 
instanceof FixedWidthType)) { + supportedStatistics.add(TOTAL_SIZE_IN_BYTES.getColumnStatisticMetadata(columnName)); + } + return supportedStatistics.build(); } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/HiveStatisticsMergeStrategy.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/HiveStatisticsMergeStrategy.java deleted file mode 100644 index 419a04cad263f..0000000000000 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/HiveStatisticsMergeStrategy.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.iceberg.util; - -/** - * strategies that define how to merge hive column statistics into Iceberg column statistics. 
- */ -public enum HiveStatisticsMergeStrategy -{ - /** - * Do not merge statistics from Hive - */ - NONE, - /** - * Only merge NDV statistics from hive - */ - USE_NDV, - /** - * Only merge null fractions from hive - */ - USE_NULLS_FRACTIONS, - /** - * Merge both null fractions and NDVs from Hive - */ - USE_NULLS_FRACTION_AND_NDV, -} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/StatisticsUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/StatisticsUtil.java index c64b1218fdfd7..fd0fe2e000aa0 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/StatisticsUtil.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/StatisticsUtil.java @@ -16,17 +16,20 @@ import com.facebook.presto.hive.metastore.HiveColumnStatistics; import com.facebook.presto.hive.metastore.PartitionStatistics; import com.facebook.presto.iceberg.IcebergColumnHandle; +import com.facebook.presto.spi.statistics.ColumnStatisticType; import com.facebook.presto.spi.statistics.ColumnStatistics; import com.facebook.presto.spi.statistics.Estimate; import com.facebook.presto.spi.statistics.TableStatistics; +import com.google.common.base.Joiner; import org.apache.iceberg.PartitionSpec; +import java.util.Arrays; +import java.util.EnumSet; import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; -import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.NONE; -import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.USE_NDV; -import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.USE_NULLS_FRACTIONS; -import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.USE_NULLS_FRACTION_AND_NDV; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES; public final class StatisticsUtil { @@ -34,9 +37,16 @@ private StatisticsUtil() { } - public static TableStatistics 
mergeHiveStatistics(TableStatistics icebergStatistics, PartitionStatistics hiveStatistics, HiveStatisticsMergeStrategy mergeStrategy, PartitionSpec spec) + /** + * Attempts to merge statistics from Iceberg and Hive tables. + *
+ * If a statistic is unknown on the Iceberg table, but known in Hive, the Hive statistic + * will always be used. Otherwise, Hive statistics are only used if specified in the + * {@code iceberg.hive-statistics-merge-strategy} property (the {@code mergeFlags} argument). + */ + public static TableStatistics mergeHiveStatistics(TableStatistics icebergStatistics, PartitionStatistics hiveStatistics, EnumSet mergeFlags, PartitionSpec spec) { - if (mergeStrategy.equals(NONE) || spec.isPartitioned()) { + if (spec.isPartitioned()) { return icebergStatistics; } // We really only need to merge in NDVs and null fractions from the column statistics in hive's stats @@ -57,29 +67,33 @@ public static TableStatistics mergeHiveStatistics(TableStatistics icebergStatist .setDistinctValuesCount(icebergColumnStats.getDistinctValuesCount()) .setRange(icebergColumnStats.getRange()); if (hiveColumnStats != null) { - if (mergeStrategy.equals(USE_NDV) || mergeStrategy.equals(USE_NULLS_FRACTION_AND_NDV)) { + // NDVs + if (mergeFlags.contains(NUMBER_OF_DISTINCT_VALUES)) { hiveColumnStats.getDistinctValuesCount().ifPresent(ndvs -> mergedStats.setDistinctValuesCount(Estimate.of(ndvs))); } - if (mergeStrategy.equals(USE_NULLS_FRACTIONS) || mergeStrategy.equals(USE_NULLS_FRACTION_AND_NDV)) { - hiveColumnStats.getNullsCount().ifPresent(nullCount -> { - Estimate nullsFraction; - if (!hiveStatistics.getBasicStatistics().getRowCount().isPresent()) { - if (icebergStatistics.getRowCount().isUnknown()) { - nullsFraction = Estimate.unknown(); - } - else { - nullsFraction = Estimate.of((double) nullCount / icebergStatistics.getRowCount().getValue()); - } - } - else { - nullsFraction = Estimate.of((double) nullCount / hiveStatistics.getBasicStatistics().getRowCount().getAsLong()); - } - mergedStats.setNullsFraction(nullsFraction); - }); + // data size + if (mergeFlags.contains(ColumnStatisticType.TOTAL_SIZE_IN_BYTES)) { + hiveColumnStats.getTotalSizeInBytes().ifPresent(size -> mergedStats.setDataSize(Estimate.of(size))); } } 
statsBuilder.setColumnStatistics(columnHandle, mergedStats.build()); }); return statsBuilder.build(); } + + public static EnumSet decodeMergeFlags(String input) + { + return Optional.of(Arrays.stream((input).trim().split(",")) + .filter(value -> !value.isEmpty()) + .map(ColumnStatisticType::valueOf) + .collect(Collectors.toSet())) + .filter(set -> !set.isEmpty()) + .map(EnumSet::copyOf) + .orElse(EnumSet.noneOf(ColumnStatisticType.class)); + } + + public static String encodeMergeFlags(EnumSet flags) + { + return Joiner.on(",").join(flags.stream().map(Enum::name).iterator()); + } } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index f808f3c59d6a7..3cac3903b5ee0 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java @@ -31,6 +31,7 @@ import com.facebook.presto.hive.authentication.NoHdfsAuthentication; import com.facebook.presto.iceberg.delete.DeleteFile; import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.Constraint; import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.TableHandle; @@ -633,49 +634,92 @@ public void testStringFilters() } @Test - public void testReadWriteNDVs() + public void testReadWriteStats() { - assertUpdate("CREATE TABLE test_stat_ndv (col0 int)"); - assertTrue(getQueryRunner().tableExists(getSession(), "test_stat_ndv")); - assertTableColumnNames("test_stat_ndv", "col0"); + assertUpdate("CREATE TABLE test_stats (col0 int, col1 varchar)"); + assertTrue(getQueryRunner().tableExists(getSession(), "test_stats")); + assertTableColumnNames("test_stats", "col0", "col1"); // test that stats don't exist before analyze - TableStatistics stats = getTableStats("test_stat_ndv"); - 
assertTrue(stats.getColumnStatistics().isEmpty()); + Function, Map> remapper = (input) -> input.entrySet().stream().collect(Collectors.toMap(e -> ((IcebergColumnHandle) e.getKey()).getName(), Map.Entry::getValue)); + Map columnStats; + TableStatistics stats = getTableStats("test_stats"); + columnStats = remapper.apply(stats.getColumnStatistics()); + assertTrue(columnStats.isEmpty()); // test after simple insert we get a good estimate - assertUpdate("INSERT INTO test_stat_ndv VALUES 1, 2, 3", 3); - getQueryRunner().execute("ANALYZE test_stat_ndv"); - stats = getTableStats("test_stat_ndv"); - assertEquals(stats.getColumnStatistics().values().stream().findFirst().get().getDistinctValuesCount(), Estimate.of(3.0)); + assertUpdate("INSERT INTO test_stats VALUES (1, 'abc'), (2, 'xyz'), (3, 'lmnopqrst')", 3); + getQueryRunner().execute("ANALYZE test_stats"); + stats = getTableStats("test_stats"); + columnStats = remapper.apply(stats.getColumnStatistics()); + ColumnStatistics columnStat = columnStats.get("col0"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0)); + assertEquals(columnStat.getDataSize(), Estimate.unknown()); + columnStat = columnStats.get("col1"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0)); + double dataSize = (double) (long) getQueryRunner().execute("SELECT sum_data_size_for_stats(col1) FROM test_stats").getOnlyValue(); + assertEquals(columnStat.getDataSize().getValue(), dataSize); // test after inserting the same values, we still get the same estimate - assertUpdate("INSERT INTO test_stat_ndv VALUES 1, 2, 3", 3); - stats = getTableStats("test_stat_ndv"); - assertEquals(stats.getColumnStatistics().values().stream().findFirst().get().getDistinctValuesCount(), Estimate.of(3.0)); - - // test after ANALYZING with the new inserts that the NDV estimate is the same - getQueryRunner().execute("ANALYZE test_stat_ndv"); - stats = getTableStats("test_stat_ndv"); - 
assertEquals(stats.getColumnStatistics().values().stream().findFirst().get().getDistinctValuesCount(), Estimate.of(3.0)); + assertUpdate("INSERT INTO test_stats VALUES (1, 'abc'), (2, 'xyz'), (3, 'lmnopqrst')", 3); + stats = getTableStats("test_stats"); + columnStats = remapper.apply(stats.getColumnStatistics()); + columnStat = columnStats.get("col0"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0)); + assertEquals(columnStat.getDataSize(), Estimate.unknown()); + columnStat = columnStats.get("col1"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0)); + assertEquals(columnStat.getDataSize().getValue(), dataSize); + + // test after ANALYZING with the new inserts that the NDV estimate is the same and the data size matches + getQueryRunner().execute("ANALYZE test_stats"); + stats = getTableStats("test_stats"); + columnStats = remapper.apply(stats.getColumnStatistics()); + columnStat = columnStats.get("col0"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0)); + assertEquals(columnStat.getDataSize(), Estimate.unknown()); + columnStat = columnStats.get("col1"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0)); + dataSize = (double) (long) getQueryRunner().execute("SELECT sum_data_size_for_stats(col1) FROM test_stats").getOnlyValue(); + assertEquals(columnStat.getDataSize().getValue(), dataSize); // test after inserting a new value, but not analyzing, the estimate is the same. 
- assertUpdate("INSERT INTO test_stat_ndv VALUES 4", 1); - stats = getTableStats("test_stat_ndv"); - assertEquals(stats.getColumnStatistics().values().stream().findFirst().get().getDistinctValuesCount(), Estimate.of(3.0)); + assertUpdate("INSERT INTO test_stats VALUES (4, 'def')", 1); + stats = getTableStats("test_stats"); + columnStats = remapper.apply(stats.getColumnStatistics()); + columnStat = columnStats.get("col0"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0)); + assertEquals(columnStat.getDataSize(), Estimate.unknown()); + columnStat = columnStats.get("col1"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0)); + assertEquals(columnStat.getDataSize().getValue(), dataSize); // test that after analyzing, the updates stats show up. - getQueryRunner().execute("ANALYZE test_stat_ndv"); - stats = getTableStats("test_stat_ndv"); - assertEquals(stats.getColumnStatistics().values().stream().findFirst().get().getDistinctValuesCount(), Estimate.of(4.0)); + getQueryRunner().execute("ANALYZE test_stats"); + stats = getTableStats("test_stats"); + columnStats = remapper.apply(stats.getColumnStatistics()); + columnStat = columnStats.get("col0"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(4.0)); + assertEquals(columnStat.getDataSize(), Estimate.unknown()); + columnStat = columnStats.get("col1"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(4.0)); + dataSize = (double) (long) getQueryRunner().execute("SELECT sum_data_size_for_stats(col1) FROM test_stats").getOnlyValue(); + assertEquals(columnStat.getDataSize().getValue(), dataSize); // test adding a null value is successful, and analyze still runs successfully - assertUpdate("INSERT INTO test_stat_ndv VALUES NULL", 1); - assertQuerySucceeds("ANALYZE test_stat_ndv"); - stats = getTableStats("test_stat_ndv"); - assertEquals(stats.getColumnStatistics().values().stream().findFirst().get().getDistinctValuesCount(), Estimate.of(4.0)); - - 
assertUpdate("DROP TABLE test_stat_ndv"); + assertUpdate("INSERT INTO test_stats VALUES (NULL, NULL)", 1); + assertQuerySucceeds("ANALYZE test_stats"); + stats = getTableStats("test_stats"); + columnStats = remapper.apply(stats.getColumnStatistics()); + columnStat = columnStats.get("col0"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(4.0)); + assertEquals(columnStat.getDataSize(), Estimate.unknown()); + columnStat = columnStats.get("col1"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(4.0)); + dataSize = (double) (long) getQueryRunner().execute("SELECT sum_data_size_for_stats(col1) FROM test_stats").getOnlyValue(); + assertEquals(columnStat.getDataSize().getValue(), dataSize); + + assertUpdate("DROP TABLE test_stats"); } @Test @@ -817,6 +861,7 @@ public void testStatsDataSizePrimitives() { assertUpdate("CREATE TABLE test_stat_data_size (c0 int, c1 bigint, c2 double, c3 decimal(4, 0), c4 varchar, c5 varchar(10), c6 date, c7 time, c8 timestamp, c10 boolean)"); assertUpdate("INSERT INTO test_stat_data_size VALUES (0, 1, 2.0, CAST(4.01 as decimal(4, 0)), 'testvc', 'testvc10', date '2024-03-14', localtime, localtimestamp, TRUE)", 1); + assertQuerySucceeds("ANALYZE test_stat_data_size"); TableStatistics stats = getTableStats("test_stat_data_size"); stats.getColumnStatistics().entrySet().stream() .filter((e) -> ((IcebergColumnHandle) e.getKey()).getColumnType() != SYNTHESIZED) diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java index 7d84025f72ba5..90a4e3e4a6dff 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java @@ -13,7 +13,6 @@ */ package com.facebook.presto.iceberg; -import com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy; import com.google.common.collect.ImmutableMap; 
import org.apache.iceberg.hadoop.HadoopFileIO; import org.testng.annotations.Test; @@ -29,7 +28,8 @@ import static com.facebook.presto.iceberg.CatalogType.HIVE; import static com.facebook.presto.iceberg.IcebergFileFormat.ORC; import static com.facebook.presto.iceberg.IcebergFileFormat.PARQUET; -import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.USE_NDV; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_NON_NULL_VALUES; import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT; import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT; import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT; @@ -46,7 +46,7 @@ public void testDefaults() .setCatalogWarehouse(null) .setCatalogCacheSize(10) .setHadoopConfigResources(null) - .setHiveStatisticsMergeStrategy(HiveStatisticsMergeStrategy.NONE) + .setHiveStatisticsMergeFlags("") .setStatisticSnapshotRecordDifferenceWeight(0.0) .setMaxPartitionsPerWriter(100) .setMinimumAssignedSplitWeight(0.05) @@ -76,7 +76,7 @@ public void testExplicitPropertyMappings() .put("iceberg.enable-parquet-dereference-pushdown", "false") .put("iceberg.enable-merge-on-read-mode", "false") .put("iceberg.statistic-snapshot-record-difference-weight", "1.0") - .put("iceberg.hive-statistics-merge-strategy", "USE_NDV") + .put("iceberg.hive-statistics-merge-strategy", NUMBER_OF_DISTINCT_VALUES.name() + "," + NUMBER_OF_NON_NULL_VALUES.name()) .put("iceberg.pushdown-filter-enabled", "true") .put("iceberg.delete-as-join-rewrite-enabled", "false") .put("iceberg.io.manifest.cache-enabled", "true") @@ -98,7 +98,7 @@ public void testExplicitPropertyMappings() .setStatisticSnapshotRecordDifferenceWeight(1.0) .setParquetDereferencePushdownEnabled(false) .setMergeOnReadModeEnabled(false) - 
.setHiveStatisticsMergeStrategy(USE_NDV) + .setHiveStatisticsMergeFlags("NUMBER_OF_DISTINCT_VALUES,NUMBER_OF_NON_NULL_VALUES") .setPushdownFilterEnabled(true) .setDeleteAsJoinRewriteEnabled(false) .setManifestCachingEnabled(true) diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergTableChangelog.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergTableChangelog.java index 57a7201bc090d..2d8eba1586e74 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergTableChangelog.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergTableChangelog.java @@ -20,7 +20,6 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.util.Map; import java.util.stream.Collectors; import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner; @@ -32,8 +31,7 @@ public class TestIcebergTableChangelog protected QueryRunner createQueryRunner() throws Exception { - Map properties = ImmutableMap.of("http-server.http.port", "8080"); - return createIcebergQueryRunner(properties, CatalogType.HADOOP); + return createIcebergQueryRunner(ImmutableMap.of(), CatalogType.HADOOP); } private long[] snapshots = new long[0]; diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestStatisticsUtil.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestStatisticsUtil.java index 6d412d20bcda2..79ddca8834896 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestStatisticsUtil.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestStatisticsUtil.java @@ -17,6 +17,7 @@ import com.facebook.presto.hive.metastore.HiveColumnStatistics; import com.facebook.presto.hive.metastore.PartitionStatistics; import com.facebook.presto.iceberg.ColumnIdentity.TypeCategory; +import com.facebook.presto.spi.statistics.ColumnStatisticType; import com.facebook.presto.spi.statistics.ColumnStatistics; import 
com.facebook.presto.spi.statistics.DoubleRange; import com.facebook.presto.spi.statistics.Estimate; @@ -28,24 +29,27 @@ import org.testng.annotations.Test; import java.util.Collections; +import java.util.EnumSet; import java.util.Optional; import java.util.OptionalLong; import static com.facebook.presto.common.type.IntegerType.INTEGER; import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR; -import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.NONE; -import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.USE_NDV; -import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.USE_NULLS_FRACTIONS; -import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.USE_NULLS_FRACTION_AND_NDV; +import static com.facebook.presto.iceberg.util.StatisticsUtil.decodeMergeFlags; +import static com.facebook.presto.iceberg.util.StatisticsUtil.encodeMergeFlags; import static com.facebook.presto.iceberg.util.StatisticsUtil.mergeHiveStatistics; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_NON_NULL_VALUES; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; public class TestStatisticsUtil { @Test public void testMergeStrategyNone() { - TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), NONE, PartitionSpec.unpartitioned()); + TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), EnumSet.noneOf(ColumnStatisticType.class), PartitionSpec.unpartitioned()); assertEquals(Estimate.of(1), merged.getRowCount()); assertEquals(Estimate.unknown(), merged.getTotalSize()); assertEquals(1, 
merged.getColumnStatistics().size()); @@ -59,7 +63,7 @@ public void testMergeStrategyNone() @Test public void testMergeStrategyWithPartitioned() { - TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), USE_NULLS_FRACTION_AND_NDV, + TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), EnumSet.of(NUMBER_OF_DISTINCT_VALUES, NUMBER_OF_NON_NULL_VALUES), PartitionSpec.builderFor(new Schema(Types.NestedField.required(0, "test", Types.IntegerType.get()))).bucket("test", 100).build()); assertEquals(Estimate.of(1), merged.getRowCount()); assertEquals(Estimate.unknown(), merged.getTotalSize()); @@ -74,7 +78,7 @@ public void testMergeStrategyWithPartitioned() @Test public void testMergeStrategyNDVs() { - TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), USE_NDV, PartitionSpec.unpartitioned()); + TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), EnumSet.of(NUMBER_OF_DISTINCT_VALUES), PartitionSpec.unpartitioned()); assertEquals(Estimate.of(1), merged.getRowCount()); assertEquals(Estimate.unknown(), merged.getTotalSize()); assertEquals(1, merged.getColumnStatistics().size()); @@ -88,7 +92,7 @@ public void testMergeStrategyNDVs() @Test public void testMergeStrategyNulls() { - TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), USE_NULLS_FRACTIONS, PartitionSpec.unpartitioned()); + TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), EnumSet.of(NUMBER_OF_NON_NULL_VALUES), PartitionSpec.unpartitioned()); assertEquals(Estimate.of(1), merged.getRowCount()); assertEquals(Estimate.unknown(), merged.getTotalSize()); assertEquals(1, merged.getColumnStatistics().size()); @@ -102,7 +106,7 @@ 
public void testMergeStrategyNulls() @Test public void testMergeStrategyNDVsAndNulls() { - TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), USE_NULLS_FRACTION_AND_NDV, PartitionSpec.unpartitioned()); + TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), EnumSet.of(NUMBER_OF_DISTINCT_VALUES, NUMBER_OF_NON_NULL_VALUES), PartitionSpec.unpartitioned()); assertEquals(Estimate.of(1), merged.getRowCount()); assertEquals(Estimate.unknown(), merged.getTotalSize()); assertEquals(1, merged.getColumnStatistics().size()); @@ -113,6 +117,24 @@ public void testMergeStrategyNDVsAndNulls() assertEquals(new DoubleRange(0.0, 1.0), stats.getRange().get()); } + @Test + public void testEncodeDecode() + { + assertTrue(decodeMergeFlags("").isEmpty()); + assertEquals(decodeMergeFlags(NUMBER_OF_DISTINCT_VALUES.name()), EnumSet.of(NUMBER_OF_DISTINCT_VALUES)); + assertEquals(decodeMergeFlags(NUMBER_OF_DISTINCT_VALUES + "," + NUMBER_OF_NON_NULL_VALUES), + EnumSet.of(NUMBER_OF_DISTINCT_VALUES, NUMBER_OF_NON_NULL_VALUES)); + assertEquals(decodeMergeFlags(NUMBER_OF_DISTINCT_VALUES + "," + NUMBER_OF_NON_NULL_VALUES + "," + TOTAL_SIZE_IN_BYTES), + EnumSet.of(NUMBER_OF_DISTINCT_VALUES, NUMBER_OF_NON_NULL_VALUES, TOTAL_SIZE_IN_BYTES)); + + assertEquals(encodeMergeFlags(EnumSet.noneOf(ColumnStatisticType.class)), ""); + assertEquals(encodeMergeFlags(EnumSet.of(NUMBER_OF_DISTINCT_VALUES)), NUMBER_OF_DISTINCT_VALUES.name()); + assertEquals(encodeMergeFlags(EnumSet.of(NUMBER_OF_DISTINCT_VALUES, NUMBER_OF_NON_NULL_VALUES)), + NUMBER_OF_DISTINCT_VALUES.name() + "," + NUMBER_OF_NON_NULL_VALUES.name()); + assertEquals(encodeMergeFlags(EnumSet.of(NUMBER_OF_DISTINCT_VALUES, NUMBER_OF_NON_NULL_VALUES, TOTAL_SIZE_IN_BYTES)), + NUMBER_OF_DISTINCT_VALUES.name() + "," + NUMBER_OF_NON_NULL_VALUES.name() + "," + TOTAL_SIZE_IN_BYTES); + } + private static TableStatistics 
generateSingleColumnIcebergStats() { return TableStatistics.builder() diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java index 1945e1122b8b0..c76babbd0cecc 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java @@ -20,6 +20,7 @@ import com.facebook.presto.metadata.CatalogManager; import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.SchemaTableName; +import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import org.apache.iceberg.Table; import org.testng.annotations.Test; @@ -27,6 +28,8 @@ import static com.facebook.presto.hive.metastore.InMemoryCachingHiveMetastore.memoizeMetastore; import static com.facebook.presto.iceberg.CatalogType.HIVE; import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES; @Test public class TestIcebergDistributedHive @@ -34,7 +37,7 @@ public class TestIcebergDistributedHive { public TestIcebergDistributedHive() { - super(HIVE, ImmutableMap.of("iceberg.hive-statistics-merge-strategy", "USE_NULLS_FRACTION_AND_NDV")); + super(HIVE, ImmutableMap.of("iceberg.hive-statistics-merge-strategy", Joiner.on(",").join(NUMBER_OF_DISTINCT_VALUES.name(), TOTAL_SIZE_IN_BYTES.name()))); } @Override diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java index c6c4d80896abc..8df787a8bac1c 100644 --- 
a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java @@ -29,6 +29,7 @@ import com.facebook.presto.spi.analyzer.MetadataResolver; import com.facebook.presto.spi.security.AllowAllAccessControl; import com.facebook.presto.spi.statistics.ColumnStatistics; +import com.facebook.presto.spi.statistics.Estimate; import com.facebook.presto.spi.statistics.TableStatistics; import com.facebook.presto.testing.MaterializedResult; import com.facebook.presto.testing.QueryRunner; @@ -50,6 +51,9 @@ import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY; import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR; import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner; +import static com.facebook.presto.iceberg.IcebergSessionProperties.HIVE_METASTORE_STATISTICS_MERGE_STRATEGY; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES; import static com.facebook.presto.testing.assertions.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; @@ -67,7 +71,7 @@ public class TestIcebergHiveStatistics protected QueryRunner createQueryRunner() throws Exception { - return createIcebergQueryRunner(ImmutableMap.of(), ImmutableMap.of("iceberg.hive-statistics-merge-strategy", "USE_NDV")); + return createIcebergQueryRunner(ImmutableMap.of(), ImmutableMap.of("iceberg.hive-statistics-merge-strategy", NUMBER_OF_DISTINCT_VALUES.name())); } private static final Set NUMERIC_ORDERS_COLUMNS = ImmutableSet.builder() @@ -277,6 +281,67 @@ public void testStatsWithPartitionedTablesNoAnalyze() assertQuerySucceeds("DROP TABLE statsWithPartition"); } + @Test + public void testHiveStatisticsMergeFlags() + { + 
assertQuerySucceeds("CREATE TABLE mergeFlagsStats (i int, v varchar)"); + assertQuerySucceeds("INSERT INTO mergeFlagsStats VALUES (0, '1'), (1, '22'), (2, '333'), (NULL, 'aaaaa'), (4, NULL)"); + assertQuerySucceeds("ANALYZE mergeFlagsStats"); // stats stored in + // Test stats without merging doesn't return NDVs or data size + Session session = Session.builder(getSession()) + .setCatalogSessionProperty("iceberg", HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, "") + .build(); + TableStatistics stats = getTableStatistics(session, "mergeFlagsStats"); + Map columnStatistics = getColumnNameMap(stats); + assertEquals(columnStatistics.get("i").getDistinctValuesCount(), Estimate.unknown()); + assertEquals(columnStatistics.get("i").getDataSize(), Estimate.unknown()); + assertEquals(columnStatistics.get("v").getDistinctValuesCount(), Estimate.unknown()); + assertEquals(columnStatistics.get("v").getDataSize(), Estimate.unknown()); + + // Test stats merging for NDVs + session = Session.builder(getSession()) + .setCatalogSessionProperty("iceberg", HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, NUMBER_OF_DISTINCT_VALUES.name()) + .build(); + stats = getTableStatistics(session, "mergeFlagsStats"); + columnStatistics = getColumnNameMap(stats); + assertEquals(columnStatistics.get("i").getDistinctValuesCount(), Estimate.of(4.0)); + assertEquals(columnStatistics.get("i").getDataSize(), Estimate.unknown()); + assertEquals(columnStatistics.get("v").getDistinctValuesCount(), Estimate.of(4.0)); + assertEquals(columnStatistics.get("v").getDataSize(), Estimate.unknown()); + + // Test stats for data size + session = Session.builder(getSession()) + .setCatalogSessionProperty("iceberg", HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, TOTAL_SIZE_IN_BYTES.name()) + .build(); + stats = getTableStatistics(session, "mergeFlagsStats"); + columnStatistics = getColumnNameMap(stats); + assertEquals(columnStatistics.get("i").getDistinctValuesCount(), Estimate.unknown()); + 
assertEquals(columnStatistics.get("i").getDataSize(), Estimate.unknown()); // fixed-width isn't collected + assertEquals(columnStatistics.get("v").getDistinctValuesCount(), Estimate.unknown()); + assertEquals(columnStatistics.get("v").getDataSize(), Estimate.of(11)); + + // Test stats for both + session = Session.builder(getSession()) + .setCatalogSessionProperty("iceberg", HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, NUMBER_OF_DISTINCT_VALUES.name() + "," + TOTAL_SIZE_IN_BYTES) + .build(); + stats = getTableStatistics(session, "mergeFlagsStats"); + columnStatistics = getColumnNameMap(stats); + assertEquals(columnStatistics.get("i").getDistinctValuesCount(), Estimate.of(4.0)); + assertEquals(columnStatistics.get("i").getDataSize(), Estimate.unknown()); + assertEquals(columnStatistics.get("v").getDistinctValuesCount(), Estimate.of(4.0)); + assertEquals(columnStatistics.get("v").getDataSize(), Estimate.of(11)); + } + + private TableStatistics getTableStatistics(Session session, String table) + { + Metadata meta = getQueryRunner().getMetadata(); + TransactionId txid = getQueryRunner().getTransactionManager().beginTransaction(false); + Session txnSession = session.beginTransactionId(txid, getQueryRunner().getTransactionManager(), new AllowAllAccessControl()); + Map columnHandles = getColumnHandles(table, txnSession); + List columnHandleList = new ArrayList<>(columnHandles.values()); + return meta.getTableStatistics(txnSession, getAnalyzeTableHandle(table, txnSession), columnHandleList, Constraint.alwaysTrue()); + } + private void columnStatsEqual(Map actualStats, Map expectedStats) { for (ColumnHandle handle : expectedStats.keySet()) { @@ -350,6 +415,13 @@ static void assertStatValue(StatsSchema column, MaterializedResult result, Set getColumnNameMap(TableStatistics statistics) + { + return statistics.getColumnStatistics().entrySet().stream().collect(Collectors.toMap(e -> + ((IcebergColumnHandle) e.getKey()).getName(), + Map.Entry::getValue)); + } + static void 
assertNDVsPresent(TableStatistics stats) { for (Map.Entry entry : stats.getColumnStatistics().entrySet()) {