Skip to content

Commit

Permalink
Add Exchange before GroupId to improve Partial Aggregation
Browse files Browse the repository at this point in the history
Based on: trinodb/trino@dc1d66fb
co-authored-by: Piotr Findeisen <piotr.findeisen@gmail.com>
Based on : trinodb/trino@c573b34
co-authored-by: Lukasz Stec <lukasz.stec@starburstdata.com>
Based on: trinodb/trino@29328d3
co-authored-by: praveenkrishna <praveenkrishna@tutanota.com>
  • Loading branch information
aaneja committed Feb 4, 2025
1 parent 928fdb3 commit f95fab9
Show file tree
Hide file tree
Showing 14 changed files with 821 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ public final class SystemSessionProperties
public static final String INCLUDE_VALUES_NODE_IN_CONNECTOR_OPTIMIZER = "include_values_node_in_connector_optimizer";
public static final String SINGLE_NODE_EXECUTION_ENABLED = "single_node_execution_enabled";
public static final String EXPRESSION_OPTIMIZER_NAME = "expression_optimizer_name";
public static final String ADD_EXCHANGE_BELOW_PARTIAL_AGGREGATION_OVER_GROUP_ID = "add_exchange_below_partial_aggregation_over_group_id";

// TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future.
public static final String NATIVE_AGGREGATION_SPILL_ALL = "native_aggregation_spill_all";
Expand Down Expand Up @@ -1858,6 +1859,10 @@ public SystemSessionProperties(
EXPRESSION_OPTIMIZER_NAME,
"Configure which expression optimizer to use",
featuresConfig.getExpressionOptimizerName(),
false),
booleanProperty(ADD_EXCHANGE_BELOW_PARTIAL_AGGREGATION_OVER_GROUP_ID,
"Enable adding an exchange below partial aggregation over a GroupId node to improve partial aggregation performance",
featuresConfig.getAddExchangeBelowPartialAggregationOverGroupId(),
false));
}

Expand Down Expand Up @@ -3164,4 +3169,9 @@ public static String getExpressionOptimizerName(Session session)
{
return session.getSystemProperty(EXPRESSION_OPTIMIZER_NAME, String.class);
}

