Skip to content

Commit

Permalink
Record source information for HBO stats
Browse files Browse the repository at this point in the history
Record the type of workers (CPP, JAVA) and query ID of the queries
which produce these stats.
  • Loading branch information
feilong-liu committed Mar 19, 2024
1 parent ee2e5c4 commit 6de650a
Show file tree
Hide file tree
Showing 11 changed files with 132 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.facebook.presto.spi.statistics.HistoricalPlanStatistics;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntry;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntryInfo;
import com.facebook.presto.spi.statistics.PlanStatistics;

import java.util.ArrayList;
Expand Down Expand Up @@ -58,7 +59,8 @@ public static HistoricalPlanStatistics updatePlanStatistics(
HistoricalPlanStatistics historicalPlanStatistics,
List<PlanStatistics> inputTableStatistics,
PlanStatistics current,
HistoryBasedOptimizationConfig config)
HistoryBasedOptimizationConfig config,
HistoricalPlanStatisticsEntryInfo historicalPlanStatisticsEntryInfo)
{
List<HistoricalPlanStatisticsEntry> lastRunsStatistics = historicalPlanStatistics.getLastRunsStatistics();

Expand All @@ -69,7 +71,7 @@ public static HistoricalPlanStatistics updatePlanStatistics(
newLastRunsStatistics.remove(similarStatsIndex.get().intValue());
}

newLastRunsStatistics.add(new HistoricalPlanStatisticsEntry(current, inputTableStatistics));
newLastRunsStatistics.add(new HistoricalPlanStatisticsEntry(current, inputTableStatistics, historicalPlanStatisticsEntryInfo));
int maxLastRuns = inputTableStatistics.isEmpty() ? 1 : config.getMaxLastRunsHistory();
if (newLastRunsStatistics.size() > maxLastRuns) {
newLastRunsStatistics.remove(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.facebook.presto.metadata.SessionPropertyManager;
import com.facebook.presto.spi.statistics.EmptyPlanStatisticsProvider;
import com.facebook.presto.spi.statistics.HistoryBasedPlanStatisticsProvider;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.planner.CachingPlanCanonicalInfoProvider;
import com.facebook.presto.sql.planner.PlanCanonicalInfoProvider;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -39,16 +40,18 @@ public class HistoryBasedPlanStatisticsManager

private HistoryBasedPlanStatisticsProvider historyBasedPlanStatisticsProvider = EmptyPlanStatisticsProvider.getInstance();
private boolean statisticsProviderAdded;
private final boolean isNativeExecution;

@Inject
public HistoryBasedPlanStatisticsManager(ObjectMapper objectMapper, SessionPropertyManager sessionPropertyManager, Metadata metadata, HistoryBasedOptimizationConfig config)
public HistoryBasedPlanStatisticsManager(ObjectMapper objectMapper, SessionPropertyManager sessionPropertyManager, Metadata metadata, HistoryBasedOptimizationConfig config, FeaturesConfig featuresConfig)
{
requireNonNull(objectMapper, "objectMapper is null");
this.sessionPropertyManager = requireNonNull(sessionPropertyManager, "sessionPropertyManager is null");
this.historyBasedStatisticsCacheManager = new HistoryBasedStatisticsCacheManager();
ObjectMapper newObjectMapper = objectMapper.copy().configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
this.planCanonicalInfoProvider = new CachingPlanCanonicalInfoProvider(historyBasedStatisticsCacheManager, newObjectMapper, metadata);
this.config = requireNonNull(config, "config is null");
this.isNativeExecution = featuresConfig.isNativeExecutionEnabled();
}

public void addHistoryBasedPlanStatisticsProviderFactory(HistoryBasedPlanStatisticsProvider historyBasedPlanStatisticsProvider)
Expand All @@ -67,7 +70,7 @@ public HistoryBasedPlanStatisticsCalculator getHistoryBasedPlanStatisticsCalcula

public HistoryBasedPlanStatisticsTracker getHistoryBasedPlanStatisticsTracker()
{
return new HistoryBasedPlanStatisticsTracker(() -> historyBasedPlanStatisticsProvider, historyBasedStatisticsCacheManager, sessionPropertyManager, config);
return new HistoryBasedPlanStatisticsTracker(() -> historyBasedPlanStatisticsProvider, historyBasedStatisticsCacheManager, sessionPropertyManager, config, isNativeExecution);
}

public PlanCanonicalInfoProvider getPlanCanonicalInfoProvider()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spi.statistics.Estimate;
import com.facebook.presto.spi.statistics.HistoricalPlanStatistics;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntryInfo;
import com.facebook.presto.spi.statistics.HistoryBasedPlanStatisticsProvider;
import com.facebook.presto.spi.statistics.HistoryBasedSourceInfo;
import com.facebook.presto.spi.statistics.JoinNodeStatistics;
Expand Down Expand Up @@ -78,17 +79,20 @@ public class HistoryBasedPlanStatisticsTracker
private final HistoryBasedStatisticsCacheManager historyBasedStatisticsCacheManager;
private final SessionPropertyManager sessionPropertyManager;
private final HistoryBasedOptimizationConfig config;
private final boolean isNativeExecution;

public HistoryBasedPlanStatisticsTracker(
Supplier<HistoryBasedPlanStatisticsProvider> historyBasedPlanStatisticsProvider,
HistoryBasedStatisticsCacheManager historyBasedStatisticsCacheManager,
SessionPropertyManager sessionPropertyManager,
HistoryBasedOptimizationConfig config)
HistoryBasedOptimizationConfig config,
boolean isNativeExecution)
{
this.historyBasedPlanStatisticsProvider = requireNonNull(historyBasedPlanStatisticsProvider, "historyBasedPlanStatisticsProvider is null");
this.historyBasedStatisticsCacheManager = requireNonNull(historyBasedStatisticsCacheManager, "historyBasedStatisticsCacheManager is null");
this.sessionPropertyManager = requireNonNull(sessionPropertyManager, "sessionPropertyManager is null");
this.config = requireNonNull(config, "config is null");
this.isNativeExecution = isNativeExecution;
}

public void updateStatistics(QueryExecution queryExecution)
Expand Down Expand Up @@ -324,11 +328,14 @@ public void updateStatistics(QueryInfo queryInfo)
HistoricalPlanStatistics historicalPlanStatistics = Optional.ofNullable(historicalPlanStatisticsMap.get(entry.getKey()))
.orElseGet(HistoricalPlanStatistics::empty);
HistoryBasedSourceInfo historyBasedSourceInfo = (HistoryBasedSourceInfo) entry.getValue().getSourceInfo();
HistoricalPlanStatisticsEntryInfo historicalPlanStatisticsEntryInfo = new HistoricalPlanStatisticsEntryInfo(
isNativeExecution ? HistoricalPlanStatisticsEntryInfo.WorkerType.CPP : HistoricalPlanStatisticsEntryInfo.WorkerType.JAVA, queryInfo.getQueryId());
return updatePlanStatistics(
historicalPlanStatistics,
historyBasedSourceInfo.getInputTableStatistics().get(),
entry.getValue().getPlanStatistics(),
config);
config,
historicalPlanStatisticsEntryInfo);
}));

