Skip to content

Commit

Permalink
Remove redundant distinct over group by
Browse files Browse the repository at this point in the history
Remove distinct if the corresponding output is already distinct after a
group by operation.
  • Loading branch information
feilong-liu committed Oct 21, 2022
1 parent c6f6262 commit bd1aec4
Show file tree
Hide file tree
Showing 8 changed files with 488 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.benchmark;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.testing.LocalQueryRunner;
import com.google.common.collect.ImmutableMap;
import org.intellij.lang.annotations.Language;

import java.util.Map;

import static com.facebook.presto.benchmark.BenchmarkQueryRunner.createLocalQueryRunner;

/**
 * Benchmark that runs the same {@code SELECT DISTINCT ... GROUP BY} query twice:
 * once with the {@code remove_redundant_distinct_aggregation_enabled} session
 * property forced to {@code false}, and once with a query runner created without
 * any overrides, so the two timings can be compared.
 */
public class SqlRemoveRedundantDistinctAggregationBenchmarks
        extends AbstractSqlBenchmark
{
    // Log under this benchmark's own class; the logger was previously created for
    // SqlRewriteConditionalAggregationBenchmarks by copy-paste.
    private static final Logger LOGGER = Logger.get(SqlRemoveRedundantDistinctAggregationBenchmarks.class);

    /**
     * @param localQueryRunner runner the benchmark query is executed on
     * @param sql the SQL text to benchmark
     */
    public SqlRemoveRedundantDistinctAggregationBenchmarks(LocalQueryRunner localQueryRunner, @Language("SQL") String sql)
    {
        // NOTE(review): 10 and 20 are presumably the warm-up and measured iteration counts;
        // confirm against AbstractSqlBenchmark.
        super(localQueryRunner, "remove_redundant_distinct_aggregation", 10, 20, sql);
    }

    /**
     * Runs the benchmark with the optimization disabled and then with the runner's
     * default session, writing both results to standard output.
     */
    public static void main(String[] args)
    {
        Map<String, String> disableOptimization = ImmutableMap.of("remove_redundant_distinct_aggregation_enabled", "false");
        // The grouping keys already make every output row unique, so the DISTINCT is redundant.
        String sql = "select distinct orderkey, partkey, suppkey, avg(extendedprice) from lineitem group by orderkey, partkey, suppkey";

        LOGGER.info("Without optimization");
        new SqlRemoveRedundantDistinctAggregationBenchmarks(createLocalQueryRunner(disableOptimization), sql)
                .runBenchmark(new SimpleLineBenchmarkResultWriter(System.out));

        // No override here: this run relies on the session property's default value.
        LOGGER.info("With optimization");
        new SqlRemoveRedundantDistinctAggregationBenchmarks(createLocalQueryRunner(), sql)
                .runBenchmark(new SimpleLineBenchmarkResultWriter(System.out));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ public final class SystemSessionProperties
public static final String QUICK_DISTINCT_LIMIT_ENABLED = "quick_distinct_limit_enabled";
public static final String OPTIMIZE_CONDITIONAL_AGGREGATION_ENABLED = "optimize_conditional_aggregation_enabled";
public static final String ANALYZER_TYPE = "analyzer_type";
public static final String REMOVE_REDUNDANT_DISTINCT_AGGREGATION_ENABLED = "remove_redundant_distinct_aggregation_enabled";

// TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future.
public static final String NATIVE_SIMPLIFIED_EXPRESSION_EVALUATION_ENABLED = "simplified_expression_evaluation_enabled";
Expand Down Expand Up @@ -1409,6 +1410,11 @@ public SystemSessionProperties(
OPTIMIZE_CONDITIONAL_AGGREGATION_ENABLED,
"Enable rewriting IF(condition, AGG(x)) to AGG(x) with condition included in mask",
featuresConfig.isOptimizeConditionalAggregationEnabled(),
false),
booleanProperty(
REMOVE_REDUNDANT_DISTINCT_AGGREGATION_ENABLED,
"Enable removing distinct aggregation node if input is already distinct",
featuresConfig.isRemoveRedundantDistinctAggregationEnabled(),
false));
}

Expand Down Expand Up @@ -2359,4 +2365,9 @@ public static boolean isOptimizeConditionalAggregationEnabled(Session session)
{
return session.getSystemProperty(OPTIMIZE_CONDITIONAL_AGGREGATION_ENABLED, Boolean.class);
}

/**
 * Looks up the {@code REMOVE_REDUNDANT_DISTINCT_AGGREGATION_ENABLED} system property on the given session.
 *
 * @param session session whose system properties are consulted
 * @return the boolean value of the property for this session
 */
