[FEA] Implement a heuristic to split a project's input based on output and add to hash aggregate #8382
Labels
reliability
Features to improve reliability or bugs that severely impact the reliability of the plugin
Is your feature request related to a problem? Please describe.
This is related to some other GpuProjectExec work
#7866
#7258
This is similar, but instead of waiting until we run out of memory and possibly produce an output that is way too large, we should split the input by rows when we can predict that the output size is going to be much larger than the input size.
This is specifically intended to be follow-on work for #8141
The idea is that if we think the output of a project is going to be so large that it goes above our target batch size, then we should split the input proactively to avoid that. We see this show up especially in the pre-processing step for aggregates, because to do them in a distributed way, and to work around limitations in CUDF (especially around overflow detection), we need to add extra columns to the data being aggregated.
Describe the solution you'd like
I am kind of shooting from the hip here, so we should do some profiling and testing, but I think what we want to do is estimate the output size per row, along with an error bound, based on an input batch and the project expressions.
For any fixed-width types we know with 0 error that the output size is the width of the data plus some amortized validity buffer. For variable-length types it is a bit more of a guess. If the output is just a reference to an input column (which is common) we know with 0 error what the average per-row size is. It is not perfect if there is skew per row, but it is probably good enough. For other columns we could try to estimate the size by looking at the expressions and having them help us with the estimate, but let's not do that now unless we see real-world use cases where it really is problematic. For those columns we would guess the error as 100% and just pull in the estimate that Spark uses.
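Here is a rough sketch (names and helpers are hypothetical, not the plugin's real APIs) of what a per-expression estimate with an error bound could look like; `avgInputRowSize` stands in for however we would measure the actual average per-row size of an input column:

```scala
import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression}
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

/** Estimated output bytes per row for one projected column, plus an error bound. */
case class SizeEstimate(bytesPerRow: Double, errorPerRow: Double)

object OutputSizeEstimator {
  // Amortized cost of the validity buffer: one bit per row.
  private val ValidityBytesPerRow = 1.0 / 8.0

  private def isFixedWidth(dt: DataType): Boolean = dt match {
    case StringType | BinaryType => false
    case _: ArrayType | _: MapType | _: StructType => false
    case _ => true
  }

  /**
   * Estimate the per-row output size for a single project expression.
   * `avgInputRowSize` is a hypothetical callback that measures the actual
   * average per-row size of an input column in the batch.
   */
  def estimate(
      expr: Expression,
      batch: ColumnarBatch,
      avgInputRowSize: (ColumnarBatch, Int) => Double): SizeEstimate = expr match {
    // A direct reference to an input column: the average per-row size is exact.
    case b: BoundReference =>
      SizeEstimate(avgInputRowSize(batch, b.ordinal), errorPerRow = 0.0)
    // Fixed-width output types: the width is known exactly up front.
    case e if isFixedWidth(e.dataType) =>
      SizeEstimate(e.dataType.defaultSize + ValidityBytesPerRow, errorPerRow = 0.0)
    // Everything else: fall back to Spark's static per-type estimate with 100% error.
    case e =>
      val guess = e.dataType.defaultSize + ValidityBytesPerRow
      SizeEstimate(guess, errorPerRow = guess)
  }
}
```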
Now we can compute a range of possible sizes by summing up all of the estimates, and also the estimates plus their errors. With that we can calculate the minimum and maximum number of rows that could be in an output batch and still fit in the desired output size. This can be adjusted later, but for now I want to lean towards making the batch a little bigger than needed when we just don't know, instead of making it too small. That is to mitigate any performance impact in the common case (where the batch size grows, but not in a huge way).
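A minimal sketch of that calculation, reusing the `SizeEstimate` type from the sketch above; `targetBatchSizeBytes` stands in for the configured target batch size:

```scala
object RowLimits {
  /**
   * Given the per-column estimates for a batch, compute the range of row counts
   * that should still fit in the target output size. maxRows assumes every
   * estimate came in at its low end (a slightly bigger batch is acceptable),
   * minRows assumes the high end (estimate plus error).
   */
  def rowLimits(estimates: Seq[SizeEstimate], targetBatchSizeBytes: Long): (Long, Long) = {
    val lowPerRow = estimates.map(_.bytesPerRow).sum
    val highPerRow = estimates.map(e => e.bytesPerRow + e.errorPerRow).sum
    val maxRows = math.max(1L, (targetBatchSizeBytes / math.max(lowPerRow, 1.0)).toLong)
    val minRows = math.max(1L, (targetBatchSizeBytes / math.max(highPerRow, 1.0)).toLong)
    (minRows, maxRows)
  }
}
```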
So for now I would say that if the number of input rows is at or below the maximum number of rows we think would work, then we do nothing. If it is larger than that, then we calculate how many batches we should split it into based on the minimum-rows estimate and try to split them evenly.
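Something along these lines for the split decision (a sketch, not tuned):

```scala
object SplitDecision {
  /**
   * Do nothing (one output batch) if the input already fits under the optimistic
   * max-rows limit; otherwise size the number of splits off the conservative
   * min-rows limit and split the input evenly.
   */
  def numSplits(inputRows: Int, minRows: Long, maxRows: Long): Int = {
    if (inputRows <= maxRows) {
      1
    } else {
      // Round up so every split stays at or below the conservative row limit.
      math.ceil(inputRows.toDouble / minRows).toInt
    }
  }
}

// Example: 1,000,000 input rows, minRows = 300,000, maxRows = 600,000
// => ceil(1000000 / 300000) = 4 splits of ~250,000 rows each.
```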
We want to put this into GpuProjectExec first because I think it would be the simplest place to implement it and would give us some test coverage, but we also want to put it into the pre-processing step of hash aggregate. The latter is likely to require a lot more rework, because hash aggregate will need to be refactored to consume an iterator of batches from the pre-processing step instead of a single batch. We also need to think about retry so we don't lose that functionality; that probably means the split will have to be separate from executing the project so that the retry can sit in between them.
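A very rough sketch of the shape this could take; `splitBatch`, `projectBatch`, and `withRetry` are hypothetical stand-ins for the plugin's real splitting, projection, and retry machinery, and the real version would also need to deal with making the pieces spillable and closing batches:

```scala
import org.apache.spark.sql.vectorized.ColumnarBatch

object SplitThenProject {
  /**
   * Split first, then run the project on each piece under the retry wrapper,
   * producing an iterator of batches. The hash aggregate pre-processing step
   * would consume this iterator instead of a single batch.
   */
  def projectWithSplit(
      input: ColumnarBatch,
      splits: Int,
      splitBatch: (ColumnarBatch, Int) => Seq[ColumnarBatch],
      projectBatch: ColumnarBatch => ColumnarBatch,
      withRetry: (() => ColumnarBatch) => ColumnarBatch): Iterator[ColumnarBatch] = {
    splitBatch(input, splits).iterator.map { piece =>
      withRetry(() => projectBatch(piece))
    }
  }
}
```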