[FEA] Explore using hyper-log-log to estimate if we should continue with a partial aggregation #11042
Comments
I do want to add a few things.
A comment from @binmahone triggered another idea for me about a possible optimization that we could do with HyperLogLog++ sketches for each batch. We could have a custom GPU kernel that evaluates in parallel all, or at least a very large number, of combinations of batches. This would let us very quickly decide which batches, if any, would be worth combining and which might have no overlap with other batches. Essentially this would be a clustering algorithm for the batches based on the estimated overlap between them. It would need to know whether we have to maintain order (first/last), but that should not be too hard to deal with. Any batch with effectively no overlap at all could be released as soon as it is ready (again, subject to first/last). I am not sure how common it is for batches to have no overlap at all, but I can see it happening. For example, if specific events occur in bursts, we could end up with things that combine well for a few batches, but not for others.
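To make the overlap idea a bit more concrete, here is a minimal CPU-side sketch of how the estimated overlap between two batches could be derived from per-batch sketches using inclusion-exclusion. This is illustrative only: `KeySketch` is a hypothetical stand-in backed by an exact set, a real implementation would use a HyperLogLog++ sketch whose `merge` and `estimate` are approximate, and the nested pairwise loop is exactly the part a GPU kernel could evaluate in parallel.

```scala
// Hypothetical stand-in for a per-batch HyperLogLog++ sketch.  An exact Set is
// used only to keep the example self-contained; merge() and estimate() would
// be approximate on a real sketch.
final case class KeySketch(keys: Set[Long]) {
  def merge(other: KeySketch): KeySketch = KeySketch(keys ++ other.keys)
  def estimate: Long = keys.size.toLong
}

object BatchOverlap {
  // Estimated number of grouping keys two batches share:
  // |A intersect B| is roughly |A| + |B| - |A union B|
  def estimatedOverlap(a: KeySketch, b: KeySketch): Long =
    math.max(0L, a.estimate + b.estimate - a.merge(b).estimate)

  // Pairs of batches whose estimated overlap is large enough to be worth
  // merging.  A GPU kernel could evaluate all pairs (or larger combinations)
  // in parallel instead of this nested loop.
  def worthCombining(sketches: IndexedSeq[KeySketch], minOverlap: Long): Seq[(Int, Int)] =
    for {
      i <- sketches.indices
      j <- (i + 1) until sketches.length
      if estimatedOverlap(sketches(i), sketches(j)) >= minOverlap
    } yield (i, j)
}
```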
Another comment from @binmahone has spurred some more ideas about how we can/should integrate the pre-sort optimization from #8618 into this work. This issue describes doing a first aggregation pass over all of the data, with some more aggressive merging of data when specific size limits are met. It also talks about doing a full pass over the data before deciding what to do next. I don't see that changing with respect to #8618, but what I do see changing is when we do the first pass aggregation. I would say that we start off just like what is described here and do a first pass aggregation on the input batches until we hit an intermediate result that would go over the limit we have set. At that point we would calculate the HyperLogLog++ for the output of that aggregation result. If the high level plan heuristic from #8618 indicates that it is worth checking whether the input will be smaller than the output, then we first calculate the HyperLogLog++ of the input batch. If it looks like there is very little that could be combined (the input size is smaller than the output size), then we keep the batch in a pre-aggregated state. The HyperLogLog++ sketch for this batch could still be used with, or combined with, the sketches from batches that have been aggregated to make decisions about how to combine batches together. If we decide that it is worth combining batches together, then we can lazily do the first pass aggregation on the batches that need it before doing a merge aggregation afterwards. We also could look at doing hash partitioning of the data after doing a full pass over the data. A full pass would not necessarily involve doing a first pass aggregation; it would just involve having read in and cached all of the data for the task so we can make an informed decision on what to do next. That might involve re-partitioning the data before we do any aggregation on it. We can also use the approximate distinct count for all of the data to decide how many partitions we would need before doing that.
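As a rough illustration of the input-vs-output check described above, using row counts as a proxy for sizes (which glosses over the real byte-size comparison), the per-batch decision could reduce to something like the following. `minReduction` is a made-up tuning knob for the sketch, not an existing config.

```scala
object FirstPassDecision {
  /**
   * Decide whether it is worth running the first pass aggregation on an input
   * batch.  If the estimated number of distinct grouping keys is close to the
   * number of input rows, aggregating will not shrink the batch much, so we
   * keep it in a pre-aggregated state and only track its sketch.
   *
   * @param estimatedDistinctKeys approximate count distinct from the batch's sketch
   * @param inputRows             number of rows in the input batch
   * @param minReduction          hypothetical knob: required shrink factor, e.g. 1.2x
   */
  def shouldAggregate(
      estimatedDistinctKeys: Long,
      inputRows: Long,
      minReduction: Double = 1.2): Boolean = {
    if (estimatedDistinctKeys <= 0) {
      true
    } else {
      inputRows.toDouble / estimatedDistinctKeys.toDouble >= minReduction
    }
  }
}
```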
quick question: why are you highlighting the need to keep batches in order for first/last? Technically first/last are not guaranteed to be deterministic.
tl;dr: technically, yes, you are 100% correct. But here is the long answer, because there are a lot of caveats. Technically first/last do not have to be deterministic. But the reality is that they are close enough to deterministic that users start to assume they are, especially when running small tests in local mode. Also, we can match the CPU behavior, so why not? If we see a huge performance improvement from not matching the CPU, then we can put that behavior under a flag and have it be fast by default. For example, in local mode without any failures, a query like the following will produce the same result all of the time.
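The concrete snippet from this comment is not reproduced here; the following is a hypothetical example of the kind of query meant, run in local mode, where repeated runs keep returning the same FIRST values.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.first

val spark = SparkSession.builder()
  .master("local[2]")            // local mode: the order of local batches is deterministic
  .appName("first-determinism")
  .getOrCreate()

// 1,000,000 rows with 100 grouping keys and many candidate values per key.
val df = spark.range(1000000L).selectExpr("id % 100 AS key", "id AS value")

// Run this repeatedly: without failures, local mode returns the same first
// value for every key each time, even though FIRST only promises "some"
// value from the group.
df.groupBy("key").agg(first("value").as("first_value")).orderBy("key").show(5)
```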
This is because FIRST is deterministic on the CPU except when it comes to shuffle. But the order of local batches is deterministic, and so when all of the batches are local batches the order is deterministic. If we want to abuse Spark a little, with some knowledge about how it works, we can even go further and still get the same result every single time.
Thanks @revans2 for raising this, it's an interesting idea to determine at runtime whether we should do a partial agg. There are also other approaches used in other systems to determine whether to do a partial aggregation.
Is your feature request related to a problem? Please describe.
In Spark if we are doing a distributed partial aggregation we technically don't need to complete the aggregation for the task to get the correct answer after the shuffle and final aggregation is complete. The partial aggregation really is a performance optimization to try and reduce the size of the data before shuffling it. But at the same time there are cases where there really is no point in trying to combine all of the data together, because it will not reduce the size of the shuffled data. Enough of the keys within a task are unique, so any computation that we do will just be wasted.
#10950 is an attempt to address this, but it has some serious flaws in that it assumes that keys that can be combined will appear close to each other in the task's data. I think a lot of the time this is true, but not all of the time. Unless the window we are looking at covers the entire task, we have no way to guarantee that combining more data would or would not be good. If we know the total size of the data we can make some statistical assumptions, but we don't always know how large the data will be until we have seen it. And if we wait until the end of processing before we make a decision, we will have already done a lot of processing and might have spilled a lot of data too.
#5199, rapidsai/cudf#10652, and NVIDIA/cuCollections#429 are issues to add a HyperLogLog++ implementation on the GPU to help us do a very fast and efficient approximate count distinct operation. With that we should be able to make an informed decision, at any point in time, about whether the batches of data we have seen so far would combine into something smaller if we finished aggregating/combining them.
This does not solve all of the issues though.
Describe the solution you'd like
I propose that we keep some things the same as today. If we know that the input data is sorted, we start a first pass aggregation like we do today and release the batches as they are produced.
We also keep the sort fallback and the heuristic for pre-sorting the data. The heuristic would be updated to do an approximate count distinct instead of a full count distinct, though. I think those can/should be addressed separately as a way to possibly reduce memory pressure and spill.
With the current code we do a full pass through all of the input data and put the results for each aggregation into a queue. Once that pass is done, if there were multiple input batches, then we will try and combine them together through some merge aggregations. We will take groups of intermediate batches that add up to a size we know will fit in our GPU memory budget. We will concat them into a single batch, and then do the merge aggregation. This will happen in a loop until we either have a single batch or no two consecutive batches could be combined together. They have to be consecutive batches for first/last to work properly. If there are still multiple batches after this second merge phase, then we will sort the data by the grouping keys and finally do a merge aggregation pass before outputting the results. This can result in three full passes through all of the input data, all of which is kept in memory, and might be spilled.
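To make the second (merge) phase easier to follow, here is a much simplified model of it in plain Scala, with Maps standing in for GPU batches, a row-count budget standing in for the GPU memory budget, and the first-pass and sort-fallback phases omitted. It is not the plugin's actual code.

```scala
object CurrentFlowModel {
  // A "batch" of partial aggregation results: grouping key -> partial value.
  type Batch = Map[Long, Long]

  // Stand-in for concatenating intermediate batches and running a merge
  // aggregation over them.
  private def concatAndMerge(batches: Seq[Batch]): Batch =
    batches.flatten.groupMapReduce(_._1)(_._2)(_ + _)

  /**
   * Second phase of today's flow: group consecutive batches whose sizes add
   * up to the budget (consecutive, so first/last stay correct), concat and
   * merge each group, and repeat until we have a single batch or no group
   * could be formed under the budget.
   */
  def mergePass(initial: Seq[Batch], rowBudget: Int): Seq[Batch] = {
    var current = initial
    var changed = true
    while (changed && current.size > 1) {
      val out = scala.collection.mutable.ArrayBuffer.empty[Batch]
      var group = List.empty[Batch]    // kept in reverse order, reversed on flush
      var groupRows = 0
      for (b <- current) {
        if (group.nonEmpty && groupRows + b.size > rowBudget) {
          out += concatAndMerge(group.reverse)
          group = List(b)
          groupRows = b.size
        } else {
          group = b :: group
          groupRows += b.size
        }
      }
      if (group.nonEmpty) out += concatAndMerge(group.reverse)
      changed = out.size < current.size
      current = out.toSeq
    }
    current
  }
}
```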
Instead, I propose that we start the initial pass through the data like we do today, but instead of stopping only after the first pass is complete, we stop once we have enough data that it looks like it is worth trying to combine it. If that happens, then we combine the results and calculate a HyperLogLog++ sketch on the combined result. We also start to calculate a HyperLogLog++ sketch for each subsequent batch we see. With these we can quickly estimate how much of a size reduction we would see if we combined multiple batches together, by merging the sketches and estimating the unique count. If it looks like we can combine things and stay under our limit, then we keep trying to combine them and update the HyperLogLog++ sketches for the newly produced batches. If it looks like we cannot combine things and stay under our budget, then things start to be difficult, as we have a few choices (a rough sketch of the first pass leading up to this decision point follows the list):

1. Keep going the way we do today: sort the data and keep merging until we can output a fully combined result.
2. Stop trying to combine things and release the partially aggregated batches to the shuffle without sorting them.
3. Finish a full pass through the rest of the input, caching the intermediate results, and only then decide between the first two options.
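Before weighing those options, here is the rough sketch of the proposed first pass referred to above, up to the decision point. The same caveats as the earlier examples apply: an exact set stands in for the HyperLogLog++ sketch, row counts stand in for sizes, and exactly when sketches start being computed is glossed over. This is not the plugin's code.

```scala
object ProposedFirstPass {
  // Stand-in for a per-batch HyperLogLog++ sketch (an exact set, for simplicity).
  final case class Sketch(keys: Set[Long]) {
    def merge(other: Sketch): Sketch = Sketch(keys ++ other.keys)
    def estimate: Long = keys.size.toLong
  }

  // A partially aggregated batch plus the sketch computed from its keys.
  final case class AggBatch(rows: Map[Long, Long], sketch: Sketch)

  private def firstPassAgg(input: Seq[(Long, Long)]): AggBatch = {
    val rows = input.groupMapReduce(_._1)(_._2)(_ + _)
    AggBatch(rows, Sketch(rows.keySet))
  }

  /**
   * First pass over the task's input.  While the merged sketch says that
   * combining everything pending still fits under rowBudget, combine
   * aggressively and keep going.  Once the estimate says it will not fit,
   * stop at the decision point and hand back what we have so far, along
   * with the unread remainder of the input (to be handled by whichever of
   * options 1-3 is chosen).
   */
  def run(
      input: Iterator[Seq[(Long, Long)]],
      rowBudget: Long): Either[(Seq[AggBatch], Iterator[Seq[(Long, Long)]]), AggBatch] = {
    var pending = Vector.empty[AggBatch]
    var mergedSketch = Sketch(Set.empty)
    var pendingRows = 0L
    while (input.hasNext) {
      val agg = firstPassAgg(input.next())
      mergedSketch = mergedSketch.merge(agg.sketch)
      pending = pending :+ agg
      pendingRows += agg.rows.size
      if (pendingRows > rowBudget) {
        if (mergedSketch.estimate <= rowBudget) {
          // The estimate says the combined result still fits: combine now.
          val combined = firstPassAgg(pending.flatMap(_.rows))
          pending = Vector(combined)
          pendingRows = combined.rows.size.toLong
        } else {
          // The estimate says combining will not stay under budget: this is
          // the decision point described above.
          return Left((pending, input))
        }
      }
    }
    Right(
      if (pending.size == 1) pending.head
      else firstPassAgg(pending.flatMap(_.rows)))
  }
}
```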
The upside of option 1 is that we produce a full output like Spark on the CPU would. The downside is that this is mostly what we do today, and there is little point in calculating the HyperLogLog++ sketches if we go this route. It just might reduce the amount of memory pressure we are under in a few cases by more aggressively combining batches early instead of waiting until the first pass is complete.
The upside of option 2 is that we don't do a sort at all, which can be really expensive in terms of memory pressure and computation. Even with #10370, not doing something is going to be faster than doing something more efficiently. The downside is that there is an unknown probability that the size of the shuffle data will be larger.
The upside of option 3 is that we have complete, or nearly complete, information before we make a choice. The downside is that we have to have done a complete pass through the data, which can lead to increased memory pressure.
To balance these, I would like to see us start with option 3, as the memory pressure is no worse than it is today and it might even get better if we start to combine things more aggressively. If we know that there are no first or last aggregations, we could even try to combine batches out of order if the sketches indicate that would be good.
If the data indicates that we have a lot of things that could be combined together, based off of the ratio of approx_count_distinct to the actual number of rows we have pending, then we sort/re-partition to combine. If the data indicates that there is little, if anything, to combine, then we just start to release the partial aggregations that we have done so far.
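A minimal sketch of that final decision, assuming we have the total pending row count and the approximate distinct count from the merged sketches. The reduction threshold and the per-partition row budget are hypothetical knobs, and `SortOrRepartitionAndMerge` also carries a partition count derived from the approximate distinct count, as suggested earlier.

```scala
sealed trait NextStep
case object ReleasePartialBatches extends NextStep
final case class SortOrRepartitionAndMerge(targetPartitions: Int) extends NextStep

object FinalDecision {
  /**
   * Decide what to do with the cached, partially aggregated batches once the
   * full pass over the task's input is done.  All numbers are estimates; the
   * threshold and the per-partition row budget are hypothetical knobs.
   */
  def decide(
      pendingRows: Long,
      estimatedDistinct: Long,
      rowsPerPartitionBudget: Long,
      minReduction: Double = 1.5): NextStep = {
    val reduction =
      if (estimatedDistinct <= 0) Double.MaxValue
      else pendingRows.toDouble / estimatedDistinct.toDouble
    if (reduction < minReduction) {
      // Very little would combine: start releasing the partial aggregations
      // we already have.
      ReleasePartialBatches
    } else {
      // Worth combining: size the sort / re-partition step off of the
      // estimated distinct count so each merge stays under the budget.
      val parts = math.max(1L,
        (estimatedDistinct + rowsPerPartitionBudget - 1) / rowsPerPartitionBudget)
      SortOrRepartitionAndMerge(parts.toInt)
    }
  }
}
```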