Fix writing of invalid Parquet ColumnIndex when row group contains null pages (#6319)

* Fix writing of invalid Parquet ColumnIndex when row group contains null pages

Co-authored-by: Ed Seidl <etseidl@users.noreply.github.com>

* fix lint

* more rusty

Co-authored-by: Ed Seidl <etseidl@users.noreply.github.com>

* re-enable tests

---------

Co-authored-by: Ed Seidl <etseidl@users.noreply.github.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
3 people authored Aug 31, 2024
1 parent 69e5e5f commit acdd27a
Showing 2 changed files with 73 additions and 14 deletions.
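
The fix targets data pages that contain only nulls: with page-level statistics enabled, such a page has no min/max value, and the ColumnIndex previously written for it was invalid. Below is a minimal sketch of the kind of write that exercises this path, assuming the arrow-array and parquet crates; it mirrors the writer settings used in the test diff that follows, but the column values, buffer, and limits here are illustrative only.

    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use parquet::arrow::ArrowWriter;
    use parquet::file::properties::{EnabledStatistics, WriterProperties};

    fn main() -> parquet::errors::Result<()> {
        // Two values, an all-null page, then two more values: with a two-row
        // page limit, the middle page has no min/max to record in the ColumnIndex.
        let a: ArrayRef = Arc::new(Int32Array::from(vec![
            Some(1),
            Some(2),
            None,
            None,
            Some(3),
            Some(4),
        ]));
        let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();

        // Page-level statistics plus small page/batch limits force several
        // pages per column chunk, including one that is entirely null.
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_data_page_row_count_limit(2)
            .set_write_batch_size(2)
            .build();

        let mut buf = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props))?;
        writer.write(&batch)?;
        writer.close()?;
        Ok(())
    }
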
77 changes: 69 additions & 8 deletions parquet/src/file/metadata/writer.rs
@@ -450,19 +450,57 @@ mod tests {
             true,
         )]));
 
-        let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
+        // build row groups / pages that exercise different combinations of nulls and values
+        // note that below we set the row group and page sizes to 4 and 2 respectively
+        // so that these "groupings" make sense
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![
+            // a row group that has all values
+            Some(i32::MIN),
+            Some(-1),
+            Some(1),
+            Some(i32::MAX),
+            // a row group with a page of all nulls and a page of all values
+            None,
+            None,
+            Some(2),
+            Some(3),
+            // a row group that has all null pages
+            None,
+            None,
+            None,
+            None,
+            // a row group having 1 page with all values and 1 page with some nulls
+            Some(4),
+            Some(5),
+            None,
+            Some(6),
+            // a row group having 1 page with all nulls and 1 page with some nulls
+            None,
+            None,
+            Some(7),
+            None,
+            // a row group having all pages with some nulls
+            None,
+            Some(8),
+            Some(9),
+            None,
+        ]));
 
         let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
 
-        let writer_props = match write_page_index {
-            true => WriterProperties::builder()
-                .set_statistics_enabled(EnabledStatistics::Page)
-                .build(),
-            false => WriterProperties::builder()
-                .set_statistics_enabled(EnabledStatistics::Chunk)
-                .build(),
+        let writer_props_builder = match write_page_index {
+            true => WriterProperties::builder().set_statistics_enabled(EnabledStatistics::Page),
+            false => WriterProperties::builder().set_statistics_enabled(EnabledStatistics::Chunk),
         };
+
+        // tune the size or pages to the data above
+        // to make sure we exercise code paths where all items in a page are null, etc.
+        let writer_props = writer_props_builder
+            .set_max_row_group_size(4)
+            .set_data_page_row_count_limit(2)
+            .set_write_batch_size(2)
+            .build();
 
         let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(writer_props)).unwrap();
         writer.write(&batch).unwrap();
         writer.close().unwrap();
@@ -610,6 +648,29 @@ mod tests {
             decoded_metadata.num_row_groups()
         );
 
+        // check that the mins and maxes are what we expect for each page
+        // also indirectly checking that the pages were written out as we expected them to be laid out
+        // (if they're not, or something gets refactored in the future that breaks that assumption,
+        // this test may have to drop down to a lower level and create metadata directly instead of relying on
+        // writing an entire file)
+        let column_indexes = metadata.metadata.column_index().unwrap();
+        assert_eq!(column_indexes.len(), 6);
+        // make sure each row group has 2 pages by checking the first column
+        // page counts for each column for each row group, should all be the same and there should be
+        // 12 pages in total across 6 row groups / 1 column
+        let mut page_counts = vec![];
+        for row_group in column_indexes {
+            for column in row_group {
+                match column {
+                    Index::INT32(column_index) => {
+                        page_counts.push(column_index.indexes.len());
+                    }
+                    _ => panic!("unexpected column index type"),
+                }
+            }
+        }
+        assert_eq!(page_counts, vec![2; 6]);
+
         metadata
             .metadata
             .row_groups()
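
One way to see the result of the fix is to read the page index back from the written bytes and inspect the per-page entries; pages that are entirely null should report no min/max and a non-zero null count. Below is a sketch of such a check, assuming the parquet serialized reader API plus the bytes crate; the function name and printout are illustrative, not part of this change.

    use bytes::Bytes;
    use parquet::file::page_index::index::Index;
    use parquet::file::reader::{FileReader, SerializedFileReader};
    use parquet::file::serialized_reader::ReadOptionsBuilder;

    // `buf` is assumed to hold a Parquet file written with
    // EnabledStatistics::Page, e.g. by the sketch above or the test in this diff.
    fn print_column_index(buf: Vec<u8>) -> parquet::errors::Result<()> {
        // Ask the reader to also load the page index structures.
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options)?;

        let column_index = reader.metadata().column_index().expect("page index written");
        for (rg, columns) in column_index.iter().enumerate() {
            for index in columns {
                if let Index::INT32(native) = index {
                    for (page, entry) in native.indexes.iter().enumerate() {
                        // All-null pages have no min/max; they show up as None here.
                        println!(
                            "row group {rg}, page {page}: min={:?} max={:?} nulls={:?}",
                            entry.min(),
                            entry.max(),
                            entry.null_count()
                        );
                    }
                }
            }
        }
        Ok(())
    }
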
10 changes: 4 additions & 6 deletions parquet/src/file/page_index/index.rs
@@ -230,16 +230,14 @@ impl<T: ParquetValueType> NativeIndex<T> {
         let min_values = self
             .indexes
             .iter()
-            .map(|x| x.min_bytes().map(|x| x.to_vec()))
-            .collect::<Option<Vec<_>>>()
-            .unwrap_or_else(|| vec![vec![]; self.indexes.len()]);
+            .map(|x| x.min_bytes().unwrap_or(&[]).to_vec())
+            .collect::<Vec<_>>();
 
         let max_values = self
             .indexes
             .iter()
-            .map(|x| x.max_bytes().map(|x| x.to_vec()))
-            .collect::<Option<Vec<_>>>()
-            .unwrap_or_else(|| vec![vec![]; self.indexes.len()]);
+            .map(|x| x.max_bytes().unwrap_or(&[]).to_vec())
+            .collect::<Vec<_>>();
 
         let null_counts = self
             .indexes
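
The index.rs change is the core of the fix. Previously, collecting the per-page min/max bytes into Option<Vec<_>> yielded None as soon as any single page was all nulls, so the fallback replaced every page's min/max with empty bytes; now only the null pages get empty bytes and the remaining pages keep their real values. A standalone illustration of the two behaviours, using made-up byte slices rather than the crate's internal types:

    fn main() {
        // Per-page minimum bytes as the writer sees them; the second page is
        // all nulls, so it has no minimum value.
        let mins: Vec<Option<&[u8]>> = vec![Some(&[1u8][..]), None, Some(&[3u8][..])];

        // Old behaviour: one missing value collapses the whole collect to None,
        // and the fallback wrote empty bytes for every page.
        let old: Vec<Vec<u8>> = mins
            .iter()
            .map(|x| x.map(|b| b.to_vec()))
            .collect::<Option<Vec<_>>>()
            .unwrap_or_else(|| vec![vec![]; mins.len()]);
        assert_eq!(old, vec![Vec::<u8>::new(); 3]);

        // New behaviour: only the all-null page gets empty bytes; the other
        // pages keep their real values, consistent with the null_pages list.
        let new: Vec<Vec<u8>> = mins
            .iter()
            .map(|x| x.unwrap_or(&[]).to_vec())
            .collect();
        assert_eq!(new, vec![vec![1u8], vec![], vec![3u8]]);
    }
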
