Skip to content

Commit e84e4d2

Browse files
committed
Add Exchange before GroupId to improve Partial Aggregation
Based on: trinodb/trino@dc1d66fb co-authored-by: Piotr Findeisen <piotr.findeisen@gmail.com> Based on : trinodb/trino@c573b34 co-authored-by: Lukasz Stec <lukasz.stec@starburstdata.com> Based on: trinodb/trino@29328d3 co-authored-by: praveenkrishna <praveenkrishna@tutanota.com>
1 parent 828e81f commit e84e4d2

15 files changed

+827
-16
lines changed

presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java

+10
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ public final class SystemSessionProperties
328328
public static final String INCLUDE_VALUES_NODE_IN_CONNECTOR_OPTIMIZER = "include_values_node_in_connector_optimizer";
329329
public static final String SINGLE_NODE_EXECUTION_ENABLED = "single_node_execution_enabled";
330330
public static final String EXPRESSION_OPTIMIZER_NAME = "expression_optimizer_name";
331+
public static final String ADD_EXCHANGE_BELOW_PARTIAL_AGGREGATION_OVER_GROUP_ID = "add_exchange_below_partial_aggregation_over_group_id";
331332

332333
// TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future.
333334
public static final String NATIVE_AGGREGATION_SPILL_ALL = "native_aggregation_spill_all";
@@ -1858,6 +1859,10 @@ public SystemSessionProperties(
18581859
EXPRESSION_OPTIMIZER_NAME,
18591860
"Configure which expression optimizer to use",
18601861
featuresConfig.getExpressionOptimizerName(),
1862+
false),
1863+
booleanProperty(ADD_EXCHANGE_BELOW_PARTIAL_AGGREGATION_OVER_GROUP_ID,
1864+
"Enable adding an exchange below partial aggregation over a GroupId node to improve partial aggregation performance",
1865+
featuresConfig.getAddExchangeBelowPartialAggregationOverGroupId(),
18611866
false));
18621867
}
18631868

@@ -3164,4 +3169,9 @@ public static String getExpressionOptimizerName(Session session)
31643169
{
31653170
return session.getSystemProperty(EXPRESSION_OPTIMIZER_NAME, String.class);
31663171
}
3172+
3173+
public static boolean isEnabledAddExchangeBelowGroupId(Session session)
3174+
{
3175+
return session.getSystemProperty(ADD_EXCHANGE_BELOW_PARTIAL_AGGREGATION_OVER_GROUP_ID, Boolean.class);
3176+
}
31673177
}

presto-main/src/main/java/com/facebook/presto/cost/TaskCountEstimator.java

+8
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package com.facebook.presto.cost;
1515

16+
import com.facebook.presto.Session;
1617
import com.facebook.presto.execution.scheduler.NodeSchedulerConfig;
1718
import com.facebook.presto.metadata.InternalNode;
1819
import com.facebook.presto.metadata.InternalNodeManager;
@@ -22,6 +23,8 @@
2223
import java.util.Set;
2324
import java.util.function.IntSupplier;
2425

26+
import static com.facebook.presto.SystemSessionProperties.getHashPartitionCount;
27+
import static java.lang.Math.min;
2528
import static java.lang.Math.toIntExact;
2629
import static java.util.Objects.requireNonNull;
2730

@@ -54,4 +57,9 @@ public int estimateSourceDistributedTaskCount()
5457
{
5558
return numberOfNodes.getAsInt();
5659
}
60+
61+
public int estimateHashedTaskCount(Session session)
62+
{
63+
return min(numberOfNodes.getAsInt(), getHashPartitionCount(session));
64+
}
5765
}

presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java

+14
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ public class FeaturesConfig
297297
private boolean singleNodeExecutionEnabled;
298298
private boolean nativeExecutionScaleWritersThreadsEnabled;
299299
private String expressionOptimizerName = DEFAULT_EXPRESSION_OPTIMIZER_NAME;
300+
private boolean addExchangeBelowPartialAggregationOverGroupId;
300301

301302
public enum PartitioningPrecisionStrategy
302303
{
@@ -2945,4 +2946,17 @@ public boolean isExcludeInvalidWorkerSessionProperties()
29452946
{
29462947
return this.setExcludeInvalidWorkerSessionProperties;
29472948
}
2949+
2950+
@Config("optimizer.add-exchange-below-partial-aggregation-over-group-id")
2951+
@ConfigDescription("Enable adding an exchange below partial aggregation over a GroupId node to improve partial aggregation performance")
2952+
public FeaturesConfig setAddExchangeBelowPartialAggregationOverGroupId(boolean addExchangeBelowPartialAggregationOverGroupId)
2953+
{
2954+
this.addExchangeBelowPartialAggregationOverGroupId = addExchangeBelowPartialAggregationOverGroupId;
2955+
return this;
2956+
}
2957+
2958+
public boolean getAddExchangeBelowPartialAggregationOverGroupId()
2959+
{
2960+
return addExchangeBelowPartialAggregationOverGroupId;
2961+
}
29482962
}

presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java

+18-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.facebook.presto.cost.CostComparator;
1919
import com.facebook.presto.cost.StatsCalculator;
2020
import com.facebook.presto.cost.TaskCountEstimator;
21+
import com.facebook.presto.execution.TaskManagerConfig;
2122
import com.facebook.presto.metadata.Metadata;
2223
import com.facebook.presto.split.PageSourceManager;
2324
import com.facebook.presto.split.SplitManager;
@@ -27,6 +28,7 @@
2728
import com.facebook.presto.sql.planner.iterative.IterativeOptimizer;
2829
import com.facebook.presto.sql.planner.iterative.Rule;
2930
import com.facebook.presto.sql.planner.iterative.properties.LogicalPropertiesProviderImpl;
31+
import com.facebook.presto.sql.planner.iterative.rule.AddExchangesBelowPartialAggregationOverGroupIdRuleSet;
3032
import com.facebook.presto.sql.planner.iterative.rule.AddIntermediateAggregations;
3133
import com.facebook.presto.sql.planner.iterative.rule.AddNotNullFiltersToJoinNode;
3234
import com.facebook.presto.sql.planner.iterative.rule.CombineApproxPercentileFunctions;
@@ -223,7 +225,8 @@ public PlanOptimizers(
223225
TaskCountEstimator taskCountEstimator,
224226
PartitioningProviderManager partitioningProviderManager,
225227
FeaturesConfig featuresConfig,
226-
ExpressionOptimizerManager expressionOptimizerManager)
228+
ExpressionOptimizerManager expressionOptimizerManager,
229+
TaskManagerConfig taskManagerConfig)
227230
{
228231
this(metadata,
229232
sqlParser,
@@ -239,7 +242,8 @@ public PlanOptimizers(
239242
taskCountEstimator,
240243
partitioningProviderManager,
241244
featuresConfig,
242-
expressionOptimizerManager);
245+
expressionOptimizerManager,
246+
taskManagerConfig);
243247
}
244248

245249
@PostConstruct
@@ -271,7 +275,8 @@ public PlanOptimizers(
271275
TaskCountEstimator taskCountEstimator,
272276
PartitioningProviderManager partitioningProviderManager,
273277
FeaturesConfig featuresConfig,
274-
ExpressionOptimizerManager expressionOptimizerManager)
278+
ExpressionOptimizerManager expressionOptimizerManager,
279+
TaskManagerConfig taskManagerConfig)
275280
{
276281
this.exporter = exporter;
277282
ImmutableList.Builder<PlanOptimizer> builder = ImmutableList.builder();
@@ -822,6 +827,7 @@ public PlanOptimizers(
822827

823828
if (!noExchange) {
824829
builder.add(new ReplicateSemiJoinInDelete()); // Must run before AddExchanges
830+
825831
builder.add(new IterativeOptimizer(
826832
metadata,
827833
ruleStats,
@@ -832,6 +838,7 @@ public PlanOptimizers(
832838
// Must run before AddExchanges and after ReplicateSemiJoinInDelete
833839
// to avoid temporarily having an invalid plan
834840
new DetermineSemiJoinDistributionType(costComparator, taskCountEstimator))));
841+
835842
builder.add(new RandomizeNullKeyInOuterJoin(metadata.getFunctionAndTypeManager(), statsCalculator),
836843
new PruneUnreferencedOutputs(),
837844
new IterativeOptimizer(
@@ -843,6 +850,7 @@ public PlanOptimizers(
843850
new PruneRedundantProjectionAssignments(),
844851
new InlineProjections(metadata.getFunctionAndTypeManager()),
845852
new RemoveRedundantIdentityProjections())));
853+
846854
builder.add(new ShardJoins(metadata, metadata.getFunctionAndTypeManager(), statsCalculator),
847855
new PruneUnreferencedOutputs());
848856
builder.add(
@@ -916,6 +924,13 @@ public PlanOptimizers(
916924
ImmutableSet.of(
917925
new PruneJoinColumns())));
918926

927+
builder.add(new IterativeOptimizer(
928+
metadata,
929+
ruleStats,
930+
statsCalculator,
931+
costCalculator,
932+
new AddExchangesBelowPartialAggregationOverGroupIdRuleSet(taskCountEstimator, taskManagerConfig, metadata).rules()));
933+
919934
builder.add(new IterativeOptimizer(
920935
metadata,
921936
ruleStats,

0 commit comments

Comments
 (0)