Skip to content

Commit

Permalink
Add a local roundrobin shuffle before DistinctLimitPartial
Browse files Browse the repository at this point in the history
  • Loading branch information
Sreeni Viswanadha committed Apr 4, 2022
1 parent 0b67486 commit 3ecbf9e
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ public final class SystemSessionProperties
public static final String KEY_BASED_SAMPLING_FUNCTION = "key_based_sampling_function";
public static final String HASH_BASED_DISTINCT_LIMIT_ENABLED = "hash_based_distinct_limit_enabled";
public static final String HASH_BASED_DISTINCT_LIMIT_THRESHOLD = "hash_based_distinct_limit_threshold";
public static final String ROUND_ROBIN_SHUFFLE_BEFORE_PARTIAL_DISTINCT_LIMIT = "round_robin_shuffle_before_partial_distinct_limit";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -1254,6 +1255,11 @@ public SystemSessionProperties(
HYPERLOGLOG_STANDARD_ERROR_WARNING_THRESHOLD,
"Threshold for obtaining precise results from aggregation functions",
featuresConfig.getHyperloglogStandardErrorWarningThreshold(),
false),
booleanProperty(
ROUND_ROBIN_SHUFFLE_BEFORE_PARTIAL_DISTINCT_LIMIT,
"Add a local roundrobin shuffle before partial distinct limit",
featuresConfig.isRoundRobinShuffleBeforePartialDistinctLimit(),
false));
}

