From 9c00e4945a1be5a9b9c0b689f983a409b71e5606 Mon Sep 17 00:00:00 2001 From: Orson Peters Date: Mon, 30 Sep 2024 18:41:53 +0200 Subject: [PATCH] fmt --- .../polars-stream/src/nodes/with_row_index.rs | 5 ++++- crates/polars-stream/src/physical_plan/fmt.rs | 10 ++++------ .../src/physical_plan/lower_ir.rs | 20 ++++++++++--------- crates/polars-stream/src/physical_plan/mod.rs | 4 ++-- .../src/physical_plan/to_graph.rs | 14 +++++++------ 5 files changed, 29 insertions(+), 24 deletions(-) diff --git a/crates/polars-stream/src/nodes/with_row_index.rs b/crates/polars-stream/src/nodes/with_row_index.rs index 653cfe9d16bf..1ca527a83e84 100644 --- a/crates/polars-stream/src/nodes/with_row_index.rs +++ b/crates/polars-stream/src/nodes/with_row_index.rs @@ -45,7 +45,10 @@ impl ComputeNode for WithRowIndexNode { while let Ok(morsel) = recv.recv().await { let morsel = morsel.try_map(|df| { let out = df.with_row_index(self.name.clone(), Some(self.offset)); - self.offset = self.offset.checked_add(df.len().try_into().unwrap()).unwrap(); + self.offset = self + .offset + .checked_add(df.len().try_into().unwrap()) + .unwrap(); out })?; if send.send(morsel).await.is_err() { diff --git a/crates/polars-stream/src/physical_plan/fmt.rs b/crates/polars-stream/src/physical_plan/fmt.rs index b512a8a3d0c1..21dd8e9dd634 100644 --- a/crates/polars-stream/src/physical_plan/fmt.rs +++ b/crates/polars-stream/src/physical_plan/fmt.rs @@ -63,12 +63,10 @@ fn visualize_plan_rec( input, name, offset, - } => { - ( - format!("with-row-index\\nname: {name}\\noffset: {offset:?}"), - from_ref(input), - ) - }, + } => ( + format!("with-row-index\\nname: {name}\\noffset: {offset:?}"), + from_ref(input), + ), PhysNodeKind::InputIndependentSelect { selectors } => ( format!( "input-independent-select\\n{}", diff --git a/crates/polars-stream/src/physical_plan/lower_ir.rs b/crates/polars-stream/src/physical_plan/lower_ir.rs index 15be44eb3eb9..f2b532ca1ca3 100644 --- a/crates/polars-stream/src/physical_plan/lower_ir.rs +++ b/crates/polars-stream/src/physical_plan/lower_ir.rs @@ -253,16 +253,18 @@ pub fn lower_ir( schema_cache, expr_cache, )?; - + match function { - FunctionIR::RowIndex { name, offset, schema: _ } => { - PhysNodeKind::WithRowIndex { - input: phys_input, - name, - offset - } + FunctionIR::RowIndex { + name, + offset, + schema: _, + } => PhysNodeKind::WithRowIndex { + input: phys_input, + name, + offset, }, - + function if function.is_streamable() => { let map = Arc::new(move |df| function.evaluate(df)); PhysNodeKind::Map { @@ -277,7 +279,7 @@ pub fn lower_ir( input: phys_input, map, } - } + }, } }, diff --git a/crates/polars-stream/src/physical_plan/mod.rs b/crates/polars-stream/src/physical_plan/mod.rs index 644232fddb56..eddbc87bda99 100644 --- a/crates/polars-stream/src/physical_plan/mod.rs +++ b/crates/polars-stream/src/physical_plan/mod.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use polars_core::frame::DataFrame; -use polars_core::prelude::{InitHashMaps, PlHashMap, SortMultipleOptions, IdxSize}; +use polars_core::prelude::{IdxSize, InitHashMaps, PlHashMap, SortMultipleOptions}; use polars_core::schema::{Schema, SchemaRef}; use polars_error::PolarsResult; use polars_plan::plans::hive::HivePartitions; @@ -57,7 +57,7 @@ pub enum PhysNodeKind { selectors: Vec, extend_original: bool, }, - + WithRowIndex { input: PhysNodeKey, name: PlSmallStr, diff --git a/crates/polars-stream/src/physical_plan/to_graph.rs b/crates/polars-stream/src/physical_plan/to_graph.rs index 82a46e0f58bd..a4d58847033e 100644 --- a/crates/polars-stream/src/physical_plan/to_graph.rs +++ b/crates/polars-stream/src/physical_plan/to_graph.rs @@ -130,17 +130,19 @@ fn to_graph_rec<'a>( [input_key], ) }, - - WithRowIndex { input, name, offset } => { + + WithRowIndex { + input, + name, + offset, + } => { let input_key = to_graph_rec(*input, ctx)?; ctx.graph.add_node( - nodes::with_row_index::WithRowIndexNode::new( - name.clone(), offset.clone() - ), + nodes::with_row_index::WithRowIndexNode::new(name.clone(), *offset), [input_key], ) }, - + InputIndependentSelect { selectors } => { let phys_selectors = selectors .iter()