Skip to content

Commit 53da02f

Browse files
committed
[iceberg] Fix bug on StatisticsFileCache miss
In the case of partial miss on the StatisticsFileCache, the loaded statistics were not combined with the cached statistics, causing discrepancies in query planning
1 parent 744f1bb commit 53da02f

File tree

2 files changed

+69
-12
lines changed

2 files changed

+69
-12
lines changed

presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java

+1
Original file line numberDiff line numberDiff line change
@@ -658,6 +658,7 @@ private Map<Integer, ColumnStatistics> loadStatisticsFile(IcebergTableHandle tab
658658
statisticsFileCache.put(new StatisticsFileCacheKey(file, key), value);
659659
finalResult.put(key, value);
660660
});
661+
finalResult.putAll(cachedStats);
661662
return finalResult.build();
662663
}
663664

presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java

+68-12
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
import com.facebook.presto.Session;
1717
import com.facebook.presto.common.QualifiedObjectName;
18+
import com.facebook.presto.common.RuntimeMetric;
19+
import com.facebook.presto.common.RuntimeStats;
1820
import com.facebook.presto.common.predicate.Domain;
1921
import com.facebook.presto.common.predicate.Range;
2022
import com.facebook.presto.common.predicate.TupleDomain;
@@ -52,6 +54,7 @@
5254
import com.facebook.presto.testing.MaterializedResult;
5355
import com.facebook.presto.testing.QueryRunner;
5456
import com.facebook.presto.tests.AbstractTestQueryFramework;
57+
import com.facebook.presto.tests.DistributedQueryRunner;
5558
import com.google.common.collect.ImmutableList;
5659
import com.google.common.collect.ImmutableMap;
5760
import com.google.common.collect.ImmutableSet;
@@ -75,6 +78,7 @@
7578
import java.util.function.Function;
7679
import java.util.stream.Collectors;
7780

81+
import static com.facebook.presto.SystemSessionProperties.OPTIMIZER_USE_HISTOGRAMS;
7882
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
7983
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY;
8084
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR;
@@ -85,6 +89,8 @@
8589
import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
8690
import static com.facebook.presto.iceberg.IcebergSessionProperties.HIVE_METASTORE_STATISTICS_MERGE_STRATEGY;
8791
import static com.facebook.presto.iceberg.IcebergSessionProperties.PUSHDOWN_FILTER_ENABLED;
92+
import static com.facebook.presto.iceberg.IcebergSessionProperties.STATISTICS_KLL_SKETCH_K_PARAMETER;
93+
import static com.facebook.presto.iceberg.statistics.KllHistogram.isKllHistogramSupportedType;
8894
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
8995
import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES;
9096
import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES;
@@ -402,6 +408,41 @@ public void testPredicateOnlyColumnInStatisticsOutput(boolean pushdownFilterEnab
402408
}
403409
}
404410

411+
@Test
412+
public void testStatisticsCachePartialEviction()
413+
throws Exception
414+
{
415+
try (DistributedQueryRunner queryRunner = createIcebergQueryRunner(ImmutableMap.of(), ImmutableMap.of("iceberg.max-statistics-file-cache-size", "1024B"))) {
416+
Session session = Session.builder(queryRunner.getDefaultSession())
417+
// set histograms enabled
418+
.setSystemProperty(OPTIMIZER_USE_HISTOGRAMS, "true")
419+
.setCatalogSessionProperty("iceberg", STATISTICS_KLL_SKETCH_K_PARAMETER, "32768")
420+
.build();
421+
422+
queryRunner.execute(session, "ANALYZE lineitem");
423+
// get table statistics, to populate some of the cache
424+
TableStatistics statistics = getTableStatistics(queryRunner, session, "lineitem");
425+
RuntimeStats runtimeStats = session.getRuntimeStats();
426+
runtimeStats.getMetrics().keySet().stream().filter(name -> name.contains("ColumnCount")).findFirst()
427+
.ifPresent(stat -> assertEquals(32, runtimeStats.getMetric(stat).getSum()));
428+
runtimeStats.getMetrics().keySet().stream().filter(name -> name.contains("PuffinFileSize")).findFirst()
429+
.ifPresent(stat -> assertTrue(runtimeStats.getMetric(stat).getSum() > 1024));
430+
// get them again to trigger retrieval of _some_ cached statistics
431+
statistics = getTableStatistics(queryRunner, session, "lineitem");
432+
RuntimeMetric partialMiss = runtimeStats.getMetrics().keySet().stream().filter(name -> name.contains("PartialMiss")).findFirst()
433+
.map(runtimeStats::getMetric)
434+
.orElseThrow(() -> new RuntimeException("partial miss on statistics cache should have occurred"));
435+
assertTrue(partialMiss.getCount() > 0);
436+
437+
statistics.getColumnStatistics().forEach((handle, stats) -> {
438+
assertFalse(stats.getDistinctValuesCount().isUnknown());
439+
if (isKllHistogramSupportedType(((IcebergColumnHandle) handle).getType())) {
440+
assertTrue(stats.getHistogram().isPresent());
441+
}
442+
});
443+
}
444+
}
445+
405446
private TableStatistics getScanStatsEstimate(Session session, @Language("SQL") String sql)
406447
{
407448
Plan plan = plan(sql, session);
@@ -418,14 +459,19 @@ private TableStatistics getScanStatsEstimate(Session session, @Language("SQL") S
418459
new Constraint<>(node.getCurrentConstraint())));
419460
}
420461

