fix: Ensure parquet schema arg is propagated to IR (#19084)
nameexhaustion authored Oct 3, 2024
1 parent a796300 commit 0cef5b7
Showing 8 changed files with 43 additions and 40 deletions.
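
In user-facing terms, the bug meant that a `schema` passed to `pl.scan_parquet` was dropped before reaching the IR, so plan-time column resolution only saw the first file's schema. Below is a minimal sketch of the scenario this commit fixes, mirroring the new test added at the end of the diff; the file names and column layout are hypothetical.

import polars as pl

# Hypothetical inputs: neither file contains a column "1"; it exists only in
# the user-provided schema.
schema = {"1": pl.Int64, "a": pl.Int64, "b": pl.Int64}

lf = pl.scan_parquet(
    ["data_0.parquet", "data_1.parquet"],  # hypothetical paths
    schema=schema,
    allow_missing_columns=True,
).select("1")

# Before this fix, the select failed with ColumnNotFound because the IR only
# knew the first file's schema; with the schema propagated to the IR, the
# missing column is materialized as nulls.
print(lf.collect())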
4 changes: 2 additions & 2 deletions crates/polars-io/src/parquet/read/options.rs
@@ -1,11 +1,11 @@
-use arrow::datatypes::ArrowSchemaRef;
+use polars_core::schema::SchemaRef;
 #[cfg(feature = "serde")]
 use serde::{Deserialize, Serialize};
 
 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct ParquetOptions {
-    pub schema: Option<ArrowSchemaRef>,
+    pub schema: Option<SchemaRef>,
     pub parallel: ParallelStrategy,
     pub low_memory: bool,
     pub use_statistics: bool,
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/scan/parquet.rs
@@ -75,7 +75,7 @@ impl LazyFileListReader for LazyParquetReader {
             self.args.low_memory,
             self.args.cloud_options,
             self.args.use_statistics,
-            self.args.schema.as_deref(),
+            self.args.schema,
             self.args.hive_options,
             self.args.glob,
             self.args.include_file_paths,
12 changes: 2 additions & 10 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
@@ -62,11 +62,7 @@ impl ParquetExec {
         // Modified if we have a negative slice
         let mut first_source = 0;
 
-        let first_schema = self
-            .options
-            .schema
-            .clone()
-            .unwrap_or_else(|| self.file_info.reader_schema.clone().unwrap().unwrap_left());
+        let first_schema = self.file_info.reader_schema.clone().unwrap().unwrap_left();
 
         let projected_arrow_schema = {
             if let Some(with_columns) = self.file_options.with_columns.as_deref() {
@@ -262,11 +258,7 @@ impl ParquetExec {
             eprintln!("POLARS PREFETCH_SIZE: {}", batch_size)
         }
 
-        let first_schema = self
-            .options
-            .schema
-            .clone()
-            .unwrap_or_else(|| self.file_info.reader_schema.clone().unwrap().unwrap_left());
+        let first_schema = self.file_info.reader_schema.clone().unwrap().unwrap_left();
 
         let projected_arrow_schema = {
             if let Some(with_columns) = self.file_options.with_columns.as_deref() {
5 changes: 1 addition & 4 deletions crates/polars-pipe/src/executors/sources/parquet.rs
@@ -261,10 +261,7 @@ impl ParquetSource {
         }
         let run_async = paths.first().map(is_cloud_url).unwrap_or(false) || config::force_async();
 
-        let first_schema = options
-            .schema
-            .clone()
-            .unwrap_or_else(|| file_info.reader_schema.clone().unwrap().unwrap_left());
+        let first_schema = file_info.reader_schema.clone().unwrap().unwrap_left();
 
         let projected_arrow_schema = {
             if let Some(with_columns) = file_options.with_columns.as_deref() {
4 changes: 2 additions & 2 deletions crates/polars-plan/src/plans/builder_dsl.rs
@@ -85,7 +85,7 @@ impl DslBuilder {
         low_memory: bool,
         cloud_options: Option<CloudOptions>,
         use_statistics: bool,
-        schema: Option<&Schema>,
+        schema: Option<SchemaRef>,
         hive_options: HiveOptions,
         glob: bool,
         include_file_paths: Option<PlSmallStr>,
@@ -109,7 +109,7 @@
             file_options: options,
             scan_type: FileScan::Parquet {
                 options: ParquetOptions {
-                    schema: schema.map(|x| Arc::new(x.to_arrow(CompatLevel::newest()))),
+                    schema,
                     parallel,
                     low_memory,
                     use_statistics,
31 changes: 22 additions & 9 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
@@ -169,18 +169,31 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
             let mut file_info = match &mut scan_type {
                 #[cfg(feature = "parquet")]
                 FileScan::Parquet {
+                    options,
                     cloud_options,
                     metadata,
                     ..
                 } => {
-                    let (file_info, md) = scans::parquet_file_info(
-                        &sources,
-                        &file_options,
-                        cloud_options.as_ref(),
-                    )
-                    .map_err(|e| e.context(failed_here!(parquet scan)))?;
-                    *metadata = md;
-                    file_info
+                    if let Some(schema) = &options.schema {
+                        // We were passed a schema, we don't have to call `parquet_file_info`,
+                        // but this does mean we don't have `row_estimation` and `first_metadata`.
+                        FileInfo {
+                            schema: schema.clone(),
+                            reader_schema: Some(either::Either::Left(Arc::new(
+                                schema.to_arrow(CompatLevel::newest()),
+                            ))),
+                            row_estimation: (None, 0),
+                        }
+                    } else {
+                        let (file_info, md) = scans::parquet_file_info(
+                            &sources,
+                            &file_options,
+                            cloud_options.as_ref(),
+                        )
+                        .map_err(|e| e.context(failed_here!(parquet scan)))?;
+
+                        *metadata = md;
+                        file_info
+                    }
                 },
                 #[cfg(feature = "ipc")]
                 FileScan::Ipc {
13 changes: 1 addition & 12 deletions crates/polars-stream/src/nodes/parquet_source/mod.rs
@@ -156,18 +156,7 @@ impl ComputeNode for ParquetSourceNode {
             eprintln!("[ParquetSource]: {:?}", &self.config);
         }
 
-        self.schema = Some(
-            self.options
-                .schema
-                .take()
-                .unwrap_or_else(|| self.file_info.reader_schema.take().unwrap().unwrap_left()),
-        );
-
-        {
-            // Ensure these are not used anymore
-            self.options.schema.take();
-            self.file_info.reader_schema.take();
-        }
+        self.schema = Some(self.file_info.reader_schema.take().unwrap().unwrap_left());
 
         self.init_projected_arrow_schema();
         self.physical_predicate = self.predicate.clone().map(phys_expr_to_io_expr);
12 changes: 12 additions & 0 deletions py-polars/tests/unit/io/test_lazy_parquet.py
@@ -694,6 +694,18 @@ def test_parquet_schema_arg(
         pl.DataFrame({"1": None, "a": [1, 2], "b": [1, 2]}, schema=schema),
     )
 
+    # Issue #19081: If a schema arg is passed, ensure its fields are propagated
+    # to the IR, otherwise even if `allow_missing_columns=True`, downstream
+    # `select()`s etc. will fail with ColumnNotFound if the column is not in
+    # the first file.
+    lf = pl.scan_parquet(
+        paths, parallel=parallel, schema=schema, allow_missing_columns=True
+    ).select("1")
+
+    s = lf.collect(streaming=streaming).to_series()
+    assert s.len() == 2
+    assert s.null_count() == 2
+
     # Test files containing extra columns not in `schema`
 
     schema: dict[str, type[pl.DataType]] = {"a": pl.Int64}  # type: ignore[no-redef]
