From 2eaee18d00953dd3b51cb8fc336addc76f067be3 Mon Sep 17 00:00:00 2001 From: Gijs Burghoorn Date: Thu, 30 Jan 2025 10:17:49 +0100 Subject: [PATCH] feat: Allow for more RG skipping by rewriting expr in planner (#20828) --- crates/polars-core/src/frame/column/mod.rs | 7 + crates/polars-core/src/frame/column/scalar.rs | 5 + crates/polars-core/src/scalar/mod.rs | 10 + crates/polars-expr/Cargo.toml | 1 - .../src/expressions/aggregation.rs | 9 - crates/polars-expr/src/expressions/alias.rs | 5 - crates/polars-expr/src/expressions/apply.rs | 44 -- crates/polars-expr/src/expressions/binary.rs | 26 - crates/polars-expr/src/expressions/cast.rs | 4 - crates/polars-expr/src/expressions/column.rs | 49 +- crates/polars-expr/src/expressions/count.rs | 2 - crates/polars-expr/src/expressions/filter.rs | 5 - crates/polars-expr/src/expressions/gather.rs | 5 - crates/polars-expr/src/expressions/literal.rs | 2 - crates/polars-expr/src/expressions/mod.rs | 20 - crates/polars-expr/src/expressions/rolling.rs | 4 - crates/polars-expr/src/expressions/slice.rs | 6 - crates/polars-expr/src/expressions/sort.rs | 4 - crates/polars-expr/src/expressions/sortby.rs | 7 - crates/polars-expr/src/expressions/ternary.rs | 6 - crates/polars-expr/src/expressions/window.rs | 10 - .../polars-io/src/parquet/read/async_impl.rs | 6 +- .../polars-io/src/parquet/read/predicates.rs | 34 +- .../polars-io/src/parquet/read/read_impl.rs | 63 ++- crates/polars-io/src/parquet/read/reader.rs | 12 +- crates/polars-io/src/predicates.rs | 30 +- crates/polars-lazy/Cargo.toml | 3 - crates/polars-lazy/src/frame/mod.rs | 4 +- .../streaming/construct_pipeline.rs | 3 - .../src/executors/multi_file_scan.rs | 129 +++-- .../src/executors/scan/csv.rs | 14 +- .../src/executors/scan/ipc.rs | 10 +- .../src/executors/scan/mod.rs | 7 +- .../src/executors/scan/ndjson.rs | 15 +- .../src/executors/scan/parquet.rs | 30 +- crates/polars-mem-engine/src/lib.rs | 4 +- crates/polars-mem-engine/src/planner/lp.rs | 97 +++- crates/polars-mem-engine/src/predicate.rs | 264 ++++++++++ .../sinks/group_by/aggregates/convert.rs | 4 +- .../src/executors/sources/parquet.rs | 10 +- crates/polars-pipe/src/pipeline/convert.rs | 20 +- crates/polars-plan/Cargo.toml | 2 - .../polars-plan/src/plans/aexpr/evaluate.rs | 31 ++ crates/polars-plan/src/plans/aexpr/mod.rs | 2 + .../polars-plan/src/plans/aexpr/predicates.rs | 454 ++++++++++++++++++ crates/polars-plan/src/plans/lit.rs | 9 + .../src/plans/optimizer/cache_states.rs | 3 +- crates/polars-plan/src/plans/optimizer/mod.rs | 12 +- .../optimizer/projection_pushdown/mod.rs | 137 +++--- crates/polars-plan/src/plans/schema.rs | 2 +- crates/polars-python/src/cloud.rs | 9 +- crates/polars-schema/src/schema.rs | 5 + .../polars-stream/src/nodes/io_sources/ipc.rs | 4 - .../src/nodes/io_sources/parquet/init.rs | 22 +- .../src/nodes/io_sources/parquet/mod.rs | 13 +- .../parquet/row_group_data_fetch.rs | 7 +- .../io_sources/parquet/row_group_decode.rs | 13 +- .../src/physical_plan/to_graph.rs | 75 +-- py-polars/tests/unit/io/test_lazy_parquet.py | 4 +- py-polars/tests/unit/io/test_scan.py | 2 +- .../tests/unit/streaming/test_streaming_io.py | 45 +- 61 files changed, 1288 insertions(+), 553 deletions(-) create mode 100644 crates/polars-mem-engine/src/predicate.rs create mode 100644 crates/polars-plan/src/plans/aexpr/evaluate.rs create mode 100644 crates/polars-plan/src/plans/aexpr/predicates.rs diff --git a/crates/polars-core/src/frame/column/mod.rs b/crates/polars-core/src/frame/column/mod.rs index fd4429ef4770..1df6b50488e2 100644 --- 
a/crates/polars-core/src/frame/column/mod.rs +++ b/crates/polars-core/src/frame/column/mod.rs @@ -207,6 +207,13 @@ impl Column { _ => None, } } + #[inline] + pub fn as_scalar_column_mut(&mut self) -> Option<&mut ScalarColumn> { + match self { + Column::Scalar(s) => Some(s), + _ => None, + } + } // # Try to Chunked Arrays pub fn try_bool(&self) -> Option<&BooleanChunked> { diff --git a/crates/polars-core/src/frame/column/scalar.rs b/crates/polars-core/src/frame/column/scalar.rs index 3e2940402d6b..f5bd30a25a88 100644 --- a/crates/polars-core/src/frame/column/scalar.rs +++ b/crates/polars-core/src/frame/column/scalar.rs @@ -299,6 +299,11 @@ impl ScalarColumn { self.scalar = map_scalar(std::mem::take(&mut self.scalar)); self.materialized.take(); } + pub fn with_value(&mut self, value: AnyValue<'static>) -> &mut Self { + self.scalar.update(value); + self.materialized.take(); + self + } } impl IntoColumn for ScalarColumn { diff --git a/crates/polars-core/src/scalar/mod.rs b/crates/polars-core/src/scalar/mod.rs index 3ec84eb00729..cd228b2ddad9 100644 --- a/crates/polars-core/src/scalar/mod.rs +++ b/crates/polars-core/src/scalar/mod.rs @@ -30,6 +30,10 @@ impl Scalar { Self { dtype, value } } + pub fn null(dtype: DataType) -> Self { + Self::new(dtype, AnyValue::Null) + } + #[inline(always)] pub fn is_null(&self) -> bool { self.value.is_null() @@ -74,4 +78,10 @@ impl Scalar { pub fn update(&mut self, value: AnyValue<'static>) { self.value = value; } + + #[inline(always)] + pub fn with_value(mut self, value: AnyValue<'static>) -> Self { + self.update(value); + self + } } diff --git a/crates/polars-expr/Cargo.toml b/crates/polars-expr/Cargo.toml index 6a85d0eb8703..bebab330750b 100644 --- a/crates/polars-expr/Cargo.toml +++ b/crates/polars-expr/Cargo.toml @@ -79,4 +79,3 @@ round_series = ["polars-plan/round_series", "polars-ops/round_series"] is_between = ["polars-plan/is_between"] dynamic_group_by = ["polars-plan/dynamic_group_by", "polars-time", "temporal"] propagate_nans = ["polars-plan/propagate_nans", "polars-ops/propagate_nans"] -panic_on_schema = ["polars-plan/panic_on_schema"] diff --git a/crates/polars-expr/src/expressions/aggregation.rs b/crates/polars-expr/src/expressions/aggregation.rs index 49ebd2980a6b..c4e26ae2a340 100644 --- a/crates/polars-expr/src/expressions/aggregation.rs +++ b/crates/polars-expr/src/expressions/aggregation.rs @@ -442,10 +442,6 @@ impl PhysicalExpr for AggregationExpr { } } - fn collect_live_columns(&self, lv: &mut PlIndexSet) { - self.input.collect_live_columns(lv); - } - fn isolate_column_expr( &self, _name: &str, @@ -745,11 +741,6 @@ impl PhysicalExpr for AggQuantileExpr { )) } - fn collect_live_columns(&self, lv: &mut PlIndexSet) { - self.input.collect_live_columns(lv); - self.quantile.collect_live_columns(lv); - } - fn isolate_column_expr( &self, _name: &str, diff --git a/crates/polars-expr/src/expressions/alias.rs b/crates/polars-expr/src/expressions/alias.rs index 0bbaf13c276f..fc5057ba2adf 100644 --- a/crates/polars-expr/src/expressions/alias.rs +++ b/crates/polars-expr/src/expressions/alias.rs @@ -59,11 +59,6 @@ impl PhysicalExpr for AliasExpr { Ok(ac) } - fn collect_live_columns(&self, lv: &mut PlIndexSet) { - self.physical_expr.collect_live_columns(lv); - lv.insert(self.name.clone()); - } - fn isolate_column_expr( &self, _name: &str, diff --git a/crates/polars-expr/src/expressions/apply.rs b/crates/polars-expr/src/expressions/apply.rs index 2a405e5a69f5..13cbb8312003 100644 --- a/crates/polars-expr/src/expressions/apply.rs +++ 
b/crates/polars-expr/src/expressions/apply.rs @@ -425,12 +425,6 @@ impl PhysicalExpr for ApplyExpr { } } - fn collect_live_columns(&self, lv: &mut PlIndexSet) { - for i in &self.inputs { - i.collect_live_columns(lv); - } - } - fn isolate_column_expr( &self, _name: &str, @@ -441,44 +435,6 @@ impl PhysicalExpr for ApplyExpr { None } - fn replace_elementwise_const_columns( - &self, - const_columns: &PlHashMap>, - ) -> Option> { - if self.collect_groups == ApplyOptions::ElementWise { - let mut new_inputs = Vec::new(); - for i in 0..self.inputs.len() { - match self.inputs[i].replace_elementwise_const_columns(const_columns) { - None => continue, - Some(new) => { - new_inputs.reserve(self.inputs.len()); - new_inputs.extend(self.inputs[..i].iter().cloned()); - new_inputs.push(new); - break; - }, - } - } - - // Only copy inputs if it is actually needed - if new_inputs.is_empty() { - return None; - } - - new_inputs.extend(self.inputs[new_inputs.len()..].iter().map(|i| { - match i.replace_elementwise_const_columns(const_columns) { - None => i.clone(), - Some(new) => new, - } - })); - - let mut slf = self.clone(); - slf.inputs = new_inputs; - return Some(Arc::new(slf)); - } - - None - } - fn to_field(&self, input_schema: &Schema) -> PolarsResult { self.expr.to_field(input_schema, Context::Default) } diff --git a/crates/polars-expr/src/expressions/binary.rs b/crates/polars-expr/src/expressions/binary.rs index b3ba90a21447..ce451ebddc69 100644 --- a/crates/polars-expr/src/expressions/binary.rs +++ b/crates/polars-expr/src/expressions/binary.rs @@ -269,11 +269,6 @@ impl PhysicalExpr for BinaryExpr { } } - fn collect_live_columns(&self, lv: &mut PlIndexSet) { - self.left.collect_live_columns(lv); - self.right.collect_live_columns(lv); - } - fn isolate_column_expr( &self, name: &str, @@ -301,27 +296,6 @@ impl PhysicalExpr for BinaryExpr { Some((Arc::new(self.clone()) as _, specialized)) } - fn replace_elementwise_const_columns( - &self, - const_columns: &PlHashMap>, - ) -> Option> { - let rcc_left = self.left.replace_elementwise_const_columns(const_columns); - let rcc_right = self.right.replace_elementwise_const_columns(const_columns); - - if rcc_left.is_some() || rcc_right.is_some() { - let mut slf = self.clone(); - if let Some(left) = rcc_left { - slf.left = left; - } - if let Some(right) = rcc_right { - slf.right = right; - } - return Some(Arc::new(slf)); - } - - None - } - fn to_field(&self, input_schema: &Schema) -> PolarsResult { self.expr.to_field(input_schema, Context::Default) } diff --git a/crates/polars-expr/src/expressions/cast.rs b/crates/polars-expr/src/expressions/cast.rs index c17bcc01aeb7..828117603231 100644 --- a/crates/polars-expr/src/expressions/cast.rs +++ b/crates/polars-expr/src/expressions/cast.rs @@ -87,10 +87,6 @@ impl PhysicalExpr for CastExpr { Ok(ac) } - fn collect_live_columns(&self, lv: &mut PlIndexSet) { - self.input.collect_live_columns(lv); - } - fn isolate_column_expr( &self, _name: &str, diff --git a/crates/polars-expr/src/expressions/column.rs b/crates/polars-expr/src/expressions/column.rs index f5e2f9b52fdd..07deff931ebb 100644 --- a/crates/polars-expr/src/expressions/column.rs +++ b/crates/polars-expr/src/expressions/column.rs @@ -57,26 +57,7 @@ impl ColumnExpr { } } - // this path should not happen - #[cfg(feature = "panic_on_schema")] - { - if _state.ext_contexts.is_empty() - && std::env::var("POLARS_NO_SCHEMA_CHECK").is_err() - { - panic!( - "got {} expected: {} from schema: {:?} and DataFrame: {:?}", - out.name(), - &*self.name, - _schema, - df - ) - } - } - // 
in release we fallback to linear search - #[allow(unreachable_code)] - { - df.column(&self.name).cloned() - } + df.column(&self.name).cloned() } else { Ok(out.clone()) } @@ -87,17 +68,6 @@ impl ColumnExpr { _state: &ExecutionState, _panic_during_test: bool, ) -> PolarsResult { - #[cfg(feature = "panic_on_schema")] - { - if _panic_during_test - && _state.ext_contexts.is_empty() - && std::env::var("POLARS_NO_SCHEMA_CHECK").is_err() - { - panic!("invalid schema: df {:?};\ncolumn: {}", df, &self.name) - } - } - // in release we fallback to linear search - #[allow(unreachable_code)] df.column(&self.name).cloned() } @@ -179,10 +149,6 @@ impl PhysicalExpr for ColumnExpr { Some(self) } - fn collect_live_columns(&self, lv: &mut PlIndexSet) { - lv.insert(self.name.clone()); - } - fn isolate_column_expr( &self, _name: &str, @@ -197,19 +163,6 @@ impl PhysicalExpr for ColumnExpr { Some(&self.name) } - fn replace_elementwise_const_columns( - &self, - const_columns: &PlHashMap>, - ) -> Option> { - if let Some(av) = const_columns.get(&self.name) { - let lv = LiteralValue::from(av.clone()); - let le = LiteralExpr::new(lv, self.expr.clone()); - return Some(Arc::new(le)); - } - - None - } - fn to_field(&self, input_schema: &Schema) -> PolarsResult { input_schema.get_field(&self.name).ok_or_else(|| { polars_err!( diff --git a/crates/polars-expr/src/expressions/count.rs b/crates/polars-expr/src/expressions/count.rs index 4f53d067ed72..1a0fae982d9a 100644 --- a/crates/polars-expr/src/expressions/count.rs +++ b/crates/polars-expr/src/expressions/count.rs @@ -36,8 +36,6 @@ impl PhysicalExpr for CountExpr { Ok(AggregationContext::new(c, Cow::Borrowed(groups), true)) } - fn collect_live_columns(&self, _lv: &mut PlIndexSet) {} - fn isolate_column_expr( &self, _name: &str, diff --git a/crates/polars-expr/src/expressions/filter.rs b/crates/polars-expr/src/expressions/filter.rs index 0693f2d70554..a0bad1d5117a 100644 --- a/crates/polars-expr/src/expressions/filter.rs +++ b/crates/polars-expr/src/expressions/filter.rs @@ -147,11 +147,6 @@ impl PhysicalExpr for FilterExpr { } } - fn collect_live_columns(&self, lv: &mut PlIndexSet) { - self.input.collect_live_columns(lv); - self.by.collect_live_columns(lv); - } - fn isolate_column_expr( &self, _name: &str, diff --git a/crates/polars-expr/src/expressions/gather.rs b/crates/polars-expr/src/expressions/gather.rs index c92dff341010..62d999ad3c02 100644 --- a/crates/polars-expr/src/expressions/gather.rs +++ b/crates/polars-expr/src/expressions/gather.rs @@ -89,11 +89,6 @@ impl PhysicalExpr for GatherExpr { Ok(ac) } - fn collect_live_columns(&self, lv: &mut PlIndexSet) { - self.phys_expr.collect_live_columns(lv); - self.idx.collect_live_columns(lv); - } - fn isolate_column_expr( &self, _name: &str, diff --git a/crates/polars-expr/src/expressions/literal.rs b/crates/polars-expr/src/expressions/literal.rs index 09f08089498d..e22ecbb59562 100644 --- a/crates/polars-expr/src/expressions/literal.rs +++ b/crates/polars-expr/src/expressions/literal.rs @@ -149,8 +149,6 @@ impl PhysicalExpr for LiteralExpr { Some(self) } - fn collect_live_columns(&self, _lv: &mut PlIndexSet) {} - fn isolate_column_expr( &self, _name: &str, diff --git a/crates/polars-expr/src/expressions/mod.rs b/crates/polars-expr/src/expressions/mod.rs index 33c21d09dd1b..48b936365cc1 100644 --- a/crates/polars-expr/src/expressions/mod.rs +++ b/crates/polars-expr/src/expressions/mod.rs @@ -602,10 +602,6 @@ pub trait PhysicalExpr: Send + Sync { None } - /// Get the variables that are used in the expression i.e. 
live variables. - /// This can contain duplicates. - fn collect_live_columns(&self, lv: &mut PlIndexSet); - fn isolate_column_expr( &self, name: &str, @@ -617,18 +613,6 @@ pub trait PhysicalExpr: Send + Sync { None } - /// Replace columns that are known to be a constant value with their const value. - /// - /// This should not replace values that are calculated non-elementwise e.g. col.max(), - /// col.std(), etc. - fn replace_elementwise_const_columns( - &self, - const_columns: &PlHashMap>, - ) -> Option> { - _ = const_columns; - None - } - /// Can take &dyn Statistics and determine of a file should be /// read -> `true` /// or not -> `false` @@ -669,10 +653,6 @@ impl PhysicalIoExpr for PhysicalIoHelper { .map(|c| c.take_materialized_series()) } - fn collect_live_columns(&self, live_columns: &mut PlIndexSet) { - self.expr.collect_live_columns(live_columns); - } - #[cfg(feature = "parquet")] fn as_stats_evaluator(&self) -> Option<&dyn polars_io::predicates::StatsEvaluator> { self.expr.as_stats_evaluator() diff --git a/crates/polars-expr/src/expressions/rolling.rs b/crates/polars-expr/src/expressions/rolling.rs index 033e4f2c07b6..249144e7f795 100644 --- a/crates/polars-expr/src/expressions/rolling.rs +++ b/crates/polars-expr/src/expressions/rolling.rs @@ -59,10 +59,6 @@ impl PhysicalExpr for RollingExpr { polars_bail!(InvalidOperation: "rolling expression not allowed in aggregation"); } - fn collect_live_columns(&self, lv: &mut PlIndexSet) { - self.phys_function.collect_live_columns(lv); - } - fn isolate_column_expr( &self, _name: &str, diff --git a/crates/polars-expr/src/expressions/slice.rs b/crates/polars-expr/src/expressions/slice.rs index aa1a8f6295cb..a8120b9794e0 100644 --- a/crates/polars-expr/src/expressions/slice.rs +++ b/crates/polars-expr/src/expressions/slice.rs @@ -269,12 +269,6 @@ impl PhysicalExpr for SliceExpr { Ok(ac) } - fn collect_live_columns(&self, lv: &mut PlIndexSet) { - self.input.collect_live_columns(lv); - self.offset.collect_live_columns(lv); - self.length.collect_live_columns(lv); - } - fn isolate_column_expr( &self, _name: &str, diff --git a/crates/polars-expr/src/expressions/sort.rs b/crates/polars-expr/src/expressions/sort.rs index 6d44187bd0ff..db52d5f34bae 100644 --- a/crates/polars-expr/src/expressions/sort.rs +++ b/crates/polars-expr/src/expressions/sort.rs @@ -105,10 +105,6 @@ impl PhysicalExpr for SortExpr { Ok(ac) } - fn collect_live_columns(&self, lv: &mut PlIndexSet) { - self.physical_expr.collect_live_columns(lv); - } - fn isolate_column_expr( &self, _name: &str, diff --git a/crates/polars-expr/src/expressions/sortby.rs b/crates/polars-expr/src/expressions/sortby.rs index 93078501535f..1359295c5d97 100644 --- a/crates/polars-expr/src/expressions/sortby.rs +++ b/crates/polars-expr/src/expressions/sortby.rs @@ -400,13 +400,6 @@ impl PhysicalExpr for SortByExpr { Ok(ac_in) } - fn collect_live_columns(&self, lv: &mut PlIndexSet) { - self.input.collect_live_columns(lv); - for i in &self.by { - i.collect_live_columns(lv); - } - } - fn isolate_column_expr( &self, _name: &str, diff --git a/crates/polars-expr/src/expressions/ternary.rs b/crates/polars-expr/src/expressions/ternary.rs index bdd20b455b5e..e6f84686606d 100644 --- a/crates/polars-expr/src/expressions/ternary.rs +++ b/crates/polars-expr/src/expressions/ternary.rs @@ -328,12 +328,6 @@ impl PhysicalExpr for TernaryExpr { Some(self) } - fn collect_live_columns(&self, lv: &mut PlIndexSet) { - self.predicate.collect_live_columns(lv); - self.truthy.collect_live_columns(lv); - 
self.falsy.collect_live_columns(lv); - } - fn isolate_column_expr( &self, _name: &str, diff --git a/crates/polars-expr/src/expressions/window.rs b/crates/polars-expr/src/expressions/window.rs index 106ee0ceb653..645d3cf193a4 100644 --- a/crates/polars-expr/src/expressions/window.rs +++ b/crates/polars-expr/src/expressions/window.rs @@ -650,16 +650,6 @@ impl PhysicalExpr for WindowExpr { false } - fn collect_live_columns(&self, lv: &mut PlIndexSet) { - for i in &self.group_by { - i.collect_live_columns(lv); - } - if let Some((i, _)) = &self.order_by { - i.collect_live_columns(lv); - } - self.phys_function.collect_live_columns(lv); - } - fn isolate_column_expr( &self, _name: &str, diff --git a/crates/polars-io/src/parquet/read/async_impl.rs b/crates/polars-io/src/parquet/read/async_impl.rs index bf35337a313d..a0fe9962f8d8 100644 --- a/crates/polars-io/src/parquet/read/async_impl.rs +++ b/crates/polars-io/src/parquet/read/async_impl.rs @@ -19,7 +19,7 @@ use crate::cloud::{ }; use crate::parquet::metadata::FileMetadataRef; use crate::pl_async::get_runtime; -use crate::predicates::PhysicalIoExpr; +use crate::predicates::ScanIOPredicate; type DownloadedRowGroup = PlHashMap; type QueuePayload = (usize, DownloadedRowGroup); @@ -231,7 +231,7 @@ impl FetchRowGroupsFromObjectStore { reader: ParquetObjectStore, schema: ArrowSchemaRef, projection: Option<&[usize]>, - predicate: Option>, + predicate: Option, row_group_range: Range, row_groups: &[RowGroupMetadata], ) -> PolarsResult { @@ -244,7 +244,7 @@ impl FetchRowGroupsFromObjectStore { let mut prefetched: PlHashMap = PlHashMap::new(); - let mut row_groups = if let Some(pred) = predicate.as_deref() { + let mut row_groups = if let Some(pred) = predicate.as_ref() { row_group_range .filter_map(|i| { let rg = &row_groups[i]; diff --git a/crates/polars-io/src/parquet/read/predicates.rs b/crates/polars-io/src/parquet/read/predicates.rs index ea3bc7efb3e4..05d854fff917 100644 --- a/crates/polars-io/src/parquet/read/predicates.rs +++ b/crates/polars-io/src/parquet/read/predicates.rs @@ -3,7 +3,7 @@ use polars_core::prelude::*; use polars_parquet::read::statistics::{deserialize, Statistics}; use polars_parquet::read::RowGroupMetadata; -use crate::predicates::{BatchStats, ColumnStats, PhysicalIoExpr}; +use crate::predicates::{BatchStats, ColumnStats, ScanIOPredicate}; /// Collect the statistics in a row-group pub(crate) fn collect_statistics( @@ -58,7 +58,7 @@ pub(crate) fn collect_statistics( } pub fn read_this_row_group( - predicate: Option<&dyn PhysicalIoExpr>, + predicate: Option<&ScanIOPredicate>, md: &RowGroupMetadata, schema: &ArrowSchema, ) -> PolarsResult { @@ -69,7 +69,35 @@ pub fn read_this_row_group( let mut should_read = true; if let Some(pred) = predicate { - if let Some(pred) = pred.as_stats_evaluator() { + if let Some(pred) = &pred.skip_batch_predicate { + if let Some(stats) = collect_statistics(md, schema)? 
{ + let stats = PlIndexMap::from_iter(stats.column_stats().iter().map(|col| { + ( + col.field_name().clone(), + crate::predicates::ColumnStatistics { + dtype: stats.schema().get(col.field_name()).unwrap().clone(), + min: col + .to_min() + .map_or(AnyValue::Null, |s| s.get(0).unwrap().into_static()), + max: col + .to_max() + .map_or(AnyValue::Null, |s| s.get(0).unwrap().into_static()), + null_count: col.null_count().map(|nc| nc as IdxSize), + }, + ) + })); + let pred_result = pred.can_skip_batch(md.num_rows() as IdxSize, stats); + + // a parquet file may not have statistics of all columns + match pred_result { + Err(PolarsError::ColumnNotFound(errstr)) => { + return Err(PolarsError::ColumnNotFound(errstr)) + }, + Ok(true) => should_read = false, + _ => {}, + } + } + } else if let Some(pred) = pred.predicate.as_stats_evaluator() { if let Some(stats) = collect_statistics(md, schema)? { let pred_result = pred.should_read(&stats); diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index 27a806e47056..0a581e6c2975 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -25,7 +25,7 @@ use crate::hive::{self, materialize_hive_partitions}; use crate::mmap::{MmapBytesReader, ReaderBytes}; use crate::parquet::metadata::FileMetadataRef; use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR; -use crate::predicates::{apply_predicate, ColumnPredicateExpr, PhysicalIoExpr}; +use crate::predicates::{apply_predicate, ColumnPredicateExpr, ScanIOPredicate}; use crate::utils::get_reader_bytes; use crate::utils::slice::split_slice_at_file; use crate::RowIndex; @@ -141,7 +141,7 @@ fn rg_to_dfs( slice: (usize, usize), file_metadata: &FileMetadata, schema: &ArrowSchemaRef, - predicate: Option<&dyn PhysicalIoExpr>, + predicate: Option<&ScanIOPredicate>, row_index: Option, parallel: ParallelStrategy, projection: &[usize], @@ -172,9 +172,7 @@ fn rg_to_dfs( if parallel == S::Prefiltered { if let Some(predicate) = predicate { - let mut live_columns = PlIndexSet::new(); - predicate.collect_live_columns(&mut live_columns); - if !live_columns.is_empty() { + if !predicate.live_columns.is_empty() { return rg_to_dfs_prefiltered( store, previous_row_count, @@ -182,7 +180,6 @@ fn rg_to_dfs( row_group_end, file_metadata, schema, - live_columns, predicate, row_index, projection, @@ -246,8 +243,7 @@ fn rg_to_dfs_prefiltered( row_group_end: usize, file_metadata: &FileMetadata, schema: &ArrowSchemaRef, - live_columns: PlIndexSet, - predicate: &dyn PhysicalIoExpr, + predicate: &ScanIOPredicate, row_index: Option, projection: &[usize], use_statistics: bool, @@ -274,7 +270,7 @@ fn rg_to_dfs_prefiltered( }; // Get the number of live columns - let num_live_columns = live_columns.len(); + let num_live_columns = predicate.live_columns.len(); let num_dead_columns = projection.len() + hive_partition_columns.map_or(0, |x| x.len()) - num_live_columns; @@ -290,7 +286,7 @@ fn rg_to_dfs_prefiltered( for &i in projection.iter() { let name = schema.get_at_index(i).unwrap().0.as_str(); - if live_columns.contains(name) { + if predicate.live_columns.contains(name) { live_idx_to_col_idx.push(i); } else { dead_idx_to_col_idx.push(i); @@ -298,21 +294,22 @@ fn rg_to_dfs_prefiltered( } let do_parquet_expr = std::env::var("POLARS_PARQUET_EXPR").as_deref() == Ok("1") - && live_columns.len() == 1 // Only do it with one column for now + && predicate.live_columns.len() == 1 // Only do it with one column for now && 
hive_partition_columns.is_none_or(|hc| { !hc.iter() - .any(|c| c.name().as_str() == live_columns[0].as_str()) + .any(|c| c.name().as_str() == predicate.live_columns[0].as_str()) }) // No hive columns && !schema - .get(live_columns[0].as_str()) + .get(predicate.live_columns[0].as_str()) .unwrap() .dtype() .is_nested(); // No nested columns let column_exprs = do_parquet_expr.then(|| { - live_columns + predicate + .live_columns .iter() .map(|name| { - let (p, specialized) = predicate.isolate_column_expr(name.as_str())?; + let (p, specialized) = predicate.predicate.isolate_column_expr(name.as_str())?; let p = ColumnPredicateExpr::new( name.clone(), @@ -449,8 +446,7 @@ fn rg_to_dfs_prefiltered( hive_partition_columns, md.num_rows(), ); - - let s = predicate.evaluate_io(&df)?; + let s = predicate.predicate.evaluate_io(&df)?; let mask = s.bool().expect("filter predicates was not of type boolean"); // Create without hive columns - the first merge phase does not handle hive partitions. This also saves @@ -643,7 +639,7 @@ fn rg_to_dfs_optionally_par_over_columns( slice: (usize, usize), file_metadata: &FileMetadata, schema: &ArrowSchemaRef, - predicate: Option<&dyn PhysicalIoExpr>, + predicate: Option<&ScanIOPredicate>, row_index: Option, parallel: ParallelStrategy, projection: &[usize], @@ -670,11 +666,6 @@ fn rg_to_dfs_optionally_par_over_columns( *previous_row_count += rg_slice.1 as IdxSize; continue; } - // test we don't read the parquet file if this env var is set - #[cfg(debug_assertions)] - { - assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err()) - } let sorting_map = create_sorting_map(md); @@ -723,7 +714,11 @@ fn rg_to_dfs_optionally_par_over_columns( } materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns, rg_slice.1); - apply_predicate(&mut df, predicate, true)?; + apply_predicate( + &mut df, + predicate.as_ref().map(|p| p.predicate.as_ref()), + true, + )?; *previous_row_count = previous_row_count.checked_add(current_row_count).ok_or_else(|| polars_err!( @@ -752,7 +747,7 @@ fn rg_to_dfs_par_over_rg( slice: (usize, usize), file_metadata: &FileMetadata, schema: &ArrowSchemaRef, - predicate: Option<&dyn PhysicalIoExpr>, + predicate: Option<&ScanIOPredicate>, row_index: Option, projection: &[usize], use_statistics: bool, @@ -861,7 +856,11 @@ fn rg_to_dfs_par_over_rg( hive_partition_columns, slice.1, ); - apply_predicate(&mut df, predicate, false)?; + apply_predicate( + &mut df, + predicate.as_ref().map(|p| p.predicate.as_ref()), + false, + )?; Ok(Some(df)) }) @@ -877,7 +876,7 @@ pub fn read_parquet( projection: Option<&[usize]>, reader_schema: &ArrowSchemaRef, metadata: Option, - predicate: Option<&dyn PhysicalIoExpr>, + predicate: Option<&ScanIOPredicate>, mut parallel: ParallelStrategy, row_index: Option, use_statistics: bool, @@ -922,9 +921,7 @@ pub fn read_parquet( let prefilter_env = std::env::var("POLARS_PARQUET_PREFILTER"); let prefilter_env = prefilter_env.as_deref(); - let mut live_columns = PlIndexSet::new(); - predicate.collect_live_columns(&mut live_columns); - let num_live_variables = live_columns.len(); + let num_live_variables = predicate.live_columns.len(); let mut do_prefilter = false; do_prefilter |= prefilter_env == Ok("1"); // Force enable @@ -1088,7 +1085,7 @@ pub struct BatchedParquetReader { projection: Arc<[usize]>, schema: ArrowSchemaRef, metadata: FileMetadataRef, - predicate: Option>, + predicate: Option, row_index: Option, rows_read: IdxSize, row_group_offset: usize, @@ -1111,7 +1108,7 @@ impl BatchedParquetReader { schema: 
ArrowSchemaRef, slice: (usize, usize), projection: Option>, - predicate: Option>, + predicate: Option, row_index: Option, chunk_size: usize, use_statistics: bool, @@ -1239,7 +1236,7 @@ impl BatchedParquetReader { slice, &metadata, &schema, - predicate.as_deref(), + predicate.as_ref(), row_index, parallel, &projection, diff --git a/crates/polars-io/src/parquet/read/reader.rs b/crates/polars-io/src/parquet/read/reader.rs index 1927b34f84a0..72ea8b28cf31 100644 --- a/crates/polars-io/src/parquet/read/reader.rs +++ b/crates/polars-io/src/parquet/read/reader.rs @@ -20,7 +20,7 @@ use super::utils::{ensure_matching_dtypes_if_found, projected_arrow_schema_to_pr use crate::cloud::CloudOptions; use crate::mmap::MmapBytesReader; use crate::parquet::metadata::FileMetadataRef; -use crate::predicates::PhysicalIoExpr; +use crate::predicates::ScanIOPredicate; use crate::prelude::*; use crate::RowIndex; @@ -37,7 +37,7 @@ pub struct ParquetReader { row_index: Option, low_memory: bool, metadata: Option, - predicate: Option>, + predicate: Option, hive_partition_columns: Option>, include_file_path: Option<(PlSmallStr, Arc)>, use_statistics: bool, @@ -189,7 +189,7 @@ impl ParquetReader { Ok(self.metadata.as_ref().unwrap()) } - pub fn with_predicate(mut self, predicate: Option>) -> Self { + pub fn with_predicate(mut self, predicate: Option) -> Self { self.predicate = predicate; self } @@ -261,7 +261,7 @@ impl SerReader for ParquetReader { self.projection.as_deref(), &schema, Some(metadata), - self.predicate.as_deref(), + self.predicate.as_ref(), self.parallel, self.row_index, self.use_statistics, @@ -297,7 +297,7 @@ pub struct ParquetAsyncReader { slice: (usize, usize), rechunk: bool, projection: Option>, - predicate: Option>, + predicate: Option, row_index: Option, use_statistics: bool, hive_partition_columns: Option>, @@ -423,7 +423,7 @@ impl ParquetAsyncReader { self } - pub fn with_predicate(mut self, predicate: Option>) -> Self { + pub fn with_predicate(mut self, predicate: Option) -> Self { self.predicate = predicate; self } diff --git a/crates/polars-io/src/predicates.rs b/crates/polars-io/src/predicates.rs index b348cb988582..940457baf12f 100644 --- a/crates/polars-io/src/predicates.rs +++ b/crates/polars-io/src/predicates.rs @@ -11,10 +11,6 @@ pub trait PhysicalIoExpr: Send + Sync { /// as a predicate mask fn evaluate_io(&self, df: &DataFrame) -> PolarsResult; - /// Get the variables that are used in the expression i.e. live variables. - /// This can contain duplicates. - fn collect_live_columns(&self, live_columns: &mut PlIndexSet); - /// Can take &dyn Statistics and determine of a file should be /// read -> `true` /// or not -> `false` @@ -358,6 +354,32 @@ fn use_min_max(dtype: &DataType) -> bool { ) } +pub struct ColumnStatistics { + pub dtype: DataType, + pub min: AnyValue<'static>, + pub max: AnyValue<'static>, + pub null_count: Option, +} + +pub trait SkipBatchPredicate: Send + Sync { + fn can_skip_batch( + &self, + batch_size: IdxSize, + statistics: PlIndexMap, + ) -> PolarsResult; +} + +#[derive(Clone)] +pub struct ScanIOPredicate { + pub predicate: Arc, + + /// Column names that are used in the predicate. + pub live_columns: Arc>, + + /// A predicate that gets given statistics and evaluates whether a batch can be skipped. + pub skip_batch_predicate: Option>, +} + /// A collection of column stats with a known schema. 
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[derive(Debug, Clone)] diff --git a/crates/polars-lazy/Cargo.toml b/crates/polars-lazy/Cargo.toml index 8cd613f5222e..94e93c59839f 100644 --- a/crates/polars-lazy/Cargo.toml +++ b/crates/polars-lazy/Cargo.toml @@ -290,11 +290,8 @@ string_encoding = ["polars-plan/string_encoding"] bigidx = ["polars-plan/bigidx", "polars-utils/bigidx"] polars_cloud = ["polars-plan/polars_cloud"] -panic_on_schema = ["polars-plan/panic_on_schema", "polars-expr/panic_on_schema"] - test = [ "polars-plan/debugging", - "panic_on_schema", "rolling_window", "rank", "round_series", diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index 456fb9b1f0c4..04d6b301aec5 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -686,7 +686,7 @@ impl LazyFrame { } else { true }; - let physical_plan = create_physical_plan(lp_top, &mut lp_arena, &expr_arena)?; + let physical_plan = create_physical_plan(lp_top, &mut lp_arena, &mut expr_arena)?; let state = ExecutionState::new(); Ok((state, physical_plan, no_file_sink)) @@ -738,7 +738,7 @@ impl LazyFrame { let mut physical_plan = create_physical_plan( alp_plan.lp_top, &mut alp_plan.lp_arena, - &alp_plan.expr_arena, + &mut alp_plan.expr_arena, )?; let mut state = ExecutionState::new(); physical_plan.execute(&mut state) diff --git a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs index ea8d0f481989..72757a06d1fc 100644 --- a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs +++ b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs @@ -26,9 +26,6 @@ impl PhysicalIoExpr for Wrap { }; h.evaluate_io(df) } - fn collect_live_columns(&self, live_columns: &mut PlIndexSet) { - self.0.collect_live_columns(live_columns); - } fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> { self.0.as_stats_evaluator() } diff --git a/crates/polars-mem-engine/src/executors/multi_file_scan.rs b/crates/polars-mem-engine/src/executors/multi_file_scan.rs index 79f8fd24b3a7..208829e35614 100644 --- a/crates/polars-mem-engine/src/executors/multi_file_scan.rs +++ b/crates/polars-mem-engine/src/executors/multi_file_scan.rs @@ -4,7 +4,7 @@ use hive::HivePartitions; use polars_core::config; use polars_core::frame::column::ScalarColumn; use polars_core::utils::accumulate_dataframes_vertical_unchecked; -use polars_io::predicates::BatchStats; +use polars_io::predicates::SkipBatchPredicate; use polars_io::RowIndex; use super::Executor; @@ -17,6 +17,7 @@ use crate::executors::JsonExec; #[cfg(feature = "parquet")] use crate::executors::ParquetExec; use crate::prelude::*; +use crate::ScanPredicate; pub struct PhysicalExprWithConstCols { constants: Vec<(PlSmallStr, Scalar)>, @@ -69,9 +70,6 @@ impl PhysicalExpr for PhysicalExprWithConstCols { self.child.isolate_column_expr(name) } - fn collect_live_columns(&self, lv: &mut PlIndexSet) { - self.child.collect_live_columns(lv) - } fn is_scalar(&self) -> bool { self.child.is_scalar() } @@ -84,7 +82,8 @@ pub trait ScanExec { &mut self, with_columns: Option>, slice: Option<(usize, usize)>, - predicate: Option>, + predicate: Option, + skip_batch_predicate: Option>, row_index: Option, ) -> PolarsResult; @@ -155,6 +154,9 @@ fn source_to_exec( if allow_missing_columns && !is_first_file { options.schema.take(); } + if !is_first_file { + file_info.row_estimation.0.take(); + } Box::new(CsvExec { sources: source, @@ -214,7 
+216,7 @@ pub struct MultiScanExec { sources: ScanSources, file_info: FileInfo, hive_parts: Option>>, - predicate: Option>, + predicate: Option, file_options: FileScanOptions, scan_type: FileScan, } @@ -224,7 +226,7 @@ impl MultiScanExec { sources: ScanSources, file_info: FileInfo, hive_parts: Option>>, - predicate: Option>, + predicate: Option, file_options: FileScanOptions, scan_type: FileScan, ) -> Self { @@ -292,31 +294,13 @@ impl MultiScanExec { // Look through the predicate and assess whether hive columns are being used in it. let mut has_live_hive_columns = false; if let Some(predicate) = &predicate { - let mut live_columns = PlIndexSet::new(); - predicate.collect_live_columns(&mut live_columns); - for hive_column in &hive_column_set { - has_live_hive_columns |= live_columns.contains(hive_column); - } - } - - // Remove the hive columns for each file load. - let mut file_with_columns = self.file_options.with_columns.take(); - if self.hive_parts.is_some() { - if let Some(with_columns) = &self.file_options.with_columns { - file_with_columns = Some( - with_columns - .iter() - .filter(|&c| !hive_column_set.contains(c)) - .cloned() - .collect(), - ); + has_live_hive_columns |= predicate.live_columns.contains(hive_column); } } let allow_missing_columns = self.file_options.allow_missing_columns; self.file_options.allow_missing_columns = false; - let mut row_index = self.file_options.row_index.take(); let slice = self.file_options.slice.take(); let mut first_slice_file = None; @@ -333,21 +317,34 @@ impl MultiScanExec { }), }; - let final_per_source_schema = &self.file_info.schema; - let file_output_schema = if let Some(file_with_columns) = file_with_columns.as_ref() { - let mut schema = final_per_source_schema.try_project(file_with_columns.as_ref())?; + let mut file_with_columns = self.file_options.with_columns.take(); + let mut row_index = self.file_options.row_index.take(); - if let Some(v) = include_file_paths.clone() { - schema.extend([(v, DataType::String)]); - } + let mut final_per_source_schema = Cow::Borrowed(self.file_info.schema.as_ref()); + if let Some(with_columns) = file_with_columns.as_ref() { + final_per_source_schema = Cow::Owned( + final_per_source_schema + .as_ref() + .try_project(with_columns.as_ref()) + .unwrap(), + ); + } - Arc::new(schema) - } else { - final_per_source_schema.clone() - }; + // Remove the hive columns for each file load. + if self.hive_parts.is_some() { + if let Some(with_columns) = &file_with_columns { + file_with_columns = Some( + with_columns + .iter() + .filter(|&c| !hive_column_set.contains(c)) + .cloned() + .collect(), + ); + } + } if slice.is_some_and(|x| x.1 == 0) { - return Ok(DataFrame::empty_with_schema(final_per_source_schema)); + return Ok(DataFrame::empty_with_schema(&final_per_source_schema)); } let mut missing_columns = Vec::new(); @@ -407,43 +404,36 @@ impl MultiScanExec { // used. 
if has_live_hive_columns { let hive_part = hive_part.unwrap(); - let child = file_predicate.unwrap(); - - file_predicate = Some(Arc::new(PhysicalExprWithConstCols { - constants: hive_column_set - .iter() - .enumerate() - .map(|(idx, column)| { - let series = hive_part.get_statistics().column_stats()[idx] - .to_min() - .unwrap(); - ( - column.clone(), - Scalar::new( - series.dtype().clone(), - series.get(0).unwrap().into_static(), - ), - ) - }) - .collect(), - child, - })); + file_predicate = Some(file_predicate.unwrap().with_constant_columns( + hive_column_set.iter().enumerate().map(|(idx, column)| { + let series = hive_part.get_statistics().column_stats()[idx] + .to_min() + .unwrap(); + ( + column.clone(), + Scalar::new( + series.dtype().clone(), + series.get(0).unwrap().into_static(), + ), + ) + }), + )); } - let stats_evaluator = file_predicate.as_ref().and_then(|p| p.as_stats_evaluator()); - let stats_evaluator = stats_evaluator.filter(|_| use_statistics); - - if let Some(stats_evaluator) = stats_evaluator { - let allow_predicate_skip = !stats_evaluator - .should_read(&BatchStats::default()) - .unwrap_or(true); - if allow_predicate_skip && verbose { + let skip_batch_predicate = file_predicate + .as_ref() + .take_if(|_| use_statistics) + .and_then(|p| p.to_dyn_skip_batch_predicate(self.file_info.schema.as_ref())); + if let Some(skip_batch_predicate) = &skip_batch_predicate { + let can_skip_batch = skip_batch_predicate + .can_skip_batch(exec_source.num_unfiltered_rows()?, PlIndexMap::default())?; + if can_skip_batch && verbose { eprintln!( "File statistics allows skipping of '{}'", source.to_include_path_name() ); } - do_skip_file |= allow_predicate_skip; + do_skip_file |= can_skip_batch; } if do_skip_file { @@ -513,6 +503,7 @@ impl MultiScanExec { current_source_with_columns.into_owned(), slice, file_predicate, + skip_batch_predicate, row_index.clone(), )?; @@ -563,12 +554,12 @@ impl MultiScanExec { } // Project to ensure that all DataFrames have the proper order. 
- df = df.select(file_output_schema.iter_names().cloned())?; + df = df.select(final_per_source_schema.iter_names().cloned())?; dfs.push(df); } if dfs.is_empty() { - Ok(DataFrame::empty_with_schema(final_per_source_schema)) + Ok(DataFrame::empty_with_schema(&final_per_source_schema)) } else { Ok(accumulate_dataframes_vertical_unchecked(dfs)) } diff --git a/crates/polars-mem-engine/src/executors/scan/csv.rs b/crates/polars-mem-engine/src/executors/scan/csv.rs index 4dc700878592..8410d09da2fb 100644 --- a/crates/polars-mem-engine/src/executors/scan/csv.rs +++ b/crates/polars-mem-engine/src/executors/scan/csv.rs @@ -4,16 +4,18 @@ use polars_core::config; use polars_core::utils::{ accumulate_dataframes_vertical, accumulate_dataframes_vertical_unchecked, }; +use polars_io::predicates::SkipBatchPredicate; use polars_io::utils::compression::maybe_decompress_bytes; use super::*; +use crate::ScanPredicate; pub struct CsvExec { pub sources: ScanSources, pub file_info: FileInfo, pub options: CsvReadOptions, pub file_options: FileScanOptions, - pub predicate: Option>, + pub predicate: Option, } impl CsvExec { @@ -29,7 +31,10 @@ impl CsvExec { assert_eq!(x.0, 0); x.1 })); - let predicate = self.predicate.clone().map(phys_expr_to_io_expr); + let predicate = self + .predicate + .as_ref() + .map(|p| phys_expr_to_io_expr(p.predicate.clone())); let options_base = self .options .clone() @@ -214,7 +219,8 @@ impl ScanExec for CsvExec { &mut self, with_columns: Option>, slice: Option<(usize, usize)>, - predicate: Option>, + predicate: Option, + _skip_batch_predicate: Option>, row_index: Option, ) -> PolarsResult { self.file_options.with_columns = with_columns; @@ -269,7 +275,7 @@ impl ScanExec for CsvExec { fn num_unfiltered_rows(&mut self) -> PolarsResult { let (lb, ub) = self.file_info.row_estimation; - if lb.is_none_or(|lb| lb != ub) { + if lb.is_some_and(|lb| lb == ub) { return Ok(ub as IdxSize); } diff --git a/crates/polars-mem-engine/src/executors/scan/ipc.rs b/crates/polars-mem-engine/src/executors/scan/ipc.rs index 9c305949f060..3468144ea22c 100644 --- a/crates/polars-mem-engine/src/executors/scan/ipc.rs +++ b/crates/polars-mem-engine/src/executors/scan/ipc.rs @@ -4,17 +4,18 @@ use polars_core::utils::accumulate_dataframes_vertical; use polars_error::feature_gated; use polars_io::cloud::CloudOptions; use polars_io::path_utils::is_cloud_url; -use polars_io::predicates::apply_predicate; +use polars_io::predicates::{apply_predicate, SkipBatchPredicate}; use polars_utils::mmap::MemSlice; use polars_utils::open_file; use rayon::prelude::*; use super::*; +use crate::ScanPredicate; pub struct IpcExec { pub(crate) sources: ScanSources, pub(crate) file_info: FileInfo, - pub(crate) predicate: Option>, + pub(crate) predicate: Option, #[allow(dead_code)] pub(crate) options: IpcScanOptions, pub(crate) file_options: FileScanOptions, @@ -148,7 +149,7 @@ impl IpcExec { }; let dfs = if let Some(predicate) = self.predicate.clone() { - let predicate = phys_expr_to_io_expr(predicate); + let predicate = phys_expr_to_io_expr(predicate.predicate); let predicate = Some(predicate.as_ref()); POOL.install(|| { @@ -198,7 +199,8 @@ impl ScanExec for IpcExec { &mut self, with_columns: Option>, slice: Option<(usize, usize)>, - predicate: Option>, + predicate: Option, + _skip_batch_predicate: Option>, row_index: Option, ) -> PolarsResult { self.file_options.with_columns = with_columns; diff --git a/crates/polars-mem-engine/src/executors/scan/mod.rs b/crates/polars-mem-engine/src/executors/scan/mod.rs index ff37e2f588d3..92d966e938e3 
100644 --- a/crates/polars-mem-engine/src/executors/scan/mod.rs +++ b/crates/polars-mem-engine/src/executors/scan/mod.rs @@ -29,6 +29,7 @@ use polars_plan::global::_set_n_rows_for_scan; pub(crate) use self::python_scan::*; use super::*; use crate::prelude::*; +use crate::ScanPredicate; /// Producer of an in memory DataFrame pub struct DataFrameExec { @@ -58,7 +59,7 @@ pub(crate) struct AnonymousScanExec { pub(crate) function: Arc, pub(crate) file_options: FileScanOptions, pub(crate) file_info: FileInfo, - pub(crate) predicate: Option>, + pub(crate) predicate: Option, pub(crate) output_schema: Option, pub(crate) predicate_has_windows: bool, } @@ -82,7 +83,7 @@ impl Executor for AnonymousScanExec { match (self.function.allows_predicate_pushdown(), &self.predicate) { (true, Some(predicate)) => state.record( || { - args.predicate = predicate.as_expression().cloned(); + args.predicate = predicate.predicate.as_expression().cloned(); self.function.scan(args) }, "anonymous_scan".into(), @@ -90,7 +91,7 @@ impl Executor for AnonymousScanExec { (false, Some(predicate)) => state.record( || { let mut df = self.function.scan(args)?; - let s = predicate.evaluate(&df, state)?; + let s = predicate.predicate.evaluate(&df, state)?; if self.predicate_has_windows { state.clear_window_expr_cache() } diff --git a/crates/polars-mem-engine/src/executors/scan/ndjson.rs b/crates/polars-mem-engine/src/executors/scan/ndjson.rs index 7cbcb89810a6..87a8ada66f32 100644 --- a/crates/polars-mem-engine/src/executors/scan/ndjson.rs +++ b/crates/polars-mem-engine/src/executors/scan/ndjson.rs @@ -1,16 +1,18 @@ use polars_core::config; use polars_core::utils::accumulate_dataframes_vertical; +use polars_io::predicates::SkipBatchPredicate; use polars_io::prelude::{JsonLineReader, SerReader}; use polars_io::utils::compression::maybe_decompress_bytes; use super::*; +use crate::ScanPredicate; pub struct JsonExec { sources: ScanSources, options: NDJsonReadOptions, file_options: FileScanOptions, file_info: FileInfo, - predicate: Option>, + predicate: Option, } impl JsonExec { @@ -19,7 +21,7 @@ impl JsonExec { options: NDJsonReadOptions, file_options: FileScanOptions, file_info: FileInfo, - predicate: Option>, + predicate: Option, ) -> Self { Self { sources, @@ -93,7 +95,11 @@ impl JsonExec { .with_rechunk(self.file_options.rechunk) .with_chunk_size(Some(self.options.chunk_size)) .with_row_index(row_index) - .with_predicate(self.predicate.clone().map(phys_expr_to_io_expr)) + .with_predicate( + self.predicate + .as_ref() + .map(|p| phys_expr_to_io_expr(p.predicate.clone())), + ) .with_projection(self.file_options.with_columns.clone()) .low_memory(self.options.low_memory) .with_n_rows(n_rows) @@ -133,7 +139,8 @@ impl ScanExec for JsonExec { &mut self, with_columns: Option>, slice: Option<(usize, usize)>, - predicate: Option>, + predicate: Option, + _skip_batch_predicate: Option>, row_index: Option, ) -> PolarsResult { self.file_options.with_columns = with_columns; diff --git a/crates/polars-mem-engine/src/executors/scan/parquet.rs b/crates/polars-mem-engine/src/executors/scan/parquet.rs index 5a6efdaa608f..89502cf34017 100644 --- a/crates/polars-mem-engine/src/executors/scan/parquet.rs +++ b/crates/polars-mem-engine/src/executors/scan/parquet.rs @@ -6,10 +6,12 @@ use polars_core::utils::accumulate_dataframes_vertical; use polars_error::feature_gated; use polars_io::cloud::CloudOptions; use polars_io::parquet::metadata::FileMetadataRef; +use polars_io::predicates::{ScanIOPredicate, SkipBatchPredicate}; use 
polars_io::utils::slice::split_slice_at_file; use polars_io::RowIndex; use super::*; +use crate::ScanPredicate; pub struct ParquetExec { sources: ScanSources, @@ -17,7 +19,9 @@ pub struct ParquetExec { hive_parts: Option>>, - predicate: Option>, + predicate: Option, + skip_batch_predicate: Option>, + pub(crate) options: ParquetOptions, #[allow(dead_code)] cloud_options: Option, @@ -32,7 +36,7 @@ impl ParquetExec { sources: ScanSources, file_info: FileInfo, hive_parts: Option>>, - predicate: Option>, + predicate: Option, options: ParquetOptions, cloud_options: Option, file_options: FileScanOptions, @@ -45,6 +49,8 @@ impl ParquetExec { hive_parts, predicate, + skip_batch_predicate: None, + options, cloud_options, file_options, @@ -75,7 +81,10 @@ impl ParquetExec { None } }; - let predicate = self.predicate.clone().map(phys_expr_to_io_expr); + let predicate = self + .predicate + .as_ref() + .map(|p| p.to_io(self.skip_batch_predicate.as_ref(), &self.file_info.schema)); let mut base_row_index = self.file_options.row_index.take(); // (offset, end) @@ -279,7 +288,14 @@ impl ParquetExec { None } }; - let predicate = self.predicate.clone().map(phys_expr_to_io_expr); + let predicate = self.predicate.as_ref().map(|p| ScanIOPredicate { + predicate: phys_expr_to_io_expr(p.predicate.clone()), + live_columns: p.live_columns.clone(), + skip_batch_predicate: self + .skip_batch_predicate + .clone() + .or_else(|| p.to_dyn_skip_batch_predicate(self.file_info.schema.as_ref())), + }); let mut base_row_index = self.file_options.row_index.take(); // Modified if we have a negative slice @@ -487,7 +503,7 @@ impl ParquetExec { .row_index .as_ref() .and_then(|_| self.predicate.take()) - .map(phys_expr_to_io_expr); + .map(|p| phys_expr_to_io_expr(p.predicate)); let is_cloud = self.sources.is_cloud_url(); let force_async = config::force_async(); @@ -560,12 +576,14 @@ impl ScanExec for ParquetExec { &mut self, with_columns: Option>, slice: Option<(usize, usize)>, - predicate: Option>, + predicate: Option, + skip_batch_predicate: Option>, row_index: Option, ) -> PolarsResult { self.file_options.with_columns = with_columns; self.file_options.slice = slice.map(|(o, l)| (o as i64, l)); self.predicate = predicate; + self.skip_batch_predicate = skip_batch_predicate; self.file_options.row_index = row_index; if self.file_info.reader_schema.is_none() { diff --git a/crates/polars-mem-engine/src/lib.rs b/crates/polars-mem-engine/src/lib.rs index 7129cbaee660..8ab1c2e59ff0 100644 --- a/crates/polars-mem-engine/src/lib.rs +++ b/crates/polars-mem-engine/src/lib.rs @@ -1,7 +1,9 @@ mod executors; mod planner; +mod predicate; mod prelude; mod utils; pub use executors::Executor; -pub use planner::create_physical_plan; +pub use planner::{create_physical_plan, create_scan_predicate}; +pub use predicate::ScanPredicate; diff --git a/crates/polars-mem-engine/src/planner/lp.rs b/crates/polars-mem-engine/src/planner/lp.rs index 8303e613eb69..400aa9dc92d6 100644 --- a/crates/polars-mem-engine/src/planner/lp.rs +++ b/crates/polars-mem-engine/src/planner/lp.rs @@ -3,10 +3,13 @@ use polars_core::POOL; use polars_expr::state::ExecutionState; use polars_plan::global::_set_n_rows_for_scan; use polars_plan::plans::expr_ir::ExprIR; +use polars_utils::format_pl_smallstr; +use self::predicates::aexpr_to_skip_batch_predicate; use super::super::executors::{self, Executor}; use super::*; use crate::utils::*; +use crate::ScanPredicate; fn partitionable_gb( keys: &[ExprIR], @@ -56,7 +59,7 @@ impl ConversionState { pub fn create_physical_plan( root: Node, 
lp_arena: &mut Arena, - expr_arena: &Arena, + expr_arena: &mut Arena, ) -> PolarsResult> { let state = ConversionState::new()?; create_physical_plan_impl(root, lp_arena, expr_arena, &state) @@ -65,7 +68,7 @@ pub fn create_physical_plan( fn create_physical_plan_impl( root: Node, lp_arena: &mut Arena, - expr_arena: &Arena, + expr_arena: &mut Arena, state: &ConversionState, ) -> PolarsResult> { use IR::*; @@ -203,22 +206,39 @@ fn create_physical_plan_impl( }; let mut state = ExpressionConversionState::new(true, state.expr_depth); + let do_new_multifile = (sources.len() > 1 || hive_parts.is_some()) + && !matches!(scan_type, FileScan::Anonymous { .. }) + && std::env::var("POLARS_NEW_MULTIFILE").as_deref() == Ok("1"); + + let mut create_skip_batch_predicate = false; + create_skip_batch_predicate |= do_new_multifile; + #[cfg(feature = "parquet")] + { + create_skip_batch_predicate |= matches!( + scan_type, + FileScan::Parquet { + options: polars_io::prelude::ParquetOptions { + use_statistics: true, + .. + }, + .. + } + ); + } + let predicate = predicate - .map(|pred| { - create_physical_expr( - &pred, - Context::Default, + .map(|predicate| { + create_scan_predicate( + &predicate, expr_arena, output_schema.as_ref().unwrap_or(&file_info.schema), &mut state, + create_skip_batch_predicate, ) }) - .map_or(Ok(None), |v| v.map(Some))?; + .transpose()?; - if sources.len() > 1 - && std::env::var("POLARS_NEW_MULTIFILE").as_deref() == Ok("1") - && !matches!(scan_type, FileScan::Anonymous { .. }) - { + if do_new_multifile { return Ok(Box::new(executors::MultiScanExec::new( sources, file_info, @@ -626,3 +646,58 @@ fn create_physical_plan_impl( Invalid => unreachable!(), } } + +pub fn create_scan_predicate( + predicate: &ExprIR, + expr_arena: &mut Arena, + schema: &Arc, + state: &mut ExpressionConversionState, + create_skip_batch_predicate: bool, +) -> PolarsResult { + let phys_predicate = + create_physical_expr(predicate, Context::Default, expr_arena, schema, state)?; + let live_columns = Arc::new(PlIndexSet::from_iter(aexpr_to_leaf_names_iter( + predicate.node(), + expr_arena, + ))); + + let mut skip_batch_predicate = None; + + if create_skip_batch_predicate { + if let Some(node) = aexpr_to_skip_batch_predicate(predicate.node(), expr_arena, schema) { + let expr = ExprIR::new(node, predicate.output_name_inner().clone()); + + if std::env::var("POLARS_OUTPUT_SKIP_BATCH_PRED").as_deref() == Ok("1") { + eprintln!("predicate: {}", predicate.display(expr_arena)); + eprintln!("skip_batch_predicate: {}", expr.display(expr_arena)); + } + + let mut skip_batch_schema = Schema::with_capacity(1 + live_columns.len()); + + skip_batch_schema.insert(PlSmallStr::from_static("len"), IDX_DTYPE); + for (col, dtype) in schema.iter() { + if !live_columns.contains(col) { + continue; + } + + skip_batch_schema.insert(format_pl_smallstr!("{col}_min"), dtype.clone()); + skip_batch_schema.insert(format_pl_smallstr!("{col}_max"), dtype.clone()); + skip_batch_schema.insert(format_pl_smallstr!("{col}_nc"), IDX_DTYPE); + } + + skip_batch_predicate = Some(create_physical_expr( + &expr, + Context::Default, + expr_arena, + &Arc::new(skip_batch_schema), + state, + )?); + } + } + + PolarsResult::Ok(ScanPredicate { + predicate: phys_predicate, + live_columns, + skip_batch_predicate, + }) +} diff --git a/crates/polars-mem-engine/src/predicate.rs b/crates/polars-mem-engine/src/predicate.rs new file mode 100644 index 000000000000..4df90dacf16a --- /dev/null +++ b/crates/polars-mem-engine/src/predicate.rs @@ -0,0 +1,264 @@ +use std::sync::Arc; + +use 
polars_core::frame::DataFrame; +use polars_core::prelude::{ + AnyValue, Column, Field, GroupPositions, PlIndexMap, PlIndexSet, IDX_DTYPE, +}; +use polars_core::scalar::Scalar; +use polars_core::schema::{Schema, SchemaRef}; +use polars_error::PolarsResult; +use polars_expr::prelude::{phys_expr_to_io_expr, AggregationContext, PhysicalExpr}; +use polars_expr::state::ExecutionState; +use polars_io::predicates::{ColumnStatistics, ScanIOPredicate, SkipBatchPredicate}; +use polars_utils::pl_str::PlSmallStr; +use polars_utils::{format_pl_smallstr, IdxSize}; + +/// All the expressions and metadata used to filter out rows using predicates. +#[derive(Clone)] +pub struct ScanPredicate { + pub predicate: Arc, + + /// Column names that are used in the predicate. + pub live_columns: Arc>, + + /// A predicate expression used to skip record batches based on its statistics. + /// + /// This expression will be given a batch size along with a `min`, `max` and `null count` for + /// each live column (set to `null` when it is not known) and the expression evaluates to + /// `true` if the whole batch can for sure be skipped. This may be conservative and evaluate to + /// `false` even when the batch could theoretically be skipped. + pub skip_batch_predicate: Option>, +} + +/// Helper to implement [`SkipBatchPredicate`]. +struct SkipBatchPredicateHelper { + skip_batch_predicate: Arc, + live_columns: Arc>, + + /// A cached dataframe that gets used to evaluate all the expressions. + df: DataFrame, +} + +/// Helper for the [`PhysicalExpr`] trait to include constant columns. +pub struct PhysicalExprWithConstCols { + constants: Vec<(PlSmallStr, Scalar)>, + child: Arc, +} + +impl PhysicalExpr for PhysicalExprWithConstCols { + fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult { + let mut df = df.clone(); + for (name, scalar) in &self.constants { + df.with_column(Column::new_scalar( + name.clone(), + scalar.clone(), + df.height(), + ))?; + } + + self.child.evaluate(&df, state) + } + + fn evaluate_on_groups<'a>( + &self, + df: &DataFrame, + groups: &'a GroupPositions, + state: &ExecutionState, + ) -> PolarsResult> { + let mut df = df.clone(); + for (name, scalar) in &self.constants { + df.with_column(Column::new_scalar( + name.clone(), + scalar.clone(), + df.height(), + ))?; + } + + self.child.evaluate_on_groups(&df, groups, state) + } + + fn isolate_column_expr( + &self, + _name: &str, + ) -> Option<( + Arc, + Option, + )> { + None + } + + fn to_field(&self, input_schema: &Schema) -> PolarsResult { + self.child.to_field(input_schema) + } + fn is_scalar(&self) -> bool { + self.child.is_scalar() + } +} + +impl ScanPredicate { + pub fn with_constant_columns( + &self, + constant_columns: impl IntoIterator, + ) -> Self { + let constant_columns = constant_columns.into_iter(); + + let mut live_columns = self.live_columns.as_ref().clone(); + let mut skip_batch_predicate_constants = Vec::with_capacity( + self.skip_batch_predicate + .is_some() + .then_some(1 + constant_columns.size_hint().0 * 3) + .unwrap_or_default(), + ); + + let predicate_constants = constant_columns + .filter_map(|(name, scalar): (PlSmallStr, Scalar)| { + if !live_columns.swap_remove(&name) { + return None; + } + + if self.skip_batch_predicate.is_some() { + let mut null_count: Scalar = (0 as IdxSize).into(); + + // If the constant value is Null, we don't know how many nulls there are + // because the length of the batch may vary. 
+ if scalar.is_null() { + null_count.update(AnyValue::Null); + } + + skip_batch_predicate_constants.extend([ + (format_pl_smallstr!("{name}_min"), scalar.clone()), + (format_pl_smallstr!("{name}_max"), scalar.clone()), + (format_pl_smallstr!("{name}_nc"), null_count), + ]); + } + + Some((name, scalar)) + }) + .collect(); + + let predicate = Arc::new(PhysicalExprWithConstCols { + constants: predicate_constants, + child: self.predicate.clone(), + }); + let skip_batch_predicate = self.skip_batch_predicate.as_ref().map(|skp| { + Arc::new(PhysicalExprWithConstCols { + constants: skip_batch_predicate_constants, + child: skp.clone(), + }) as _ + }); + + Self { + predicate, + live_columns: Arc::new(live_columns), + skip_batch_predicate, + } + } + + /// Create a predicate to skip batches using statistics. + pub(crate) fn to_dyn_skip_batch_predicate( + &self, + schema: &Schema, + ) -> Option> { + let skip_batch_predicate = self.skip_batch_predicate.as_ref()?; + + let mut columns = Vec::with_capacity(1 + self.live_columns.len() * 3); + + columns.push(Column::new_scalar( + PlSmallStr::from_static("len"), + Scalar::null(IDX_DTYPE), + 1, + )); + for col in self.live_columns.as_ref() { + let dtype = schema.get(col).unwrap(); + columns.extend([ + Column::new_scalar( + format_pl_smallstr!("{col}_min"), + Scalar::null(dtype.clone()), + 1, + ), + Column::new_scalar( + format_pl_smallstr!("{col}_max"), + Scalar::null(dtype.clone()), + 1, + ), + Column::new_scalar(format_pl_smallstr!("{col}_nc"), Scalar::null(IDX_DTYPE), 1), + ]); + } + + // SAFETY: + // * Each column is length = 1 + // * We have an IndexSet, so each column name is unique + let df = unsafe { DataFrame::new_no_checks(1, columns) }; + + Some(Arc::new(SkipBatchPredicateHelper { + skip_batch_predicate: skip_batch_predicate.clone(), + live_columns: self.live_columns.clone(), + df, + })) + } + + pub fn to_io( + &self, + skip_batch_predicate: Option<&Arc>, + schema: &SchemaRef, + ) -> ScanIOPredicate { + ScanIOPredicate { + predicate: phys_expr_to_io_expr(self.predicate.clone()), + live_columns: self.live_columns.clone(), + skip_batch_predicate: skip_batch_predicate + .cloned() + .or_else(|| self.to_dyn_skip_batch_predicate(schema)), + } + } +} + +impl SkipBatchPredicate for SkipBatchPredicateHelper { + fn can_skip_batch( + &self, + batch_size: IdxSize, + statistics: PlIndexMap, + ) -> PolarsResult { + // This is the DF with all nulls. + let mut df = self.df.clone(); + + // SAFETY: We don't update the dtype, name or length of columns. + let columns = unsafe { df.get_columns_mut() }; + + // Set `len` statistic. + columns[0] + .as_scalar_column_mut() + .unwrap() + .with_value(batch_size.into()); + + for (col, stat) in statistics { + // Skip all statistics of columns that are not used in the predicate. + let Some(idx) = self.live_columns.get_index_of(col.as_str()) else { + continue; + }; + + let nc = stat.null_count.map_or(AnyValue::Null, |nc| nc.into()); + + // Set `min`, `max` and `null_count` statistics. + let col_idx = (idx * 3) + 1; + columns[col_idx] + .as_scalar_column_mut() + .unwrap() + .with_value(stat.min); + columns[col_idx + 1] + .as_scalar_column_mut() + .unwrap() + .with_value(stat.max); + columns[col_idx + 2] + .as_scalar_column_mut() + .unwrap() + .with_value(nc); + } + + Ok(self + .skip_batch_predicate + .evaluate(&df, &Default::default())? + .bool()? 
+ .first() + .unwrap()) + } +} diff --git a/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs b/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs index 190c6a2549e5..fc11df269277 100644 --- a/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs +++ b/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use polars_core::datatypes::Field; use polars_core::error::PolarsResult; use polars_core::frame::DataFrame; -use polars_core::prelude::{DataType, PlIndexSet, SchemaRef, Series, IDX_DTYPE}; +use polars_core::prelude::{DataType, SchemaRef, Series, IDX_DTYPE}; use polars_core::schema::Schema; use polars_expr::state::ExecutionState; use polars_io::predicates::PhysicalIoExpr; @@ -31,8 +31,6 @@ impl PhysicalIoExpr for Len { fn evaluate_io(&self, _df: &DataFrame) -> PolarsResult { unimplemented!() } - - fn collect_live_columns(&self, _live_columns: &mut PlIndexSet) {} } impl PhysicalPipedExpr for Len { fn evaluate(&self, chunk: &DataChunk, _lazy_state: &ExecutionState) -> PolarsResult { diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs index 058563b91b61..0be39c4a876e 100644 --- a/crates/polars-pipe/src/executors/sources/parquet.rs +++ b/crates/polars-pipe/src/executors/sources/parquet.rs @@ -15,7 +15,7 @@ use polars_io::parquet::metadata::FileMetadataRef; use polars_io::parquet::read::{BatchedParquetReader, ParquetOptions, ParquetReader}; use polars_io::path_utils::is_cloud_url; use polars_io::pl_async::get_runtime; -use polars_io::predicates::PhysicalIoExpr; +use polars_io::predicates::ScanIOPredicate; #[cfg(feature = "async")] use polars_io::prelude::ParquetAsyncReader; use polars_io::utils::slice::split_slice_at_file; @@ -48,7 +48,7 @@ pub struct ParquetSource { prefetch_size: usize, first_schema: Arc, projected_arrow_schema: Option>, - predicate: Option>, + predicate: Option, } impl ParquetSource { @@ -140,7 +140,7 @@ impl ParquetSource { ri.offset += self.processed_rows.load(Ordering::Relaxed) as IdxSize; ri })) - .with_predicate(predicate.clone()) + .with_predicate(predicate) .use_statistics(options.use_statistics) .with_hive_partition_columns(hive_partitions) .with_include_file_path( @@ -209,7 +209,7 @@ impl ParquetSource { self.file_options.allow_missing_columns, ) .await? 
- .with_predicate(predicate.clone()) + .with_predicate(predicate) .use_statistics(options.use_statistics) .with_hive_partition_columns(hive_partitions) .with_include_file_path( @@ -252,7 +252,7 @@ impl ParquetSource { file_info: FileInfo, hive_parts: Option>>, verbose: bool, - predicate: Option>, + predicate: Option, ) -> PolarsResult { let paths = sources .as_paths() diff --git a/crates/polars-pipe/src/pipeline/convert.rs b/crates/polars-pipe/src/pipeline/convert.rs index d4bd3eb80c9c..1a0d9d1bd64b 100644 --- a/crates/polars-pipe/src/pipeline/convert.rs +++ b/crates/polars-pipe/src/pipeline/convert.rs @@ -4,6 +4,7 @@ use std::rc::Rc; use hashbrown::hash_map::Entry; use polars_core::prelude::*; use polars_core::with_match_physical_integer_polars_type; +use polars_io::predicates::ScanIOPredicate; #[cfg(feature = "parquet")] use polars_io::predicates::{PhysicalIoExpr, StatsEvaluator}; use polars_ops::prelude::JoinType; @@ -41,7 +42,7 @@ where fn get_source( source: IR, operator_objects: &mut Vec>, - expr_arena: &Arena, + expr_arena: &mut Arena, to_physical: &F, push_predicate: bool, verbose: bool, @@ -126,19 +127,18 @@ where self.p.evaluate_io(df) } - fn collect_live_columns( - &self, - live_columns: &mut PlIndexSet, - ) { - self.p.collect_live_columns(live_columns); - } - fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> { self.p.as_stats_evaluator() } } - - PolarsResult::Ok(Arc::new(Wrap { p }) as Arc) + let live_columns = Arc::new(PlIndexSet::from_iter( + aexpr_to_leaf_names_iter(predicate.node(), expr_arena), + )); + PolarsResult::Ok(ScanIOPredicate { + predicate: Arc::new(Wrap { p }) as Arc, + live_columns, + skip_batch_predicate: None, + }) }) .transpose()?; let src = sources::ParquetSource::new( diff --git a/crates/polars-plan/Cargo.toml b/crates/polars-plan/Cargo.toml index 8f93faeebae8..22c0aaf61a2f 100644 --- a/crates/polars-plan/Cargo.toml +++ b/crates/polars-plan/Cargo.toml @@ -194,8 +194,6 @@ bigidx = ["polars-core/bigidx", "polars-utils/bigidx"] polars_cloud = ["serde"] ir_serde = ["serde", "polars-utils/ir_serde"] -panic_on_schema = [] - [package.metadata.docs.rs] features = [ "bitwise", diff --git a/crates/polars-plan/src/plans/aexpr/evaluate.rs b/crates/polars-plan/src/plans/aexpr/evaluate.rs new file mode 100644 index 000000000000..1c9afe190eb8 --- /dev/null +++ b/crates/polars-plan/src/plans/aexpr/evaluate.rs @@ -0,0 +1,31 @@ +use std::borrow::Cow; + +use polars_core::schema::Schema; +use polars_utils::arena::{Arena, Node}; +use polars_utils::pl_str::PlSmallStr; + +use super::{AExpr, LiteralValue}; + +pub fn constant_evaluate<'a>( + e: Node, + expr_arena: &'a Arena, + _schema: &Schema, + _depth: usize, +) -> Option> { + match expr_arena.get(e) { + AExpr::Literal(lv) => Some(Cow::Borrowed(lv)), + _ => None, + } +} + +pub fn into_column<'a>( + e: Node, + expr_arena: &'a Arena, + _schema: &Schema, + _depth: usize, +) -> Option<&'a PlSmallStr> { + match expr_arena.get(e) { + AExpr::Column(c) => Some(c), + _ => None, + } +} diff --git a/crates/polars-plan/src/plans/aexpr/mod.rs b/crates/polars-plan/src/plans/aexpr/mod.rs index ba0636dd8492..cf3509d53d69 100644 --- a/crates/polars-plan/src/plans/aexpr/mod.rs +++ b/crates/polars-plan/src/plans/aexpr/mod.rs @@ -1,5 +1,7 @@ +mod evaluate; #[cfg(feature = "cse")] mod hash; +pub mod predicates; mod scalar; mod schema; mod traverse; diff --git a/crates/polars-plan/src/plans/aexpr/predicates.rs b/crates/polars-plan/src/plans/aexpr/predicates.rs new file mode 100644 index 000000000000..fbddb2cd243a --- /dev/null +++ 
b/crates/polars-plan/src/plans/aexpr/predicates.rs
@@ -0,0 +1,454 @@
+use std::borrow::Cow;
+
+use polars_core::prelude::AnyValue;
+use polars_core::schema::Schema;
+use polars_utils::arena::{Arena, Node};
+use polars_utils::format_pl_smallstr;
+use polars_utils::pl_str::PlSmallStr;
+
+use super::evaluate::{constant_evaluate, into_column};
+use super::{AExpr, BooleanFunction, Operator, OutputName};
+use crate::dsl::FunctionExpr;
+use crate::plans::{ExprIR, LiteralValue};
+use crate::prelude::FunctionOptions;
+
+/// Return a new boolean expression that determines whether a batch can be skipped based on min,
+/// max and null count statistics.
+///
+/// This is conservative and may return `None` or `false` when an expression is not yet supported.
+///
+/// To evaluate it, the expression is given a `len` column plus, for each live column `A`, the
+/// statistic columns `A_min`, `A_max` and `A_nc` (null count). When a `min` or `max` statistic is
+/// null, it is assumed to be unknown.
+pub fn aexpr_to_skip_batch_predicate(
+    e: Node,
+    expr_arena: &mut Arena,
+    schema: &Schema,
+) -> Option {
+    aexpr_to_skip_batch_predicate_rec(e, expr_arena, schema, 0)
+}
+
+#[recursive::recursive]
+fn aexpr_to_skip_batch_predicate_rec(
+    e: Node,
+    expr_arena: &mut Arena,
+    schema: &Schema,
+    depth: usize,
+) -> Option {
+    use Operator as O;
+
+    macro_rules! rec {
+        ($node:expr) => {{
+            aexpr_to_skip_batch_predicate_rec($node, expr_arena, schema, depth + 1)
+        }};
+    }
+    macro_rules! and {
+        ($l:expr, $($r:expr),+ $(,)?) => {{
+            let node = $l;
+            $(
+                let node = expr_arena.add(AExpr::BinaryExpr {
+                    left: node,
+                    op: O::LogicalAnd,
+                    right: $r,
+                });
+            )+
+            node
+        }}
+    }
+    macro_rules! or {
+        ($l:expr, $($r:expr),+ $(,)?) => {{
+            let node = $l;
+            $(
+                let node = expr_arena.add(AExpr::BinaryExpr {
+                    left: node,
+                    op: O::LogicalOr,
+                    right: $r,
+                });
+            )+
+            node
+        }}
+    }
+    macro_rules! binexpr {
+        ($op:ident, $l:expr, $r:expr) => {{
+            expr_arena.add(AExpr::BinaryExpr {
+                left: $l,
+                op: O::$op,
+                right: $r,
+            })
+        }};
+    }
+    macro_rules! lt {
+        ($l:expr, $r:expr) => {{
+            binexpr!(Lt, $l, $r)
+        }};
+    }
+    macro_rules! gt {
+        ($l:expr, $r:expr) => {{
+            binexpr!(Gt, $l, $r)
+        }};
+    }
+    macro_rules! le {
+        ($l:expr, $r:expr) => {{
+            binexpr!(LtEq, $l, $r)
+        }};
+    }
+    macro_rules! ge {
+        ($l:expr, $r:expr) => {{
+            binexpr!(GtEq, $l, $r)
+        }};
+    }
+    macro_rules! eq_missing {
+        ($l:expr, $r:expr) => {{
+            binexpr!(EqValidity, $l, $r)
+        }};
+    }
+    macro_rules! all {
+        ($i:expr) => {{
+            expr_arena.add(AExpr::Function {
+                input: vec![ExprIR::new($i, OutputName::Alias(PlSmallStr::EMPTY))],
+                function: FunctionExpr::Boolean(BooleanFunction::All { ignore_nulls: true }),
+                options: FunctionOptions::default(),
+            })
+        }};
+    }
+    macro_rules! is_stat_defined {
+        ($i:expr, $dtype:expr) => {{
+            let mut expr = expr_arena.add(AExpr::Function {
+                input: vec![ExprIR::new($i, OutputName::Alias(PlSmallStr::EMPTY))],
+                function: FunctionExpr::Boolean(BooleanFunction::IsNotNull),
+                options: FunctionOptions::default(),
+            });
+
+            if $dtype.is_float() {
+                let is_not_nan = expr_arena.add(AExpr::Function {
+                    input: vec![ExprIR::new($i, OutputName::Alias(PlSmallStr::EMPTY))],
+                    function: FunctionExpr::Boolean(BooleanFunction::IsNotNan),
+                    options: FunctionOptions::default(),
+                });
+                expr = and!(is_not_nan, expr);
+            }
+
+            expr
+        }};
+    }
+    macro_rules!
col { + (len) => {{ + col!(PlSmallStr::from_static("len")) + }}; + ($name:expr) => {{ + expr_arena.add(AExpr::Column($name)) + }}; + (min: $name:expr) => {{ + col!(format_pl_smallstr!("{}_min", $name)) + }}; + (max: $name:expr) => {{ + col!(format_pl_smallstr!("{}_max", $name)) + }}; + (null_count: $name:expr) => {{ + col!(format_pl_smallstr!("{}_nc", $name)) + }}; + } + macro_rules! lv { + ($lv:expr) => {{ + expr_arena.add(AExpr::Literal(LiteralValue::OtherScalar(Scalar::from($lv)))) + }}; + (idx: $lv:expr) => {{ + expr_arena.add(AExpr::Literal(LiteralValue::new_idxsize($lv))) + }}; + (bool: $lv:expr) => {{ + expr_arena.add(AExpr::Literal(LiteralValue::Boolean($lv))) + }}; + } + + if let Some(lv) = constant_evaluate(e, expr_arena, schema, 0) { + if let Some(av) = lv.to_any_value() { + return match av { + AnyValue::Null => Some(lv!(bool: false)), + AnyValue::Boolean(b) => Some(lv!(bool: b)), + _ => None, + }; + } + } + + match expr_arena.get(e) { + AExpr::Explode(_) => None, + AExpr::Alias(_, _) => None, + AExpr::Column(_) => None, + AExpr::Literal(_) => None, + AExpr::BinaryExpr { left, op, right } => { + let left = *left; + let right = *right; + + match op { + O::Eq | O::EqValidity => { + let ((col, _), (lv, lv_node)) = + get_binary_expr_col_and_lv(left, right, expr_arena, schema)?; + + let col = col.clone(); + if lv.is_null() { + if matches!(op, O::Eq) { + Some(lv!(bool: true)) + } else { + // col(A) == null --> NULL_COUNT(A) == 0 + let col_nc = col!(null_count: col); + let idx_zero = lv!(idx: 0); + Some(eq_missing!(col_nc, idx_zero)) + } + } else { + // col(A) == B --> min(A) > B || max(A) < B + let col_min = col!(min: col); + let col_max = col!(max: col); + + let dtype = schema.get(&col)?; + let min_is_defined = is_stat_defined!(col_min, dtype); + let max_is_defined = is_stat_defined!(col_max, dtype); + + let min_gt = gt!(col_min, lv_node); + let min_gt = and!(min_is_defined, min_gt); + + let max_lt = lt!(col_max, lv_node); + let max_lt = and!(max_is_defined, max_lt); + + Some(or!(min_gt, max_lt)) + } + }, + O::NotEq | O::NotEqValidity => { + let ((col, _), (lv, lv_node)) = + get_binary_expr_col_and_lv(left, right, expr_arena, schema)?; + + let col = col.clone(); + if lv.is_null() { + if matches!(op, O::NotEq) { + Some(lv!(bool: false)) + } else { + // col(A) != null --> NULL_COUNT(A) == LEN + let col_nc = col!(null_count: col); + let len = col!(len); + Some(eq_missing!(col_nc, len)) + } + } else { + // col(A) != B --> min(A) == B && max(A) == B + let col_min = col!(min: col); + let col_max = col!(max: col); + let min_eq = eq_missing!(col_min, lv_node); + let max_eq = eq_missing!(col_max, lv_node); + + Some(and!(min_eq, max_eq)) + } + }, + O::Lt | O::GtEq => { + let ((col, col_node), (lv, lv_node)) = + get_binary_expr_col_and_lv(left, right, expr_arena, schema)?; + + if lv.is_null() { + return Some(lv!(bool: true)); + } + + let is_col_less_than_lv = matches!(op, O::Lt) == (col_node == left); + + let col = col.clone(); + let dtype = schema.get(&col)?; + if is_col_less_than_lv { + // col(A) < B --> min(A) >= B + let col_min = col!(min: col); + let min_is_defined = is_stat_defined!(col_min, dtype); + let min_ge = ge!(col_min, lv_node); + Some(and!(min_is_defined, min_ge)) + } else { + // col(A) >= B --> max(A) < B + let col_max = col!(max: col); + let max_is_defined = is_stat_defined!(col_max, dtype); + let max_lt = lt!(col_max, lv_node); + Some(and!(max_is_defined, max_lt)) + } + }, + O::Gt | O::LtEq => { + let ((col, col_node), (lv, lv_node)) = + get_binary_expr_col_and_lv(left, right, 
expr_arena, schema)?; + + if lv.is_null() { + return Some(lv!(bool: true)); + } + + let is_col_greater_than_lv = matches!(op, O::Gt) == (col_node == left); + + let col = col.clone(); + let dtype = schema.get(&col)?; + if is_col_greater_than_lv { + // col(A) > B --> max(A) <= B + let col_max = col!(max: col); + let max_is_defined = is_stat_defined!(col_max, dtype); + let max_le = le!(col_max, lv_node); + Some(and!(max_is_defined, max_le)) + } else { + // col(A) <= B --> min(A) > B + let col_min = col!(min: col); + let min_is_defined = is_stat_defined!(col_min, dtype); + let min_gt = gt!(col_min, lv_node); + Some(and!(min_is_defined, min_gt)) + } + }, + + O::And | O::LogicalAnd => match (rec!(left), rec!(right)) { + (Some(left), Some(right)) => Some(or!(left, right)), + (Some(n), None) | (None, Some(n)) => Some(n), + (None, None) => None, + }, + O::Or | O::LogicalOr => { + let left = rec!(left)?; + let right = rec!(right)?; + Some(and!(left, right)) + }, + + O::Plus + | O::Minus + | O::Multiply + | O::Divide + | O::TrueDivide + | O::FloorDivide + | O::Modulus + | O::Xor => None, + } + }, + AExpr::Cast { .. } => None, + AExpr::Sort { .. } => None, + AExpr::Gather { .. } => None, + AExpr::SortBy { .. } => None, + AExpr::Filter { .. } => None, + AExpr::Agg(..) => None, + AExpr::Ternary { .. } => None, + AExpr::AnonymousFunction { .. } => None, + AExpr::Function { + input, function, .. + } => match function { + FunctionExpr::Boolean(f) => match f { + #[cfg(feature = "is_in")] + BooleanFunction::IsIn => { + let lv_node = input[1].node(); + match ( + into_column(input[0].node(), expr_arena, schema, 0), + constant_evaluate(lv_node, expr_arena, schema, 0), + ) { + (Some(col), Some(lv)) => match lv.as_ref() { + // col(A).is_in([B1, ..., Bn]) -> + // all(min(A) > [B1, ..., Bn)]) + // || all(max(A) < [B1, ..., Bn)]) + LiteralValue::Series(s) => { + let col = col.clone(); + let dtype = schema.get(&col)?; + let has_nulls = s.has_nulls(); + + let col_min = col!(min: col); + let col_max = col!(max: col); + + let min_is_defined = is_stat_defined!(col_min, dtype); + let max_is_defined = is_stat_defined!(col_max, dtype); + + let min_gt = gt!(col_min, lv_node); + let min_gt = all!(min_gt); + let min_gt = and!(min_is_defined, min_gt); + + let max_lt = lt!(col_max, lv_node); + let max_lt = all!(max_lt); + let max_lt = and!(max_is_defined, max_lt); + + let mut expr = or!(min_gt, max_lt); + + if has_nulls { + let col_nc = col!(null_count: col); + let idx_zero = lv!(idx: 0); + let has_no_nulls = eq_missing!(col_nc, idx_zero); + + expr = and!(has_no_nulls, expr); + } + + Some(expr) + }, + _ => None, + }, + _ => None, + } + }, + BooleanFunction::IsNull => { + let col = into_column(input[0].node(), expr_arena, schema, 0)?; + + // col(A).is_null() --> NULL_COUNT(A) == 0 + let col_nc = col!(null_count: col); + let idx_zero = lv!(idx: 0); + Some(eq_missing!(col_nc, idx_zero)) + }, + BooleanFunction::IsNotNull => { + let col = into_column(input[0].node(), expr_arena, schema, 0)?; + + // col(A).is_not_null() --> NULL_COUNT(A) == LEN + let col_nc = col!(null_count: col); + let len = col!(len); + Some(eq_missing!(col_nc, len)) + }, + #[cfg(feature = "is_between")] + BooleanFunction::IsBetween { closed } => { + let col = into_column(input[0].node(), expr_arena, schema, 0)?; + + let left_node = input[1].node(); + let right_node = input[2].node(); + + let left = constant_evaluate(left_node, expr_arena, schema, 0)?; + let right = constant_evaluate(right_node, expr_arena, schema, 0)?; + + if left.is_null() || right.is_null() { + 
return None; + } + + let col = col.clone(); + let closed = *closed; + let dtype = schema.get(&col)?; + + let col_min = col!(min: col); + let col_max = col!(max: col); + + use polars_ops::series::ClosedInterval; + let (left, right) = match closed { + ClosedInterval::Both => (lt!(col_max, left_node), gt!(col_min, right_node)), + ClosedInterval::Left => (lt!(col_max, left_node), ge!(col_min, right_node)), + ClosedInterval::Right => { + (le!(col_max, left_node), gt!(col_min, right_node)) + }, + ClosedInterval::None => (le!(col_max, left_node), ge!(col_min, right_node)), + }; + + let min_is_defined = is_stat_defined!(col_min, dtype); + let max_is_defined = is_stat_defined!(col_max, dtype); + + let left = and!(max_is_defined, left); + let right = and!(min_is_defined, right); + + Some(or!(left, right)) + }, + _ => None, + }, + _ => None, + }, + AExpr::Window { .. } => None, + AExpr::Slice { .. } => None, + AExpr::Len => None, + } +} + +#[allow(clippy::type_complexity)] +fn get_binary_expr_col_and_lv<'a>( + left: Node, + right: Node, + expr_arena: &'a Arena, + schema: &Schema, +) -> Option<((&'a PlSmallStr, Node), (Cow<'a, LiteralValue>, Node))> { + match ( + into_column(left, expr_arena, schema, 0), + into_column(right, expr_arena, schema, 0), + constant_evaluate(left, expr_arena, schema, 0), + constant_evaluate(right, expr_arena, schema, 0), + ) { + (Some(col), _, _, Some(lv)) => Some(((col, left), (lv, right))), + (_, Some(col), Some(lv), _) => Some(((col, right), (lv, left))), + _ => None, + } +} diff --git a/crates/polars-plan/src/plans/lit.rs b/crates/polars-plan/src/plans/lit.rs index 56fabd627b76..e3a079977fc3 100644 --- a/crates/polars-plan/src/plans/lit.rs +++ b/crates/polars-plan/src/plans/lit.rs @@ -233,6 +233,15 @@ impl LiteralValue { LiteralValue::UInt32(value) } } + + pub fn is_null(&self) -> bool { + match self { + Self::Null => true, + Self::OtherScalar(sc) => sc.is_null(), + Self::Series(s) => s.len() == 1 && s.null_count() == 1, + _ => false, + } + } } pub trait Literal { diff --git a/crates/polars-plan/src/plans/optimizer/cache_states.rs b/crates/polars-plan/src/plans/optimizer/cache_states.rs index 0162cdfc0645..52075cc95dae 100644 --- a/crates/polars-plan/src/plans/optimizer/cache_states.rs +++ b/crates/polars-plan/src/plans/optimizer/cache_states.rs @@ -123,6 +123,7 @@ pub(super) fn set_cache_states( scratch: &mut Vec, expr_eval: ExprEval<'_>, verbose: bool, + new_streaming: bool, ) -> PolarsResult<()> { let mut stack = Vec::with_capacity(4); let mut names_scratch = vec![]; @@ -289,7 +290,7 @@ pub(super) fn set_cache_states( // and finally remove that last projection and stitch the subplan // back to the cache node again if !cache_schema_and_children.is_empty() { - let mut proj_pd = ProjectionPushDown::new(); + let mut proj_pd = ProjectionPushDown::new(new_streaming); let mut pred_pd = PredicatePushDown::new(expr_eval).block_at_cache(false); for (_cache_id, v) in cache_schema_and_children { // # CHECK IF WE NEED TO REMOVE CACHES diff --git a/crates/polars-plan/src/plans/optimizer/mod.rs b/crates/polars-plan/src/plans/optimizer/mod.rs index 1a16eb714127..3da51ced4406 100644 --- a/crates/polars-plan/src/plans/optimizer/mod.rs +++ b/crates/polars-plan/src/plans/optimizer/mod.rs @@ -156,7 +156,7 @@ More information on the new streaming engine: https://github.com/pola-rs/polars/ // Should be run before predicate pushdown. 
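The rewrite rules in predicates.rs above all follow one pattern: a batch may be skipped only when its statistics prove the predicate is false for every row, and an unknown (null) `min`/`max` can never justify skipping (the `is_stat_defined!` guard). A self-contained Python sketch of a few of the rules, with `None` as the unknown statistic; this is a model of the semantics, not the Rust implementation:

```python
# col(A) == B  -->  skip when min(A) > B or max(A) < B
def can_skip_eq(min_, max_, value):
    return (min_ is not None and min_ > value) or (
        max_ is not None and max_ < value
    )

# col(A) < B  -->  skip when min(A) >= B
def can_skip_lt(min_, value):
    return min_ is not None and min_ >= value

# col(A).is_null()  -->  skip when the batch contains no nulls
def can_skip_is_null(null_count):
    return null_count == 0

# (P and Q) is false everywhere if either side alone is, so And maps to
# `or!` in the match arms above; symmetrically, Or maps to `and!`.
def can_skip_and(skip_p, skip_q):
    return skip_p or skip_q

assert can_skip_eq(0, 99, 150)        # a row group spanning [0, 99]
assert not can_skip_eq(None, 99, 50)  # unknown min: cannot prove anything
```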
if opt_state.projection_pushdown() { - let mut projection_pushdown_opt = ProjectionPushDown::new(); + let mut projection_pushdown_opt = ProjectionPushDown::new(opt_state.new_streaming()); let alp = lp_arena.take(lp_top); let alp = projection_pushdown_opt.optimize(alp, lp_arena, expr_arena)?; lp_arena.replace(lp_top, alp); @@ -219,7 +219,15 @@ More information on the new streaming engine: https://github.com/pola-rs/polars/ if members.has_joins_or_unions && members.has_cache && _cse_plan_changed { // We only want to run this on cse inserted caches - cache_states::set_cache_states(lp_top, lp_arena, expr_arena, scratch, expr_eval, verbose)?; + cache_states::set_cache_states( + lp_top, + lp_arena, + expr_arena, + scratch, + expr_eval, + verbose, + opt_state.new_streaming(), + )?; } // This one should run (nearly) last as this modifies the projections diff --git a/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs b/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs index fde659022696..59e69e5871da 100644 --- a/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs +++ b/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs @@ -208,12 +208,15 @@ fn update_scan_schema( pub struct ProjectionPushDown { pub is_count_star: bool, + // @TODO: This is a hack to support both pre-NEW_MULTIFILE and post-NEW_MULTIFILE. + pub in_new_streaming_engine: bool, } impl ProjectionPushDown { - pub(super) fn new() -> Self { + pub(super) fn new(in_new_streaming_engine: bool) -> Self { Self { is_count_star: false, + in_new_streaming_engine, } } @@ -518,74 +521,80 @@ impl ProjectionPushDown { }; if let Some(ref hive_parts) = hive_parts { - // Skip reading hive columns from the file. - let partition_schema = hive_parts.first().unwrap().schema(); - - file_options.with_columns = file_options.with_columns.map(|x| { - x.iter() - .filter(|x| !partition_schema.contains(x)) - .cloned() - .collect::>() - }); - - let mut out = Schema::with_capacity(schema.len()); - - // Ensure the ordering of `schema` matches what the reader will give - - // namely, if a hive column also exists in the file it will be projected - // based on its position in the file. This is extremely important for the - // new-streaming engine. - - // row_index is separate - let opt_row_index_col_name = file_options - .row_index - .as_ref() - .map(|v| &v.name) - .filter(|v| schema.contains(v)) - .cloned(); - - if let Some(name) = &opt_row_index_col_name { - out.insert_at_index( - 0, - name.clone(), - schema.get(name).unwrap().clone(), - ) - .unwrap(); - } - + // @TODO: + // This is a hack to support both pre-NEW_MULTIFILE and + // post-NEW_MULTIFILE. + if !self.in_new_streaming_engine + && std::env::var("POLARS_NEW_MULTIFILE").as_deref() != Ok("1") { - let df_fields_iter = &mut schema - .iter() - .filter(|fld| { - !partition_schema.contains(fld.0) - && Some(fld.0) != opt_row_index_col_name.as_ref() - }) - .map(|(a, b)| (a.clone(), b.clone())); - - let hive_fields_iter = &mut partition_schema - .iter() - .map(|(a, b)| (a.clone(), b.clone())); - - // `schema` also contains the `row_index` column here, so we don't need to handle it - // separately. - - macro_rules! do_merge { - ($schema:expr) => { - hive::merge_sorted_to_schema_order_impl( - df_fields_iter, - hive_fields_iter, - &mut out, - &|v| $schema.index_of(&v.0), - ) - }; + // Skip reading hive columns from the file. 
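The pre/post-NEW_MULTIFILE branches above key off the same environment variable the planner checks when choosing `MultiScanExec`. Opting in from Python looks roughly like this (the file paths and column name are illustrative):

```python
import os

# Opt into the new multi-file scan path gated in the planner above.
os.environ["POLARS_NEW_MULTIFILE"] = "1"

import polars as pl

# Multi-source (or hive-partitioned) parquet scans now go through
# MultiScanExec, which also receives the skip-batch predicate.
lf = pl.scan_parquet(["data/a.parquet", "data/b.parquet"])
print(lf.filter(pl.col("idx") > 100).collect())
```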
+ let partition_schema = hive_parts.first().unwrap().schema(); + file_options.with_columns = file_options.with_columns.map(|x| { + x.iter() + .filter(|x| !partition_schema.contains(x)) + .cloned() + .collect::>() + }); + + let mut out = Schema::with_capacity(schema.len()); + + // Ensure the ordering of `schema` matches what the reader will give - + // namely, if a hive column also exists in the file it will be projected + // based on its position in the file. This is extremely important for the + // new-streaming engine. + + // row_index is separate + let opt_row_index_col_name = file_options + .row_index + .as_ref() + .map(|v| &v.name) + .filter(|v| schema.contains(v)) + .cloned(); + + if let Some(name) = &opt_row_index_col_name { + out.insert_at_index( + 0, + name.clone(), + schema.get(name).unwrap().clone(), + ) + .unwrap(); } - match file_info.reader_schema.as_ref().unwrap() { - Either::Left(reader_schema) => do_merge!(reader_schema), - Either::Right(reader_schema) => do_merge!(reader_schema), + { + let df_fields_iter = &mut schema + .iter() + .filter(|fld| { + !partition_schema.contains(fld.0) + && Some(fld.0) != opt_row_index_col_name.as_ref() + }) + .map(|(a, b)| (a.clone(), b.clone())); + + let hive_fields_iter = &mut partition_schema + .iter() + .map(|(a, b)| (a.clone(), b.clone())); + + // `schema` also contains the `row_index` column here, so we don't need to handle it + // separately. + + macro_rules! do_merge { + ($schema:expr) => { + hive::merge_sorted_to_schema_order_impl( + df_fields_iter, + hive_fields_iter, + &mut out, + &|v| $schema.index_of(&v.0), + ) + }; + } + + match file_info.reader_schema.as_ref().unwrap() { + Either::Left(reader_schema) => do_merge!(reader_schema), + Either::Right(reader_schema) => do_merge!(reader_schema), + } } - } - schema = out; + schema = out; + } } if let Some(ref file_path_col) = file_options.include_file_paths { diff --git a/crates/polars-plan/src/plans/schema.rs b/crates/polars-plan/src/plans/schema.rs index 0f19dbbfb31b..bfb588c351c8 100644 --- a/crates/polars-plan/src/plans/schema.rs +++ b/crates/polars-plan/src/plans/schema.rs @@ -60,7 +60,7 @@ impl FileInfo { let schema = Arc::make_mut(&mut self.schema); for field in hive_schema.iter_fields() { - if let Ok(existing) = schema.try_get_mut(&field.name) { + if let Some(existing) = schema.get_mut(&field.name) { *existing = field.dtype().clone(); } else { schema diff --git a/crates/polars-python/src/cloud.rs b/crates/polars-python/src/cloud.rs index d02eaf23b63c..7002be3749c3 100644 --- a/crates/polars-python/src/cloud.rs +++ b/crates/polars-python/src/cloud.rs @@ -41,9 +41,12 @@ pub fn _execute_ir_plan_with_gpu(ir_plan_ser: Vec, py: Python) -> PyResult
diff --git a/crates/polars-schema/src/schema.rs b/crates/polars-schema/src/schema.rs
Schema { self.fields.get(name) } + /// Get a mutable reference to the dtype of the field named `name`, or `None` if the field doesn't exist. + pub fn get_mut(&mut self, name: &str) -> Option<&mut D> { + self.fields.get_mut(name) + } + /// Get a reference to the dtype of the field named `name`, or `Err(PolarsErr)` if the field doesn't exist. pub fn try_get(&self, name: &str) -> PolarsResult<&D> { self.get(name) diff --git a/crates/polars-stream/src/nodes/io_sources/ipc.rs b/crates/polars-stream/src/nodes/io_sources/ipc.rs index d04feec2fdce..28a018945192 100644 --- a/crates/polars-stream/src/nodes/io_sources/ipc.rs +++ b/crates/polars-stream/src/nodes/io_sources/ipc.rs @@ -14,7 +14,6 @@ use polars_core::utils::arrow::io::ipc::read::{ }; use polars_core::utils::slice_offsets; use polars_error::{ErrString, PolarsError, PolarsResult}; -use polars_expr::prelude::PhysicalExpr; use polars_expr::state::ExecutionState; use polars_io::cloud::CloudOptions; use polars_io::ipc::IpcScanOptions; @@ -79,14 +78,11 @@ impl IpcSourceNode { pub fn new( sources: ScanSources, _file_info: FileInfo, - predicate: Option>, options: IpcScanOptions, _cloud_options: Option, file_options: FileScanOptions, mut metadata: Option>, ) -> PolarsResult { - // These should have all been removed during lower_ir - assert!(predicate.is_none()); assert!(!sources.is_empty()); assert_eq!(sources.len(), 1); diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/init.rs b/crates/polars-stream/src/nodes/io_sources/parquet/init.rs index d2aa481d5006..5759d72a485f 100644 --- a/crates/polars-stream/src/nodes/io_sources/parquet/init.rs +++ b/crates/polars-stream/src/nodes/io_sources/parquet/init.rs @@ -2,7 +2,6 @@ use std::future::Future; use std::sync::Arc; use polars_core::frame::DataFrame; -use polars_core::prelude::PlIndexSet; use polars_error::PolarsResult; use polars_io::prelude::ParallelStrategy; use polars_io::prelude::_internal::PrefilterMaskSetting; @@ -84,8 +83,7 @@ impl ParquetSourceNode { let row_group_prefetch_size = self.config.row_group_prefetch_size; let projection = self.file_options.with_columns.clone(); - assert_eq!(self.physical_predicate.is_some(), self.predicate.is_some()); - let predicate = self.physical_predicate.clone(); + let predicate = self.predicate.clone(); let memory_prefetch_func = self.memory_prefetch_func; let mut row_group_data_fetcher = RowGroupDataFetcher { @@ -194,8 +192,6 @@ impl ParquetSourceNode { /// * `self.projected_arrow_schema` /// * `self.physical_predicate` pub(super) fn init_row_group_decoder(&self) -> RowGroupDecoder { - assert_eq!(self.predicate.is_some(), self.physical_predicate.is_some()); - let scan_sources = self.scan_sources.clone(); let hive_partitions = self.hive_parts.clone(); let hive_partitions_width = hive_partitions @@ -205,24 +201,20 @@ impl ParquetSourceNode { let include_file_paths = self.file_options.include_file_paths.clone(); let projected_arrow_schema = self.projected_arrow_schema.clone().unwrap(); let row_index = self.row_index.clone(); - let physical_predicate = self.physical_predicate.clone(); let min_values_per_thread = self.config.min_values_per_thread; - let mut use_prefiltered = physical_predicate.is_some() + let mut use_prefiltered = self.predicate.is_some() && matches!( self.options.parallel, ParallelStrategy::Auto | ParallelStrategy::Prefiltered ); let predicate_arrow_field_indices = if use_prefiltered { - let mut live_columns = PlIndexSet::default(); - physical_predicate - .as_ref() - .unwrap() - .collect_live_columns(&mut live_columns); - let v = 
(!live_columns.is_empty()) + let predicate = self.predicate.as_ref().unwrap(); + let v = (!predicate.live_columns.is_empty()) .then(|| { - let mut out = live_columns + let mut out = predicate + .live_columns .iter() // Can be `None` - if the column is e.g. a hive column, or the row index column. .filter_map(|x| projected_arrow_schema.index_of(x)) @@ -272,7 +264,7 @@ impl ParquetSourceNode { reader_schema: self.schema.clone().unwrap(), projected_arrow_schema, row_index, - physical_predicate, + predicate: self.predicate.clone(), use_prefiltered, predicate_arrow_field_indices, non_predicate_arrow_field_indices, diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs b/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs index d4dd5cade584..5250f89bca70 100644 --- a/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs +++ b/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs @@ -5,9 +5,8 @@ use mem_prefetch_funcs::get_memory_prefetch_func; use polars_core::config; use polars_core::prelude::ArrowSchema; use polars_error::PolarsResult; -use polars_expr::prelude::{phys_expr_to_io_expr, PhysicalExpr}; use polars_io::cloud::CloudOptions; -use polars_io::predicates::PhysicalIoExpr; +use polars_io::predicates::ScanIOPredicate; use polars_io::prelude::{FileMetadata, ParquetOptions}; use polars_io::utils::byte_source::DynByteSourceBuilder; use polars_plan::plans::hive::HivePartitions; @@ -39,7 +38,7 @@ pub struct ParquetSourceNode { scan_sources: ScanSources, file_info: FileInfo, hive_parts: Option>>, - predicate: Option>, + predicate: Option, options: ParquetOptions, cloud_options: Option, file_options: FileScanOptions, @@ -47,7 +46,6 @@ pub struct ParquetSourceNode { // Run-time vars config: Config, verbose: bool, - physical_predicate: Option>, schema: Option>, projected_arrow_schema: Option>, byte_source_builder: DynByteSourceBuilder, @@ -83,8 +81,7 @@ impl ParquetSourceNode { pub fn new( scan_sources: ScanSources, file_info: FileInfo, - hive_parts: Option>>, - predicate: Option>, + predicate: Option, options: ParquetOptions, cloud_options: Option, mut file_options: FileScanOptions, @@ -107,7 +104,7 @@ impl ParquetSourceNode { Self { scan_sources, file_info, - hive_parts, + hive_parts: None, predicate, options, cloud_options, @@ -123,7 +120,6 @@ impl ParquetSourceNode { min_values_per_thread: 0, }, verbose, - physical_predicate: None, schema: None, projected_arrow_schema: None, byte_source_builder, @@ -171,7 +167,6 @@ impl ComputeNode for ParquetSourceNode { self.schema = Some(self.file_info.reader_schema.take().unwrap().unwrap_left()); self.init_projected_arrow_schema(); - self.physical_predicate = self.predicate.clone().map(phys_expr_to_io_expr); let (raw_morsel_receivers, raw_morsel_distributor_task_handle) = self.init_raw_morsel_distributor(); diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/row_group_data_fetch.rs b/crates/polars-stream/src/nodes/io_sources/parquet/row_group_data_fetch.rs index d7d004a33b7e..c1dbd74c2c0a 100644 --- a/crates/polars-stream/src/nodes/io_sources/parquet/row_group_data_fetch.rs +++ b/crates/polars-stream/src/nodes/io_sources/parquet/row_group_data_fetch.rs @@ -4,7 +4,7 @@ use polars_core::prelude::{ArrowSchema, PlHashMap}; use polars_core::series::IsSorted; use polars_core::utils::operation_exceeded_idxsize_msg; use polars_error::{polars_err, PolarsResult}; -use polars_io::predicates::PhysicalIoExpr; +use polars_io::predicates::ScanIOPredicate; use polars_io::prelude::_internal::read_this_row_group; use 
polars_io::prelude::{create_sorting_map, FileMetadata};
 use polars_io::utils::byte_source::{ByteSource, DynByteSource};
@@ -41,7 +41,8 @@ pub(super) struct RowGroupDataFetcher {
     pub(super) verbose: bool,
     pub(super) reader_schema: Arc,
     pub(super) projection: Option>,
-    pub(super) predicate: Option>,
+    #[allow(unused)] // TODO: Fix!
+    pub(super) predicate: Option,
     pub(super) slice_range: Option>,
     pub(super) memory_prefetch_func: fn(&[u8]) -> (),
     pub(super) current_path_index: usize,
@@ -89,7 +90,7 @@ impl RowGroupDataFetcher {
         if self.use_statistics
             && !match read_this_row_group(
-                self.predicate.as_deref(),
+                self.predicate.as_ref(),
                 &row_group_metadata,
                 self.reader_schema.as_ref(),
             ) {
diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/row_group_decode.rs b/crates/polars-stream/src/nodes/io_sources/parquet/row_group_decode.rs
index a314b2dffe54..9bea2726c5fa 100644
--- a/crates/polars-stream/src/nodes/io_sources/parquet/row_group_decode.rs
+++ b/crates/polars-stream/src/nodes/io_sources/parquet/row_group_decode.rs
@@ -9,7 +9,7 @@ use polars_core::series::{IsSorted, Series};
 use polars_core::utils::arrow::bitmap::{Bitmap, BitmapBuilder};
 use polars_error::{polars_bail, PolarsResult};
 use polars_io::hive;
-use polars_io::predicates::PhysicalIoExpr;
+use polars_io::predicates::ScanIOPredicate;
 use polars_io::prelude::_internal::calc_prefilter_cost;
 pub use polars_io::prelude::_internal::PrefilterMaskSetting;
 use polars_io::prelude::try_set_sorted_flag;
@@ -32,7 +32,7 @@ pub(super) struct RowGroupDecoder {
     pub(super) reader_schema: Arc,
     pub(super) projected_arrow_schema: Arc,
     pub(super) row_index: Option>,
-    pub(super) physical_predicate: Option>,
+    pub(super) predicate: Option,
     pub(super) use_prefiltered: Option,
     /// Indices into `projected_arrow_schema`. This must be sorted.
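Prefiltering above decodes the predicate's live columns first, evaluates the filter mask, and only then decodes the remaining columns for the surviving rows. The index bookkeeping from `init_row_group_decoder` amounts to the following sketch (column names are assumed; live columns absent from the file schema, such as hive or row-index columns, are simply skipped):

```python
# Partition the projected file schema into predicate-live columns (decoded
# first to build the mask) and the rest (decoded only for surviving rows).
projected_schema = ["a", "b", "c", "d"]  # columns read from the file
live_columns = ["d", "b", "year"]        # predicate columns; "year" is hive

predicate_idxs = sorted(
    projected_schema.index(c) for c in live_columns if c in projected_schema
)
non_predicate_idxs = [
    i for i in range(len(projected_schema)) if i not in set(predicate_idxs)
]

assert predicate_idxs == [1, 3]      # b, d; sorted, as the decoder requires
assert non_predicate_idxs == [0, 2]  # a, c
```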
pub(super) predicate_arrow_field_indices: Vec, @@ -113,8 +113,8 @@ impl RowGroupDecoder { let df = unsafe { DataFrame::new_no_checks(projection_height, out_columns) }; - let df = if let Some(predicate) = self.physical_predicate.as_deref() { - let mask = predicate.evaluate_io(&df)?; + let df = if let Some(predicate) = self.predicate.as_ref() { + let mask = predicate.predicate.evaluate_io(&df)?; let mask = mask.bool().unwrap(); let filtered = @@ -531,9 +531,10 @@ impl RowGroupDecoder { }; let mask = self - .physical_predicate - .as_deref() + .predicate + .as_ref() .unwrap() + .predicate .evaluate_io(&live_df)?; let mask = mask.bool().unwrap(); diff --git a/crates/polars-stream/src/physical_plan/to_graph.rs b/crates/polars-stream/src/physical_plan/to_graph.rs index 649cf2cb71e7..f5ed2dd695fe 100644 --- a/crates/polars-stream/src/physical_plan/to_graph.rs +++ b/crates/polars-stream/src/physical_plan/to_graph.rs @@ -8,7 +8,7 @@ use polars_expr::groups::new_hash_grouper; use polars_expr::planner::{create_physical_expr, get_expr_depth_limit, ExpressionConversionState}; use polars_expr::reduce::into_reduction; use polars_expr::state::ExecutionState; -use polars_mem_engine::create_physical_plan; +use polars_mem_engine::{create_physical_plan, create_scan_predicate}; use polars_plan::dsl::JoinOptions; use polars_plan::global::_set_n_rows_for_scan; use polars_plan::plans::expr_ir::ExprIR; @@ -344,7 +344,7 @@ fn to_graph_rec<'a>( let FileScan { scan_sources, file_info, - hive_parts, + hive_parts: _, output_schema, scan_type, predicate, @@ -360,17 +360,33 @@ fn to_graph_rec<'a>( _set_n_rows_for_scan(None).map(|x| (0, x)) }; + let mut create_skip_batch_predicate = false; + #[cfg(feature = "parquet")] + { + create_skip_batch_predicate |= matches!( + scan_type, + polars_plan::prelude::FileScan::Parquet { + options: polars_io::prelude::ParquetOptions { + use_statistics: true, + .. + }, + .. + } + ); + } + let predicate = predicate .map(|pred| { - create_physical_expr( + create_scan_predicate( &pred, - Context::Default, ctx.expr_arena, output_schema.as_ref().unwrap_or(&file_info.schema), &mut ctx.expr_conversion_state, + create_skip_batch_predicate, ) }) - .map_or(Ok(None), |v| v.map(Some))?; + .transpose()?; + let predicate = predicate.as_ref().map(|p| p.to_io(None, &file_info.schema)); { use polars_plan::prelude::FileScan; @@ -381,32 +397,8 @@ fn to_graph_rec<'a>( options, cloud_options, metadata: first_metadata, - } => { - if std::env::var("POLARS_DISABLE_PARQUET_SOURCE").as_deref() != Ok("1") { - ctx.graph.add_node( - nodes::io_sources::parquet::ParquetSourceNode::new( - scan_sources, - file_info, - hive_parts, - predicate, - options, - cloud_options, - file_options, - first_metadata, - ), - [], - ) - } else { - todo!() - } - }, - #[cfg(feature = "ipc")] - FileScan::Ipc { - options, - cloud_options, - metadata: first_metadata, } => ctx.graph.add_node( - nodes::io_sources::ipc::IpcSourceNode::new( + nodes::io_sources::parquet::ParquetSourceNode::new( scan_sources, file_info, predicate, @@ -414,9 +406,30 @@ fn to_graph_rec<'a>( cloud_options, file_options, first_metadata, - )?, + ), [], ), + #[cfg(feature = "ipc")] + FileScan::Ipc { + options, + cloud_options, + metadata: first_metadata, + } => { + // Should have been rewritten in terms of separate streaming nodes. 
+ assert!(predicate.is_none()); + + ctx.graph.add_node( + nodes::io_sources::ipc::IpcSourceNode::new( + scan_sources, + file_info, + options, + cloud_options, + file_options, + first_metadata, + )?, + [], + ) + }, #[cfg(feature = "csv")] FileScan::Csv { options, .. } => { assert!(predicate.is_none()); diff --git a/py-polars/tests/unit/io/test_lazy_parquet.py b/py-polars/tests/unit/io/test_lazy_parquet.py index 5a82a5304a9d..7a78c73a39ba 100644 --- a/py-polars/tests/unit/io/test_lazy_parquet.py +++ b/py-polars/tests/unit/io/test_lazy_parquet.py @@ -272,11 +272,11 @@ def test_parquet_statistics(monkeypatch: Any, capfd: Any, tmp_path: Path) -> Non captured = capfd.readouterr().err assert ( - "parquet file must be read, statistics not sufficient for predicate." + "parquet row group must be read, statistics not sufficient for predicate." in captured ) assert ( - "parquet file can be skipped, the statistics were sufficient" + "parquet row group can be skipped, the statistics were sufficient" " to apply the predicate." in captured ) diff --git a/py-polars/tests/unit/io/test_scan.py b/py-polars/tests/unit/io/test_scan.py index c9862e2fd9f4..23d234b70532 100644 --- a/py-polars/tests/unit/io/test_scan.py +++ b/py-polars/tests/unit/io/test_scan.py @@ -43,7 +43,7 @@ def _assert_force_async(capfd: Any, data_file_extension: str) -> None: return captured = capfd.readouterr().err - assert captured.count("ASYNC READING FORCED") == 1 + assert captured.count("ASYNC READING FORCED") >= 1 def _scan( diff --git a/py-polars/tests/unit/streaming/test_streaming_io.py b/py-polars/tests/unit/streaming/test_streaming_io.py index 402fa5b6b86e..44627f0e19f3 100644 --- a/py-polars/tests/unit/streaming/test_streaming_io.py +++ b/py-polars/tests/unit/streaming/test_streaming_io.py @@ -254,7 +254,10 @@ def test_sink_ndjson_should_write_same_data( @pytest.mark.write_disk -def test_parquet_eq_statistics(monkeypatch: Any, capfd: Any, tmp_path: Path) -> None: +@pytest.mark.parametrize("streaming", [False, True]) +def test_parquet_eq_statistics( + monkeypatch: Any, capfd: Any, tmp_path: Path, streaming: bool +) -> None: tmp_path.mkdir(exist_ok=True) monkeypatch.setenv("POLARS_VERBOSE", "1") @@ -268,29 +271,23 @@ def test_parquet_eq_statistics(monkeypatch: Any, capfd: Any, tmp_path: Path) -> file_path = tmp_path / "stats.parquet" df.write_parquet(file_path, statistics=True, use_pyarrow=False) - file_path = tmp_path / "stats.parquet" - df.write_parquet(file_path, statistics=True, use_pyarrow=False) - - for streaming in [False, True]: - for pred in [ - pl.col("idx") == 50, - pl.col("idx") == 150, - pl.col("idx") == 210, - ]: - result = ( - pl.scan_parquet(file_path).filter(pred).collect(streaming=streaming) - ) - assert_frame_equal(result, df.filter(pred)) - - captured = capfd.readouterr().err - assert ( - "parquet file must be read, statistics not sufficient for predicate." - in captured - ) - assert ( - "parquet file can be skipped, the statistics were sufficient" - " to apply the predicate." in captured - ) + for pred in [ + pl.col("idx") == 50, + pl.col("idx") == 150, + pl.col("idx") == 210, + ]: + result = pl.scan_parquet(file_path).filter(pred).collect(streaming=streaming) + assert_frame_equal(result, df.filter(pred)) + + captured = capfd.readouterr().err + assert ( + "parquet row group must be read, statistics not sufficient for predicate." + in captured + ) + assert ( + "parquet row group can be skipped, the statistics were sufficient" + " to apply the predicate." in captured + ) @pytest.mark.write_disk
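End to end, the new skip-batch machinery can be observed from Python through the debug output hooked into `create_scan_predicate` above; the file path and column name here are illustrative:

```python
import os

# Print each predicate and its derived skip-batch predicate at plan time.
os.environ["POLARS_OUTPUT_SKIP_BATCH_PRED"] = "1"
# Verbose logs report which row groups were skipped via statistics.
os.environ["POLARS_VERBOSE"] = "1"

import polars as pl

df = pl.DataFrame({"idx": list(range(200))})
df.write_parquet("stats.parquet", statistics=True)

# Only row groups whose [min, max] range contains 150 must be read; the
# rest are skipped from their statistics alone.
out = pl.scan_parquet("stats.parquet").filter(pl.col("idx") == 150).collect()
print(out)
```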