Expand Down Expand Up @@ -2109,4 +2115,9 @@ public static double getHyperloglogStandardErrorWarningThreshold(Session session
{
return session.getSystemProperty(HYPERLOGLOG_STANDARD_ERROR_WARNING_THRESHOLD, Double.class);
}

public static boolean isRoundRobinShuffleBeforePartialDistinctLimit(Session session)
{
return session.getSystemProperty(ROUND_ROBIN_SHUFFLE_BEFORE_PARTIAL_DISTINCT_LIMIT, Boolean.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public class FeaturesConfig
private int maxReorderedJoins = 9;
private boolean redistributeWrites = true;
private boolean scaleWriters;
private DataSize writerMinSize = new DataSize(32, DataSize.Unit.MEGABYTE);
private DataSize writerMinSize = new DataSize(32, MEGABYTE);
private boolean optimizedScaleWriterProducerBuffer;
private boolean optimizeMetadataQueries;
private boolean optimizeMetadataQueriesIgnoreStats;
Expand Down Expand Up @@ -133,7 +133,7 @@ public class FeaturesConfig
private boolean orderByAggregationSpillEnabled = true;
private boolean windowSpillEnabled = true;
private boolean orderBySpillEnabled = true;
private DataSize aggregationOperatorUnspillMemoryLimit = new DataSize(4, DataSize.Unit.MEGABYTE);
private DataSize aggregationOperatorUnspillMemoryLimit = new DataSize(4, MEGABYTE);
private List<Path> spillerSpillPaths = ImmutableList.of();
private int spillerThreads = 4;
private double spillMaxUsedSpaceThreshold = 0.9;
Expand Down Expand Up @@ -220,6 +220,7 @@ public class FeaturesConfig
private boolean preferMergeJoin;

private int maxStageCountForEagerScheduling = 25;
private boolean roundRobinShuffleBeforePartialDistinctLimit;

private double hyperloglogStandardErrorWarningThreshold = 0.004;

Expand Down Expand Up @@ -2042,4 +2043,17 @@ public FeaturesConfig setPreferMergeJoin(boolean preferMergeJoin)
this.preferMergeJoin = preferMergeJoin;
return this;
}

public boolean isRoundRobinShuffleBeforePartialDistinctLimit()
{
return roundRobinShuffleBeforePartialDistinctLimit;
}

@Config("optimizer.round-robin-shuffle-before-partial-distinct-limit")
@ConfigDescription("Add a round robin shuffle before partial distinct limit")
public FeaturesConfig setRoundRobinShuffleBeforePartialDistinctLimit(boolean roundRobinShuffleBeforePartialDistinctLimit)
{
this.roundRobinShuffleBeforePartialDistinctLimit = roundRobinShuffleBeforePartialDistinctLimit;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import static com.facebook.presto.SystemSessionProperties.isDistributedSortEnabled;
import static com.facebook.presto.SystemSessionProperties.isEnforceFixedDistributionForOutputOperator;
import static com.facebook.presto.SystemSessionProperties.isJoinSpillingEnabled;
import static com.facebook.presto.SystemSessionProperties.isRoundRobinShuffleBeforePartialDistinctLimit;
import static com.facebook.presto.SystemSessionProperties.isSpillEnabled;
import static com.facebook.presto.SystemSessionProperties.isTableWriterMergeOperatorEnabled;
import static com.facebook.presto.common.type.BigintType.BIGINT;
Expand Down Expand Up @@ -264,8 +265,17 @@ public PlanWithProperties visitDistinctLimit(DistinctLimitNode node, StreamPrefe
StreamPreferredProperties requiredProperties;
StreamPreferredProperties preferredProperties;
if (node.isPartial()) {
requiredProperties = parentPreferences.withoutPreference().withDefaultParallelism(session);
preferredProperties = parentPreferences.withDefaultParallelism(session);
if (isRoundRobinShuffleBeforePartialDistinctLimit(session)) {
PlanWithProperties source = node.getSource().accept(this, defaultParallelism(session));
PlanWithProperties exchange = deriveProperties(
roundRobinExchange(idAllocator.getNextId(), LOCAL, source.getNode()),
source.getProperties());
return rebaseAndDeriveProperties(node, ImmutableList.of(exchange));
}
else {
requiredProperties = parentPreferences.withoutPreference().withDefaultParallelism(session);
preferredProperties = parentPreferences.withDefaultParallelism(session);
}
}
else {
// a final changes the input organization completely, so we do not pass through parent preferences
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ public void testDefaults()
.setStreamingForPartialAggregationEnabled(false)
.setMaxStageCountForEagerScheduling(25)
.setHyperloglogStandardErrorWarningThreshold(0.004)
.setPreferMergeJoin(false));
.setPreferMergeJoin(false)
.setRoundRobinShuffleBeforePartialDistinctLimit(false));
}

@Test
Expand Down Expand Up @@ -338,6 +339,7 @@ public void testExplicitPropertyMappings()
.put("execution-policy.max-stage-count-for-eager-scheduling", "123")
.put("hyperloglog-standard-error-warning-threshold", "0.02")
.put("optimizer.prefer-merge-join", "true")
.put("optimizer.round-robin-shuffle-before-partial-distinct-limit", "true")
.build();

FeaturesConfig expected = new FeaturesConfig()
Expand Down Expand Up @@ -479,7 +481,8 @@ public void testExplicitPropertyMappings()
.setStreamingForPartialAggregationEnabled(true)
.setMaxStageCountForEagerScheduling(123)
.setHyperloglogStandardErrorWarningThreshold(0.02)
.setPreferMergeJoin(true);
.setPreferMergeJoin(true)
.setRoundRobinShuffleBeforePartialDistinctLimit(true);
assertFullMapping(properties, expected);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import static com.facebook.presto.SystemSessionProperties.KEY_BASED_SAMPLING_PERCENTAGE;
import static com.facebook.presto.SystemSessionProperties.OFFSET_CLAUSE_ENABLED;
import static com.facebook.presto.SystemSessionProperties.OPTIMIZE_JOINS_WITH_EMPTY_SOURCES;
import static com.facebook.presto.SystemSessionProperties.ROUND_ROBIN_SHUFFLE_BEFORE_PARTIAL_DISTINCT_LIMIT;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
import static com.facebook.presto.common.type.DecimalType.createDecimalType;
Expand Down Expand Up @@ -963,6 +964,14 @@ public void testDistinctHaving()
"GROUP BY orderdate " +
"HAVING COUNT(DISTINCT clerk) > 1");
}
@Test
public void testDistinctLimitWithRoundRobinShuffleBeforePartialDistinctLimit()
{
Session session = Session.builder(getSession())
.setSystemProperty(ROUND_ROBIN_SHUFFLE_BEFORE_PARTIAL_DISTINCT_LIMIT, "true")
.build();
testDistinctLimitInternal(session);
}

@Test
public void testDistinctLimitWithHashBasedDistinctLimitEnabled()
Expand Down

0 comments on commit 3ecbf9e

Please sign in to comment.