
RowAccumulators support generics #6657

Closed · wants to merge 15 commits

Conversation

Contributor

@mingmwang mingmwang commented Jun 13, 2023

Which issue does this PR close?

Closes #6658.

Rationale for this change

After #6003, we use Scalar values to update the row accumulator states for high-cardinality aggregations. For each aggregate input row, the aggregation framework reads the arrow Array, converts the value to a Scalar value, converts that Scalar value again to a Rust native value, and only then updates the row accumulator's internal state.

What changes are included in this PR?

This PR improves RowAccumulator update performance (for high-cardinality aggregation) in two ways:

  1. Stop generating Scalar values and use Rust native values to update the row accumulator internal states directly.
  2. Avoid dynamic type dispatch on the hot path and use generics for static type dispatch as much as possible.
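In Rust terms, the difference can be sketched like this (a minimal, hypothetical sketch with simplified stand-in types, not the actual RowAccumulator API): a generic update function is monomorphized per native type, so the per-row loop contains no ScalarValue construction and no dynamic dispatch.

```rust
// Simplified stand-in for the row accumulator state of a SUM.
struct SumState<T> {
    sum: T,
}

// Generic update: monomorphized per native type, so the inner loop has
// no dynamic dispatch and no ScalarValue boxing per row.
fn update_sum<T: Copy + std::ops::AddAssign>(state: &mut SumState<T>, values: &[T]) {
    for &v in values {
        state.sum += v;
    }
}

fn main() {
    let mut s = SumState { sum: 0i64 };
    // The type dispatch happens once here (choosing the i64 instantiation),
    // not once per row.
    update_sum(&mut s, &[1, 2, 3, 4]);
    assert_eq!(s.sum, 10);
}
```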

Are these changes tested?

I tested this on my local Mac. For TPC-H q17 (SF = 1), there is about a 10%-15% improvement.

Main branch:

Running benchmarks with the following options: DataFusionBenchmarkOpt { query: Some(17), debug: false, iterations: 5, partitions: 1, batch_size: 8192, path: "./parquet_data", file_format: "parquet", mem_table: false, output_path: None, disable_statistics: true }
Query 17 iteration 0 took 1371.2 ms and returned 1 rows
Query 17 iteration 1 took 1315.2 ms and returned 1 rows
Query 17 iteration 2 took 1487.3 ms and returned 1 rows
Query 17 iteration 3 took 1421.6 ms and returned 1 rows
Query 17 iteration 4 took 1426.3 ms and returned 1 rows
Query 17 avg time: 1404.32 ms

This PR:
Running benchmarks with the following options: DataFusionBenchmarkOpt { query: Some(17), debug: false, iterations: 5, partitions: 1, batch_size: 8192, path: "./parquet_data", file_format: "parquet", mem_table: false, output_path: None, disable_statistics: true }
Query 17 iteration 0 took 1225.3 ms and returned 1 rows
Query 17 iteration 1 took 1151.3 ms and returned 1 rows
Query 17 iteration 2 took 1135.7 ms and returned 1 rows
Query 17 iteration 3 took 1165.5 ms and returned 1 rows
Query 17 iteration 4 took 1135.8 ms and returned 1 rows
Query 17 avg time: 1162.73 ms

Are there any user-facing changes?

@github-actions github-actions bot added core Core DataFusion crate physical-expr Physical Expressions labels Jun 13, 2023
@mingmwang mingmwang changed the title RowAccumulator support generics RowAccumulators support generics Jun 13, 2023
@alamb
Contributor

alamb commented Jun 13, 2023

This looks exciting @mingmwang -- I will review this tomorrow

@@ -139,7 +137,8 @@ impl RowWriter {
self.row_width = self.layout.fixed_part_width();
}

#[inline]
#[allow(dead_code)]
#[inline(always)]
fn assert_index_valid(&self, idx: usize) {
Contributor

Is this still needed?

@@ -142,7 +140,8 @@ impl<'a> RowReader<'a> {
self.data = data;
}

#[inline]
#[allow(dead_code)]
#[inline(always)]
fn assert_index_valid(&self, idx: usize) {
Contributor

This still needed?

Contributor Author

This is not used in the current code. I left it here so that the check can be invoked for each record batch.

@mingmwang
Contributor Author

mingmwang commented Jun 14, 2023

tpch-q17 flamegraph for this PR.

@mingmwang
Contributor Author

@liurenjie1024

Contributor

@alamb alamb left a comment


This is a very neat idea -- thank you @mingmwang

The major questions I have are:

  1. Can we reduce code by not having a 1-row special case?
  2. Is there any reason to have a trait if the returned object RowAccumulatorItem can only support the built-in aggregators?

}

/// Enum to dispatch the RowAccumulator, RowAccumulator contains generic methods and can not be used as the trait objects
pub enum RowAccumulatorItem {
Contributor

If we are going to have an enum that has all the accumulators in it, I don't see much value in the RowAccumulator trait anymore -- could we simply use RowAccumulatorItem everywhere?

To keep using a trait I think you would need a function somewhere that took the input data types and instantiated an instance of RowAccumulator -- I may be missing some subtlety here as well.

Contributor Author

I think we can still use the trait to define a common interface for all the row accumulators; we just can not use trait objects anymore.
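For readers unfamiliar with the object-safety issue being discussed: a trait with a generic method cannot be used as `dyn Trait`, so an enum becomes the storable concrete type while the trait still defines the shared interface. A minimal sketch with hypothetical names (not the real RowAccumulator signatures):

```rust
trait Acc {
    // Generic method: this makes the trait non-object-safe,
    // so `Box<dyn Acc>` would not compile.
    fn update<N: Into<i64>>(&mut self, v: N);
    fn value(&self) -> i64;
}

struct Sum(i64);
struct Count(i64);

impl Acc for Sum {
    fn update<N: Into<i64>>(&mut self, v: N) { self.0 += v.into(); }
    fn value(&self) -> i64 { self.0 }
}
impl Acc for Count {
    fn update<N: Into<i64>>(&mut self, _v: N) { self.0 += 1; }
    fn value(&self) -> i64 { self.0 }
}

// Enum dispatch: one concrete type that can be stored in a Vec,
// while each call is still statically dispatched inside.
enum AccItem { Sum(Sum), Count(Count) }

impl Acc for AccItem {
    fn update<N: Into<i64>>(&mut self, v: N) {
        match self { AccItem::Sum(a) => a.update(v), AccItem::Count(a) => a.update(v) }
    }
    fn value(&self) -> i64 {
        match self { AccItem::Sum(a) => a.value(), AccItem::Count(a) => a.value() }
    }
}

fn main() {
    let mut accs = vec![AccItem::Sum(Sum(0)), AccItem::Count(Count(0))];
    for a in accs.iter_mut() {
        a.update(5i32);
    }
    assert_eq!(accs[0].value(), 5);
    assert_eq!(accs[1].value(), 1);
}
```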

Comment on lines 691 to 703
for idx in &group_state.indices {
for (accumulator, values_array, filter_array) in izip!(
self.row_accumulators.iter_mut(),
row_values.iter(),
filter_bool_array.iter()
) {
accumulator.update_single_row(
values_array,
filter_array,
*idx as usize,
&mut state_accessor,
)?;
}
Contributor

As written, doesn't this code do all the type dispatch for each idx in this loop?

Would it be possible to pass in group_state.indices just once so the dispatch is done just once?

Something like this (would need the traits / etc updated)

Suggested change
for idx in &group_state.indices {
for (accumulator, values_array, filter_array) in izip!(
self.row_accumulators.iter_mut(),
row_values.iter(),
filter_bool_array.iter()
) {
accumulator.update_single_row(
values_array,
filter_array,
*idx as usize,
&mut state_accessor,
)?;
}
for (accumulator, values_array, filter_array) in izip!(
self.row_accumulators.iter_mut(),
row_values.iter(),
filter_bool_array.iter()
) {
accumulator.update_indicies(
values_array,
filter_array,
&group_state.indices,
&mut state_accessor,
)?;
}

Contributor Author

As written, doesn't this code do all the type dispatch for each idx in this loop?

Would it be possible to pass in group_state.indices just once so the dispatch is done just once?

Something like this (would need the traits / etc updated)

I will try this approach as you suggested. But even with this change, the type-dispatch overhead is still heavy because it is applied for every group in groups_with_rows.

Ok(())
}

fn update_two_accumulator2_with_native_value<T1, T2>(
Contributor

What is the reason to special case 2 accumulators?

Contributor Author

Yes, I think we have to handle the 2-accumulator case specially, to avoid dynamic type dispatch in the inner loops, leverage generics for static type dispatch, and achieve the best performance.

The current update flow is:

outermost loop: for each `group idx` in groups_with_rows
     construct the RowAccessor
     inner loop: for each accumulator
         update accumulator stats

The accumulators loop must be the inner loop, so that we can benefit from the row layout and
different accumulator stats are updated on a single row (the same RowAccessor).

What we want to achieve is to do the array type cast outside the outermost loop:

for each agg input array, type cast here
outermost loop: for each `group idx` in groups_with_rows
     construct the RowAccessor
     inner loop: for each accumulator
          call generic method to update the accumulator stats

Because the agg input type for each accumulator is different, the generic method can not be called in the inner loop, so I have to handle the 2-accumulator case specially. The update flow becomes:

1-accumulator version:

agg input array1, type cast to T1
outermost loop: for each `group idx` in groups_with_rows
     construct the RowAccessor
     generic method <T1>: update accumulator stats

2-accumulator version:

agg input array1, type cast to T1
agg input array2, type cast to T2
outermost loop: for each `group idx` in groups_with_rows
     construct the RowAccessor
     generic method <T1>: update accumulator1 stats
     generic method <T2>: update accumulator2 stats

3-accumulator version:

agg input array1, type cast to T1
agg input array2, type cast to T2
agg input array3, type cast to T3
outermost loop: for each `group idx` in groups_with_rows
     construct the RowAccessor
     generic method <T1>: update accumulator1 stats
     generic method <T2>: update accumulator2 stats
     generic method <T3>: update accumulator3 stats

Hope this explains. I'm not sure whether there are other good ways to simplify the code.
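The 2-accumulator flow described above can be sketched as follows (a hypothetical simplification: plain slices stand in for the downcast arrow arrays, and per-group state vectors stand in for the RowAccessor). Each input array is "cast" once, outside the outermost loop, and the inner updates are monomorphized over (T1, T2):

```rust
// Update two accumulators whose inputs have different native types.
// The generic parameters are fixed once per batch, outside all loops.
fn update_two<T1, T2>(
    values1: &[T1],
    values2: &[T2],
    groups: &[(usize, Vec<usize>)], // (group idx, row indices), simplified
    states1: &mut [T1],
    states2: &mut [T2],
) where
    T1: Copy + std::ops::AddAssign,
    T2: Copy + std::ops::AddAssign,
{
    for (group_idx, rows) in groups {
        // In the real code this is where the RowAccessor is pointed at
        // the group's aggregation buffer.
        for &row in rows {
            states1[*group_idx] += values1[row]; // generic update, accumulator 1
            states2[*group_idx] += values2[row]; // generic update, accumulator 2
        }
    }
}

fn main() {
    let v1 = [1i64, 2, 3, 4];
    let v2 = [1.0f64, 2.0, 3.0, 4.0];
    let groups = vec![(0usize, vec![0, 2]), (1usize, vec![1, 3])];
    let mut s1 = [0i64; 2];
    let mut s2 = [0.0f64; 2];
    update_two(&v1, &v2, &groups, &mut s1, &mut s2);
    assert_eq!(s1, [4, 6]);
    assert_eq!(s2, [4.0, 6.0]);
}
```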

Contributor

After thinking about this a bit -- I wonder if we can somehow get rid of the need to typecast during the aggregate at all -- it seems to me the code would be much simpler if the aggregator didn't have to cast its input, but instead the inputs were cast as needed.

Contributor Author

@mingmwang mingmwang Jun 25, 2023


After thinking about this a bit -- I wonder if we can somehow get rid of the need to typecast during the aggregate at all -- it seems to me the code would be much simpler if the aggregator didn't have to cast its input, but instead the inputs were cast as needed.

I'm afraid the typecast can not be avoided. This is because the arrow Array trait itself does not provide any function to read a value at a given index; the trait object must first be downcast to the concrete struct (PrimitiveArray<T: ArrowPrimitiveType> or BooleanArray).

I think this is related to trait-object restrictions. If we add generic methods (like reading the value at a given index) or a GAT to a trait, that trait can no longer be used as a trait object.
This is also why this PR adds a new trait. The new trait and its implementors bridge the type system between the arrow ArrowPrimitiveType and the row accumulator internal state types.

pub trait ArrowArrayReader: Array {
    type Item: RowAccumulatorNativeType;

    /// Returns the element at index `i`
    /// # Panics
    /// Panics if the value is outside the bounds of the array
    fn value_at(&self, index: usize) -> Self::Item;

    /// Returns the element at index `i`
    /// # Safety
    /// Caller is responsible for ensuring that the index is within the bounds of the array
    unsafe fn value_at_unchecked(&self, index: usize) -> Self::Item;
}

impl<'a> ArrowArrayReader for &'a BooleanArray {
    type Item = bool;

    #[inline]
    fn value_at(&self, index: usize) -> Self::Item {
        BooleanArray::value(self, index)
    }

    #[inline]
    unsafe fn value_at_unchecked(&self, index: usize) -> Self::Item {
        BooleanArray::value_unchecked(self, index)
    }
}

impl<'a, T: ArrowPrimitiveType> ArrowArrayReader for &'a PrimitiveArray<T>
where
    <T as ArrowPrimitiveType>::Native: RowAccumulatorNativeType,
{
    type Item = T::Native;

    #[inline]
    fn value_at(&self, index: usize) -> Self::Item {
        PrimitiveArray::value(self, index)
    }

    #[inline]
    unsafe fn value_at_unchecked(&self, index: usize) -> Self::Item {
        PrimitiveArray::value_unchecked(self, index)
    }
}


Contributor

Sorry -- I misunderstood -- I thought you meant that when aggregating SUM(Int16) the argument needed to be cast to Int32 or something, as the type of the aggregator was different from the type of the underlying value in the accumulator.

I agree the downcasting is needed to figure out the correct function to call -- though the more I think about it, the more I think it can / should be done during setup time and not on every batch (as the types don't change during query execution).

fn update_scalar_values(
&mut self,
values: &[ScalarValue],
fn update_single_row(
Contributor

As I mentioned above, I think the code would be simpler and likely faster if there wasn't a special single value case

It seems like, given that you have vectorized the code for updating the aggregator values, and the group operators already have a slice of [usize] (aka the selection vector) of values to aggregate, maybe all callsites can use the selection vector.

Contributor Author

I have updated the interface.

@alamb
Contributor

alamb commented Jun 18, 2023

I am running some benchmarks on this branch -- I also hope to find some time next week to help simplify this PR as the basic idea is really cool and very much needed

@alamb
Contributor

alamb commented Jun 19, 2023

Here are my measurements, which are consistent with the information posted above

--------------------
Benchmark tpch_mem.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ main_base ┃ agg_framework ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  516.91ms │      530.19ms │     no change │
│ QQuery 2     │  267.38ms │      272.23ms │     no change │
│ QQuery 3     │  176.90ms │      183.52ms │     no change │
│ QQuery 4     │  112.13ms │      114.36ms │     no change │
│ QQuery 5     │  469.13ms │      489.38ms │     no change │
│ QQuery 6     │   40.78ms │       39.62ms │     no change │
│ QQuery 7     │ 1140.46ms │     1172.60ms │     no change │
│ QQuery 8     │  250.55ms │      247.42ms │     no change │
│ QQuery 9     │  639.51ms │      626.28ms │     no change │
│ QQuery 10    │  345.85ms │      335.14ms │     no change │
│ QQuery 11    │  277.08ms │      269.83ms │     no change │
│ QQuery 12    │  167.45ms │      173.82ms │     no change │
│ QQuery 13    │  660.02ms │      638.75ms │     no change │
│ QQuery 14    │   52.18ms │       54.76ms │     no change │
│ QQuery 15    │   85.71ms │       81.79ms │     no change │
│ QQuery 16    │  222.74ms │      247.64ms │  1.11x slower │
│ QQuery 17    │ 2760.35ms │     2393.44ms │ +1.15x faster │
│ QQuery 18    │ 2978.54ms │     2981.99ms │     no change │
│ QQuery 19    │  175.93ms │      164.40ms │ +1.07x faster │
│ QQuery 20    │  859.35ms │      872.16ms │     no change │
│ QQuery 21    │ 1526.80ms │     1419.54ms │ +1.08x faster │
│ QQuery 22    │  146.40ms │      146.43ms │     no change │
└──────────────┴───────────┴───────────────┴───────────────┘

@mingmwang
Contributor Author

I am running some benchmarks on this branch -- I also hope to find some time next week to help simplify this PR as the basic idea is really cool and very much needed

Yes, please help me simplify this PR.
I will resolve the conflicts today and also take a look at q16 to check why it is slower.

@alamb
Contributor

alamb commented Jun 21, 2023

Yes, please help me simplify this PR.

I am hoping to find time tomorrow to do so

@alamb
Contributor

alamb commented Jun 23, 2023

I have not forgotten about this -- I hope/plan to spend time on the weekend

@alamb
Contributor

alamb commented Jun 24, 2023

tomorrow....

Contributor

@alamb alamb left a comment


I spent several hours this morning reviewing the aggregate code in general and this PR in particular. Thank you so much for trying to push this forward @mingmwang -- I 100% agree with the premise of this PR that avoiding ScalarValue when updating aggregates is the key to improving high-cardinality aggregate performance.

However, it is my opinion that the extra 10% performance improvement in this PR as written is not worth the additional code complexity it requires. I would love to hear other community members' opinions, and if they think the tradeoff is acceptable, then my mind could be changed.

Given that the grouping code is both performance critical and non-trivially complicated, I believe keeping it manageable is of paramount concern. I worry that this change would make the grouping code basically impossible to modify going forward, and I think we will still need to improve performance even after this PR to be competitive with best-of-breed solutions.

Thus I would like to offer (and start helping with) the following alternate plan:

  1. Remove the copy/paste between row_hash.rs and bounded_aggregate_stream.rs (that will reduce a source of significant complexity immediately)
  2. Encapsulate the row/ non-row accumulator interface in the group by hash rather than maintaining two parallel sets of data structures
  3. Figure out how to pick the right row accumulator update function during stream setup rather than doing the dispatch once per batch.

What do you think?
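Item 3 of the plan above (pick the update function at stream setup) can be sketched like this. This is a hypothetical simplification, not DataFusion's API: plain `Vec`s stand in for arrow arrays, and a boxed closure chosen once at setup stands in for the resolved row-accumulator update function. The match on the data type runs once per stream; each batch then needs only a single downcast, with a monomorphic inner loop:

```rust
use std::any::Any;

// Erased update function, chosen once at setup time.
type UpdateFn = Box<dyn Fn(&dyn Any, &mut f64)>;

// Setup-time dispatch: match on the data type once and return a closure
// whose inner loop is monomorphic over the concrete native type.
fn make_update(dtype: &str) -> UpdateFn {
    match dtype {
        "i64" => Box::new(|arr, acc| {
            // One downcast per batch, none per row.
            let values = arr.downcast_ref::<Vec<i64>>().expect("i64 batch");
            for v in values {
                *acc += *v as f64;
            }
        }),
        "f64" => Box::new(|arr, acc| {
            let values = arr.downcast_ref::<Vec<f64>>().expect("f64 batch");
            for v in values {
                *acc += *v;
            }
        }),
        other => panic!("unsupported type: {other}"),
    }
}

fn main() {
    // Dispatch happens once per stream, at setup...
    let update = make_update("i64");
    let mut acc = 0.0f64;
    // ...then every batch reuses the already-chosen function.
    update(&vec![1i64, 2, 3], &mut acc);
    update(&vec![4i64, 5], &mut acc);
    assert_eq!(acc, 15.0);
}
```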

RowAccessor::new_from_layout(self.row_aggr_layout.clone());
state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
for idx in &group_state.indices {
let mut single_value_acc_idx = vec![];
Contributor

FYI @ozankabak and @mustafasrepo -- here is an example where having two grouping operators (hash and bounded) requires 2x the code - not sure if we can find a way to reduce the duplication

Contributor Author

@mingmwang mingmwang Jun 25, 2023


We definitely should address the duplicate code between the row_hash.rs and bounded_aggregate_stream.rs.

Contributor Author

@alamb
I have combined the RowAccumulators update logic in row_hash.rs and bounded_aggregate_stream.rs and avoided the copy/paste.
There is still some other duplicated code (not introduced by this PR); I would prefer to leave it for a future PR.

@@ -761,4 +785,67 @@ impl GroupedHashAggregateStream {
}
Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
}

fn update_one_accumulator_with_native_value<T1>(
Contributor

This code is basically a copy of what is in datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs, right?

Contributor Author

Yes, there are duplicates between those two.

pub use dispatch_all_supported_data_types;

// TODO generate the matching type pairs
#[macro_export]
Contributor

I really worry about this table -- it seems like it will be very hard to maintain (aka trying to add some new type or accumulator type will be very hard)

Contributor Author

I have moved all the row-accumulator supported types and the dispatching logic into the newly added macros.
I know the macro dispatch_all_supported_data_types_pairs is not well implemented; I will try to use another macro to generate the body.

Contributor

I think the challenge in my mind is that it seems so specific -- I think maybe centralizing the instantiation of type-specific accumulator code would help a lot (aka move the type dispatch logic into something that looks like an arrow kernel)... I will think about this as I sleep tonight

Contributor

I will think about this as I sleep tonight

This made me smile -- such is the life of an engineer 🤣

Contributor Author

I really worry about this table -- it seems like it will be very hard to maintain (aka trying to add some new type or accumulator type will be very hard)

Done, I now use nested macros to generate the body.
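For illustration, the nested-macro pattern being discussed can look like this. This is a hypothetical, much simpler sketch than the real dispatch_all_supported_data_types macros: adding a new supported type then means editing a single type list, and the dispatch body is generated from it.

```rust
// Driver macro: applies another macro to the full list of supported types.
// Adding a new type means adding one pair here.
macro_rules! for_all_supported_types {
    ($m:ident) => {
        $m! { (I32, i32), (I64, i64), (F64, f64) }
    };
}

#[derive(Debug)]
enum Dtype {
    I32,
    I64,
    F64,
}

// Callee macro: expands the type list into one match arm per type.
macro_rules! impl_native_width {
    ($(($variant:ident, $native:ty)),*) => {
        fn native_width(dt: &Dtype) -> usize {
            match dt {
                $(Dtype::$variant => std::mem::size_of::<$native>(),)*
            }
        }
    };
}

// Generate the dispatch function from the single type list.
for_all_supported_types!(impl_native_width);

fn main() {
    assert_eq!(native_width(&Dtype::I32), 4);
    assert_eq!(native_width(&Dtype::I64), 8);
    assert_eq!(native_width(&Dtype::F64), 8);
}
```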


@mingmwang
Contributor Author

BTW, I also tested the enum_dispatch macros; this can reduce about 300 lines of code in this PR, but the performance seems not as good as the handwritten version, so I gave it up and kept the current handwritten version.

#[enum_dispatch]
pub trait RowAccumulator: Send + Sync + Debug {
    /// updates the accumulator's state from a vector of arrays.
    fn update_batch(&self, values: &[ArrayRef], accessor: &mut RowAccessor)
        -> Result<()>;

    /// updates the accumulator's state from rows with the specified indices.
    fn update_row_indices(
        &self,
        values: &[ArrayRef],
        filter: &Option<&BooleanArray>,
        row_indices: &[usize],
        accessor: &mut RowAccessor,
    ) -> Result<()>;

    /// updates the accumulator's state from a rust native value.
    fn update_value<N: RowAccumulatorNativeType>(
        &self,
        native_value: Option<N>,
        accessor: &mut RowAccessor,
    );

    /// updates the accumulator's state from a vector of states.
    fn merge_batch(
        &mut self,
        states: &[ArrayRef],
        accessor: &mut RowAccessor,
    ) -> Result<()>;

    /// returns its value based on its current state.
    fn evaluate(&self, accessor: &RowAccessor) -> Result<ScalarValue>;

    /// State's starting field index in the row.
    fn state_index(&self) -> usize;
}

/// Returns if `data_type` is supported with `RowAccumulator`
pub fn is_row_accumulator_support_dtype(data_type: &DataType) -> bool {
    matches_all_supported_data_types!(data_type)
}

/// Enum to dispatch the RowAccumulator, RowAccumulator contains generic methods and can not be used as the trait objects
#[derive(Debug)]
#[enum_dispatch(RowAccumulator)]
pub enum RowAccumulatorItem {
    AVG(AvgRowAccumulator),
    SUM(SumRowAccumulator),
    COUNT(CountRowAccumulator),
    MIN(MinRowAccumulator),
    MAX(MaxRowAccumulator),
    BITAND(BitAndRowAccumulator),
    BITOR(BitOrRowAccumulator),
    BITXOR(BitXorRowAccumulator),
    BOOLAND(BoolAndRowAccumulator),
    BOOLOR(BoolOrRowAccumulator),
}

@alamb
Contributor

alamb commented Jun 26, 2023

@tustvold and I had a long chat about this and we have an alternate proposal that removes the per-group overhead entirely:
#4973 (comment)

Our observation is that the approach in this PR still has non-trivial per group overhead and the only way to really avoid that is to vectorize the update on both the input rows as well as the groups
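For context, that vectorized direction can be sketched as follows (a hypothetical simplified type, not the eventual DataFusion design): the accumulator keeps one flat state vector indexed by group, and a single update call consumes the whole batch of (group index, value) pairs in one monomorphic loop, so there is no per-group or per-row dispatch at all.

```rust
struct SumGroups {
    sums: Vec<i64>, // one state slot per group
}

impl SumGroups {
    fn update_batch(&mut self, group_indices: &[usize], values: &[i64]) {
        // Grow the state vector to cover any new groups in this batch.
        if let Some(max) = group_indices.iter().max() {
            if *max >= self.sums.len() {
                self.sums.resize(max + 1, 0);
            }
        }
        // One tight, monomorphic loop over the whole batch.
        for (&g, &v) in group_indices.iter().zip(values) {
            self.sums[g] += v;
        }
    }
}

fn main() {
    let mut acc = SumGroups { sums: vec![] };
    acc.update_batch(&[0, 1, 0, 2], &[10, 20, 30, 40]);
    assert_eq!(acc.sums, vec![40, 20, 40]);
}
```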

@mingmwang mingmwang marked this pull request as draft June 28, 2023 05:51
@mingmwang
Contributor Author

@alamb @tustvold
Move this PR to draft.

@alamb
Contributor

alamb commented Jun 28, 2023

Thank you -- I hope to POC out the approach in #4973 (comment) over the next day or so (sorry I have been busy with other things -- notably creating a reproducer for apache/arrow-rs#4459)

@alamb
Contributor

alamb commented Jul 2, 2023

BTW the POC is #6800 and it is showing some very nice results so far

@mingmwang
Contributor Author

Just closing this since we will have a better approach.

@mingmwang mingmwang closed this Jul 6, 2023
Labels
core Core DataFusion crate physical-expr Physical Expressions

Successfully merging this pull request may close these issues.

RowAccumulators support generics
4 participants