Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Exchange before GroupId to improve Partial Aggregation #24047

Merged
merged 1 commit into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ public final class SystemSessionProperties
public static final String INCLUDE_VALUES_NODE_IN_CONNECTOR_OPTIMIZER = "include_values_node_in_connector_optimizer";
public static final String SINGLE_NODE_EXECUTION_ENABLED = "single_node_execution_enabled";
public static final String EXPRESSION_OPTIMIZER_NAME = "expression_optimizer_name";
public static final String ADD_EXCHANGE_BELOW_PARTIAL_AGGREGATION_OVER_GROUP_ID = "add_exchange_below_partial_aggregation_over_group_id";

// TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future.
public static final String NATIVE_AGGREGATION_SPILL_ALL = "native_aggregation_spill_all";
Expand Down Expand Up @@ -1858,6 +1859,10 @@ public SystemSessionProperties(
EXPRESSION_OPTIMIZER_NAME,
"Configure which expression optimizer to use",
featuresConfig.getExpressionOptimizerName(),
false),
booleanProperty(ADD_EXCHANGE_BELOW_PARTIAL_AGGREGATION_OVER_GROUP_ID,
"Enable adding an exchange below partial aggregation over a GroupId node to improve partial aggregation performance",
featuresConfig.getAddExchangeBelowPartialAggregationOverGroupId(),
false));
}

Expand Down Expand Up @@ -3164,4 +3169,9 @@ public static String getExpressionOptimizerName(Session session)
{
return session.getSystemProperty(EXPRESSION_OPTIMIZER_NAME, String.class);
}

public static boolean isEnabledAddExchangeBelowGroupId(Session session)
aaneja marked this conversation as resolved.
Show resolved Hide resolved
{
return session.getSystemProperty(ADD_EXCHANGE_BELOW_PARTIAL_AGGREGATION_OVER_GROUP_ID, Boolean.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.cost;

import com.facebook.presto.Session;
import com.facebook.presto.execution.scheduler.NodeSchedulerConfig;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.InternalNodeManager;
Expand All @@ -22,6 +23,8 @@
import java.util.Set;
import java.util.function.IntSupplier;

import static com.facebook.presto.SystemSessionProperties.getHashPartitionCount;
import static java.lang.Math.min;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -54,4 +57,9 @@ public int estimateSourceDistributedTaskCount()
{
return numberOfNodes.getAsInt();
}

public int estimateHashedTaskCount(Session session)
{
return min(numberOfNodes.getAsInt(), getHashPartitionCount(session));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ public class FeaturesConfig
private boolean singleNodeExecutionEnabled;
private boolean nativeExecutionScaleWritersThreadsEnabled;
private String expressionOptimizerName = DEFAULT_EXPRESSION_OPTIMIZER_NAME;
private boolean addExchangeBelowPartialAggregationOverGroupId;

public enum PartitioningPrecisionStrategy
{
Expand Down Expand Up @@ -2945,4 +2946,17 @@ public boolean isExcludeInvalidWorkerSessionProperties()
{
return this.setExcludeInvalidWorkerSessionProperties;
}

@Config("optimizer.add-exchange-below-partial-aggregation-over-group-id")
@ConfigDescription("Enable adding an exchange below partial aggregation over a GroupId node to improve partial aggregation performance")
public FeaturesConfig setAddExchangeBelowPartialAggregationOverGroupId(boolean addExchangeBelowPartialAggregationOverGroupId)
{
this.addExchangeBelowPartialAggregationOverGroupId = addExchangeBelowPartialAggregationOverGroupId;
return this;
}

public boolean getAddExchangeBelowPartialAggregationOverGroupId()
{
return addExchangeBelowPartialAggregationOverGroupId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.cost.CostComparator;
import com.facebook.presto.cost.StatsCalculator;
import com.facebook.presto.cost.TaskCountEstimator;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.split.PageSourceManager;
import com.facebook.presto.split.SplitManager;
Expand All @@ -27,6 +28,7 @@
import com.facebook.presto.sql.planner.iterative.IterativeOptimizer;
import com.facebook.presto.sql.planner.iterative.Rule;
import com.facebook.presto.sql.planner.iterative.properties.LogicalPropertiesProviderImpl;
import com.facebook.presto.sql.planner.iterative.rule.AddExchangesBelowPartialAggregationOverGroupIdRuleSet;
import com.facebook.presto.sql.planner.iterative.rule.AddIntermediateAggregations;
import com.facebook.presto.sql.planner.iterative.rule.AddNotNullFiltersToJoinNode;
import com.facebook.presto.sql.planner.iterative.rule.CombineApproxPercentileFunctions;
Expand Down Expand Up @@ -222,7 +224,8 @@ public PlanOptimizers(
TaskCountEstimator taskCountEstimator,
PartitioningProviderManager partitioningProviderManager,
FeaturesConfig featuresConfig,
ExpressionOptimizerManager expressionOptimizerManager)
ExpressionOptimizerManager expressionOptimizerManager,
TaskManagerConfig taskManagerConfig)
{
this(metadata,
sqlParser,
Expand All @@ -238,7 +241,8 @@ public PlanOptimizers(
taskCountEstimator,
partitioningProviderManager,
featuresConfig,
expressionOptimizerManager);
expressionOptimizerManager,
taskManagerConfig);
}

@PostConstruct
Expand Down Expand Up @@ -270,7 +274,8 @@ public PlanOptimizers(
TaskCountEstimator taskCountEstimator,
PartitioningProviderManager partitioningProviderManager,
FeaturesConfig featuresConfig,
ExpressionOptimizerManager expressionOptimizerManager)
ExpressionOptimizerManager expressionOptimizerManager,
TaskManagerConfig taskManagerConfig)
{
this.exporter = exporter;
ImmutableList.Builder<PlanOptimizer> builder = ImmutableList.builder();
Expand Down Expand Up @@ -820,6 +825,7 @@ public PlanOptimizers(

if (!noExchange) {
builder.add(new ReplicateSemiJoinInDelete()); // Must run before AddExchanges

builder.add(new IterativeOptimizer(
metadata,
ruleStats,
Expand All @@ -830,6 +836,7 @@ public PlanOptimizers(
// Must run before AddExchanges and after ReplicateSemiJoinInDelete
// to avoid temporarily having an invalid plan
new DetermineSemiJoinDistributionType(costComparator, taskCountEstimator))));

builder.add(new RandomizeNullKeyInOuterJoin(metadata.getFunctionAndTypeManager(), statsCalculator),
new PruneUnreferencedOutputs(),
new IterativeOptimizer(
Expand All @@ -841,6 +848,7 @@ public PlanOptimizers(
new PruneRedundantProjectionAssignments(),
new InlineProjections(metadata.getFunctionAndTypeManager()),
new RemoveRedundantIdentityProjections())));

builder.add(new ShardJoins(metadata, metadata.getFunctionAndTypeManager(), statsCalculator),
new PruneUnreferencedOutputs());
builder.add(
Expand Down Expand Up @@ -914,6 +922,13 @@ public PlanOptimizers(
ImmutableSet.of(
new PruneJoinColumns())));

builder.add(new IterativeOptimizer(
metadata,
ruleStats,
statsCalculator,
costCalculator,
new AddExchangesBelowPartialAggregationOverGroupIdRuleSet(taskCountEstimator, taskManagerConfig, metadata).rules()));

builder.add(new IterativeOptimizer(
metadata,
ruleStats,
Expand Down
Loading
Loading