Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp committed Sep 30, 2024
1 parent 012a050 commit 9c00e49
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 24 deletions.
5 changes: 4 additions & 1 deletion crates/polars-stream/src/nodes/with_row_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ impl ComputeNode for WithRowIndexNode {
while let Ok(morsel) = recv.recv().await {
let morsel = morsel.try_map(|df| {
let out = df.with_row_index(self.name.clone(), Some(self.offset));
self.offset = self.offset.checked_add(df.len().try_into().unwrap()).unwrap();
self.offset = self
.offset
.checked_add(df.len().try_into().unwrap())
.unwrap();
out
})?;
if send.send(morsel).await.is_err() {
Expand Down
10 changes: 4 additions & 6 deletions crates/polars-stream/src/physical_plan/fmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,10 @@ fn visualize_plan_rec(
input,
name,
offset,
} => {
(
format!("with-row-index\\nname: {name}\\noffset: {offset:?}"),
from_ref(input),
)
},
} => (
format!("with-row-index\\nname: {name}\\noffset: {offset:?}"),
from_ref(input),
),
PhysNodeKind::InputIndependentSelect { selectors } => (
format!(
"input-independent-select\\n{}",
Expand Down
20 changes: 11 additions & 9 deletions crates/polars-stream/src/physical_plan/lower_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,16 +253,18 @@ pub fn lower_ir(
schema_cache,
expr_cache,
)?;

match function {
FunctionIR::RowIndex { name, offset, schema: _ } => {
PhysNodeKind::WithRowIndex {
input: phys_input,
name,
offset
}
FunctionIR::RowIndex {
name,
offset,
schema: _,
} => PhysNodeKind::WithRowIndex {
input: phys_input,
name,
offset,
},

function if function.is_streamable() => {
let map = Arc::new(move |df| function.evaluate(df));
PhysNodeKind::Map {
Expand All @@ -277,7 +279,7 @@ pub fn lower_ir(
input: phys_input,
map,
}
}
},
}
},

Expand Down
4 changes: 2 additions & 2 deletions crates/polars-stream/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use polars_core::frame::DataFrame;
use polars_core::prelude::{InitHashMaps, PlHashMap, SortMultipleOptions, IdxSize};
use polars_core::prelude::{IdxSize, InitHashMaps, PlHashMap, SortMultipleOptions};
use polars_core::schema::{Schema, SchemaRef};
use polars_error::PolarsResult;
use polars_plan::plans::hive::HivePartitions;
Expand Down Expand Up @@ -57,7 +57,7 @@ pub enum PhysNodeKind {
selectors: Vec<ExprIR>,
extend_original: bool,
},

WithRowIndex {
input: PhysNodeKey,
name: PlSmallStr,
Expand Down
14 changes: 8 additions & 6 deletions crates/polars-stream/src/physical_plan/to_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,19 @@ fn to_graph_rec<'a>(
[input_key],
)
},

WithRowIndex { input, name, offset } => {

WithRowIndex {
input,
name,
offset,
} => {
let input_key = to_graph_rec(*input, ctx)?;
ctx.graph.add_node(
nodes::with_row_index::WithRowIndexNode::new(
name.clone(), offset.clone()
),
nodes::with_row_index::WithRowIndexNode::new(name.clone(), *offset),
[input_key],
)
},

InputIndependentSelect { selectors } => {
let phys_selectors = selectors
.iter()
Expand Down

0 comments on commit 9c00e49

Please sign in to comment.