Skip to content

Commit

Permalink
[iceberg] Fix bug on StatisticsFileCache miss
Browse files Browse the repository at this point in the history
In the case of a partial miss on the StatisticsFileCache, the loaded
statistics were not combined with the cached statistics, causing
discrepancies in query planning.
  • Loading branch information
ZacBlanco committed Feb 7, 2025
1 parent 744f1bb commit 53da02f
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,7 @@ private Map<Integer, ColumnStatistics> loadStatisticsFile(IcebergTableHandle tab
statisticsFileCache.put(new StatisticsFileCacheKey(file, key), value);
finalResult.put(key, value);
});
finalResult.putAll(cachedStats);
return finalResult.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import com.facebook.presto.Session;
import com.facebook.presto.common.QualifiedObjectName;
import com.facebook.presto.common.RuntimeMetric;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.Range;
import com.facebook.presto.common.predicate.TupleDomain;
Expand Down Expand Up @@ -52,6 +54,7 @@
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand All @@ -75,6 +78,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.facebook.presto.SystemSessionProperties.OPTIMIZER_USE_HISTOGRAMS;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR;
Expand All @@ -85,6 +89,8 @@
import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
import static com.facebook.presto.iceberg.IcebergSessionProperties.HIVE_METASTORE_STATISTICS_MERGE_STRATEGY;
import static com.facebook.presto.iceberg.IcebergSessionProperties.PUSHDOWN_FILTER_ENABLED;
import static com.facebook.presto.iceberg.IcebergSessionProperties.STATISTICS_KLL_SKETCH_K_PARAMETER;
import static com.facebook.presto.iceberg.statistics.KllHistogram.isKllHistogramSupportedType;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES;
Expand Down Expand Up @@ -402,6 +408,41 @@ public void testPredicateOnlyColumnInStatisticsOutput(boolean pushdownFilterEnab
}
}

/**
 * Checks that table statistics remain complete when the statistics file cache
 * can only serve part of a statistics file.
 *
 * <p>The query runner is created with {@code iceberg.max-statistics-file-cache-size}
 * set to {@code 1024B} and the session uses a KLL sketch K parameter of {@code 32768}.
 * After {@code ANALYZE lineitem}, table statistics are fetched twice: the first fetch
 * populates the cache, the second is expected to record a "PartialMiss" runtime metric.
 * The statistics returned by the second fetch must still carry a known distinct-value
 * count for every column, and a histogram for every column whose type supports KLL
 * histograms.
 *
 * @throws Exception if the query runner cannot be created or closed
 */
@Test
public void testStatisticsCachePartialEviction()
        throws Exception
{
    try (DistributedQueryRunner queryRunner = createIcebergQueryRunner(ImmutableMap.of(), ImmutableMap.of("iceberg.max-statistics-file-cache-size", "1024B"))) {
        Session session = Session.builder(queryRunner.getDefaultSession())
                // set histograms enabled
                .setSystemProperty(OPTIMIZER_USE_HISTOGRAMS, "true")
                .setCatalogSessionProperty("iceberg", STATISTICS_KLL_SKETCH_K_PARAMETER, "32768")
                .build();

        queryRunner.execute(session, "ANALYZE lineitem");
        // get table statistics, to populate some of the cache; the result itself is not needed
        getTableStatistics(queryRunner, session, "lineitem");
        RuntimeStats runtimeStats = session.getRuntimeStats();
        // NOTE: the two checks below are skipped when no matching metric was recorded
        runtimeStats.getMetrics().keySet().stream()
                .filter(name -> name.contains("ColumnCount"))
                .findFirst()
                // TestNG argument order is (actual, expected)
                .ifPresent(stat -> assertEquals(runtimeStats.getMetric(stat).getSum(), 32));
        runtimeStats.getMetrics().keySet().stream()
                .filter(name -> name.contains("PuffinFileSize"))
                .findFirst()
                // the recorded file size must exceed 1024, the numeric value of the cache limit configured above
                .ifPresent(stat -> assertTrue(runtimeStats.getMetric(stat).getSum() > 1024));

        // get them again to trigger retrieval of _some_ cached statistics
        TableStatistics statistics = getTableStatistics(queryRunner, session, "lineitem");
        RuntimeMetric partialMiss = runtimeStats.getMetrics().keySet().stream()
                .filter(name -> name.contains("PartialMiss"))
                .findFirst()
                .map(runtimeStats::getMetric)
                .orElseThrow(() -> new RuntimeException("partial miss on statistics cache should have occurred"));
        assertTrue(partialMiss.getCount() > 0);

        // every column must still have complete statistics after the partial miss
        statistics.getColumnStatistics().forEach((handle, stats) -> {
            assertFalse(stats.getDistinctValuesCount().isUnknown());
            if (isKllHistogramSupportedType(((IcebergColumnHandle) handle).getType())) {
                assertTrue(stats.getHistogram().isPresent());
            }
        });
    }
}