421-
private TableStatistics getTableStatistics(Session session, String table)
462+
private static TableStatistics getTableStatistics(QueryRunner queryRunner, Session session, String table)
422463
{
423-
Metadata meta = getQueryRunner().getMetadata();
424-
TransactionId txid = getQueryRunner().getTransactionManager().beginTransaction(false);
425-
Session txnSession = session.beginTransactionId(txid, getQueryRunner().getTransactionManager(), new AllowAllAccessControl());
426-
Map<String, ColumnHandle> columnHandles = getColumnHandles(table, txnSession);
464+
Metadata meta = queryRunner.getMetadata();
465+
TransactionId txid = queryRunner.getTransactionManager().beginTransaction(false);
466+
Session txnSession = session.beginTransactionId(txid, queryRunner.getTransactionManager(), new AllowAllAccessControl());
467+
Map<String, ColumnHandle> columnHandles = getColumnHandles(queryRunner, table, txnSession);
427468
List<ColumnHandle> columnHandleList = new ArrayList<>(columnHandles.values());
428-
return meta.getTableStatistics(txnSession, getAnalyzeTableHandle(table, txnSession), columnHandleList, Constraint.alwaysTrue());
469+
return meta.getTableStatistics(txnSession, getAnalyzeTableHandle(queryRunner, table, txnSession), columnHandleList, Constraint.alwaysTrue());
470+
}
471+
472+
private TableStatistics getTableStatistics(Session session, String table)
473+
{
474+
return getTableStatistics(getQueryRunner(), session, table);
429475
}
430476

431477
private void columnStatsEqual(Map<ColumnHandle, ColumnStatistics> actualStats, Map<ColumnHandle, ColumnStatistics> expectedStats)
@@ -454,28 +500,38 @@ private static Constraint<ColumnHandle> constraintWithMinValue(ColumnHandle col,
454500
ImmutableMap.of(col, Domain.create(ValueSet.ofRanges(Range.greaterThan(DOUBLE, min)), true))));
455501
}
456502

457-
private TableHandle getAnalyzeTableHandle(String tableName, Session session)
503+
private static TableHandle getAnalyzeTableHandle(QueryRunner queryRunner, String tableName, Session session)
458504
{
459-
Metadata meta = getQueryRunner().getMetadata();
505+
Metadata meta = queryRunner.getMetadata();
460506
return meta.getTableHandleForStatisticsCollection(
461507
session,
462508
new QualifiedObjectName("iceberg", "tpch", tableName.toLowerCase(Locale.US)),
463509
Collections.emptyMap()).get();
464510
}
465511

466-
private TableHandle getTableHandle(String tableName, Session session)
512+
private TableHandle getAnalyzeTableHandle(String tableName, Session session)
513+
{
514+
return getAnalyzeTableHandle(getQueryRunner(), tableName, session);
515+
}
516+
517+
private static TableHandle getTableHandle(QueryRunner queryRunner, String tableName, Session session)
467518
{
468-
MetadataResolver resolver = getQueryRunner().getMetadata().getMetadataResolver(session);
519+
MetadataResolver resolver = queryRunner.getMetadata().getMetadataResolver(session);
469520
return resolver.getTableHandle(new QualifiedObjectName("iceberg", "tpch", tableName.toLowerCase(Locale.US))).get();
470521
}
471522

472-
private Map<String, ColumnHandle> getColumnHandles(String tableName, Session session)
523+
private static Map<String, ColumnHandle> getColumnHandles(QueryRunner queryRunner, String tableName, Session session)
473524
{
474-
return getQueryRunner().getMetadata().getColumnHandles(session, getTableHandle(tableName, session)).entrySet().stream()
525+
return queryRunner.getMetadata().getColumnHandles(session, getTableHandle(queryRunner, tableName, session)).entrySet().stream()
475526
.filter(entry -> !IcebergMetadataColumn.isMetadataColumnId(((IcebergColumnHandle) (entry.getValue())).getId()))
476527
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
477528
}
478529

530+
private Map<String, ColumnHandle> getColumnHandles(String tableName, Session session)
531+
{
532+
return getColumnHandles(getQueryRunner(), tableName, session);
533+
}
534+
479535
static void assertStatValuePresent(StatsSchema column, MaterializedResult result, Set<String> columnNames)
480536
{
481537
assertStatValue(column, result, columnNames, null, true);

0 commit comments

Comments
 (0)