Return TableProviderFilterPushDown::Exact when Parquet Pushdown Enabled (#12135)

* feat: Preemptively filter for pushdown-preventing columns in ListingTable

* Fix behavior to make all previous tests work and lay groundwork for future tests

* fix: Add some more tests and fix small issue with pushdown specificity

* test: Revive unnecessarily removed test

* ci: Fix CI issues with different combinations of exprs

* fix: run fmt

* Fix doc visibility issues

* Add ::new fn for PushdownChecker

* Remove unnecessary 'pub' qualifier

* Rename non_pushdown_columns and fix its doc comment to reflect what it actually does (the opposite), and add back useful comments

* fmt

* Extend FileFormat trait to allow library users to define formats which support pushdown

* fmt

* fix: reference real fn in doc to fix CI

* Minor: Add tests for using FilterExec when parquet was pushed down

* Update datafusion/core/src/datasource/file_format/mod.rs

* Pipe schema information through to TableScan and ParquetExec to facilitate removal of unnecessary FilterExec nodes

* - Remove collect::<(_, _)> to satisfy MSRV
- Remove expect(_) attr to satisfy MSRV
- Update comments with more accurate details and explanations

* Add more details in comments for `map_partial_batch`

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Remove reference to issue #4028 as it will be closed

* Convert normal comments to doc-comments

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Clarify meaning of word `projected` in comment

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Clarify more how `table_schema` is used differently from `projected_table_schema`

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Finish partially-written comment about SchemaMapping struct

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
itsjunetime and alamb authored Sep 17, 2024
1 parent b309525 commit be42f3d
Showing 25 changed files with 662 additions and 297 deletions.
28 changes: 28 additions & 0 deletions datafusion/core/src/datasource/file_format/mod.rs
@@ -45,6 +45,7 @@ use crate::physical_plan::{ExecutionPlan, Statistics};
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{internal_err, not_impl_err, GetExt};
use datafusion_expr::Expr;
use datafusion_physical_expr::PhysicalExpr;

use async_trait::async_trait;
@@ -138,6 +139,33 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("Writer not implemented for this format")
}

/// Check if the specified file format has support for pushing down the provided filters within
/// the given schemas. Added initially to support the Parquet file format's ability to do this.
fn supports_filters_pushdown(
&self,
_file_schema: &Schema,
_table_schema: &Schema,
_filters: &[&Expr],
) -> Result<FilePushdownSupport> {
Ok(FilePushdownSupport::NoSupport)
}
}

/// An enum to distinguish between different states when determining if certain filters can be
/// pushed down to file scanning
#[derive(Debug, PartialEq)]
pub enum FilePushdownSupport {
/// The file format/system being asked does not support any sort of pushdown. This should be
/// used even if the file format theoretically supports some sort of pushdown, but it's not
/// enabled or implemented yet.
NoSupport,
/// The file format/system being asked *does* support pushdown, but it can't make it work for
/// the provided filter/expression
NotSupportedForFilter,
/// The file format/system being asked *does* support pushdown and *can* make it work for the
/// provided filter/expression
Supported,
}

/// A container of [FileFormatFactory] which also implements [FileType].
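For illustration only (not part of this commit): the decision a custom FileFormat has to make in this hook is a simple three-way classification. A minimal sketch follows, with `pushdown_enabled` and `filter_is_supported` as hypothetical stand-ins for whatever configuration flag and per-expression check a real format would use.

use datafusion::datasource::file_format::FilePushdownSupport;
use datafusion::error::Result;
use datafusion::logical_expr::Expr;

/// Hypothetical helper mirroring the new trait hook: first ask whether the
/// format can push filters down at all, then whether it can do so for every
/// filter it was handed.
fn classify_filters(
    pushdown_enabled: bool,
    filter_is_supported: impl Fn(&Expr) -> bool,
    filters: &[&Expr],
) -> Result<FilePushdownSupport> {
    if !pushdown_enabled {
        return Ok(FilePushdownSupport::NoSupport);
    }
    Ok(if filters.iter().all(|f| filter_is_supported(f)) {
        FilePushdownSupport::Supported
    } else {
        FilePushdownSupport::NotSupportedForFilter
    })
}

fn main() -> Result<()> {
    // With pushdown disabled, the answer is NoSupport regardless of the filters.
    assert_eq!(
        classify_filters(false, |_: &Expr| true, &[])?,
        FilePushdownSupport::NoSupport
    );
    Ok(())
}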
28 changes: 26 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -26,7 +26,7 @@ use super::write::demux::start_demuxer_task;
use super::write::{create_writer, SharedBuffer};
use super::{
coerce_file_schema_to_view_type, transform_schema_to_view, FileFormat,
FileFormatFactory, FileScanConfig,
FileFormatFactory, FilePushdownSupport, FileScanConfig,
};
use crate::arrow::array::RecordBatch;
use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
@@ -53,6 +53,7 @@ use datafusion_common::{
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use datafusion_execution::TaskContext;
use datafusion_expr::Expr;
use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::metrics::MetricsSet;
Expand All @@ -78,7 +79,9 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinSet;

use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
use crate::datasource::physical_plan::parquet::{
can_expr_be_pushed_down_with_schemas, ParquetExecBuilder,
};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use futures::{StreamExt, TryStreamExt};
use object_store::path::Path;
@@ -414,6 +417,27 @@ impl FileFormat for ParquetFormat {
order_requirements,
)) as _)
}

fn supports_filters_pushdown(
&self,
file_schema: &Schema,
table_schema: &Schema,
filters: &[&Expr],
) -> Result<FilePushdownSupport> {
if !self.options().global.pushdown_filters {
return Ok(FilePushdownSupport::NoSupport);
}

let all_supported = filters.iter().all(|filter| {
can_expr_be_pushed_down_with_schemas(filter, file_schema, table_schema)
});

Ok(if all_supported {
FilePushdownSupport::Supported
} else {
FilePushdownSupport::NotSupportedForFilter
})
}
}

/// Fetches parquet metadata from ObjectStore for given object
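For illustration only (not part of this commit): a small sketch of calling the new method on a default ParquetFormat. The schemas, column name, and literal are invented; because `pushdown_filters` is off by default, the result here is `NoSupport`, and enabling that option is what makes `Supported` reachable (see the session-config sketch after the listing/table.rs changes below).

use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::{FileFormat, FilePushdownSupport};
use datafusion::error::Result;
use datafusion::prelude::*;

fn main() -> Result<()> {
    // Hypothetical schemas: here the file and table schemas are identical;
    // in ListingTable the table schema may also carry partition columns.
    let file_schema = Schema::new(vec![Field::new("x", DataType::Int64, true)]);
    let table_schema = file_schema.clone();

    let format = ParquetFormat::default();
    let filter = col("x").eq(lit(5i64));

    // pushdown_filters defaults to false, so the format declines up front.
    assert_eq!(
        format.supports_filters_pushdown(&file_schema, &table_schema, &[&filter])?,
        FilePushdownSupport::NoSupport
    );
    Ok(())
}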
110 changes: 54 additions & 56 deletions datafusion/core/src/datasource/listing/helpers.rs
@@ -53,66 +53,64 @@ use object_store::{ObjectMeta, ObjectStore};
/// was performed
pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
let mut is_applicable = true;
expr.apply(|expr| {
match expr {
Expr::Column(Column { ref name, .. }) => {
is_applicable &= col_names.contains(&name.as_str());
if is_applicable {
Ok(TreeNodeRecursion::Jump)
} else {
Ok(TreeNodeRecursion::Stop)
}
expr.apply(|expr| match expr {
Expr::Column(Column { ref name, .. }) => {
is_applicable &= col_names.contains(&name.as_str());
if is_applicable {
Ok(TreeNodeRecursion::Jump)
} else {
Ok(TreeNodeRecursion::Stop)
}
Expr::Literal(_)
| Expr::Alias(_)
| Expr::OuterReferenceColumn(_, _)
| Expr::ScalarVariable(_, _)
| Expr::Not(_)
| Expr::IsNotNull(_)
| Expr::IsNull(_)
| Expr::IsTrue(_)
| Expr::IsFalse(_)
| Expr::IsUnknown(_)
| Expr::IsNotTrue(_)
| Expr::IsNotFalse(_)
| Expr::IsNotUnknown(_)
| Expr::Negative(_)
| Expr::Cast { .. }
| Expr::TryCast { .. }
| Expr::BinaryExpr { .. }
| Expr::Between { .. }
| Expr::Like { .. }
| Expr::SimilarTo { .. }
| Expr::InList { .. }
| Expr::Exists { .. }
| Expr::InSubquery(_)
| Expr::ScalarSubquery(_)
| Expr::GroupingSet(_)
| Expr::Case { .. } => Ok(TreeNodeRecursion::Continue),

Expr::ScalarFunction(scalar_function) => {
match scalar_function.func.signature().volatility {
Volatility::Immutable => Ok(TreeNodeRecursion::Continue),
// TODO: Stable functions could be `applicable`, but that would require access to the context
Volatility::Stable | Volatility::Volatile => {
is_applicable = false;
Ok(TreeNodeRecursion::Stop)
}
}
Expr::Literal(_)
| Expr::Alias(_)
| Expr::OuterReferenceColumn(_, _)
| Expr::ScalarVariable(_, _)
| Expr::Not(_)
| Expr::IsNotNull(_)
| Expr::IsNull(_)
| Expr::IsTrue(_)
| Expr::IsFalse(_)
| Expr::IsUnknown(_)
| Expr::IsNotTrue(_)
| Expr::IsNotFalse(_)
| Expr::IsNotUnknown(_)
| Expr::Negative(_)
| Expr::Cast(_)
| Expr::TryCast(_)
| Expr::BinaryExpr(_)
| Expr::Between(_)
| Expr::Like(_)
| Expr::SimilarTo(_)
| Expr::InList(_)
| Expr::Exists(_)
| Expr::InSubquery(_)
| Expr::ScalarSubquery(_)
| Expr::GroupingSet(_)
| Expr::Case(_) => Ok(TreeNodeRecursion::Continue),

Expr::ScalarFunction(scalar_function) => {
match scalar_function.func.signature().volatility {
Volatility::Immutable => Ok(TreeNodeRecursion::Continue),
// TODO: Stable functions could be `applicable`, but that would require access to the context
Volatility::Stable | Volatility::Volatile => {
is_applicable = false;
Ok(TreeNodeRecursion::Stop)
}
}
}

// TODO other expressions are not handled yet:
// - AGGREGATE and WINDOW should not end up in filter conditions, except maybe in some edge cases
// - Can `Wildcard` be considered as a `Literal`?
// - ScalarVariable could be `applicable`, but that would require access to the context
Expr::AggregateFunction { .. }
| Expr::WindowFunction { .. }
| Expr::Wildcard { .. }
| Expr::Unnest { .. }
| Expr::Placeholder(_) => {
is_applicable = false;
Ok(TreeNodeRecursion::Stop)
}
// TODO other expressions are not handled yet:
// - AGGREGATE and WINDOW should not end up in filter conditions, except maybe in some edge cases
// - Can `Wildcard` be considered as a `Literal`?
// - ScalarVariable could be `applicable`, but that would require access to the context
Expr::AggregateFunction { .. }
| Expr::WindowFunction { .. }
| Expr::Wildcard { .. }
| Expr::Unnest { .. }
| Expr::Placeholder(_) => {
is_applicable = false;
Ok(TreeNodeRecursion::Stop)
}
})
.unwrap();
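For illustration only (not part of this commit): the reshaped closure above relies on TreeNodeRecursion control flow, where Jump skips the children of the node just handled and Stop aborts the walk early. A minimal sketch of the same traversal pattern, collecting column names from an invented filter expression:

use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion::error::Result;
use datafusion::logical_expr::{col, lit, Expr};

fn main() -> Result<()> {
    let expr = col("date").eq(lit("2024-01-01")).and(col("x").gt(lit(5)));

    let mut columns = vec![];
    expr.apply(|e| {
        Ok(match e {
            Expr::Column(c) => {
                columns.push(c.name.clone());
                // A Column has no children worth visiting, so jump past it.
                TreeNodeRecursion::Jump
            }
            _ => TreeNodeRecursion::Continue,
        })
    })?;

    assert_eq!(columns, ["date", "x"]);
    Ok(())
}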
71 changes: 43 additions & 28 deletions datafusion/core/src/datasource/listing/table.rs
@@ -18,16 +18,17 @@
//! The table implementation.
use std::collections::HashMap;
use std::str::FromStr;
use std::{any::Any, sync::Arc};
use std::{any::Any, str::FromStr, sync::Arc};

use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
use super::PartitionedFile;
use super::{ListingTableUrl, PartitionedFile};

use super::ListingTableUrl;
use crate::datasource::{create_ordering, get_statistics_with_limit};
use crate::datasource::{
file_format::{file_compression_type::FileCompressionType, FileFormat},
create_ordering,
file_format::{
file_compression_type::FileCompressionType, FileFormat, FilePushdownSupport,
},
get_statistics_with_limit,
physical_plan::{FileScanConfig, FileSinkConfig},
};
use crate::execution::context::SessionState;
@@ -43,8 +44,9 @@ use datafusion_common::{
config_datafusion_err, internal_err, plan_err, project_schema, Constraints,
SchemaExt, ToDFSchema,
};
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
use datafusion_execution::cache::{
cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache,
};
use datafusion_physical_expr::{
create_physical_expr, LexOrdering, PhysicalSortRequirement,
};
@@ -817,19 +819,22 @@ impl TableProvider for ListingTable {
.map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
.collect::<Result<Vec<_>>>()?;

let filters = if let Some(expr) = conjunction(filters.to_vec()) {
// NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition columns.
let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?;
let filters =
create_physical_expr(&expr, &table_df_schema, state.execution_props())?;
Some(filters)
} else {
None
};
let filters = conjunction(filters.to_vec())
.map(|expr| -> Result<_> {
// NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition columns.
let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?;
let filters = create_physical_expr(
&expr,
&table_df_schema,
state.execution_props(),
)?;
Ok(Some(filters))
})
.unwrap_or(Ok(None))?;

let object_store_url = if let Some(url) = self.table_paths.first() {
url.object_store()
} else {
let Some(object_store_url) =
self.table_paths.first().map(ListingTableUrl::object_store)
else {
return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
};

@@ -854,27 +859,37 @@
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
Ok(filters
filters
.iter()
.map(|filter| {
if expr_applicable_for_cols(
&self
.options
.table_partition_cols
.iter()
.map(|x| x.0.as_str())
.map(|col| col.0.as_str())
.collect::<Vec<_>>(),
filter,
) {
// if filter can be handled by partition pruning, it is exact
TableProviderFilterPushDown::Exact
} else {
// otherwise, we still might be able to handle the filter with file
// level mechanisms such as Parquet row group pruning.
TableProviderFilterPushDown::Inexact
return Ok(TableProviderFilterPushDown::Exact);
}

// if we can't push it down completely with only the filename-based/path-based
// column names, then we should check if we can do parquet predicate pushdown
let supports_pushdown = self.options.format.supports_filters_pushdown(
&self.file_schema,
&self.table_schema,
&[filter],
)?;

if supports_pushdown == FilePushdownSupport::Supported {
return Ok(TableProviderFilterPushDown::Exact);
}

Ok(TableProviderFilterPushDown::Inexact)
})
.collect())
.collect()
}

fn get_table_definition(&self) -> Option<&str> {
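For illustration only (not part of this commit): a hedged end-to-end sketch of what the Exact classification buys. The file path, table name, and column are placeholders, Tokio is assumed as the async runtime, and the configuration key is the one used by recent DataFusion releases. With `pushdown_filters` enabled, the plan printed by EXPLAIN should no longer show a FilterExec above the Parquet scan for a simple predicate like this.

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Enable Parquet filter pushdown; without it, filters stay Inexact and a
    // FilterExec is kept above the scan.
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.pushdown_filters", true);
    let ctx = SessionContext::new_with_config(config);

    // "data.parquet" is a placeholder; any Parquet file with an `x` column works.
    ctx.register_parquet("t", "data.parquet", ParquetReadOptions::default())
        .await?;

    ctx.sql("EXPLAIN SELECT * FROM t WHERE x = 5").await?.show().await?;
    Ok(())
}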
@@ -258,7 +258,7 @@ impl FileScanConfig {
(projected_schema, table_stats, projected_output_ordering)
}

#[allow(unused)] // Only used by avro
#[cfg_attr(not(feature = "avro"), allow(unused))] // Only used by avro
pub(crate) fn projected_file_column_names(&self) -> Option<Vec<String>> {
self.projection.as_ref().map(|p| {
p.iter()
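For illustration only (not part of this commit): the attribute change above swaps an unconditional `#[allow(unused)]` for one that applies only when the `avro` feature (the sole consumer) is compiled out, so the helper still produces a warning if it goes unused in builds that should use it. The same pattern in isolation, with a hypothetical item name:

// Silence the unused warning only when the consuming feature is disabled;
// `only_used_by_avro` is a hypothetical stand-in for the gated helper.
#[cfg_attr(not(feature = "avro"), allow(unused))]
fn only_used_by_avro() {}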
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/physical_plan/mod.rs
@@ -516,7 +516,8 @@ mod tests {
Field::new("c3", DataType::Float64, true),
]));

let adapter = DefaultSchemaAdapterFactory::default().create(table_schema.clone());
let adapter = DefaultSchemaAdapterFactory
.create(table_schema.clone(), table_schema.clone());

let file_schema = Schema::new(vec![
Field::new("c1", DataType::Utf8, true),
@@ -573,7 +574,7 @@

let indices = vec![1, 2, 4];
let schema = SchemaRef::from(table_schema.project(&indices).unwrap());
let adapter = DefaultSchemaAdapterFactory::default().create(schema);
let adapter = DefaultSchemaAdapterFactory.create(schema, table_schema.clone());
let (mapping, projection) = adapter.map_schema(&file_schema).unwrap();

let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
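For illustration only (not part of this commit): `create` on the adapter factory now takes two schemas, the projected table schema the scan must produce plus the full table schema used when mapping complete file batches (for example, so a pushed-down filter can still see columns the query does not project). A hedged sketch with invented column names:

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::schema_adapter::{
    DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
};
use datafusion::error::Result;

fn main() -> Result<()> {
    // Full table schema with two columns; the query only projects "b".
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, true),
    ]));
    let projected = Arc::new(table_schema.project(&[1])?);

    let adapter = DefaultSchemaAdapterFactory.create(projected, Arc::clone(&table_schema));

    // A file that only stores "b" still maps; `projection` lists the file
    // column indices the scan needs to read.
    let file_schema = Schema::new(vec![Field::new("b", DataType::Utf8, true)]);
    let (_mapper, projection) = adapter.map_schema(&file_schema)?;
    println!("file columns to read: {projection:?}");
    Ok(())
}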
4 changes: 3 additions & 1 deletion datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -61,6 +61,7 @@ pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
pub use metrics::ParquetFileMetrics;
use opener::ParquetOpener;
pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
pub use row_filter::can_expr_be_pushed_down_with_schemas;
pub use writer::plan_to_parquet;

/// Execution plan for reading one or more Parquet files.
@@ -405,6 +406,7 @@ impl ParquetExecBuilder {

let (projected_schema, projected_statistics, projected_output_ordering) =
base_config.project();

let cache = ParquetExec::compute_properties(
projected_schema,
&projected_output_ordering,
@@ -707,7 +709,7 @@ impl ExecutionPlan for ParquetExec {
let schema_adapter_factory = self
.schema_adapter_factory
.clone()
.unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory::default()));
.unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory));

let opener = ParquetOpener {
partition_index,
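For illustration only (not part of this commit): `can_expr_be_pushed_down_with_schemas` is now exported, so the per-filter check ParquetFormat uses can be exercised directly. The schemas and columns below are invented, and the expected outcomes reflect a reading of the row-filter rules rather than documented guarantees: a comparison on a primitive data column stored in the file is a pushdown candidate, while a filter on a partition-only column (absent from the file schema) is rejected and left to ListingTable.

use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::physical_plan::parquet::can_expr_be_pushed_down_with_schemas;
use datafusion::prelude::*;

fn main() {
    // Files store only `x`; `date` is a table-level partition column.
    let file_schema = Schema::new(vec![Field::new("x", DataType::Int64, true)]);
    let table_schema = Schema::new(vec![
        Field::new("x", DataType::Int64, true),
        Field::new("date", DataType::Utf8, false),
    ]);

    let on_data = col("x").eq(lit(5i64));
    assert!(can_expr_be_pushed_down_with_schemas(
        &on_data,
        &file_schema,
        &table_schema
    ));

    let on_partition = col("date").eq(lit("2024-01-01"));
    assert!(!can_expr_be_pushed_down_with_schemas(
        &on_partition,
        &file_schema,
        &table_schema
    ));
}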
@@ -99,7 +99,9 @@ impl FileOpener for ParquetOpener {

let projected_schema =
SchemaRef::from(self.table_schema.project(&self.projection)?);
let schema_adapter = self.schema_adapter_factory.create(projected_schema);
let schema_adapter = self
.schema_adapter_factory
.create(projected_schema, self.table_schema.clone());
let predicate = self.predicate.clone();
let pruning_predicate = self.pruning_predicate.clone();
let page_pruning_predicate = self.page_pruning_predicate.clone();