private TableStatistics getScanStatsEstimate(Session session, @Language("SQL") String sql)
{
Plan plan = plan(sql, session);
Expand All @@ -418,14 +459,19 @@ private TableStatistics getScanStatsEstimate(Session session, @Language("SQL") S
new Constraint<>(node.getCurrentConstraint())));
}

private TableStatistics getTableStatistics(Session session, String table)
private static TableStatistics getTableStatistics(QueryRunner queryRunner, Session session, String table)
{
Metadata meta = getQueryRunner().getMetadata();
TransactionId txid = getQueryRunner().getTransactionManager().beginTransaction(false);
Session txnSession = session.beginTransactionId(txid, getQueryRunner().getTransactionManager(), new AllowAllAccessControl());
Map<String, ColumnHandle> columnHandles = getColumnHandles(table, txnSession);
Metadata meta = queryRunner.getMetadata();
TransactionId txid = queryRunner.getTransactionManager().beginTransaction(false);
Session txnSession = session.beginTransactionId(txid, queryRunner.getTransactionManager(), new AllowAllAccessControl());
Map<String, ColumnHandle> columnHandles = getColumnHandles(queryRunner, table, txnSession);
List<ColumnHandle> columnHandleList = new ArrayList<>(columnHandles.values());
return meta.getTableStatistics(txnSession, getAnalyzeTableHandle(table, txnSession), columnHandleList, Constraint.alwaysTrue());
return meta.getTableStatistics(txnSession, getAnalyzeTableHandle(queryRunner, table, txnSession), columnHandleList, Constraint.alwaysTrue());
}

/**
 * Convenience overload that fetches table statistics through this test's own
 * query runner ({@code getQueryRunner()}).
 *
 * @param session session passed through to the static overload
 * @param table table name passed through to the static overload
 * @return whatever {@code getTableStatistics(QueryRunner, Session, String)} returns
 */
private TableStatistics getTableStatistics(Session session, String table)
{
return getTableStatistics(getQueryRunner(), session, table);
}

private void columnStatsEqual(Map<ColumnHandle, ColumnStatistics> actualStats, Map<ColumnHandle, ColumnStatistics> expectedStats)
Expand Down Expand Up @@ -454,28 +500,38 @@ private static Constraint<ColumnHandle> constraintWithMinValue(ColumnHandle col,
ImmutableMap.of(col, Domain.create(ValueSet.ofRanges(Range.greaterThan(DOUBLE, min)), true))));
}

private TableHandle getAnalyzeTableHandle(String tableName, Session session)
private static TableHandle getAnalyzeTableHandle(QueryRunner queryRunner, String tableName, Session session)
{
Metadata meta = getQueryRunner().getMetadata();
Metadata meta = queryRunner.getMetadata();
return meta.getTableHandleForStatisticsCollection(
session,
new QualifiedObjectName("iceberg", "tpch", tableName.toLowerCase(Locale.US)),
Collections.emptyMap()).get();
}

private TableHandle getTableHandle(String tableName, Session session)
/**
 * Convenience overload that resolves the statistics-collection table handle
 * through this test's own query runner ({@code getQueryRunner()}).
 *
 * @param tableName table name passed through to the static overload
 * @param session session passed through to the static overload
 * @return whatever {@code getAnalyzeTableHandle(QueryRunner, String, Session)} returns
 */
private TableHandle getAnalyzeTableHandle(String tableName, Session session)
{
return getAnalyzeTableHandle(getQueryRunner(), tableName, session);
}

private static TableHandle getTableHandle(QueryRunner queryRunner, String tableName, Session session)
{
MetadataResolver resolver = getQueryRunner().getMetadata().getMetadataResolver(session);
MetadataResolver resolver = queryRunner.getMetadata().getMetadataResolver(session);
return resolver.getTableHandle(new QualifiedObjectName("iceberg", "tpch", tableName.toLowerCase(Locale.US))).get();
}

private Map<String, ColumnHandle> getColumnHandles(String tableName, Session session)
private static Map<String, ColumnHandle> getColumnHandles(QueryRunner queryRunner, String tableName, Session session)
{
return getQueryRunner().getMetadata().getColumnHandles(session, getTableHandle(tableName, session)).entrySet().stream()
return queryRunner.getMetadata().getColumnHandles(session, getTableHandle(queryRunner, tableName, session)).entrySet().stream()
.filter(entry -> !IcebergMetadataColumn.isMetadataColumnId(((IcebergColumnHandle) (entry.getValue())).getId()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

/**
 * Convenience overload that looks up column handles through this test's own
 * query runner ({@code getQueryRunner()}).
 *
 * @param tableName table name passed through to the static overload
 * @param session session passed through to the static overload
 * @return whatever {@code getColumnHandles(QueryRunner, String, Session)} returns
 */
private Map<String, ColumnHandle> getColumnHandles(String tableName, Session session)
{
return getColumnHandles(getQueryRunner(), tableName, session);
}

static void assertStatValuePresent(StatsSchema column, MaterializedResult result, Set<String> columnNames)
{
assertStatValue(column, result, columnNames, null, true);
Expand Down

0 comments on commit 53da02f

Please sign in to comment.