if (!newPlanStatistics.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig,
this.statsNormalizer = new StatsNormalizer();
this.scalarStatsCalculator = new ScalarStatsCalculator(metadata);
this.filterStatsCalculator = new FilterStatsCalculator(metadata, scalarStatsCalculator, statsNormalizer);
this.historyBasedPlanStatisticsManager = new HistoryBasedPlanStatisticsManager(objectMapper, new SessionPropertyManager(), metadata, new HistoryBasedOptimizationConfig());
this.historyBasedPlanStatisticsManager = new HistoryBasedPlanStatisticsManager(objectMapper, new SessionPropertyManager(), metadata, new HistoryBasedOptimizationConfig(), featuresConfig);
this.fragmentStatsProvider = new FragmentStatsProvider();
this.statsCalculator = createNewStatsCalculator(metadata, scalarStatsCalculator, statsNormalizer, filterStatsCalculator, historyBasedPlanStatisticsManager, fragmentStatsProvider);
this.taskCountEstimator = new TaskCountEstimator(() -> nodeCountForStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
*/
package com.facebook.presto.cost;

import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.statistics.Estimate;
import com.facebook.presto.spi.statistics.HistoricalPlanStatistics;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntryInfo;
import com.facebook.presto.spi.statistics.JoinNodeStatistics;
import com.facebook.presto.spi.statistics.PartialAggregationStatistics;
import com.facebook.presto.spi.statistics.PlanStatistics;
Expand Down Expand Up @@ -97,7 +99,8 @@ private static HistoricalPlanStatistics updatePlanStatistics(
historicalPlanStatistics,
inputTableStatistics,
current,
new HistoryBasedOptimizationConfig());
new HistoryBasedOptimizationConfig(),
new HistoricalPlanStatisticsEntryInfo(HistoricalPlanStatisticsEntryInfo.WorkerType.JAVA, QueryId.valueOf("0")));
}

