Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a local round robin shuffle before DistinctLimitPartial #17536

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ public final class SystemSessionProperties
public static final String KEY_BASED_SAMPLING_FUNCTION = "key_based_sampling_function";
public static final String HASH_BASED_DISTINCT_LIMIT_ENABLED = "hash_based_distinct_limit_enabled";
public static final String HASH_BASED_DISTINCT_LIMIT_THRESHOLD = "hash_based_distinct_limit_threshold";
public static final String ROUND_ROBIN_SHUFFLE_BEFORE_PARTIAL_DISTINCT_LIMIT = "round_robin_shuffle_before_partial_distinct_limit";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -1254,6 +1255,11 @@ public SystemSessionProperties(
HYPERLOGLOG_STANDARD_ERROR_WARNING_THRESHOLD,
"Threshold for obtaining precise results from aggregation functions",
featuresConfig.getHyperloglogStandardErrorWarningThreshold(),
false),
booleanProperty(
ROUND_ROBIN_SHUFFLE_BEFORE_PARTIAL_DISTINCT_LIMIT,
"Add a local roundrobin shuffle before partial distinct limit",
featuresConfig.isRoundRobinShuffleBeforePartialDistinctLimit(),
false));
}

Expand Down Expand Up @@ -2109,4 +2115,9 @@ public static double getHyperloglogStandardErrorWarningThreshold(Session session
{
return session.getSystemProperty(HYPERLOGLOG_STANDARD_ERROR_WARNING_THRESHOLD, Double.class);
}

/**
 * Looks up the {@code ROUND_ROBIN_SHUFFLE_BEFORE_PARTIAL_DISTINCT_LIMIT} system property
 * on the given session.
 *
 * @param session the session whose system properties are consulted
 * @return the boolean value of the property for this session
 */
public static boolean isRoundRobinShuffleBeforePartialDistinctLimit(Session session)
{
    Boolean roundRobinShuffleEnabled = session.getSystemProperty(
            ROUND_ROBIN_SHUFFLE_BEFORE_PARTIAL_DISTINCT_LIMIT,
            Boolean.class);
    return roundRobinShuffleEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public class FeaturesConfig
private int maxReorderedJoins = 9;
private boolean redistributeWrites = true;
private boolean scaleWriters;
private DataSize writerMinSize = new DataSize(32, DataSize.Unit.MEGABYTE);
private DataSize writerMinSize = new DataSize(32, MEGABYTE);
private boolean optimizedScaleWriterProducerBuffer;
private boolean optimizeMetadataQueries;
private boolean optimizeMetadataQueriesIgnoreStats;
Expand Down Expand Up @@ -133,7 +133,7 @@ public class FeaturesConfig
private boolean orderByAggregationSpillEnabled = true;
private boolean windowSpillEnabled = true;
private boolean orderBySpillEnabled = true;
private DataSize aggregationOperatorUnspillMemoryLimit = new DataSize(4, DataSize.Unit.MEGABYTE);
private DataSize aggregationOperatorUnspillMemoryLimit = new DataSize(4, MEGABYTE);
private List<Path> spillerSpillPaths = ImmutableList.of();
private int spillerThreads = 4;
private double spillMaxUsedSpaceThreshold = 0.9;
Expand Down Expand Up @@ -220,6 +220,7 @@ public class FeaturesConfig
private boolean preferMergeJoin;

private int maxStageCountForEagerScheduling = 25;
private boolean roundRobinShuffleBeforePartialDistinctLimit;

private double hyperloglogStandardErrorWarningThreshold = 0.004;

Expand Down Expand Up @@ -2042,4 +2043,17 @@ public FeaturesConfig setPreferMergeJoin(boolean preferMergeJoin)
this.preferMergeJoin = preferMergeJoin;
return this;
}

/**
 * Reports the current value of the round-robin-shuffle-before-partial-distinct-limit flag
 * held by this config object.
 *
 * @return the stored flag value
 */
public boolean isRoundRobinShuffleBeforePartialDistinctLimit()
{
    return this.roundRobinShuffleBeforePartialDistinctLimit;
}

/**
 * Stores the flag controlling whether a round robin shuffle is added before a partial
 * distinct limit. The {@code @Config} annotation ties this setter to the
 * {@code optimizer.round-robin-shuffle-before-partial-distinct-limit} configuration key.
 *
 * @param roundRobinShuffleBeforePartialDistinctLimit the new flag value
 * @return this {@code FeaturesConfig} instance, so that setter calls can be chained
 */
@Config("optimizer.round-robin-shuffle-before-partial-distinct-limit")
@ConfigDescription("Add a round robin shuffle before partial distinct limit")
public FeaturesConfig setRoundRobinShuffleBeforePartialDistinctLimit(boolean roundRobinShuffleBeforePartialDistinctLimit)
{
this.roundRobinShuffleBeforePartialDistinctLimit = roundRobinShuffleBeforePartialDistinctLimit;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import static com.facebook.presto.SystemSessionProperties.isDistributedSortEnabled;
import static com.facebook.presto.SystemSessionProperties.isEnforceFixedDistributionForOutputOperator;
import static com.facebook.presto.SystemSessionProperties.isJoinSpillingEnabled;
import static com.facebook.presto.SystemSessionProperties.isRoundRobinShuffleBeforePartialDistinctLimit;
import static com.facebook.presto.SystemSessionProperties.isSpillEnabled;
import static com.facebook.presto.SystemSessionProperties.isTableWriterMergeOperatorEnabled;
import static com.facebook.presto.common.type.BigintType.BIGINT;
Expand Down Expand Up @@ -264,8 +265,17 @@ public PlanWithProperties visitDistinctLimit(DistinctLimitNode node, StreamPrefe
StreamPreferredProperties requiredProperties;
StreamPreferredProperties preferredProperties;
if (node.isPartial()) {
requiredProperties = parentPreferences.withoutPreference().withDefaultParallelism(session);
preferredProperties = parentPreferences.withDefaultParallelism(session);
if (isRoundRobinShuffleBeforePartialDistinctLimit(session)) {
PlanWithProperties source = node.getSource().accept(this, defaultParallelism(session));
PlanWithProperties exchange = deriveProperties(
roundRobinExchange(idAllocator.getNextId(), LOCAL, source.getNode()),
source.getProperties());
return rebaseAndDeriveProperties(node, ImmutableList.of(exchange));
}
else {
requiredProperties = parentPreferences.withoutPreference().withDefaultParallelism(session);
preferredProperties = parentPreferences.withDefaultParallelism(session);
}
}
else {
// a final changes the input organization completely, so we do not pass through parent preferences
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ public void testDefaults()
.setStreamingForPartialAggregationEnabled(false)
.setMaxStageCountForEagerScheduling(25)
.setHyperloglogStandardErrorWarningThreshold(0.004)
.setPreferMergeJoin(false));
.setPreferMergeJoin(false)
.setRoundRobinShuffleBeforePartialDistinctLimit(false));
}

@Test
Expand Down Expand Up @@ -338,6 +339,7 @@ public void testExplicitPropertyMappings()
.put("execution-policy.max-stage-count-for-eager-scheduling", "123")
.put("hyperloglog-standard-error-warning-threshold", "0.02")
.put("optimizer.prefer-merge-join", "true")
.put("optimizer.round-robin-shuffle-before-partial-distinct-limit", "true")
.build();

FeaturesConfig expected = new FeaturesConfig()
Expand Down Expand Up @@ -479,7 +481,8 @@ public void testExplicitPropertyMappings()
.setStreamingForPartialAggregationEnabled(true)
.setMaxStageCountForEagerScheduling(123)
.setHyperloglogStandardErrorWarningThreshold(0.02)
.setPreferMergeJoin(true);
.setPreferMergeJoin(true)
.setRoundRobinShuffleBeforePartialDistinctLimit(true);
assertFullMapping(properties, expected);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import static com.facebook.presto.SystemSessionProperties.KEY_BASED_SAMPLING_PERCENTAGE;
import static com.facebook.presto.SystemSessionProperties.OFFSET_CLAUSE_ENABLED;
import static com.facebook.presto.SystemSessionProperties.OPTIMIZE_JOINS_WITH_EMPTY_SOURCES;
import static com.facebook.presto.SystemSessionProperties.ROUND_ROBIN_SHUFFLE_BEFORE_PARTIAL_DISTINCT_LIMIT;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
import static com.facebook.presto.common.type.DecimalType.createDecimalType;
Expand Down Expand Up @@ -963,6 +964,14 @@ public void testDistinctHaving()
"GROUP BY orderdate " +
"HAVING COUNT(DISTINCT clerk) > 1");
}
/**
 * Runs {@code testDistinctLimitInternal} against a session derived from the default one,
 * with the {@code ROUND_ROBIN_SHUFFLE_BEFORE_PARTIAL_DISTINCT_LIMIT} system property set to "true".
 */
@Test
public void testDistinctLimitWithRoundRobinShuffleBeforePartialDistinctLimit()
{
    // Build the session inline: default session plus the round robin shuffle property.
    testDistinctLimitInternal(
            Session.builder(getSession())
                    .setSystemProperty(ROUND_ROBIN_SHUFFLE_BEFORE_PARTIAL_DISTINCT_LIMIT, "true")
                    .build());
}

@Test
public void testDistinctLimitWithHashBasedDistinctLimitEnabled()
Expand Down