Skip to content

Commit

Permalink
chore: remove partition_keys from (Bounded)WindowAggExec (#14526)
Browse files Browse the repository at this point in the history
* chore: remove partition_keys from (Bounded)WindowAggExec

* support bounded_window_agg_exec

* fix

* fix

* fix
  • Loading branch information
irenjj authored Feb 8, 2025
1 parent 94d2baf commit 3550758
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 64 deletions.
31 changes: 6 additions & 25 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ use datafusion_physical_plan::execution_plan::InvariantLevel;
use datafusion_physical_plan::memory::MemorySourceConfig;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::unnest::ListUnnest;
use datafusion_sql::utils::window_expr_common_partition_keys;

use crate::schema_equivalence::schema_satisfied_by;
use async_trait::async_trait;
Expand Down Expand Up @@ -557,34 +556,13 @@ impl DefaultPhysicalPlanner {
return exec_err!("Table '{table_name}' does not exist");
}
}
LogicalPlan::Window(Window {
input, window_expr, ..
}) => {
LogicalPlan::Window(Window { window_expr, .. }) => {
if window_expr.is_empty() {
return internal_err!("Impossibly got empty window expression");
}

let input_exec = children.one()?;

// at this moment we are guaranteed by the logical planner
// to have all the window_expr to have equal sort key
let partition_keys = window_expr_common_partition_keys(window_expr)?;

let can_repartition = !partition_keys.is_empty()
&& session_state.config().target_partitions() > 1
&& session_state.config().repartition_window_functions();

let physical_partition_keys = if can_repartition {
partition_keys
.iter()
.map(|e| {
self.create_physical_expr(e, input.schema(), session_state)
})
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?
} else {
vec![]
};

let get_sort_keys = |expr: &Expr| match expr {
Expr::WindowFunction(WindowFunction {
ref partition_by,
Expand Down Expand Up @@ -626,6 +604,9 @@ impl DefaultPhysicalPlanner {
})
.collect::<Result<Vec<_>>>()?;

let can_repartition = session_state.config().target_partitions() > 1
&& session_state.config().repartition_window_functions();

let uses_bounded_memory =
window_expr.iter().all(|e| e.uses_bounded_memory());
// If all window expressions can run with bounded memory,
Expand All @@ -634,14 +615,14 @@ impl DefaultPhysicalPlanner {
Arc::new(BoundedWindowAggExec::try_new(
window_expr,
input_exec,
physical_partition_keys,
InputOrderMode::Sorted,
can_repartition,
)?)
} else {
Arc::new(WindowAggExec::try_new(
window_expr,
input_exec,
physical_partition_keys,
can_repartition,
)?)
}
}
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
let running_window_exec = Arc::new(BoundedWindowAggExec::try_new(
vec![window_expr],
memory_exec.clone(),
vec![],
Linear,
false,
)?);
let task_ctx = ctx.task_ctx();
let collected_results = collect(running_window_exec, task_ctx).await?;
Expand Down Expand Up @@ -660,7 +660,7 @@ async fn run_window_test(
false,
)?],
exec1,
vec![],
false,
)?) as _;
let exec2 = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[input1.clone()], schema.clone(), None)?
Expand All @@ -678,8 +678,8 @@ async fn run_window_test(
false,
)?],
exec2,
vec![],
search_mode.clone(),
false,
)?) as _;
let task_ctx = ctx.task_ctx();
let collected_usual = collect(usual_window_exec, task_ctx.clone()).await?;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ pub fn bounded_window_exec_with_partition(
BoundedWindowAggExec::try_new(
vec![window_expr],
Arc::clone(&input),
vec![],
InputOrderMode::Sorted,
false,
)
.unwrap(),
)
Expand Down Expand Up @@ -266,8 +266,8 @@ pub fn bounded_window_exec_non_set_monotonic(
)
.unwrap()],
Arc::clone(&input),
vec![],
InputOrderMode::Sorted,
false,
)
.unwrap(),
)
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-optimizer/src/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1196,15 +1196,15 @@ pub fn ensure_distribution(
if let Some(updated_window) = get_best_fitting_window(
exec.window_expr(),
exec.input(),
&exec.partition_keys,
&exec.partition_keys(),
)? {
plan = updated_window;
}
} else if let Some(exec) = plan.as_any().downcast_ref::<BoundedWindowAggExec>() {
if let Some(updated_window) = get_best_fitting_window(
exec.window_expr(),
exec.input(),
&exec.partition_keys,
&exec.partition_keys(),
)? {
plan = updated_window;
}
Expand Down
8 changes: 4 additions & 4 deletions datafusion/physical-optimizer/src/enforce_sorting/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,12 +460,12 @@ fn adjust_window_sort_removal(
if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
let window_expr = exec.window_expr();
let new_window =
get_best_fitting_window(window_expr, child_plan, &exec.partition_keys)?;
get_best_fitting_window(window_expr, child_plan, &exec.partition_keys())?;
(window_expr, new_window)
} else if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
let window_expr = exec.window_expr();
let new_window =
get_best_fitting_window(window_expr, child_plan, &exec.partition_keys)?;
get_best_fitting_window(window_expr, child_plan, &exec.partition_keys())?;
(window_expr, new_window)
} else {
return plan_err!("Expected WindowAggExec or BoundedWindowAggExec");
Expand Down Expand Up @@ -493,14 +493,14 @@ fn adjust_window_sort_removal(
Arc::new(BoundedWindowAggExec::try_new(
window_expr.to_vec(),
child_plan,
window_expr[0].partition_by().to_vec(),
InputOrderMode::Sorted,
!window_expr[0].partition_by().is_empty(),
)?) as _
} else {
Arc::new(WindowAggExec::try_new(
window_expr.to_vec(),
child_plan,
window_expr[0].partition_by().to_vec(),
!window_expr[0].partition_by().is_empty(),
)?) as _
}
};
Expand Down
35 changes: 26 additions & 9 deletions datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ pub struct BoundedWindowAggExec {
window_expr: Vec<Arc<dyn WindowExpr>>,
/// Schema after the window is run
schema: SchemaRef,
/// Partition Keys
pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Describes how the input is ordered relative to the partition keys
Expand All @@ -93,15 +91,17 @@ pub struct BoundedWindowAggExec {
ordered_partition_by_indices: Vec<usize>,
/// Cache holding plan properties like equivalences, output partitioning etc.
cache: PlanProperties,
/// If `can_repartition` is false, partition_keys is always empty.
can_repartition: bool,
}

impl BoundedWindowAggExec {
/// Create a new execution plan for window aggregates
pub fn try_new(
window_expr: Vec<Arc<dyn WindowExpr>>,
input: Arc<dyn ExecutionPlan>,
partition_keys: Vec<Arc<dyn PhysicalExpr>>,
input_order_mode: InputOrderMode,
can_repartition: bool,
) -> Result<Self> {
let schema = create_schema(&input.schema(), &window_expr)?;
let schema = Arc::new(schema);
Expand All @@ -128,11 +128,11 @@ impl BoundedWindowAggExec {
input,
window_expr,
schema,
partition_keys,
metrics: ExecutionPlanMetricsSet::new(),
input_order_mode,
ordered_partition_by_indices,
cache,
can_repartition,
})
}

Expand Down Expand Up @@ -209,6 +209,23 @@ impl BoundedWindowAggExec {
input.boundedness(),
)
}