private static PlanStatistics getPredictedPlanStatistics(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
package com.facebook.presto.cost;

import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.plan.PlanNodeWithHash;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.statistics.Estimate;
import com.facebook.presto.spi.statistics.HistoricalPlanStatistics;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntry;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntryInfo;
import com.facebook.presto.spi.statistics.HistoryBasedPlanStatisticsProvider;
import com.facebook.presto.spi.statistics.JoinNodeStatistics;
import com.facebook.presto.spi.statistics.PartialAggregationStatistics;
Expand Down Expand Up @@ -126,7 +128,8 @@ public Map<PlanNodeWithHash, HistoricalPlanStatistics> getStats(List<PlanNodeWit
if (node.getTable().toString().contains("orders")) {
return new HistoricalPlanStatistics(ImmutableList.of(new HistoricalPlanStatisticsEntry(
new PlanStatistics(Estimate.of(100), Estimate.of(1000), 1, JoinNodeStatistics.empty(), TableWriterNodeStatistics.empty(), PartialAggregationStatistics.empty()),
ImmutableList.of(new PlanStatistics(Estimate.of(15000), Estimate.unknown(), 1, JoinNodeStatistics.empty(), TableWriterNodeStatistics.empty(), PartialAggregationStatistics.empty())))));
ImmutableList.of(new PlanStatistics(Estimate.of(15000), Estimate.unknown(), 1, JoinNodeStatistics.empty(), TableWriterNodeStatistics.empty(), PartialAggregationStatistics.empty())),
new HistoricalPlanStatisticsEntryInfo(HistoricalPlanStatisticsEntryInfo.WorkerType.JAVA, QueryId.valueOf("0")))));
}
}
return HistoricalPlanStatistics.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.facebook.presto.spi.prerequisites.QueryPrerequisitesContext;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.spi.security.AccessDeniedException;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.transaction.TransactionManager;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -454,7 +455,7 @@ private QueryMonitor createQueryMonitor(CountingEventListener eventListener)
new SessionPropertyManager(),
metadata,
new QueryMonitorConfig(),
new HistoryBasedPlanStatisticsManager(new ObjectMapper(), new SessionPropertyManager(), metadata, new HistoryBasedOptimizationConfig()));
new HistoryBasedPlanStatisticsManager(new ObjectMapper(), new SessionPropertyManager(), metadata, new HistoryBasedOptimizationConfig(), new FeaturesConfig()));
}

private EventListenerManager createEventListenerManager(CountingEventListener countingEventListener)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.facebook.presto.spi.statistics.Estimate;
import com.facebook.presto.spi.statistics.HistoricalPlanStatistics;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntry;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntryInfo;
import com.facebook.presto.spi.statistics.HistoryBasedPlanStatisticsProvider;
import com.facebook.presto.spi.statistics.JoinNodeStatistics;
import com.facebook.presto.spi.statistics.PartialAggregationStatistics;
Expand Down Expand Up @@ -129,7 +130,7 @@ public void testUsesHboStatsWhenMatchRuntime()
ImmutableList.of(
new HistoricalPlanStatisticsEntry(
new PlanStatistics(Estimate.of(100), Estimate.of(1000), 1, JoinNodeStatistics.empty(), TableWriterNodeStatistics.empty(), PartialAggregationStatistics.empty()),
ImmutableList.of())))));
ImmutableList.of(), new HistoricalPlanStatisticsEntryInfo(HistoricalPlanStatisticsEntryInfo.WorkerType.JAVA, QueryId.valueOf("0")))))));

