From 3550758d43c02a8675faa19acd1299bf45404aa3 Mon Sep 17 00:00:00 2001 From: irenjj Date: Sun, 9 Feb 2025 02:08:52 +0800 Subject: [PATCH] chore: remove partition_keys from (Bounded)WindowAggExec (#14526) * chore: remove partition_keys from (Bounded)WindowAggExec * support bounded_window_agg_exec * fix * fix * fix --- datafusion/core/src/physical_planner.rs | 31 ++++------------ .../core/tests/fuzz_cases/window_fuzz.rs | 6 ++-- .../tests/physical_optimizer/test_utils.rs | 4 +-- .../src/enforce_distribution.rs | 4 +-- .../src/enforce_sorting/mod.rs | 8 ++--- .../src/windows/bounded_window_agg_exec.rs | 35 ++++++++++++++----- datafusion/physical-plan/src/windows/mod.rs | 6 ++-- .../src/windows/window_agg_exec.rs | 31 ++++++++++++---- datafusion/proto/src/physical_plan/mod.rs | 8 ++--- .../tests/cases/roundtrip_physical_plan.rs | 10 +++--- 10 files changed, 79 insertions(+), 64 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index edac0fb16381..9fcb9562a485 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -86,7 +86,6 @@ use datafusion_physical_plan::execution_plan::InvariantLevel; use datafusion_physical_plan::memory::MemorySourceConfig; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::unnest::ListUnnest; -use datafusion_sql::utils::window_expr_common_partition_keys; use crate::schema_equivalence::schema_satisfied_by; use async_trait::async_trait; @@ -557,34 +556,13 @@ impl DefaultPhysicalPlanner { return exec_err!("Table '{table_name}' does not exist"); } } - LogicalPlan::Window(Window { - input, window_expr, .. - }) => { + LogicalPlan::Window(Window { window_expr, .. 
}) => { if window_expr.is_empty() { return internal_err!("Impossibly got empty window expression"); } let input_exec = children.one()?; - // at this moment we are guaranteed by the logical planner - // to have all the window_expr to have equal sort key - let partition_keys = window_expr_common_partition_keys(window_expr)?; - - let can_repartition = !partition_keys.is_empty() - && session_state.config().target_partitions() > 1 - && session_state.config().repartition_window_functions(); - - let physical_partition_keys = if can_repartition { - partition_keys - .iter() - .map(|e| { - self.create_physical_expr(e, input.schema(), session_state) - }) - .collect::>>>()? - } else { - vec![] - }; - let get_sort_keys = |expr: &Expr| match expr { Expr::WindowFunction(WindowFunction { ref partition_by, @@ -626,6 +604,9 @@ impl DefaultPhysicalPlanner { }) .collect::>>()?; + let can_repartition = session_state.config().target_partitions() > 1 + && session_state.config().repartition_window_functions(); + let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory()); // If all window expressions can run with bounded memory, @@ -634,14 +615,14 @@ impl DefaultPhysicalPlanner { Arc::new(BoundedWindowAggExec::try_new( window_expr, input_exec, - physical_partition_keys, InputOrderMode::Sorted, + can_repartition, )?) } else { Arc::new(WindowAggExec::try_new( window_expr, input_exec, - physical_partition_keys, + can_repartition, )?) 
} } diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs index 9c66bf2d78f2..4a484221a88a 100644 --- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs @@ -293,8 +293,8 @@ async fn bounded_window_causal_non_causal() -> Result<()> { let running_window_exec = Arc::new(BoundedWindowAggExec::try_new( vec![window_expr], memory_exec.clone(), - vec![], Linear, + false, )?); let task_ctx = ctx.task_ctx(); let collected_results = collect(running_window_exec, task_ctx).await?; @@ -660,7 +660,7 @@ async fn run_window_test( false, )?], exec1, - vec![], + false, )?) as _; let exec2 = Arc::new(DataSourceExec::new(Arc::new( MemorySourceConfig::try_new(&[input1.clone()], schema.clone(), None)? @@ -678,8 +678,8 @@ async fn run_window_test( false, )?], exec2, - vec![], search_mode.clone(), + false, )?) as _; let task_ctx = ctx.task_ctx(); let collected_usual = collect(usual_window_exec, task_ctx.clone()).await?; diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index fb36be3da1b4..721dfca029b0 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -237,8 +237,8 @@ pub fn bounded_window_exec_with_partition( BoundedWindowAggExec::try_new( vec![window_expr], Arc::clone(&input), - vec![], InputOrderMode::Sorted, + false, ) .unwrap(), ) @@ -266,8 +266,8 @@ pub fn bounded_window_exec_non_set_monotonic( ) .unwrap()], Arc::clone(&input), - vec![], InputOrderMode::Sorted, + false, ) .unwrap(), ) diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index d0f7b3671e60..5e76edad1f56 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -1196,7 +1196,7 @@ pub fn 
ensure_distribution( if let Some(updated_window) = get_best_fitting_window( exec.window_expr(), exec.input(), - &exec.partition_keys, + &exec.partition_keys(), )? { plan = updated_window; } @@ -1204,7 +1204,7 @@ pub fn ensure_distribution( if let Some(updated_window) = get_best_fitting_window( exec.window_expr(), exec.input(), - &exec.partition_keys, + &exec.partition_keys(), )? { plan = updated_window; } diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index 2d23894d6b5e..a25e6c6f17ac 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -460,12 +460,12 @@ fn adjust_window_sort_removal( if let Some(exec) = plan.downcast_ref::() { let window_expr = exec.window_expr(); let new_window = - get_best_fitting_window(window_expr, child_plan, &exec.partition_keys)?; + get_best_fitting_window(window_expr, child_plan, &exec.partition_keys())?; (window_expr, new_window) } else if let Some(exec) = plan.downcast_ref::() { let window_expr = exec.window_expr(); let new_window = - get_best_fitting_window(window_expr, child_plan, &exec.partition_keys)?; + get_best_fitting_window(window_expr, child_plan, &exec.partition_keys())?; (window_expr, new_window) } else { return plan_err!("Expected WindowAggExec or BoundedWindowAggExec"); @@ -493,14 +493,14 @@ fn adjust_window_sort_removal( Arc::new(BoundedWindowAggExec::try_new( window_expr.to_vec(), child_plan, - window_expr[0].partition_by().to_vec(), InputOrderMode::Sorted, + !window_expr[0].partition_by().is_empty(), )?) as _ } else { Arc::new(WindowAggExec::try_new( window_expr.to_vec(), child_plan, - window_expr[0].partition_by().to_vec(), + !window_expr[0].partition_by().is_empty(), )?) 
as _ } }; diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 79eaf4447434..a734feae5533 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -78,8 +78,6 @@ pub struct BoundedWindowAggExec { window_expr: Vec>, /// Schema after the window is run schema: SchemaRef, - /// Partition Keys - pub partition_keys: Vec>, /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Describes how the input is ordered relative to the partition keys @@ -93,6 +91,8 @@ pub struct BoundedWindowAggExec { ordered_partition_by_indices: Vec, /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanProperties, + /// If `can_repartition` is false, partition_keys is always empty. + can_repartition: bool, } impl BoundedWindowAggExec { @@ -100,8 +100,8 @@ impl BoundedWindowAggExec { pub fn try_new( window_expr: Vec>, input: Arc, - partition_keys: Vec>, input_order_mode: InputOrderMode, + can_repartition: bool, ) -> Result { let schema = create_schema(&input.schema(), &window_expr)?; let schema = Arc::new(schema); @@ -128,11 +128,11 @@ impl BoundedWindowAggExec { input, window_expr, schema, - partition_keys, metrics: ExecutionPlanMetricsSet::new(), input_order_mode, ordered_partition_by_indices, cache, + can_repartition, }) } @@ -209,6 +209,23 @@ impl BoundedWindowAggExec { input.boundedness(), ) } + + pub fn partition_keys(&self) -> Vec> { + if !self.can_repartition { + vec![] + } else { + let all_partition_keys = self + .window_expr() + .iter() + .map(|expr| expr.partition_by().to_vec()) + .collect::>(); + + all_partition_keys + .into_iter() + .min_by_key(|s| s.len()) + .unwrap_or_else(Vec::new) + } + } } impl DisplayAs for BoundedWindowAggExec { @@ -269,11 +286,11 @@ impl ExecutionPlan for BoundedWindowAggExec { } fn required_input_distribution(&self) -> Vec { - if 
self.partition_keys.is_empty() { + if self.partition_keys().is_empty() { debug!("No partition defined for BoundedWindowAggExec!!!"); vec![Distribution::SinglePartition] } else { - vec![Distribution::HashPartitioned(self.partition_keys.clone())] + vec![Distribution::HashPartitioned(self.partition_keys().clone())] } } @@ -288,8 +305,8 @@ impl ExecutionPlan for BoundedWindowAggExec { Ok(Arc::new(BoundedWindowAggExec::try_new( self.window_expr.clone(), Arc::clone(&children[0]), - self.partition_keys.clone(), self.input_order_mode.clone(), + self.can_repartition, )?)) } @@ -1329,8 +1346,8 @@ mod tests { false, )?], input, - partitionby_exprs, input_order_mode, + true, )?)) } @@ -1610,8 +1627,8 @@ mod tests { let physical_plan = BoundedWindowAggExec::try_new( window_exprs, memory_exec, - vec![], InputOrderMode::Sorted, + true, ) .map(|e| Arc::new(e) as Arc)?; diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 3785230c0e79..cdab1fa5929d 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -425,8 +425,8 @@ pub fn get_best_fitting_window( Ok(Some(Arc::new(BoundedWindowAggExec::try_new( window_expr, Arc::clone(input), - physical_partition_keys.to_vec(), input_order_mode, + !physical_partition_keys.is_empty(), )?) as _)) } else if input_order_mode != InputOrderMode::Sorted { // For `WindowAggExec` to work correctly PARTITION BY columns should be sorted. @@ -438,7 +438,7 @@ pub fn get_best_fitting_window( Ok(Some(Arc::new(WindowAggExec::try_new( window_expr, Arc::clone(input), - physical_partition_keys.to_vec(), + !physical_partition_keys.is_empty(), )?) 
as _)) } } @@ -663,7 +663,7 @@ mod tests { false, )?], blocking_exec, - vec![], + false, )?); let fut = collect(window_agg_exec, task_ctx); diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index f0c258a02576..d31fd66ca1f1 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -57,8 +57,6 @@ pub struct WindowAggExec { window_expr: Vec>, /// Schema after the window is run schema: SchemaRef, - /// Partition Keys - pub partition_keys: Vec>, /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Partition by indices that defines preset for existing ordering @@ -66,6 +64,8 @@ pub struct WindowAggExec { ordered_partition_by_indices: Vec, /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanProperties, + /// If `can_repartition` is false, partition_keys is always empty. + can_repartition: bool, } impl WindowAggExec { @@ -73,7 +73,7 @@ impl WindowAggExec { pub fn try_new( window_expr: Vec>, input: Arc, - partition_keys: Vec>, + can_repartition: bool, ) -> Result { let schema = create_schema(&input.schema(), &window_expr)?; let schema = Arc::new(schema); @@ -85,10 +85,10 @@ impl WindowAggExec { input, window_expr, schema, - partition_keys, metrics: ExecutionPlanMetricsSet::new(), ordered_partition_by_indices, cache, + can_repartition, }) } @@ -139,6 +139,23 @@ impl WindowAggExec { input.boundedness(), ) } + + pub fn partition_keys(&self) -> Vec> { + if !self.can_repartition { + vec![] + } else { + let all_partition_keys = self + .window_expr() + .iter() + .map(|expr| expr.partition_by().to_vec()) + .collect::>(); + + all_partition_keys + .into_iter() + .min_by_key(|s| s.len()) + .unwrap_or_else(Vec::new) + } + } } impl DisplayAs for WindowAggExec { @@ -206,10 +223,10 @@ impl ExecutionPlan for WindowAggExec { } fn required_input_distribution(&self) -> Vec { - if 
self.partition_keys.is_empty() { + if self.partition_keys().is_empty() { vec![Distribution::SinglePartition] } else { - vec![Distribution::HashPartitioned(self.partition_keys.clone())] + vec![Distribution::HashPartitioned(self.partition_keys())] } } @@ -220,7 +237,7 @@ impl ExecutionPlan for WindowAggExec { Ok(Arc::new(WindowAggExec::try_new( self.window_expr.clone(), Arc::clone(&children[0]), - self.partition_keys.clone(), + true, )?)) } diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 6fa28e882ed6..84b952965958 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -404,14 +404,14 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { Ok(Arc::new(BoundedWindowAggExec::try_new( physical_window_expr, input, - partition_keys, input_order_mode, + !partition_keys.is_empty(), )?)) } else { Ok(Arc::new(WindowAggExec::try_new( physical_window_expr, input, - partition_keys, + !partition_keys.is_empty(), )?)) } } @@ -1921,7 +1921,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { .collect::>>()?; let partition_keys = exec - .partition_keys + .partition_keys() .iter() .map(|e| serialize_physical_expr(e, extension_codec)) .collect::>>()?; @@ -1951,7 +1951,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { .collect::>>()?; let partition_keys = exec - .partition_keys + .partition_keys() .iter() .map(|e| serialize_physical_expr(e, extension_codec)) .collect::>>()?; diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index fdd529cfd1b9..7418184fcac1 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -305,8 +305,8 @@ fn roundtrip_udwf() -> Result<()> { roundtrip_test(Arc::new(BoundedWindowAggExec::try_new( vec![udwf_expr], input, - vec![col("a", &schema)?], InputOrderMode::Sorted, + true, )?)) } @@ 
-382,7 +382,7 @@ fn roundtrip_window() -> Result<()> { roundtrip_test(Arc::new(WindowAggExec::try_new( vec![plain_aggr_window_expr, sliding_aggr_window_expr, udwf_expr], input, - vec![col("b", &schema)?], + false, )?)) } @@ -1108,7 +1108,7 @@ fn roundtrip_scalar_udf_extension_codec() -> Result<()> { Arc::new(WindowFrame::new(None)), ))], filter, - vec![col("author", &schema)?], + true, )?); let aggregate = Arc::new(AggregateExec::try_new( @@ -1163,8 +1163,8 @@ fn roundtrip_udwf_extension_codec() -> Result<()> { let window = Arc::new(BoundedWindowAggExec::try_new( vec![udwf_expr], input, - vec![col("b", &schema)?], InputOrderMode::Sorted, + true, )?); let ctx = SessionContext::new(); @@ -1216,7 +1216,7 @@ fn roundtrip_aggregate_udf_extension_codec() -> Result<()> { Arc::new(WindowFrame::new(None)), ))], filter, - vec![col("author", &schema)?], + true, )?); let aggr_expr = AggregateExprBuilder::new(udaf, aggr_args.clone())