Add a heuristic to skip second or third agg pass #10950
Conversation
Even after this PR, the first agg pass cannot be saved. Actually, we may even add another heuristic to avoid the first agg pass cost (if the first agg pass is judged unable to reduce the row count). This would require passing stats from task to task (on the same executor is sufficient).
It would be nice to have an explicit multi-aggregation-batch test of this, with and without the heuristic (e.g., one test sets the config to 0 to force it to always avoid merging, the other sets it to 1 to always merge).
I'm a bit confused by the screenshot posted above. How are we sending fewer batches when we don't merge in the partial than when we do? Seems like that should be the opposite in practice.
sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala
Force-pushed from 190540a to 250a087
@binmahone please do not force-push PRs that are being reviewed. Merge from the base branch.
    _float_conf_skipagg = {'spark.rapids.sql.variableFloatAgg.enabled': 'true',
                           'spark.rapids.sql.castStringToFloat.enabled': 'true',
                           'spark.rapids.sql.agg.skipAggPassReductionRatio': '0',
                           }
This needs to use a small batch config as well, otherwise the processing will all occur in one batch. One-batch processing means there's nothing to merge and thus nothing to skip. Suggest moving this below _float_smallbatch_conf and building off of that, e.g.:
    _float_conf_skipagg = copy_and_update(_float_smallbatch_conf,
                                          {'spark.rapids.sql.agg.skipAggPassReductionRatio': '0'})
done
Force-pushed from 217fdee to f0f47bd
Generally this looks good. I would like to understand if you experimented with a few different designs before deciding on this one. I also would like to understand the testing process. I see that you have one extreme case that looks better. Could you please provide some examples of how to reproduce this? Also do we have benchmarks on how this performs with NDS?
jenkins/spark-premerge-build.sh

    @@ -162,7 +162,7 @@ ci_2() {
         $MVN_CMD -U -B $MVN_URM_MIRROR clean package $MVN_BUILD_ARGS -DskipTests=true
         export TEST_TAGS="not premerge_ci_1"
         export TEST_TYPE="pre-commit"
    -    export TEST_PARALLEL=5
    +    export TEST_PARALLEL=4
This is okay as a short-term workaround, but I am not going to approve a PR with this in it.
    val rowsAfterSecondPassAgg = aggregatedBatches.asScala.foldLeft(0L) {
      (totalRows, batch) => totalRows + batch.numRows()
    }
    shouldSkipThirdPassAgg =
Could you please correct me if I get anything wrong about the algorithm.
It looks like we do a first pass over the data and place the results in the aggregatedBatches queue. We keep track of the total number of rows that survived that initial aggregation pass, along with the number of rows that were input to it. If the first agg pass reduced the row count by at least X%, where X is (1 - spark.rapids.sql.agg.skipAggPassReductionRatio), then we continue trying to combine the results together.
Otherwise we set shouldSkipThirdPassAgg, which is confusing because we are not doing a third pass; this would be a second aggregation pass, but whatever.
Combining the results together is done by calling tryMergeAggregatedBatches(). This code will try to concatenate batches together up to a target input size and then merge those batches. It stops if we get to a single output batch, or if we hit a situation where we could not find two batches that would fit in the target merge batch size.
At that point, if we didn't reduce the number of rows by X% again, then we will also set shouldSkipThirdPassAgg.
At this point either the data is aggregated into a single batch that we can output, or we have multiple batches to deal with. If we are going to skip the third pass agg (and it is not a final aggregation), then we just output what we have done so far. Otherwise we use the sort fallback iterator.
That looks okay to me, but I am curious whether you think we are being aggressive enough with this, and do we want follow-on work to see if we can make it more aggressive? I am just a little concerned that we might do an entire first pass over multiple batches when it is clear from the very first batch that this is just not going to work. We have to be very careful that we don't change the order of the batches so that things like first and last still work correctly, but that should not be too difficult.
What if we read an input batch and check if it reduced the number of rows by X%; if not, we just instantly return it. No need to cache that batch at all. We just let downstream deal with the large batch we just saw. We keep doing that until we hit a situation where we did see something reduce in size. Then we start to cache batches, so that we don't mess up the order. I am concerned about spilling and memory pressure. A map-side aggregation is almost always going to feed into a shuffle, and if we can release some of the batches quickly we reduce GPU memory pressure, because the data should be pulled back off of the GPU without us needing to cache anything.
For context on where the first pass happens: it looks like we ask the firstPassIterator if there is more to process. firstPassIterator reads in the original input data, does the pre-project, and does an initial aggregation. aggregateInputBatches reads in those batches and places them into a queue.
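To make the walk-through above concrete, here is a minimal, self-contained sketch of the skip decision as described; all names are illustrative assumptions, not the PR's actual code:

    // Hypothetical sketch: a merge pass is skipped when the previous pass kept more than
    // `ratio` of its input rows, where ratio = spark.rapids.sql.agg.skipAggPassReductionRatio.
    object SkipAggHeuristicSketch {
      def shouldSkipNextAggPass(rowsBefore: Long, rowsAfter: Long, ratio: Double): Boolean =
        rowsBefore > 0 && rowsAfter > rowsBefore * ratio

      def main(args: Array[String]): Unit = {
        val ratio = 0.9
        // Poor reduction: 1,000,000 rows in, 990,000 rows out -> skip the next merge pass.
        println(shouldSkipNextAggPass(1000000L, 990000L, ratio)) // true
        // Good reduction: 1,000,000 rows in, 10,000 rows out -> keep merging.
        println(shouldSkipNextAggPass(1000000L, 10000L, ratio))  // false
      }
    }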
Second pass agg -> tryMergeAggregatedBatches(), third pass agg -> buildSortFallbackIterator; the naming should be correct.
"Setting this to 1 essentially disables this feature.") | ||
.doubleConf | ||
.checkValue(v => v >= 0 && v <= 1, "The ratio value must be in [0, 1].") | ||
.createWithDefault(0.9) |
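For context, a hedged reconstruction of what the full entry likely looks like in RapidsConf.scala: the visible tail above is from the PR, the config key comes from the discussion, and the leading doc text is paraphrased rather than quoted.

    // Hedged reconstruction of the config entry; the leading doc text is paraphrased.
    val SKIP_AGG_PASS_REDUCTION_RATIO = conf("spark.rapids.sql.agg.skipAggPassReductionRatio")
      .doc("In a partial aggregation, skip the later merge passes when a pass fails to " +
        "reduce the row count below this ratio of its input. " +
        "Setting this to 1 essentially disables this feature.")
      .doubleConf
      .checkValue(v => v >= 0 && v <= 1, "The ratio value must be in [0, 1].")
      .createWithDefault(0.9)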
How did you reach 90% as the cutoff? Is it just arbitrary, or did you do some tuning/testing to figure this out?
It is arbitrary. I chose 0.9 for fear of putting too much pressure on shuffle.
I don't think that this is the correct heuristic to use here. At least not to have it enabled by default until we understand better what real-world data distributions look like. I did some very simple testing with range + rand to generate data. The performance of the aggregation as a whole is going to be the time to do the partial aggregation + the time to do the shuffle + the time to do the final aggregation. The time to do the shuffle in many cases, especially the cases when this heuristic kicks in, is going to dominate the runtime of the query. If you run
The run time is about 45 seconds, but
runs in under 10 seconds (on my desktop, with lots and lots of pinned host spill). We need to talk about spill at some point too. That means 78% of the time is due to shuffle and not the aggregation itself. The current heuristic makes the assumption that like keys are going to be close to each other in the input data, but I don't think that we can make that assumption. I don't really have time to do a full breakdown of this, but I did some testing with rand for both the key and the value to generate a flat distribution of data. I then fed in a lot of data to a small number of tasks to help simulate a skew situation, or reading data from a highly compressed parquet split. In the best case I saw a 14% overall speedup, but that only showed up when the keys were essentially all unique and the other heuristic was disabled. We really need to dig in deeper and understand the problem at a more fundamental level if we want to see any performance gains here. I also retract my previous statement #10950 (comment) about making the algorithm more aggressive. I was thinking about spill in that case, but I think shuffle is generally the bigger problem we have to worry about, although we should not discount spill either.
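The exact queries were not captured in this page. A hypothetical spark-shell sketch of the kind of range + rand test described (nearly unique keys, few partitions to simulate skew) might look like the following; the row count, partition count, and aggregations are illustrative only:

    // Hypothetical reproduction sketch, not the reviewer's exact query.
    import org.apache.spark.sql.functions._
    val df = spark.range(0L, 1000000000L, 1L, 2)              // few partitions to simulate skew
      .selectExpr("cast(rand() * 1e9 as long) as key",        // nearly unique keys -> poor reduction
                  "rand() as value")
    spark.time {
      df.groupBy("key").agg(min("value"), max("value"))
        .write.format("noop").mode("overwrite").save()        // noop sink: measure agg + shuffle only
    }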
Hi @revans2, the example you gave:
will go into
Regarding "But in the worst case I saw a 100% slowdown", could you please show some examples to reproduce?
I think you're right on this. I'm thinking about refining this PR so that after the first task has finished (the first task runs as normal, regardless of any heuristic), it checks whether the task attempted a sort-based aggregate and ended up with a poor row-reduction ratio (let's stick to the 90% ratio). If so, the task leaves a thread-local hint that says "don't try to do the second and third pass agg, it's a waste of time", so that other tasks later scheduled onto the same thread/executor can safely skip the second and third passes. In our typical customer use case, there are 2000 tasks in a 100-core Spark session, so on average each executor thread runs 20 tasks one by one. With the above design, the first task will be as slow as before, but the remaining 19 tasks can benefit. Before deciding to leave the hint, we may also need to check whether spilling to disk happened while the first task was doing the sort-based aggregation. Spilling to disk should be considered a bigger threat to perf, so skipping would then be more confident and safer. What do you think of this? @revans2
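A minimal sketch of the kind of executor-thread-local hint being proposed, assuming a per-thread flag set when a finished task saw a poor reduction ratio (and, optionally, spill to disk); all names here are hypothetical and this is not code from the PR:

    // Hypothetical per-executor-thread hint left by a completed task.
    object AggPassHint {
      private val skipMergePasses = new ThreadLocal[Boolean] {
        override def initialValue(): Boolean = false
      }

      // Called when a task finishes: record that its merge passes barely reduced the rows
      // (retainedRowRatio = rowsAfter / rowsBefore), optionally also requiring spill to disk.
      def recordTaskOutcome(retainedRowRatio: Double, threshold: Double, spilledToDisk: Boolean): Unit =
        if (retainedRowRatio > threshold && spilledToDisk) skipMergePasses.set(true)

      // Consulted by later tasks scheduled onto the same executor thread.
      def shouldSkipMergePasses: Boolean = skipMergePasses.get()
    }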
Yes I was not pointing out a bug in this code with that. I was just pointing out that shuffle takes the majority of the time when there are very few rows that can be combined. This was to point out the order of magnitude of the maximum impact that this optimization can have on the runtime of this query. Meaning relatively small, just because the size of the data shuffled should be proportional to the number of aggregations being done + the number of aggregation keys.
This shows essentially the same thing. Without your patch: 43.85 seconds with the shuffle and 7.68 seconds without the shuffle. With your patch those numbers are 42.184 and 5.0 seconds respectively. That means shuffle went from taking 82% of the time to 88% of the time with your patch for this one query. That is fine, but the point really is just that it is all about shuffle.
but a single min/max aggregation is fine because the input data is split differently based on the expected output data size. All of these appear like they would be fixed by your proposed change. But I don't have enough information to truly evaluate it. I can come up with a distribution that would cause the same kind of a problem even after your proposed change.
For this one the time went from 30 seconds with your heuristic disabled to 40 seconds with it enabled and it is stopping after the first aggregation pass. I'll try to think about the problem a little and see if I can come up with a solution. In the short term I think at a minimum we need to have this disabled by default.
Hi @revans2
This is true when no SPILL TO DISK happens. When SPILL TO DISK happens, time spent on agg will greatly outweigh shuffle and become the major bottleneck. You can try increasing rows from 1000000000L to 2000000000L and setting spark.rapids.memory.pinnedPool.size to a smaller value (e.g. 2G) to reproduce spilling to disk. In other words, the whole sort-based agg is more worth optimizing when SPILL TO DISK is happening. In some scenarios, increasing spark.rapids.memory.pinnedPool.size will eliminate SPILL TO DISK and the agg bottleneck will become unimportant, because a larger pinned pool converts SPILL TO DISK into SPILL TO MEM. However, in our customer environment we still see SPILL TO DISK even after increasing the pinned pool size (we observed an increased amount of both mem and disk spill after increasing the pinned pool size; this remains a mystery and needs investigating with LORE). That's why we're also thinking about adding this heuristic.
I'm a little confused here. With my latest proposed change in #10950 (comment), we'll give up any optimization inside the first round of tasks. The benefit is mostly for subsequent rounds of tasks. If we observe a single thread on the executor, its workflow will be:
This design will improve performance for large jobs (with hundreds to thousands of tasks). More importantly, making the decision after the task has completed is much safer. The downside is that the first round of tasks is still very slow, and a biased task may leave a wrong hint for other tasks.
BTW, #11042 is also a good idea. I'm just wondering about the amount of work required behind it (especially the unfinished dependency issues). If there's significant work to do, then we should probably wrap up a temporary solution first? Leaving it disabled by default is acceptable to me. To me there are three choices now: I'm inclined to go with a for now, what do you think? @revans2
@binmahone I agree with you. a or b sound like good short/medium-term solutions, but only if the performance testing shows a positive impact for use cases that matter to a customer. If not, then we probably don't want to make the code more complex at this time. I also agree that spilling to disk can have a big, negative impact on the performance of a query. In the worst case with the current code we can do 3 full passes through the input data, and if it is larger than fits in memory, then we are in trouble because we may write all of that data to disk and read it back in again. That could be more than the cost of shuffle, which is why we need to get some real-world tests to see how this works. I think long term we can possibly improve it even more #11042 (comment). But like you said, it is potentially a lot of work to get these done/in place.
Do you have other scenarios that can benefit from #11042? Even though a or b may already suffice for our customer, I still wish to complete it cleanly. I would love to invest more time in #11042 if more than one (important) scenario can benefit from it. @revans2
I can come up with all kinds of contrived situations for different key distributions. The big question is how common these different situations are in real data.
If you want me to think of more I can, but these feel less contrived. We might be able to do something with min/max for batches while we wait for HyperLogLog++ too. That might give us the ability to at least detect sorted, or nearly sorted, data and cut out after a first pass, but it would not help with random key distributions that are unique. I'm not sure what is happening with the customer's data.
BTW, @revans2 do you have any comment on
Got it. Looks like approach a or b will suffice in the short term, and we'll need some more customer scenarios to justify the priority. I'll keep watching this.
* add a heristic to skip agg pass
* commit doc change
* refine naming
* fix only reduction case
* fix compile
* fix
* clean
* fix doc
* reduce premergeci2
* reduce premergeci2, 2
* use test_parallel to workaround flaky array test
* address review comment
* remove comma
* workaround for ci_scala213
* fix conflict

Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>
query perf pass @wjxiz1992
With this off by default I think it is okay. But I do think there is a much better option in the future once we have something like approx_count_distinct working really quickly.
This PR closes #7404
In a synthetic test on my local machine, I created a dataset where aggregation cannot reduce the row count at all; the op time of the Partial-stage aggregation dropped from 10.6s (falling back to sort aggregation also occurred) to 1.7s:
We'll also evaluate this in a real-data environment.
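For reference, assuming the new config can be set at the session level, tuning or disabling the heuristic would look something like this (per the discussion above, 0 forces the merge passes to always be skipped, while 1 essentially disables the feature):

    // Hypothetical session-level tuning of the config introduced by this PR.
    spark.conf.set("spark.rapids.sql.agg.skipAggPassReductionRatio", "0")    // always skip merge passes (testing)
    spark.conf.set("spark.rapids.sql.agg.skipAggPassReductionRatio", "1.0")  // effectively disables the heuristic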