public static boolean isEnabledAddExchangeBelowGroupId(Session session)
{
return session.getSystemProperty(ADD_EXCHANGE_BELOW_PARTIAL_AGGREGATION_OVER_GROUP_ID, Boolean.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.cost;

import com.facebook.presto.Session;
import com.facebook.presto.execution.scheduler.NodeSchedulerConfig;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.InternalNodeManager;
Expand All @@ -22,6 +23,8 @@
import java.util.Set;
import java.util.function.IntSupplier;

import static com.facebook.presto.SystemSessionProperties.getHashPartitionCount;
import static java.lang.Math.min;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -54,4 +57,9 @@ public int estimateSourceDistributedTaskCount()
{
return numberOfNodes.getAsInt();
}

public int estimateHashedTaskCount(Session session)
{
return min(numberOfNodes.getAsInt(), getHashPartitionCount(session));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ public class FeaturesConfig
private boolean singleNodeExecutionEnabled;
private boolean nativeExecutionScaleWritersThreadsEnabled;
private String expressionOptimizerName = DEFAULT_EXPRESSION_OPTIMIZER_NAME;
private boolean addExchangeBelowPartialAggregationOverGroupId;

public enum PartitioningPrecisionStrategy
{
Expand Down Expand Up @@ -2930,4 +2931,17 @@ public FeaturesConfig setExpressionOptimizerName(String expressionOptimizerName)
this.expressionOptimizerName = expressionOptimizerName;
return this;
}

@Config("optimizer.add-exchange-below-partial-aggregation-over-group-id")
@ConfigDescription("Enable adding an exchange below partial aggregation over a GroupId node to improve partial aggregation performance")
public FeaturesConfig setAddExchangeBelowPartialAggregationOverGroupId(boolean addExchangeBelowPartialAggregationOverGroupId)
{
this.addExchangeBelowPartialAggregationOverGroupId = addExchangeBelowPartialAggregationOverGroupId;
return this;
}

public boolean getAddExchangeBelowPartialAggregationOverGroupId()
{
return addExchangeBelowPartialAggregationOverGroupId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.cost.CostComparator;
import com.facebook.presto.cost.StatsCalculator;
import com.facebook.presto.cost.TaskCountEstimator;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.split.PageSourceManager;
import com.facebook.presto.split.SplitManager;
Expand All @@ -27,6 +28,7 @@
import com.facebook.presto.sql.planner.iterative.IterativeOptimizer;
import com.facebook.presto.sql.planner.iterative.Rule;
import com.facebook.presto.sql.planner.iterative.properties.LogicalPropertiesProviderImpl;
import com.facebook.presto.sql.planner.iterative.rule.AddExchangesBelowPartialAggregationOverGroupIdRuleSet;
import com.facebook.presto.sql.planner.iterative.rule.AddIntermediateAggregations;
import com.facebook.presto.sql.planner.iterative.rule.AddNotNullFiltersToJoinNode;
import com.facebook.presto.sql.planner.iterative.rule.CombineApproxPercentileFunctions;
Expand Down Expand Up @@ -222,7 +224,8 @@ public PlanOptimizers(
TaskCountEstimator taskCountEstimator,
PartitioningProviderManager partitioningProviderManager,
FeaturesConfig featuresConfig,
ExpressionOptimizerManager expressionOptimizerManager)
ExpressionOptimizerManager expressionOptimizerManager,
TaskManagerConfig taskManagerConfig)
{
this(metadata,
sqlParser,
Expand All @@ -238,7 +241,8 @@ public PlanOptimizers(
taskCountEstimator,
partitioningProviderManager,
featuresConfig,
expressionOptimizerManager);
expressionOptimizerManager,
taskManagerConfig);
}

@PostConstruct
Expand Down Expand Up @@ -270,7 +274,8 @@ public PlanOptimizers(
TaskCountEstimator taskCountEstimator,
PartitioningProviderManager partitioningProviderManager,
FeaturesConfig featuresConfig,
ExpressionOptimizerManager expressionOptimizerManager)
ExpressionOptimizerManager expressionOptimizerManager,
TaskManagerConfig taskManagerConfig)
{
this.exporter = exporter;
ImmutableList.Builder<PlanOptimizer> builder = ImmutableList.builder();
Expand Down Expand Up @@ -820,6 +825,7 @@ public PlanOptimizers(

if (!noExchange) {
builder.add(new ReplicateSemiJoinInDelete()); // Must run before AddExchanges

builder.add(new IterativeOptimizer(
metadata,
ruleStats,
Expand All @@ -830,6 +836,7 @@ public PlanOptimizers(
// Must run before AddExchanges and after ReplicateSemiJoinInDelete
// to avoid temporarily having an invalid plan
new DetermineSemiJoinDistributionType(costComparator, taskCountEstimator))));

builder.add(new RandomizeNullKeyInOuterJoin(metadata.getFunctionAndTypeManager(), statsCalculator),
new PruneUnreferencedOutputs(),
new IterativeOptimizer(
Expand All @@ -841,6 +848,7 @@ public PlanOptimizers(
new PruneRedundantProjectionAssignments(),
new InlineProjections(metadata.getFunctionAndTypeManager()),
new RemoveRedundantIdentityProjections())));

builder.add(new ShardJoins(metadata, metadata.getFunctionAndTypeManager(), statsCalculator),
new PruneUnreferencedOutputs());
builder.add(
Expand Down Expand Up @@ -914,6 +922,13 @@ public PlanOptimizers(
ImmutableSet.of(
new PruneJoinColumns())));

builder.add(new IterativeOptimizer(
metadata,
ruleStats,
statsCalculator,
costCalculator,
new AddExchangesBelowPartialAggregationOverGroupIdRuleSet(taskCountEstimator, taskManagerConfig, metadata, sqlParser).rules()));

builder.add(new IterativeOptimizer(
metadata,
ruleStats,
Expand Down
Loading

0 comments on commit f95fab9

Please sign in to comment.