diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index e4085472ea2..ba5d8c3d694 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -69,6 +69,7 @@ paste = { version = "1.0" } half = { version = "2.1", default-features = false, features = ["num-traits"] } sysinfo = { version = "0.33.0", optional = true, default-features = false, features = ["system"] } crc32fast = { version = "1.4.2", optional = true, default-features = false } +simdutf8 = "0.1.5" [dev-dependencies] base64 = { version = "0.22", default-features = false, features = ["std"] } diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index 0e16642940d..64d839fa587 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -463,6 +463,8 @@ impl ByteViewArrayDecoderDictionary { } } + output.views.reserve(len); + // Calculate the offset of the dictionary buffers in the output buffers // For example if the 2nd buffer in the dictionary is the 5th buffer in the output buffers, // then the base_buffer_idx is 5 - 2 = 3 @@ -683,7 +685,7 @@ impl ByteViewArrayDecoderDelta { /// Check that `val` is a valid UTF-8 sequence pub fn check_valid_utf8(val: &[u8]) -> Result<()> { - match std::str::from_utf8(val) { + match simdutf8::basic::from_utf8(val) { Ok(_) => Ok(()), Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)), } diff --git a/parquet/src/arrow/async_reader/arrow_reader.rs b/parquet/src/arrow/async_reader/arrow_reader.rs new file mode 100644 index 00000000000..92e585756d4 --- /dev/null +++ b/parquet/src/arrow/async_reader/arrow_reader.rs @@ -0,0 +1,574 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::sync::{Mutex, MutexGuard}; +use std::{collections::VecDeque, sync::Arc}; + +use arrow_array::ArrayRef; +use arrow_array::{cast::AsArray, Array, RecordBatch, RecordBatchReader}; +use arrow_schema::{ArrowError, DataType, Schema, SchemaRef}; +use arrow_select::filter::prep_null_mask_filter; + +use crate::basic::PageType; +use crate::column::page::{Page, PageMetadata, PageReader}; +use crate::errors::ParquetError; +use crate::{ + arrow::{ + array_reader::ArrayReader, + arrow_reader::{RowFilter, RowSelection, RowSelector}, + }, + file::reader::{ChunkReader, SerializedPageReader}, +}; + +pub struct FilteredParquetRecordBatchReader { + batch_size: usize, + array_reader: Box, + predicate_readers: Vec>, + schema: SchemaRef, + selection: VecDeque, + row_filter: Option, +} + +fn read_selection( + reader: &mut dyn ArrayReader, + selection: &RowSelection, +) -> Result { + for selector in selection.iter() { + if selector.skip { + let skipped = reader.skip_records(selector.row_count)?; + debug_assert_eq!(skipped, selector.row_count, "failed to skip rows"); + } else { + let read_records = reader.read_records(selector.row_count)?; + debug_assert_eq!(read_records, selector.row_count, "failed to read rows"); + } + } + reader.consume_batch() +} + +/// Take the next selection from the selection queue, and return the 
selection +/// whose selected row count is to_select or less (if input selection is exhausted). +fn take_next_selection( + selection: &mut VecDeque, + to_select: usize, +) -> Option { + let mut current_selected = 0; + let mut rt = Vec::new(); + while let Some(front) = selection.pop_front() { + if front.skip { + rt.push(front); + continue; + } + + if current_selected + front.row_count <= to_select { + rt.push(front); + current_selected += front.row_count; + } else { + let select = to_select - current_selected; + let remaining = front.row_count - select; + rt.push(RowSelector::select(select)); + selection.push_front(RowSelector::select(remaining)); + + return Some(rt.into()); + } + } + if !rt.is_empty() { + return Some(rt.into()); + } + None +} + +impl FilteredParquetRecordBatchReader { + pub(crate) fn new( + batch_size: usize, + array_reader: Box, + selection: RowSelection, + filter_readers: Vec>, + row_filter: Option, + ) -> Self { + let schema = match array_reader.get_data_type() { + DataType::Struct(ref fields) => Schema::new(fields.clone()), + _ => unreachable!("Struct array reader's data type is not struct!"), + }; + + Self { + batch_size, + array_reader, + predicate_readers: filter_readers, + schema: Arc::new(schema), + selection: selection.into(), + row_filter, + } + } + + pub(crate) fn take_filter(&mut self) -> Option { + self.row_filter.take() + } + + /// Take a selection, and return the new selection where the rows are filtered by the predicate. 
+    ///
+    /// Every predicate is evaluated in order over the rows still selected, and its
+    /// boolean result (nulls treated as `false`) narrows the selection further.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if decoding the predicate columns fails, if a predicate fails,
+    /// or if a predicate returns a different number of rows than it was given.
+    fn build_predicate_filter(
+        &mut self,
+        mut selection: RowSelection,
+    ) -> Result<RowSelection, ArrowError> {
+        let Some(filter) = &mut self.row_filter else {
+            return Ok(selection);
+        };
+
+        debug_assert_eq!(
+            self.predicate_readers.len(),
+            filter.predicates.len(),
+            "predicate readers and predicates should have the same length"
+        );
+
+        for (predicate, reader) in filter
+            .predicates
+            .iter_mut()
+            .zip(self.predicate_readers.iter_mut())
+        {
+            let array = read_selection(reader.as_mut(), &selection)?;
+            let batch = RecordBatch::from(array.as_struct_opt().ok_or_else(|| {
+                general_err!("Struct array reader should return struct array")
+            })?);
+            let input_rows = batch.num_rows();
+            let predicate_filter = predicate.evaluate(batch)?;
+            if predicate_filter.len() != input_rows {
+                return Err(ArrowError::ParquetError(format!(
+                    "ArrowPredicate predicate returned {} rows, expected {input_rows}",
+                    predicate_filter.len()
+                )));
+            }
+            let predicate_filter = match predicate_filter.null_count() {
+                0 => predicate_filter,
+                _ => prep_null_mask_filter(&predicate_filter),
+            };
+            let raw = RowSelection::from_filters(&[predicate_filter]);
+            selection = selection.and_then(&raw);
+        }
+        Ok(selection)
+    }
+
+    /// Decodes the next batch, or returns `Ok(None)` once the selection is exhausted.
+    ///
+    /// This is the fallible body of [`Iterator::next`]. Every decoding error is
+    /// propagated instead of being reported as end of stream.
+    fn next_batch(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        // With filter pushdown, it's very hard to predict the number of rows to return -- depends on the selectivity of the filter.
+        // We can do one of the following:
+        // 1. Add a coalescing step to coalesce the resulting batches.
+        // 2. Ask parquet reader to collect more rows before returning.
+
+        // Approach 1 has the drawback of extra overhead of coalesce batch, which can be painful to be efficient.
+        // Code below implements approach 2, where we keep consuming the selection until we select at least 3/4 of the batch size.
+        // It boils down to leveraging array_reader's ability to collect large batches natively,
+        // rather than concatenating multiple small batches.
+
+        // A zero batch size can never select a row; bail out instead of spinning.
+        if self.batch_size == 0 {
+            return Ok(None);
+        }
+
+        // `batch_size / 4 * 3` is 0 for batch sizes 1..=3. Without the lower bound the
+        // loop would stop after a chunk that was filtered out completely, and the
+        // `selected == 0` check below would end the stream with rows still pending.
+        let target = (self.batch_size / 4 * 3).max(1);
+
+        let mut selected = 0;
+        while let Some(cur_selection) =
+            take_next_selection(&mut self.selection, self.batch_size - selected)
+        {
+            let filtered_selection = self.build_predicate_filter(cur_selection)?;
+
+            for selector in filtered_selection.iter() {
+                if selector.skip {
+                    self.array_reader.skip_records(selector.row_count)?;
+                } else {
+                    self.array_reader.read_records(selector.row_count)?;
+                }
+            }
+            selected += filtered_selection.row_count();
+            if selected >= target {
+                break;
+            }
+        }
+        if selected == 0 {
+            return Ok(None);
+        }
+
+        let array = self.array_reader.consume_batch()?;
+        let struct_array = array
+            .as_struct_opt()
+            .ok_or_else(|| general_err!("Struct array reader should return struct array"))?;
+        Ok(Some(RecordBatch::from(struct_array)))
+    }
+}
+
+impl Iterator for FilteredParquetRecordBatchReader {
+    type Item = Result<RecordBatch, ArrowError>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.next_batch().transpose()
+    }
+}
+
+impl RecordBatchReader for FilteredParquetRecordBatchReader {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+/// The pages cached for a single column.
+struct CachedPage {
+    dict: Option<(usize, Page)>, // (page offset, page)
+    data: Option<(usize, Page)>, // (page offset, page)
+}
+
+struct PredicatePageCacheInner {
+    pages: HashMap<usize, CachedPage>, // col_id (Parquet's leaf column index) -> CachedPage
+}
+
+impl PredicatePageCacheInner {
+    /// Returns a clone of the page cached for column `col_id` at `offset`, if any.
+    pub(crate) fn get_page(&self, col_id: usize, offset: usize) -> Option<Page> {
+        self.pages.get(&col_id).and_then(|pages| {
+            pages
+                .dict
+                .iter()
+                .chain(pages.data.iter())
+                .find(|(page_offset, _)| *page_offset == offset)
+                .map(|(_, page)| page.clone())
+        })
+    }
+
+    /// Insert a page into the cache.
+    /// Inserting a page will override the existing page, if any.
+    /// This is because we only need to cache 2 pages per column, see below.
+    ///
+    /// Dictionary pages and data pages are kept in separate slots, so a new page only
+    /// replaces the cached page of the same kind for that column.
+    pub(crate) fn insert_page(&mut self, col_id: usize, offset: usize, page: Page) {
+        let is_dict = page.page_type() == PageType::DICTIONARY_PAGE;
+        let slot = Some((offset, page));
+
+        match self.pages.entry(col_id) {
+            Entry::Occupied(mut occupied) => {
+                let cached = occupied.get_mut();
+                if is_dict {
+                    cached.dict = slot;
+                } else {
+                    cached.data = slot;
+                }
+            }
+            Entry::Vacant(vacant) => {
+                let (dict, data) = if is_dict {
+                    (slot, None)
+                } else {
+                    (None, slot)
+                };
+                vacant.insert(CachedPage { dict, data });
+            }
+        }
+    }
+}
+
+/// A simple cache to avoid double-decompressing pages with filter pushdown.
+/// In filter pushdown, we first decompress a page, apply the filter, and then decompress the page again.
+/// This double decompression is expensive, so we cache the decompressed page.
+///
+/// This implementation contains subtle dynamics that can be hard to understand.
+///
+/// ## Which columns to cache
+///
+/// Let's consider this example: `SELECT B, C FROM table WHERE A = 42 and B = 37;`
+/// We have 3 columns, the predicate is applied to columns A and B, and the projection is on B and C.
+///
+/// For column A, we need to decompress it, apply the filter (A=42), and never have to decompress it again, as it's not in the projection.
+/// For column B, we need to decompress it, apply the filter (B=37), and then decompress it again, as it's in the projection.
+/// For column C, we don't have a predicate, so we only decompress it once.
+///
+/// A and C are only decompressed once, and B is decompressed twice (as it appears in both the predicate and the projection).
+/// The PredicatePageCache will only cache B.
+/// We use B's col_id (Parquet's leaf column index) to identify the cache entry.
+///
+/// ## How many pages to cache
+///
+/// Now that we have identified the columns to cache, the next question is to determine the **minimal** number of pages to cache.
+///
+/// Let's revisit our decoding pipeline:
+///
+/// ```text
+/// Load batch 1 -> evaluate predicates -> filter 1 -> load & emit batch 1
+/// Load batch 2 -> evaluate predicates -> filter 2 -> load & emit batch 2
+/// ...
+/// Load batch N -> evaluate predicates -> filter N -> load & emit batch N
+/// ```
+///
+/// Assumption & observation: each page consists of multiple batches.
+/// Then our pipeline looks like this:
+///
+/// ```text
+/// Load Page 1
+/// Load batch 1 -> evaluate predicates -> filter 1 -> load & emit batch 1
+/// Load batch 2 -> evaluate predicates -> filter 2 -> load & emit batch 2
+/// Load batch 3 -> evaluate predicates -> filter 3 -> load & emit batch 3
+/// Load Page 2
+/// Load batch 4 -> evaluate predicates -> filter 4 -> load & emit batch 4
+/// Load batch 5 -> evaluate predicates -> filter 5 -> load & emit batch 5
+/// ...
+/// ```
+///
+/// This means that we only need to cache one page per column,
+/// because the page that is used by the predicate is the same page, and is immediately used in loading the batch.
+///
+/// The only exception is the dictionary page -- the first page of each column.
+/// If we encounter a dict page, we will need to immediately read the next page, and cache it.
+///
+/// To summarize, the cache only contains 2 pages per column: one dict page and one data page.
+/// This is a nice property as it means the caching memory consumption is negligible and constant in the number of columns.
+///
+/// ## How to identify a page
+/// We use the page offset (the offset to the Parquet file) to uniquely identify a page.
+pub(crate) struct PredicatePageCache { + inner: Mutex, +} + +impl PredicatePageCache { + pub(crate) fn new(capacity: usize) -> Self { + Self { + inner: Mutex::new(PredicatePageCacheInner { + pages: HashMap::with_capacity(capacity), + }), + } + } + + fn get(&self) -> MutexGuard { + self.inner.lock().unwrap() + } +} + +pub(crate) struct CachedPageReader { + inner: SerializedPageReader, + cache: Arc, + col_id: usize, +} + +impl CachedPageReader { + pub(crate) fn new( + inner: SerializedPageReader, + cache: Arc, + col_id: usize, + ) -> Self { + Self { + inner, + cache, + col_id, + } + } +} + +impl Iterator for CachedPageReader { + type Item = Result; + + fn next(&mut self) -> Option { + self.get_next_page().transpose() + } +} + +impl PageReader for CachedPageReader { + fn get_next_page(&mut self) -> Result, ParquetError> { + let next_page_offset = self.inner.peek_next_page_offset()?; + + let Some(offset) = next_page_offset else { + return Ok(None); + }; + + let mut cache = self.cache.get(); + + let page = cache.get_page(self.col_id, offset); + if let Some(page) = page { + self.inner.skip_next_page()?; + Ok(Some(page)) + } else { + let inner_page = self.inner.get_next_page()?; + let Some(inner_page) = inner_page else { + return Ok(None); + }; + cache.insert_page(self.col_id, offset, inner_page.clone()); + Ok(Some(inner_page)) + } + } + + fn peek_next_page(&mut self) -> Result, ParquetError> { + self.inner.peek_next_page() + } + + fn skip_next_page(&mut self) -> Result<(), ParquetError> { + self.inner.skip_next_page() + } + + fn at_record_boundary(&mut self) -> Result { + self.inner.at_record_boundary() + } +} + +// Helper implementation for testing +#[cfg(test)] +impl Page { + fn dummy_page(page_type: PageType, size: usize) -> Self { + use crate::basic::Encoding; + match page_type { + PageType::DATA_PAGE => Page::DataPage { + buf: vec![0; size].into(), + num_values: size as u32, + encoding: Encoding::PLAIN, + def_level_encoding: Encoding::PLAIN, + rep_level_encoding: 
Encoding::PLAIN, + statistics: None, + }, + PageType::DICTIONARY_PAGE => Page::DictionaryPage { + buf: vec![0; size].into(), + num_values: size as u32, + encoding: Encoding::PLAIN, + is_sorted: false, + }, + PageType::DATA_PAGE_V2 => Page::DataPageV2 { + buf: vec![0; size].into(), + num_values: size as u32, + encoding: Encoding::PLAIN, + def_levels_byte_len: 0, + rep_levels_byte_len: 0, + is_compressed: false, + statistics: None, + num_nulls: 0, + num_rows: 0, + }, + _ => unreachable!(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_take_next_selection_exact_match() { + let mut queue = VecDeque::from(vec![ + RowSelector::skip(5), + RowSelector::select(3), + RowSelector::skip(2), + RowSelector::select(7), + ]); + + // Request exactly 10 rows (5 skip + 3 select + 2 skip) + let selection = take_next_selection(&mut queue, 3).unwrap(); + assert_eq!( + selection, + vec![ + RowSelector::skip(5), + RowSelector::select(3), + RowSelector::skip(2) + ] + .into() + ); + + // Check remaining queue + assert_eq!(queue.len(), 1); + assert_eq!(queue[0].row_count, 7); + assert!(!queue[0].skip); + } + + #[test] + fn test_take_next_selection_split_required() { + let mut queue = VecDeque::from(vec![RowSelector::select(10), RowSelector::select(10)]); + + // Request 15 rows, which should split the first selector + let selection = take_next_selection(&mut queue, 15).unwrap(); + + assert_eq!( + selection, + vec![RowSelector::select(10), RowSelector::select(5)].into() + ); + + // Check remaining queue - should have 5 rows from split and original 10 + assert_eq!(queue.len(), 1); + assert!(!queue[0].skip); + assert_eq!(queue[0].row_count, 5); + } + + #[test] + fn test_take_next_selection_empty_queue() { + let mut queue = VecDeque::new(); + + // Should return None for empty queue + let selection = take_next_selection(&mut queue, 10); + assert!(selection.is_none()); + + // Test with queue that becomes empty + queue.push_back(RowSelector::select(5)); + let 
selection = take_next_selection(&mut queue, 10).unwrap(); + assert_eq!(selection, vec![RowSelector::select(5)].into()); + + // Queue should now be empty + let selection = take_next_selection(&mut queue, 10); + assert!(selection.is_none()); + } + + #[test] + fn test_predicate_page_cache_basic_operations() { + use super::*; + + let cache = PredicatePageCache::new(2); + let page1 = Page::dummy_page(PageType::DATA_PAGE, 100); + let page2 = Page::dummy_page(PageType::DICTIONARY_PAGE, 200); + + // Insert and retrieve a data page + cache.get().insert_page(0, 1000, page1.clone()); + let retrieved = cache.get().get_page(0, 1000); + assert!(retrieved.is_some()); + assert_eq!(retrieved.unwrap().page_type(), PageType::DATA_PAGE); + + // Insert and retrieve a dictionary page for same column + cache.get().insert_page(0, 2000, page2.clone()); + let retrieved = cache.get().get_page(0, 2000); + assert!(retrieved.is_some()); + assert_eq!(retrieved.unwrap().page_type(), PageType::DICTIONARY_PAGE); + + // Both pages should still be accessible + assert!(cache.get().get_page(0, 1000).is_some()); + assert!(cache.get().get_page(0, 2000).is_some()); + } + + #[test] + fn test_predicate_page_cache_replacement() { + use super::*; + + let cache = PredicatePageCache::new(2); + let data_page1 = Page::dummy_page(PageType::DATA_PAGE, 100); + let data_page2 = Page::dummy_page(PageType::DATA_PAGE_V2, 200); + + // Insert first data page + cache.get().insert_page(0, 1000, data_page1.clone()); + assert!(cache.get().get_page(0, 1000).is_some()); + + // Insert second data page - should replace first data page + cache.get().insert_page(0, 2000, data_page2.clone()); + assert!(cache.get().get_page(0, 2000).is_some()); + assert!(cache.get().get_page(0, 1000).is_none()); // First page should be gone + } + + #[test] + fn test_predicate_page_cache_multiple_columns() { + use super::*; + + let cache = PredicatePageCache::new(2); + let page1 = Page::dummy_page(PageType::DATA_PAGE, 100); + let page2 = 
Page::dummy_page(PageType::DATA_PAGE_V2, 200); + + // Insert pages for different columns + cache.get().insert_page(0, 1000, page1.clone()); + cache.get().insert_page(1, 1000, page2.clone()); + + // Both pages should be accessible + assert!(cache.get().get_page(0, 1000).is_some()); + assert!(cache.get().get_page(1, 1000).is_some()); + + // Non-existent column should return None + assert!(cache.get().get_page(2, 1000).is_none()); + } +} diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 5323251b07e..f93846292ba 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -83,6 +83,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use arrow_reader::{CachedPageReader, FilteredParquetRecordBatchReader, PredicatePageCache}; use bytes::{Buf, Bytes}; use futures::future::{BoxFuture, FutureExt}; use futures::ready; @@ -94,8 +95,7 @@ use arrow_schema::{DataType, Fields, Schema, SchemaRef}; use crate::arrow::array_reader::{build_array_reader, RowGroups}; use crate::arrow::arrow_reader::{ - apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderMetadata, - ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection, + ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, RowFilter, RowSelection, }; use crate::arrow::ProjectionMask; @@ -110,6 +110,7 @@ use crate::file::reader::{ChunkReader, Length, SerializedPageReader}; use crate::file::FOOTER_SIZE; use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash}; +mod arrow_reader; mod metadata; pub use metadata::*; @@ -120,6 +121,8 @@ use crate::arrow::schema::ParquetField; #[cfg(feature = "object_store")] pub use store::*; +use super::arrow_reader::RowSelector; + /// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files /// /// Notes: @@ -469,10 +472,10 @@ impl ParquetRecordBatchStreamBuilder { } } -type ReadResult = 
Result<(ReaderFactory, Option)>; +type ReadResult = Result<(ReaderFactory, Option)>; /// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create -/// [`ParquetRecordBatchReader`] +/// [`FilteredParquetRecordBatchReader`] struct ReaderFactory { metadata: Arc, @@ -497,68 +500,74 @@ where async fn read_row_group( mut self, row_group_idx: usize, - mut selection: Option, + selection: Option, projection: ProjectionMask, batch_size: usize, ) -> ReadResult { - // TODO: calling build_array multiple times is wasteful - let meta = self.metadata.row_group(row_group_idx); let offset_index = self .metadata .offset_index() // filter out empty offset indexes (old versions specified Some(vec![]) when no present) - .filter(|index| !index.is_empty()) + .filter(|index| index.first().map(|v| !v.is_empty()).unwrap_or(false)) .map(|x| x[row_group_idx].as_slice()); - let mut row_group = InMemoryRowGroup { - metadata: meta, - // schema: meta.schema_descr_ptr(), - row_count: meta.num_rows() as usize, - column_chunks: vec![None; meta.columns().len()], - offset_index, - }; + let mut predicate_projection: Option = None; + if let Some(filter) = self.filter.as_mut() { + for predicate in filter.predicates.iter_mut() { + let p_projection = predicate.projection(); + if let Some(ref mut p) = predicate_projection { + p.union(p_projection); + } else { + predicate_projection = Some(p_projection.clone()); + } + } + } + let projection_to_cache = predicate_projection.map(|mut p| { + p.intersect(&projection); + p + }); + + let mut row_group = InMemoryRowGroup::new(meta, offset_index, projection_to_cache); + let mut selection = + selection.unwrap_or_else(|| vec![RowSelector::select(row_group.row_count)].into()); + + let mut filter_readers = Vec::new(); if let Some(filter) = self.filter.as_mut() { for predicate in filter.predicates.iter_mut() { - if !selects_any(selection.as_ref()) { + if !selection.selects_any() { return Ok((self, None)); } - let predicate_projection = predicate.projection(); + 
let p_projection = predicate.projection(); row_group - .fetch(&mut self.input, predicate_projection, selection.as_ref()) + .fetch(&mut self.input, p_projection, Some(&selection)) .await?; let array_reader = - build_array_reader(self.fields.as_deref(), predicate_projection, &row_group)?; - - selection = Some(evaluate_predicate( - batch_size, - array_reader, - selection, - predicate.as_mut(), - )?); + build_array_reader(self.fields.as_deref(), p_projection, &row_group)?; + filter_readers.push(array_reader); } } // Compute the number of rows in the selection before applying limit and offset - let rows_before = selection - .as_ref() - .map(|s| s.row_count()) - .unwrap_or(row_group.row_count); + let rows_before = selection.row_count(); if rows_before == 0 { return Ok((self, None)); } - selection = apply_range(selection, row_group.row_count, self.offset, self.limit); + if let Some(offset) = self.offset { + selection = selection.offset(offset); + } + + if let Some(limit) = self.limit { + selection = selection.limit(limit); + } // Compute the number of rows in the selection after applying limit and offset - let rows_after = selection - .as_ref() - .map(|s| s.row_count()) - .unwrap_or(row_group.row_count); + let rows_after = selection.row_count(); // Update offset if necessary if let Some(offset) = &mut self.offset { @@ -576,13 +585,16 @@ where } row_group - .fetch(&mut self.input, &projection, selection.as_ref()) + .fetch(&mut self.input, &projection, Some(&selection)) .await?; - let reader = ParquetRecordBatchReader::new( + let array_reader = build_array_reader(self.fields.as_deref(), &projection, &row_group)?; + let reader = FilteredParquetRecordBatchReader::new( batch_size, - build_array_reader(self.fields.as_deref(), &projection, &row_group)?, + array_reader, selection, + filter_readers, + self.filter.take(), ); Ok((self, Some(reader))) @@ -593,7 +605,7 @@ enum StreamState { /// At the start of a new row group, or the end of the parquet stream Init, /// Decoding a 
batch - Decoding(ParquetRecordBatchReader), + Decoding(FilteredParquetRecordBatchReader), /// Reading data from input Reading(BoxFuture<'static, ReadResult>), /// Error @@ -686,7 +698,7 @@ where /// - `Ok(None)` if the stream has ended. /// - `Err(error)` if the stream has errored. All subsequent calls will return `Ok(None)`. /// - `Ok(Some(reader))` which holds all the data for the row group. - pub async fn next_row_group(&mut self) -> Result> { + pub async fn next_row_group(&mut self) -> Result> { loop { match &mut self.state { StreamState::Decoding(_) | StreamState::Reading(_) => { @@ -750,7 +762,12 @@ where self.state = StreamState::Error; return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string())))); } - None => self.state = StreamState::Init, + None => { + // this is ugly, but works for now. + let filter = batch_reader.take_filter(); + self.reader.as_mut().unwrap().filter = filter; + self.state = StreamState::Init + } }, StreamState::Init => { let row_group_idx = match self.row_groups.pop_front() { @@ -802,8 +819,36 @@ struct InMemoryRowGroup<'a> { offset_index: Option<&'a [OffsetIndexMetaData]>, column_chunks: Vec>>, row_count: usize, + cache: Arc, + projection_to_cache: Option, } +impl<'a> InMemoryRowGroup<'a> { + fn new( + metadata: &'a RowGroupMetaData, + offset_index: Option<&'a [OffsetIndexMetaData]>, + projection_to_cache: Option, + ) -> Self { + let to_cache_column_cnt = projection_to_cache + .as_ref() + .map(|p| { + if let Some(mask) = &p.mask { + mask.iter().filter(|&&b| b).count() + } else { + metadata.columns().len() + } + }) + .unwrap_or(0); + Self { + metadata, + offset_index, + column_chunks: vec![None; metadata.columns().len()], + row_count: metadata.num_rows() as usize, + cache: Arc::new(PredicatePageCache::new(to_cache_column_cnt)), + projection_to_cache, + } + } +} impl InMemoryRowGroup<'_> { /// Fetches the necessary column data into memory async fn fetch( @@ -913,12 +958,32 @@ impl RowGroups for InMemoryRowGroup<'_> { // filter 
out empty offset indexes (old versions specified Some(vec![]) when no present) .filter(|index| !index.is_empty()) .map(|index| index[i].page_locations.clone()); - let page_reader: Box = Box::new(SerializedPageReader::new( - data.clone(), - self.metadata.column(i), - self.row_count, - page_locations, - )?); + + let cached_reader = if let Some(projection_to_cache) = &self.projection_to_cache { + projection_to_cache.leaf_included(i) + } else { + false + }; + + let page_reader: Box = if cached_reader { + Box::new(CachedPageReader::new( + SerializedPageReader::new( + data.clone(), + self.metadata.column(i), + self.row_count, + page_locations, + )?, + self.cache.clone(), + i, + )) + } else { + Box::new(SerializedPageReader::new( + data.clone(), + self.metadata.column(i), + self.row_count, + page_locations, + )?) + }; Ok(Box::new(ColumnChunkIterator { reader: Some(Ok(page_reader)), diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 81ba0a66463..3f2f1e0539d 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -421,7 +421,6 @@ pub(crate) fn decode_page( uncompressed_size )); } - Bytes::from(decompressed) } _ => buffer, @@ -574,8 +573,8 @@ impl SerializedPageReader { /// /// This is used when we need to read parquet with row-filter, and we don't want to decompress the page twice. /// This function allows us to check if the next page is being cached or read previously. 
- #[cfg(test)] - fn peek_next_page_offset(&mut self) -> Result> { + #[cfg(feature = "async")] + pub(crate) fn peek_next_page_offset(&mut self) -> Result> { match &mut self.state { SerializedPageReaderState::Values { offset, @@ -859,7 +858,6 @@ impl PageReader for SerializedPageReader { #[cfg(test)] mod tests { - use std::collections::HashSet; use bytes::Buf; @@ -1166,6 +1164,7 @@ mod tests { assert_eq!(page_count, 2); } + #[cfg(feature = "async")] fn get_serialized_page_reader( file_reader: &SerializedFileReader, row_group: usize, @@ -1202,12 +1201,13 @@ mod tests { ) } + #[cfg(feature = "async")] #[test] fn test_peek_next_page_offset_matches_actual() -> Result<()> { let test_file = get_test_file("alltypes_plain.parquet"); let reader = SerializedFileReader::new(test_file)?; - let mut offset_set = HashSet::new(); + let mut offset_set = std::collections::HashSet::new(); let num_row_groups = reader.metadata.num_row_groups(); for row_group in 0..num_row_groups { let num_columns = reader.metadata.row_group(row_group).num_columns();