Heuristic to speed up partial aggregates that get larger #8618
Conversation
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
build
Resolved review threads:
- sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchUtils.scala (outdated)
- sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala (2 threads, both outdated)
- sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala (5 threads, 4 outdated)
build

Hmm, the markdown check failure was for a PGP key link that is fine when I visit it, so I think it was just a one-off. I'll try to trigger the check again.

build
The changes look great to me. I had minor nits that could be ignored.
```scala
val FORCE_SINGLE_PASS_PARTIAL_SORT_AGG: ConfEntryWithDefault[Boolean] =
  conf("spark.rapids.sql.agg.forceSinglePassPartialSort")
    .doc("Force a single pass partial sort agg to happen in all cases that it could, " +
```
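For testing, the flag can also be toggled at runtime; a minimal sketch, assuming a live `SparkSession` named `spark` (only the conf key itself comes from this PR):

```scala
// Force the single-pass partial sort agg path on for experimentation.
// (`spark` is an assumed SparkSession; the key is the conf registered above.)
spark.conf.set("spark.rapids.sql.agg.forceSinglePassPartialSort", "true")
```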
nit, indentation is off here.
```scala
}

// Partial mode:
// 1. boundInputReferences: picks column from raw input
```
Not related to your change, but `boundInputReferences` is no longer here :( We should remove it from the comments (I missed this).
```scala
sbIter.map { sb =>
  withResource(new NvtxWithMetrics("finalize agg", NvtxColor.DARK_GREEN, aggTime,
      opTime)) { _ =>
    val finalBatch = boundExpressions.boundFinalProjections.map { exprs =>
```
nit: since this is the final step for a partial (pre-shuffle) aggregation, should we call it `batchToShuffle`? That would distinguish it from the batch that is merged and ready for a parent to process after a shuffle. A comment around this function would also help call out that it is to be used in the single-pass agg case.
But it is not guaranteed to be that way. This same code is also used for a complete or final aggregation; it is the "final" step of any of the aggregations. Should I rename it to be a "last" pass instead of a "final" pass so that the terms don't conflict with the aggregation modes?
And if I do, then I would want to rename boundFinalProjections right?
build
```scala
  (math.max(minPreGrowth, estimatedPreGrowth) * cardinality) / numRows
}
val wrappedIter = Seq(cb).toIterator ++ cbIter
(wrappedIter, estimatedGrowthAfterAgg > 1.0)
```
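To make the threshold comparison concrete, here is the growth estimate pulled out as a standalone sketch (pure Scala; the parameter names mirror the snippet, the numbers and surrounding plumbing are made up for illustration):

```scala
object GrowthHeuristicSketch {
  // Estimated output size relative to input: pre-growth from wider agg
  // buffers, scaled by the fraction of rows that survive as distinct keys.
  def estimatedGrowthAfterAgg(minPreGrowth: Double,
      estimatedPreGrowth: Double,
      cardinality: Long,
      numRows: Long): Double =
    (math.max(minPreGrowth, estimatedPreGrowth) * cardinality) / numRows

  def main(args: Array[String]): Unit = {
    // Nearly-unique keys: 1.5 * 900000 / 1000000 = 1.35 > 1.0,
    // so the agg is estimated to grow and the sort path is chosen.
    println(estimatedGrowthAfterAgg(1.0, 1.5, 900000L, 1000000L))
    // Few distinct keys: 1.5 * 1000 / 1000000 = 0.0015 <= 1.0,
    // so the hash agg shrinks the data and no sort is needed.
    println(estimatedGrowthAfterAgg(1.0, 1.5, 1000L, 1000000L))
  }
}
```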
Is 1.0 too small as a threshold? Currently a simple query like the one below will incur the single pass:

```scala
spark.time(spark.range(0, 1000000000L, 1, 2)
  .selectExpr("id + 1 as key", "id as value")
  .groupBy("key")
  .agg(avg(col("value")).alias("avg_v"))
  .orderBy(desc("avg_v"), col("key"))
  .show())
```

The reason is simply that the avg aggregate uses two buffers (count and sum), so the total size after the aggregate is 1.5x the input. Since you mentioned "This is my first PR for an optimization for aggregations where there are a lot of aggregations", the example query obviously can't be counted as "a lot of aggregations".
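The 1.5x figure above comes from simple byte-width arithmetic; a sketch with illustrative column widths (not values measured from the plugin):

```scala
object AvgGrowthSketch {
  // One 64-bit key plus one 64-bit value in the input row...
  val inputBytesPerRow: Long = 8L + 8L
  // ...versus key + sum buffer + count buffer in the partial-avg output row.
  val aggBytesPerRow: Long = 8L + 8L + 8L

  val growth: Double = aggBytesPerRow.toDouble / inputBytesPerRow

  def main(args: Array[String]): Unit =
    // With a near-unique key, the partial agg output is half again
    // as large as its input.
    println(growth) // prints 1.5
}
```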
The heuristic is simply looking at the cost of sorting now vs sorting later. 1.0 is the correct cutoff for that decision. The assumption that a sort will happen later is what is not correct. Nor is the assumption that a single batch is representative of the entire task's data. But we don't have the information we need to know if that sort is going to happen or not. We don't know how much data is coming later on. This could be the only batch in which case the sort is useless. There could be 100 more batches coming. If that is true, then sorting the data now will be faster than sorting it later on because there is less data to spill (even if it is just a little less data). Also we don't have a good way to start sorting later after we have already done part of the operation.
If we want to fix this we really want to fix all of aggregation and have unified heuristics to decide what to do and when.
A separate issue describes a starting point of moving to HyperLogLog for doing some operations. That includes updating this code to do an approximate count distinct instead of a full count distinct (assuming the performance is good enough).
We also have #8622, which is to replace the sort here with a hash re-partition operation. #11042 also talks about doing hash re-partitioning instead of sorting.
To me what we want is a somewhat lazy aggregation operation. I'll update #11042 to reflect some ideas I have for this.
This is my first PR for an optimization for aggregations where there are a lot of aggregations. Like hundreds of aggregations. The problem that I ran into when trying to do lots of aggregations is that the size of the aggregation data could be larger than the size of the input data. We try to cache the aggregation data to match Spark exactly so that no matter where a key shows up in the input we can make sure that it was merged with all other keys in the input and you get an output with no duplicate keys in it. If you combine those two together you end up with caching a lot of data.
What this does is that it tries to detect the situation where an aggregation might grow in size. If it does see that the agg would grow, then it switches over to doing a sort aggregation instead. Why a sort aggregation? It is a compromise. We could just take the input a batch at a time and return it. That would be something like #7404, but if the input is large (lots of batches) we might end up with shuffling a lot of extra data around. By sorting the input we still end up caching all of the data, but we are caching the smaller amount of data (in this case the input data) and we know that the number of duplicate rows we have (the amount of extra data being shuffled) would only be on the order of the number of batches we process.
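The claim that the extra shuffled data is bounded by the number of batches can be illustrated with a toy per-batch aggregation (plain Scala, not plugin code; the data and helper are made up):

```scala
object DuplicateBoundSketch {
  // Aggregate each batch independently (sum per key). A key can produce
  // at most one output row per batch, so across the whole partial output
  // its duplicate count is bounded by the number of batches.
  def partialAgg(batches: Seq[Seq[(Int, Long)]]): Seq[(Int, Long)] =
    batches.flatMap(_.groupBy(_._1).map {
      case (k, vs) => k -> vs.map(_._2).sum
    })

  def main(args: Array[String]): Unit = {
    val batches = Seq(
      Seq(1 -> 10L, 1 -> 5L, 2 -> 1L),
      Seq(1 -> 7L, 3 -> 2L),
      Seq(2 -> 4L, 1 -> 1L))
    val out = partialAgg(batches)
    val worstDuplicates = out.groupBy(_._1).values.map(_.size).max
    // Key 1 appears in all 3 batches, so it shows up 3 times in the
    // partial output, which is still <= the batch count.
    println(worstDuplicates <= batches.size) // true
  }
}
```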
This is not perfect. Sort can be very expensive. This is especially true if there are multiple columns or the columns are complex. For now I am limiting this heuristic to just a single aggregation column. But I plan to file a follow on to understand better the cost of sort so we can hopefully expand that to something more appropriate.
This also only works on partial aggregates. Why? Well, final aggregates don't really grow in size by adding new columns or larger columns; the data already grew in the partial. And I just have not spent much time on complete aggs. I'll file a follow on issue for that too.
It turns out that most (probably all) aggregations are faster when the data is already sorted. So if the data is already sorted we just do the partial pass through agg.
At some point there are so many aggs that sorting the data and then doing the aggs is cheaper than just doing the hash aggs. I don't have cost models for those yet, so I will file a separate follow on for that too.
And finally this does not fully address #8398. It does not look at the idea of splitting the data by column instead of by row. This still does rows. That will be another follow on.
I need to run some more benchmarks at scale, but for now, when doing 300 aggregations with a simple key (using TPC-DS data at scale factor 200), where agg_test is really store_sales with some columns renamed, the query sped up from 1,030 seconds to 430 seconds.
Most of the code changes are refactoring aggregate so I could split parts of it up into different iterators that I could choose from to build the agg operation I wanted. A lot of white space changed too, so diffing with `-b` to ignore white space changes makes it much simpler to follow what happened.