diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index 360898f2658b4..b961e6e9d5a29 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -230,6 +230,7 @@ public final class SystemSessionProperties public static final String KEY_BASED_SAMPLING_FUNCTION = "key_based_sampling_function"; public static final String HASH_BASED_DISTINCT_LIMIT_ENABLED = "hash_based_distinct_limit_enabled"; public static final String HASH_BASED_DISTINCT_LIMIT_THRESHOLD = "hash_based_distinct_limit_threshold"; + public static final String ROUND_ROBIN_SHUFFLE_BEFORE_PARTIAL_DISTINCT_LIMIT = "round_robin_shuffle_before_partial_distinct_limit"; private final List> sessionProperties; @@ -1254,6 +1255,11 @@ public SystemSessionProperties( HYPERLOGLOG_STANDARD_ERROR_WARNING_THRESHOLD, "Threshold for obtaining precise results from aggregation functions", featuresConfig.getHyperloglogStandardErrorWarningThreshold(), + false), + booleanProperty( + ROUND_ROBIN_SHUFFLE_BEFORE_PARTIAL_DISTINCT_LIMIT, + "Add a local roundrobin shuffle before partial distinct limit", + featuresConfig.isRoundRobinShuffleBeforePartialDistinctLimit(), false)); } @@ -2109,4 +2115,9 @@ public static double getHyperloglogStandardErrorWarningThreshold(Session session { return session.getSystemProperty(HYPERLOGLOG_STANDARD_ERROR_WARNING_THRESHOLD, Double.class); } + + public static boolean isRoundRobinShuffleBeforePartialDistinctLimit(Session session) + { + return session.getSystemProperty(ROUND_ROBIN_SHUFFLE_BEFORE_PARTIAL_DISTINCT_LIMIT, Boolean.class); + } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index 9211f3968cc1a..a2dd47a20591a 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -88,7 +88,7 @@ public class FeaturesConfig private int maxReorderedJoins = 9; private boolean redistributeWrites = true; private boolean scaleWriters; - private DataSize writerMinSize = new DataSize(32, DataSize.Unit.MEGABYTE); + private DataSize writerMinSize = new DataSize(32, MEGABYTE); private boolean optimizedScaleWriterProducerBuffer; private boolean optimizeMetadataQueries; private boolean optimizeMetadataQueriesIgnoreStats; @@ -133,7 +133,7 @@ public class FeaturesConfig private boolean orderByAggregationSpillEnabled = true; private boolean windowSpillEnabled = true; private boolean orderBySpillEnabled = true; - private DataSize aggregationOperatorUnspillMemoryLimit = new DataSize(4, DataSize.Unit.MEGABYTE); + private DataSize aggregationOperatorUnspillMemoryLimit = new DataSize(4, MEGABYTE); private List spillerSpillPaths = ImmutableList.of(); private int spillerThreads = 4; private double spillMaxUsedSpaceThreshold = 0.9; @@ -220,6 +220,7 @@ public class FeaturesConfig private boolean preferMergeJoin; private int maxStageCountForEagerScheduling = 25; + private boolean roundRobinShuffleBeforePartialDistinctLimit; private double hyperloglogStandardErrorWarningThreshold = 0.004; @@ -2042,4 +2043,17 @@ public FeaturesConfig setPreferMergeJoin(boolean preferMergeJoin) this.preferMergeJoin = preferMergeJoin; return this; } + + public boolean isRoundRobinShuffleBeforePartialDistinctLimit() + { + return roundRobinShuffleBeforePartialDistinctLimit; + } + + @Config("optimizer.round-robin-shuffle-before-partial-distinct-limit") + @ConfigDescription("Add a round robin shuffle before partial distinct limit") + public FeaturesConfig setRoundRobinShuffleBeforePartialDistinctLimit(boolean roundRobinShuffleBeforePartialDistinctLimit) + { + this.roundRobinShuffleBeforePartialDistinctLimit = roundRobinShuffleBeforePartialDistinctLimit; + return this; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java index 881f80b76c21b..f6ab4569e1dcc 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java @@ -70,6 +70,7 @@ import static com.facebook.presto.SystemSessionProperties.isDistributedSortEnabled; import static com.facebook.presto.SystemSessionProperties.isEnforceFixedDistributionForOutputOperator; import static com.facebook.presto.SystemSessionProperties.isJoinSpillingEnabled; +import static com.facebook.presto.SystemSessionProperties.isRoundRobinShuffleBeforePartialDistinctLimit; import static com.facebook.presto.SystemSessionProperties.isSpillEnabled; import static com.facebook.presto.SystemSessionProperties.isTableWriterMergeOperatorEnabled; import static com.facebook.presto.common.type.BigintType.BIGINT; @@ -264,8 +265,17 @@ public PlanWithProperties visitDistinctLimit(DistinctLimitNode node, StreamPrefe StreamPreferredProperties requiredProperties; StreamPreferredProperties preferredProperties; if (node.isPartial()) { - requiredProperties = parentPreferences.withoutPreference().withDefaultParallelism(session); - preferredProperties = parentPreferences.withDefaultParallelism(session); + if (isRoundRobinShuffleBeforePartialDistinctLimit(session)) { + PlanWithProperties source = node.getSource().accept(this, defaultParallelism(session)); + PlanWithProperties exchange = deriveProperties( + roundRobinExchange(idAllocator.getNextId(), LOCAL, source.getNode()), + source.getProperties()); + return rebaseAndDeriveProperties(node, ImmutableList.of(exchange)); + } + else { + requiredProperties = parentPreferences.withoutPreference().withDefaultParallelism(session); + preferredProperties = parentPreferences.withDefaultParallelism(session); + } } else { // a final changes the input organization completely, so we do not pass through parent preferences diff --git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index cb13b6dde2af5..13d8c177947b1 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -193,7 +193,8 @@ public void testDefaults() .setStreamingForPartialAggregationEnabled(false) .setMaxStageCountForEagerScheduling(25) .setHyperloglogStandardErrorWarningThreshold(0.004) - .setPreferMergeJoin(false)); + .setPreferMergeJoin(false) + .setRoundRobinShuffleBeforePartialDistinctLimit(false)); } @Test @@ -338,6 +339,7 @@ public void testExplicitPropertyMappings() .put("execution-policy.max-stage-count-for-eager-scheduling", "123") .put("hyperloglog-standard-error-warning-threshold", "0.02") .put("optimizer.prefer-merge-join", "true") + .put("optimizer.round-robin-shuffle-before-partial-distinct-limit", "true") .build(); FeaturesConfig expected = new FeaturesConfig() @@ -479,7 +481,8 @@ public void testExplicitPropertyMappings() .setStreamingForPartialAggregationEnabled(true) .setMaxStageCountForEagerScheduling(123) .setHyperloglogStandardErrorWarningThreshold(0.02) - .setPreferMergeJoin(true); + .setPreferMergeJoin(true) + .setRoundRobinShuffleBeforePartialDistinctLimit(true); assertFullMapping(properties, expected); } diff --git a/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueries.java b/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueries.java index af6be28433f94..3c7aab2618127 100644 --- a/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueries.java +++ b/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueries.java @@ -56,6 +56,7 @@ import static com.facebook.presto.SystemSessionProperties.KEY_BASED_SAMPLING_PERCENTAGE; import static com.facebook.presto.SystemSessionProperties.OFFSET_CLAUSE_ENABLED; import static com.facebook.presto.SystemSessionProperties.OPTIMIZE_JOINS_WITH_EMPTY_SOURCES; +import static com.facebook.presto.SystemSessionProperties.ROUND_ROBIN_SHUFFLE_BEFORE_PARTIAL_DISTINCT_LIMIT; import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.common.type.BooleanType.BOOLEAN; import static com.facebook.presto.common.type.DecimalType.createDecimalType; @@ -963,6 +964,14 @@ public void testDistinctHaving() "GROUP BY orderdate " + "HAVING COUNT(DISTINCT clerk) > 1"); } + @Test + public void testDistinctLimitWithRoundRobinShuffleBeforePartialDistinctLimit() + { + Session session = Session.builder(getSession()) + .setSystemProperty(ROUND_ROBIN_SHUFFLE_BEFORE_PARTIAL_DISTINCT_LIMIT, "true") + .build(); + testDistinctLimitInternal(session); + } @Test public void testDistinctLimitWithHashBasedDistinctLimitEnabled()