From 15cd7c59271c858ae778cc3ad55141f453f08f8d Mon Sep 17 00:00:00 2001
From: Lordworms
Date: Fri, 12 Jul 2024 15:38:14 -0700
Subject: [PATCH 1/2] extract statistics read for struct array in parquet

---
 .../physical_plan/parquet/statistics.rs       | 213 ++++++++++++++----
 .../core/tests/parquet/arrow_statistics.rs    | 103 ++++++++-
 datafusion/core/tests/parquet/mod.rs          |  55 ++++-
 3 files changed, 322 insertions(+), 49 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 59369aba57a9..59e7c0abe49a 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -27,7 +27,7 @@ use arrow::{array::ArrayRef, datatypes::DataType};
 use arrow_array::{
     new_empty_array, new_null_array, BinaryArray, BooleanArray, Date32Array, Date64Array,
     Decimal128Array, Decimal256Array, Float16Array, Float32Array, Float64Array,
-    Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
+    Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, StructArray,
     Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
     Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
     TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
@@ -989,6 +989,11 @@ macro_rules! get_data_page_statistics {
     }
 }
 
+fn find_parquet_idx(parquet_schema: &SchemaDescriptor, root_idx: usize) -> Option<usize> {
+    (0..parquet_schema.columns().len())
+        .find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)
+}
+
 /// Looks up the parquet column by name
 ///
 /// Returns the parquet column index and the corresponding arrow field
@@ -998,20 +1003,26 @@ pub(crate) fn parquet_column<'a>(
     name: &str,
 ) -> Option<(usize, &'a FieldRef)> {
     let (root_idx, field) = arrow_schema.fields.find(name)?;
-    if field.data_type().is_nested() {
-        // Nested fields are not supported and require non-trivial logic
-        // to correctly walk the parquet schema accounting for the
-        // logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
-        //
-        // For example a ListArray could correspond to anything from 1 to 3 levels
-        // in the parquet schema
-        return None;
+    if !field.data_type().is_nested() {
+        let parquet_idx = find_parquet_idx(parquet_schema, root_idx)?;
+        return Some((parquet_idx, field));
+    }
+    // Nested field
+    match field.data_type() {
+        DataType::Struct(_) => {
+            let parquet_idx = find_parquet_idx(parquet_schema, root_idx)?;
+            Some((parquet_idx, field))
+        }
+        _ => {
+            // Other nested fields are not supported and require non-trivial logic
+            // to correctly walk the parquet schema accounting for the
+            // logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
+            //
+            // For example a ListArray could correspond to anything from 1 to 3 levels
+            // in the parquet schema
+            None
+        }
     }
-
-    // This could be made more efficient (#TBD)
-    let parquet_idx = (0..parquet_schema.columns().len())
-        .find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
-    Some((parquet_idx, field))
 }
 
 /// Extracts the min statistics from an iterator of [`ParquetStatistics`] to an
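The helper above relies on parquet flattening nested types into leaf columns: `get_column_root_idx` maps each leaf back to its top-level field, and `find_parquet_idx` returns the first leaf whose root matches. A minimal standalone sketch of that mapping (not part of the patch; the schema and names are made up), using only the public `parquet` crate schema API:

    // Sketch only: demonstrates the root/leaf index mapping that
    // find_parquet_idx relies on. For root 1 (the struct "s"), the first
    // matching leaf is s.b at leaf index 1.
    use std::sync::Arc;
    use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let message = "
            message schema {
                required int32 a;
                required group s {
                    required boolean b;
                    required int32 c;
                }
            }";
        let desc = SchemaDescriptor::new(Arc::new(parse_message_type(message)?));
        // Leaves are [a, s.b, s.c]; prints roots [0, 1, 1].
        for i in 0..desc.num_columns() {
            println!("leaf {i} -> root {}", desc.get_column_root_idx(i));
        }
        Ok(())
    }

Because only the first leaf is returned for a struct, the recursive readers added below walk the remaining leaves in order themselves.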
@@ -1234,7 +1250,87 @@ impl<'a> StatisticsConverter<'a> {
             arrow_field,
         })
     }
+
+    /// Recursively extract the min or max statistics for all leaf columns
+    /// of a column; used for DataType::Struct
+    pub(crate) fn get_statistics_min_max_recursive(
+        metadata: &[&RowGroupMetaData],
+        index: &mut usize,
+        is_min: bool,
+        data_type: &DataType,
+    ) -> Result<ArrayRef> {
+        match data_type.is_nested() {
+            false => {
+                let iterator = metadata
+                    .iter()
+                    .map(|meta| meta.column(*index).statistics());
+                let stat = if is_min {
+                    min_statistics(data_type, iterator)
+                } else {
+                    max_statistics(data_type, iterator)
+                };
+                *index += 1;
+                stat
+            }
+            true => {
+                if let DataType::Struct(fields) = data_type {
+                    let field_arrays: Vec<_> = fields
+                        .iter()
+                        .map(|field| {
+                            let array = Self::get_statistics_min_max_recursive(
+                                metadata,
+                                index,
+                                is_min,
+                                field.data_type(),
+                            )?;
+                            Ok((field.clone(), array))
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+                    Ok(Arc::new(StructArray::from(field_arrays)) as ArrayRef)
+                } else {
+                    plan_err!("unsupported nested data type for extracting statistics")
+                }
+            }
+        }
+    }
+
+    /// Recursively sum up the null counts for all leaf columns of a column;
+    /// used for DataType::Struct
+    pub(crate) fn get_null_counts_recursive(
+        metadata: &[&RowGroupMetaData],
+        index: usize,
+        data_type: &DataType,
+    ) -> Vec<u64> {
+        if let DataType::Struct(fields) = data_type {
+            let num_row_groups = metadata.len();
+            let mut null_counts = vec![0; num_row_groups];
+
+            fields.iter().for_each(|field| {
+                let field_null_counts = Self::get_null_counts_recursive(
+                    metadata,
+                    index + 1,
+                    field.data_type(),
+                );
+                null_counts
+                    .iter_mut()
+                    .zip(field_null_counts)
+                    .for_each(|(acc, count)| {
+                        *acc += count;
+                    });
+            });
+
+            null_counts
+        } else {
+            metadata
+                .iter()
+                .map(|meta| {
+                    meta.column(index)
+                        .statistics()
+                        .map(|s| s.null_count())
+                        .unwrap_or(0)
+                })
+                .collect()
+        }
+    }
+
     /// Extract the minimum values from row group statistics in [`RowGroupMetaData`]
     ///
     /// # Return Value
@@ -1284,11 +1380,22 @@
         let Some(parquet_index) = self.parquet_index else {
             return Ok(self.make_null_array(data_type, metadatas));
         };
-
-        let iter = metadatas
-            .into_iter()
-            .map(|x| x.column(parquet_index).statistics());
-        min_statistics(data_type, iter)
+        let create_iterator = |metadatas: I, parquet_index: usize| {
+            metadatas
+                .into_iter()
+                .map(move |x| x.column(parquet_index).statistics())
+        };
+        match data_type {
+            // In a parquet row group, each leaf field of a nested struct is
+            // stored as its own column chunk, in schema order.
+            DataType::Struct(_) => {
+                let group_vec: Vec<&RowGroupMetaData> = metadatas.into_iter().collect();
+                Self::get_statistics_min_max_recursive(
+                    &group_vec, &mut 0, true, data_type,
+                )
+            }
+            _ => min_statistics(data_type, create_iterator(metadatas, parquet_index)),
+        }
     }
 
     /// Extract the maximum values from row group statistics in [`RowGroupMetaData`]
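For a struct column, `row_group_mins` above now returns a single `StructArray` whose field arrays hold each leaf's per-row-group minimum. A standalone sketch of that result shape (not part of the patch; field names and values are made up):

    // Sketch only: the min result for a struct<b: bool, c: i32> column over
    // two row groups is one StructArray row per row group.
    use std::sync::Arc;
    use arrow_array::{Array, ArrayRef, BooleanArray, Int32Array, StructArray};
    use arrow_schema::{DataType, Field};

    fn main() {
        let b_mins: ArrayRef = Arc::new(BooleanArray::from(vec![false, true]));
        let c_mins: ArrayRef = Arc::new(Int32Array::from(vec![1, 5]));
        let mins = StructArray::from(vec![
            (Arc::new(Field::new("b", DataType::Boolean, false)), b_mins),
            (Arc::new(Field::new("c", DataType::Int32, false)), c_mins),
        ]);
        assert_eq!(mins.len(), 2); // one entry per row group
    }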
@@ -1304,10 +1411,22 @@
             return Ok(self.make_null_array(data_type, metadatas));
         };
 
-        let iter = metadatas
-            .into_iter()
-            .map(|x| x.column(parquet_index).statistics());
-        max_statistics(data_type, iter)
+        let create_iterator = |metadatas: I, parquet_index: usize| {
+            metadatas
+                .into_iter()
+                .map(move |x| x.column(parquet_index).statistics())
+        };
+        match data_type {
+            // In a parquet row group, each leaf field of a nested struct is
+            // stored as its own column chunk, in schema order.
+            DataType::Struct(_) => {
+                let group_vec: Vec<&RowGroupMetaData> = metadatas.into_iter().collect();
+                Self::get_statistics_min_max_recursive(
+                    &group_vec, &mut 0, false, data_type,
+                )
+            }
+            _ => max_statistics(data_type, create_iterator(metadatas, parquet_index)),
+        }
     }
 
     /// Extract the null counts from row group statistics in [`RowGroupMetaData`]
@@ -1317,18 +1436,33 @@
     where
         I: IntoIterator<Item = &'a RowGroupMetaData>,
     {
+        let data_type = self.arrow_field.data_type();
+
         let Some(parquet_index) = self.parquet_index else {
             let num_row_groups = metadatas.into_iter().count();
             return Ok(UInt64Array::from_iter(
                 std::iter::repeat(None).take(num_row_groups),
             ));
         };
+        let create_iterator = |metadatas: I, parquet_index: usize| {
+            metadatas
+                .into_iter()
+                .map(move |x| x.column(parquet_index).statistics())
+        };
 
-        let null_counts = metadatas
-            .into_iter()
-            .map(|x| x.column(parquet_index).statistics())
-            .map(|s| s.map(|s| s.null_count()));
-        Ok(UInt64Array::from_iter(null_counts))
+        match data_type {
+            DataType::Struct(_) => {
+                let group_vec: Vec<&RowGroupMetaData> = metadatas.into_iter().collect();
+                let null_counts =
+                    Self::get_null_counts_recursive(&group_vec, 0, data_type);
+                Ok(UInt64Array::from_iter(null_counts))
+            }
+            _ => {
+                let null_counts = create_iterator(metadatas, parquet_index)
+                    .map(|s| s.map(|s| s.null_count()));
+                Ok(UInt64Array::from_iter(null_counts))
+            }
+        }
     }
 
     /// Extract the minimum values from Data Page statistics.
@@ -1541,10 +1675,10 @@ mod test {
     use arrow::compute::kernels::cast_utils::Parser;
     use arrow::datatypes::{i256, Date32Type, Date64Type};
     use arrow_array::{
-        new_empty_array, new_null_array, Array, BinaryArray, BooleanArray, Date32Array,
-        Date64Array, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
-        Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, RecordBatch,
-        StringArray, StructArray, TimestampNanosecondArray,
+        new_empty_array, Array, BinaryArray, BooleanArray, Date32Array, Date64Array,
+        Decimal128Array, Decimal256Array, Float32Array, Float64Array, Int16Array,
+        Int32Array, Int64Array, Int8Array, LargeBinaryArray, RecordBatch, StringArray,
+        StructArray, TimestampNanosecondArray,
     };
     use arrow_schema::{Field, SchemaRef};
     use bytes::Bytes;
@@ -1988,7 +2122,7 @@ mod test {
 
     #[test]
     fn roundtrip_struct() {
-        let mut test = Test {
+        let test = Test {
             input: struct_array(vec![
                 // row group 1
                 (Some(true), Some(1)),
@@ -2005,22 +2139,15 @@ mod test {
             ]),
             expected_min: struct_array(vec![
                 (Some(true), Some(1)),
-                (Some(true), Some(0)),
+                (Some(false), Some(0)),
                 (None, None),
             ]),
             expected_max: struct_array(vec![
                 (Some(true), Some(3)),
-                (Some(true), Some(0)),
+                (Some(true), Some(5)),
                 (None, None),
             ]),
         };
-
-        // Due to https://github.com/apache/datafusion/issues/8334,
-        // statistics for struct arrays are not supported
-        test.expected_min =
-            new_null_array(test.input.data_type(), test.expected_min.len());
-        test.expected_max =
-            new_null_array(test.input.data_type(), test.expected_min.len());
 
         test.run()
     }
 
@@ -2376,7 +2506,10 @@ mod test {
         let row_groups = metadata.row_groups();
 
         for field in schema.fields() {
-            if field.data_type().is_nested() {
+            let data_type = field.data_type();
+            if field.data_type().is_nested()
+                && !matches!(data_type, &DataType::Struct(_))
+            {
                 let lookup = parquet_column(parquet_schema, &schema, field.name());
                 assert_eq!(lookup, None);
                 continue;
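One behavior worth noting for reviewers: with `get_null_counts_recursive`, a struct column's null count is the element-wise sum of its leaf columns' null counts per row group, not the number of rows where the struct itself is null. A standalone sketch of that aggregation (not part of the patch; the counts are made up):

    // Sketch only: per-leaf null counts for two leaves over two row groups
    // are summed element-wise, so a row group's total mixes nulls from
    // different fields.
    fn main() {
        let leaf_null_counts = [vec![1u64, 0], vec![2, 3]];
        let mut totals = vec![0u64; 2];
        for leaf in &leaf_null_counts {
            for (total, count) in totals.iter_mut().zip(leaf) {
                *total += count;
            }
        }
        assert_eq!(totals, vec![3, 3]); // one total per row group
    }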
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index 2b4ba0b17133..73cea7aed592 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -32,12 +32,13 @@ use arrow_array::{
     make_array, new_null_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array,
     Date64Array, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array,
     Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
-    LargeBinaryArray, LargeStringArray, RecordBatch, StringArray, Time32MillisecondArray,
-    Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
-    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
-    TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
+    LargeBinaryArray, LargeStringArray, RecordBatch, StringArray, StructArray,
+    Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
+    Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
+    TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
+    UInt64Array, UInt8Array,
 };
-use arrow_schema::{DataType, Field, Schema, TimeUnit};
+use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
 use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
 use half::f16;
 use parquet::arrow::arrow_reader::{
@@ -250,9 +251,7 @@ impl<'a> Test<'a> {
             column_name,
             check,
         } = self;
-
         let row_groups = reader.metadata().row_groups();
-
         if check.data_page() {
             let column_page_index = reader
                 .metadata()
@@ -2036,7 +2035,6 @@ async fn test_boolean() {
 // BUG
 // https://github.com/apache/datafusion/issues/10609
 // Note that since I have not worked on struct arrays before, there may be a bug in the test code rather than in the real code
-#[ignore]
 #[tokio::test]
 async fn test_struct() {
     // This creates a parquet file of 1 column named "struct"
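The new test added below checks field-wise semantics: the expected "minimum struct" is assembled from each field's own minimum, so it need not equal any actual input row. A tiny self-contained illustration of this (not part of the patch; the values are made up):

    // Sketch only: field-wise mins of rows {b: false, c: 43} and
    // {b: true, c: 42} combine values taken from different rows.
    fn main() {
        let rows = [(false, 43), (true, 42)];
        let min_b = rows.iter().map(|r| r.0).min().unwrap();
        let min_c = rows.iter().map(|r| r.1).min().unwrap();
        assert_eq!((min_b, min_c), (false, 42)); // matches neither row
    }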
@@ -2058,7 +2056,96 @@
     }
     .run();
 }
+
+// test nested struct
+#[tokio::test]
+async fn test_nested_struct() {
+    // This creates a parquet file with 1 column named "nested_struct"
+    // The file is created by 1 record batch with 3 rows in the nested struct array
+    let reader = TestReader {
+        scenario: Scenario::StructArrayNested,
+        row_per_group: 5,
+    }
+    .build()
+    .await;
+
+    // Expected minimum and maximum values for the nested struct fields
+    let inner_min = StructArray::from(vec![
+        (
+            Arc::new(Field::new("b", DataType::Boolean, false)),
+            Arc::new(BooleanArray::from(vec![Some(false)])) as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("c", DataType::Int32, false)),
+            Arc::new(Int32Array::from(vec![Some(42)])) as ArrayRef,
+        ),
+    ]);
+    let inner_max = StructArray::from(vec![
+        (
+            Arc::new(Field::new("b", DataType::Boolean, false)),
+            Arc::new(BooleanArray::from(vec![Some(true)])) as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("c", DataType::Int32, false)),
+            Arc::new(Int32Array::from(vec![Some(44)])) as ArrayRef,
+        ),
+    ]);
+
+    let inner_fields = Fields::from(vec![
+        Field::new("b", DataType::Boolean, false),
+        Field::new("c", DataType::Int32, false),
+    ]);
+
+    // Expected minimum outer struct
+    let expected_min_outer = StructArray::from(vec![
+        (
+            Arc::new(Field::new(
+                "inner_struct",
+                DataType::Struct(inner_fields.clone()),
+                false,
+            )),
+            Arc::new(inner_min) as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("outer_float", DataType::Float64, false)),
+            Arc::new(Float64Array::from(vec![Some(5.0)])) as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("outer_boolean", DataType::Boolean, false)),
+            Arc::new(BooleanArray::from(vec![Some(false)])) as ArrayRef,
+        ),
+    ]);
+
+    // Expected maximum outer struct
+    let expected_max_outer = StructArray::from(vec![
+        (
+            Arc::new(Field::new(
+                "inner_struct",
+                DataType::Struct(inner_fields),
+                false,
+            )),
+            Arc::new(inner_max) as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("outer_float", DataType::Float64, false)),
+            Arc::new(Float64Array::from(vec![Some(7.0)])) as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("outer_boolean", DataType::Boolean, false)),
+            Arc::new(BooleanArray::from(vec![Some(true)])) as ArrayRef,
+        ),
+    ]);
+
+    Test {
+        reader: &reader,
+        expected_min: Arc::new(expected_min_outer),
+        expected_max: Arc::new(expected_max_outer),
+        expected_null_counts: UInt64Array::from(vec![0]),
+        expected_row_counts: Some(UInt64Array::from(vec![3])),
+        column_name: "nested_struct",
+        check: Check::RowGroup,
+    }
+    .run();
+}
 
 // UTF8
 #[tokio::test]
 async fn test_utf8() {
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index 1b68a4aa4eb3..a0edcabb8d5a 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -34,6 +34,7 @@ use arrow::{
     record_batch::RecordBatch,
     util::pretty::pretty_format_batches,
 };
+use arrow_schema::Fields;
 use chrono::{Datelike, Duration, TimeDelta};
 use datafusion::{
     datasource::{provider_as_source, TableProvider},
@@ -106,6 +107,7 @@ enum Scenario {
     WithNullValues,
     WithNullValuesPageLevel,
     StructArray,
+    StructArrayNested,
     UTF8,
 }
 
@@ -263,7 +265,6 @@ impl ContextWithParquet {
     /// Runs the specified SQL query and returns the number of output
     /// rows and normalized execution metrics
     async fn query(&mut self, sql: &str) -> TestOutput {
-        println!("Planning sql {sql}");
         let logical_plan = self
             .ctx
             .sql(sql)
@@ -1238,6 +1239,58 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
             )]));
             vec![RecordBatch::try_new(schema, vec![struct_array_data]).unwrap()]
         }
+        Scenario::StructArrayNested => {
+            let inner_boolean = Arc::new(BooleanArray::from(vec![false, true, false]));
+            let inner_int = Arc::new(Int32Array::from(vec![42, 43, 44]));
+
+            let inner_array = StructArray::from(vec![
+                (
+                    Arc::new(Field::new("b", DataType::Boolean, false)),
+                    inner_boolean as ArrayRef,
+                ),
+                (
+                    Arc::new(Field::new("c", DataType::Int32, false)),
+                    inner_int as ArrayRef,
+                ),
+            ]);
+
+            let inner_fields = Fields::from(vec![
+                Field::new("b", DataType::Boolean, false),
+                Field::new("c", DataType::Int32, false),
+            ]);
+
+            let outer_float = Arc::new(Float64Array::from(vec![5.0, 6.0, 7.0]));
+            let outer_boolean = Arc::new(BooleanArray::from(vec![true, false, true]));
+
+            let outer_struct_array = StructArray::from(vec![
+                (
+                    Arc::new(Field::new(
+                        "inner_struct",
+                        DataType::Struct(inner_fields),
+                        false,
+                    )),
+                    Arc::new(inner_array) as ArrayRef,
+                ),
+                (
+                    Arc::new(Field::new("outer_float", DataType::Float64, false)),
+                    outer_float as ArrayRef,
+                ),
+                (
+                    Arc::new(Field::new("outer_boolean", DataType::Boolean, false)),
+                    outer_boolean as ArrayRef,
+                ),
+            ]);
+
+            let schema = Arc::new(Schema::new(vec![Field::new(
+                "nested_struct",
+                outer_struct_array.data_type().clone(),
+                true,
+            )]));
+
+            vec![
+                RecordBatch::try_new(schema, vec![Arc::new(outer_struct_array)]).unwrap(),
+            ]
+        }
         Scenario::Time32Second => {
             vec![
                 make_time32_batches(
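Before the follow-up commit, a hedged sketch of how the feature added in PATCH 1/2 is consumed. The helper below is illustrative only; the `try_new` and `row_group_mins` signatures are taken from this patch's `StatisticsConverter`, and everything else is an assumption:

    // Sketch only: extract per-row-group struct minimums from parquet
    // metadata that has already been read. `struct_mins` is a made-up helper.
    use arrow::array::ArrayRef;
    use arrow_schema::Schema;
    use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
    use datafusion::error::Result;
    use parquet::file::metadata::ParquetMetaData;
    use parquet::schema::types::SchemaDescriptor;

    fn struct_mins(
        arrow_schema: &Schema,
        parquet_schema: &SchemaDescriptor,
        metadata: &ParquetMetaData,
    ) -> Result<ArrayRef> {
        let converter =
            StatisticsConverter::try_new("nested_struct", arrow_schema, parquet_schema)?;
        // With PATCH 1/2, a struct column yields a StructArray holding one
        // row of field-wise minimums per row group.
        converter.row_group_mins(metadata.row_groups())
    }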
From f2e0a79f1bd3fa2db766aa03c361d9753443b67f Mon Sep 17 00:00:00 2001
From: Lordworms
Date: Mon, 15 Jul 2024 11:31:53 -0700
Subject: [PATCH 2/2] refactor code

---
 .../physical_plan/parquet/statistics.rs       | 29 +++++++++----------
 1 file changed, 13 insertions(+), 16 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 59e7c0abe49a..4ded33112b14 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -1302,23 +1302,20 @@ impl<'a> StatisticsConverter<'a> {
     ) -> Vec<u64> {
         if let DataType::Struct(fields) = data_type {
             let num_row_groups = metadata.len();
-            let mut null_counts = vec![0; num_row_groups];
-
-            fields.iter().for_each(|field| {
-                let field_null_counts = Self::get_null_counts_recursive(
-                    metadata,
-                    index + 1,
-                    field.data_type(),
-                );
-                null_counts
-                    .iter_mut()
-                    .zip(field_null_counts)
-                    .for_each(|(acc, count)| {
-                        *acc += count;
-                    });
-            });
+            fields
+                .iter()
+                .fold(vec![0; num_row_groups], |mut acc, field| {
+                    let field_null = Self::get_null_counts_recursive(
+                        metadata,
+                        index + 1,
+                        field.data_type(),
+                    );
 
-            null_counts
+                    acc.iter_mut()
+                        .zip(field_null.iter())
+                        .for_each(|(a, b)| *a += b);
+                    acc
+                })
         } else {
             metadata
                 .iter()
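The PATCH 2/2 refactor is behavior-preserving: it threads the accumulator through `fold` instead of mutating a vector captured by a `for_each` closure. A standalone demonstration of the equivalence (not part of the patch; the data is made up):

    // Sketch only: both shapes produce identical element-wise sums.
    fn main() {
        let leaves = vec![vec![1u64, 0], vec![2, 3]];

        // for_each over a captured accumulator (PATCH 1/2 shape)
        let mut a = vec![0u64; 2];
        leaves.iter().for_each(|l| {
            a.iter_mut().zip(l).for_each(|(x, y)| *x += y);
        });

        // fold carrying the accumulator (PATCH 2/2 shape)
        let b = leaves.iter().fold(vec![0u64; 2], |mut acc, l| {
            acc.iter_mut().zip(l).for_each(|(x, y)| *x += y);
            acc
        });

        assert_eq!(a, b);
    }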