/// Returns the partition keys this operator uses when requesting a
/// hash-partitioned input distribution.
///
/// The result is empty when `can_repartition` is `false`. Otherwise it is
/// the shortest `PARTITION BY` list among all window expressions; if several
/// lists share the minimum length, the first one (in `window_expr` order)
/// is returned. An operator with no window expressions yields an empty
/// vector.
pub fn partition_keys(&self) -> Vec<Arc<dyn PhysicalExpr>> {
    if !self.can_repartition {
        return vec![];
    }

    // Compare the borrowed slices and clone only the selected one, rather
    // than cloning every expression's keys into a temporary Vec<Vec<_>>.
    self.window_expr()
        .iter()
        .map(|expr| expr.partition_by())
        .min_by_key(|keys| keys.len())
        .map(|keys| keys.to_vec())
        .unwrap_or_default()
}
}

impl DisplayAs for BoundedWindowAggExec {
Expand Down Expand Up @@ -269,11 +286,11 @@ impl ExecutionPlan for BoundedWindowAggExec {
}

fn required_input_distribution(&self) -> Vec<Distribution> {
if self.partition_keys.is_empty() {
if self.partition_keys().is_empty() {
debug!("No partition defined for BoundedWindowAggExec!!!");
vec![Distribution::SinglePartition]
} else {
vec![Distribution::HashPartitioned(self.partition_keys.clone())]
vec![Distribution::HashPartitioned(self.partition_keys().clone())]
}
}

Expand All @@ -288,8 +305,8 @@ impl ExecutionPlan for BoundedWindowAggExec {
Ok(Arc::new(BoundedWindowAggExec::try_new(
self.window_expr.clone(),
Arc::clone(&children[0]),
self.partition_keys.clone(),
self.input_order_mode.clone(),
self.can_repartition,
)?))
}

Expand Down Expand Up @@ -1329,8 +1346,8 @@ mod tests {
false,
)?],
input,
partitionby_exprs,
input_order_mode,
true,
)?))
}

