Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Manage group values by blocks in aggregation #11932

Closed
Pull request to merge 9 commits (the author and the source/target branch names are missing from this capture)
Previous commit
Next commit
just make GroupIdx an internal concept first.
Rachelint committed Aug 11, 2024
commit c35536882062d8d2e6628d4df517edf91c796c51
8 changes: 4 additions & 4 deletions datafusion/physical-plan/src/aggregates/group_values/bytes.rs
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::aggregates::group_values::{GroupIdx, GroupValues};
use crate::aggregates::group_values::GroupValues;
use arrow_array::{Array, ArrayRef, OffsetSizeTrait, RecordBatch};
use datafusion_expr::EmitTo;
use datafusion_physical_expr_common::binary_map::{ArrowBytesMap, OutputType};
@@ -44,7 +44,7 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {
fn intern(
&mut self,
cols: &[ArrayRef],
groups: &mut Vec<GroupIdx>,
groups: &mut Vec<usize>,
) -> datafusion_common::Result<()> {
assert_eq!(cols.len(), 1);

@@ -63,7 +63,7 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {
},
// called for each group
|group_idx| {
groups.push(GroupIdx::new(0, group_idx as u64));
groups.push(group_idx);
},
);

@@ -111,7 +111,7 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {
self.intern(&[remaining_group_values], &mut group_indexes)?;

// Verify that the group indexes were assigned in the correct order
assert_eq!(0, group_indexes[0].block_offset());
assert_eq!(0, group_indexes[0]);

emit_group_values
}
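datafusion/physical-plan/src/aggregates/group_values/bytes_view.rs (file header missing from this capture; path inferred from the `GroupValuesBytesView` impl in the hunks below — confirm against the commit)
Original file line number Diff line number Diff line change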
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::aggregates::group_values::{GroupIdx, GroupValues};
use crate::aggregates::group_values::GroupValues;
use arrow_array::{Array, ArrayRef, RecordBatch};
use datafusion_expr::EmitTo;
use datafusion_physical_expr::binary_map::OutputType;
@@ -45,7 +45,7 @@ impl GroupValues for GroupValuesBytesView {
fn intern(
&mut self,
cols: &[ArrayRef],
groups: &mut Vec<GroupIdx>,
groups: &mut Vec<usize>,
) -> datafusion_common::Result<()> {
assert_eq!(cols.len(), 1);

@@ -64,7 +64,7 @@ impl GroupValues for GroupValuesBytesView {
},
// called for each group
|group_idx| {
groups.push(GroupIdx::new(0, group_idx as u64));
groups.push(group_idx);
},
);

@@ -85,7 +85,7 @@ impl GroupValues for GroupValuesBytesView {
self.num_groups
}

fn emit(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<Vec<ArrayRef>>> {
fn emit(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<ArrayRef>> {
// Reset the map to default, and convert it into a single array
let map_contents = self.map.take().into_state();

@@ -112,7 +112,7 @@ impl GroupValues for GroupValuesBytesView {
self.intern(&[remaining_group_values], &mut group_indexes)?;

// Verify that the group indexes were assigned in the correct order
assert_eq!(0, group_indexes[0].block_offset());
assert_eq!(0, group_indexes[0]);

emit_group_values
}
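datafusion/physical-plan/src/aggregates/group_values/mod.rs (file header missing from this capture; path inferred from the `GroupIdx` / `GroupValues` definitions in the hunk below — confirm against the commit)
Original file line number Diff line number Diff line change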
@@ -61,7 +61,7 @@ impl GroupIdx {
/// An interning store for group keys
pub trait GroupValues: Send {
/// Calculates the `groups` for each input row of `cols`
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<GroupIdx>) -> Result<()>;
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()>;

/// Returns the number of bytes used by this [`GroupValues`]
fn size(&self) -> usize;
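datafusion/physical-plan/src/aggregates/group_values/primitive.rs (file header missing from this capture; path inferred from the `GroupValuesPrimitive<T>` impl in the hunks below — confirm against the commit)
Original file line number Diff line number Diff line change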
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::aggregates::group_values::{GroupIdx, GroupValues};
use crate::aggregates::group_values::GroupValues;
use ahash::RandomState;
use arrow::array::BooleanBufferBuilder;
use arrow::buffer::NullBuffer;
@@ -111,7 +111,7 @@ impl<T: ArrowPrimitiveType> GroupValues for GroupValuesPrimitive<T>
where
T::Native: HashValue,
{
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<GroupIdx>) -> Result<()> {
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
assert_eq!(cols.len(), 1);
groups.clear();

@@ -145,7 +145,7 @@ where
}
}
};
groups.push(GroupIdx::new(0, group_id as u64))
groups.push(group_id)
}
Ok(())
}
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/aggregates/group_values/row.rs
Original file line number Diff line number Diff line change
@@ -108,7 +108,7 @@ impl GroupValuesRows {
}

impl GroupValues for GroupValuesRows {
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<GroupIdx>) -> Result<()> {
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
// Convert the group keys into the row format
let group_rows = &mut self.rows_buffer;
group_rows.clear();
@@ -121,7 +121,7 @@ impl GroupValues for GroupValuesRows {
self.group_values_blocks.push_back(block);
};

let mut group_values_blocks = mem::take(&mut self.group_values_blocks);
let group_values_blocks = mem::take(&mut self.group_values_blocks);

// tracks to which group each of the input rows belongs
groups.clear();
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::vec;

use crate::aggregates::group_values::{new_group_values, GroupValues};
use crate::aggregates::group_values::{new_group_values, GroupIdx, GroupValues};
use crate::aggregates::order::GroupOrderingFull;
use crate::aggregates::{
evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AggregateMode,
@@ -353,7 +353,7 @@ pub(crate) struct GroupedHashAggregateStream {

/// scratch space for the current input [`RecordBatch`] being
/// processed. Reused across batches here to avoid reallocations
current_group_indices: Vec<usize>,
current_group_indices: Vec<GroupIdx>,

/// Tracks if this stream is generating input or output
exec_state: ExecutionState,