From be42f3d17c792cd31d49ce06a9a95d87b76ef94a Mon Sep 17 00:00:00 2001 From: June <61218022+itsjunetime@users.noreply.github.com> Date: Tue, 17 Sep 2024 10:23:58 -0600 Subject: [PATCH] Return TableProviderFilterPushDown::Exact when Parquet Pushdown Enabled (#12135) * feat: Preemptively filter for pushdown-preventing columns in ListingTable * Fix behavior to make all previous tests work and lay groundwork for future tests * fix: Add some more tests and fix small issue with pushdown specificity * test: Revive unneccesarily removed test * ci: Fix CI issues with different combinations of exprs * fix: run fmt * Fix doc publicity issues * Add ::new fn for PushdownChecker * Remove unnecessary 'pub' qualifier * Fix naming and doc comment of non_pushdown_columns to reflect what it actually does (the opposite) and add back useful comments * fmt * Extend FileFormat trait to allow library users to define formats which support pushdown * fmt * fix: reference real fn in doc to fix CI * Minor: Add tests for using FilterExec when parquet was pushed down * Update datafusion/core/src/datasource/file_format/mod.rs * Pipe schema information through to TableScan and ParquetExec to facilitate unnecessary FilterExec removal * - Remove collect::<(_, _)> to satisfy msrv - Remove expect(_) attr to satisfy msrv - Update comments with more accurate details and explanations * Add more details in comments for `map_partial_batch` Co-authored-by: Andrew Lamb * Remove reference to issue #4028 as it will be closed * Convert normal comments to doc-comments Co-authored-by: Andrew Lamb * Clarify meaning of word `projected` in comment Co-authored-by: Andrew Lamb * Clarify more how `table_schema` is used differently from `projected_table_schema` Co-authored-by: Andrew Lamb * Finish partially-written comment about SchemaMapping struct --------- Co-authored-by: Andrew Lamb --- .../core/src/datasource/file_format/mod.rs | 28 ++ .../src/datasource/file_format/parquet.rs | 28 +- .../core/src/datasource/listing/helpers.rs | 110 +++--- .../core/src/datasource/listing/table.rs | 71 ++-- .../physical_plan/file_scan_config.rs | 2 +- .../core/src/datasource/physical_plan/mod.rs | 5 +- .../datasource/physical_plan/parquet/mod.rs | 4 +- .../physical_plan/parquet/opener.rs | 4 +- .../physical_plan/parquet/row_filter.rs | 312 ++++++++++++++---- .../core/src/datasource/schema_adapter.rs | 147 +++++++-- datafusion/core/src/physical_planner.rs | 2 +- datafusion/expr-common/src/signature.rs | 4 +- datafusion/expr/src/logical_plan/plan.rs | 14 +- .../optimizer/src/optimize_projections/mod.rs | 12 +- .../optimize_projections/required_indices.rs | 15 +- .../physical-expr/src/expressions/binary.rs | 4 +- .../physical-expr/src/expressions/column.rs | 5 +- datafusion/proto/src/logical_plan/mod.rs | 15 +- datafusion/proto/src/physical_plan/mod.rs | 116 ++++--- .../join_disable_repartition_joins.slt.temp | 26 ++ .../test_files/parquet_filter_pushdown.slt | 28 +- .../sqllogictest/test_files/repartition.slt | 2 +- datafusion/sqllogictest/test_files/select.slt | 2 +- .../sqllogictest/test_files/string_view.slt | 1 - datafusion/sqllogictest/test_files/window.slt | 2 +- 25 files changed, 662 insertions(+), 297 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt.temp diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 1dcf480cf4f2..a503e36adbeb 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ 
b/datafusion/core/src/datasource/file_format/mod.rs @@ -45,6 +45,7 @@ use crate::physical_plan::{ExecutionPlan, Statistics}; use arrow_schema::{DataType, Field, Schema}; use datafusion_common::file_options::file_type::FileType; use datafusion_common::{internal_err, not_impl_err, GetExt}; +use datafusion_expr::Expr; use datafusion_physical_expr::PhysicalExpr; use async_trait::async_trait; @@ -138,6 +139,33 @@ pub trait FileFormat: Send + Sync + fmt::Debug { ) -> Result> { not_impl_err!("Writer not implemented for this format") } + + /// Check if the specified file format has support for pushing down the provided filters within + /// the given schemas. Added initially to support the Parquet file format's ability to do this. + fn supports_filters_pushdown( + &self, + _file_schema: &Schema, + _table_schema: &Schema, + _filters: &[&Expr], + ) -> Result { + Ok(FilePushdownSupport::NoSupport) + } +} + +/// An enum to distinguish between different states when determining if certain filters can be +/// pushed down to file scanning +#[derive(Debug, PartialEq)] +pub enum FilePushdownSupport { + /// The file format/system being asked does not support any sort of pushdown. This should be + /// used even if the file format theoretically supports some sort of pushdown, but it's not + /// enabled or implemented yet. + NoSupport, + /// The file format/system being asked *does* support pushdown, but it can't make it work for + /// the provided filter/expression + NotSupportedForFilter, + /// The file format/system being asked *does* support pushdown and *can* make it work for the + /// provided filter/expression + Supported, } /// A container of [FileFormatFactory] which also implements [FileType]. diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 2a862dd6dcb3..35296b0d7907 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -26,7 +26,7 @@ use super::write::demux::start_demuxer_task; use super::write::{create_writer, SharedBuffer}; use super::{ coerce_file_schema_to_view_type, transform_schema_to_view, FileFormat, - FileFormatFactory, FileScanConfig, + FileFormatFactory, FilePushdownSupport, FileScanConfig, }; use crate::arrow::array::RecordBatch; use crate::arrow::datatypes::{Fields, Schema, SchemaRef}; @@ -53,6 +53,7 @@ use datafusion_common::{ use datafusion_common_runtime::SpawnedTask; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation}; use datafusion_execution::TaskContext; +use datafusion_expr::Expr; use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator}; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::metrics::MetricsSet; @@ -78,7 +79,9 @@ use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::task::JoinSet; -use crate::datasource::physical_plan::parquet::ParquetExecBuilder; +use crate::datasource::physical_plan::parquet::{ + can_expr_be_pushed_down_with_schemas, ParquetExecBuilder, +}; use datafusion_physical_expr_common::sort_expr::LexRequirement; use futures::{StreamExt, TryStreamExt}; use object_store::path::Path; @@ -414,6 +417,27 @@ impl FileFormat for ParquetFormat { order_requirements, )) as _) } + + fn supports_filters_pushdown( + &self, + file_schema: &Schema, + table_schema: &Schema, + filters: &[&Expr], + ) -> Result { + if !self.options().global.pushdown_filters { + return 
Ok(FilePushdownSupport::NoSupport); + } + + let all_supported = filters.iter().all(|filter| { + can_expr_be_pushed_down_with_schemas(filter, file_schema, table_schema) + }); + + Ok(if all_supported { + FilePushdownSupport::Supported + } else { + FilePushdownSupport::NotSupportedForFilter + }) + } } /// Fetches parquet metadata from ObjectStore for given object diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index 33a16237e162..72d7277d6ae2 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -53,66 +53,64 @@ use object_store::{ObjectMeta, ObjectStore}; /// was performed pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool { let mut is_applicable = true; - expr.apply(|expr| { - match expr { - Expr::Column(Column { ref name, .. }) => { - is_applicable &= col_names.contains(&name.as_str()); - if is_applicable { - Ok(TreeNodeRecursion::Jump) - } else { - Ok(TreeNodeRecursion::Stop) - } + expr.apply(|expr| match expr { + Expr::Column(Column { ref name, .. }) => { + is_applicable &= col_names.contains(&name.as_str()); + if is_applicable { + Ok(TreeNodeRecursion::Jump) + } else { + Ok(TreeNodeRecursion::Stop) } - Expr::Literal(_) - | Expr::Alias(_) - | Expr::OuterReferenceColumn(_, _) - | Expr::ScalarVariable(_, _) - | Expr::Not(_) - | Expr::IsNotNull(_) - | Expr::IsNull(_) - | Expr::IsTrue(_) - | Expr::IsFalse(_) - | Expr::IsUnknown(_) - | Expr::IsNotTrue(_) - | Expr::IsNotFalse(_) - | Expr::IsNotUnknown(_) - | Expr::Negative(_) - | Expr::Cast { .. } - | Expr::TryCast { .. } - | Expr::BinaryExpr { .. } - | Expr::Between { .. } - | Expr::Like { .. } - | Expr::SimilarTo { .. } - | Expr::InList { .. } - | Expr::Exists { .. } - | Expr::InSubquery(_) - | Expr::ScalarSubquery(_) - | Expr::GroupingSet(_) - | Expr::Case { .. } => Ok(TreeNodeRecursion::Continue), - - Expr::ScalarFunction(scalar_function) => { - match scalar_function.func.signature().volatility { - Volatility::Immutable => Ok(TreeNodeRecursion::Continue), - // TODO: Stable functions could be `applicable`, but that would require access to the context - Volatility::Stable | Volatility::Volatile => { - is_applicable = false; - Ok(TreeNodeRecursion::Stop) - } + } + Expr::Literal(_) + | Expr::Alias(_) + | Expr::OuterReferenceColumn(_, _) + | Expr::ScalarVariable(_, _) + | Expr::Not(_) + | Expr::IsNotNull(_) + | Expr::IsNull(_) + | Expr::IsTrue(_) + | Expr::IsFalse(_) + | Expr::IsUnknown(_) + | Expr::IsNotTrue(_) + | Expr::IsNotFalse(_) + | Expr::IsNotUnknown(_) + | Expr::Negative(_) + | Expr::Cast(_) + | Expr::TryCast(_) + | Expr::BinaryExpr(_) + | Expr::Between(_) + | Expr::Like(_) + | Expr::SimilarTo(_) + | Expr::InList(_) + | Expr::Exists(_) + | Expr::InSubquery(_) + | Expr::ScalarSubquery(_) + | Expr::GroupingSet(_) + | Expr::Case(_) => Ok(TreeNodeRecursion::Continue), + + Expr::ScalarFunction(scalar_function) => { + match scalar_function.func.signature().volatility { + Volatility::Immutable => Ok(TreeNodeRecursion::Continue), + // TODO: Stable functions could be `applicable`, but that would require access to the context + Volatility::Stable | Volatility::Volatile => { + is_applicable = false; + Ok(TreeNodeRecursion::Stop) } } + } - // TODO other expressions are not handled yet: - // - AGGREGATE and WINDOW should not end up in filter conditions, except maybe in some edge cases - // - Can `Wildcard` be considered as a `Literal`? 
- // - ScalarVariable could be `applicable`, but that would require access to the context - Expr::AggregateFunction { .. } - | Expr::WindowFunction { .. } - | Expr::Wildcard { .. } - | Expr::Unnest { .. } - | Expr::Placeholder(_) => { - is_applicable = false; - Ok(TreeNodeRecursion::Stop) - } + // TODO other expressions are not handled yet: + // - AGGREGATE and WINDOW should not end up in filter conditions, except maybe in some edge cases + // - Can `Wildcard` be considered as a `Literal`? + // - ScalarVariable could be `applicable`, but that would require access to the context + Expr::AggregateFunction { .. } + | Expr::WindowFunction { .. } + | Expr::Wildcard { .. } + | Expr::Unnest { .. } + | Expr::Placeholder(_) => { + is_applicable = false; + Ok(TreeNodeRecursion::Stop) } }) .unwrap(); diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index adf907011b8d..3541a8ff215c 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -18,16 +18,17 @@ //! The table implementation. use std::collections::HashMap; -use std::str::FromStr; -use std::{any::Any, sync::Arc}; +use std::{any::Any, str::FromStr, sync::Arc}; use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files}; -use super::PartitionedFile; +use super::{ListingTableUrl, PartitionedFile}; -use super::ListingTableUrl; -use crate::datasource::{create_ordering, get_statistics_with_limit}; use crate::datasource::{ - file_format::{file_compression_type::FileCompressionType, FileFormat}, + create_ordering, + file_format::{ + file_compression_type::FileCompressionType, FileFormat, FilePushdownSupport, + }, + get_statistics_with_limit, physical_plan::{FileScanConfig, FileSinkConfig}, }; use crate::execution::context::SessionState; @@ -43,8 +44,9 @@ use datafusion_common::{ config_datafusion_err, internal_err, plan_err, project_schema, Constraints, SchemaExt, ToDFSchema, }; -use datafusion_execution::cache::cache_manager::FileStatisticsCache; -use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache; +use datafusion_execution::cache::{ + cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache, +}; use datafusion_physical_expr::{ create_physical_expr, LexOrdering, PhysicalSortRequirement, }; @@ -817,19 +819,22 @@ impl TableProvider for ListingTable { .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone())) .collect::>>()?; - let filters = if let Some(expr) = conjunction(filters.to_vec()) { - // NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition columns. - let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?; - let filters = - create_physical_expr(&expr, &table_df_schema, state.execution_props())?; - Some(filters) - } else { - None - }; + let filters = conjunction(filters.to_vec()) + .map(|expr| -> Result<_> { + // NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition columns. 
+ let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?; + let filters = create_physical_expr( + &expr, + &table_df_schema, + state.execution_props(), + )?; + Ok(Some(filters)) + }) + .unwrap_or(Ok(None))?; - let object_store_url = if let Some(url) = self.table_paths.first() { - url.object_store() - } else { + let Some(object_store_url) = + self.table_paths.first().map(ListingTableUrl::object_store) + else { return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty())))); }; @@ -854,7 +859,7 @@ impl TableProvider for ListingTable { &self, filters: &[&Expr], ) -> Result> { - Ok(filters + filters .iter() .map(|filter| { if expr_applicable_for_cols( @@ -862,19 +867,29 @@ impl TableProvider for ListingTable { .options .table_partition_cols .iter() - .map(|x| x.0.as_str()) + .map(|col| col.0.as_str()) .collect::>(), filter, ) { // if filter can be handled by partition pruning, it is exact - TableProviderFilterPushDown::Exact - } else { - // otherwise, we still might be able to handle the filter with file - // level mechanisms such as Parquet row group pruning. - TableProviderFilterPushDown::Inexact + return Ok(TableProviderFilterPushDown::Exact); + } + + // if we can't push it down completely with only the filename-based/path-based + // column names, then we should check if we can do parquet predicate pushdown + let supports_pushdown = self.options.format.supports_filters_pushdown( + &self.file_schema, + &self.table_schema, + &[filter], + )?; + + if supports_pushdown == FilePushdownSupport::Supported { + return Ok(TableProviderFilterPushDown::Exact); } + + Ok(TableProviderFilterPushDown::Inexact) }) - .collect()) + .collect() } fn get_table_definition(&self) -> Option<&str> { diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 9f674185694d..2c438e8b0e78 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -258,7 +258,7 @@ impl FileScanConfig { (projected_schema, table_stats, projected_output_ordering) } - #[allow(unused)] // Only used by avro + #[cfg_attr(not(feature = "avro"), allow(unused))] // Only used by avro pub(crate) fn projected_file_column_names(&self) -> Option> { self.projection.as_ref().map(|p| { p.iter() diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index f810fb86bd89..4018b3bb2920 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -516,7 +516,8 @@ mod tests { Field::new("c3", DataType::Float64, true), ])); - let adapter = DefaultSchemaAdapterFactory::default().create(table_schema.clone()); + let adapter = DefaultSchemaAdapterFactory + .create(table_schema.clone(), table_schema.clone()); let file_schema = Schema::new(vec![ Field::new("c1", DataType::Utf8, true), @@ -573,7 +574,7 @@ mod tests { let indices = vec![1, 2, 4]; let schema = SchemaRef::from(table_schema.project(&indices).unwrap()); - let adapter = DefaultSchemaAdapterFactory::default().create(schema); + let adapter = DefaultSchemaAdapterFactory.create(schema, table_schema.clone()); let (mapping, projection) = adapter.map_schema(&file_schema).unwrap(); let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]); diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 
54d4d7262a8e..f22d02699aac 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -61,6 +61,7 @@ pub use access_plan::{ParquetAccessPlan, RowGroupAccess}; pub use metrics::ParquetFileMetrics; use opener::ParquetOpener; pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory}; +pub use row_filter::can_expr_be_pushed_down_with_schemas; pub use writer::plan_to_parquet; /// Execution plan for reading one or more Parquet files. @@ -405,6 +406,7 @@ impl ParquetExecBuilder { let (projected_schema, projected_statistics, projected_output_ordering) = base_config.project(); + let cache = ParquetExec::compute_properties( projected_schema, &projected_output_ordering, @@ -707,7 +709,7 @@ impl ExecutionPlan for ParquetExec { let schema_adapter_factory = self .schema_adapter_factory .clone() - .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory::default())); + .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory)); let opener = ParquetOpener { partition_index, diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs index 2a198c3d4571..9880c30ddb6b 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs @@ -99,7 +99,9 @@ impl FileOpener for ParquetOpener { let projected_schema = SchemaRef::from(self.table_schema.project(&self.projection)?); - let schema_adapter = self.schema_adapter_factory.create(projected_schema); + let schema_adapter = self + .schema_adapter_factory + .create(projected_schema, self.table_schema.clone()); let predicate = self.predicate.clone(); let pruning_predicate = self.pruning_predicate.clone(); let page_pruning_predicate = self.page_pruning_predicate.clone(); diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs index 59d23fd68c31..d3bc8030cf7f 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs @@ -76,7 +76,7 @@ use datafusion_common::cast::as_boolean_array; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter, }; -use datafusion_common::{arrow_err, DataFusionError, Result, ScalarValue}; +use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue}; use datafusion_physical_expr::expressions::{Column, Literal}; use datafusion_physical_expr::utils::reassign_predicate_columns; use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; @@ -237,12 +237,6 @@ struct FilterCandidateBuilder<'a> { /// The schema of the table (merged schema) -- columns may be in different /// order than in the file and have columns that are not in the file schema table_schema: &'a Schema, - required_column_indices: BTreeSet, - /// Does the expression require any non-primitive columns (like structs)? - non_primitive_columns: bool, - /// Does the expression reference any columns that are in the table - /// schema but not in the file schema? 
- projected_columns: bool, } impl<'a> FilterCandidateBuilder<'a> { @@ -255,9 +249,6 @@ impl<'a> FilterCandidateBuilder<'a> { expr, file_schema, table_schema, - required_column_indices: BTreeSet::default(), - non_primitive_columns: false, - projected_columns: false, } } @@ -268,53 +259,87 @@ impl<'a> FilterCandidateBuilder<'a> { /// * `Ok(Some(candidate))` if the expression can be used as an ArrowFilter /// * `Ok(None)` if the expression cannot be used as an ArrowFilter /// * `Err(e)` if an error occurs while building the candidate - pub fn build( - mut self, - metadata: &ParquetMetaData, - ) -> Result> { - let expr = self.expr.clone().rewrite(&mut self).data()?; - - if self.non_primitive_columns || self.projected_columns { - Ok(None) - } else { - let required_bytes = - size_of_columns(&self.required_column_indices, metadata)?; - let can_use_index = columns_sorted(&self.required_column_indices, metadata)?; - - Ok(Some(FilterCandidate { - expr, - required_bytes, - can_use_index, - projection: self.required_column_indices.into_iter().collect(), - })) + pub fn build(self, metadata: &ParquetMetaData) -> Result> { + let Some((required_indices, rewritten_expr)) = + pushdown_columns(self.expr, self.file_schema, self.table_schema)? + else { + return Ok(None); + }; + + let required_bytes = size_of_columns(&required_indices, metadata)?; + let can_use_index = columns_sorted(&required_indices, metadata)?; + + Ok(Some(FilterCandidate { + expr: rewritten_expr, + required_bytes, + can_use_index, + projection: required_indices.into_iter().collect(), + })) + } +} + +// a struct that implements TreeNodeRewriter to traverse a PhysicalExpr tree structure to determine +// if any column references in the expression would prevent it from being predicate-pushed-down. +// if non_primitive_columns || projected_columns, it can't be pushed down. +// can't be reused between calls to `rewrite`; each construction must be used only once. +struct PushdownChecker<'schema> { + /// Does the expression require any non-primitive columns (like structs)? + non_primitive_columns: bool, + /// Does the expression reference any columns that are in the table + /// schema but not in the file schema? + projected_columns: bool, + // the indices of all the columns found within the given expression which exist inside the given + // [`file_schema`] + required_column_indices: BTreeSet, + file_schema: &'schema Schema, + table_schema: &'schema Schema, +} + +impl<'schema> PushdownChecker<'schema> { + fn new(file_schema: &'schema Schema, table_schema: &'schema Schema) -> Self { + Self { + non_primitive_columns: false, + projected_columns: false, + required_column_indices: BTreeSet::default(), + file_schema, + table_schema, } } + + fn check_single_column(&mut self, column_name: &str) -> Option { + if let Ok(idx) = self.file_schema.index_of(column_name) { + self.required_column_indices.insert(idx); + + if DataType::is_nested(self.file_schema.field(idx).data_type()) { + self.non_primitive_columns = true; + return Some(TreeNodeRecursion::Jump); + } + } else if self.table_schema.index_of(column_name).is_err() { + // If the column does not exist in the (un-projected) table schema then + // it must be a projected column. 
+ self.projected_columns = true; + return Some(TreeNodeRecursion::Jump); + } + + None + } + + #[inline] + fn prevents_pushdown(&self) -> bool { + self.non_primitive_columns || self.projected_columns + } } -/// Implement the `TreeNodeRewriter` trait for `FilterCandidateBuilder` that -/// walks the expression tree and rewrites it in preparation of becoming -/// `FilterCandidate`. -impl<'a> TreeNodeRewriter for FilterCandidateBuilder<'a> { +impl<'schema> TreeNodeRewriter for PushdownChecker<'schema> { type Node = Arc; - /// Called before visiting each child fn f_down( &mut self, node: Arc, ) -> Result>> { if let Some(column) = node.as_any().downcast_ref::() { - if let Ok(idx) = self.file_schema.index_of(column.name()) { - self.required_column_indices.insert(idx); - - if DataType::is_nested(self.file_schema.field(idx).data_type()) { - self.non_primitive_columns = true; - return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump)); - } - } else if self.table_schema.index_of(column.name()).is_err() { - // If the column does not exist in the (un-projected) table schema then - // it must be a projected column. - self.projected_columns = true; - return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump)); + if let Some(recursion) = self.check_single_column(column.name()) { + return Ok(Transformed::new(node, false, recursion)); } } @@ -322,29 +347,30 @@ impl<'a> TreeNodeRewriter for FilterCandidateBuilder<'a> { } /// After visiting all children, rewrite column references to nulls if - /// they are not in the file schema + /// they are not in the file schema. + /// We do this because they won't be relevant if they're not in the file schema, since that's + /// the only thing we're dealing with here as this is only used for the parquet pushdown during + /// scanning fn f_up( &mut self, expr: Arc, ) -> Result>> { - // if the expression is a column, is it in the file schema? if let Some(column) = expr.as_any().downcast_ref::() { + // if the expression is a column, is it in the file schema? if self.file_schema.field_with_name(column.name()).is_err() { - // Replace the column reference with a NULL (using the type from the table schema) - // e.g. `column = 'foo'` is rewritten be transformed to `NULL = 'foo'` - // - // See comments on `FilterCandidateBuilder` for more information - return match self.table_schema.field_with_name(column.name()) { - Ok(field) => { - // return the null value corresponding to the data type + return self + .table_schema + .field_with_name(column.name()) + .and_then(|field| { + // Replace the column reference with a NULL (using the type from the table schema) + // e.g. `column = 'foo'` is rewritten be transformed to `NULL = 'foo'` + // + // See comments on `FilterCandidateBuilder` for more information let null_value = ScalarValue::try_from(field.data_type())?; - Ok(Transformed::yes(Arc::new(Literal::new(null_value)))) - } - Err(e) => { - // If the column is not in the table schema, should throw the error - arrow_err!(e) - } - }; + Ok(Transformed::yes(Arc::new(Literal::new(null_value)) as _)) + }) + // If the column is not in the table schema, should throw the error + .map_err(|e| arrow_datafusion_err!(e)); } } @@ -352,6 +378,69 @@ impl<'a> TreeNodeRewriter for FilterCandidateBuilder<'a> { } } +type ProjectionAndExpr = (BTreeSet, Arc); + +// Checks if a given expression can be pushed down into `ParquetExec` as opposed to being evaluated +// post-parquet-scan in a `FilterExec`. 
If it can be pushed down, this returns returns all the +// columns in the given expression so that they can be used in the parquet scanning, along with the +// expression rewritten as defined in [`PushdownChecker::f_up`] +fn pushdown_columns( + expr: Arc, + file_schema: &Schema, + table_schema: &Schema, +) -> Result> { + let mut checker = PushdownChecker::new(file_schema, table_schema); + + let expr = expr.rewrite(&mut checker).data()?; + + Ok((!checker.prevents_pushdown()).then_some((checker.required_column_indices, expr))) +} + +/// creates a PushdownChecker for a single use to check a given column with the given schemes. Used +/// to check preemptively if a column name would prevent pushdowning. +/// effectively does the inverse of [`pushdown_columns`] does, but with a single given column +/// (instead of traversing the entire tree to determine this) +fn would_column_prevent_pushdown( + column_name: &str, + file_schema: &Schema, + table_schema: &Schema, +) -> bool { + let mut checker = PushdownChecker::new(file_schema, table_schema); + + // the return of this is only used for [`PushdownChecker::f_down()`], so we can safely ignore + // it here. I'm just verifying we know the return type of this so nobody accidentally changes + // the return type of this fn and it gets implicitly ignored here. + let _: Option = checker.check_single_column(column_name); + + // and then return a value based on the state of the checker + checker.prevents_pushdown() +} + +/// Recurses through expr as a trea, finds all `column`s, and checks if any of them would prevent +/// this expression from being predicate pushed down. If any of them would, this returns false. +/// Otherwise, true. +pub fn can_expr_be_pushed_down_with_schemas( + expr: &datafusion_expr::Expr, + file_schema: &Schema, + table_schema: &Schema, +) -> bool { + let mut can_be_pushed = true; + expr.apply(|expr| match expr { + datafusion_expr::Expr::Column(column) => { + can_be_pushed &= + !would_column_prevent_pushdown(column.name(), file_schema, table_schema); + Ok(if can_be_pushed { + TreeNodeRecursion::Jump + } else { + TreeNodeRecursion::Stop + }) + } + _ => Ok(TreeNodeRecursion::Continue), + }) + .unwrap(); // we never return an Err, so we can safely unwrap this + can_be_pushed +} + /// Computes the projection required to go from the file's schema order to the projected /// order expected by this filter /// @@ -444,11 +533,13 @@ pub fn build_row_filter( // Determine which conjuncts can be evaluated as ArrowPredicates, if any let mut candidates: Vec = predicates .into_iter() - .flat_map(|expr| { + .map(|expr| { FilterCandidateBuilder::new(expr.clone(), file_schema, table_schema) .build(metadata) - .unwrap_or_default() }) + .collect::, _>>()? 
+ .into_iter() + .flatten() .collect(); // no candidates @@ -485,11 +576,12 @@ pub fn build_row_filter( #[cfg(test)] mod test { use super::*; - use crate::datasource::schema_adapter::DefaultSchemaAdapterFactory; - use crate::datasource::schema_adapter::SchemaAdapterFactory; + use crate::datasource::schema_adapter::{ + DefaultSchemaAdapterFactory, SchemaAdapterFactory, + }; use arrow::datatypes::Field; - use arrow_schema::TimeUnit::Nanosecond; + use arrow_schema::{Fields, TimeUnit::Nanosecond}; use datafusion_expr::{cast, col, lit, Expr}; use datafusion_physical_expr::planner::logical2physical; use datafusion_physical_plan::metrics::{Count, Time}; @@ -583,8 +675,9 @@ mod test { false, )]); + let table_ref = Arc::new(table_schema.clone()); let schema_adapter = - DefaultSchemaAdapterFactory {}.create(Arc::new(table_schema.clone())); + DefaultSchemaAdapterFactory.create(Arc::clone(&table_ref), table_ref); let (schema_mapping, _) = schema_adapter .map_schema(&file_schema) .expect("creating schema mapping"); @@ -661,4 +754,87 @@ mod test { assert_eq!(projection, remapped) } } + + #[test] + fn nested_data_structures_prevent_pushdown() { + let table_schema = get_basic_table_schema(); + + let file_schema = Schema::new(vec![Field::new( + "list_col", + DataType::Struct(Fields::empty()), + true, + )]); + + let expr = col("list_col").is_not_null(); + + assert!(!can_expr_be_pushed_down_with_schemas( + &expr, + &file_schema, + &table_schema + )); + } + + #[test] + fn projected_columns_prevent_pushdown() { + let table_schema = get_basic_table_schema(); + + let file_schema = + Schema::new(vec![Field::new("existing_col", DataType::Int64, true)]); + + let expr = col("nonexistent_column").is_null(); + + assert!(!can_expr_be_pushed_down_with_schemas( + &expr, + &file_schema, + &table_schema + )); + } + + #[test] + fn basic_expr_doesnt_prevent_pushdown() { + let table_schema = get_basic_table_schema(); + + let file_schema = Schema::new(vec![Field::new("str_col", DataType::Utf8, true)]); + + let expr = col("str_col").is_null(); + + assert!(can_expr_be_pushed_down_with_schemas( + &expr, + &file_schema, + &table_schema + )); + } + + #[test] + fn complex_expr_doesnt_prevent_pushdown() { + let table_schema = get_basic_table_schema(); + + let file_schema = Schema::new(vec![ + Field::new("str_col", DataType::Utf8, true), + Field::new("int_col", DataType::UInt64, true), + ]); + + let expr = col("str_col") + .is_not_null() + .or(col("int_col").gt(Expr::Literal(ScalarValue::UInt64(Some(5))))); + + assert!(can_expr_be_pushed_down_with_schemas( + &expr, + &file_schema, + &table_schema + )); + } + + fn get_basic_table_schema() -> Schema { + let testdata = crate::test_util::parquet_test_data(); + let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet")) + .expect("opening file"); + + let reader = SerializedFileReader::new(file).expect("creating reader"); + + let metadata = reader.metadata(); + + parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None) + .expect("parsing schema") + } } diff --git a/datafusion/core/src/datasource/schema_adapter.rs b/datafusion/core/src/datasource/schema_adapter.rs index de508f2c3415..fdf3381758a4 100644 --- a/datafusion/core/src/datasource/schema_adapter.rs +++ b/datafusion/core/src/datasource/schema_adapter.rs @@ -35,7 +35,13 @@ use std::sync::Arc; /// other than null) pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static { /// Provides `SchemaAdapter`. 
- fn create(&self, schema: SchemaRef) -> Box; + // The design of this function is mostly modeled for the needs of DefaultSchemaAdapterFactory, + // read its implementation docs for the reasoning + fn create( + &self, + projected_table_schema: SchemaRef, + table_schema: SchemaRef, + ) -> Box; } /// Adapt file-level [`RecordBatch`]es to a table schema, which may have a schema @@ -96,17 +102,33 @@ pub trait SchemaMapper: Debug + Send + Sync { /// Implementation of [`SchemaAdapterFactory`] that maps columns by name /// and casts columns to the expected type. #[derive(Clone, Debug, Default)] -pub struct DefaultSchemaAdapterFactory {} +pub struct DefaultSchemaAdapterFactory; impl SchemaAdapterFactory for DefaultSchemaAdapterFactory { - fn create(&self, table_schema: SchemaRef) -> Box { - Box::new(DefaultSchemaAdapter { table_schema }) + fn create( + &self, + projected_table_schema: SchemaRef, + table_schema: SchemaRef, + ) -> Box { + Box::new(DefaultSchemaAdapter { + projected_table_schema, + table_schema, + }) } } +/// This SchemaAdapter requires both the table schema and the projected table schema because of the +/// needs of the [`SchemaMapping`] it creates. Read its documentation for more details #[derive(Clone, Debug)] pub(crate) struct DefaultSchemaAdapter { - /// Schema for the table + /// The schema for the table, projected to include only the fields being output (projected) by the + /// associated ParquetExec + projected_table_schema: SchemaRef, + /// The entire table schema for the table we're using this to adapt. + /// + /// This is used to evaluate any filters pushed down into the scan + /// which may refer to columns that are not referred to anywhere + /// else in the plan. table_schema: SchemaRef, } @@ -116,7 +138,7 @@ impl SchemaAdapter for DefaultSchemaAdapter { /// /// Panics if index is not in range for the table schema fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option { - let field = self.table_schema.field(index); + let field = self.projected_table_schema.field(index); Some(file_schema.fields.find(field.name())?.0) } @@ -133,11 +155,11 @@ impl SchemaAdapter for DefaultSchemaAdapter { file_schema: &Schema, ) -> datafusion_common::Result<(Arc, Vec)> { let mut projection = Vec::with_capacity(file_schema.fields().len()); - let mut field_mappings = vec![None; self.table_schema.fields().len()]; + let mut field_mappings = vec![None; self.projected_table_schema.fields().len()]; for (file_idx, file_field) in file_schema.fields.iter().enumerate() { if let Some((table_idx, table_field)) = - self.table_schema.fields().find(file_field.name()) + self.projected_table_schema.fields().find(file_field.name()) { match can_cast_types(file_field.data_type(), table_field.data_type()) { true => { @@ -158,8 +180,9 @@ impl SchemaAdapter for DefaultSchemaAdapter { Ok(( Arc::new(SchemaMapping { - table_schema: self.table_schema.clone(), + projected_table_schema: self.projected_table_schema.clone(), field_mappings, + table_schema: self.table_schema.clone(), }), projection, )) @@ -168,39 +191,81 @@ impl SchemaAdapter for DefaultSchemaAdapter { /// The SchemaMapping struct holds a mapping from the file schema to the table schema /// and any necessary type conversions that need to be applied. +/// +/// This needs both the projected table schema and full table schema because its different +/// functions have different needs. 
The [`map_batch`] function is only used by the ParquetOpener to +/// produce a RecordBatch which has the projected schema, since that's the schema which is supposed +/// to come out of the execution of this query. [`map_partial_batch`], however, is used to create a +/// RecordBatch with a schema that can be used for Parquet pushdown, meaning that it may contain +/// fields which are not in the projected schema (as the fields that parquet pushdown filters +/// operate can be completely distinct from the fields that are projected (output) out of the +/// ParquetExec). +/// +/// [`map_partial_batch`] uses `table_schema` to create the resulting RecordBatch (as it could be +/// operating on any fields in the schema), while [`map_batch`] uses `projected_table_schema` (as +/// it can only operate on the projected fields). +/// +/// [`map_batch`]: Self::map_batch +/// [`map_partial_batch`]: Self::map_partial_batch #[derive(Debug)] pub struct SchemaMapping { - /// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result. - table_schema: SchemaRef, - /// Mapping from field index in `table_schema` to index in projected file_schema + /// The schema of the table. This is the expected schema after conversion and it should match + /// the schema of the query result. + projected_table_schema: SchemaRef, + /// Mapping from field index in `projected_table_schema` to index in projected file_schema. + /// They are Options instead of just plain `usize`s because the table could have fields that + /// don't exist in the file. field_mappings: Vec>, + /// The entire table schema, as opposed to the projected_table_schema (which only contains the + /// columns that we are projecting out of this query). This contains all fields in the table, + /// regardless of if they will be projected out or not. + table_schema: SchemaRef, } impl SchemaMapper for SchemaMapping { - /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions. + /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and + /// conversions. The produced RecordBatch has a schema that contains only the projected + /// columns, so if one needs a RecordBatch with a schema that references columns which are not + /// in the projected, it would be better to use `map_partial_batch` fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result { let batch_rows = batch.num_rows(); let batch_cols = batch.columns().to_vec(); let cols = self - .table_schema + .projected_table_schema + // go through each field in the projected schema .fields() .iter() + // and zip it with the index that maps fields from the projected table schema to the + // projected file schema in `batch` .zip(&self.field_mappings) - .map(|(field, file_idx)| match file_idx { - Some(batch_idx) => cast(&batch_cols[*batch_idx], field.data_type()), - None => Ok(new_null_array(field.data_type(), batch_rows)), + // and for each one... + .map(|(field, file_idx)| { + file_idx.map_or_else( + // If this field only exists in the table, and not in the file, then we know + // that it's null, so just return that. 
+ || Ok(new_null_array(field.data_type(), batch_rows)), + // However, if it does exist in both, then try to cast it to the correct output + // type + |batch_idx| cast(&batch_cols[batch_idx], field.data_type()), + ) }) .collect::, _>>()?; // Necessary to handle empty batches let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); - let schema = self.table_schema.clone(); + let schema = self.projected_table_schema.clone(); let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?; Ok(record_batch) } + /// Adapts a [`RecordBatch`]'s schema into one that has all the correct output types and only + /// contains the fields that exist in both the file schema and table schema. + /// + /// Unlike `map_batch` this method also preserves the columns that + /// may not appear in the final output (`projected_table_schema`) but may + /// appear in push down predicates fn map_partial_batch( &self, batch: RecordBatch, @@ -208,15 +273,33 @@ impl SchemaMapper for SchemaMapping { let batch_cols = batch.columns().to_vec(); let schema = batch.schema(); - let mut cols = vec![]; - let mut fields = vec![]; - for (i, f) in schema.fields().iter().enumerate() { - let table_field = self.table_schema.field_with_name(f.name()); - if let Ok(tf) = table_field { - cols.push(cast(&batch_cols[i], tf.data_type())?); - fields.push(tf.clone()); - } - } + // for each field in the batch's schema (which is based on a file, not a table)... + let (cols, fields) = schema + .fields() + .iter() + .zip(batch_cols.iter()) + .flat_map(|(field, batch_col)| { + self.table_schema + // try to get the same field from the table schema that we have stored in self + .field_with_name(field.name()) + // and if we don't have it, that's fine, ignore it. This may occur when we've + // created an external table whose fields are a subset of the fields in this + // file, then tried to read data from the file into this table. If that is the + // case here, it's fine to ignore because we don't care about this field + // anyways + .ok() + // but if we do have it, + .map(|table_field| { + // try to cast it into the correct output type. we don't want to ignore this + // error, though, so it's propagated. + cast(batch_col, table_field.data_type()) + // and if that works, return the field and column. + .map(|new_col| (new_col, table_field.clone())) + }) + }) + .collect::, _>>()? + .into_iter() + .unzip::<_, _, Vec<_>, Vec<_>>(); // Necessary to handle empty batches let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); @@ -322,12 +405,16 @@ mod tests { } #[derive(Debug)] - struct TestSchemaAdapterFactory {} + struct TestSchemaAdapterFactory; impl SchemaAdapterFactory for TestSchemaAdapterFactory { - fn create(&self, schema: SchemaRef) -> Box { + fn create( + &self, + projected_table_schema: SchemaRef, + _table_schema: SchemaRef, + ) -> Box { Box::new(TestSchemaAdapter { - table_schema: schema, + table_schema: projected_table_schema, }) } } diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index cc35255dfe29..2010a5c66412 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -429,7 +429,7 @@ impl DefaultPhysicalPlanner { Ok(Some(plan)) } - /// Given a single LogicalPlan node, map it to it's physical ExecutionPlan counterpart. + /// Given a single LogicalPlan node, map it to its physical ExecutionPlan counterpart. 
async fn map_logical_node_to_physical( &self, node: &LogicalPlan, diff --git a/datafusion/expr-common/src/signature.rs b/datafusion/expr-common/src/signature.rs index ffa5f17cec14..d1553b3315e7 100644 --- a/datafusion/expr-common/src/signature.rs +++ b/datafusion/expr-common/src/signature.rs @@ -43,8 +43,8 @@ pub enum Volatility { Immutable, /// A stable function may return different values given the same input across different /// queries but must return the same value for a given input within a query. An example of - /// this is the `Now` function. DataFusion - /// will attempt to inline `Stable` functions during planning, when possible. + /// this is the `Now` function. DataFusion will attempt to inline `Stable` functions + /// during planning, when possible. /// For query `select col1, now() from t1`, it might take a while to execute but /// `now()` column will be the same for each output row, which is evaluated /// during planning. diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 1c94c7f3afd3..b3f9b26fa46e 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1980,7 +1980,7 @@ impl LogicalPlan { .map(|i| &input_columns[*i]) .collect::>(); // get items from input_columns indexed by list_col_indices - write!(f, "Unnest: lists[{}] structs[{}]", + write!(f, "Unnest: lists[{}] structs[{}]", expr_vec_fmt!(list_type_columns), expr_vec_fmt!(struct_type_columns)) } @@ -2124,11 +2124,13 @@ impl Projection { /// the `Result` will contain the schema; otherwise, it will contain an error. pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result> { let metadata = input.schema().metadata().clone(); - let mut schema = - DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?; - schema = schema.with_functional_dependencies(calc_func_dependencies_for_project( - exprs, input, - )?)?; + + let schema = + DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)? + .with_functional_dependencies(calc_func_dependencies_for_project( + exprs, input, + )?)?; + Ok(Arc::new(schema)) } diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 0623be504b9b..65db164c6e55 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -176,7 +176,7 @@ fn optimize_projections( let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter()); let schema = aggregate.input.schema(); let necessary_indices = - RequiredIndicies::new().with_exprs(schema, all_exprs_iter)?; + RequiredIndicies::new().with_exprs(schema, all_exprs_iter); let necessary_exprs = necessary_indices.get_required_exprs(schema); return optimize_projections( @@ -216,8 +216,7 @@ fn optimize_projections( // Get all the required column indices at the input, either by the // parent or window expression requirements. 
- let required_indices = - child_reqs.with_exprs(&input_schema, &new_window_expr)?; + let required_indices = child_reqs.with_exprs(&input_schema, &new_window_expr); return optimize_projections( Arc::unwrap_or_clone(window.input), @@ -269,7 +268,6 @@ fn optimize_projections( .map(LogicalPlan::TableScan) .map(Transformed::yes); } - // Other node types are handled below _ => {} }; @@ -761,7 +759,7 @@ fn rewrite_projection_given_requirements( let exprs_used = indices.get_at_indices(&expr); let required_indices = - RequiredIndicies::new().with_exprs(input.schema(), exprs_used.iter())?; + RequiredIndicies::new().with_exprs(input.schema(), exprs_used.iter()); // rewrite the children projection, and if they are changed rewrite the // projection down @@ -781,8 +779,8 @@ fn rewrite_projection_given_requirements( /// - input schema of the projection, output schema of the projection are same, and /// - all projection expressions are either Column or Literal fn is_projection_unnecessary(input: &LogicalPlan, proj_exprs: &[Expr]) -> Result { - Ok(&projection_schema(input, proj_exprs)? == input.schema() - && proj_exprs.iter().all(is_expr_trivial)) + let proj_schema = projection_schema(input, proj_exprs)?; + Ok(&proj_schema == input.schema() && proj_exprs.iter().all(is_expr_trivial)) } #[cfg(test)] diff --git a/datafusion/optimizer/src/optimize_projections/required_indices.rs b/datafusion/optimizer/src/optimize_projections/required_indices.rs index a9a18898c82e..60d8ef1a8e6c 100644 --- a/datafusion/optimizer/src/optimize_projections/required_indices.rs +++ b/datafusion/optimizer/src/optimize_projections/required_indices.rs @@ -96,7 +96,7 @@ impl RequiredIndicies { // Add indices of the child fields referred to by the expressions in the // parent plan.apply_expressions(|e| { - self.add_expr(schema, e)?; + self.add_expr(schema, e); Ok(TreeNodeRecursion::Continue) })?; Ok(self.compact()) @@ -111,7 +111,7 @@ impl RequiredIndicies { /// /// * `input_schema`: The input schema to analyze for index requirements. /// * `expr`: An expression for which we want to find necessary field indices. - fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr) -> Result<()> { + fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr) { // TODO could remove these clones (and visit the expression directly) let mut cols = expr.column_refs(); // Get outer-referenced (subquery) columns: @@ -122,7 +122,6 @@ impl RequiredIndicies { self.indices.push(idx); } } - Ok(()) } /// Adds the indices of the fields referred to by the given expressions @@ -136,14 +135,14 @@ impl RequiredIndicies { self, schema: &DFSchemaRef, exprs: impl IntoIterator, - ) -> Result { + ) -> Self { exprs .into_iter() - .try_fold(self, |mut acc, expr| { - acc.add_expr(schema, expr)?; - Ok(acc) + .fold(self, |mut acc, expr| { + acc.add_expr(schema, expr); + acc }) - .map(|acc| acc.compact()) + .compact() } /// Adds all `indices` into this instance. diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index e115ec3c74fe..236b24dd4094 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -213,8 +213,8 @@ macro_rules! 
compute_utf8_flag_op_scalar { .downcast_ref::<$ARRAYTYPE>() .expect("compute_utf8_flag_op_scalar failed to downcast array"); - if let ScalarValue::Utf8(Some(string_value))|ScalarValue::LargeUtf8(Some(string_value)) = $RIGHT { - let flag = if $FLAG { Some("i") } else { None }; + if let ScalarValue::Utf8(Some(string_value)) | ScalarValue::LargeUtf8(Some(string_value)) = $RIGHT { + let flag = $FLAG.then_some("i"); let mut array = paste::expr! {[<$OP _utf8_scalar>]}(&ll, &string_value, flag)?; if $NOT { diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs index bf15821bca7a..4aad959584ac 100644 --- a/datafusion/physical-expr/src/expressions/column.rs +++ b/datafusion/physical-expr/src/expressions/column.rs @@ -163,7 +163,10 @@ impl Column { internal_err!( "PhysicalExpr Column references column '{}' at index {} (zero-based) but input schema only has {} columns: {:?}", self.name, - self.index, input_schema.fields.len(), input_schema.fields().iter().map(|f| f.name().clone()).collect::>()) + self.index, + input_schema.fields.len(), + input_schema.fields().iter().map(|f| f.name()).collect::>() + ) } } } diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index bf5394ec01de..db94563b7adf 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -362,13 +362,18 @@ impl AsLogicalPlan for LogicalPlanNode { "logical_plan::from_proto() Unsupported file format '{self:?}'" )) })? { - #[cfg(feature = "parquet")] + #[cfg_attr(not(feature = "parquet"), allow(unused_variables))] FileFormatType::Parquet(protobuf::ParquetFormat {options}) => { - let mut parquet = ParquetFormat::default(); - if let Some(options) = options { - parquet = parquet.with_options(options.try_into()?) + #[cfg(feature = "parquet")] + { + let mut parquet = ParquetFormat::default(); + if let Some(options) = options { + parquet = parquet.with_options(options.try_into()?) 
+ } + Arc::new(parquet) } - Arc::new(parquet) + #[cfg(not(feature = "parquet"))] + panic!("Unable to process parquet file since `parquet` feature is not enabled"); } FileFormatType::Csv(protobuf::CsvFormat { options diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index e1cc37091bf7..6abfc71288f9 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -234,30 +234,35 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { .with_file_compression_type(FileCompressionType::UNCOMPRESSED) .build(), )), - #[cfg(feature = "parquet")] + #[cfg_attr(not(feature = "parquet"), allow(unused_variables))] PhysicalPlanType::ParquetScan(scan) => { - let base_config = parse_protobuf_file_scan_config( - scan.base_conf.as_ref().unwrap(), - registry, - extension_codec, - )?; - let predicate = scan - .predicate - .as_ref() - .map(|expr| { - parse_physical_expr( - expr, - registry, - base_config.file_schema.as_ref(), - extension_codec, - ) - }) - .transpose()?; - let mut builder = ParquetExec::builder(base_config); - if let Some(predicate) = predicate { - builder = builder.with_predicate(predicate) + #[cfg(feature = "parquet")] + { + let base_config = parse_protobuf_file_scan_config( + scan.base_conf.as_ref().unwrap(), + registry, + extension_codec, + )?; + let predicate = scan + .predicate + .as_ref() + .map(|expr| { + parse_physical_expr( + expr, + registry, + base_config.file_schema.as_ref(), + extension_codec, + ) + }) + .transpose()?; + let mut builder = ParquetExec::builder(base_config); + if let Some(predicate) = predicate { + builder = builder.with_predicate(predicate) + } + Ok(builder.build_arc()) } - Ok(builder.build_arc()) + #[cfg(not(feature = "parquet"))] + panic!("Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled") } PhysicalPlanType::AvroScan(scan) => { Ok(Arc::new(AvroExec::new(parse_protobuf_file_scan_config( @@ -1068,35 +1073,45 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { sort_order, ))) } + #[cfg_attr(not(feature = "parquet"), allow(unused_variables))] PhysicalPlanType::ParquetSink(sink) => { - let input = - into_physical_plan(&sink.input, registry, runtime, extension_codec)?; - - let data_sink: ParquetSink = sink - .sink - .as_ref() - .ok_or_else(|| proto_error("Missing required field in protobuf"))? - .try_into()?; - let sink_schema = input.schema(); - let sort_order = sink - .sort_order - .as_ref() - .map(|collection| { - parse_physical_sort_exprs( - &collection.physical_sort_expr_nodes, - registry, - &sink_schema, - extension_codec, - ) - .map(|item| PhysicalSortRequirement::from_sort_exprs(&item)) - }) - .transpose()?; - Ok(Arc::new(DataSinkExec::new( - input, - Arc::new(data_sink), - sink_schema, - sort_order, - ))) + #[cfg(feature = "parquet")] + { + let input = into_physical_plan( + &sink.input, + registry, + runtime, + extension_codec, + )?; + + let data_sink: ParquetSink = sink + .sink + .as_ref() + .ok_or_else(|| proto_error("Missing required field in protobuf"))? 
+ .try_into()?; + let sink_schema = input.schema(); + let sort_order = sink + .sort_order + .as_ref() + .map(|collection| { + parse_physical_sort_exprs( + &collection.physical_sort_expr_nodes, + registry, + &sink_schema, + extension_codec, + ) + .map(|item| PhysicalSortRequirement::from_sort_exprs(&item)) + }) + .transpose()?; + Ok(Arc::new(DataSinkExec::new( + input, + Arc::new(data_sink), + sink_schema, + sort_order, + ))) + } + #[cfg(not(feature = "parquet"))] + panic!("Trying to use ParquetSink without `parquet` feature enabled"); } PhysicalPlanType::Unnest(unnest) => { let input = into_physical_plan( @@ -1954,6 +1969,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { }); } + #[cfg(feature = "parquet")] if let Some(sink) = exec.sink().as_any().downcast_ref::() { return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::ParquetSink(Box::new( diff --git a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt.temp b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt.temp new file mode 100644 index 000000000000..00e74a207b33 --- /dev/null +++ b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt.temp @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+
+##########
+## Join Tests
+##########
+
+# turn off repartition_joins
+statement ok
+set datafusion.optimizer.repartition_joins = false;
+
+include ./join.slt
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
index d69662f75da4..24ffb963bbe2 100644
--- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -81,21 +81,15 @@ EXPLAIN select a from t_pushdown where b > 2 ORDER BY a;
 ----
 logical_plan
 01)Sort: t_pushdown.a ASC NULLS LAST
-02)--Projection: t_pushdown.a
-03)----Filter: t_pushdown.b > Int32(2)
-04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2)]
+02)--TableScan: t_pushdown projection=[a], full_filters=[t_pushdown.b > Int32(2)]
 physical_plan
 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----CoalesceBatchesExec: target_batch_size=8192
-04)------FilterExec: b@1 > 2, projection=[a@0]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
-06)----------ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], predicate=b@1 > 2, pruning_predicate=CASE WHEN b_null_count@1 = b_row_count@2 THEN false ELSE b_max@0 > 2 END, required_guarantees=[]
+03)----ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], predicate=b@1 > 2, pruning_predicate=CASE WHEN b_null_count@1 = b_row_count@2 THEN false ELSE b_max@0 > 2 END, required_guarantees=[]

 # When filter pushdown *is* enabled, ParquetExec can filter exactly,
 # not just metadata, so we expect to see no FilterExec
-# once https://github.com/apache/datafusion/issues/4028 is fixed
 query T
 select a from t_pushdown where b > 2 ORDER BY a;
 ----
@@ -133,16 +127,11 @@ EXPLAIN select a from t_pushdown where b > 2 AND a IS NOT NULL order by a;
 ----
 logical_plan
 01)Sort: t_pushdown.a ASC NULLS LAST
-02)--Projection: t_pushdown.a
-03)----Filter: t_pushdown.b > Int32(2) AND t_pushdown.a IS NOT NULL
-04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2), t_pushdown.a IS NOT NULL]
+02)--TableScan: t_pushdown projection=[a], full_filters=[t_pushdown.b > Int32(2), t_pushdown.a IS NOT NULL]
 physical_plan
 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----CoalesceBatchesExec: target_batch_size=8192
-04)------FilterExec: b@1 > 2 AND a@0 IS NOT NULL, projection=[a@0]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
-06)----------ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], predicate=b@1 > 2 AND a@0 IS NOT NULL, pruning_predicate=CASE WHEN b_null_count@1 = b_row_count@2 THEN false ELSE b_max@0 > 2 END AND a_null_count@4 != a_row_count@3, required_guarantees=[]
+03)----ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], predicate=b@1 > 2 AND a@0 IS NOT NULL, pruning_predicate=CASE WHEN b_null_count@1 = b_row_count@2 THEN false ELSE b_max@0 > 2 END AND a_null_count@4 != a_row_count@3, required_guarantees=[]

 query I
@@ -155,16 +144,11 @@ EXPLAIN select b from t_pushdown where a = 'bar' order by b;
 ----
 logical_plan
 01)Sort: t_pushdown.b ASC NULLS LAST
-02)--Projection: t_pushdown.b
-03)----Filter: t_pushdown.a = Utf8("bar")
-04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.a = Utf8("bar")]
+02)--TableScan: t_pushdown projection=[b], full_filters=[t_pushdown.a = Utf8("bar")]
 physical_plan
 01)SortPreservingMergeExec: [b@0 ASC NULLS LAST]
 02)--SortExec: expr=[b@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----CoalesceBatchesExec: target_batch_size=8192
-04)------FilterExec: a@0 = bar, projection=[b@1]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
-06)----------ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], predicate=a@0 = bar, pruning_predicate=CASE WHEN a_null_count@2 = a_row_count@3 THEN false ELSE a_min@0 <= bar AND bar <= a_max@1 END, required_guarantees=[a in (bar)]
+03)----ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[b], predicate=a@0 = bar, pruning_predicate=CASE WHEN a_null_count@2 = a_row_count@3 THEN false ELSE a_min@0 <= bar AND bar <= a_max@1 END, required_guarantees=[a in (bar)]

 ## cleanup
 statement ok
diff --git a/datafusion/sqllogictest/test_files/repartition.slt b/datafusion/sqllogictest/test_files/repartition.slt
index 2d59ad2b5b0e..630674bb09ed 100644
--- a/datafusion/sqllogictest/test_files/repartition.slt
+++ b/datafusion/sqllogictest/test_files/repartition.slt
@@ -146,4 +146,4 @@ FROM t1 WHERE ((false > (v1 = v1)) IS DISTINCT FROM true);

 statement ok
 DROP TABLE t1;
-# End repartition on empty columns test
\ No newline at end of file
+# End repartition on empty columns test
diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt
index bdd8deff1873..05de3e0b8091 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -1755,4 +1755,4 @@ SELECT "test.a" FROM (SELECT a AS "test.a" FROM test)
 1

 statement ok
-DROP TABLE test;
\ No newline at end of file
+DROP TABLE test;
diff --git a/datafusion/sqllogictest/test_files/string_view.slt b/datafusion/sqllogictest/test_files/string_view.slt
index 7df43bb7eddb..c3b5fa8fc458 100644
--- a/datafusion/sqllogictest/test_files/string_view.slt
+++ b/datafusion/sqllogictest/test_files/string_view.slt
@@ -1638,4 +1638,3 @@ select column2|| ' ' ||column3 from temp;
 ----
 rust fast
 datafusion cool
-
diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt
index 505c66aef058..1f90b94aee11 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -4872,4 +4872,4 @@ query error DataFusion error: Execution error: Expected a signed integer literal
 SELECT NTH_VALUE('+Inf'::Double, v1) OVER (PARTITION BY v1) FROM t1;

 statement ok
-DROP TABLE t1;
\ No newline at end of file
+DROP TABLE t1;
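
Note on the proto hunks above: ParquetSink handling is compiled only when the `parquet` cargo feature is enabled, using a cfg attribute directly on the statement that performs the downcast. The following is a minimal, self-contained Rust sketch of that cfg-gating + downcast pattern only; the trait, types, and function names here are simplified stand-ins invented for illustration, not DataFusion's actual `DataSink`/`DataSinkExec` or protobuf API.

// Sketch of cfg-gated sink serialization, assuming a crate with an optional
// `parquet` feature. All names below are illustrative placeholders.
use std::any::Any;
use std::sync::Arc;

/// Stand-in for a physical data sink trait object.
trait DataSink: Any + Send + Sync {
    /// Expose the concrete type so callers can downcast, mirroring the
    /// `as_any()` call in the diff above.
    fn as_any(&self) -> &dyn Any;
}

/// Only exists when the crate is built with `--features parquet`.
#[cfg(feature = "parquet")]
struct ParquetSink;

#[cfg(feature = "parquet")]
impl DataSink for ParquetSink {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Try to serialize a sink as a Parquet sink node. Returns `None` when the
/// sink is not a ParquetSink or when the `parquet` feature is disabled, so a
/// caller can fall through to other sink variants.
fn try_serialize_parquet_sink(sink: &Arc<dyn DataSink>) -> Option<Vec<u8>> {
    // The cfg attribute removes this whole `if` statement when the feature is
    // off, just like the `#[cfg(feature = "parquet")]` line added in the hunk.
    #[cfg(feature = "parquet")]
    if sink.as_any().downcast_ref::<ParquetSink>().is_some() {
        // The real code builds a protobuf PhysicalPlanNode with a ParquetSink
        // variant here; an empty buffer stands in for that payload.
        return Some(Vec::new());
    }
    // Reached when the feature is off or the downcast did not match.
    let _ = sink;
    None
}

fn main() {
    #[cfg(feature = "parquet")]
    {
        let sink: Arc<dyn DataSink> = Arc::new(ParquetSink);
        assert!(try_serialize_parquet_sink(&sink).is_some());
    }
}

The patch applies the same idea to both directions of the proto round trip: deserialization panics if a serialized ParquetSink is encountered in a build without the `parquet` feature, while serialization simply compiles out the ParquetSink branch.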