Expand Down Expand Up @@ -1610,8 +1627,8 @@ mod tests {
let physical_plan = BoundedWindowAggExec::try_new(
window_exprs,
memory_exec,
vec![],
InputOrderMode::Sorted,
true,
)
.map(|e| Arc::new(e) as Arc<dyn ExecutionPlan>)?;

Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,8 @@ pub fn get_best_fitting_window(
Ok(Some(Arc::new(BoundedWindowAggExec::try_new(
window_expr,
Arc::clone(input),
physical_partition_keys.to_vec(),
input_order_mode,
!physical_partition_keys.is_empty(),
)?) as _))
} else if input_order_mode != InputOrderMode::Sorted {
// For `WindowAggExec` to work correctly PARTITION BY columns should be sorted.
Expand All @@ -438,7 +438,7 @@ pub fn get_best_fitting_window(
Ok(Some(Arc::new(WindowAggExec::try_new(
window_expr,
Arc::clone(input),
physical_partition_keys.to_vec(),
!physical_partition_keys.is_empty(),
)?) as _))
}
}
Expand Down Expand Up @@ -663,7 +663,7 @@ mod tests {
false,
)?],
blocking_exec,
vec![],
false,
)?);

let fut = collect(window_agg_exec, task_ctx);
Expand Down
31 changes: 24 additions & 7 deletions datafusion/physical-plan/src/windows/window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,23 @@ pub struct WindowAggExec {
window_expr: Vec<Arc<dyn WindowExpr>>,
/// Schema after the window is run
schema: SchemaRef,
/// Partition Keys
pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Partition by indices that defines preset for existing ordering
// see `get_ordered_partition_by_indices` for more details.
ordered_partition_by_indices: Vec<usize>,
/// Cache holding plan properties like equivalences, output partitioning etc.
cache: PlanProperties,
/// If `can_repartition` is false, partition_keys is always empty.
can_repartition: bool,
}

impl WindowAggExec {
/// Create a new execution plan for window aggregates
pub fn try_new(
window_expr: Vec<Arc<dyn WindowExpr>>,
input: Arc<dyn ExecutionPlan>,
partition_keys: Vec<Arc<dyn PhysicalExpr>>,
can_repartition: bool,
) -> Result<Self> {
let schema = create_schema(&input.schema(), &window_expr)?;
let schema = Arc::new(schema);
Expand All @@ -85,10 +85,10 @@ impl WindowAggExec {
input,
window_expr,
schema,
partition_keys,
metrics: ExecutionPlanMetricsSet::new(),
ordered_partition_by_indices,
cache,
can_repartition,
})
}

Expand Down Expand Up @@ -139,6 +139,23 @@ impl WindowAggExec {
input.boundedness(),
)
}

/// Returns the partition keys this operator uses when requesting a
/// hash-partitioned input distribution.
///
/// The result is empty when `can_repartition` is `false`. Otherwise it is
/// the shortest `PARTITION BY` list among all window expressions; if several
/// lists share the minimum length, the first one (in `window_expr` order)
/// is returned. An operator with no window expressions yields an empty
/// vector.
pub fn partition_keys(&self) -> Vec<Arc<dyn PhysicalExpr>> {
    if !self.can_repartition {
        return vec![];
    }

    // Compare the borrowed slices and clone only the selected one, rather
    // than cloning every expression's keys into a temporary Vec<Vec<_>>.
    self.window_expr()
        .iter()
        .map(|expr| expr.partition_by())
        .min_by_key(|keys| keys.len())
        .map(|keys| keys.to_vec())
        .unwrap_or_default()
}
}

impl DisplayAs for WindowAggExec {
Expand Down Expand Up @@ -206,10 +223,10 @@ impl ExecutionPlan for WindowAggExec {
}

fn required_input_distribution(&self) -> Vec<Distribution> {
if self.partition_keys.is_empty() {
if self.partition_keys().is_empty() {
vec![Distribution::SinglePartition]
} else {
vec![Distribution::HashPartitioned(self.partition_keys.clone())]
vec![Distribution::HashPartitioned(self.partition_keys())]
}
}

Expand All @@ -220,7 +237,7 @@ impl ExecutionPlan for WindowAggExec {
Ok(Arc::new(WindowAggExec::try_new(
self.window_expr.clone(),
Arc::clone(&children[0]),
self.partition_keys.clone(),
true,
)?))
}

Expand Down
Loading

0 comments on commit 3550758

Please sign in to comment.