
Add a local round robin shuffle before DisinctLimitPartial #17536

Merged

Conversation

kaikalur
Contributor

@kaikalur kaikalur commented Mar 27, 2022

A local round robin shuffle makes the result of the partial aggregation much smaller than before, resulting in quicker queries. For some of our internal tests, we have seen it go down from 81M to 48K in one example, with similarly dramatic improvements in other cases as well.

Test plan - Added a new test

== RELEASE NOTES ==

General Changes
* Add an option to do a local round robin shuffle to reduce the partial distinct limit output size.

@kaikalur kaikalur requested review from rongrong and rschlussel March 27, 2022 20:02
@kaikalur kaikalur force-pushed the fix_exchange_for_distinct_limit branch from 9356c91 to 54512ea Compare March 27, 2022 20:07
@kaikalur kaikalur changed the title from "Adding a local round robin shuffle and not using a precomputed hash" to "Add a local round robin shuffle and remove precomputed hash for DisinctLimitPartial" Mar 28, 2022
@kaikalur kaikalur force-pushed the fix_exchange_for_distinct_limit branch 3 times, most recently from 8a4588b to 72dc7d0 Compare March 29, 2022 20:07
Contributor

@rschlussel rschlussel left a comment

I don't understand how this works. why does adding a local round robin exchange remove the need for a remote exchange before distinct limit final? Won't it just depend on whether the partial distinct limit is distributed or on a single node? Also, local exchanges get added after remote exchanges, so the remote exchange would already be there. Maybe some plan examples would help.

Also, make sure the commit message follows the commit message guidelines: https://github.com/prestodb/presto/wiki/Review-and-Commit-guidelines#commit-formatting-and-pull-requests

@@ -207,7 +209,7 @@ public PlanWithProperties visitGroupId(GroupIdNode node, HashComputationSet pare
public PlanWithProperties visitDistinctLimit(DistinctLimitNode node, HashComputationSet parentPreference)
{
// skip hash variable generation for single bigint
if (canSkipHashGeneration(node.getDistinctVariables())) {
if (canSkipHashGeneration(node.getDistinctVariables()) || SystemSessionProperties.isRoundRobinShuffleBeforePartialAgg(session)) {
Contributor

Can you explain what's going on with skipping hash generation?

(Also should match on the plan here rather than session properties - but if this code stays, should be static import)

Contributor Author

Done - now removed this change

@@ -264,8 +265,17 @@ public PlanWithProperties visitDistinctLimit(DistinctLimitNode node, StreamPrefe
StreamPreferredProperties requiredProperties;
StreamPreferredProperties preferredProperties;
if (node.isPartial()) {
requiredProperties = parentPreferences.withoutPreference().withDefaultParallelism(session);
preferredProperties = parentPreferences.withDefaultParallelism(session);
if (SystemSessionProperties.isRoundRobinShuffleBeforePartialAgg(session)) {
Contributor

static import isRoundRobinShuffle...

Contributor Author

done

@kaikalur
Contributor Author

I don't understand how this works. why does adding a local round robin exchange remove the need for a remote exchange before distinct limit final? Won't it just depend on whether the partial distinct limit is distributed or on a single node? Also, local exchanges get added after remote exchanges, so the remote exchange would already be there. Maybe some plan examples would help.

Also, make sure the commit message follows the commit message guidelines: https://github.com/prestodb/presto/wiki/Review-and-Commit-guidelines#commit-formatting-and-pull-requests

No it doesn't, but removing the precomputed hash does. That's what we are using to indicate a shuffle. Very odd. But the local round robin seems to give a dramatic (1/10-1/50) size reduction in the partial distinct limit output.

Before:

 explain ANALYZE SELECT DISTINCT orderkey, partkey FROM lineitem  LIMIT 10000

    [Fragment 1 [SINGLE]
    CPU: 39.86ms, Scheduled: 140.03ms, Input: 18411 rows (701.20kB); per task: avg.: 18411.00 std.dev.: 0.00, Output: 10000 rows (247.92kB)
    Output layout: [orderkey, partkey]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - Project[projectLocality = LOCAL] => [orderkey:bigint, partkey:bigint]
            CPU: 4.00ms (1.51%), Scheduled: 6.00ms (0.97%), Output: 10000 rows (247.92kB)
            Input avg.: 10000.00 rows, Input std.dev.: 0.00%
        - DistinctLimit[10000][$hashvalue] => [orderkey:bigint, partkey:bigint, $hashvalue:bigint]
                CPU: 23.00ms (8.68%), Scheduled: 45.00ms (7.27%), Output: 10000 rows (380.86kB)
                Input avg.: 10226.00 rows, Input std.dev.: 0.00%
            - LocalExchange[SINGLE] () => [orderkey:bigint, partkey:bigint, $hashvalue:bigint]
                    CPU: 3.00ms (1.13%), Scheduled: 10.00ms (1.62%), Output: 10226 rows (389.47kB)
                    Input avg.: 4602.75 rows, Input std.dev.: 79.08%
                - RemoteSource[2] => [orderkey:bigint, partkey:bigint, $hashvalue_7:bigint]
                        CPU: 6.00ms (2.26%), Scheduled: 36.00ms (5.82%), Output: 18411 rows (701.20kB)
                        Input avg.: 4602.75 rows, Input std.dev.: 79.08%

Fragment 2 [SOURCE]
    CPU: 232.99ms, Scheduled: 541.31ms, Input: 22048 rows (1.38MB); per task: avg.: 11024.00 std.dev.: 0.00, Output: 20000 rows (761.72kB)
    Output layout: [orderkey, partkey, $hashvalue_8]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - DistinctLimitPartial[10000][$hashvalue_8] => [orderkey:bigint, partkey:bigint, $hashvalue_8:bigint]
            CPU: 64.00ms (24.15%), Scheduled: 133.00ms (21.49%), Output: 20000 rows (761.72kB)
            Input avg.: 11024.00 rows, Input std.dev.: 0.00%
        - ScanProject[table = TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=lineitem, analyzePartitionValues=Optional.empty}', layout='Optional[tpch.lineitem{}]'}, grouped = false, projectLocality = LOCAL] => [orderkey:bigint, partkey:bigint, $hashvalue_8:bigint]
                CPU: 165.00ms (62.26%), Scheduled: 389.00ms (62.84%), Output: 22048 rows (581.34kB)
                Input avg.: 11024.00 rows, Input std.dev.: 0.00%
                $hashvalue_8 := combine_hash(combine_hash(BIGINT'0', COALESCE($operator$hash_code(orderkey), BIGINT'0')), COALESCE($operator$hash_code(partkey), BIGINT'0')) (1:33)
                LAYOUT: tpch.lineitem{}
                orderkey := orderkey:bigint:0:REGULAR (1:56)
                partkey := partkey:bigint:1:REGULAR (1:56)
                Input: 22048 rows (1.38MB), Filtered: 0.00%

After:

 explain ANALYZE SELECT DISTINCT orderkey, partkey FROM lineitem  LIMIT 10000
    [Fragment 1 [SINGLE]
    CPU: 41.95ms, Scheduled: 309.07ms, Input: 58671 rows (1.45MB); per task: avg.: 58671.00 std.dev.: 0.00, Output: 10000 rows (253.91kB)
    Output layout: [orderkey, partkey]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - DistinctLimit[10000] => [orderkey:bigint, partkey:bigint]
            CPU: 23.00ms (6.93%), Scheduled: 83.00ms (6.61%), Output: 10000 rows (253.91kB)
            Input avg.: 10228.00 rows, Input std.dev.: 0.00%
        - LocalExchange[SINGLE] () => [orderkey:bigint, partkey:bigint]
                CPU: 2.00ms (0.60%), Scheduled: 11.00ms (0.88%), Output: 10228 rows (259.70kB)
                Input avg.: 14667.75 rows, Input std.dev.: 78.13%
            - RemoteSource[2] => [orderkey:bigint, partkey:bigint]
                    CPU: 12.00ms (3.61%), Scheduled: 122.00ms (9.72%), Output: 58671 rows (1.45MB)
                    Input avg.: 14667.75 rows, Input std.dev.: 78.13%

Fragment 2 [SOURCE]
    CPU: 301.58ms, Scheduled: 1.11s, Input: 60175 rows (1.38MB); per task: avg.: 30087.50 std.dev.: 1452.50, Output: 58671 rows (1.45MB)
    Output layout: [orderkey, partkey]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - DistinctLimitPartial[10000] => [orderkey:bigint, partkey:bigint]
            CPU: 97.00ms (29.22%), Scheduled: 501.00ms (39.92%), Output: 58671 rows (1.45MB)
            Input avg.: 7423.88 rows, Input std.dev.: 35.55%
        - LocalExchange[ROUND_ROBIN] () => [orderkey:bigint, partkey:bigint]
                CPU: 6.00ms (1.81%), Scheduled: 32.00ms (2.55%), Output: 59391 rows (1.02MB)
                Input avg.: 30087.50 rows, Input std.dev.: 4.83%
            - TableScan[TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=lineitem, analyzePartitionValues=Optional.empty}', layout='Optional[tpch.lineitem{}]'}, grouped = false] => [orderkey:bigint, partkey:bigint]
                    CPU: 192.00ms (57.83%), Scheduled: 506.00ms (40.32%), Output: 60175 rows (1.03MB)
                    Input avg.: 30087.50 rows, Input std.dev.: 4.83%
                    LAYOUT: tpch.lineitem{}
                    partkey := partkey:bigint:1:REGULAR (1:56)
                    orderkey := orderkey:bigint:0:REGULAR (1:56)
                    Input: 60175 rows (1.38MB), Filtered: 0.00%

@rschlussel
Contributor

Also, this should have plan tests in addition to the correctness test you added to make sure the plan is as expected for all cases.

@kaikalur
Contributor Author

kaikalur commented Mar 30, 2022

Also, this should have plan tests in addition to the correctness test you added to make sure the plan is as expected for all cases.

I'm generally quite skeptical about the usefulness of plan tests but I will add them :(

@rschlussel
Contributor

No it doesn't, but removing the precomputed hash does. That's what we are using to indicate a shuffle. Very odd. But the local round robin seems to give a dramatic (1/10-1/50) size reduction in the partial distinct limit output.

I still don't understand. They both have the same number of remote exchanges. I do see how round robin exchange might be helpful to improve the effectiveness of the partial distinct limit because it randomizes the rows.

I think maybe the hash thing is something to do with how the DistinctLimitOperator itself works with/without the precomputed hash?

@kaikalur
Contributor Author

Basically, the thinking is that it's not very useful to shuffle a small amount of data (LIMIT is generally a small number), so we can just let it go to the single node that will do the final distinct anyway.

@rschlussel
Contributor

but both examples go to a single node. The hash generation optimizer runs after all exchanges are added, so it doesn't change anything about the data distribution. It just means a precomputed hash gets passed around instead of the distinct limit operator having to calculate the hash for the distinct itself.

Also, just want to confirm that the hash generation thing is unrelated to the added round robin exchange.

@kaikalur
Contributor Author

but both examples go to a single node. The hash generation optimizer runs after all exchanges are added, so it doesn't change anything about the data distribution. It just means a precomputed hash gets passed around instead of the distinct limit operator having to calculate the hash for the distinct itself.

Also, just want to confirm that the hash generation thing is unrelated to the added round robin exchange.

Yeah, even before this change we don't generate a hash for the case of a distinct on a single bigint - which is what made me realize that works a lot better. But the issue with that is, if you have a lot of data, all of the <= N distinct values will show up, but it will keep going forever doing the distinct on the single node over millions of rows, which is not good. The round robin seems to magically solve the issue. Hope someone here has a better explanation as to why.

@kaikalur
Contributor Author

No it doesn't, but removing the precomputed hash does. That's what we are using to indicate a shuffle. Very odd. But the local round robin seems to give a dramatic (1/10-1/50) size reduction in the partial distinct limit output.

I still don't understand. They both have the same number of remote exchanges. I do see how round robin exchange might be helpful to improve the effectiveness of the partial distinct limit because it randomizes the rows.

I think maybe the hash thing is something to do with how the DistinctLimitOperator itself works with/without the precomputed hash?

#17328

The operator does not use the precomputed hash at all! It uses something like a group-by for doing the distinct, keeping at most N elements. The precomputed hash seems to result in a remote shuffle after the partial distinct limit. So the issue is that if N > the max distinct count, it will never finish, as it will keep looking for more rows and shuffling tons of data. I have seen situations where someone is scanning 3Tn rows but has only 20 distinct values, and due to LIMIT 10k it will keep going for 1hr and time out because it will never find 10k.
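
To make the operator behavior concrete, here is a minimal, self-contained Java sketch (illustration only - this is not Presto's DistinctLimitOperator, and every name and number below is invented): a partial distinct limit deduplicates with a set and stops producing output once it has emitted N distinct keys, leaving the final distinct limit to dedupe across all the partial outputs.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustration only: a partial "distinct limit" over bigint keys.
// It emits a key the first time it is seen and stops once `limit`
// distinct keys have been emitted, no matter how much input remains;
// the final stage applies the same logic across all partial outputs.
final class PartialDistinctLimitSketch
{
    private final int limit;
    private final Set<Long> seen = new HashSet<>();

    PartialDistinctLimitSketch(int limit)
    {
        this.limit = limit;
    }

    boolean isFinished()
    {
        return seen.size() >= limit;
    }

    List<Long> process(List<Long> inputKeys)
    {
        List<Long> output = new ArrayList<>();
        for (Long key : inputKeys) {
            if (isFinished()) {
                break; // stop consuming input once the limit is reached
            }
            if (seen.add(key)) {
                output.add(key); // first occurrence of this key: pass it on
            }
        }
        return output;
    }

    public static void main(String[] args)
    {
        PartialDistinctLimitSketch partial = new PartialDistinctLimitSketch(3);
        // Seven input rows, but only the first three distinct keys are emitted: [1, 2, 3]
        System.out.println(partial.process(List.of(1L, 1L, 2L, 2L, 3L, 4L, 5L)));
    }
}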

@kaikalur kaikalur force-pushed the fix_exchange_for_distinct_limit branch from 72dc7d0 to 91098ee Compare March 30, 2022 18:24
@kaikalur
Contributor Author

Talked to Rebecca offline, and based on real queries here, we see the RR shuffle reducing the intermediate data dramatically. So I will make this PR do just that part (and not the hash computation part).

@kaikalur kaikalur force-pushed the fix_exchange_for_distinct_limit branch from 91098ee to 163aa72 Compare March 31, 2022 20:01
@kaikalur kaikalur changed the title from "Add a local round robin shuffle and remove precomputed hash for DisinctLimitPartial" to "Add a local round robin shuffle before DisinctLimitPartial" Mar 31, 2022
@kaikalur kaikalur requested a review from rschlussel March 31, 2022 20:25
@kaikalur kaikalur force-pushed the fix_exchange_for_distinct_limit branch from 163aa72 to ce209f4 Compare March 31, 2022 21:55
@@ -230,6 +230,7 @@
public static final String KEY_BASED_SAMPLING_FUNCTION = "key_based_sampling_function";
public static final String HASH_BASED_DISTINCT_LIMIT_ENABLED = "hash_based_distinct_limit_enabled";
public static final String HASH_BASED_DISTINCT_LIMIT_THRESHOLD = "hash_based_distinct_limit_threshold";
public static final String ROUND_ROBIN_SHUFFLE_BEFORE_PARTIAL_AGG = "round_robin_shuffle_before_partial_agg";
Contributor

I would call this shuffle_before_partial_distinct_limit since it's specific to distinct limit, not all partial aggs.

Contributor Author

I was thinking this could be useful for other operators as well, so we can use it whenever applicable, like Distinct (and maybe other group-bys as well).

Contributor Author

But on second thought, I changed it.

false),
booleanProperty(
ROUND_ROBIN_SHUFFLE_BEFORE_PARTIAL_AGG,
"Add a local roundrobin shuffle before partial agg",
Contributor

agg -> distinct limit

Contributor Author

See previous comment.

Contributor Author

Done

Contributor

description still needs to be updated

Contributor

@kaikalur this still says partial agg instead of partial distinct limit.

return roundRobinShuffleBeforePartialAgg;
}

@Config("round-robin-shuffle-before-partial-agg")
Contributor

optimizer properties should have the optimizer prefix. So this + the name change would be
optimizer.shuffle-before-partial-distinct-limit

Contributor Author

Done

@@ -112,7 +112,7 @@ public PlanNode optimize(PlanNode plan, Session session, TypeProvider types, Pla
requireNonNull(variableAllocator, "variableAllocator is null");
requireNonNull(idAllocator, "idAllocator is null");
if (SystemSessionProperties.isOptimizeHashGenerationEnabled(session)) {
PlanWithProperties result = plan.accept(new Rewriter(idAllocator, variableAllocator, functionAndTypeManager), new HashComputationSet());
PlanWithProperties result = plan.accept(new Rewriter(session, idAllocator, variableAllocator, functionAndTypeManager), new HashComputationSet());
Contributor

revert the changes to HashGenerationOptimizer

Contributor Author

Done

@kaikalur kaikalur force-pushed the fix_exchange_for_distinct_limit branch from ce209f4 to 3161b87 Compare April 1, 2022 14:46
@kaikalur
Contributor Author

kaikalur commented Apr 1, 2022

Addressed all comments

@@ -338,6 +339,7 @@ public void testExplicitPropertyMappings()
.put("execution-policy.max-stage-count-for-eager-scheduling", "123")
.put("hyperloglog-standard-error-warning-threshold", "0.02")
.put("optimizer.prefer-merge-join", "true")
.put("round-robin-shuffle-before-partial-agg", "true")
Contributor

needs to use the updated name

@kaikalur kaikalur force-pushed the fix_exchange_for_distinct_limit branch from 3161b87 to 391d07e Compare April 1, 2022 15:18
@kaikalur kaikalur requested a review from rschlussel April 1, 2022 15:48
@pettyjamesm
Contributor

Does it make sense to only apply this optimization on top of SOURCE distributed stages? My intuition about when this strategy should be effective is when a large number of input drivers don't independently identify many distinct rows- but I'm not sure that this makes sense on top of intermediate stages where the drivers are long running and have plenty of opportunity to filter non-distinct rows and/or reach the limit.

@kaikalur
Contributor Author

kaikalur commented Apr 1, 2022

Does it make sense to only apply this optimization on top of SOURCE distributed stages? My intuition about when this strategy should be effective is when a large number of input drivers don't independently identify many distinct rows- but I'm not sure that this makes sense on top of intermediate stages where the drivers are long running and have plenty of opportunity to filter non-distinct rows and/or reach the limit.

But it won't hurt right? Local RR shuffle looks really cheap (almost free)?

@pettyjamesm
Contributor

But it won't hurt right? Local RR shuffle looks really cheap (almost free)?

For intermediate stages that are already partitioned on the distinct keys, this won’t help and will just add extra work. For intermediate stages that aren’t already partitioned appropriately, there’s a chance that a local partitioned exchange would be better (in some cases, maybe not others depending on the overall plan shape).

@kaikalur
Contributor Author

kaikalur commented Apr 1, 2022

Does it make sense to only apply this optimization on top of SOURCE distributed stages? My intuition about when this strategy should be effective is when a large number of input drivers don't independently identify many distinct rows- but I'm not sure that this makes sense on top of intermediate stages where the drivers are long running and have plenty of opportunity to filter non-distinct rows and/or reach the limit.

More interestingly, I'm thinking if this will help other kinds of aggregations as well to reduce the intermediate data size

But it won't hurt right? Local RR shuffle looks really cheap (almost free)?

For intermediate stages that are already partitioned on the distinct keys, this won’t help and will just add extra work. For intermediate stages that aren’t already partitioned appropriately, there’s a chance that a local partitioned exchange would be better (in some cases, maybe not others depending on the overall plan shape).

Yeah - that's what I'm saying: when it helps, it helps greatly, but there's not much overhead otherwise.

@pettyjamesm
Contributor

More interestingly, I'm thinking if this will help other kinds of aggregations as well to reduce the intermediate data size

It could, yeah. There's ultimately a trade-off though, because if the subsequent partial aggregation / distinct operation is expensive enough then you might end up harming the input processing throughput enough to actually regress performance.

Actually, now that I think about it- forcing a local exchange of any kind on top of a source stage might cause problems in relation to the SplitConcurrencyController, no? Split concurrency is driven off of whether the whole task output buffer is more or less than 1/2 full- but since the task output buffer is now coupled to the pipeline after the local exchange, you might have a lot of input splits start on the input pipeline even though the output buffer is empty because of a bottleneck in the post-exchange processing.

@kaikalur
Contributor Author

kaikalur commented Apr 1, 2022

More interestingly, I'm thinking if this will help other kinds of aggregations as well to reduce the intermediate data size

It could, yeah. There's ultimately a trade-off though, because if the subsequent partial aggregation / distinct operation is expensive enough then you might end up harming the input processing throughput enough to actually regress performance.

Actually, now that I think about it- forcing a local exchange of any kind on top of a source stage might cause problems in relation to the SplitConcurrencyController, no? Split concurrency is driven off of whether the whole task output buffer is more or less than 1/2 full- but since the task output buffer is now coupled to the pipeline after the local exchange, you might have a lot of input splits start on the input pipeline even though the output buffer is empty because of a bottleneck in the post-exchange processing.

Well, that's why it's an optional optimization :) We are planning to run it on some of our interactive workloads where it is common for users to look for distinct values - with a small number (several dozen) of distincts but a much larger limit (like 10k) in XL-sized datasets. There this helped quite a bit.

@kaikalur
Contributor Author

kaikalur commented Apr 1, 2022

Addressed all the comments

@rschlussel
Contributor

Actually, now that I think about it- forcing a local exchange of any kind on top of a source stage might cause problems in relation to the SplitConcurrencyController, no? Split concurrency is driven off of whether the whole task output buffer is more or less than 1/2 full- but since the task output buffer is now coupled to the pipeline after the local exchange, you might have a lot of input splits start on the input pipeline even though the output buffer is empty because of a bottleneck in the post-exchange processing.

@pettyjamesm Can you explain this more? I'm not so familiar with how split scheduling works nor how local exchange changes things. How is this different than if this happens without the local exchange, or similarly if there were an expensive function as part of the processing? Also, since there is also a targetConcurrency, there is still an upper limit on how many tasks will get scheduled.

@pettyjamesm
Contributor

For input stages, the number of target "leaf" splits that should be running at any time is controlled by SplitConcurrencyController which increments the target concurrency every task.split-concurrency-adjustment-interval (default: 100 ms) of scheduled time that a task receives so long as the current "utilization" is < 0.5.

For every leaf split that completes, it will also either increment or decrement the target concurrency depending on whether the current utilization is greater or less than 0.5. Greater than 0.5: decrement the target value, less than 0.5: increment it.

The "utilization" value that it receives to drive this decision is the "percent full" of the task-wide output buffer compared to the max output buffer size. Basically, there's an hidden assumption that the logic is making: that so long as the output buffer is < 50% full, it's safe to increase the concurrency of leaf split processing. This approximation is OK if you assume that leaf split pipelines are always essentially linear paths straight to the output buffer that can be arbitrarily thread-parallelized to increase throughput.

If you break a source stage into two pipelines where the input pipeline feeds a local exchange sink instead of an output buffer, and some other pipeline reading from a local exchange source is what then sends data to the output buffer, the concurrency controller will make bad decisions about when to adjust the target concurrency for leaf splits, for two reasons:

  • The output buffer utilization no longer has a clear relationship to the number of running leaf drivers, it has to do with the availability of data from the leaf splits and the throughput of the subsequent operations.
  • The "scheduled" time interval between adjustments will be artificially high, because it will include scheduled time from all drivers in the task and not just the leaf split runners (ie: adjustments will be more frequent, which will ramp the target concurrency up or down more rapidly)

The risk is that you'll have a "slow" leg on the pipeline after the local exchange that bottlenecks data to the output buffer, and the number of leaf splits that get started will just climb and climb. Even in the case of DistinctLimitOperator(partial) a very effective distincting filter could start filtering 100% of input rows which could induce this problem.

This should be fixable if the split concurrency controller received a utilization value that somehow estimated the percentage of time that the input pipeline's local exchange sink (and/or associated source) was blocked, instead of just looking at the output buffer utilization - but without extra logic to handle that, creating plans that contain leaf split pipelines terminating in a local exchange (of any kind) instead of an output buffer is dangerous.
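
To summarize that policy in code, here is a rough, self-contained Java sketch of the adjustment logic as described above (a paraphrase for illustration, not the actual SplitConcurrencyController; the 0.5 threshold and the 100 ms task.split-concurrency-adjustment-interval default are the values mentioned above, everything else is invented):

// Sketch of the policy described above; not Presto code.
// Target leaf-split concurrency is driven purely by how full the
// task-wide output buffer is, which assumes leaf pipelines feed the
// output buffer directly.
final class SplitConcurrencyControllerSketch
{
    private static final double TARGET_UTILIZATION = 0.5;
    private static final long ADJUSTMENT_INTERVAL_MILLIS = 100;

    private final int maxConcurrency;
    private int targetConcurrency;
    private long scheduledMillisSinceLastAdjustment;

    SplitConcurrencyControllerSketch(int initialConcurrency, int maxConcurrency)
    {
        this.targetConcurrency = initialConcurrency;
        this.maxConcurrency = maxConcurrency;
    }

    // Called as the task accumulates scheduled time: ramp up while the
    // output buffer stays under half full.
    void update(long newScheduledMillis, double outputBufferUtilization)
    {
        scheduledMillisSinceLastAdjustment += newScheduledMillis;
        if (scheduledMillisSinceLastAdjustment >= ADJUSTMENT_INTERVAL_MILLIS
                && outputBufferUtilization < TARGET_UTILIZATION) {
            targetConcurrency = Math.min(targetConcurrency + 1, maxConcurrency);
            scheduledMillisSinceLastAdjustment = 0;
        }
    }

    // Called when a leaf split completes: adjust based on the same signal.
    void splitFinished(double outputBufferUtilization)
    {
        if (outputBufferUtilization > TARGET_UTILIZATION) {
            targetConcurrency = Math.max(targetConcurrency - 1, 1);
        }
        else {
            targetConcurrency = Math.min(targetConcurrency + 1, maxConcurrency);
        }
    }

    int getTargetConcurrency()
    {
        return targetConcurrency;
    }

    public static void main(String[] args)
    {
        SplitConcurrencyControllerSketch controller = new SplitConcurrencyControllerSketch(1, 16);
        controller.update(150, 0.1);   // buffer nearly empty: ramp up
        controller.splitFinished(0.1); // split done, buffer still nearly empty: ramp up again
        System.out.println(controller.getTargetConcurrency()); // prints 3
    }
}

The failure mode described above falls out of this: once a local exchange decouples the leaf pipeline from the output buffer, a near-empty buffer keeps ramping the target concurrency up even though the real bottleneck is in the post-exchange pipeline.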

@pettyjamesm
Contributor

Another problem: it’s possible that pages with not-yet-loaded LazyBlocks still present could transit through the local exchange which would be a thread safety issue. You would need to force those to be loaded before passing the pages into the local exchange sink.
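
To illustrate the hazard with a generic, self-contained Java sketch (no Presto types; every name below is invented): a lazily loaded block defers its load until first access and is not thread safe, so the producing driver should force the load before publishing the page to a local-exchange-style queue that another thread will read from.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustration only: a "lazy" block loads its data on first access and is
// not synchronized, so it must be materialized on the producing thread
// before it is handed across a local-exchange-like queue.
final class LazyBlockHazardSketch
{
    static final class LazyBlock
    {
        private long[] values; // loaded on first access; not thread safe

        long[] getValues()
        {
            if (values == null) {
                values = new long[] {1, 2, 3}; // pretend this reads from the connector
            }
            return values;
        }

        LazyBlock loaded()
        {
            getValues(); // force the load on the producing thread
            return this;
        }
    }

    public static void main(String[] args) throws InterruptedException
    {
        BlockingQueue<LazyBlock> localExchange = new ArrayBlockingQueue<>(16);

        // Safe variant: load before publishing, so the consuming thread only
        // ever sees fully materialized data.
        localExchange.put(new LazyBlock().loaded());

        Thread consumer = new Thread(() -> {
            try {
                long[] values = localExchange.take().getValues();
                System.out.println("consumer read " + values.length + " values");
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        consumer.join();
    }
}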

@kaikalur
Contributor Author

kaikalur commented Apr 1, 2022

Another problem: it’s possible that pages with not-yet-loaded LazyBlocks still present could transit through the local exchange which would be a thread safety issue. You would need to force those to be loaded before passing the pages into the local exchange sink.

I have seen this (local RR exchange) in the plans for INNER JOIN as well, so I thought this should be safe. If not, we should add a check to the plan validator to disallow it.

@pettyjamesm
Contributor

Broadcast inner joins would have a leaf pipeline and a pipeline for broadcast input, but those would be linked to each other via the join operators and the leaf split pipeline would still process data directly through to the task output buffer.

I’m not aware of any plan shapes that would terminate a leaf pipeline with a local exchange instead of output operator, but I might have missed some relevant changes and maybe my understanding is out of date. Can you provide an example of a query / plan that already does that?

@kaikalur
Contributor Author

kaikalur commented Apr 1, 2022

One query shape I have seen is partitioned join of: (A UNION ALL B) JOIN C - in that case we read the two remote sources for A and B and then add a local RR exchange (ostensibly to mitigate skew?) and then join. I will try and craft a tpch query that shows it.

select orderkey from (select orderkey from orders union all select partkey from lineitem) full join customer on orderkey=custkey

Plan:

Fragment 1 [tpch:orders:15000]
    Output layout: [orderkey_7]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - FullJoin[("orderkey_7" = "custkey_9")][$hashvalue, $hashvalue_21] => [orderkey_7:bigint]
            Estimates: {rows: 78934 (693.75kB), cpu: 6651078.75, memory: 27000.00, network: 1110150.00}
            Distribution: PARTITIONED
        - LocalExchange[ROUND_ROBIN] () => [orderkey_7:bigint, $hashvalue:bigint]
                Estimates: {rows: 75175 (1.29MB), cpu: 4466025.00, memory: 0.00, network: 1083150.00}
            - ScanProject[table = TableHandle {connectorId='tpch', connectorHandle='orders:sf0.01', layout='Optional[orders:sf0.01]'}, grouped = false, projectLocality = LOCAL] => [orderkey:bigint, $hashvalue_18:bigint]
                    Estimates: {rows: 15000 (263.67kB), cpu: 135000.00, memory: 0.00, network: 0.00}/{rows: 15000 (263.67kB), cpu: 405000.00, memory: 0.00, network: 0.00}
                    $hashvalue_18 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(orderkey), BIGINT'0')) (1:56)
                    orderkey := tpch:orderkey (1:70)
                    tpch:orderstatus
                        :: [["F"], ["O"], ["P"]]
            - RemoteSource[2] => [partkey:bigint, $hashvalue_19:bigint]
        - LocalExchange[HASH][$hashvalue_21] (custkey_9) => [custkey_9:bigint, $hashvalue_21:bigint]
                Estimates: {rows: 1500 (26.37kB), cpu: 94500.00, memory: 0.00, network: 27000.00}
            - RemoteSource[3] => [custkey_9:bigint, $hashvalue_22:bigint]

Fragment 2 [tpch:orders:15000]
    Output layout: [partkey, $hashvalue_20]
    Output partitioning: tpch:orders:15000 [partkey]
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - ScanProject[table = TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false, projectLocality = LOCAL] => [partkey:bigint, $hashvalue_20:bigint]
            Estimates: {rows: 60175 (1.03MB), cpu: 541575.00, memory: 0.00, network: 0.00}/{rows: 60175 (1.03MB), cpu: 1624725.00, memory: 0.00, network: 0.00}
            $hashvalue_20 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(partkey), BIGINT'0')) (1:107)
            partkey := tpch:partkey (1:107)

Fragment 3 [SOURCE]
    Output layout: [custkey_9, $hashvalue_23]
    Output partitioning: tpch:orders:15000 [custkey_9]
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - ScanProject[table = TableHandle {connectorId='tpch', connectorHandle='customer:sf0.01', layout='Optional[customer:sf0.01]'}, grouped = false, projectLocality = LOCAL] => [custkey_9:bigint, $hashvalue_23:bigint]
            Estimates: {rows: 1500 (26.37kB), cpu: 13500.00, memory: 0.00, network: 0.00}/{rows: 1500 (26.37kB), cpu: 40500.00, memory: 0.00, network: 0.00}
            $hashvalue_23 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(custkey_9), BIGINT'0')) (1:127)
            custkey_9 := tpch:custkey (1:127)

@rschlussel
Contributor

Thanks for the detailed explanation!

To make sure I'm understanding correctly- is the problem that leaf stages have a variable task concurrency, but the operators after the local exchange will have a fixed task concurrency, so you'll keep adding more splits to the leaf stage without increasing the concurrency of the post-exchange operations?

I wonder how that compares to other existing situations. It sounds like the algorithm basically assumes that the leaf processing is always going to be cheap, and the bottleneck will be in the output buffer. There are already plenty of cases where the leaf stage could be expensive and not produce a lot of output because it is busy (expensive filter/partial aggregation, extreme regex, even a hardware issue that just makes processing super slow). Wouldn't those cases also keep getting splits piled on beyond what they can handle?

@rschlussel
Contributor

rschlussel commented Apr 4, 2022

@pettyjamesm Here is an example where a leaf stage has a local exchange:

presto:tpch_bucketed> explain (type distributed) select count (*) from lineitem group by orderkey;
                                                                                                                                             Query Plan                                                                                                                                             
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]                                                                                                                                                                                                                                                                                
     Output layout: [count]                                                                                                                                                                                                                                                                         
     Output partitioning: SINGLE []                                                                                                                                                                                                                                                                 
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                                                                                  
     - Output[_col0] => [count:bigint]                                                                                                                                                                                                                                                              
             _col0 := count (1:35)                                                                                                                                                                                                                                                                  
         - RemoteSource[1] => [count:bigint]                                                                                                                                                                                                                                                        
                                                                                                                                                                                                                                                                                                    
 Fragment 1 [hive_bucketed:buckets=11, bucketFunctionType=HIVE_COMPATIBLE, types=[bigint]]                                                                                                                                                                                                          
     Output layout: [count]                                                                                                                                                                                                                                                                         
     Output partitioning: SINGLE []                                                                                                                                                                                                                                                                 
     Stage Execution Strategy: DYNAMIC_LIFESPAN_SCHEDULE_GROUPED_EXECUTION                                                                                                                                                                                                                          
     - Project[projectLocality = LOCAL] => [count:bigint]                                                                                                                                                                                                                                           
         - Aggregate(FINAL)[orderkey] => [orderkey:bigint, count:bigint]                                                                                                                                                                                                                            
                 count := "presto.default.count"((count_4)) (1:35)                                                                                                                                                                                                                                  
             - LocalExchange[HASH][$hashvalue] (orderkey) => [orderkey:bigint, count_4:bigint, $hashvalue:bigint]                                                                                                                                                                                   
                 - Project[projectLocality = LOCAL] => [orderkey:bigint, count_4:bigint, $hashvalue_5:bigint]                                                                                                                                                                                       
                         $hashvalue_5 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(orderkey), BIGINT'0')) (1:68)                                                                                                                                                                         
                     - Aggregate(PARTIAL)[orderkey] => [orderkey:bigint, count_4:bigint]                                                                                                                                                                                                            
                             count_4 := "presto.default.count"(*) (1:35)                                                                                                                                                                                                                            
                         - TableScan[TableHandle {connectorId='hive_bucketed', connectorHandle='HiveTableHandle{schemaName=tpch_bucketed, tableName=lineitem, analyzePartitionValues=Optional.empty}', layout='Optional[tpch_bucketed.lineitem{buckets=11}]'}, grouped = true] => [orderkey:bigint] 
                                 Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00}                                                                                                                                                                                   
                                 LAYOUT: tpch_bucketed.lineitem{buckets=11}                                                                                                                                                                                                                         
                                 orderkey := orderkey:bigint:0:REGULAR (1:50)    

@pettyjamesm
Contributor

is the problem that leaf stages have a variable task concurrency, but the operators after the local exchange will have a fixed task concurrency, so you'll keep adding more splits to the leaf stage without increasing the concurrency of the post-exchange operations?

Yep, that's the essence of where the potential problems could begin.

I wonder how that compares to other existing situations. It sounds like the algorithm basically assumes that the leaf processing is always going to be cheap, and the bottleneck will be in the output buffer. There are already plenty of cases where the leaf stage could be expensive and not produce a lot of output because it is busy (expensive filter/partial aggregation, extreme regex, even a hardware issue that just makes processing super slow). Wouldn't those cases also keep getting splits piled on beyond what they can handle?

This is true, but the potential problems are different here. If you have an expensive filter or regex operation- you can still fundamentally hope to increase throughput by increasing the parallelism factor because splits are processed independently of one another. Bottlenecks in one driver don't directly create pressure on another, except in competing for scheduled time on a CPU (so long as they aren't competing for output buffer space which is what SplitConcurrencyController is assuming will happen if concurrency is too high). The potential for increased unnecessary memory consumption is still there, but less pronounced because in order for split concurrency to increase, splits must still be finishing without filling the output buffer (concurrency only increases when a split is finished).

What can happen here, is that post-exchange drivers can create a bottleneck where increasing the concurrency of the leaf splits will just create more competition for the exchange source / sink, but splits will presumably be finishing. As @kaikalur pointed out though, apparently these plans already do occur which was news to me.

I think the example given with (A UNION ALL B) JOIN C could potentially have the same issue if the join rate on the other side of the exchange is low, or if there are other expensive operations post-join. I'm not sure what rules are involved in creating that kind of plan, so it's possible this problem isn't new. It might be made worse by this change, but that would require some experimentation to quantify.

Fundamentally, I'm not opposed to this change so don't consider this a blocking review- but I think it's risky and wanted to call that out.

@kaikalur
Contributor Author

kaikalur commented Apr 4, 2022

Fundamentally, I'm not opposed to this change so don't consider this a blocking review- but I think it's risky and wanted to call that out.

Yes, I have it disabled by default. My plan is to shadow real workloads with this feature enabled and see how it does and enable it for some specific use cases.

@rschlussel
Contributor

Thanks @pettyjamesm for the super clear explanation. That makes a lot of sense.
@kaikalur this is good to go once you address my last comment.

@kaikalur kaikalur force-pushed the fix_exchange_for_distinct_limit branch from 391d07e to 3ecbf9e Compare April 4, 2022 18:46
@kaikalur
Contributor Author

kaikalur commented Apr 4, 2022

Also, this should have plan tests in addition to the correctness test you added to make sure the plan is as expected for all cases.

Done

@kaikalur kaikalur removed the request for review from rongrong April 5, 2022 19:55
@rschlussel rschlussel merged commit 59c7127 into prestodb:master Apr 6, 2022
@mshang816 mshang816 mentioned this pull request May 17, 2022
@kaikalur kaikalur mentioned this pull request Dec 1, 2022