Skip to content

Commit

Permalink
Add Exchange before GroupId to improve Partial Aggregation
Browse files Browse the repository at this point in the history
The rule brings significant improvement in TPC-DS Q22 and Q67

Based on: trinodb/trino@dc1d66fb
co-authored-by: Piotr Findeisen <piotr.findeisen@gmail.com>
  • Loading branch information
aaneja committed Nov 14, 2024
1 parent 5442d1b commit 62aaab6
Show file tree
Hide file tree
Showing 9 changed files with 460 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,32 @@ local exchange (GATHER, SINGLE, [])
local exchange (REPARTITION, HASH, [ca_country$gid, ca_county$gid, ca_state$gid, groupid, i_item_id$gid])
remote exchange (REPARTITION, HASH, [ca_country$gid, ca_county$gid, ca_state$gid, groupid, i_item_id$gid])
partial aggregation over (ca_country$gid, ca_county$gid, ca_state$gid, groupid, i_item_id$gid)
join (INNER, REPLICATED):
join (INNER, REPLICATED):
local exchange (REPARTITION, HASH, [i_item_id])
remote exchange (REPARTITION, HASH, [i_item_id])
join (INNER, REPLICATED):
join (INNER, REPLICATED):
scan catalog_sales
join (INNER, REPLICATED):
join (INNER, REPLICATED):
scan catalog_sales
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan customer_demographics
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
join (INNER, PARTITIONED):
remote exchange (REPARTITION, HASH, [c_current_cdemo_sk])
join (INNER, PARTITIONED):
remote exchange (REPARTITION, HASH, [c_current_addr_sk])
scan customer
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, [ca_address_sk])
scan customer_address
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, [cd_demo_sk_0])
scan customer_demographics
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan customer_demographics
scan date_dim
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
join (INNER, PARTITIONED):
remote exchange (REPARTITION, HASH, [c_current_cdemo_sk])
join (INNER, PARTITIONED):
remote exchange (REPARTITION, HASH, [c_current_addr_sk])
scan customer
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, [ca_address_sk])
scan customer_address
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, [cd_demo_sk_0])
scan customer_demographics
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan date_dim
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan item
scan item
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ local exchange (GATHER, SINGLE, [])
local exchange (REPARTITION, HASH, [groupid, i_brand$gid, i_category$gid, i_class$gid, i_product_name$gid])
remote exchange (REPARTITION, HASH, [groupid, i_brand$gid, i_category$gid, i_class$gid, i_product_name$gid])
partial aggregation over (groupid, i_brand$gid, i_category$gid, i_class$gid, i_product_name$gid)
join (INNER, REPLICATED):
join (INNER, REPLICATED):
scan inventory
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan date_dim
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan item
local exchange (REPARTITION, HASH, [i_product_name])
remote exchange (REPARTITION, HASH, [i_product_name])
join (INNER, REPLICATED):
join (INNER, REPLICATED):
scan inventory
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan date_dim
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan item
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@ local exchange (GATHER, SINGLE, [])
local exchange (REPARTITION, HASH, [d_moy$gid, d_qoy$gid, d_year$gid, groupid, i_brand$gid, i_category$gid, i_class$gid, i_product_name$gid, s_store_id$gid])
remote exchange (REPARTITION, HASH, [d_moy$gid, d_qoy$gid, d_year$gid, groupid, i_brand$gid, i_category$gid, i_class$gid, i_product_name$gid, s_store_id$gid])
partial aggregation over (d_moy$gid, d_qoy$gid, d_year$gid, groupid, i_brand$gid, i_category$gid, i_class$gid, i_product_name$gid, s_store_id$gid)
join (INNER, REPLICATED):
join (INNER, REPLICATED):
local exchange (REPARTITION, HASH, [i_product_name])
remote exchange (REPARTITION, HASH, [i_product_name])
join (INNER, REPLICATED):
scan store_sales
join (INNER, REPLICATED):
join (INNER, REPLICATED):
scan store_sales
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan date_dim
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan store
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan date_dim
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan store
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan item
scan item
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ public final class SystemSessionProperties
public static final String OPTIMIZER_USE_HISTOGRAMS = "optimizer_use_histograms";
public static final String WARN_ON_COMMON_NAN_PATTERNS = "warn_on_common_nan_patterns";
public static final String INLINE_PROJECTIONS_ON_VALUES = "inline_projections_on_values";
public static final String ENABLE_FORCED_EXCHANGE_BELOW_GROUP_ID = "enable_forced_exchange_below_group_id";

// TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future.
public static final String NATIVE_AGGREGATION_SPILL_ALL = "native_aggregation_spill_all";
Expand Down Expand Up @@ -1830,6 +1831,10 @@ public SystemSessionProperties(
NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING,
"Minimum number of columnar encoding channels to consider row wise encoding for partitioned exchange. Native execution only",
queryManagerConfig.getMinColumnarEncodingChannelsToPreferRowWiseEncoding(),
false),
booleanProperty(ENABLE_FORCED_EXCHANGE_BELOW_GROUP_ID,
"Whether or not to force exchange below GroupId",
true,
false));
}

Expand Down Expand Up @@ -3111,4 +3116,9 @@ public static int getMinColumnarEncodingChannelsToPreferRowWiseEncoding(Session
{
return session.getSystemProperty(NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING, Integer.class);
}

/**
 * Reads the {@code enable_forced_exchange_below_group_id} system session property.
 *
 * @param session the session whose system properties are consulted
 * @return the boolean value of the property for the given session
 */
public static boolean isEnableForcedExchangeBelowGroupId(Session session)
{
    // The property is fetched as a boxed Boolean and unboxed on return.
    Boolean forcedExchangeEnabled = session.getSystemProperty(ENABLE_FORCED_EXCHANGE_BELOW_GROUP_ID, Boolean.class);
    return forcedExchangeEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.cost;

import com.facebook.presto.Session;
import com.facebook.presto.execution.scheduler.NodeSchedulerConfig;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.InternalNodeManager;
Expand All @@ -22,6 +23,8 @@
import java.util.Set;
import java.util.function.IntSupplier;

import static com.facebook.presto.SystemSessionProperties.getHashPartitionCount;
import static java.lang.Math.min;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -54,4 +57,9 @@ public int estimateSourceDistributedTaskCount()
{
return numberOfNodes.getAsInt();
}

/**
 * Returns the smaller of the value supplied by {@code numberOfNodes} and the
 * hash partition count read from the given session.
 *
 * @param session the session from which the hash partition count is read
 * @return {@code min(numberOfNodes, hashPartitionCount)}
 */
public int estimateHashedTaskCount(Session session)
{
    int nodeCount = numberOfNodes.getAsInt();
    int hashPartitionCount = getHashPartitionCount(session);
    return min(nodeCount, hashPartitionCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.cost.CostComparator;
import com.facebook.presto.cost.StatsCalculator;
import com.facebook.presto.cost.TaskCountEstimator;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.split.PageSourceManager;
import com.facebook.presto.split.SplitManager;
Expand All @@ -26,6 +27,7 @@
import com.facebook.presto.sql.planner.iterative.IterativeOptimizer;
import com.facebook.presto.sql.planner.iterative.Rule;
import com.facebook.presto.sql.planner.iterative.properties.LogicalPropertiesProviderImpl;
import com.facebook.presto.sql.planner.iterative.rule.AddExchangesBelowPartialAggregationOverGroupIdRuleSet;
import com.facebook.presto.sql.planner.iterative.rule.AddIntermediateAggregations;
import com.facebook.presto.sql.planner.iterative.rule.AddNotNullFiltersToJoinNode;
import com.facebook.presto.sql.planner.iterative.rule.CombineApproxPercentileFunctions;
Expand Down Expand Up @@ -219,7 +221,8 @@ public PlanOptimizers(
CostComparator costComparator,
TaskCountEstimator taskCountEstimator,
PartitioningProviderManager partitioningProviderManager,
FeaturesConfig featuresConfig)
FeaturesConfig featuresConfig,
TaskManagerConfig taskManagerConfig)
{
this(metadata,
sqlParser,
Expand All @@ -234,7 +237,8 @@ public PlanOptimizers(
costComparator,
taskCountEstimator,
partitioningProviderManager,
featuresConfig);
featuresConfig,
taskManagerConfig);
}

@PostConstruct
Expand Down Expand Up @@ -265,7 +269,8 @@ public PlanOptimizers(
CostComparator costComparator,
TaskCountEstimator taskCountEstimator,
PartitioningProviderManager partitioningProviderManager,
FeaturesConfig featuresConfig)
FeaturesConfig featuresConfig,
TaskManagerConfig taskManagerConfig)
{
this.exporter = exporter;
ImmutableList.Builder<PlanOptimizer> builder = ImmutableList.builder();
Expand Down Expand Up @@ -908,6 +913,13 @@ public PlanOptimizers(
ImmutableSet.of(
new PruneJoinColumns())));

builder.add(new IterativeOptimizer(
metadata,
ruleStats,
statsCalculator,
costCalculator,
new AddExchangesBelowPartialAggregationOverGroupIdRuleSet(taskCountEstimator, taskManagerConfig, metadata, sqlParser).rules()));

builder.add(new IterativeOptimizer(
metadata,
ruleStats,
Expand Down
Loading

0 comments on commit 62aaab6

Please sign in to comment.