tester.assertStatsFor(pb -> pb.remoteSource(ImmutableList.of(new PlanFragmentId(1)), statsEquivalentRemoteSource))
.check(check -> check.totalSize(1000)
Expand Down Expand Up @@ -178,7 +179,7 @@ public void testUsesRuntimeStatsWhenHboDisabled()
ImmutableList.of(
new HistoricalPlanStatisticsEntry(
new PlanStatistics(Estimate.of(100), Estimate.of(1000), 1, JoinNodeStatistics.empty(), TableWriterNodeStatistics.empty(), PartialAggregationStatistics.empty()),
ImmutableList.of())))));
ImmutableList.of(), new HistoricalPlanStatisticsEntryInfo(HistoricalPlanStatisticsEntryInfo.WorkerType.JAVA, QueryId.valueOf("0")))))));

tester.assertStatsFor(pb -> pb.remoteSource(ImmutableList.of(new PlanFragmentId(1))))
.check(check -> check.totalSize(1000)
Expand All @@ -205,7 +206,7 @@ public void testUsesRuntimeStatsWhenDiffersFromHbo()
ImmutableList.of(
new HistoricalPlanStatisticsEntry(
new PlanStatistics(Estimate.of(10), Estimate.of(100), 1, JoinNodeStatistics.empty(), TableWriterNodeStatistics.empty(), PartialAggregationStatistics.empty()),
ImmutableList.of())))));
ImmutableList.of(), new HistoricalPlanStatisticsEntryInfo(HistoricalPlanStatisticsEntryInfo.WorkerType.JAVA, QueryId.valueOf("0")))))));

tester.assertStatsFor(pb -> pb.remoteSource(ImmutableList.of(new PlanFragmentId(1))))
.check(check -> check.totalSize(1000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ public class HistoricalPlanStatisticsEntry
private final PlanStatistics planStatistics;
// Size of input tables when plan statistics was recorded. This list will be sorted by input tables canonical order.
private final List<PlanStatistics> inputTableStatistics;
private final HistoricalPlanStatisticsEntryInfo historicalPlanStatisticsEntryInfo;

@ThriftConstructor
public HistoricalPlanStatisticsEntry(PlanStatistics planStatistics, List<PlanStatistics> inputTableStatistics)
public HistoricalPlanStatisticsEntry(PlanStatistics planStatistics, List<PlanStatistics> inputTableStatistics, HistoricalPlanStatisticsEntryInfo historicalPlanStatisticsEntryInfo)
{
// Check for nulls, to make it thrift backwards compatible
this.planStatistics = planStatistics == null ? PlanStatistics.empty() : planStatistics;
this.inputTableStatistics = unmodifiableList(inputTableStatistics == null ? emptyList() : inputTableStatistics);
this.historicalPlanStatisticsEntryInfo = historicalPlanStatisticsEntryInfo;
}

@ThriftField(value = 1, requiredness = OPTIONAL)
Expand All @@ -52,6 +54,12 @@ public List<PlanStatistics> getInputTableStatistics()
return inputTableStatistics;
}

@ThriftField(value = 3, requiredness = OPTIONAL)
public HistoricalPlanStatisticsEntryInfo getHistoricalPlanStatisticsEntryInfo()
{
return historicalPlanStatisticsEntryInfo;
}

@Override
public boolean equals(Object o)
{
Expand All @@ -62,18 +70,18 @@ public boolean equals(Object o)
return false;
}
HistoricalPlanStatisticsEntry that = (HistoricalPlanStatisticsEntry) o;
return Objects.equals(planStatistics, that.planStatistics) && Objects.equals(inputTableStatistics, that.inputTableStatistics);
return Objects.equals(planStatistics, that.planStatistics) && Objects.equals(inputTableStatistics, that.inputTableStatistics) && Objects.equals(historicalPlanStatisticsEntryInfo, that.historicalPlanStatisticsEntryInfo);
}

@Override
public int hashCode()
{
return Objects.hash(planStatistics, inputTableStatistics);
return Objects.hash(planStatistics, inputTableStatistics, historicalPlanStatisticsEntryInfo);
}

@Override
public String toString()
{
return format("HistoricalPlanStatisticsEntry{planStatistics=%s, inputTableStatistics=%s}", planStatistics, inputTableStatistics);
return format("HistoricalPlanStatisticsEntry{planStatistics=%s, inputTableStatistics=%s, historicalPlanStatisticsEntryInfo=%s}", planStatistics, inputTableStatistics, historicalPlanStatisticsEntryInfo);
}
}
Loading

0 comments on commit 6de650a

Please sign in to comment.