From cc6dd14b5f191b3f8d8d72b6c958f6c0965c8b5a Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Wed, 4 Sep 2024 09:53:31 -0400 Subject: [PATCH 01/21] update --- testing | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing b/testing index 735ae7128d5..e270341fb5f 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 735ae7128d571398dd798d7ff004adebeb342883 +Subproject commit e270341fb5f3ff785410e6286cc42898e9d6a99c From 5837fc7e62c221451f6245b2739757e48df43495 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Sun, 22 Dec 2024 07:51:44 -0600 Subject: [PATCH 02/21] update --- parquet-testing | 2 +- testing | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet-testing b/parquet-testing index 4439a223a31..1ba34478f53 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit 4439a223a315cf874746d3b5da25e6a6b2a2b16e +Subproject commit 1ba34478f535c89382263c42c675a9af4f57f2dd diff --git a/testing b/testing index e270341fb5f..735ae7128d5 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit e270341fb5f3ff785410e6286cc42898e9d6a99c +Subproject commit 735ae7128d571398dd798d7ff004adebeb342883 From fec63139ba7e3e6a1b9f30d312a189122a7b674b Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Sat, 28 Dec 2024 12:57:00 -0600 Subject: [PATCH 03/21] update --- arrow-buffer/src/buffer/boolean.rs | 45 ++++ parquet/src/arrow/array_reader/builder.rs | 6 + .../src/arrow/array_reader/primitive_array.rs | 13 +- .../src/arrow/array_reader/struct_array.rs | 2 +- parquet/src/arrow/arrow_reader/filter.rs | 2 +- parquet/src/arrow/arrow_reader/selection.rs | 6 + .../src/arrow/async_reader/arrow_reader.rs | 244 ++++++++++++++++++ parquet/src/arrow/async_reader/mod.rs | 1 + 8 files changed, 313 insertions(+), 6 deletions(-) create mode 100644 parquet/src/arrow/async_reader/arrow_reader.rs diff --git a/arrow-buffer/src/buffer/boolean.rs b/arrow-buffer/src/buffer/boolean.rs index aaa86832f69..4e42d3c27e1 100644 --- a/arrow-buffer/src/buffer/boolean.rs +++ b/arrow-buffer/src/buffer/boolean.rs @@ -204,6 +204,51 @@ impl BooleanBuffer { pub fn set_slices(&self) -> BitSliceIterator<'_> { BitSliceIterator::new(self.values(), self.offset, self.len) } + + /// Combines this [`BooleanBuffer`] with another using logical AND on the selected bits. + /// + /// Unlike intersection, the `other` [`BooleanBuffer`] must have exactly as many **set bits** as `self`, + /// i.e., self.count_set_bits() == other.len(). + /// + /// This method will keep only the bits in `self` that are also set in `other` + /// at the positions corresponding to `self`'s set bits. + /// For example: + /// self: NNYYYNNYYNYN + /// other: YNY NY N + /// result: NNYNYNNNYNNN + pub fn and_then(&self, other: &Self) -> Self { + // Ensure that 'other' has exactly as many set bits as 'self' + debug_assert_eq!( + self.count_set_bits(), + other.len(), + "The 'other' selection must have exactly as many set bits as 'self'." 
+ ); + + if self.len() == other.len() { + // fast path if the two bool masks are the same length + // this happens when self selects all rows + debug_assert_eq!(self.count_set_bits(), self.len()); + return other.clone(); + } + + let mut buffer = MutableBuffer::from_len_zeroed(self.values().len()); + buffer.copy_from_slice(self.values()); + let mut builder = BooleanBufferBuilder::new_from_buffer(buffer, self.len()); + + // Create iterators for 'self' and 'other' bits + let mut other_bits = other.iter(); + + for bit_idx in self.set_indices() { + let predicate = other_bits + .next() + .expect("Mismatch in set bits between self and other"); + if !predicate { + builder.set_bit(bit_idx, false); + } + } + + builder.finish() + } } impl Not for &BooleanBuffer { diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 945f62526a7..23f77a9ab96 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -245,6 +245,7 @@ fn build_primitive_reader( page_iterator, column_desc, arrow_type, + col_idx, )?) as _, PhysicalType::INT32 => { if let Some(DataType::Null) = arrow_type { @@ -257,6 +258,7 @@ fn build_primitive_reader( page_iterator, column_desc, arrow_type, + col_idx, )?) as _ } } @@ -264,21 +266,25 @@ fn build_primitive_reader( page_iterator, column_desc, arrow_type, + col_idx, )?) as _, PhysicalType::INT96 => Box::new(PrimitiveArrayReader::::new( page_iterator, column_desc, arrow_type, + col_idx, )?) as _, PhysicalType::FLOAT => Box::new(PrimitiveArrayReader::::new( page_iterator, column_desc, arrow_type, + col_idx, )?) as _, PhysicalType::DOUBLE => Box::new(PrimitiveArrayReader::::new( page_iterator, column_desc, arrow_type, + col_idx, )?) as _, PhysicalType::BYTE_ARRAY => match arrow_type { Some(DataType::Dictionary(_, _)) => { diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index a952e00e12e..529513ce200 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -80,6 +80,7 @@ where def_levels_buffer: Option>, rep_levels_buffer: Option>, record_reader: RecordReader, + column_idx: usize, } impl PrimitiveArrayReader @@ -93,6 +94,7 @@ where pages: Box, column_desc: ColumnDescPtr, arrow_type: Option, + column_idx: usize, ) -> Result { // Check if Arrow type is specified, else create it from Parquet type let data_type = match arrow_type { @@ -110,6 +112,7 @@ where def_levels_buffer: None, rep_levels_buffer: None, record_reader, + column_idx, }) } } @@ -371,6 +374,7 @@ mod tests { Box::::default(), schema.column(0), None, + 0, ) .unwrap(); @@ -414,7 +418,7 @@ mod tests { let page_iterator = InMemoryPageIterator::new(page_lists); let mut array_reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) + PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None, 0) .unwrap(); // Read first 50 values, which are all from the first column chunk @@ -484,6 +488,7 @@ mod tests { Box::new(page_iterator), column_desc.clone(), None, + 0, ) .expect("Unable to get array reader"); @@ -620,7 +625,7 @@ mod tests { let page_iterator = InMemoryPageIterator::new(page_lists); let mut array_reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) + PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None, 0) .unwrap(); let mut accu_len: usize = 0; @@ -696,7 +701,7 @@ mod tests { let page_iterator = 
InMemoryPageIterator::new(page_lists); let mut array_reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) + PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None, 0) .unwrap(); // read data from the reader @@ -755,7 +760,7 @@ mod tests { let page_iterator = InMemoryPageIterator::new(page_lists); let mut array_reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) + PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None, 0) .unwrap(); // read data from the reader diff --git a/parquet/src/arrow/array_reader/struct_array.rs b/parquet/src/arrow/array_reader/struct_array.rs index fb2f2f8928b..e048fbae66f 100644 --- a/parquet/src/arrow/array_reader/struct_array.rs +++ b/parquet/src/arrow/array_reader/struct_array.rs @@ -25,7 +25,7 @@ use std::sync::Arc; /// Implementation of struct array reader. pub struct StructArrayReader { - children: Vec>, + pub children: Vec>, data_type: ArrowType, struct_def_level: i16, struct_rep_level: i16, diff --git a/parquet/src/arrow/arrow_reader/filter.rs b/parquet/src/arrow/arrow_reader/filter.rs index 2e22f7e01cf..931e13e252f 100644 --- a/parquet/src/arrow/arrow_reader/filter.rs +++ b/parquet/src/arrow/arrow_reader/filter.rs @@ -110,7 +110,7 @@ where /// [`RowSelection`]: crate::arrow::arrow_reader::RowSelection pub struct RowFilter { /// A list of [`ArrowPredicate`] - pub(crate) predicates: Vec>, + pub predicates: Vec>, } impl RowFilter { diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 378d2253f19..92d7eab1e58 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -366,6 +366,12 @@ impl RowSelection { self } + /// Returns the internal selectors of this [`RowSelection`], testing only + #[cfg(test)] + pub(crate) fn selectors(&self) -> &[RowSelector] { + &self.selectors + } + /// Applies an offset to this [`RowSelection`], skipping the first `offset` selected rows pub(crate) fn offset(mut self, offset: usize) -> Self { if offset == 0 { diff --git a/parquet/src/arrow/async_reader/arrow_reader.rs b/parquet/src/arrow/async_reader/arrow_reader.rs new file mode 100644 index 00000000000..a2497fc9adc --- /dev/null +++ b/parquet/src/arrow/async_reader/arrow_reader.rs @@ -0,0 +1,244 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::{collections::VecDeque, sync::Arc}; + +use arrow_array::{cast::AsArray, Array, RecordBatch, RecordBatchReader, StructArray}; +use arrow_schema::{ArrowError, SchemaRef}; +use arrow_select::filter::prep_null_mask_filter; + +use crate::arrow::{ + array_reader::{build_array_reader, ArrayReader, RowGroups, StructArrayReader}, + arrow_reader::{ArrowPredicate, RowFilter, RowSelection, RowSelector}, +}; +use crate::errors::ParquetError; + +use super::ParquetField; + +pub struct FilteredParquetRecordBatchReader { + batch_size: usize, + array_reader: StructArrayReader, + predicate_readers: Vec>, + schema: SchemaRef, + selection: VecDeque, + row_filter: Option, +} + +fn read_selection( + reader: &mut dyn ArrayReader, + selection: &RowSelection, +) -> Result { + for selector in selection.iter() { + if selector.skip { + let skipped = reader.skip_records(selector.row_count)?; + debug_assert_eq!(skipped, selector.row_count, "failed to skip rows"); + } else { + let read_records = reader.read_records(selector.row_count)?; + debug_assert_eq!(read_records, selector.row_count, "failed to read rows"); + } + } + let array = reader.consume_batch()?; + let struct_array = array + .as_struct_opt() + .ok_or_else(|| general_err!("Struct array reader should return struct array"))?; + Ok(struct_array.clone()) +} + +/// Take the next selection from the selection queue, and return the selection +/// whose selected row count is to_select or less (if input selection is exhausted). +fn take_next_selection( + selection: &mut VecDeque, + to_select: usize, +) -> Option { + let mut current_selected = 0; + let mut rt = Vec::new(); + while let Some(front) = selection.pop_front() { + if front.skip { + rt.push(front); + continue; + } + + if current_selected + front.row_count <= to_select { + rt.push(front); + current_selected += front.row_count; + } else { + let select = to_select - current_selected; + let remaining = front.row_count - select; + rt.push(RowSelector::select(select)); + selection.push_front(RowSelector::select(remaining)); + + return Some(rt.into()); + } + } + if !rt.is_empty() { + return Some(rt.into()); + } + None +} + +fn build_array_reader_for_filters( + filters: &RowFilter, + fields: &Option>, + row_group: &dyn RowGroups, +) -> Result>, ArrowError> { + let mut array_readers = Vec::new(); + for predicate in filters.predicates.iter() { + let predicate_projection = predicate.projection(); + let array_reader = build_array_reader(fields.as_deref(), predicate_projection, row_group)?; + array_readers.push(array_reader); + } + Ok(array_readers) +} + +impl FilteredParquetRecordBatchReader { + fn new(batch_size: usize, array_reader: StructArrayReader, selection: RowSelection) -> Self { + todo!() + } + + fn build_predicate_filter( + &mut self, + selection: &RowSelection, + ) -> Result { + match &mut self.row_filter { + None => Ok(selection.clone()), + Some(filter) => { + debug_assert_eq!( + self.predicate_readers.len(), + filter.predicates.len(), + "predicate readers and predicates should have the same length" + ); + let mut selection = selection.clone(); + + for (predicate, reader) in filter + .predicates + .iter_mut() + .zip(self.predicate_readers.iter_mut()) + { + let array = read_selection(reader.as_mut(), &selection)?; + let batch = RecordBatch::from(array); + let input_rows = batch.num_rows(); + let predicate_filter = predicate.evaluate(batch)?; + if predicate_filter.len() != input_rows { + return Err(ArrowError::ParquetError(format!( + "ArrowPredicate predicate returned {} rows, expected {input_rows}", + 
predicate_filter.len() + ))); + } + let predicate_filter = match predicate_filter.null_count() { + 0 => predicate_filter, + _ => prep_null_mask_filter(&predicate_filter), + }; + let raw = RowSelection::from_filters(&[predicate_filter]); + selection = selection.and_then(&raw); + } + Ok(selection) + } + } + } +} + +impl Iterator for FilteredParquetRecordBatchReader { + type Item = Result; + + fn next(&mut self) -> Option { + let selection = take_next_selection(&mut self.selection, self.batch_size)?; + let filtered_selection = match self.build_predicate_filter(&selection) { + Ok(selection) => selection, + Err(e) => return Some(Err(e)), + }; + + let rt = read_selection(&mut self.array_reader, &filtered_selection); + match rt { + Ok(array) => Some(Ok(RecordBatch::from(array))), + Err(e) => Some(Err(e.into())), + } + } +} + +impl RecordBatchReader for FilteredParquetRecordBatchReader { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_take_next_selection_exact_match() { + let mut queue = VecDeque::from(vec![ + RowSelector::skip(5), + RowSelector::select(3), + RowSelector::skip(2), + RowSelector::select(7), + ]); + + // Request exactly 10 rows (5 skip + 3 select + 2 skip) + let selection = take_next_selection(&mut queue, 3).unwrap(); + assert_eq!( + selection, + vec![ + RowSelector::skip(5), + RowSelector::select(3), + RowSelector::skip(2) + ] + .into() + ); + + // Check remaining queue + assert_eq!(queue.len(), 1); + assert_eq!(queue[0].row_count, 7); + assert_eq!(queue[0].skip, false); + } + + #[test] + fn test_take_next_selection_split_required() { + let mut queue = VecDeque::from(vec![RowSelector::select(10), RowSelector::select(10)]); + + // Request 15 rows, which should split the first selector + let selection = take_next_selection(&mut queue, 15).unwrap(); + + assert_eq!( + selection, + vec![RowSelector::select(10), RowSelector::select(5)].into() + ); + + // Check remaining queue - should have 5 rows from split and original 10 + assert_eq!(queue.len(), 1); + assert_eq!(queue[0].skip, false); + assert_eq!(queue[0].row_count, 5); + } + + #[test] + fn test_take_next_selection_empty_queue() { + let mut queue = VecDeque::new(); + + // Should return None for empty queue + let selection = take_next_selection(&mut queue, 10); + assert!(selection.is_none()); + + // Test with queue that becomes empty + queue.push_back(RowSelector::select(5)); + let selection = take_next_selection(&mut queue, 10).unwrap(); + assert_eq!(selection, vec![RowSelector::select(5)].into()); + + // Queue should now be empty + let selection = take_next_selection(&mut queue, 10); + assert!(selection.is_none()); + } +} diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 4f3befe4266..2b31dcd9163 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -110,6 +110,7 @@ use crate::file::reader::{ChunkReader, Length, SerializedPageReader}; use crate::file::FOOTER_SIZE; use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash}; +mod arrow_reader; mod metadata; pub use metadata::*; From 948db872bf622f623e11ef26b1d185614ef32886 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Sun, 29 Dec 2024 15:46:03 -0600 Subject: [PATCH 04/21] update --- .../src/arrow/array_reader/primitive_array.rs | 41 ++-- parquet/src/arrow/arrow_reader/selection.rs | 10 +- .../src/arrow/async_reader/arrow_reader.rs | 195 +++++++++++++++--- 
parquet/src/arrow/async_reader/mod.rs | 91 ++++---- parquet/src/file/serialized_reader.rs | 64 +++++- testing | 2 +- 6 files changed, 309 insertions(+), 94 deletions(-) diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index 529513ce200..c665cee5bf2 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -80,6 +80,7 @@ where def_levels_buffer: Option>, rep_levels_buffer: Option>, record_reader: RecordReader, + #[allow(unused)] column_idx: usize, } @@ -417,9 +418,13 @@ mod tests { ); let page_iterator = InMemoryPageIterator::new(page_lists); - let mut array_reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None, 0) - .unwrap(); + let mut array_reader = PrimitiveArrayReader::::new( + Box::new(page_iterator), + column_desc, + None, + 0, + ) + .unwrap(); // Read first 50 values, which are all from the first column chunk let array = array_reader.next_batch(50).unwrap(); @@ -624,9 +629,13 @@ mod tests { let page_iterator = InMemoryPageIterator::new(page_lists); - let mut array_reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None, 0) - .unwrap(); + let mut array_reader = PrimitiveArrayReader::::new( + Box::new(page_iterator), + column_desc, + None, + 0, + ) + .unwrap(); let mut accu_len: usize = 0; @@ -700,9 +709,13 @@ mod tests { ); let page_iterator = InMemoryPageIterator::new(page_lists); - let mut array_reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None, 0) - .unwrap(); + let mut array_reader = PrimitiveArrayReader::::new( + Box::new(page_iterator), + column_desc, + None, + 0, + ) + .unwrap(); // read data from the reader // the data type is decimal(8,2) @@ -759,9 +772,13 @@ mod tests { ); let page_iterator = InMemoryPageIterator::new(page_lists); - let mut array_reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None, 0) - .unwrap(); + let mut array_reader = PrimitiveArrayReader::::new( + Box::new(page_iterator), + column_desc, + None, + 0, + ) + .unwrap(); // read data from the reader // the data type is decimal(18,4) diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 92d7eab1e58..f83724a3841 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -366,12 +366,6 @@ impl RowSelection { self } - /// Returns the internal selectors of this [`RowSelection`], testing only - #[cfg(test)] - pub(crate) fn selectors(&self) -> &[RowSelector] { - &self.selectors - } - /// Applies an offset to this [`RowSelection`], skipping the first `offset` selected rows pub(crate) fn offset(mut self, offset: usize) -> Self { if offset == 0 { @@ -447,6 +441,10 @@ impl RowSelection { pub fn skipped_row_count(&self) -> usize { self.iter().filter(|s| s.skip).map(|s| s.row_count).sum() } + + pub(crate) fn extend(&mut self, other: Self) { + self.selectors.extend(other.selectors); + } } impl From> for RowSelection { diff --git a/parquet/src/arrow/async_reader/arrow_reader.rs b/parquet/src/arrow/async_reader/arrow_reader.rs index a2497fc9adc..04cc115ce39 100644 --- a/parquet/src/arrow/async_reader/arrow_reader.rs +++ b/parquet/src/arrow/async_reader/arrow_reader.rs @@ -15,23 +15,27 @@ // specific language governing permissions and limitations // under the License. 
+use std::collections::HashMap; +use std::sync::RwLock; use std::{collections::VecDeque, sync::Arc}; use arrow_array::{cast::AsArray, Array, RecordBatch, RecordBatchReader, StructArray}; -use arrow_schema::{ArrowError, SchemaRef}; +use arrow_schema::{ArrowError, DataType, Schema, SchemaRef}; use arrow_select::filter::prep_null_mask_filter; -use crate::arrow::{ - array_reader::{build_array_reader, ArrayReader, RowGroups, StructArrayReader}, - arrow_reader::{ArrowPredicate, RowFilter, RowSelection, RowSelector}, -}; +use crate::column::page::{Page, PageMetadata, PageReader}; use crate::errors::ParquetError; - -use super::ParquetField; +use crate::{ + arrow::{ + array_reader::ArrayReader, + arrow_reader::{RowFilter, RowSelection, RowSelector}, + }, + file::reader::{ChunkReader, SerializedPageReader}, +}; pub struct FilteredParquetRecordBatchReader { batch_size: usize, - array_reader: StructArrayReader, + array_reader: Box, predicate_readers: Vec>, schema: SchemaRef, selection: VecDeque, @@ -90,38 +94,47 @@ fn take_next_selection( None } -fn build_array_reader_for_filters( - filters: &RowFilter, - fields: &Option>, - row_group: &dyn RowGroups, -) -> Result>, ArrowError> { - let mut array_readers = Vec::new(); - for predicate in filters.predicates.iter() { - let predicate_projection = predicate.projection(); - let array_reader = build_array_reader(fields.as_deref(), predicate_projection, row_group)?; - array_readers.push(array_reader); - } - Ok(array_readers) -} - impl FilteredParquetRecordBatchReader { - fn new(batch_size: usize, array_reader: StructArrayReader, selection: RowSelection) -> Self { - todo!() + pub(crate) fn new( + batch_size: usize, + array_reader: Box, + selection: RowSelection, + filter_readers: Vec>, + row_filter: Option, + ) -> Self { + let schema = match array_reader.get_data_type() { + DataType::Struct(ref fields) => Schema::new(fields.clone()), + _ => unreachable!("Struct array reader's data type is not struct!"), + }; + + Self { + batch_size, + array_reader, + predicate_readers: filter_readers, + schema: Arc::new(schema), + selection: selection.into(), + row_filter, + } + } + + pub(crate) fn take_filter(&mut self) -> Option { + self.row_filter.take() } + #[inline(never)] + /// Take a selection, and return the new selection where the rows are filtered by the predicate. fn build_predicate_filter( &mut self, - selection: &RowSelection, + mut selection: RowSelection, ) -> Result { match &mut self.row_filter { - None => Ok(selection.clone()), + None => Ok(selection), Some(filter) => { debug_assert_eq!( self.predicate_readers.len(), filter.predicates.len(), "predicate readers and predicates should have the same length" ); - let mut selection = selection.clone(); for (predicate, reader) in filter .predicates @@ -155,13 +168,36 @@ impl Iterator for FilteredParquetRecordBatchReader { type Item = Result; fn next(&mut self) -> Option { - let selection = take_next_selection(&mut self.selection, self.batch_size)?; - let filtered_selection = match self.build_predicate_filter(&selection) { - Ok(selection) => selection, - Err(e) => return Some(Err(e)), - }; + // With filter pushdown, it's very hard to predict the number of rows to return -- depends on the selectivity of the filter. + // We can do one of the following: + // 1. Add a coalescing step to coalesce the resulting batches. + // 2. Ask parquet reader to collect more rows before returning. 
- let rt = read_selection(&mut self.array_reader, &filtered_selection); + // Approach 1 has the drawback of extra overhead of coalesce batch, which can be painful to be efficient. + // Code below implements approach 2, where we keep consuming the selection until we select at least 3/4 of the batch size. + // It boils down to leveraging array_reader's ability to collect large batches natively, + // rather than concatenating multiple small batches. + + let mut selection = RowSelection::default(); + let mut selected = 0; + while let Some(cur_selection) = + take_next_selection(&mut self.selection, self.batch_size - selected) + { + let filtered_selection = match self.build_predicate_filter(cur_selection) { + Ok(selection) => selection, + Err(e) => return Some(Err(e)), + }; + selected += filtered_selection.row_count(); + selection.extend(filtered_selection); + if selected >= (self.batch_size / 4 * 3) { + break; + } + } + if !selection.selects_any() { + return None; + } + + let rt = read_selection(&mut *self.array_reader, &selection); match rt { Ok(array) => Some(Ok(RecordBatch::from(array))), Err(e) => Some(Err(e.into())), @@ -175,6 +211,99 @@ impl RecordBatchReader for FilteredParquetRecordBatchReader { } } +struct PageCacheInner { + queue: VecDeque, + pages: HashMap, +} + +/// A simple FIFO cache for pages. +pub(crate) struct PageCache { + inner: RwLock, +} + +impl PageCache { + const CAPACITY: usize = 16; + + pub(crate) fn new() -> Self { + Self { + inner: RwLock::new(PageCacheInner { + queue: VecDeque::with_capacity(Self::CAPACITY), + pages: HashMap::with_capacity(Self::CAPACITY), + }), + } + } + + pub(crate) fn get_page(&self, offset: usize) -> Option { + let read_lock = self.inner.read().unwrap(); + read_lock.pages.get(&offset).cloned() + } + + pub(crate) fn insert_page(&self, offset: usize, page: Page) { + let mut write_lock = self.inner.write().unwrap(); + if write_lock.pages.len() >= Self::CAPACITY { + let oldest_offset = write_lock.queue.pop_front().unwrap(); + write_lock.pages.remove(&oldest_offset).unwrap(); + } + write_lock.pages.insert(offset, page); + write_lock.queue.push_back(offset); + } +} + +pub(crate) struct CachedPageReader { + inner: SerializedPageReader, + cache: Arc, +} + +impl CachedPageReader { + pub(crate) fn new(inner: SerializedPageReader, cache: Arc) -> Self { + Self { inner, cache } + } +} + +impl Iterator for CachedPageReader { + type Item = Result; + + fn next(&mut self) -> Option { + self.get_next_page().transpose() + } +} + +impl PageReader for CachedPageReader { + fn get_next_page(&mut self) -> Result, ParquetError> { + // self.inner.get_next_page() + let next_page_offset = self.inner.peek_next_page_offset()?; + + let Some(offset) = next_page_offset else { + return Ok(None); + }; + + let page = self.cache.get_page(offset); + if let Some(page) = page { + self.inner.skip_next_page()?; + Ok(Some(page)) + } else { + let inner_page = self.inner.get_next_page()?; + let Some(inner_page) = inner_page else { + return Ok(None); + }; + self.cache.insert_page(offset, inner_page.clone()); + Ok(Some(inner_page)) + } + } + + fn peek_next_page(&mut self) -> Result, ParquetError> { + self.inner.peek_next_page() + } + + fn skip_next_page(&mut self) -> Result<(), ParquetError> { + self.inner.skip_next_page() + } + + fn at_record_boundary(&mut self) -> Result { + self.inner.at_record_boundary() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 2b31dcd9163..3ee85a26fff 100644 --- 
a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -83,6 +83,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use arrow_reader::{CachedPageReader, FilteredParquetRecordBatchReader, PageCache}; use bytes::{Buf, Bytes}; use futures::future::{BoxFuture, FutureExt}; use futures::ready; @@ -94,8 +95,7 @@ use arrow_schema::{DataType, Fields, Schema, SchemaRef}; use crate::arrow::array_reader::{build_array_reader, RowGroups}; use crate::arrow::arrow_reader::{ - apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderMetadata, - ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection, + ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, RowFilter, RowSelection, }; use crate::arrow::ProjectionMask; @@ -121,6 +121,8 @@ use crate::arrow::schema::ParquetField; #[cfg(feature = "object_store")] pub use store::*; +use super::arrow_reader::RowSelector; + /// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files /// /// Notes: @@ -470,7 +472,7 @@ impl ParquetRecordBatchStreamBuilder { } } -type ReadResult = Result<(ReaderFactory, Option)>; +type ReadResult = Result<(ReaderFactory, Option)>; /// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create /// [`ParquetRecordBatchReader`] @@ -498,18 +500,16 @@ where async fn read_row_group( mut self, row_group_idx: usize, - mut selection: Option, + selection: Option, projection: ProjectionMask, batch_size: usize, ) -> ReadResult { - // TODO: calling build_array multiple times is wasteful - let meta = self.metadata.row_group(row_group_idx); let offset_index = self .metadata .offset_index() // filter out empty offset indexes (old versions specified Some(vec![]) when no present) - .filter(|index| !index.is_empty()) + .filter(|index| index.first().map(|v| !v.is_empty()).unwrap_or(false)) .map(|x| x[row_group_idx].as_slice()); let mut row_group = InMemoryRowGroup { @@ -518,48 +518,47 @@ where row_count: meta.num_rows() as usize, column_chunks: vec![None; meta.columns().len()], offset_index, + cache: Arc::new(PageCache::new()), }; + let mut selection = + selection.unwrap_or_else(|| vec![RowSelector::select(row_group.row_count)].into()); + + let mut filter_readers = Vec::new(); if let Some(filter) = self.filter.as_mut() { for predicate in filter.predicates.iter_mut() { - if !selects_any(selection.as_ref()) { + if !selection.selects_any() { return Ok((self, None)); } let predicate_projection = predicate.projection(); row_group - .fetch(&mut self.input, predicate_projection, selection.as_ref()) + .fetch(&mut self.input, predicate_projection, Some(&selection)) .await?; let array_reader = build_array_reader(self.fields.as_deref(), predicate_projection, &row_group)?; - - selection = Some(evaluate_predicate( - batch_size, - array_reader, - selection, - predicate.as_mut(), - )?); + filter_readers.push(array_reader); } } // Compute the number of rows in the selection before applying limit and offset - let rows_before = selection - .as_ref() - .map(|s| s.row_count()) - .unwrap_or(row_group.row_count); + let rows_before = selection.row_count(); if rows_before == 0 { return Ok((self, None)); } - selection = apply_range(selection, row_group.row_count, self.offset, self.limit); + if let Some(offset) = self.offset { + selection = selection.offset(offset); + } + + if let Some(limit) = self.limit { + selection = selection.limit(limit); + } // Compute the number of rows in the selection after applying limit and offset - let rows_after = 
selection - .as_ref() - .map(|s| s.row_count()) - .unwrap_or(row_group.row_count); + let rows_after = selection.row_count(); // Update offset if necessary if let Some(offset) = &mut self.offset { @@ -577,13 +576,16 @@ where } row_group - .fetch(&mut self.input, &projection, selection.as_ref()) + .fetch(&mut self.input, &projection, Some(&selection)) .await?; - let reader = ParquetRecordBatchReader::new( + let array_reader = build_array_reader(self.fields.as_deref(), &projection, &row_group)?; + let reader = FilteredParquetRecordBatchReader::new( batch_size, - build_array_reader(self.fields.as_deref(), &projection, &row_group)?, + array_reader, selection, + filter_readers, + self.filter.take(), ); Ok((self, Some(reader))) @@ -594,7 +596,7 @@ enum StreamState { /// At the start of a new row group, or the end of the parquet stream Init, /// Decoding a batch - Decoding(ParquetRecordBatchReader), + Decoding(FilteredParquetRecordBatchReader), /// Reading data from input Reading(BoxFuture<'static, ReadResult>), /// Error @@ -739,7 +741,12 @@ where self.state = StreamState::Error; return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string())))); } - None => self.state = StreamState::Init, + None => { + // this is ugly, but works for now. + let filter = batch_reader.take_filter(); + self.reader.as_mut().unwrap().filter = filter; + self.state = StreamState::Init + } }, StreamState::Init => { let row_group_idx = match self.row_groups.pop_front() { @@ -791,6 +798,7 @@ struct InMemoryRowGroup<'a> { offset_index: Option<&'a [OffsetIndexMetaData]>, column_chunks: Vec>>, row_count: usize, + cache: Arc, } impl InMemoryRowGroup<'_> { @@ -902,12 +910,23 @@ impl RowGroups for InMemoryRowGroup<'_> { // filter out empty offset indexes (old versions specified Some(vec![]) when no present) .filter(|index| !index.is_empty()) .map(|index| index[i].page_locations.clone()); - let page_reader: Box = Box::new(SerializedPageReader::new( - data.clone(), - self.metadata.column(i), - self.row_count, - page_locations, - )?); + + // let page_reader: Box = Box::new(SerializedPageReader::new( + // data.clone(), + // self.metadata.column(i), + // self.row_count, + // page_locations, + // )?); + + let page_reader: Box = Box::new(CachedPageReader::new( + SerializedPageReader::new( + data.clone(), + self.metadata.column(i), + self.row_count, + page_locations, + )?, + self.cache.clone(), + )); Ok(Box::new(ColumnChunkIterator { reader: Some(Ok(page_reader)), diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index a942481f7e4..146b16f3842 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -371,7 +371,7 @@ fn read_page_header_len(input: &mut T) -> Result<(usize, PageHeader)> { /// Decodes a [`Page`] from the provided `buffer` pub(crate) fn decode_page( page_header: PageHeader, - buffer: Bytes, + buffer: Vec, physical_type: Type, decompressor: Option<&mut Box>, ) -> Result { @@ -406,8 +406,8 @@ pub(crate) fn decode_page( Some(decompressor) if can_decompress => { let uncompressed_size = page_header.uncompressed_page_size as usize; let mut decompressed = Vec::with_capacity(uncompressed_size); - let compressed = &buffer.as_ref()[offset..]; - decompressed.extend_from_slice(&buffer.as_ref()[..offset]); + let compressed = &buffer[offset..]; + decompressed.extend_from_slice(&buffer[..offset]); decompressor.decompress( compressed, &mut decompressed, @@ -422,10 +422,11 @@ pub(crate) fn decode_page( )); } - Bytes::from(decompressed) + decompressed } _ 
=> buffer, }; + let buffer = Bytes::from(buffer); let result = match page_header.type_ { PageType::DICTIONARY_PAGE => { @@ -568,6 +569,57 @@ impl SerializedPageReader { physical_type: meta.column_type(), }) } + + pub(crate) fn peek_next_page_offset(&mut self) -> Result> { + match &mut self.state { + SerializedPageReaderState::Values { + offset, + remaining_bytes, + next_page_header, + } => { + loop { + if *remaining_bytes == 0 { + return Ok(None); + } + return if let Some(header) = next_page_header.as_ref() { + if let Ok(_page_meta) = PageMetadata::try_from(&**header) { + Ok(Some(*offset)) + } else { + // For unknown page type (e.g., INDEX_PAGE), skip and read next. + *next_page_header = None; + continue; + } + } else { + let mut read = self.reader.get_read(*offset as u64)?; + let (header_len, header) = read_page_header_len(&mut read)?; + *offset += header_len; + *remaining_bytes -= header_len; + let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) { + Ok(Some(*offset)) + } else { + // For unknown page type (e.g., INDEX_PAGE), skip and read next. + continue; + }; + *next_page_header = Some(Box::new(header)); + page_meta + }; + } + } + SerializedPageReaderState::Pages { + page_locations, + dictionary_page, + .. + } => { + if let Some(page) = dictionary_page { + Ok(Some(page.offset as usize)) + } else if let Some(page) = page_locations.front() { + Ok(Some(page.offset as usize)) + } else { + Ok(None) + } + } + } + } } impl Iterator for SerializedPageReader { @@ -648,7 +700,7 @@ impl PageReader for SerializedPageReader { decode_page( header, - Bytes::from(buffer), + buffer, self.physical_type, self.decompressor.as_mut(), )? @@ -677,7 +729,7 @@ impl PageReader for SerializedPageReader { let bytes = buffer.slice(offset..); decode_page( header, - bytes, + bytes.to_vec(), self.physical_type, self.decompressor.as_mut(), )? 
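[Editorial aside, not part of the patch series: the predicate pushdown wired up in PATCH 03/04 refines a coarse RowSelection with each predicate's boolean result via the new BooleanBuffer::and_then, where the filter carries one bit per *selected* row. The dependency-free Rust sketch below mirrors the doc-comment example from PATCH 03 (NNYYYNNYYNYN combined with YNY NY N yields NNYNYNNNYNNN) using plain Vec<bool> stand-ins; the `and_then` and `main` functions here are illustrative helpers, not the crate's API.]

fn and_then(selected: &[bool], predicate: &[bool]) -> Vec<bool> {
    // `predicate` must carry exactly one bit per `true` entry in `selected`.
    assert_eq!(selected.iter().filter(|&&b| b).count(), predicate.len());
    let mut keep = predicate.iter().copied();
    // Keep a selected row only if its corresponding predicate bit is also set;
    // `&&` short-circuits, so predicate bits are consumed only for selected rows.
    selected.iter().map(|&s| s && keep.next().unwrap()).collect()
}

fn main() {
    // self:  NNYYYNNYYNYN,  other: YNY NY N  ->  result: NNYNYNNNYNNN
    let selected = [
        false, false, true, true, true, false, false, true, true, false, true, false,
    ];
    let predicate = [true, false, true, false, true, false];
    let refined = and_then(&selected, &predicate);
    assert_eq!(
        refined,
        [false, false, true, false, true, false, false, false, true, false, false, false]
    );
}
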
diff --git a/testing b/testing index 735ae7128d5..4d209492d51 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 735ae7128d571398dd798d7ff004adebeb342883 +Subproject commit 4d209492d514c2d3cb2d392681b9aa00e6d8da1c From 8c50d90b98a10419c57b362621f68a8db147e70c Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Mon, 30 Dec 2024 09:58:04 -0600 Subject: [PATCH 05/21] poc reader --- parquet/src/arrow/arrow_reader/selection.rs | 4 - .../src/arrow/async_reader/arrow_reader.rs | 86 ++++++++++++------- parquet/src/arrow/async_reader/mod.rs | 30 ++++--- 3 files changed, 71 insertions(+), 49 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index f83724a3841..378d2253f19 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -441,10 +441,6 @@ impl RowSelection { pub fn skipped_row_count(&self) -> usize { self.iter().filter(|s| s.skip).map(|s| s.row_count).sum() } - - pub(crate) fn extend(&mut self, other: Self) { - self.selectors.extend(other.selectors); - } } impl From> for RowSelection { diff --git a/parquet/src/arrow/async_reader/arrow_reader.rs b/parquet/src/arrow/async_reader/arrow_reader.rs index 04cc115ce39..f2f681cc7d3 100644 --- a/parquet/src/arrow/async_reader/arrow_reader.rs +++ b/parquet/src/arrow/async_reader/arrow_reader.rs @@ -19,7 +19,8 @@ use std::collections::HashMap; use std::sync::RwLock; use std::{collections::VecDeque, sync::Arc}; -use arrow_array::{cast::AsArray, Array, RecordBatch, RecordBatchReader, StructArray}; +use arrow_array::ArrayRef; +use arrow_array::{cast::AsArray, Array, RecordBatch, RecordBatchReader}; use arrow_schema::{ArrowError, DataType, Schema, SchemaRef}; use arrow_select::filter::prep_null_mask_filter; @@ -45,7 +46,7 @@ pub struct FilteredParquetRecordBatchReader { fn read_selection( reader: &mut dyn ArrayReader, selection: &RowSelection, -) -> Result { +) -> Result { for selector in selection.iter() { if selector.skip { let skipped = reader.skip_records(selector.row_count)?; @@ -55,11 +56,7 @@ fn read_selection( debug_assert_eq!(read_records, selector.row_count, "failed to read rows"); } } - let array = reader.consume_batch()?; - let struct_array = array - .as_struct_opt() - .ok_or_else(|| general_err!("Struct array reader should return struct array"))?; - Ok(struct_array.clone()) + reader.consume_batch() } /// Take the next selection from the selection queue, and return the selection @@ -142,7 +139,9 @@ impl FilteredParquetRecordBatchReader { .zip(self.predicate_readers.iter_mut()) { let array = read_selection(reader.as_mut(), &selection)?; - let batch = RecordBatch::from(array); + let batch = RecordBatch::from(array.as_struct_opt().ok_or_else(|| { + general_err!("Struct array reader should return struct array") + })?); let input_rows = batch.num_rows(); let predicate_filter = predicate.evaluate(batch)?; if predicate_filter.len() != input_rows { @@ -178,7 +177,6 @@ impl Iterator for FilteredParquetRecordBatchReader { // It boils down to leveraging array_reader's ability to collect large batches natively, // rather than concatenating multiple small batches. 
- let mut selection = RowSelection::default(); let mut selected = 0; while let Some(cur_selection) = take_next_selection(&mut self.selection, self.batch_size - selected) @@ -187,21 +185,29 @@ impl Iterator for FilteredParquetRecordBatchReader { Ok(selection) => selection, Err(e) => return Some(Err(e)), }; + + for selector in filtered_selection.iter() { + if selector.skip { + self.array_reader.skip_records(selector.row_count).ok()?; + } else { + self.array_reader.read_records(selector.row_count).ok()?; + } + } selected += filtered_selection.row_count(); - selection.extend(filtered_selection); if selected >= (self.batch_size / 4 * 3) { break; } } - if !selection.selects_any() { + if selected == 0 { return None; } - let rt = read_selection(&mut *self.array_reader, &selection); - match rt { - Ok(array) => Some(Ok(RecordBatch::from(array))), - Err(e) => Some(Err(e.into())), - } + let array = self.array_reader.consume_batch().ok()?; + let struct_array = array + .as_struct_opt() + .ok_or_else(|| general_err!("Struct array reader should return struct array")) + .ok()?; + Some(Ok(RecordBatch::from(struct_array.clone()))) } } @@ -212,11 +218,11 @@ impl RecordBatchReader for FilteredParquetRecordBatchReader { } struct PageCacheInner { - queue: VecDeque, - pages: HashMap, + pages: HashMap, // col_id -> (offset, page) } -/// A simple FIFO cache for pages. +/// A simple cache for decompressed pages. +/// We cache only one page per column pub(crate) struct PageCache { inner: RwLock, } @@ -227,36 +233,49 @@ impl PageCache { pub(crate) fn new() -> Self { Self { inner: RwLock::new(PageCacheInner { - queue: VecDeque::with_capacity(Self::CAPACITY), pages: HashMap::with_capacity(Self::CAPACITY), }), } } - pub(crate) fn get_page(&self, offset: usize) -> Option { + pub(crate) fn get_page(&self, col_id: usize, offset: usize) -> Option { let read_lock = self.inner.read().unwrap(); - read_lock.pages.get(&offset).cloned() + read_lock + .pages + .get(&col_id) + .and_then(|(cached_offset, page)| { + if *cached_offset == offset { + Some(page) + } else { + None + } + }) + .cloned() } - pub(crate) fn insert_page(&self, offset: usize, page: Page) { + pub(crate) fn insert_page(&self, col_id: usize, offset: usize, page: Page) { let mut write_lock = self.inner.write().unwrap(); - if write_lock.pages.len() >= Self::CAPACITY { - let oldest_offset = write_lock.queue.pop_front().unwrap(); - write_lock.pages.remove(&oldest_offset).unwrap(); - } - write_lock.pages.insert(offset, page); - write_lock.queue.push_back(offset); + write_lock.pages.insert(col_id, (offset, page)); } } pub(crate) struct CachedPageReader { inner: SerializedPageReader, cache: Arc, + col_id: usize, } impl CachedPageReader { - pub(crate) fn new(inner: SerializedPageReader, cache: Arc) -> Self { - Self { inner, cache } + pub(crate) fn new( + inner: SerializedPageReader, + cache: Arc, + col_id: usize, + ) -> Self { + Self { + inner, + cache, + col_id, + } } } @@ -277,7 +296,7 @@ impl PageReader for CachedPageReader { return Ok(None); }; - let page = self.cache.get_page(offset); + let page = self.cache.get_page(self.col_id, offset); if let Some(page) = page { self.inner.skip_next_page()?; Ok(Some(page)) @@ -286,7 +305,8 @@ impl PageReader for CachedPageReader { let Some(inner_page) = inner_page else { return Ok(None); }; - self.cache.insert_page(offset, inner_page.clone()); + self.cache + .insert_page(self.col_id, offset, inner_page.clone()); Ok(Some(inner_page)) } } diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 
3ee85a26fff..cf48b391ed6 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -911,22 +911,28 @@ impl RowGroups for InMemoryRowGroup<'_> { .filter(|index| !index.is_empty()) .map(|index| index[i].page_locations.clone()); - // let page_reader: Box = Box::new(SerializedPageReader::new( - // data.clone(), - // self.metadata.column(i), - // self.row_count, - // page_locations, - // )?); - - let page_reader: Box = Box::new(CachedPageReader::new( - SerializedPageReader::new( + let page_reader: Box = if std::env::var("CACHE_PAGES") + .map(|v| v == "1") + .unwrap_or(false) + { + Box::new(CachedPageReader::new( + SerializedPageReader::new( + data.clone(), + self.metadata.column(i), + self.row_count, + page_locations, + )?, + self.cache.clone(), + i, + )) + } else { + Box::new(SerializedPageReader::new( data.clone(), self.metadata.column(i), self.row_count, page_locations, - )?, - self.cache.clone(), - )); + )?) + }; Ok(Box::new(ColumnChunkIterator { reader: Some(Ok(page_reader)), From f5422ced6c46c99f43978c96ad5b18fe13b58ceb Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Tue, 31 Dec 2024 09:37:59 -0600 Subject: [PATCH 06/21] update --- parquet/Cargo.toml | 1 + .../src/arrow/array_reader/byte_view_array.rs | 6 +- .../src/arrow/async_reader/arrow_reader.rs | 58 ++++++++++++++----- parquet/src/arrow/buffer/view_buffer.rs | 7 +++ 4 files changed, 56 insertions(+), 16 deletions(-) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index e4085472ea2..ba5d8c3d694 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -69,6 +69,7 @@ paste = { version = "1.0" } half = { version = "2.1", default-features = false, features = ["num-traits"] } sysinfo = { version = "0.33.0", optional = true, default-features = false, features = ["system"] } crc32fast = { version = "1.4.2", optional = true, default-features = false } +simdutf8 = "0.1.5" [dev-dependencies] base64 = { version = "0.22", default-features = false, features = ["std"] } diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index 92a8b0592d0..cff55d19245 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -161,7 +161,7 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { )); } - let mut buffer = ViewBuffer::default(); + let mut buffer = ViewBuffer::with_capacity(num_values as usize, 1); let mut decoder = ByteViewArrayDecoderPlain::new( buf, num_values as usize, @@ -457,6 +457,8 @@ impl ByteViewArrayDecoderDictionary { } } + output.views.reserve(len); + // Calculate the offset of the dictionary buffers in the output buffers // For example if the 2nd buffer in the dictionary is the 5th buffer in the output buffers, // then the base_buffer_idx is 5 - 2 = 3 @@ -677,7 +679,7 @@ impl ByteViewArrayDecoderDelta { /// Check that `val` is a valid UTF-8 sequence pub fn check_valid_utf8(val: &[u8]) -> Result<()> { - match std::str::from_utf8(val) { + match simdutf8::basic::from_utf8(val) { Ok(_) => Ok(()), Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)), } diff --git a/parquet/src/arrow/async_reader/arrow_reader.rs b/parquet/src/arrow/async_reader/arrow_reader.rs index f2f681cc7d3..91e651b77e2 100644 --- a/parquet/src/arrow/async_reader/arrow_reader.rs +++ b/parquet/src/arrow/async_reader/arrow_reader.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::RwLock; use std::{collections::VecDeque, sync::Arc}; @@ -24,6 +25,7 @@ use arrow_array::{cast::AsArray, Array, RecordBatch, RecordBatchReader}; use arrow_schema::{ArrowError, DataType, Schema, SchemaRef}; use arrow_select::filter::prep_null_mask_filter; +use crate::basic::PageType; use crate::column::page::{Page, PageMetadata, PageReader}; use crate::errors::ParquetError; use crate::{ @@ -217,12 +219,17 @@ impl RecordBatchReader for FilteredParquetRecordBatchReader { } } +struct CachedPage { + dict: Option<(usize, Page)>, + data: Option<(usize, Page)>, +} + struct PageCacheInner { - pages: HashMap, // col_id -> (offset, page) + pages: HashMap, // col_id -> CachedPage } /// A simple cache for decompressed pages. -/// We cache only one page per column +/// We cache only one dictionary page and one data page per column pub(crate) struct PageCache { inner: RwLock, } @@ -240,22 +247,45 @@ impl PageCache { pub(crate) fn get_page(&self, col_id: usize, offset: usize) -> Option { let read_lock = self.inner.read().unwrap(); - read_lock - .pages - .get(&col_id) - .and_then(|(cached_offset, page)| { - if *cached_offset == offset { - Some(page) - } else { - None - } - }) - .cloned() + read_lock.pages.get(&col_id).and_then(|pages| { + pages + .dict + .iter() + .chain(pages.data.iter()) + .find(|(page_offset, _)| *page_offset == offset) + .map(|(_, page)| page.clone()) + }) } pub(crate) fn insert_page(&self, col_id: usize, offset: usize, page: Page) { let mut write_lock = self.inner.write().unwrap(); - write_lock.pages.insert(col_id, (offset, page)); + + let is_dict = page.page_type() == PageType::DICTIONARY_PAGE; + + let cached_pages = write_lock.pages.entry(col_id); + match cached_pages { + Entry::Occupied(mut entry) => { + if is_dict { + entry.get_mut().dict = Some((offset, page)); + } else { + entry.get_mut().data = Some((offset, page)); + } + } + Entry::Vacant(entry) => { + let cached_page = if is_dict { + CachedPage { + dict: Some((offset, page)), + data: None, + } + } else { + CachedPage { + dict: None, + data: Some((offset, page)), + } + }; + entry.insert(cached_page); + } + } } } diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs index fd7d6c213f0..a21d153df76 100644 --- a/parquet/src/arrow/buffer/view_buffer.rs +++ b/parquet/src/arrow/buffer/view_buffer.rs @@ -33,6 +33,13 @@ pub struct ViewBuffer { } impl ViewBuffer { + pub fn with_capacity(view_capacity: usize, buffer_capacity: usize) -> Self { + Self { + views: Vec::with_capacity(view_capacity), + buffers: Vec::with_capacity(buffer_capacity), + } + } + pub fn is_empty(&self) -> bool { self.views.is_empty() } From dfdc1b644bcce4e4e74b05f8cc123b8c601b4518 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Tue, 31 Dec 2024 13:04:35 -0600 Subject: [PATCH 07/21] avoid recreating new buffers --- .../src/arrow/array_reader/byte_view_array.rs | 74 +++++++++++++++---- 1 file changed, 58 insertions(+), 16 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index cff55d19245..45cfc5715a4 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -33,6 +33,9 @@ use arrow_data::ByteView; use arrow_schema::DataType as ArrowType; use bytes::Bytes; use std::any::Any; +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::sync::{Arc, LazyLock, Mutex}; /// Returns an 
[`ArrayReader`] that decodes the provided byte array column to view types. pub fn make_byte_view_array_reader( @@ -127,11 +130,14 @@ impl ArrayReader for ByteViewArrayReader { /// A [`ColumnValueDecoder`] for variable length byte arrays struct ByteViewArrayColumnValueDecoder { - dict: Option, + dict: Option>, decoder: Option, validate_utf8: bool, } +pub(crate) static DICT_CACHE: LazyLock>>> = + LazyLock::new(|| Mutex::new(HashMap::new())); + impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { type Buffer = ViewBuffer; @@ -144,6 +150,7 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { } } + #[inline(never)] fn set_dict( &mut self, buf: Bytes, @@ -161,18 +168,35 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { )); } - let mut buffer = ViewBuffer::with_capacity(num_values as usize, 1); - let mut decoder = ByteViewArrayDecoderPlain::new( - buf, - num_values as usize, - Some(num_values as usize), - self.validate_utf8, - ); - decoder.read(&mut buffer, usize::MAX)?; - self.dict = Some(buffer); + let buf_id = buf.as_ptr() as usize; + + let mut cache = DICT_CACHE.lock().unwrap(); + + match cache.entry(buf_id) { + Entry::Vacant(v) => { + let mut buffer = ViewBuffer::with_capacity(num_values as usize, 1); + let mut decoder = ByteViewArrayDecoderPlain::new( + buf, + num_values as usize, + Some(num_values as usize), + self.validate_utf8, + ); + decoder.read(&mut buffer, usize::MAX)?; + + let dict = Arc::new(buffer); + v.insert(dict.clone()); + self.dict = Some(dict); + } + Entry::Occupied(e) => { + // Remove and take ownership of the existing dictionary + self.dict = Some(e.remove()); + // self.dict = Some(e.get().clone()); + } + } Ok(()) } + #[inline(never)] fn set_data( &mut self, encoding: Encoding, @@ -190,22 +214,24 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { Ok(()) } + #[inline(never)] fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result { let decoder = self .decoder .as_mut() .ok_or_else(|| general_err!("no decoder set"))?; - decoder.read(out, num_values, self.dict.as_ref()) + decoder.read(out, num_values, self.dict.as_ref().map(|b| b.as_ref())) } + #[inline(never)] fn skip_values(&mut self, num_values: usize) -> Result { let decoder = self .decoder .as_mut() .ok_or_else(|| general_err!("no decoder set"))?; - decoder.skip(num_values, self.dict.as_ref()) + decoder.skip(num_values, self.dict.as_ref().map(|b| b.as_ref())) } } @@ -255,6 +281,7 @@ impl ByteViewArrayDecoder { } /// Read up to `len` values to `out` with the optional dictionary + #[inline(never)] pub fn read( &mut self, out: &mut ViewBuffer, @@ -290,7 +317,7 @@ impl ByteViewArrayDecoder { /// Decoder from [`Encoding::PLAIN`] data to [`ViewBuffer`] pub struct ByteViewArrayDecoderPlain { - buf: Bytes, + buf: Buffer, offset: usize, validate_utf8: bool, @@ -307,6 +334,9 @@ impl ByteViewArrayDecoderPlain { num_values: Option, validate_utf8: bool, ) -> Self { + // Here we convert `bytes::Bytes` into `arrow_buffer::Bytes`, which is zero copy + // Then we convert `arrow_buffer::Bytes` into `arrow_buffer:Buffer`, which is also zero copy + let buf = arrow_buffer::Buffer::from_bytes(buf.clone().into()); Self { buf, offset: 0, @@ -315,10 +345,21 @@ impl ByteViewArrayDecoderPlain { } } + #[inline(never)] pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { - // Zero copy convert `bytes::Bytes` into `arrow_buffer::Buffer` - let buf = arrow_buffer::Buffer::from(self.buf.clone()); - let block_id = output.append_block(buf); + let need_to_create_new_buffer 
= { + if let Some(last_buffer) = output.buffers.last() { + last_buffer.ptr_eq(&self.buf) + } else { + true + } + }; + + let block_id = if need_to_create_new_buffer { + output.append_block(self.buf.clone()) + } else { + output.buffers.len() as u32 - 1 + }; let to_read = len.min(self.max_remaining_values); @@ -432,6 +473,7 @@ impl ByteViewArrayDecoderDictionary { /// Assumptions / Optimization /// This function checks if dict.buffers() are the last buffers in `output`, and if so /// reuses the dictionary page buffers directly without copying data + #[inline(never)] fn read(&mut self, output: &mut ViewBuffer, dict: &ViewBuffer, len: usize) -> Result { if dict.is_empty() || len == 0 { return Ok(0); From 3c526f8ff3a36182914a23d060436a1acd6c5656 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Tue, 31 Dec 2024 14:02:34 -0600 Subject: [PATCH 08/21] update --- .../src/arrow/array_reader/byte_view_array.rs | 72 +++++++++++-------- 1 file changed, 42 insertions(+), 30 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index 45cfc5715a4..3b6d9f8a22f 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -33,9 +33,9 @@ use arrow_data::ByteView; use arrow_schema::DataType as ArrowType; use bytes::Bytes; use std::any::Any; -use std::collections::hash_map::Entry; -use std::collections::HashMap; -use std::sync::{Arc, LazyLock, Mutex}; +// use std::collections::hash_map::Entry; +// use std::collections::HashMap; +use std::sync::Arc; /// Returns an [`ArrayReader`] that decodes the provided byte array column to view types. pub fn make_byte_view_array_reader( @@ -135,8 +135,8 @@ struct ByteViewArrayColumnValueDecoder { validate_utf8: bool, } -pub(crate) static DICT_CACHE: LazyLock>>> = - LazyLock::new(|| Mutex::new(HashMap::new())); +// pub(crate) static DICT_CACHE: LazyLock>>> = +// LazyLock::new(|| Mutex::new(HashMap::new())); impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { type Buffer = ViewBuffer; @@ -168,31 +168,43 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { )); } - let buf_id = buf.as_ptr() as usize; - - let mut cache = DICT_CACHE.lock().unwrap(); - - match cache.entry(buf_id) { - Entry::Vacant(v) => { - let mut buffer = ViewBuffer::with_capacity(num_values as usize, 1); - let mut decoder = ByteViewArrayDecoderPlain::new( - buf, - num_values as usize, - Some(num_values as usize), - self.validate_utf8, - ); - decoder.read(&mut buffer, usize::MAX)?; - - let dict = Arc::new(buffer); - v.insert(dict.clone()); - self.dict = Some(dict); - } - Entry::Occupied(e) => { - // Remove and take ownership of the existing dictionary - self.dict = Some(e.remove()); - // self.dict = Some(e.get().clone()); - } - } + let mut buffer = ViewBuffer::with_capacity(num_values as usize, 1); + let mut decoder = ByteViewArrayDecoderPlain::new( + buf, + num_values as usize, + Some(num_values as usize), + self.validate_utf8, + ); + decoder.read(&mut buffer, usize::MAX)?; + + let dict = Arc::new(buffer); + self.dict = Some(dict); + + // let buf_id = buf.as_ptr() as usize; + + // let mut cache = DICT_CACHE.lock().unwrap(); + + // match cache.entry(buf_id) { + // Entry::Vacant(v) => { + // let mut buffer = ViewBuffer::with_capacity(num_values as usize, 1); + // let mut decoder = ByteViewArrayDecoderPlain::new( + // buf, + // num_values as usize, + // Some(num_values as usize), + // self.validate_utf8, + // ); + // decoder.read(&mut buffer, usize::MAX)?; + + 
// let dict = Arc::new(buffer); + // v.insert(dict.clone()); + // self.dict = Some(dict); + // } + // Entry::Occupied(e) => { + // // Remove and take ownership of the existing dictionary + // self.dict = Some(e.remove()); + // // self.dict = Some(e.get().clone()); + // } + // } Ok(()) } From 53f5fad772578f74dd7e9a4b05945da0cf9205c5 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Tue, 31 Dec 2024 14:26:56 -0600 Subject: [PATCH 09/21] bug fix --- .../src/arrow/array_reader/byte_view_array.rs | 52 +++---------------- 1 file changed, 7 insertions(+), 45 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index 3b6d9f8a22f..a4366877dcb 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -33,9 +33,6 @@ use arrow_data::ByteView; use arrow_schema::DataType as ArrowType; use bytes::Bytes; use std::any::Any; -// use std::collections::hash_map::Entry; -// use std::collections::HashMap; -use std::sync::Arc; /// Returns an [`ArrayReader`] that decodes the provided byte array column to view types. pub fn make_byte_view_array_reader( @@ -130,14 +127,11 @@ impl ArrayReader for ByteViewArrayReader { /// A [`ColumnValueDecoder`] for variable length byte arrays struct ByteViewArrayColumnValueDecoder { - dict: Option>, + dict: Option, decoder: Option, validate_utf8: bool, } -// pub(crate) static DICT_CACHE: LazyLock>>> = -// LazyLock::new(|| Mutex::new(HashMap::new())); - impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { type Buffer = ViewBuffer; @@ -150,7 +144,6 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { } } - #[inline(never)] fn set_dict( &mut self, buf: Bytes, @@ -177,38 +170,11 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { ); decoder.read(&mut buffer, usize::MAX)?; - let dict = Arc::new(buffer); - self.dict = Some(dict); - - // let buf_id = buf.as_ptr() as usize; - - // let mut cache = DICT_CACHE.lock().unwrap(); - - // match cache.entry(buf_id) { - // Entry::Vacant(v) => { - // let mut buffer = ViewBuffer::with_capacity(num_values as usize, 1); - // let mut decoder = ByteViewArrayDecoderPlain::new( - // buf, - // num_values as usize, - // Some(num_values as usize), - // self.validate_utf8, - // ); - // decoder.read(&mut buffer, usize::MAX)?; - - // let dict = Arc::new(buffer); - // v.insert(dict.clone()); - // self.dict = Some(dict); - // } - // Entry::Occupied(e) => { - // // Remove and take ownership of the existing dictionary - // self.dict = Some(e.remove()); - // // self.dict = Some(e.get().clone()); - // } - // } + self.dict = Some(buffer); + Ok(()) } - #[inline(never)] fn set_data( &mut self, encoding: Encoding, @@ -226,24 +192,22 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { Ok(()) } - #[inline(never)] fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result { let decoder = self .decoder .as_mut() .ok_or_else(|| general_err!("no decoder set"))?; - decoder.read(out, num_values, self.dict.as_ref().map(|b| b.as_ref())) + decoder.read(out, num_values, self.dict.as_ref()) } - #[inline(never)] fn skip_values(&mut self, num_values: usize) -> Result { let decoder = self .decoder .as_mut() .ok_or_else(|| general_err!("no decoder set"))?; - decoder.skip(num_values, self.dict.as_ref().map(|b| b.as_ref())) + decoder.skip(num_values, self.dict.as_ref()) } } @@ -293,7 +257,6 @@ impl ByteViewArrayDecoder { } /// Read up to `len` values to `out` with the optional dictionary - 
#[inline(never)] pub fn read( &mut self, out: &mut ViewBuffer, @@ -357,11 +320,11 @@ impl ByteViewArrayDecoderPlain { } } - #[inline(never)] pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { + // let block_id = output.append_block(self.buf.clone()); let need_to_create_new_buffer = { if let Some(last_buffer) = output.buffers.last() { - last_buffer.ptr_eq(&self.buf) + !last_buffer.ptr_eq(&self.buf) } else { true } @@ -485,7 +448,6 @@ impl ByteViewArrayDecoderDictionary { /// Assumptions / Optimization /// This function checks if dict.buffers() are the last buffers in `output`, and if so /// reuses the dictionary page buffers directly without copying data - #[inline(never)] fn read(&mut self, output: &mut ViewBuffer, dict: &ViewBuffer, len: usize) -> Result { if dict.is_empty() || len == 0 { return Ok(0); From 56980defc5f52fd315d452d7f1f1c1df1a9e1d84 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Tue, 31 Dec 2024 22:04:18 -0600 Subject: [PATCH 10/21] selective cache --- parquet/src/arrow/async_reader/mod.rs | 35 +++++++++++++++++++++------ 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index cf48b391ed6..f490bb6d5bc 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -512,6 +512,22 @@ where .filter(|index| index.first().map(|v| !v.is_empty()).unwrap_or(false)) .map(|x| x[row_group_idx].as_slice()); + let mut predicate_projection: Option = None; + if let Some(filter) = self.filter.as_mut() { + for predicate in filter.predicates.iter_mut() { + let p_projection = predicate.projection(); + if let Some(ref mut p) = predicate_projection { + p.union(&p_projection); + } else { + predicate_projection = Some(p_projection.clone()); + } + } + } + let projection_to_cache = predicate_projection.map(|mut p| { + p.intersect(&projection); + p + }); + let mut row_group = InMemoryRowGroup { metadata: meta, // schema: meta.schema_descr_ptr(), @@ -519,6 +535,7 @@ where column_chunks: vec![None; meta.columns().len()], offset_index, cache: Arc::new(PageCache::new()), + projection_to_cache, }; let mut selection = @@ -531,13 +548,13 @@ where return Ok((self, None)); } - let predicate_projection = predicate.projection(); + let p_projection = predicate.projection(); row_group - .fetch(&mut self.input, predicate_projection, Some(&selection)) + .fetch(&mut self.input, p_projection, Some(&selection)) .await?; let array_reader = - build_array_reader(self.fields.as_deref(), predicate_projection, &row_group)?; + build_array_reader(self.fields.as_deref(), p_projection, &row_group)?; filter_readers.push(array_reader); } } @@ -799,6 +816,7 @@ struct InMemoryRowGroup<'a> { column_chunks: Vec>>, row_count: usize, cache: Arc, + projection_to_cache: Option, } impl InMemoryRowGroup<'_> { @@ -911,10 +929,13 @@ impl RowGroups for InMemoryRowGroup<'_> { .filter(|index| !index.is_empty()) .map(|index| index[i].page_locations.clone()); - let page_reader: Box = if std::env::var("CACHE_PAGES") - .map(|v| v == "1") - .unwrap_or(false) - { + let cached_reader = if let Some(projection_to_cache) = &self.projection_to_cache { + projection_to_cache.leaf_included(i) + } else { + false + }; + + let page_reader: Box = if cached_reader { Box::new(CachedPageReader::new( SerializedPageReader::new( data.clone(), From 4dd1b6b8cb6b7f988cb0c6d06ff2e90f62bbc68d Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Tue, 31 Dec 2024 22:27:14 -0600 Subject: [PATCH 11/21] clean up changes --- 
parquet/src/arrow/array_reader/builder.rs | 6 ------ parquet/src/arrow/array_reader/primitive_array.rs | 10 ---------- 2 files changed, 16 deletions(-) diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 23f77a9ab96..945f62526a7 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -245,7 +245,6 @@ fn build_primitive_reader( page_iterator, column_desc, arrow_type, - col_idx, )?) as _, PhysicalType::INT32 => { if let Some(DataType::Null) = arrow_type { @@ -258,7 +257,6 @@ fn build_primitive_reader( page_iterator, column_desc, arrow_type, - col_idx, )?) as _ } } @@ -266,25 +264,21 @@ fn build_primitive_reader( page_iterator, column_desc, arrow_type, - col_idx, )?) as _, PhysicalType::INT96 => Box::new(PrimitiveArrayReader::::new( page_iterator, column_desc, arrow_type, - col_idx, )?) as _, PhysicalType::FLOAT => Box::new(PrimitiveArrayReader::::new( page_iterator, column_desc, arrow_type, - col_idx, )?) as _, PhysicalType::DOUBLE => Box::new(PrimitiveArrayReader::::new( page_iterator, column_desc, arrow_type, - col_idx, )?) as _, PhysicalType::BYTE_ARRAY => match arrow_type { Some(DataType::Dictionary(_, _)) => { diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index c665cee5bf2..6fe49945cff 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -80,8 +80,6 @@ where def_levels_buffer: Option>, rep_levels_buffer: Option>, record_reader: RecordReader, - #[allow(unused)] - column_idx: usize, } impl PrimitiveArrayReader @@ -95,7 +93,6 @@ where pages: Box, column_desc: ColumnDescPtr, arrow_type: Option, - column_idx: usize, ) -> Result { // Check if Arrow type is specified, else create it from Parquet type let data_type = match arrow_type { @@ -113,7 +110,6 @@ where def_levels_buffer: None, rep_levels_buffer: None, record_reader, - column_idx, }) } } @@ -375,7 +371,6 @@ mod tests { Box::::default(), schema.column(0), None, - 0, ) .unwrap(); @@ -422,7 +417,6 @@ mod tests { Box::new(page_iterator), column_desc, None, - 0, ) .unwrap(); @@ -493,7 +487,6 @@ mod tests { Box::new(page_iterator), column_desc.clone(), None, - 0, ) .expect("Unable to get array reader"); @@ -633,7 +626,6 @@ mod tests { Box::new(page_iterator), column_desc, None, - 0, ) .unwrap(); @@ -713,7 +705,6 @@ mod tests { Box::new(page_iterator), column_desc, None, - 0, ) .unwrap(); @@ -776,7 +767,6 @@ mod tests { Box::new(page_iterator), column_desc, None, - 0, ) .unwrap(); From f8f983e5f9ea66b18656a0588e58e9dc4b7c4658 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Wed, 1 Jan 2025 10:37:02 -0600 Subject: [PATCH 12/21] clean up more and format --- .../src/arrow/array_reader/primitive_array.rs | 36 +++++++------------ .../src/arrow/array_reader/struct_array.rs | 2 +- parquet/src/file/serialized_reader.rs | 13 ++++--- 3 files changed, 19 insertions(+), 32 deletions(-) diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index 6fe49945cff..a952e00e12e 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -413,12 +413,9 @@ mod tests { ); let page_iterator = InMemoryPageIterator::new(page_lists); - let mut array_reader = PrimitiveArrayReader::::new( - Box::new(page_iterator), - column_desc, - None, - ) - .unwrap(); + let mut array_reader = + 
PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) + .unwrap(); // Read first 50 values, which are all from the first column chunk let array = array_reader.next_batch(50).unwrap(); @@ -622,12 +619,9 @@ mod tests { let page_iterator = InMemoryPageIterator::new(page_lists); - let mut array_reader = PrimitiveArrayReader::::new( - Box::new(page_iterator), - column_desc, - None, - ) - .unwrap(); + let mut array_reader = + PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) + .unwrap(); let mut accu_len: usize = 0; @@ -701,12 +695,9 @@ mod tests { ); let page_iterator = InMemoryPageIterator::new(page_lists); - let mut array_reader = PrimitiveArrayReader::::new( - Box::new(page_iterator), - column_desc, - None, - ) - .unwrap(); + let mut array_reader = + PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) + .unwrap(); // read data from the reader // the data type is decimal(8,2) @@ -763,12 +754,9 @@ mod tests { ); let page_iterator = InMemoryPageIterator::new(page_lists); - let mut array_reader = PrimitiveArrayReader::::new( - Box::new(page_iterator), - column_desc, - None, - ) - .unwrap(); + let mut array_reader = + PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) + .unwrap(); // read data from the reader // the data type is decimal(18,4) diff --git a/parquet/src/arrow/array_reader/struct_array.rs b/parquet/src/arrow/array_reader/struct_array.rs index e048fbae66f..fb2f2f8928b 100644 --- a/parquet/src/arrow/array_reader/struct_array.rs +++ b/parquet/src/arrow/array_reader/struct_array.rs @@ -25,7 +25,7 @@ use std::sync::Arc; /// Implementation of struct array reader. pub struct StructArrayReader { - pub children: Vec>, + children: Vec>, data_type: ArrowType, struct_def_level: i16, struct_rep_level: i16, diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 146b16f3842..7d173bc0c16 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -371,7 +371,7 @@ fn read_page_header_len(input: &mut T) -> Result<(usize, PageHeader)> { /// Decodes a [`Page`] from the provided `buffer` pub(crate) fn decode_page( page_header: PageHeader, - buffer: Vec, + buffer: Bytes, physical_type: Type, decompressor: Option<&mut Box>, ) -> Result { @@ -406,8 +406,8 @@ pub(crate) fn decode_page( Some(decompressor) if can_decompress => { let uncompressed_size = page_header.uncompressed_page_size as usize; let mut decompressed = Vec::with_capacity(uncompressed_size); - let compressed = &buffer[offset..]; - decompressed.extend_from_slice(&buffer[..offset]); + let compressed = &buffer.as_ref()[offset..]; + decompressed.extend_from_slice(&buffer.as_ref()[..offset]); decompressor.decompress( compressed, &mut decompressed, @@ -421,8 +421,7 @@ pub(crate) fn decode_page( uncompressed_size )); } - - decompressed + Bytes::from(decompressed) } _ => buffer, }; @@ -700,7 +699,7 @@ impl PageReader for SerializedPageReader { decode_page( header, - buffer, + Bytes::from(buffer), self.physical_type, self.decompressor.as_mut(), )? @@ -729,7 +728,7 @@ impl PageReader for SerializedPageReader { let bytes = buffer.slice(offset..); decode_page( header, - bytes.to_vec(), + bytes, self.physical_type, self.decompressor.as_mut(), )? 
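The `decode_page` change above hands the page payload through as `bytes::Bytes` instead of `Vec<u8>`, so the already-decompressed path no longer copies the buffer (the old `bytes.to_vec()` becomes a cheap slice of the shared allocation). A minimal standalone sketch of why that matters, assuming only the `bytes` crate and illustrative names (this is not code from the patch series):

use bytes::Bytes;

fn main() {
    // Pretend this is a full serialized page: header bytes followed by the payload.
    let page = Bytes::from(vec![0u8; 1024]);
    let header_len = 16;

    // `Bytes::slice` only bumps a reference count and adjusts offsets,
    // so the payload still points into the original allocation.
    let payload = page.slice(header_len..);
    assert_eq!(payload.as_ptr(), unsafe { page.as_ptr().add(header_len) });

    // The previous `to_vec()` path allocated a fresh buffer and copied into it.
    let copied: Vec<u8> = page[header_len..].to_vec();
    assert_ne!(copied.as_ptr(), payload.as_ptr());
}
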
From 882aaf14eba48d02846697f7739f6b106021c581 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Wed, 1 Jan 2025 11:32:11 -0600 Subject: [PATCH 13/21] cleanup and add docs --- .../src/arrow/async_reader/arrow_reader.rs | 181 ++++++++++++++++-- parquet/src/arrow/async_reader/mod.rs | 14 +- parquet/src/file/serialized_reader.rs | 2 +- 3 files changed, 169 insertions(+), 28 deletions(-) diff --git a/parquet/src/arrow/async_reader/arrow_reader.rs b/parquet/src/arrow/async_reader/arrow_reader.rs index 91e651b77e2..44f327ee853 100644 --- a/parquet/src/arrow/async_reader/arrow_reader.rs +++ b/parquet/src/arrow/async_reader/arrow_reader.rs @@ -220,27 +220,74 @@ impl RecordBatchReader for FilteredParquetRecordBatchReader { } struct CachedPage { - dict: Option<(usize, Page)>, - data: Option<(usize, Page)>, + dict: Option<(usize, Page)>, // page offset -> page + data: Option<(usize, Page)>, // page offset -> page } -struct PageCacheInner { - pages: HashMap, // col_id -> CachedPage +struct PredicatePageCacheInner { + pages: HashMap, // col_id (Parquet's leaf column index) -> CachedPage } -/// A simple cache for decompressed pages. -/// We cache only one dictionary page and one data page per column -pub(crate) struct PageCache { - inner: RwLock, +/// A simple cache to avoid double-decompressing pages with filter pushdown. +/// In filter pushdown, we first decompress a page, apply the filter, and then decompress the page again. +/// This double decompression is expensive, so we cache the decompressed page. +/// +/// This implementation contains subtle dynamics that can be hard to understand. +/// +/// ## Which columns to cache +/// +/// Let's consider this example: SELECT B, C FROM table WHERE A = 42 and B = 37; +/// We have 3 columns, and the predicate is applied to column A and B, and projection is on B and C. +/// +/// For column A, we need to decompress it, apply the filter (A=42), and never have to decompress it again, as it's not in the projection. +/// For column B, we need to decompress it, apply the filter (B=37), and then decompress it again, as it's in the projection. +/// For column C, we don't have predicate, so we only decompress it once. +/// +/// A, C is only decompressed once, and B is decompressed twice (as it appears in both the predicate and the projection). +/// The PredicatePageCache will only cache B. +/// We use B's col_id (Parquet's leaf column index) to identify the cache entry. +/// +/// ## How many pages to cache +/// +/// Now we identified the columns to cache, next question is to determine the **minimal** number of pages to cache. +/// +/// Let's revisit our decoding pipeline: +/// Load batch 1 -> evaluate predicates -> filter 1 -> load & emit batch 1 +/// Load batch 2 -> evaluate predicates -> filter 2 -> load & emit batch 2 +/// ... +/// Load batch N -> evaluate predicates -> filter N -> load & emit batch N +/// +/// Assumption & observation: each page consists multiple batches. +/// Then our pipeline looks like this: +/// Load Page 1 +/// Load batch 1 -> evaluate predicates -> filter 1 -> load & emit batch 1 +/// Load batch 2 -> evaluate predicates -> filter 2 -> load & emit batch 2 +/// Load batch 3 -> evaluate predicates -> filter 3 -> load & emit batch 3 +/// Load Page 2 +/// Load batch 4 -> evaluate predicates -> filter 4 -> load & emit batch 4 +/// Load batch 5 -> evaluate predicates -> filter 5 -> load & emit batch 5 +/// ... 
+/// +/// This means that we only need to cache one page per column, +/// because the page that is used by the predicate is the same page, and is immediately used in loading the batch. +/// +/// The only exception is the dictionary page -- the first page of each column. +/// If we encountered a dict page, we will need to immediately read next page, and cache it. +/// +/// To summarize, the cache only contains 2 pages per column: one dict page and one data page. +/// This is a nice property as it means the caching memory consumption is negligible and constant to the number of columns. +/// +/// ## How to identify a page +/// We use the page offset (the offset to the Parquet file) to uniquely identify a page. +pub(crate) struct PredicatePageCache { + inner: RwLock, } -impl PageCache { - const CAPACITY: usize = 16; - - pub(crate) fn new() -> Self { +impl PredicatePageCache { + pub(crate) fn new(capacity: usize) -> Self { Self { - inner: RwLock::new(PageCacheInner { - pages: HashMap::with_capacity(Self::CAPACITY), + inner: RwLock::new(PredicatePageCacheInner { + pages: HashMap::with_capacity(capacity), }), } } @@ -257,6 +304,9 @@ impl PageCache { }) } + /// Insert a page into the cache. + /// Inserting a page will override the existing page, if any. + /// This is because we only need to cache 2 pages per column, see above. pub(crate) fn insert_page(&self, col_id: usize, offset: usize, page: Page) { let mut write_lock = self.inner.write().unwrap(); @@ -291,14 +341,14 @@ impl PageCache { pub(crate) struct CachedPageReader { inner: SerializedPageReader, - cache: Arc, + cache: Arc, col_id: usize, } impl CachedPageReader { pub(crate) fn new( inner: SerializedPageReader, - cache: Arc, + cache: Arc, col_id: usize, ) -> Self { Self { @@ -420,4 +470,103 @@ mod tests { let selection = take_next_selection(&mut queue, 10); assert!(selection.is_none()); } + + #[test] + fn test_predicate_page_cache_basic_operations() { + use super::*; + + let cache = PredicatePageCache::new(2); + let page1 = Page::dummy_page(PageType::DATA_PAGE, 100); + let page2 = Page::dummy_page(PageType::DICTIONARY_PAGE, 200); + + // Insert and retrieve a data page + cache.insert_page(0, 1000, page1.clone()); + let retrieved = cache.get_page(0, 1000); + assert!(retrieved.is_some()); + assert_eq!(retrieved.unwrap().page_type(), PageType::DATA_PAGE); + + // Insert and retrieve a dictionary page for same column + cache.insert_page(0, 2000, page2.clone()); + let retrieved = cache.get_page(0, 2000); + assert!(retrieved.is_some()); + assert_eq!(retrieved.unwrap().page_type(), PageType::DICTIONARY_PAGE); + + // Both pages should still be accessible + assert!(cache.get_page(0, 1000).is_some()); + assert!(cache.get_page(0, 2000).is_some()); + } + + #[test] + fn test_predicate_page_cache_replacement() { + use super::*; + + let cache = PredicatePageCache::new(2); + let data_page1 = Page::dummy_page(PageType::DATA_PAGE, 100); + let data_page2 = Page::dummy_page(PageType::DATA_PAGE_V2, 200); + + // Insert first data page + cache.insert_page(0, 1000, data_page1.clone()); + assert!(cache.get_page(0, 1000).is_some()); + + // Insert second data page - should replace first data page + cache.insert_page(0, 2000, data_page2.clone()); + assert!(cache.get_page(0, 2000).is_some()); + assert!(cache.get_page(0, 1000).is_none()); // First page should be gone + } + + #[test] + fn test_predicate_page_cache_multiple_columns() { + use super::*; + + let cache = PredicatePageCache::new(2); + let page1 = Page::dummy_page(PageType::DATA_PAGE, 100); + let page2 = 
Page::dummy_page(PageType::DATA_PAGE_V2, 200); + + // Insert pages for different columns + cache.insert_page(0, 1000, page1.clone()); + cache.insert_page(1, 1000, page2.clone()); + + // Both pages should be accessible + assert!(cache.get_page(0, 1000).is_some()); + assert!(cache.get_page(1, 1000).is_some()); + + // Non-existent column should return None + assert!(cache.get_page(2, 1000).is_none()); + } +} + +// Helper implementation for testing +#[cfg(test)] +impl Page { + fn dummy_page(page_type: PageType, size: usize) -> Self { + use crate::basic::Encoding; + match page_type { + PageType::DATA_PAGE => Page::DataPage { + buf: vec![0; size].into(), + num_values: size as u32, + encoding: Encoding::PLAIN, + def_level_encoding: Encoding::PLAIN, + rep_level_encoding: Encoding::PLAIN, + statistics: None, + }, + PageType::DICTIONARY_PAGE => Page::DictionaryPage { + buf: vec![0; size].into(), + num_values: size as u32, + encoding: Encoding::PLAIN, + is_sorted: false, + }, + PageType::DATA_PAGE_V2 => Page::DataPageV2 { + buf: vec![0; size].into(), + num_values: size as u32, + encoding: Encoding::PLAIN, + def_levels_byte_len: 0, + rep_levels_byte_len: 0, + is_compressed: false, + statistics: None, + num_nulls: 0, + num_rows: 0, + }, + _ => unreachable!(), + } + } } diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index f490bb6d5bc..b466e0b318e 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -83,7 +83,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use arrow_reader::{CachedPageReader, FilteredParquetRecordBatchReader, PageCache}; +use arrow_reader::{CachedPageReader, FilteredParquetRecordBatchReader, PredicatePageCache}; use bytes::{Buf, Bytes}; use futures::future::{BoxFuture, FutureExt}; use futures::ready; @@ -528,15 +528,7 @@ where p }); - let mut row_group = InMemoryRowGroup { - metadata: meta, - // schema: meta.schema_descr_ptr(), - row_count: meta.num_rows() as usize, - column_chunks: vec![None; meta.columns().len()], - offset_index, - cache: Arc::new(PageCache::new()), - projection_to_cache, - }; + let mut row_group = InMemoryRowGroup::new(meta, offset_index, projection_to_cache); let mut selection = selection.unwrap_or_else(|| vec![RowSelector::select(row_group.row_count)].into()); @@ -815,7 +807,7 @@ struct InMemoryRowGroup<'a> { offset_index: Option<&'a [OffsetIndexMetaData]>, column_chunks: Vec>>, row_count: usize, - cache: Arc, + cache: Arc, projection_to_cache: Option, } diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 7d173bc0c16..0356455f901 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -425,7 +425,6 @@ pub(crate) fn decode_page( } _ => buffer, }; - let buffer = Bytes::from(buffer); let result = match page_header.type_ { PageType::DICTIONARY_PAGE => { @@ -569,6 +568,7 @@ impl SerializedPageReader { }) } + #[cfg(feature = "async")] pub(crate) fn peek_next_page_offset(&mut self) -> Result> { match &mut self.state { SerializedPageReaderState::Values { From c8bdbcf13e67fdcbb7938d22b127d2343692d036 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Thu, 2 Jan 2025 10:40:41 -0600 Subject: [PATCH 14/21] switch to mutex instead of rwlock --- .../src/arrow/async_reader/arrow_reader.rs | 136 +++++++++--------- 1 file changed, 70 insertions(+), 66 deletions(-) diff --git a/parquet/src/arrow/async_reader/arrow_reader.rs b/parquet/src/arrow/async_reader/arrow_reader.rs index 
44f327ee853..12a8b839ab7 100644 --- a/parquet/src/arrow/async_reader/arrow_reader.rs +++ b/parquet/src/arrow/async_reader/arrow_reader.rs @@ -17,7 +17,7 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; -use std::sync::RwLock; +use std::sync::{Mutex, MutexGuard, RwLock}; use std::{collections::VecDeque, sync::Arc}; use arrow_array::ArrayRef; @@ -228,6 +228,51 @@ struct PredicatePageCacheInner { pages: HashMap, // col_id (Parquet's leaf column index) -> CachedPage } +impl PredicatePageCacheInner { + pub(crate) fn get_page(&self, col_id: usize, offset: usize) -> Option { + self.pages.get(&col_id).and_then(|pages| { + pages + .dict + .iter() + .chain(pages.data.iter()) + .find(|(page_offset, _)| *page_offset == offset) + .map(|(_, page)| page.clone()) + }) + } + + /// Insert a page into the cache. + /// Inserting a page will override the existing page, if any. + /// This is because we only need to cache 2 pages per column, see below. + pub(crate) fn insert_page(&mut self, col_id: usize, offset: usize, page: Page) { + let is_dict = page.page_type() == PageType::DICTIONARY_PAGE; + + let cached_pages = self.pages.entry(col_id); + match cached_pages { + Entry::Occupied(mut entry) => { + if is_dict { + entry.get_mut().dict = Some((offset, page)); + } else { + entry.get_mut().data = Some((offset, page)); + } + } + Entry::Vacant(entry) => { + let cached_page = if is_dict { + CachedPage { + dict: Some((offset, page)), + data: None, + } + } else { + CachedPage { + dict: None, + data: Some((offset, page)), + } + }; + entry.insert(cached_page); + } + } + } +} + /// A simple cache to avoid double-decompressing pages with filter pushdown. /// In filter pushdown, we first decompress a page, apply the filter, and then decompress the page again. /// This double decompression is expensive, so we cache the decompressed page. @@ -280,62 +325,20 @@ struct PredicatePageCacheInner { /// ## How to identify a page /// We use the page offset (the offset to the Parquet file) to uniquely identify a page. pub(crate) struct PredicatePageCache { - inner: RwLock, + inner: Mutex, } impl PredicatePageCache { pub(crate) fn new(capacity: usize) -> Self { Self { - inner: RwLock::new(PredicatePageCacheInner { + inner: Mutex::new(PredicatePageCacheInner { pages: HashMap::with_capacity(capacity), }), } } - pub(crate) fn get_page(&self, col_id: usize, offset: usize) -> Option { - let read_lock = self.inner.read().unwrap(); - read_lock.pages.get(&col_id).and_then(|pages| { - pages - .dict - .iter() - .chain(pages.data.iter()) - .find(|(page_offset, _)| *page_offset == offset) - .map(|(_, page)| page.clone()) - }) - } - - /// Insert a page into the cache. - /// Inserting a page will override the existing page, if any. - /// This is because we only need to cache 2 pages per column, see above. 
- pub(crate) fn insert_page(&self, col_id: usize, offset: usize, page: Page) { - let mut write_lock = self.inner.write().unwrap(); - - let is_dict = page.page_type() == PageType::DICTIONARY_PAGE; - - let cached_pages = write_lock.pages.entry(col_id); - match cached_pages { - Entry::Occupied(mut entry) => { - if is_dict { - entry.get_mut().dict = Some((offset, page)); - } else { - entry.get_mut().data = Some((offset, page)); - } - } - Entry::Vacant(entry) => { - let cached_page = if is_dict { - CachedPage { - dict: Some((offset, page)), - data: None, - } - } else { - CachedPage { - dict: None, - data: Some((offset, page)), - } - }; - entry.insert(cached_page); - } - } + fn get(&self) -> MutexGuard { + self.inner.lock().unwrap() } } @@ -376,7 +379,9 @@ impl PageReader for CachedPageReader { return Ok(None); }; - let page = self.cache.get_page(self.col_id, offset); + let mut cache = self.cache.get(); + + let page = cache.get_page(self.col_id, offset); if let Some(page) = page { self.inner.skip_next_page()?; Ok(Some(page)) @@ -385,8 +390,7 @@ impl PageReader for CachedPageReader { let Some(inner_page) = inner_page else { return Ok(None); }; - self.cache - .insert_page(self.col_id, offset, inner_page.clone()); + cache.insert_page(self.col_id, offset, inner_page.clone()); Ok(Some(inner_page)) } } @@ -480,20 +484,20 @@ mod tests { let page2 = Page::dummy_page(PageType::DICTIONARY_PAGE, 200); // Insert and retrieve a data page - cache.insert_page(0, 1000, page1.clone()); - let retrieved = cache.get_page(0, 1000); + cache.get().insert_page(0, 1000, page1.clone()); + let retrieved = cache.get().get_page(0, 1000); assert!(retrieved.is_some()); assert_eq!(retrieved.unwrap().page_type(), PageType::DATA_PAGE); // Insert and retrieve a dictionary page for same column - cache.insert_page(0, 2000, page2.clone()); - let retrieved = cache.get_page(0, 2000); + cache.get().insert_page(0, 2000, page2.clone()); + let retrieved = cache.get().get_page(0, 2000); assert!(retrieved.is_some()); assert_eq!(retrieved.unwrap().page_type(), PageType::DICTIONARY_PAGE); // Both pages should still be accessible - assert!(cache.get_page(0, 1000).is_some()); - assert!(cache.get_page(0, 2000).is_some()); + assert!(cache.get().get_page(0, 1000).is_some()); + assert!(cache.get().get_page(0, 2000).is_some()); } #[test] @@ -505,13 +509,13 @@ mod tests { let data_page2 = Page::dummy_page(PageType::DATA_PAGE_V2, 200); // Insert first data page - cache.insert_page(0, 1000, data_page1.clone()); - assert!(cache.get_page(0, 1000).is_some()); + cache.get().insert_page(0, 1000, data_page1.clone()); + assert!(cache.get().get_page(0, 1000).is_some()); // Insert second data page - should replace first data page - cache.insert_page(0, 2000, data_page2.clone()); - assert!(cache.get_page(0, 2000).is_some()); - assert!(cache.get_page(0, 1000).is_none()); // First page should be gone + cache.get().insert_page(0, 2000, data_page2.clone()); + assert!(cache.get().get_page(0, 2000).is_some()); + assert!(cache.get().get_page(0, 1000).is_none()); // First page should be gone } #[test] @@ -523,15 +527,15 @@ mod tests { let page2 = Page::dummy_page(PageType::DATA_PAGE_V2, 200); // Insert pages for different columns - cache.insert_page(0, 1000, page1.clone()); - cache.insert_page(1, 1000, page2.clone()); + cache.get().insert_page(0, 1000, page1.clone()); + cache.get().insert_page(1, 1000, page2.clone()); // Both pages should be accessible - assert!(cache.get_page(0, 1000).is_some()); - assert!(cache.get_page(1, 1000).is_some()); + 
assert!(cache.get().get_page(0, 1000).is_some()); + assert!(cache.get().get_page(1, 1000).is_some()); // Non-existent column should return None - assert!(cache.get_page(2, 1000).is_none()); + assert!(cache.get().get_page(2, 1000).is_none()); } } From cdb1d859d0b300c74b61d6ce3bc2354977fc96dd Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Thu, 2 Jan 2025 10:46:01 -0600 Subject: [PATCH 15/21] revert irrelevant changes --- arrow-buffer/src/buffer/boolean.rs | 45 ------------------- .../src/arrow/array_reader/byte_view_array.rs | 2 +- .../src/arrow/async_reader/arrow_reader.rs | 2 +- parquet/src/arrow/buffer/view_buffer.rs | 7 --- 4 files changed, 2 insertions(+), 54 deletions(-) diff --git a/arrow-buffer/src/buffer/boolean.rs b/arrow-buffer/src/buffer/boolean.rs index 4e42d3c27e1..aaa86832f69 100644 --- a/arrow-buffer/src/buffer/boolean.rs +++ b/arrow-buffer/src/buffer/boolean.rs @@ -204,51 +204,6 @@ impl BooleanBuffer { pub fn set_slices(&self) -> BitSliceIterator<'_> { BitSliceIterator::new(self.values(), self.offset, self.len) } - - /// Combines this [`BooleanBuffer`] with another using logical AND on the selected bits. - /// - /// Unlike intersection, the `other` [`BooleanBuffer`] must have exactly as many **set bits** as `self`, - /// i.e., self.count_set_bits() == other.len(). - /// - /// This method will keep only the bits in `self` that are also set in `other` - /// at the positions corresponding to `self`'s set bits. - /// For example: - /// self: NNYYYNNYYNYN - /// other: YNY NY N - /// result: NNYNYNNNYNNN - pub fn and_then(&self, other: &Self) -> Self { - // Ensure that 'other' has exactly as many set bits as 'self' - debug_assert_eq!( - self.count_set_bits(), - other.len(), - "The 'other' selection must have exactly as many set bits as 'self'." 
- ); - - if self.len() == other.len() { - // fast path if the two bool masks are the same length - // this happens when self selects all rows - debug_assert_eq!(self.count_set_bits(), self.len()); - return other.clone(); - } - - let mut buffer = MutableBuffer::from_len_zeroed(self.values().len()); - buffer.copy_from_slice(self.values()); - let mut builder = BooleanBufferBuilder::new_from_buffer(buffer, self.len()); - - // Create iterators for 'self' and 'other' bits - let mut other_bits = other.iter(); - - for bit_idx in self.set_indices() { - let predicate = other_bits - .next() - .expect("Mismatch in set bits between self and other"); - if !predicate { - builder.set_bit(bit_idx, false); - } - } - - builder.finish() - } } impl Not for &BooleanBuffer { diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index a4366877dcb..5573e49a42c 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -161,7 +161,7 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { )); } - let mut buffer = ViewBuffer::with_capacity(num_values as usize, 1); + let mut buffer = ViewBuffer::default(); let mut decoder = ByteViewArrayDecoderPlain::new( buf, num_values as usize, diff --git a/parquet/src/arrow/async_reader/arrow_reader.rs b/parquet/src/arrow/async_reader/arrow_reader.rs index 12a8b839ab7..a654928f0b2 100644 --- a/parquet/src/arrow/async_reader/arrow_reader.rs +++ b/parquet/src/arrow/async_reader/arrow_reader.rs @@ -17,7 +17,7 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; -use std::sync::{Mutex, MutexGuard, RwLock}; +use std::sync::{Mutex, MutexGuard}; use std::{collections::VecDeque, sync::Arc}; use arrow_array::ArrayRef; diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs index a21d153df76..fd7d6c213f0 100644 --- a/parquet/src/arrow/buffer/view_buffer.rs +++ b/parquet/src/arrow/buffer/view_buffer.rs @@ -33,13 +33,6 @@ pub struct ViewBuffer { } impl ViewBuffer { - pub fn with_capacity(view_capacity: usize, buffer_capacity: usize) -> Self { - Self { - views: Vec::with_capacity(view_capacity), - buffers: Vec::with_capacity(buffer_capacity), - } - } - pub fn is_empty(&self) -> bool { self.views.is_empty() } From 69720e507533dc5ce47090510405b3a99023c50c Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 3 Jan 2025 10:23:15 -0600 Subject: [PATCH 16/21] submodule --- parquet-testing | 2 +- testing | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet-testing b/parquet-testing index 1ba34478f53..4439a223a31 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit 1ba34478f535c89382263c42c675a9af4f57f2dd +Subproject commit 4439a223a315cf874746d3b5da25e6a6b2a2b16e diff --git a/testing b/testing index 4d209492d51..735ae7128d5 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 4d209492d514c2d3cb2d392681b9aa00e6d8da1c +Subproject commit 735ae7128d571398dd798d7ff004adebeb342883 From a9550ab730db807460f14d4de424c747a97ca9af Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 3 Jan 2025 11:17:44 -0600 Subject: [PATCH 17/21] update --- parquet/src/arrow/async_reader/arrow_reader.rs | 2 -- parquet/src/arrow/async_reader/mod.rs | 2 +- parquet/src/file/serialized_reader.rs | 3 +++ 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/parquet/src/arrow/async_reader/arrow_reader.rs b/parquet/src/arrow/async_reader/arrow_reader.rs index 
a654928f0b2..b8a1466c838 100644 --- a/parquet/src/arrow/async_reader/arrow_reader.rs +++ b/parquet/src/arrow/async_reader/arrow_reader.rs @@ -120,7 +120,6 @@ impl FilteredParquetRecordBatchReader { self.row_filter.take() } - #[inline(never)] /// Take a selection, and return the new selection where the rows are filtered by the predicate. fn build_predicate_filter( &mut self, @@ -372,7 +371,6 @@ impl Iterator for CachedPageReader { impl PageReader for CachedPageReader { fn get_next_page(&mut self) -> Result, ParquetError> { - // self.inner.get_next_page() let next_page_offset = self.inner.peek_next_page_offset()?; let Some(offset) = next_page_offset else { diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index b466e0b318e..48d12d6434f 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -686,7 +686,7 @@ where /// - `Ok(None)` if the stream has ended. /// - `Err(error)` if the stream has errored. All subsequent calls will return `Ok(None)`. /// - `Ok(Some(reader))` which holds all the data for the row group. - pub async fn next_row_group(&mut self) -> Result> { + pub async fn next_row_group(&mut self) -> Result> { loop { match &mut self.state { StreamState::Decoding(_) | StreamState::Reading(_) => { diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 0356455f901..4bd69b41029 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -568,6 +568,9 @@ impl SerializedPageReader { }) } + /// Similar to `peek_next_page`, but returns the offset of the next page instead of the page metadata. + /// Unlike page metadata, an offset can uniquely identify a page. + /// Useful when we want to if the next page is being cached or read previously. 
#[cfg(feature = "async")] pub(crate) fn peek_next_page_offset(&mut self) -> Result> { match &mut self.state { From be1435f8ab10c1e0bd089e1be88f087d4d17c9e7 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Mon, 6 Jan 2025 11:29:35 -0600 Subject: [PATCH 18/21] rebase --- parquet/src/arrow/async_reader/mod.rs | 28 ++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 48d12d6434f..8f66c5ddd74 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -811,7 +811,33 @@ struct InMemoryRowGroup<'a> { projection_to_cache: Option, } -impl InMemoryRowGroup<'_> { +impl<'a> InMemoryRowGroup<'a> { + fn new( + metadata: &'a RowGroupMetaData, + offset_index: Option<&'a [OffsetIndexMetaData]>, + projection_to_cache: Option, + ) -> Self { + let to_cache_column_cnt = projection_to_cache + .as_ref() + .map(|p| { + if let Some(mask) = &p.mask { + mask.iter().filter(|&&b| b).count() + } else { + metadata.columns().len() + } + }) + .unwrap_or(0); + Self { + metadata, + offset_index, + column_chunks: vec![None; metadata.columns().len()], + row_count: metadata.num_rows() as usize, + cache: Arc::new(PredicatePageCache::new(to_cache_column_cnt)), + projection_to_cache, + } + } +} +impl<'a> InMemoryRowGroup<'a> { /// Fetches the necessary column data into memory async fn fetch( &mut self, From 21e015b13dae40442bec0a918188131dc8f2906f Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Wed, 8 Jan 2025 10:04:25 -0600 Subject: [PATCH 19/21] remove unrelated changes --- parquet/src/arrow/array_reader/byte_view_array.rs | 5 ----- parquet/src/arrow/arrow_reader/filter.rs | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index f3b7bf46d5d..64d839fa587 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -169,9 +169,7 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { self.validate_utf8, ); decoder.read(&mut buffer, usize::MAX)?; - self.dict = Some(buffer); - Ok(()) } @@ -309,9 +307,6 @@ impl ByteViewArrayDecoderPlain { num_values: Option, validate_utf8: bool, ) -> Self { - // Here we convert `bytes::Bytes` into `arrow_buffer::Bytes`, which is zero copy - // Then we convert `arrow_buffer::Bytes` into `arrow_buffer:Buffer`, which is also zero copy - let buf = arrow_buffer::Buffer::from_bytes(buf.clone().into()); Self { buf: Buffer::from(buf), offset: 0, diff --git a/parquet/src/arrow/arrow_reader/filter.rs b/parquet/src/arrow/arrow_reader/filter.rs index 931e13e252f..2e22f7e01cf 100644 --- a/parquet/src/arrow/arrow_reader/filter.rs +++ b/parquet/src/arrow/arrow_reader/filter.rs @@ -110,7 +110,7 @@ where /// [`RowSelection`]: crate::arrow::arrow_reader::RowSelection pub struct RowFilter { /// A list of [`ArrowPredicate`] - pub predicates: Vec>, + pub(crate) predicates: Vec>, } impl RowFilter { From 547fb4611ef86849d5d5a66119ab1066c21b6aaa Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 10 Jan 2025 09:59:13 -0600 Subject: [PATCH 20/21] fix clippy --- parquet/src/file/serialized_reader.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index fd6fad7326a..e5796a0ba7d 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ 
-573,8 +573,8 @@ impl SerializedPageReader { /// /// This is used when we need to read parquet with row-filter, and we don't want to decompress the page twice. /// This function allows us to check if the next page is being cached or read previously. - #[cfg(test)] - fn peek_next_page_offset(&mut self) -> Result> { + #[cfg(feature = "async")] + pub(crate) fn peek_next_page_offset(&mut self) -> Result> { match &mut self.state { SerializedPageReaderState::Values { offset, From 05c8c8f05b9b4a136dbdfdb6feb524274cbd9027 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 10 Jan 2025 10:44:30 -0600 Subject: [PATCH 21/21] make various ci improvements --- .../src/arrow/async_reader/arrow_reader.rs | 76 +++++++++---------- parquet/src/arrow/async_reader/mod.rs | 6 +- parquet/src/file/serialized_reader.rs | 5 +- 3 files changed, 44 insertions(+), 43 deletions(-) diff --git a/parquet/src/arrow/async_reader/arrow_reader.rs b/parquet/src/arrow/async_reader/arrow_reader.rs index b8a1466c838..92e585756d4 100644 --- a/parquet/src/arrow/async_reader/arrow_reader.rs +++ b/parquet/src/arrow/async_reader/arrow_reader.rs @@ -406,6 +406,42 @@ impl PageReader for CachedPageReader { } } +// Helper implementation for testing +#[cfg(test)] +impl Page { + fn dummy_page(page_type: PageType, size: usize) -> Self { + use crate::basic::Encoding; + match page_type { + PageType::DATA_PAGE => Page::DataPage { + buf: vec![0; size].into(), + num_values: size as u32, + encoding: Encoding::PLAIN, + def_level_encoding: Encoding::PLAIN, + rep_level_encoding: Encoding::PLAIN, + statistics: None, + }, + PageType::DICTIONARY_PAGE => Page::DictionaryPage { + buf: vec![0; size].into(), + num_values: size as u32, + encoding: Encoding::PLAIN, + is_sorted: false, + }, + PageType::DATA_PAGE_V2 => Page::DataPageV2 { + buf: vec![0; size].into(), + num_values: size as u32, + encoding: Encoding::PLAIN, + def_levels_byte_len: 0, + rep_levels_byte_len: 0, + is_compressed: false, + statistics: None, + num_nulls: 0, + num_rows: 0, + }, + _ => unreachable!(), + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -434,7 +470,7 @@ mod tests { // Check remaining queue assert_eq!(queue.len(), 1); assert_eq!(queue[0].row_count, 7); - assert_eq!(queue[0].skip, false); + assert!(!queue[0].skip); } #[test] @@ -451,7 +487,7 @@ mod tests { // Check remaining queue - should have 5 rows from split and original 10 assert_eq!(queue.len(), 1); - assert_eq!(queue[0].skip, false); + assert!(!queue[0].skip); assert_eq!(queue[0].row_count, 5); } @@ -536,39 +572,3 @@ mod tests { assert!(cache.get().get_page(2, 1000).is_none()); } } - -// Helper implementation for testing -#[cfg(test)] -impl Page { - fn dummy_page(page_type: PageType, size: usize) -> Self { - use crate::basic::Encoding; - match page_type { - PageType::DATA_PAGE => Page::DataPage { - buf: vec![0; size].into(), - num_values: size as u32, - encoding: Encoding::PLAIN, - def_level_encoding: Encoding::PLAIN, - rep_level_encoding: Encoding::PLAIN, - statistics: None, - }, - PageType::DICTIONARY_PAGE => Page::DictionaryPage { - buf: vec![0; size].into(), - num_values: size as u32, - encoding: Encoding::PLAIN, - is_sorted: false, - }, - PageType::DATA_PAGE_V2 => Page::DataPageV2 { - buf: vec![0; size].into(), - num_values: size as u32, - encoding: Encoding::PLAIN, - def_levels_byte_len: 0, - rep_levels_byte_len: 0, - is_compressed: false, - statistics: None, - num_nulls: 0, - num_rows: 0, - }, - _ => unreachable!(), - } - } -} diff --git a/parquet/src/arrow/async_reader/mod.rs 
b/parquet/src/arrow/async_reader/mod.rs index 40e68f01d4b..f93846292ba 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -475,7 +475,7 @@ impl ParquetRecordBatchStreamBuilder { type ReadResult = Result<(ReaderFactory, Option)>; /// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create -/// [`ParquetRecordBatchReader`] +/// [`FilteredParquetRecordBatchReader`] struct ReaderFactory { metadata: Arc, @@ -517,7 +517,7 @@ where for predicate in filter.predicates.iter_mut() { let p_projection = predicate.projection(); if let Some(ref mut p) = predicate_projection { - p.union(&p_projection); + p.union(p_projection); } else { predicate_projection = Some(p_projection.clone()); } @@ -849,7 +849,7 @@ impl<'a> InMemoryRowGroup<'a> { } } } -impl<'a> InMemoryRowGroup<'a> { +impl InMemoryRowGroup<'_> { /// Fetches the necessary column data into memory async fn fetch( &mut self, diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index e5796a0ba7d..3f2f1e0539d 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -858,7 +858,6 @@ impl PageReader for SerializedPageReader { #[cfg(test)] mod tests { - use std::collections::HashSet; use bytes::Buf; @@ -1165,6 +1164,7 @@ mod tests { assert_eq!(page_count, 2); } + #[cfg(feature = "async")] fn get_serialized_page_reader( file_reader: &SerializedFileReader, row_group: usize, @@ -1201,12 +1201,13 @@ mod tests { ) } + #[cfg(feature = "async")] #[test] fn test_peek_next_page_offset_matches_actual() -> Result<()> { let test_file = get_test_file("alltypes_plain.parquet"); let reader = SerializedFileReader::new(test_file)?; - let mut offset_set = HashSet::new(); + let mut offset_set = std::collections::HashSet::new(); let num_row_groups = reader.metadata.num_row_groups(); for row_group in 0..num_row_groups { let num_columns = reader.metadata.row_group(row_group).num_columns();
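For reference, the column-selection rule behind `projection_to_cache` introduced earlier in the series (union of the predicate projections, intersected with the output projection) can be sketched standalone. Plain `HashSet`s of leaf column indices stand in for the crate's `ProjectionMask`, so the names below are illustrative rather than the crate's API:

use std::collections::HashSet;

/// Columns worth caching are those read by at least one predicate AND also
/// emitted by the output projection: they are the only ones decoded twice.
fn columns_to_cache(
    predicate_projections: &[HashSet<usize>],
    output_projection: &HashSet<usize>,
) -> HashSet<usize> {
    let used_by_predicates: HashSet<usize> = predicate_projections
        .iter()
        .flat_map(|p| p.iter().copied())
        .collect();
    used_by_predicates
        .intersection(output_projection)
        .copied()
        .collect()
}

fn main() {
    // SELECT B, C FROM table WHERE A = 42 AND B = 37, with leaf indices A=0, B=1, C=2:
    // A is predicate-only, C is projection-only, so only B (index 1) is worth caching.
    let predicates = vec![HashSet::from([0]), HashSet::from([1])];
    let output = HashSet::from([1, 2]);
    assert_eq!(columns_to_cache(&predicates, &output), HashSet::from([1]));
}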