diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 59580bcb6a05..45c9709a342e 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -40,6 +40,7 @@ datafusion = { path = "../datafusion/core", features = ["avro"] } datafusion-common = { path = "../datafusion/common" } datafusion-expr = { path = "../datafusion/expr" } datafusion-optimizer = { path = "../datafusion/optimizer" } +datafusion-physical-expr = { workspace = true } datafusion-sql = { path = "../datafusion/sql" } env_logger = { workspace = true } futures = { workspace = true } diff --git a/datafusion-examples/examples/advanced_udaf.rs b/datafusion-examples/examples/advanced_udaf.rs index 8d5314bfbea5..e5433013d9a7 100644 --- a/datafusion-examples/examples/advanced_udaf.rs +++ b/datafusion-examples/examples/advanced_udaf.rs @@ -16,16 +16,22 @@ // under the License. use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility}; +use datafusion_physical_expr::NullState; use std::{any::Any, sync::Arc}; use arrow::{ - array::{ArrayRef, Float32Array}, + array::{ + ArrayRef, AsArray, Float32Array, PrimitiveArray, PrimitiveBuilder, UInt32Array, + }, + datatypes::{ArrowNativeTypeOp, ArrowPrimitiveType, Float64Type, UInt32Type}, record_batch::RecordBatch, }; use datafusion::error::Result; use datafusion::prelude::*; use datafusion_common::{cast::as_float64_array, ScalarValue}; -use datafusion_expr::{Accumulator, AggregateUDF, AggregateUDFImpl, Signature}; +use datafusion_expr::{ + Accumulator, AggregateUDF, AggregateUDFImpl, GroupsAccumulator, Signature, +}; /// This example shows how to use the full AggregateUDFImpl API to implement a user /// defined aggregate function. As in the `simple_udaf.rs` example, this struct implements @@ -33,12 +39,12 @@ use datafusion_expr::{Accumulator, AggregateUDF, AggregateUDFImpl, Signature}; /// /// To do so, we must implement the `AggregateUDFImpl` trait. #[derive(Debug, Clone)] -struct GeoMeanUdf { +struct GeoMeanUdaf { signature: Signature, } -impl GeoMeanUdf { - /// Create a new instance of the GeoMeanUdf struct +impl GeoMeanUdaf { + /// Create a new instance of the GeoMeanUdaf struct fn new() -> Self { Self { signature: Signature::exact( @@ -52,7 +58,7 @@ impl GeoMeanUdf { } } -impl AggregateUDFImpl for GeoMeanUdf { +impl AggregateUDFImpl for GeoMeanUdaf { /// We implement as_any so that we can downcast the AggregateUDFImpl trait object fn as_any(&self) -> &dyn Any { self @@ -74,6 +80,11 @@ impl AggregateUDFImpl for GeoMeanUdf { } /// This is the accumulator factory; DataFusion uses it to create new accumulators. + /// + /// This is the accumulator factory for row wise accumulation; Even when `GroupsAccumulator` + /// is supported, DataFusion will use this row oriented + /// accumulator when the aggregate function is used as a window function + /// or when there are only aggregates (no GROUP BY columns) in the plan. fn accumulator(&self, _arg: &DataType) -> Result> { Ok(Box::new(GeometricMean::new())) } @@ -82,6 +93,16 @@ impl AggregateUDFImpl for GeoMeanUdf { fn state_type(&self, _return_type: &DataType) -> Result> { Ok(vec![DataType::Float64, DataType::UInt32]) } + + /// Tell DataFusion that this aggregate supports the more performant `GroupsAccumulator` + /// which is used for cases when there are grouping columns in the query + fn groups_accumulator_supported(&self) -> bool { + true + } + + fn create_groups_accumulator(&self) -> Result> { + Ok(Box::new(GeometricMeanGroupsAccumulator::new())) + } } /// A UDAF has state across multiple rows, and thus we require a `struct` with that state. @@ -173,16 +194,25 @@ fn create_context() -> Result { use datafusion::arrow::datatypes::{Field, Schema}; use datafusion::datasource::MemTable; // define a schema. - let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)])); + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Float32, false), + Field::new("b", DataType::Float32, false), + ])); // define data in two partitions let batch1 = RecordBatch::try_new( schema.clone(), - vec![Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0]))], + vec![ + Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0])), + Arc::new(Float32Array::from(vec![2.0, 2.0, 2.0])), + ], )?; let batch2 = RecordBatch::try_new( schema.clone(), - vec![Arc::new(Float32Array::from(vec![64.0]))], + vec![ + Arc::new(Float32Array::from(vec![64.0])), + Arc::new(Float32Array::from(vec![2.0])), + ], )?; // declare a new context. In spark API, this corresponds to a new spark SQLsession @@ -194,15 +224,183 @@ fn create_context() -> Result { Ok(ctx) } +// Define a `GroupsAccumulator` for GeometricMean +/// which handles accumulator state for multiple groups at once. +/// This API is significantly more complicated than `Accumulator`, which manages +/// the state for a single group, but for queries with a large number of groups +/// can be significantly faster. See the `GroupsAccumulator` documentation for +/// more information. +struct GeometricMeanGroupsAccumulator { + /// The type of the internal sum + prod_data_type: DataType, + + /// The type of the returned sum + return_data_type: DataType, + + /// Count per group (use u32 to make UInt32Array) + counts: Vec, + + /// product per group, stored as the native type (not `ScalarValue`) + prods: Vec, + + /// Track nulls in the input / filters + null_state: NullState, +} + +impl GeometricMeanGroupsAccumulator { + fn new() -> Self { + Self { + prod_data_type: DataType::Float64, + return_data_type: DataType::Float64, + counts: vec![], + prods: vec![], + null_state: NullState::new(), + } + } +} + +impl GroupsAccumulator for GeometricMeanGroupsAccumulator { + /// Updates the accumulator state given input. DataFusion provides `group_indices`, + /// the groups that each row in `values` belongs to as well as an optional filter of which rows passed. + fn update_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&arrow::array::BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + assert_eq!(values.len(), 1, "single argument to update_batch"); + let values = values[0].as_primitive::(); + + // increment counts, update sums + self.counts.resize(total_num_groups, 0); + self.prods.resize(total_num_groups, 1.0); + // Use the `NullState` structure to generate specialized code for null / non null input elements + self.null_state.accumulate( + group_indices, + values, + opt_filter, + total_num_groups, + |group_index, new_value| { + let prod = &mut self.prods[group_index]; + *prod = prod.mul_wrapping(new_value); + + self.counts[group_index] += 1; + }, + ); + + Ok(()) + } + + /// Merge the results from previous invocations of `evaluate` into this accumulator's state + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&arrow::array::BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + assert_eq!(values.len(), 2, "two arguments to merge_batch"); + // first batch is counts, second is partial sums + let partial_prods = values[0].as_primitive::(); + let partial_counts = values[1].as_primitive::(); + // update counts with partial counts + self.counts.resize(total_num_groups, 0); + self.null_state.accumulate( + group_indices, + partial_counts, + opt_filter, + total_num_groups, + |group_index, partial_count| { + self.counts[group_index] += partial_count; + }, + ); + + // update prods + self.prods.resize(total_num_groups, 1.0); + self.null_state.accumulate( + group_indices, + partial_prods, + opt_filter, + total_num_groups, + |group_index, new_value: ::Native| { + let prod = &mut self.prods[group_index]; + *prod = prod.mul_wrapping(new_value); + }, + ); + + Ok(()) + } + + /// Generate output, as specififed by `emit_to` and update the intermediate state + fn evaluate(&mut self, emit_to: datafusion_expr::EmitTo) -> Result { + let counts = emit_to.take_needed(&mut self.counts); + let prods = emit_to.take_needed(&mut self.prods); + let nulls = self.null_state.build(emit_to); + + assert_eq!(nulls.len(), prods.len()); + assert_eq!(counts.len(), prods.len()); + + // don't evaluate geometric mean with null inputs to avoid errors on null values + + let array: PrimitiveArray = if nulls.null_count() > 0 { + let mut builder = PrimitiveBuilder::::with_capacity(nulls.len()); + let iter = prods.into_iter().zip(counts).zip(nulls.iter()); + + for ((prod, count), is_valid) in iter { + if is_valid { + builder.append_value(prod.powf(1.0 / count as f64)) + } else { + builder.append_null(); + } + } + builder.finish() + } else { + let geo_mean: Vec<::Native> = prods + .into_iter() + .zip(counts) + .map(|(prod, count)| prod.powf(1.0 / count as f64)) + .collect::>(); + PrimitiveArray::new(geo_mean.into(), Some(nulls)) // no copy + .with_data_type(self.return_data_type.clone()) + }; + + Ok(Arc::new(array)) + } + + // return arrays for counts and prods + fn state(&mut self, emit_to: datafusion_expr::EmitTo) -> Result> { + let nulls = self.null_state.build(emit_to); + let nulls = Some(nulls); + + let counts = emit_to.take_needed(&mut self.counts); + let counts = UInt32Array::new(counts.into(), nulls.clone()); // zero copy + + let prods = emit_to.take_needed(&mut self.prods); + let prods = PrimitiveArray::::new(prods.into(), nulls) // zero copy + .with_data_type(self.prod_data_type.clone()); + + Ok(vec![ + Arc::new(prods) as ArrayRef, + Arc::new(counts) as ArrayRef, + ]) + } + + fn size(&self) -> usize { + self.counts.capacity() * std::mem::size_of::() + + self.prods.capacity() * std::mem::size_of::() + } +} + #[tokio::main] async fn main() -> Result<()> { let ctx = create_context()?; // create the AggregateUDF - let geometric_mean = AggregateUDF::from(GeoMeanUdf::new()); + let geometric_mean = AggregateUDF::from(GeoMeanUdaf::new()); ctx.register_udaf(geometric_mean.clone()); - let sql_df = ctx.sql("SELECT geo_mean(a) FROM t").await?; + let sql_df = ctx.sql("SELECT geo_mean(a) FROM t group by b").await?; sql_df.show().await?; // get a DataFrame from the context diff --git a/datafusion/core/tests/user_defined/user_defined_aggregates.rs b/datafusion/core/tests/user_defined/user_defined_aggregates.rs index 5882718acefd..5b578daa7e34 100644 --- a/datafusion/core/tests/user_defined/user_defined_aggregates.rs +++ b/datafusion/core/tests/user_defined/user_defined_aggregates.rs @@ -19,7 +19,7 @@ //! user defined aggregate functions use arrow::{array::AsArray, datatypes::Fields}; -use arrow_array::Int32Array; +use arrow_array::{types::UInt64Type, Int32Array, PrimitiveArray}; use arrow_schema::Schema; use std::sync::{ atomic::{AtomicBool, Ordering}, @@ -45,7 +45,9 @@ use datafusion::{ use datafusion_common::{ assert_contains, cast::as_primitive_array, exec_err, DataFusionError, }; -use datafusion_expr::{create_udaf, SimpleAggregateUDF}; +use datafusion_expr::{ + create_udaf, AggregateUDFImpl, GroupsAccumulator, SimpleAggregateUDF, +}; use datafusion_physical_expr::expressions::AvgAccumulator; /// Test to show the contents of the setup @@ -297,6 +299,25 @@ async fn case_sensitive_identifiers_user_defined_aggregates() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_groups_accumulator() -> Result<()> { + let ctx = SessionContext::new(); + let arr = Int32Array::from(vec![1]); + let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(arr) as _)])?; + ctx.register_batch("t", batch).unwrap(); + + let udaf = AggregateUDF::from(TestGroupsAccumulator { + signature: Signature::exact(vec![DataType::Float64], Volatility::Immutable), + result: 1, + }); + ctx.register_udaf(udaf.clone()); + + let sql_df = ctx.sql("SELECT geo_mean(a) FROM t group by a").await?; + sql_df.show().await?; + + Ok(()) +} + /// Returns an context with a table "t" and the "first" and "time_sum" /// aggregate functions registered. /// @@ -621,3 +642,106 @@ impl Accumulator for FirstSelector { std::mem::size_of_val(self) } } + +#[derive(Debug, Clone)] +struct TestGroupsAccumulator { + signature: Signature, + result: u64, +} + +impl AggregateUDFImpl for TestGroupsAccumulator { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn name(&self) -> &str { + "geo_mean" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::UInt64) + } + + fn accumulator(&self, _arg: &DataType) -> Result> { + // should use groups accumulator + panic!("accumulator shouldn't invoke"); + } + + fn state_type(&self, _return_type: &DataType) -> Result> { + Ok(vec![DataType::UInt64]) + } + + fn groups_accumulator_supported(&self) -> bool { + true + } + + fn create_groups_accumulator(&self) -> Result> { + Ok(Box::new(self.clone())) + } +} + +impl Accumulator for TestGroupsAccumulator { + fn update_batch(&mut self, _values: &[ArrayRef]) -> Result<()> { + Ok(()) + } + + fn evaluate(&self) -> Result { + Ok(ScalarValue::from(self.result)) + } + + fn size(&self) -> usize { + std::mem::size_of::() + } + + fn state(&self) -> Result> { + Ok(vec![ScalarValue::from(self.result)]) + } + + fn merge_batch(&mut self, _states: &[ArrayRef]) -> Result<()> { + Ok(()) + } +} + +impl GroupsAccumulator for TestGroupsAccumulator { + fn update_batch( + &mut self, + _values: &[ArrayRef], + _group_indices: &[usize], + _opt_filter: Option<&arrow_array::BooleanArray>, + _total_num_groups: usize, + ) -> Result<()> { + Ok(()) + } + + fn evaluate(&mut self, _emit_to: datafusion_expr::EmitTo) -> Result { + Ok(Arc::new(PrimitiveArray::::new( + vec![self.result].into(), + None, + )) as ArrayRef) + } + + fn state(&mut self, _emit_to: datafusion_expr::EmitTo) -> Result> { + Ok(vec![Arc::new(PrimitiveArray::::new( + vec![self.result].into(), + None, + )) as ArrayRef]) + } + + fn merge_batch( + &mut self, + _values: &[ArrayRef], + _group_indices: &[usize], + _opt_filter: Option<&arrow_array::BooleanArray>, + _total_num_groups: usize, + ) -> Result<()> { + Ok(()) + } + + fn size(&self) -> usize { + std::mem::size_of::() + } +} diff --git a/datafusion/expr/src/groups_accumulator.rs b/datafusion/expr/src/groups_accumulator.rs new file mode 100644 index 000000000000..6580de19bc68 --- /dev/null +++ b/datafusion/expr/src/groups_accumulator.rs @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Vectorized [`GroupsAccumulator`] + +use arrow_array::{ArrayRef, BooleanArray}; +use datafusion_common::Result; + +/// Describes how many rows should be emitted during grouping. +#[derive(Debug, Clone, Copy)] +pub enum EmitTo { + /// Emit all groups + All, + /// Emit only the first `n` groups and shift all existing group + /// indexes down by `n`. + /// + /// For example, if `n=10`, group_index `0, 1, ... 9` are emitted + /// and group indexes '`10, 11, 12, ...` become `0, 1, 2, ...`. + First(usize), +} + +impl EmitTo { + /// Removes the number of rows from `v` required to emit the right + /// number of rows, returning a `Vec` with elements taken, and the + /// remaining values in `v`. + /// + /// This avoids copying if Self::All + pub fn take_needed(&self, v: &mut Vec) -> Vec { + match self { + Self::All => { + // Take the entire vector, leave new (empty) vector + std::mem::take(v) + } + Self::First(n) => { + // get end n+1,.. values into t + let mut t = v.split_off(*n); + // leave n+1,.. in v + std::mem::swap(v, &mut t); + t + } + } + } +} + +/// `GroupAccumulator` implements a single aggregate (e.g. AVG) and +/// stores the state for *all* groups internally. +/// +/// Each group is assigned a `group_index` by the hash table and each +/// accumulator manages the specific state, one per group_index. +/// +/// group_indexes are contiguous (there aren't gaps), and thus it is +/// expected that each GroupAccumulator will use something like `Vec<..>` +/// to store the group states. +pub trait GroupsAccumulator: Send { + /// Updates the accumulator's state from its arguments, encoded as + /// a vector of [`ArrayRef`]s. + /// + /// * `values`: the input arguments to the accumulator + /// + /// * `group_indices`: To which groups do the rows in `values` + /// belong, group id) + /// + /// * `opt_filter`: if present, only update aggregate state using + /// `values[i]` if `opt_filter[i]` is true + /// + /// * `total_num_groups`: the number of groups (the largest + /// group_index is thus `total_num_groups - 1`). + /// + /// Note that subsequent calls to update_batch may have larger + /// total_num_groups as new groups are seen. + fn update_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()>; + + /// Returns the final aggregate value for each group as a single + /// `RecordBatch`, resetting the internal state. + /// + /// The rows returned *must* be in group_index order: The value + /// for group_index 0, followed by 1, etc. Any group_index that + /// did not have values, should be null. + /// + /// For example, a `SUM` accumulator maintains a running sum for + /// each group, and `evaluate` will produce that running sum as + /// its output for all groups, in group_index order + /// + /// If `emit_to`` is [`EmitTo::All`], the accumulator should + /// return all groups and release / reset its internal state + /// equivalent to when it was first created. + /// + /// If `emit_to` is [`EmitTo::First`], only the first `n` groups + /// should be emitted and the state for those first groups + /// removed. State for the remaining groups must be retained for + /// future use. The group_indices on subsequent calls to + /// `update_batch` or `merge_batch` will be shifted down by + /// `n`. See [`EmitTo::First`] for more details. + fn evaluate(&mut self, emit_to: EmitTo) -> Result; + + /// Returns the intermediate aggregate state for this accumulator, + /// used for multi-phase grouping, resetting its internal state. + /// + /// For example, `AVG` might return two arrays: `SUM` and `COUNT` + /// but the `MIN` aggregate would just return a single array. + /// + /// Note more sophisticated internal state can be passed as + /// single `StructArray` rather than multiple arrays. + /// + /// See [`Self::evaluate`] for details on the required output + /// order and `emit_to`. + fn state(&mut self, emit_to: EmitTo) -> Result>; + + /// Merges intermediate state (the output from [`Self::state`]) + /// into this accumulator's values. + /// + /// For some aggregates (such as `SUM`), `merge_batch` is the same + /// as `update_batch`, but for some aggregates (such as `COUNT`, + /// where the partial counts must be summed) the operations + /// differ. See [`Self::state`] for more details on how state is + /// used and merged. + /// + /// * `values`: arrays produced from calling `state` previously to the accumulator + /// + /// Other arguments are the same as for [`Self::update_batch`]; + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()>; + + /// Amount of memory used to store the state of this accumulator, + /// in bytes. This function is called once per batch, so it should + /// be `O(n)` to compute, not `O(num_groups)` + fn size(&self) -> usize; +} diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index 21647f384159..c29535456327 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -47,6 +47,7 @@ pub mod expr_rewriter; pub mod expr_schema; pub mod field_util; pub mod function; +pub mod groups_accumulator; pub mod interval_arithmetic; pub mod logical_plan; pub mod tree_node; @@ -70,6 +71,7 @@ pub use function::{ AccumulatorFactoryFunction, PartitionEvaluatorFactory, ReturnTypeFunction, ScalarFunctionImplementation, StateTypeFunction, }; +pub use groups_accumulator::{EmitTo, GroupsAccumulator}; pub use literal::{lit, lit_timestamp_nano, Literal, TimestampLiteral}; pub use logical_plan::*; pub use nullif::SUPPORTED_NULLIF_TYPES; diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index 4983f6247d24..444a4f1e8099 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -17,12 +17,13 @@ //! [`AggregateUDF`]: User Defined Aggregate Functions +use crate::groups_accumulator::GroupsAccumulator; use crate::{Accumulator, Expr}; use crate::{ AccumulatorFactoryFunction, ReturnTypeFunction, Signature, StateTypeFunction, }; use arrow::datatypes::DataType; -use datafusion_common::Result; +use datafusion_common::{not_impl_err, DataFusionError, Result}; use std::any::Any; use std::fmt::{self, Debug, Formatter}; use std::sync::Arc; @@ -163,6 +164,16 @@ impl AggregateUDF { pub fn state_type(&self, return_type: &DataType) -> Result> { self.inner.state_type(return_type) } + + /// See [`AggregateUDFImpl::groups_accumulator_supported`] for more details. + pub fn groups_accumulator_supported(&self) -> bool { + self.inner.groups_accumulator_supported() + } + + /// See [`AggregateUDFImpl::create_groups_accumulator`] for more details. + pub fn create_groups_accumulator(&self) -> Result> { + self.inner.create_groups_accumulator() + } } impl From for AggregateUDF @@ -250,6 +261,22 @@ pub trait AggregateUDFImpl: Debug + Send + Sync { /// Return the type used to serialize the [`Accumulator`]'s intermediate state. /// See [`Accumulator::state()`] for more details fn state_type(&self, return_type: &DataType) -> Result>; + + /// If the aggregate expression has a specialized + /// [`GroupsAccumulator`] implementation. If this returns true, + /// `[Self::create_groups_accumulator`] will be called. + fn groups_accumulator_supported(&self) -> bool { + false + } + + /// Return a specialized [`GroupsAccumulator`] that manages state + /// for all groups. + /// + /// For maximum performance, a [`GroupsAccumulator`] should be + /// implemented in addition to [`Accumulator`]. + fn create_groups_accumulator(&self) -> Result> { + not_impl_err!("GroupsAccumulator hasn't been implemented for {self:?} yet") + } } /// Implementation of [`AggregateUDFImpl`] that wraps the function style pointers diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs index 91f2fb952dce..187373e14f99 100644 --- a/datafusion/physical-expr/src/aggregate/average.rs +++ b/datafusion/physical-expr/src/aggregate/average.rs @@ -27,7 +27,7 @@ use std::sync::Arc; use crate::aggregate::groups_accumulator::accumulate::NullState; use crate::aggregate::utils::down_cast_any_ref; use crate::expressions::format_state_name; -use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr}; +use crate::{AggregateExpr, PhysicalExpr}; use arrow::compute::sum; use arrow::datatypes::{DataType, Decimal128Type, Float64Type, UInt64Type}; use arrow::{ @@ -41,9 +41,8 @@ use arrow_array::{ use arrow_buffer::{i256, ArrowNativeType}; use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue}; use datafusion_expr::type_coercion::aggregates::avg_return_type; -use datafusion_expr::Accumulator; +use datafusion_expr::{Accumulator, EmitTo, GroupsAccumulator}; -use super::groups_accumulator::EmitTo; use super::utils::DecimalAverager; /// AVG aggregate expression diff --git a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs index 6c97d620616a..92883d8049d2 100644 --- a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs +++ b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs @@ -22,11 +22,11 @@ use datafusion_common::cast::as_list_array; use std::any::Any; use std::sync::Arc; -use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr}; +use crate::{AggregateExpr, PhysicalExpr}; use arrow::datatypes::DataType; use arrow::{array::ArrayRef, datatypes::Field}; use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue}; -use datafusion_expr::Accumulator; +use datafusion_expr::{Accumulator, GroupsAccumulator}; use std::collections::HashSet; use crate::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; diff --git a/datafusion/physical-expr/src/aggregate/bool_and_or.rs b/datafusion/physical-expr/src/aggregate/bool_and_or.rs index 9757d314b6aa..ae205141b4b9 100644 --- a/datafusion/physical-expr/src/aggregate/bool_and_or.rs +++ b/datafusion/physical-expr/src/aggregate/bool_and_or.rs @@ -17,7 +17,7 @@ //! Defines physical expressions that can evaluated at runtime during query execution -use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr}; +use crate::{AggregateExpr, PhysicalExpr}; use arrow::datatypes::DataType; use arrow::{ array::{ArrayRef, BooleanArray}, @@ -26,7 +26,7 @@ use arrow::{ use datafusion_common::{ downcast_value, internal_err, not_impl_err, DataFusionError, Result, ScalarValue, }; -use datafusion_expr::Accumulator; +use datafusion_expr::{Accumulator, GroupsAccumulator}; use std::any::Any; use std::sync::Arc; diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs index 8e9ae5cea36b..b6d4b7300434 100644 --- a/datafusion/physical-expr/src/aggregate/count.rs +++ b/datafusion/physical-expr/src/aggregate/count.rs @@ -23,7 +23,7 @@ use std::ops::BitAnd; use std::sync::Arc; use crate::aggregate::utils::down_cast_any_ref; -use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr}; +use crate::{AggregateExpr, PhysicalExpr}; use arrow::array::{Array, Int64Array}; use arrow::compute; use arrow::datatypes::DataType; @@ -34,12 +34,11 @@ use arrow_array::PrimitiveArray; use arrow_buffer::BooleanBuffer; use datafusion_common::{downcast_value, ScalarValue}; use datafusion_common::{DataFusionError, Result}; -use datafusion_expr::Accumulator; +use datafusion_expr::{Accumulator, EmitTo, GroupsAccumulator}; use crate::expressions::format_state_name; use super::groups_accumulator::accumulate::accumulate_indices; -use super::groups_accumulator::EmitTo; /// COUNT aggregate expression /// Returns the amount of non-null values of the given expression. diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs index 596265a737da..7080ea40039d 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs @@ -17,14 +17,13 @@ //! [`GroupsAccumulator`] helpers: [`NullState`] and [`accumulate_indices`] //! -//! [`GroupsAccumulator`]: crate::GroupsAccumulator +//! [`GroupsAccumulator`]: datafusion_expr::GroupsAccumulator use arrow::datatypes::ArrowPrimitiveType; use arrow_array::{Array, BooleanArray, PrimitiveArray}; use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer}; -use crate::EmitTo; - +use datafusion_expr::EmitTo; /// Track the accumulator null state per row: if any values for that /// group were null and if any values have been seen at all for that group. /// @@ -49,7 +48,7 @@ use crate::EmitTo; /// had at least one value to accumulate so they do not need to track /// if they have seen values for a particular group. /// -/// [`GroupsAccumulator`]: crate::GroupsAccumulator +/// [`GroupsAccumulator`]: datafusion_expr::GroupsAccumulator #[derive(Debug)] pub struct NullState { /// Have we seen any non-filtered input values for `group_index`? @@ -62,6 +61,12 @@ pub struct NullState { seen_values: BooleanBufferBuilder, } +impl Default for NullState { + fn default() -> Self { + Self::new() + } +} + impl NullState { pub fn new() -> Self { Self { diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs index c6fd17a69b39..b4e6d2ebc5fc 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs @@ -17,7 +17,6 @@ //! Adapter that makes [`GroupsAccumulator`] out of [`Accumulator`] -use super::{EmitTo, GroupsAccumulator}; use arrow::{ array::{AsArray, UInt32Builder}, compute, @@ -28,7 +27,7 @@ use datafusion_common::{ arrow_datafusion_err, utils::get_arrayref_at_indices, DataFusionError, Result, ScalarValue, }; -use datafusion_expr::Accumulator; +use datafusion_expr::{Accumulator, EmitTo, GroupsAccumulator}; /// An adapter that implements [`GroupsAccumulator`] for any [`Accumulator`] /// diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs index 21b6cc29e83d..f40c661a7a2f 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs @@ -21,10 +21,9 @@ use arrow::array::AsArray; use arrow_array::{ArrayRef, BooleanArray}; use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder}; use datafusion_common::Result; +use datafusion_expr::{EmitTo, GroupsAccumulator}; -use crate::GroupsAccumulator; - -use super::{accumulate::NullState, EmitTo}; +use super::accumulate::NullState; /// An accumulator that implements a single operation over a /// [`BooleanArray`] where the accumulated state is also boolean (such diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs index d2e64d373be2..de090badd349 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs @@ -15,146 +15,10 @@ // specific language governing permissions and limitations // under the License. -//! Vectorized [`GroupsAccumulator`] - pub(crate) mod accumulate; mod adapter; +pub use accumulate::NullState; pub use adapter::GroupsAccumulatorAdapter; pub(crate) mod bool_op; pub(crate) mod prim_op; - -use arrow_array::{ArrayRef, BooleanArray}; -use datafusion_common::Result; - -/// Describes how many rows should be emitted during grouping. -#[derive(Debug, Clone, Copy)] -pub enum EmitTo { - /// Emit all groups - All, - /// Emit only the first `n` groups and shift all existing group - /// indexes down by `n`. - /// - /// For example, if `n=10`, group_index `0, 1, ... 9` are emitted - /// and group indexes '`10, 11, 12, ...` become `0, 1, 2, ...`. - First(usize), -} - -impl EmitTo { - /// Removes the number of rows from `v` required to emit the right - /// number of rows, returning a `Vec` with elements taken, and the - /// remaining values in `v`. - /// - /// This avoids copying if Self::All - pub fn take_needed(&self, v: &mut Vec) -> Vec { - match self { - Self::All => { - // Take the entire vector, leave new (empty) vector - std::mem::take(v) - } - Self::First(n) => { - // get end n+1,.. values into t - let mut t = v.split_off(*n); - // leave n+1,.. in v - std::mem::swap(v, &mut t); - t - } - } - } -} - -/// `GroupAccumulator` implements a single aggregate (e.g. AVG) and -/// stores the state for *all* groups internally. -/// -/// Each group is assigned a `group_index` by the hash table and each -/// accumulator manages the specific state, one per group_index. -/// -/// group_indexes are contiguous (there aren't gaps), and thus it is -/// expected that each GroupAccumulator will use something like `Vec<..>` -/// to store the group states. -pub trait GroupsAccumulator: Send { - /// Updates the accumulator's state from its arguments, encoded as - /// a vector of [`ArrayRef`]s. - /// - /// * `values`: the input arguments to the accumulator - /// - /// * `group_indices`: To which groups do the rows in `values` - /// belong, group id) - /// - /// * `opt_filter`: if present, only update aggregate state using - /// `values[i]` if `opt_filter[i]` is true - /// - /// * `total_num_groups`: the number of groups (the largest - /// group_index is thus `total_num_groups - 1`). - /// - /// Note that subsequent calls to update_batch may have larger - /// total_num_groups as new groups are seen. - fn update_batch( - &mut self, - values: &[ArrayRef], - group_indices: &[usize], - opt_filter: Option<&BooleanArray>, - total_num_groups: usize, - ) -> Result<()>; - - /// Returns the final aggregate value for each group as a single - /// `RecordBatch`, resetting the internal state. - /// - /// The rows returned *must* be in group_index order: The value - /// for group_index 0, followed by 1, etc. Any group_index that - /// did not have values, should be null. - /// - /// For example, a `SUM` accumulator maintains a running sum for - /// each group, and `evaluate` will produce that running sum as - /// its output for all groups, in group_index order - /// - /// If `emit_to`` is [`EmitTo::All`], the accumulator should - /// return all groups and release / reset its internal state - /// equivalent to when it was first created. - /// - /// If `emit_to` is [`EmitTo::First`], only the first `n` groups - /// should be emitted and the state for those first groups - /// removed. State for the remaining groups must be retained for - /// future use. The group_indices on subsequent calls to - /// `update_batch` or `merge_batch` will be shifted down by - /// `n`. See [`EmitTo::First`] for more details. - fn evaluate(&mut self, emit_to: EmitTo) -> Result; - - /// Returns the intermediate aggregate state for this accumulator, - /// used for multi-phase grouping, resetting its internal state. - /// - /// For example, `AVG` might return two arrays: `SUM` and `COUNT` - /// but the `MIN` aggregate would just return a single array. - /// - /// Note more sophisticated internal state can be passed as - /// single `StructArray` rather than multiple arrays. - /// - /// See [`Self::evaluate`] for details on the required output - /// order and `emit_to`. - fn state(&mut self, emit_to: EmitTo) -> Result>; - - /// Merges intermediate state (the output from [`Self::state`]) - /// into this accumulator's values. - /// - /// For some aggregates (such as `SUM`), `merge_batch` is the same - /// as `update_batch`, but for some aggregates (such as `COUNT`, - /// where the partial counts must be summed) the operations - /// differ. See [`Self::state`] for more details on how state is - /// used and merged. - /// - /// * `values`: arrays produced from calling `state` previously to the accumulator - /// - /// Other arguments are the same as for [`Self::update_batch`]; - fn merge_batch( - &mut self, - values: &[ArrayRef], - group_indices: &[usize], - opt_filter: Option<&BooleanArray>, - total_num_groups: usize, - ) -> Result<()>; - - /// Amount of memory used to store the state of this accumulator, - /// in bytes. This function is called once per batch, so it should - /// be `O(n)` to compute, not `O(num_groups)` - fn size(&self) -> usize; -} diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs index 130d56271280..994f5447d7c0 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs @@ -21,10 +21,9 @@ use arrow::{array::AsArray, datatypes::ArrowPrimitiveType}; use arrow_array::{ArrayRef, BooleanArray, PrimitiveArray}; use arrow_schema::DataType; use datafusion_common::Result; +use datafusion_expr::{EmitTo, GroupsAccumulator}; -use crate::GroupsAccumulator; - -use super::{accumulate::NullState, EmitTo}; +use super::accumulate::NullState; /// An accumulator that implements a single operation over /// [`ArrowPrimitiveType`] where the accumulated state is the same as diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs index 7e3ef2a2abab..ba3e70855355 100644 --- a/datafusion/physical-expr/src/aggregate/min_max.rs +++ b/datafusion/physical-expr/src/aggregate/min_max.rs @@ -22,7 +22,7 @@ use std::convert::TryFrom; use std::sync::Arc; use crate::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; -use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr}; +use crate::{AggregateExpr, PhysicalExpr}; use arrow::compute; use arrow::datatypes::{ DataType, Date32Type, Date64Type, Time32MillisecondType, Time32SecondType, @@ -47,7 +47,7 @@ use arrow_array::types::{ use datafusion_common::internal_err; use datafusion_common::ScalarValue; use datafusion_common::{downcast_value, DataFusionError, Result}; -use datafusion_expr::Accumulator; +use datafusion_expr::{Accumulator, GroupsAccumulator}; use crate::aggregate::utils::down_cast_any_ref; use crate::expressions::format_state_name; diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs index 270a8e6f7705..2bb205ce90dc 100644 --- a/datafusion/physical-expr/src/aggregate/mod.rs +++ b/datafusion/physical-expr/src/aggregate/mod.rs @@ -19,13 +19,12 @@ use std::any::Any; use std::fmt::Debug; use std::sync::Arc; -use self::groups_accumulator::GroupsAccumulator; use crate::expressions::{NthValueAgg, OrderSensitiveArrayAgg}; use crate::{PhysicalExpr, PhysicalSortExpr}; use arrow::datatypes::Field; use datafusion_common::{not_impl_err, DataFusionError, Result}; -use datafusion_expr::Accumulator; +use datafusion_expr::{Accumulator, GroupsAccumulator}; mod hyperloglog; mod tdigest; diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs index 03f666cc4e5d..a770b3874ce0 100644 --- a/datafusion/physical-expr/src/aggregate/sum.rs +++ b/datafusion/physical-expr/src/aggregate/sum.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use super::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; use crate::aggregate::utils::down_cast_any_ref; use crate::expressions::format_state_name; -use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr}; +use crate::{AggregateExpr, PhysicalExpr}; use arrow::compute::sum; use arrow::datatypes::DataType; use arrow::{array::ArrayRef, datatypes::Field}; @@ -35,7 +35,7 @@ use arrow_array::{Array, ArrowNativeTypeOp, ArrowNumericType}; use arrow_buffer::ArrowNativeType; use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue}; use datafusion_expr::type_coercion::aggregates::sum_return_type; -use datafusion_expr::Accumulator; +use datafusion_expr::{Accumulator, GroupsAccumulator}; /// SUM aggregate expression #[derive(Debug, Clone)] diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index bbfba4ad8310..007a03985f45 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -105,14 +105,13 @@ pub(crate) mod tests { use std::sync::Arc; use crate::expressions::{col, create_aggregate_expr, try_cast}; - use crate::{AggregateExpr, EmitTo}; - + use crate::AggregateExpr; use arrow::record_batch::RecordBatch; use arrow_array::ArrayRef; use arrow_schema::{Field, Schema}; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::type_coercion::aggregates::coerce_types; - use datafusion_expr::AggregateFunction; + use datafusion_expr::{AggregateFunction, EmitTo}; /// macro to perform an aggregation using [`datafusion_expr::Accumulator`] and verify the /// result. diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index fffa8f602d87..6f55f56916e7 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -48,9 +48,7 @@ pub mod utils; pub mod var_provider; pub mod window; -pub use aggregate::groups_accumulator::{ - EmitTo, GroupsAccumulator, GroupsAccumulatorAdapter, -}; +pub use aggregate::groups_accumulator::{GroupsAccumulatorAdapter, NullState}; pub use aggregate::AggregateExpr; pub use analysis::{analyze, AnalysisContext, ExprBoundaries}; pub use equivalence::EquivalenceProperties; diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs index cafa385eac39..ef9aac3d3ef0 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs @@ -19,9 +19,9 @@ use arrow::record_batch::RecordBatch; use arrow_array::{downcast_primitive, ArrayRef}; use arrow_schema::SchemaRef; use datafusion_common::Result; -use datafusion_physical_expr::EmitTo; pub(crate) mod primitive; +use datafusion_expr::EmitTo; use primitive::GroupValuesPrimitive; mod row; diff --git a/datafusion/physical-plan/src/aggregates/group_values/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/primitive.rs index e3ba284797d1..18d20f3c47e6 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/primitive.rs @@ -26,7 +26,7 @@ use arrow_array::{ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, PrimitiveArra use arrow_schema::DataType; use datafusion_common::Result; use datafusion_execution::memory_pool::proxy::VecAllocExt; -use datafusion_physical_expr::EmitTo; +use datafusion_expr::EmitTo; use half::f16; use hashbrown::raw::RawTable; use std::sync::Arc; diff --git a/datafusion/physical-plan/src/aggregates/group_values/row.rs b/datafusion/physical-plan/src/aggregates/group_values/row.rs index 10ff9edb8912..3b7480cd292a 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/row.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/row.rs @@ -25,7 +25,7 @@ use arrow_schema::{DataType, SchemaRef}; use datafusion_common::hash_utils::create_hashes; use datafusion_common::{DataFusionError, Result}; use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt}; -use datafusion_physical_expr::EmitTo; +use datafusion_expr::EmitTo; use hashbrown::raw::RawTable; /// A [`GroupValues`] making use of [`Rows`] diff --git a/datafusion/physical-plan/src/aggregates/order/full.rs b/datafusion/physical-plan/src/aggregates/order/full.rs index f46ee687faf1..c15538e8ab8e 100644 --- a/datafusion/physical-plan/src/aggregates/order/full.rs +++ b/datafusion/physical-plan/src/aggregates/order/full.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use datafusion_physical_expr::EmitTo; +use datafusion_expr::EmitTo; /// Tracks grouping state when the data is ordered entirely by its /// group keys diff --git a/datafusion/physical-plan/src/aggregates/order/mod.rs b/datafusion/physical-plan/src/aggregates/order/mod.rs index b258b97a9e84..4f1914b12c96 100644 --- a/datafusion/physical-plan/src/aggregates/order/mod.rs +++ b/datafusion/physical-plan/src/aggregates/order/mod.rs @@ -18,7 +18,8 @@ use arrow_array::ArrayRef; use arrow_schema::Schema; use datafusion_common::Result; -use datafusion_physical_expr::{EmitTo, PhysicalSortExpr}; +use datafusion_expr::EmitTo; +use datafusion_physical_expr::PhysicalSortExpr; mod full; mod partial; diff --git a/datafusion/physical-plan/src/aggregates/order/partial.rs b/datafusion/physical-plan/src/aggregates/order/partial.rs index ff8a75b9b28b..ecd37c913e98 100644 --- a/datafusion/physical-plan/src/aggregates/order/partial.rs +++ b/datafusion/physical-plan/src/aggregates/order/partial.rs @@ -20,7 +20,7 @@ use arrow_array::ArrayRef; use arrow_schema::Schema; use datafusion_common::Result; use datafusion_execution::memory_pool::proxy::VecAllocExt; -use datafusion_physical_expr::EmitTo; +use datafusion_expr::EmitTo; use datafusion_physical_expr::PhysicalSortExpr; /// Tracks grouping state when the data is ordered by some subset of diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 6a0c02f5caf3..f9db0a050cfc 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -44,9 +44,10 @@ use datafusion_execution::memory_pool::proxy::VecAllocExt; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_execution::TaskContext; +use datafusion_expr::{EmitTo, GroupsAccumulator}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{ - AggregateExpr, EmitTo, GroupsAccumulator, GroupsAccumulatorAdapter, PhysicalSortExpr, + AggregateExpr, GroupsAccumulatorAdapter, PhysicalSortExpr, }; use futures::ready; diff --git a/datafusion/physical-plan/src/udaf.rs b/datafusion/physical-plan/src/udaf.rs index 94017efe97aa..a82bbe1d0705 100644 --- a/datafusion/physical-plan/src/udaf.rs +++ b/datafusion/physical-plan/src/udaf.rs @@ -17,6 +17,7 @@ //! This module contains functions and structs supporting user-defined aggregate functions. +use datafusion_expr::GroupsAccumulator; use fmt::Debug; use std::any::Any; use std::fmt; @@ -166,6 +167,14 @@ impl AggregateExpr for AggregateFunctionExpr { fn name(&self) -> &str { &self.name } + + fn groups_accumulator_supported(&self) -> bool { + self.fun.groups_accumulator_supported() + } + + fn create_groups_accumulator(&self) -> Result> { + self.fun.create_groups_accumulator() + } } impl PartialEq for AggregateFunctionExpr {