Skip to content

Commit

Permalink
[Iceberg] Skip data size stat on fixed-width types
Browse files Browse the repository at this point in the history
The optimizer uses the data size stats during join enumeration to
determine whether to do a partitioned or broadcast join. Before this
change, the stats collection code used the column sizes recorded in the
manifest files to estimate the total data size. That value is usually
inaccurate because it varies with the file format, compression, etc.

Since the sizes of fixed-width types are known in advance, we can skip
collecting them and let the optimizer infer the data size instead. This
is the same behavior as in the Hive connector.

With this change, the data size statistic is no longer collected for
columns with fixed-width types; other column statistics are unaffected.
  • Loading branch information
ZacBlanco committed Mar 20, 2024
1 parent b72b286 commit 12bfa3d
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
{
IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle;
Table icebergTable = getIcebergTable(session, icebergTableHandle.getSchemaTableName());
TableStatisticsMaker.writeTableStatistics(nodeVersion, icebergTableHandle, icebergTable, session, computedStatistics);
TableStatisticsMaker.writeTableStatistics(nodeVersion, typeManager, icebergTableHandle, icebergTable, session, computedStatistics);
}

public void rollback()
Expand Down Expand Up @@ -681,7 +681,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
return TableStatisticsMaker.getTableStatistics(session, constraint, handle, icebergTable, columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList()));
return TableStatisticsMaker.getTableStatistics(session, typeManager, constraint, handle, icebergTable, columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
org.apache.iceberg.Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
TableStatistics icebergStatistics = TableStatisticsMaker.getTableStatistics(session, constraint, handle, icebergTable, columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList()));
TableStatistics icebergStatistics = TableStatisticsMaker.getTableStatistics(session, typeManager, constraint, handle, icebergTable, columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList()));
HiveStatisticsMergeStrategy mergeStrategy = getHiveStatisticsMergeStrategy(session);
return tableLayoutHandle.map(IcebergTableLayoutHandle.class::cast).map(layoutHandle -> {
TupleDomain<ColumnHandle> domainPredicate = layoutHandle.getDomainPredicate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toSet;

class Partition
public class Partition
{
private final Map<Integer, Type.PrimitiveType> idToTypeMapping;
private final List<Types.NestedField> nonPartitionPrimitiveColumns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.FixedWidthType;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.NodeVersion;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.Constraint;
Expand Down Expand Up @@ -57,6 +59,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -77,6 +80,7 @@
import static com.facebook.presto.iceberg.IcebergSessionProperties.getStatisticSnapshotRecordDifferenceWeight;
import static com.facebook.presto.iceberg.IcebergUtil.getIdentityPartitions;
import static com.facebook.presto.iceberg.Partition.toMap;
import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
Expand All @@ -94,16 +98,18 @@ public class TableStatisticsMaker
private static final String ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY = "ndv";
private final Table icebergTable;
private final ConnectorSession session;
private final TypeManager typeManager;

private TableStatisticsMaker(Table icebergTable, ConnectorSession session)
private TableStatisticsMaker(Table icebergTable, ConnectorSession session, TypeManager typeManager)
{
this.icebergTable = icebergTable;
this.session = session;
this.typeManager = typeManager;
}

public static TableStatistics getTableStatistics(ConnectorSession session, Constraint constraint, IcebergTableHandle tableHandle, Table icebergTable, List<IcebergColumnHandle> columns)
public static TableStatistics getTableStatistics(ConnectorSession session, TypeManager typeManager, Constraint constraint, IcebergTableHandle tableHandle, Table icebergTable, List<IcebergColumnHandle> columns)
{
return new TableStatisticsMaker(icebergTable, session).makeTableStatistics(tableHandle, constraint, columns);
return new TableStatisticsMaker(icebergTable, session, typeManager).makeTableStatistics(tableHandle, constraint, columns);
}

private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Constraint constraint, List<IcebergColumnHandle> selectedColumns)
Expand Down Expand Up @@ -232,7 +238,8 @@ private Partition getSummaryFromFiles(CloseableIterable<ContentFile<?>> files,
toMap(idToTypeMapping, contentFile.lowerBounds()),
toMap(idToTypeMapping, contentFile.upperBounds()),
contentFile.nullValueCounts(),
contentFile.columnSizes());
new HashMap<>());
updateColumnSizes(summary, contentFile.columnSizes());
}
else {
summary.incrementFileCount();
Expand All @@ -251,9 +258,9 @@ private Partition getSummaryFromFiles(CloseableIterable<ContentFile<?>> files,
return summary;
}

public static void writeTableStatistics(NodeVersion nodeVersion, IcebergTableHandle tableHandle, Table icebergTable, ConnectorSession session, Collection<ComputedStatistics> computedStatistics)
public static void writeTableStatistics(NodeVersion nodeVersion, TypeManager typeManager, IcebergTableHandle tableHandle, Table icebergTable, ConnectorSession session, Collection<ComputedStatistics> computedStatistics)
{
new TableStatisticsMaker(icebergTable, session).writeTableStatistics(nodeVersion, tableHandle, computedStatistics);
new TableStatisticsMaker(icebergTable, session, typeManager).writeTableStatistics(nodeVersion, tableHandle, computedStatistics);
}

private void writeTableStatistics(NodeVersion nodeVersion, IcebergTableHandle tableHandle, Collection<ComputedStatistics> computedStatistics)
Expand Down Expand Up @@ -322,11 +329,18 @@ public void updateColumnSizes(Partition summary, Map<Integer, Long> addedColumnS
}
for (Types.NestedField column : summary.getNonPartitionPrimitiveColumns()) {
int id = column.fieldId();

Long addedSize = addedColumnSizes.get(id);
if (addedSize != null) {
columnSizes.put(id, addedSize + columnSizes.getOrDefault(id, 0L));
com.facebook.presto.common.type.Type type = toPrestoType(column.type(), typeManager);
// allow the optimizer to infer the size of fixed-width types
// since it can be calculated accurately without collecting stats.
if (type instanceof FixedWidthType) {
continue;
}
columnSizes.compute(id, (key, value) -> {
if (value == null) {
value = 0L;
}
return value + addedColumnSizes.getOrDefault(id, 0L);
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1003,12 +1003,12 @@ public void testBasicTableStatistics()

assertQuery(session, "SHOW STATS FOR " + tableName,
"VALUES " +
" ('col', 96.0, NULL, 0.0, NULL, '-10.0', '100.0'), " +
" ('col', NULL, NULL, 0.0, NULL, '-10.0', '100.0'), " +
" (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)");
assertUpdate("INSERT INTO " + tableName + " VALUES 200", 1);
assertQuery(session, "SHOW STATS FOR " + tableName,
"VALUES " +
" ('col', 144.0, NULL, 0.0, NULL, '-10.0', '200.0'), " +
" ('col', NULL, NULL, 0.0, NULL, '-10.0', '200.0'), " +
" (NULL, NULL, NULL, NULL, 3e0, NULL, NULL)");

dropTable(session, tableName);
Expand Down Expand Up @@ -1165,7 +1165,7 @@ public void testTableStatisticsTimestamp()

assertQuery(session, "SHOW STATS FOR " + tableName,
"VALUES " +
" ('col', 113.0, NULL, 0.0, NULL, '2021-01-02 09:04:05.321', '2022-12-22 10:07:08.456'), " +
" ('col', NULL, NULL, 0.0, NULL, '2021-01-02 09:04:05.321', '2022-12-22 10:07:08.456'), " +
" (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)");
dropTable(session, tableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.transaction.TransactionId;
import com.facebook.presto.common.type.FixedWidthType;
import com.facebook.presto.common.type.TimeZoneKey;
import com.facebook.presto.hive.HdfsConfiguration;
import com.facebook.presto.hive.HdfsConfigurationInitializer;
Expand Down Expand Up @@ -94,6 +95,7 @@
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.TimeZoneKey.UTC_KEY;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.SYNTHESIZED;
import static com.facebook.presto.iceberg.FileContent.EQUALITY_DELETES;
import static com.facebook.presto.iceberg.FileContent.POSITION_DELETES;
import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
Expand All @@ -110,6 +112,7 @@
import static com.facebook.presto.tests.sql.TestTable.randomTableSuffix;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;

@Test(singleThreaded = true)
Expand Down Expand Up @@ -809,6 +812,29 @@ public void testStatsByDistance()
assertUpdate("DROP TABLE test_stat_dist");
}

/**
 * Checks which columns expose a data size statistic after a single-row insert.
 *
 * Columns whose Presto type is a {@link FixedWidthType} must report an unknown data size,
 * while every other non-synthesized column must report a known, strictly positive data size.
 * Synthesized columns are ignored.
 */
@Test
public void testStatsDataSizePrimitives()
{
    assertUpdate("CREATE TABLE test_stat_data_size (c0 int, c1 bigint, c2 double, c3 decimal(4, 0), c4 varchar, c5 varchar(10), c6 date, c7 time, c8 timestamp, c10 boolean)");
    try {
        assertUpdate("INSERT INTO test_stat_data_size VALUES (0, 1, 2.0, CAST(4.01 as decimal(4, 0)), 'testvc', 'testvc10', date '2024-03-14', localtime, localtimestamp, TRUE)", 1);
        TableStatistics stats = getTableStats("test_stat_data_size");
        stats.getColumnStatistics().forEach((columnHandle, stat) -> {
            IcebergColumnHandle handle = (IcebergColumnHandle) columnHandle;
            if (handle.getColumnType() == SYNTHESIZED) {
                // synthesized columns are not part of the table data; nothing to verify
                return;
            }
            if (handle.getType() instanceof FixedWidthType) {
                assertEquals(stat.getDataSize(), Estimate.unknown());
            }
            else {
                assertNotEquals(stat.getDataSize(), Estimate.unknown(), format("for column %s", handle));
                assertTrue(stat.getDataSize().getValue() > 0, format("data size should be positive for column %s", handle));
            }
        });
    }
    finally {
        // always drop the table so a failed assertion does not break later runs on the shared query runner
        assertUpdate("DROP TABLE test_stat_data_size");
    }
}

private static void assertEither(Runnable first, Runnable second)
{
try {
Expand Down

0 comments on commit 12bfa3d

Please sign in to comment.