public static boolean isRemoveRedundantDistinctAggregationEnabled(Session session)
{
    Boolean enabled = session.getSystemProperty(REMOVE_REDUNDANT_DISTINCT_AGGREGATION_ENABLED, Boolean.class);
    return enabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ public class FeaturesConfig
private String nativeExecutionExecutablePath = "./presto_server";
private boolean randomizeOuterJoinNullKey;
private boolean isOptimizeConditionalAggregationEnabled;
private boolean isRemoveRedundantDistinctAggregationEnabled = true;

public enum PartitioningPrecisionStrategy
{
Expand Down Expand Up @@ -2266,4 +2267,17 @@ public FeaturesConfig setOptimizeConditionalAggregationEnabled(boolean isOptimiz
this.isOptimizeConditionalAggregationEnabled = isOptimizeConditionalAggregationEnabled;
return this;
}

/**
 * @return the currently configured value of the remove-redundant-distinct-aggregation flag
 */
public boolean isRemoveRedundantDistinctAggregationEnabled()
{
    return this.isRemoveRedundantDistinctAggregationEnabled;
}

/**
 * Stores the remove-redundant-distinct-aggregation flag, bound to the
 * {@code optimizer.remove-redundant-distinct-aggregation-enabled} config key.
 *
 * @param isRemoveRedundantDistinctAggregationEnabled new value of the flag
 * @return this config object, so calls can be chained
 */
@Config("optimizer.remove-redundant-distinct-aggregation-enabled")
@ConfigDescription("Enable removing distinct aggregation node if input is already distinct")
public FeaturesConfig setRemoveRedundantDistinctAggregationEnabled(boolean isRemoveRedundantDistinctAggregationEnabled)
{
    final boolean enabled = isRemoveRedundantDistinctAggregationEnabled;
    this.isRemoveRedundantDistinctAggregationEnabled = enabled;
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@
import com.facebook.presto.sql.planner.optimizations.PruneUnreferencedOutputs;
import com.facebook.presto.sql.planner.optimizations.PushdownSubfields;
import com.facebook.presto.sql.planner.optimizations.RandomizeNullKeyInOuterJoin;
import com.facebook.presto.sql.planner.optimizations.RemoveRedundantDistinctAggregation;
import com.facebook.presto.sql.planner.optimizations.ReplicateSemiJoinInDelete;
import com.facebook.presto.sql.planner.optimizations.RewriteIfOverAggregation;
import com.facebook.presto.sql.planner.optimizations.SetFlatteningOptimizer;
Expand Down Expand Up @@ -602,6 +603,8 @@ public PlanOptimizers(
.add(new InlineProjections(metadata.getFunctionAndTypeManager()))
.build()));

builder.add(new RemoveRedundantDistinctAggregation());

if (!forceSingleNode) {
builder.add(new ReplicateSemiJoinInDelete()); // Must run before AddExchanges
builder.add(new IterativeOptimizer(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sql.planner.optimizations;

import com.facebook.presto.Session;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.planner.PlanVariableAllocator;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.plan.InternalPlanVisitor;
import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.Set;

import static com.facebook.presto.SystemSessionProperties.isRemoveRedundantDistinctAggregationEnabled;
import static com.facebook.presto.spi.plan.AggregationNode.isDistinct;
import static com.facebook.presto.sql.planner.plan.ChildReplacer.replaceChildren;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.util.Objects.requireNonNull;

/**
 * Removes a redundant distinct aggregation when its input is already distinct
 * on the same columns.
 * For example, for the query {@code select distinct k, sum(x) from table group by k}
 * the plan will change
 * <p>
 * From:
 * <pre>
 * - Aggregation group by k, sum
 *   - Aggregation (sum &lt;- AGG(x)) group by k
 * </pre>
 * To:
 * <pre>
 * - Aggregation (sum &lt;- AGG(x)) group by k
 * </pre>
 * <p>
 * The rewrite is applied only when the session property read by
 * {@code isRemoveRedundantDistinctAggregationEnabled} is on.
 */
public class RemoveRedundantDistinctAggregation
        implements PlanOptimizer
{
    /**
     * Rewrites {@code plan} when the optimization is enabled for {@code session};
     * otherwise returns {@code plan} unchanged. The remaining parameters are not used.
     */
    @Override
    public PlanNode optimize(PlanNode plan, Session session, TypeProvider types, PlanVariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
    {
        if (!isRemoveRedundantDistinctAggregationEnabled(session)) {
            return plan;
        }
        return new Rewriter().accept(plan).getNode();
    }

    /**
     * A rewritten plan node together with the distinctness information known about its output.
     */
    private static final class PlanWithProperties
    {
        private final PlanNode node;
        // The variables in each set, taken together, are distinct in the output of the plan node.
        private final List<Set<VariableReferenceExpression>> distinctVariableSet;

        public PlanWithProperties(PlanNode node, List<Set<VariableReferenceExpression>> distinctVariableSet)
        {
            this.node = requireNonNull(node, "node is null");
            // The message previously said "StreamProperties is null", which does not name this argument.
            this.distinctVariableSet = requireNonNull(distinctVariableSet, "distinctVariableSet is null");
        }

        public PlanNode getNode()
        {
            return node;
        }

        /**
         * @return the list of variable sets known to be distinct in the node's output
         */
        public List<Set<VariableReferenceExpression>> getProperties()
        {
            return distinctVariableSet;
        }
    }

    /**
     * Visits the plan bottom-up, tracking which variable sets are distinct and dropping
     * distinct aggregations whose grouping keys cover an already-distinct set.
     */
    private static final class Rewriter
            extends InternalPlanVisitor<PlanWithProperties, Void>
    {
        @Override
        public PlanWithProperties visitPlan(PlanNode node, Void context)
        {
            // For nodes such as join, unnest etc. the distinct properties may be violated, hence pass empty list for these cases.
            return planAndReplace(node, false);
        }

        @Override
        public PlanWithProperties visitAggregation(AggregationNode node, Void context)
        {
            PlanWithProperties child = accept(node.getSource());

            // A distinct aggregation is redundant when its grouping keys contain every variable
            // of some set that is already distinct in the child's output: return the child instead.
            boolean inputAlreadyDistinct = child.getProperties().stream().anyMatch(node.getGroupingKeys()::containsAll);
            if (isDistinct(node) && inputAlreadyDistinct) {
                return child;
            }

            ImmutableList.Builder<Set<VariableReferenceExpression>> properties = ImmutableList.builder();
            // Only do it for aggregations with one single grouping set
            if (node.getGroupingSetCount() == 1 && !node.getGroupingKeys().isEmpty()) {
                properties.add(node.getGroupingKeys().stream().collect(toImmutableSet()));
            }
            PlanNode newAggregation = node.replaceChildren(ImmutableList.of(child.getNode()));
            return new PlanWithProperties(newAggregation, properties.build());
        }

        @Override
        public PlanWithProperties visitProject(ProjectNode node, Void context)
        {
            // Projections forward the distinct properties collected from their source.
            return planAndReplace(node, true);
        }

        /**
         * Visits {@code node} and re-attaches the original node's stats-equivalent plan node to the result.
         */
        private PlanWithProperties accept(PlanNode node)
        {
            PlanWithProperties result = node.accept(this, null);
            return new PlanWithProperties(
                    result.getNode().assignStatsEquivalentPlanNode(node.getStatsEquivalentPlanNode()),
                    result.getProperties());
        }

        /**
         * Rewrites all sources of {@code node} and rebuilds it on top of them.
         *
         * @param passProperties when true, the result carries the concatenated distinct properties
         * of all rewritten sources; when false, it carries none
         */
        private PlanWithProperties planAndReplace(PlanNode node, boolean passProperties)
        {
            List<PlanWithProperties> children = node.getSources().stream().map(this::accept).collect(toImmutableList());
            PlanNode result = replaceChildren(node, children.stream().map(PlanWithProperties::getNode).collect(toImmutableList()));
            if (!passProperties) {
                return new PlanWithProperties(result, ImmutableList.of());
            }
            ImmutableList.Builder<Set<VariableReferenceExpression>> properties = ImmutableList.builder();
            children.stream().map(PlanWithProperties::getProperties).forEach(properties::addAll);
            return new PlanWithProperties(result, properties.build());
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ public void testDefaults()
.setNativeExecutionEnabled(false)
.setNativeExecutionExecutablePath("./presto_server")
.setRandomizeOuterJoinNullKeyEnabled(false)
.setOptimizeConditionalAggregationEnabled(false));
.setOptimizeConditionalAggregationEnabled(false)
.setRemoveRedundantDistinctAggregationEnabled(true));
}

@Test
Expand Down Expand Up @@ -372,6 +373,7 @@ public void testExplicitPropertyMappings()
.put("native-execution-executable-path", "/bin/echo")
.put("optimizer.randomize-outer-join-null-key", "true")
.put("optimizer.optimize-conditional-aggregation-enabled", "true")
.put("optimizer.remove-redundant-distinct-aggregation-enabled", "false")
.build();

FeaturesConfig expected = new FeaturesConfig()
Expand Down Expand Up @@ -529,7 +531,8 @@ public void testExplicitPropertyMappings()
.setNativeExecutionEnabled(true)
.setNativeExecutionExecutablePath("/bin/echo")
.setRandomizeOuterJoinNullKeyEnabled(true)
.setOptimizeConditionalAggregationEnabled(true);
.setOptimizeConditionalAggregationEnabled(true)
.setRemoveRedundantDistinctAggregationEnabled(false);
assertFullMapping(properties, expected);
}

Expand Down
Loading

0 comments on commit bd1aec4

Please sign in to comment.