From f478344c42b9da8931d65415e500a01a37fc34f2 Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Mon, 2 Sep 2024 18:41:15 +0000 Subject: [PATCH 1/5] Removes min/max/count comparison based on name in aggregate statistics --- datafusion/expr/src/udaf.rs | 26 +++++++++++++ datafusion/functions-aggregate/src/count.rs | 4 ++ datafusion/functions-aggregate/src/min_max.rs | 8 ++++ .../src/aggregate_statistics.rs | 39 +++---------------- 4 files changed, 43 insertions(+), 34 deletions(-) diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index 7b4b3bb95c46..87be2321c590 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -262,6 +262,19 @@ impl AggregateUDF { self.inner.is_descending() } + /// Returns true if the function is min. Used by the optimizer + pub fn is_min(&self) -> bool { + self.inner.is_min() + } + + /// Returns true if the function is max. Used by the optimizer + pub fn is_max(&self) -> bool { + self.inner.is_max() + } + /// Returns true if the function is count. Used by the optimizer + pub fn is_count(&self) -> bool { + self.inner.is_count() + } /// See [`AggregateUDFImpl::default_value`] for more details. pub fn default_value(&self, data_type: &DataType) -> Result { self.inner.default_value(data_type) @@ -575,6 +588,19 @@ pub trait AggregateUDFImpl: Debug + Send + Sync { None } + // Returns true if the function is min. Used by the optimizer + fn is_min(&self) -> bool { + false + } + // Returns true if the function is max. Used by the optimizer + fn is_max(&self) -> bool { + false + } + // Returns true if the function is count. Used by the optimizer + fn is_count(&self) -> bool { + false + } + /// Returns default value of the function given the input is all `null`. /// /// Most of the aggregate function return Null if input is Null, diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index 417e28e72a71..dd61eb999831 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -291,6 +291,10 @@ impl AggregateUDFImpl for Count { fn default_value(&self, _data_type: &DataType) -> Result { Ok(ScalarValue::Int64(Some(0))) } + + fn is_count(&self) -> bool { + true + } } #[derive(Debug)] diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index 961e8639604c..5a69fe9ff36b 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -272,6 +272,10 @@ impl AggregateUDFImpl for Max { fn is_descending(&self) -> Option { Some(true) } + + fn is_max(&self) -> bool { + true + } fn order_sensitivity(&self) -> datafusion_expr::utils::AggregateOrderSensitivity { datafusion_expr::utils::AggregateOrderSensitivity::Insensitive } @@ -1052,6 +1056,10 @@ impl AggregateUDFImpl for Min { Some(false) } + fn is_min(&self) -> bool { + true + } + fn order_sensitivity(&self) -> datafusion_expr::utils::AggregateOrderSensitivity { datafusion_expr::utils::AggregateOrderSensitivity::Insensitive } diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index 863c5ab2d288..35b12765f550 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -141,7 +141,7 @@ fn take_optimizable_column_and_table_count( stats: &Statistics, ) -> Option<(ScalarValue, String)> { let col_stats = &stats.column_statistics; - if is_non_distinct_count(agg_expr) { + if agg_expr.fun().is_count() && !agg_expr.is_distinct() { if let Precision::Exact(num_rows) = stats.num_rows { let exprs = agg_expr.expressions(); if exprs.len() == 1 { @@ -181,7 +181,7 @@ fn take_optimizable_min( match *num_rows { 0 => { // MIN/MAX with 0 rows is always null - if is_min(agg_expr) { + if agg_expr.fun().is_min() { if let Ok(min_data_type) = ScalarValue::try_from(agg_expr.field().data_type()) { @@ -191,7 +191,7 @@ fn take_optimizable_min( } value if value > 0 => { let col_stats = &stats.column_statistics; - if is_min(agg_expr) { + if agg_expr.fun().is_min() { let exprs = agg_expr.expressions(); if exprs.len() == 1 { // TODO optimize with exprs other than Column @@ -227,7 +227,7 @@ fn take_optimizable_max( match *num_rows { 0 => { // MIN/MAX with 0 rows is always null - if is_max(agg_expr) { + if agg_expr.fun().is_max() { if let Ok(max_data_type) = ScalarValue::try_from(agg_expr.field().data_type()) { @@ -237,7 +237,7 @@ fn take_optimizable_max( } value if value > 0 => { let col_stats = &stats.column_statistics; - if is_max(agg_expr) { + if agg_expr.fun().is_max() { let exprs = agg_expr.expressions(); if exprs.len() == 1 { // TODO optimize with exprs other than Column @@ -263,32 +263,3 @@ fn take_optimizable_max( } None } - -// TODO: Move this check into AggregateUDFImpl -// https://github.com/apache/datafusion/issues/11153 -fn is_non_distinct_count(agg_expr: &AggregateFunctionExpr) -> bool { - if agg_expr.fun().name() == "count" && !agg_expr.is_distinct() { - return true; - } - false -} - -// TODO: Move this check into AggregateUDFImpl -// https://github.com/apache/datafusion/issues/11153 -fn is_min(agg_expr: &AggregateFunctionExpr) -> bool { - if agg_expr.fun().name().to_lowercase() == "min" { - return true; - } - false -} - -// TODO: Move this check into AggregateUDFImpl -// https://github.com/apache/datafusion/issues/11153 -fn is_max(agg_expr: &AggregateFunctionExpr) -> bool { - if agg_expr.fun().name().to_lowercase() == "max" { - return true; - } - false -} - -// See tests in datafusion/core/tests/physical_optimizer From 751e35a10136c5eb7f3e32f805bb416adf2b1a44 Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Fri, 13 Sep 2024 23:17:18 +0000 Subject: [PATCH 2/5] Abstracting away value from statistics --- datafusion/expr/src/udaf.rs | 42 +++--- datafusion/functions-aggregate/src/count.rs | 32 ++++- datafusion/functions-aggregate/src/min_max.rs | 90 +++++++++++- .../src/aggregate_statistics.rs | 132 ++---------------- 4 files changed, 146 insertions(+), 150 deletions(-) diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index 87be2321c590..9dda0edb421c 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -25,7 +25,8 @@ use std::vec; use arrow::datatypes::{DataType, Field}; -use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue}; +use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue, Statistics}; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use crate::expr::AggregateFunction; use crate::function::{ @@ -262,19 +263,16 @@ impl AggregateUDF { self.inner.is_descending() } - /// Returns true if the function is min. Used by the optimizer - pub fn is_min(&self) -> bool { - self.inner.is_min() + pub fn value_from_stats( + &self, + statistics: &Statistics, + data_type: &DataType, + arguments: &[Arc], + ) -> Option { + self.inner + .value_from_stats(statistics, &data_type, arguments) } - /// Returns true if the function is max. Used by the optimizer - pub fn is_max(&self) -> bool { - self.inner.is_max() - } - /// Returns true if the function is count. Used by the optimizer - pub fn is_count(&self) -> bool { - self.inner.is_count() - } /// See [`AggregateUDFImpl::default_value`] for more details. pub fn default_value(&self, data_type: &DataType) -> Result { self.inner.default_value(data_type) @@ -587,18 +585,14 @@ pub trait AggregateUDFImpl: Debug + Send + Sync { fn is_descending(&self) -> Option { None } - - // Returns true if the function is min. Used by the optimizer - fn is_min(&self) -> bool { - false - } - // Returns true if the function is max. Used by the optimizer - fn is_max(&self) -> bool { - false - } - // Returns true if the function is count. Used by the optimizer - fn is_count(&self) -> bool { - false + // Return the value of the current UDF from the statistics + fn value_from_stats( + &self, + _statistics: &Statistics, + _data_type: &DataType, + _arguments: &[Arc], + ) -> Option { + None } /// Returns default value of the function given the input is all `null`. diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index dd61eb999831..e62500d7af12 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -16,7 +16,9 @@ // under the License. use ahash::RandomState; +use datafusion_common::stats::Precision; use datafusion_functions_aggregate_common::aggregate::count_distinct::BytesViewDistinctCountAccumulator; +use datafusion_physical_expr::expressions; use std::collections::HashSet; use std::ops::BitAnd; use std::{fmt::Debug, sync::Arc}; @@ -54,6 +56,7 @@ use datafusion_functions_aggregate_common::aggregate::count_distinct::{ use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::accumulate_indices; use datafusion_physical_expr_common::binary_map::OutputType; +use datafusion_common::utils::expr::COUNT_STAR_EXPANSION; make_udaf_expr_and_func!( Count, count, @@ -292,8 +295,33 @@ impl AggregateUDFImpl for Count { Ok(ScalarValue::Int64(Some(0))) } - fn is_count(&self) -> bool { - true + fn value_from_stats( + &self, + statistics: &datafusion_common::Statistics, + _data_type: &DataType, + arguments: &[Arc], + ) -> Option { + if let Precision::Exact(num_rows) = statistics.num_rows { + if arguments.len() == 1 { + // TODO optimize with exprs other than Column + if let Some(col_expr) = + arguments[0].as_any().downcast_ref::() + { + let current_val = + &statistics.column_statistics[col_expr.index()].null_count; + if let &Precision::Exact(val) = current_val { + return Some(ScalarValue::Int64(Some((num_rows - val) as i64))); + } + } else if let Some(lit_expr) = + arguments[0].as_any().downcast_ref::() + { + if lit_expr.value() == &COUNT_STAR_EXPANSION { + return Some(ScalarValue::Int64(Some(num_rows as i64))); + } + } + } + } + None } } diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index 5a69fe9ff36b..659fca60447a 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -49,11 +49,15 @@ use arrow::datatypes::{ UInt8Type, }; use arrow_schema::IntervalUnit; +use datafusion_common::stats::Precision; use datafusion_common::{ - downcast_value, exec_err, internal_err, DataFusionError, Result, + downcast_value, exec_err, internal_err, ColumnStatistics, DataFusionError, Result, + Statistics, }; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; +use datafusion_physical_expr::{expressions, PhysicalExpr}; use std::fmt::Debug; +use std::sync::Arc; use arrow::datatypes::i256; use arrow::datatypes::{ @@ -147,6 +151,55 @@ macro_rules! instantiate_min_accumulator { }}; } +trait FromColumnStatistics { + fn value_from_column_statistics( + &self, + stats: &ColumnStatistics, + ) -> Option; + + fn value_from_statistics( + &self, + statistics: &Statistics, + data_type: &DataType, + arguments: &[Arc], + ) -> Option { + if let Precision::Exact(num_rows) = &statistics.num_rows { + match *num_rows { + 0 => return ScalarValue::try_from(data_type).ok(), + value if value > 0 => { + let col_stats = &statistics.column_statistics; + if arguments.len() == 1 { + // TODO optimize with exprs other than Column + if let Some(col_expr) = + arguments[0].as_any().downcast_ref::() + { + return self.value_from_column_statistics( + &col_stats[col_expr.index()], + ); + } + } + } + _ => {} + } + } + None + } +} + +impl FromColumnStatistics for Max { + fn value_from_column_statistics( + &self, + col_stats: &ColumnStatistics, + ) -> Option { + if let Precision::Exact(ref val) = col_stats.max_value { + if !val.is_null() { + return Some(val.clone()); + } + } + None + } +} + impl AggregateUDFImpl for Max { fn as_any(&self) -> &dyn std::any::Any { self @@ -273,9 +326,6 @@ impl AggregateUDFImpl for Max { Some(true) } - fn is_max(&self) -> bool { - true - } fn order_sensitivity(&self) -> datafusion_expr::utils::AggregateOrderSensitivity { datafusion_expr::utils::AggregateOrderSensitivity::Insensitive } @@ -286,6 +336,14 @@ impl AggregateUDFImpl for Max { fn reverse_expr(&self) -> datafusion_expr::ReversedUDAF { datafusion_expr::ReversedUDAF::Identical } + fn value_from_stats( + &self, + statistics: &Statistics, + data_type: &DataType, + arguments: &[Arc], + ) -> Option { + self.value_from_statistics(statistics, data_type, arguments) + } } // Statically-typed version of min/max(array) -> ScalarValue for string types @@ -930,6 +988,20 @@ impl Default for Min { } } +impl FromColumnStatistics for Min { + fn value_from_column_statistics( + &self, + col_stats: &ColumnStatistics, + ) -> Option { + if let Precision::Exact(ref val) = col_stats.min_value { + if !val.is_null() { + return Some(val.clone()); + } + } + None + } +} + impl AggregateUDFImpl for Min { fn as_any(&self) -> &dyn std::any::Any { self @@ -1056,10 +1128,14 @@ impl AggregateUDFImpl for Min { Some(false) } - fn is_min(&self) -> bool { - true + fn value_from_stats( + &self, + statistics: &Statistics, + data_type: &DataType, + arguments: &[Arc], + ) -> Option { + self.value_from_statistics(statistics, data_type, arguments) } - fn order_sensitivity(&self) -> datafusion_expr::utils::AggregateOrderSensitivity { datafusion_expr::utils::AggregateOrderSensitivity::Insensitive } diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index 35b12765f550..fd04f9e2998e 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -61,10 +61,10 @@ impl PhysicalOptimizerRule for AggregateStatistics { take_optimizable_column_and_table_count(expr, &stats) { projections.push((expressions::lit(non_null_rows), name.to_owned())); - } else if let Some((min, name)) = take_optimizable_min(expr, &stats) { - projections.push((expressions::lit(min), name.to_owned())); - } else if let Some((max, name)) = take_optimizable_max(expr, &stats) { - projections.push((expressions::lit(max), name.to_owned())); + } else if let Some((min_or_max, name)) = + take_optimizable_value_from_statistics(expr, &stats) + { + projections.push((expressions::lit(min_or_max), name.to_owned())); } else { // TODO: we need all aggr_expr to be resolved (cf TODO fullres) break; @@ -140,126 +140,24 @@ fn take_optimizable_column_and_table_count( agg_expr: &AggregateFunctionExpr, stats: &Statistics, ) -> Option<(ScalarValue, String)> { - let col_stats = &stats.column_statistics; - if agg_expr.fun().is_count() && !agg_expr.is_distinct() { - if let Precision::Exact(num_rows) = stats.num_rows { - let exprs = agg_expr.expressions(); - if exprs.len() == 1 { - // TODO optimize with exprs other than Column - if let Some(col_expr) = - exprs[0].as_any().downcast_ref::() - { - let current_val = &col_stats[col_expr.index()].null_count; - if let &Precision::Exact(val) = current_val { - return Some(( - ScalarValue::Int64(Some((num_rows - val) as i64)), - agg_expr.name().to_string(), - )); - } - } else if let Some(lit_expr) = - exprs[0].as_any().downcast_ref::() - { - if lit_expr.value() == &COUNT_STAR_EXPANSION { - return Some(( - ScalarValue::Int64(Some(num_rows as i64)), - agg_expr.name().to_string(), - )); - } - } - } - } - } - None -} - -/// If this agg_expr is a min that is exactly defined in the statistics, return it. -fn take_optimizable_min( - agg_expr: &AggregateFunctionExpr, - stats: &Statistics, -) -> Option<(ScalarValue, String)> { - if let Precision::Exact(num_rows) = &stats.num_rows { - match *num_rows { - 0 => { - // MIN/MAX with 0 rows is always null - if agg_expr.fun().is_min() { - if let Ok(min_data_type) = - ScalarValue::try_from(agg_expr.field().data_type()) - { - return Some((min_data_type, agg_expr.name().to_string())); - } - } - } - value if value > 0 => { - let col_stats = &stats.column_statistics; - if agg_expr.fun().is_min() { - let exprs = agg_expr.expressions(); - if exprs.len() == 1 { - // TODO optimize with exprs other than Column - if let Some(col_expr) = - exprs[0].as_any().downcast_ref::() - { - if let Precision::Exact(val) = - &col_stats[col_expr.index()].min_value - { - if !val.is_null() { - return Some(( - val.clone(), - agg_expr.name().to_string(), - )); - } - } - } - } - } - } - _ => {} + if !agg_expr.is_distinct() { + if let Some((val, name)) = take_optimizable_value_from_statistics(agg_expr, stats) + { + return Some((val, name)); } } None } /// If this agg_expr is a max that is exactly defined in the statistics, return it. -fn take_optimizable_max( +fn take_optimizable_value_from_statistics( agg_expr: &AggregateFunctionExpr, stats: &Statistics, ) -> Option<(ScalarValue, String)> { - if let Precision::Exact(num_rows) = &stats.num_rows { - match *num_rows { - 0 => { - // MIN/MAX with 0 rows is always null - if agg_expr.fun().is_max() { - if let Ok(max_data_type) = - ScalarValue::try_from(agg_expr.field().data_type()) - { - return Some((max_data_type, agg_expr.name().to_string())); - } - } - } - value if value > 0 => { - let col_stats = &stats.column_statistics; - if agg_expr.fun().is_max() { - let exprs = agg_expr.expressions(); - if exprs.len() == 1 { - // TODO optimize with exprs other than Column - if let Some(col_expr) = - exprs[0].as_any().downcast_ref::() - { - if let Precision::Exact(val) = - &col_stats[col_expr.index()].max_value - { - if !val.is_null() { - return Some(( - val.clone(), - agg_expr.name().to_string(), - )); - } - } - } - } - } - } - _ => {} - } - } - None + let value = agg_expr.fun().value_from_stats( + &stats, + agg_expr.field().data_type(), + agg_expr.expressions().as_slice(), + ); + value.map(|val| (val, agg_expr.name().to_string())) } From 016decf5d73425ad60a40abbca31a4e437ba8450 Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Fri, 13 Sep 2024 23:21:39 +0000 Subject: [PATCH 3/5] Removing imports --- datafusion/physical-optimizer/src/aggregate_statistics.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index fd04f9e2998e..0fcd5bf2c95c 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -26,9 +26,7 @@ use datafusion_physical_plan::projection::ProjectionExec; use datafusion_physical_plan::{expressions, ExecutionPlan, Statistics}; use crate::PhysicalOptimizerRule; -use datafusion_common::stats::Precision; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::utils::expr::COUNT_STAR_EXPANSION; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::udaf::AggregateFunctionExpr; From f41dd3e35eb1b191c2c7d05436dc1183bb84290d Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Sat, 28 Sep 2024 08:46:48 -0400 Subject: [PATCH 4/5] Introduced StatisticsArgs --- datafusion/expr/src/lib.rs | 2 +- datafusion/expr/src/udaf.rs | 26 ++++++----- datafusion/functions-aggregate/src/count.rs | 31 ++++++------- datafusion/functions-aggregate/src/min_max.rs | 43 +++++++----------- .../src/aggregate_statistics.rs | 45 +++++++------------ datafusion/physical-plan/src/lib.rs | 1 + 6 files changed, 64 insertions(+), 84 deletions(-) diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index 260065f69af9..4e967cc62322 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -90,7 +90,7 @@ pub use logical_plan::*; pub use partition_evaluator::PartitionEvaluator; pub use sqlparser; pub use table_source::{TableProviderFilterPushDown, TableSource, TableType}; -pub use udaf::{AggregateUDF, AggregateUDFImpl, ReversedUDAF}; +pub use udaf::{AggregateUDF, AggregateUDFImpl, ReversedUDAF, StatisticsArgs}; pub use udf::{ScalarUDF, ScalarUDFImpl}; pub use udwf::{WindowUDF, WindowUDFImpl}; pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits}; diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index 9dda0edb421c..926eb2bc0ac6 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -94,6 +94,18 @@ impl fmt::Display for AggregateUDF { } } +pub struct StatisticsArgs<'a> { + pub statistics: &'a Statistics, + pub return_type: &'a DataType, + /// Whether the aggregate function is distinct. + /// + /// ```sql /// SELECT COUNT(DISTINCT column1) FROM t; + /// ``` + pub is_distinct: bool, + /// The physical expression of arguments the aggregate function takes. + pub exprs: &'a [Arc], +} + impl AggregateUDF { /// Create a new AggregateUDF /// @@ -265,12 +277,9 @@ impl AggregateUDF { pub fn value_from_stats( &self, - statistics: &Statistics, - data_type: &DataType, - arguments: &[Arc], + statistics_args: &StatisticsArgs, ) -> Option { - self.inner - .value_from_stats(statistics, &data_type, arguments) + self.inner.value_from_stats(statistics_args) } /// See [`AggregateUDFImpl::default_value`] for more details. @@ -586,12 +595,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync { None } // Return the value of the current UDF from the statistics - fn value_from_stats( - &self, - _statistics: &Statistics, - _data_type: &DataType, - _arguments: &[Arc], - ) -> Option { + fn value_from_stats(&self, _statistics_args: &StatisticsArgs) -> Option { None } diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index e62500d7af12..cc245b3572ec 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -48,7 +48,7 @@ use datafusion_expr::{ function::AccumulatorArgs, utils::format_state_name, Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, Signature, Volatility, }; -use datafusion_expr::{Expr, ReversedUDAF, TypeSignature}; +use datafusion_expr::{Expr, ReversedUDAF, StatisticsArgs, TypeSignature}; use datafusion_functions_aggregate_common::aggregate::count_distinct::{ BytesDistinctCountAccumulator, FloatDistinctCountAccumulator, PrimitiveDistinctCountAccumulator, @@ -295,25 +295,26 @@ impl AggregateUDFImpl for Count { Ok(ScalarValue::Int64(Some(0))) } - fn value_from_stats( - &self, - statistics: &datafusion_common::Statistics, - _data_type: &DataType, - arguments: &[Arc], - ) -> Option { - if let Precision::Exact(num_rows) = statistics.num_rows { - if arguments.len() == 1 { + fn value_from_stats(&self, statistics_args: &StatisticsArgs) -> Option { + if statistics_args.is_distinct { + return None; + } + if let Precision::Exact(num_rows) = statistics_args.statistics.num_rows { + if statistics_args.exprs.len() == 1 { // TODO optimize with exprs other than Column - if let Some(col_expr) = - arguments[0].as_any().downcast_ref::() + if let Some(col_expr) = statistics_args.exprs[0] + .as_any() + .downcast_ref::() { - let current_val = - &statistics.column_statistics[col_expr.index()].null_count; + let current_val = &statistics_args.statistics.column_statistics + [col_expr.index()] + .null_count; if let &Precision::Exact(val) = current_val { return Some(ScalarValue::Int64(Some((num_rows - val) as i64))); } - } else if let Some(lit_expr) = - arguments[0].as_any().downcast_ref::() + } else if let Some(lit_expr) = statistics_args.exprs[0] + .as_any() + .downcast_ref::() { if lit_expr.value() == &COUNT_STAR_EXPANSION { return Some(ScalarValue::Int64(Some(num_rows as i64))); diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index 659fca60447a..1ce1abe09ea8 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -15,7 +15,7 @@ // under the License. //! [`Max`] and [`MaxAccumulator`] accumulator for the `max` function -//! [`Min`] and [`MinAccumulator`] accumulator for the `max` function +//! [`Min`] and [`MinAccumulator`] accumulator for the `min` function // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file @@ -52,12 +52,10 @@ use arrow_schema::IntervalUnit; use datafusion_common::stats::Precision; use datafusion_common::{ downcast_value, exec_err, internal_err, ColumnStatistics, DataFusionError, Result, - Statistics, }; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; -use datafusion_physical_expr::{expressions, PhysicalExpr}; +use datafusion_physical_expr::expressions; use std::fmt::Debug; -use std::sync::Arc; use arrow::datatypes::i256; use arrow::datatypes::{ @@ -67,10 +65,10 @@ use arrow::datatypes::{ }; use datafusion_common::ScalarValue; -use datafusion_expr::GroupsAccumulator; use datafusion_expr::{ function::AccumulatorArgs, Accumulator, AggregateUDFImpl, Signature, Volatility, }; +use datafusion_expr::{GroupsAccumulator, StatisticsArgs}; use half::f16; use std::ops::Deref; @@ -159,19 +157,18 @@ trait FromColumnStatistics { fn value_from_statistics( &self, - statistics: &Statistics, - data_type: &DataType, - arguments: &[Arc], + statistics_args: &StatisticsArgs, ) -> Option { - if let Precision::Exact(num_rows) = &statistics.num_rows { + if let Precision::Exact(num_rows) = &statistics_args.statistics.num_rows { match *num_rows { - 0 => return ScalarValue::try_from(data_type).ok(), + 0 => return ScalarValue::try_from(statistics_args.return_type).ok(), value if value > 0 => { - let col_stats = &statistics.column_statistics; - if arguments.len() == 1 { + let col_stats = &statistics_args.statistics.column_statistics; + if statistics_args.exprs.len() == 1 { // TODO optimize with exprs other than Column - if let Some(col_expr) = - arguments[0].as_any().downcast_ref::() + if let Some(col_expr) = statistics_args.exprs[0] + .as_any() + .downcast_ref::() { return self.value_from_column_statistics( &col_stats[col_expr.index()], @@ -336,13 +333,8 @@ impl AggregateUDFImpl for Max { fn reverse_expr(&self) -> datafusion_expr::ReversedUDAF { datafusion_expr::ReversedUDAF::Identical } - fn value_from_stats( - &self, - statistics: &Statistics, - data_type: &DataType, - arguments: &[Arc], - ) -> Option { - self.value_from_statistics(statistics, data_type, arguments) + fn value_from_stats(&self, statistics_args: &StatisticsArgs) -> Option { + self.value_from_statistics(statistics_args) } } @@ -1128,13 +1120,8 @@ impl AggregateUDFImpl for Min { Some(false) } - fn value_from_stats( - &self, - statistics: &Statistics, - data_type: &DataType, - arguments: &[Arc], - ) -> Option { - self.value_from_statistics(statistics, data_type, arguments) + fn value_from_stats(&self, statistics_args: &StatisticsArgs) -> Option { + self.value_from_statistics(statistics_args) } fn order_sensitivity(&self) -> datafusion_expr::utils::AggregateOrderSensitivity { datafusion_expr::utils::AggregateOrderSensitivity::Insensitive diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index 0fcd5bf2c95c..58b79fbe3905 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -23,12 +23,12 @@ use datafusion_common::scalar::ScalarValue; use datafusion_common::Result; use datafusion_physical_plan::aggregates::AggregateExec; use datafusion_physical_plan::projection::ProjectionExec; -use datafusion_physical_plan::{expressions, ExecutionPlan, Statistics}; +use datafusion_physical_plan::{expressions, ExecutionPlan}; use crate::PhysicalOptimizerRule; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; -use datafusion_physical_plan::udaf::AggregateFunctionExpr; +use datafusion_physical_plan::udaf::{AggregateFunctionExpr, StatisticsArgs}; /// Optimizer that uses available statistics for aggregate functions #[derive(Default)] @@ -55,14 +55,19 @@ impl PhysicalOptimizerRule for AggregateStatistics { let stats = partial_agg_exec.input().statistics()?; let mut projections = vec![]; for expr in partial_agg_exec.aggr_expr() { - if let Some((non_null_rows, name)) = - take_optimizable_column_and_table_count(expr, &stats) + let field = expr.field(); + let args = expr.expressions(); + let statistics_args = StatisticsArgs { + statistics: &stats, + return_type: field.data_type(), + is_distinct: expr.is_distinct(), + exprs: args.as_slice(), + }; + if let Some((optimizable_statistic, name)) = + take_optimizable_value_from_statistics(&statistics_args, expr) { - projections.push((expressions::lit(non_null_rows), name.to_owned())); - } else if let Some((min_or_max, name)) = - take_optimizable_value_from_statistics(expr, &stats) - { - projections.push((expressions::lit(min_or_max), name.to_owned())); + projections + .push((expressions::lit(optimizable_statistic), name.to_owned())); } else { // TODO: we need all aggr_expr to be resolved (cf TODO fullres) break; @@ -133,29 +138,11 @@ fn take_optimizable(node: &dyn ExecutionPlan) -> Option> None } -/// If this agg_expr is a count that can be exactly derived from the statistics, return it. -fn take_optimizable_column_and_table_count( - agg_expr: &AggregateFunctionExpr, - stats: &Statistics, -) -> Option<(ScalarValue, String)> { - if !agg_expr.is_distinct() { - if let Some((val, name)) = take_optimizable_value_from_statistics(agg_expr, stats) - { - return Some((val, name)); - } - } - None -} - /// If this agg_expr is a max that is exactly defined in the statistics, return it. fn take_optimizable_value_from_statistics( + statistics_args: &StatisticsArgs, agg_expr: &AggregateFunctionExpr, - stats: &Statistics, ) -> Option<(ScalarValue, String)> { - let value = agg_expr.fun().value_from_stats( - &stats, - agg_expr.field().data_type(), - agg_expr.expressions().as_slice(), - ); + let value = agg_expr.fun().value_from_stats(statistics_args); value.map(|val| (val, agg_expr.name().to_string())) } diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 026798c5798b..4e610e7a807c 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -82,6 +82,7 @@ pub mod windows; pub mod work_table; pub mod udaf { + pub use datafusion_expr::StatisticsArgs; pub use datafusion_physical_expr::aggregate::AggregateFunctionExpr; } From faf6c45e172c539b136607999cc390053a674a15 Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Sat, 28 Sep 2024 09:53:45 -0400 Subject: [PATCH 5/5] Fixed docs --- datafusion/expr/src/udaf.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index 926eb2bc0ac6..7e60f96c5b95 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -99,7 +99,8 @@ pub struct StatisticsArgs<'a> { pub return_type: &'a DataType, /// Whether the aggregate function is distinct. /// - /// ```sql /// SELECT COUNT(DISTINCT column1) FROM t; + /// ```sql + /// SELECT COUNT(DISTINCT column1) FROM t; /// ``` pub is_distinct: bool, /// The physical expression of arguments the aggregate function takes.