diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 40ae3b20f2b7..681267b7b491 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -33,7 +33,7 @@ use crate::data_type::private::ParquetValueType; use crate::data_type::*; use crate::encodings::levels::LevelEncoder; use crate::errors::{ParquetError, Result}; -use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder}; +use crate::file::metadata::{ColumnIndexBuilder, LevelHistogram, OffsetIndexBuilder}; use crate::file::properties::EnabledStatistics; use crate::file::statistics::{Statistics, ValueStatistics}; use crate::file::{ @@ -183,25 +183,14 @@ pub struct ColumnCloseResult { pub offset_index: Option, } -/// Creates a vector to hold level histogram data. Length will be `max_level + 1`. -/// Because histograms are not necessary when `max_level == 0`, this will return -/// `None` in that case. -fn new_histogram(max_level: i16) -> Option> { - if max_level > 0 { - Some(vec![0; max_level as usize + 1]) - } else { - None - } -} - // Metrics per page #[derive(Default)] struct PageMetrics { num_buffered_values: u32, num_buffered_rows: u32, num_page_nulls: u64, - repetition_level_histogram: Option>, - definition_level_histogram: Option>, + repetition_level_histogram: Option, + definition_level_histogram: Option, } impl PageMetrics { @@ -211,50 +200,37 @@ impl PageMetrics { /// Initialize the repetition level histogram fn with_repetition_level_histogram(mut self, max_level: i16) -> Self { - self.repetition_level_histogram = new_histogram(max_level); + self.repetition_level_histogram = LevelHistogram::try_new(max_level); self } /// Initialize the definition level histogram fn with_definition_level_histogram(mut self, max_level: i16) -> Self { - self.definition_level_histogram = new_histogram(max_level); + self.definition_level_histogram = LevelHistogram::try_new(max_level); self } - /// Sets all elements of `histogram` to 0 - fn 
reset_histogram(histogram: &mut Option>) { - if let Some(ref mut hist) = histogram { - for v in hist { - *v = 0 - } - } - } - /// Resets the state of this `PageMetrics` to the initial state. /// If histograms have been initialized their contents will be reset to zero. fn new_page(&mut self) { self.num_buffered_values = 0; self.num_buffered_rows = 0; self.num_page_nulls = 0; - PageMetrics::reset_histogram(&mut self.repetition_level_histogram); - PageMetrics::reset_histogram(&mut self.definition_level_histogram); + self.repetition_level_histogram.as_mut().map(LevelHistogram::reset); + self.definition_level_histogram.as_mut().map(LevelHistogram::reset); } /// Updates histogram values using provided repetition levels fn update_repetition_level_histogram(&mut self, levels: &[i16]) { if let Some(ref mut rep_hist) = self.repetition_level_histogram { - for &level in levels { - rep_hist[level as usize] += 1; - } + rep_hist.update_from_levels(levels); } } /// Updates histogram values using provided definition levels fn update_definition_level_histogram(&mut self, levels: &[i16]) { if let Some(ref mut def_hist) = self.definition_level_histogram { - for &level in levels { - def_hist[level as usize] += 1; - } + def_hist.update_from_levels(levels); } } } @@ -274,8 +250,8 @@ struct ColumnMetrics { num_column_nulls: u64, column_distinct_count: Option, variable_length_bytes: Option, - repetition_level_histogram: Option>, - definition_level_histogram: Option>, + repetition_level_histogram: Option, + definition_level_histogram: Option, } impl ColumnMetrics { @@ -285,24 +261,23 @@ impl ColumnMetrics { /// Initialize the repetition level histogram fn with_repetition_level_histogram(mut self, max_level: i16) -> Self { - self.repetition_level_histogram = new_histogram(max_level); + self.repetition_level_histogram = LevelHistogram::try_new(max_level); self } /// Initialize the definition level histogram fn with_definition_level_histogram(mut self, max_level: i16) -> Self { - 
self.definition_level_histogram = new_histogram(max_level); + self.definition_level_histogram = LevelHistogram::try_new(max_level); self } /// Sum `page_histogram` into `chunk_histogram` - fn update_histogram(chunk_histogram: &mut Option>, page_histogram: &Option>) { - if page_histogram.is_some() && chunk_histogram.is_some() { - let chunk_hist = chunk_histogram.as_mut().unwrap(); - let page_hist = page_histogram.as_ref().unwrap(); - for i in 0..page_hist.len() { - chunk_hist[i] += page_hist[i] - } + fn update_histogram( + chunk_histogram: &mut Option, + page_histogram: &Option, + ) { + if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) { + chunk_hist.add(page_hist); } } diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index c6895fad5bbd..864746ef8a83 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -557,8 +557,114 @@ pub struct ColumnChunkMetaData { column_index_offset: Option, column_index_length: Option, unencoded_byte_array_data_bytes: Option, - repetition_level_histogram: Option>, - definition_level_histogram: Option>, + repetition_level_histogram: Option, + definition_level_histogram: Option, +} + +/// Histograms for repetition and definition levels. +/// +/// Each histogram is a vector of length `max_level + 1`. The value at index `i` is the number of +/// values at level `i`. +/// +/// For example, `vec[0]` is the number of values with level 0, `vec[1]` is the +/// number of values with level 1, and so on. +/// +#[derive(Debug, Clone, PartialEq)] +pub struct LevelHistogram { + inner: Vec, +} + +impl LevelHistogram { + /// Creates a new level histogram. + /// + /// Length will be `max_level + 1`.
+ /// + /// Returns `None` when `max_level <= 0` (histograms are not necessary when `max_level == 0`) + pub fn try_new(max_level: i16) -> Option { + if max_level > 0 { + Some(Self { + inner: vec![0; max_level as usize + 1], + }) + } else { + None + } + } + /// Returns a reference to the histogram's values. + pub fn values(&self) -> &[i64] { + &self.inner + } + + /// Returns the inner vector, consuming `self` + pub fn into_inner(self) -> Vec { + self.inner + } + + /// Returns the histogram value at the given index. + /// + /// The value at index `i` is the number of values with level `i`. For example, + /// `get(1)` returns the number of values with level 1. + /// + /// Returns `None` if the index is out of bounds. + pub fn get(&self, index: usize) -> Option { + self.inner.get(index).copied() + } + + /// Adds the values from the other histogram to this histogram + /// + /// # Panics + /// If the histograms have different lengths + pub fn add(&mut self, other: &Self) { + assert_eq!(self.len(), other.len()); + for (dst, src) in self.inner.iter_mut().zip(other.inner.iter()) { + *dst += src; + } + } + + /// Returns the length of the histogram + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Returns `true` if the histogram is empty + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + /// Sets the values of all histogram levels to 0.
+ pub fn reset(&mut self) { + for value in self.inner.iter_mut() { + *value = 0; + } + } + + /// Updates histogram values using the provided repetition or definition levels + /// + /// # Panics + /// if any of the levels is negative or not less than the length of the + /// histogram (`max_level + 1` when created by [`Self::try_new`]) + pub fn update_from_levels(&mut self, levels: &[i16]) { + for &level in levels { + self.inner[level as usize] += 1; + } + } +} + +impl From> for LevelHistogram { + fn from(inner: Vec) -> Self { + Self { inner } + } +} + +impl From for Vec { + fn from(value: LevelHistogram) -> Self { + value.into_inner() + } +} + +impl HeapSize for LevelHistogram { + fn heap_size(&self) -> usize { + self.inner.heap_size() + } } /// Represents common operations for a column chunk. @@ -724,7 +830,7 @@ impl ColumnChunkMetaData { /// The returned value `vec[i]` is how many values are at repetition level `i`. For example, /// `vec[0]` indicates how many rows the page contains. /// This field may not be set by older writers. - pub fn repetition_level_histogram(&self) -> Option<&Vec> { + pub fn repetition_level_histogram(&self) -> Option<&LevelHistogram> { self.repetition_level_histogram.as_ref() } @@ -733,7 +839,7 @@ impl ColumnChunkMetaData { /// The returned value `vec[i]` is how many values are at definition level `i`. For example, /// `vec[max_definition_level-1]` indicates how many non-null values are present in the page. /// This field may not be set by older writers.
- pub fn definition_level_histogram(&self) -> Option<&Vec> { + pub fn definition_level_histogram(&self) -> Option<&LevelHistogram> { self.definition_level_histogram.as_ref() } @@ -788,6 +894,9 @@ impl ColumnChunkMetaData { (None, None, None) }; + let repetition_level_histogram = repetition_level_histogram.map(LevelHistogram::from); + let definition_level_histogram = definition_level_histogram.map(LevelHistogram::from); + let result = ColumnChunkMetaData { column_descr, encodings, @@ -838,10 +947,20 @@ impl ColumnChunkMetaData { || self.repetition_level_histogram.is_some() || self.definition_level_histogram.is_some() { + let repetition_level_histogram = self + .repetition_level_histogram + .as_ref() + .map(|hist| hist.clone().into_inner()); + + let definition_level_histogram = self + .definition_level_histogram + .as_ref() + .map(|hist| hist.clone().into_inner()); + Some(SizeStatistics { unencoded_byte_array_data_bytes: self.unencoded_byte_array_data_bytes, - repetition_level_histogram: self.repetition_level_histogram.clone(), - definition_level_histogram: self.definition_level_histogram.clone(), + repetition_level_histogram, + definition_level_histogram, }) } else { None @@ -1023,13 +1142,13 @@ impl ColumnChunkMetaDataBuilder { } /// Sets optional repetition level histogram - pub fn set_repetition_level_histogram(mut self, value: Option>) -> Self { + pub fn set_repetition_level_histogram(mut self, value: Option) -> Self { self.0.repetition_level_histogram = value; self } /// Sets optional repetition level histogram - pub fn set_definition_level_histogram(mut self, value: Option>) -> Self { + pub fn set_definition_level_histogram(mut self, value: Option) -> Self { self.0.definition_level_histogram = value; self } @@ -1049,7 +1168,9 @@ pub struct ColumnIndexBuilder { max_values: Vec>, null_counts: Vec, boundary_order: BoundaryOrder, + /// contains the concatenation of the histograms of all pages repetition_level_histograms: Option>, + /// contains the concatenation 
of the histograms of all pages definition_level_histograms: Option>, /// Is the information in the builder valid? /// @@ -1099,8 +1220,8 @@ impl ColumnIndexBuilder { /// Does nothing if the `ColumnIndexBuilder` is not in the `valid` state. pub fn append_histograms( &mut self, - repetition_level_histogram: &Option>, - definition_level_histogram: &Option>, + repetition_level_histogram: &Option, + definition_level_histogram: &Option, ) { if !self.valid { return; @@ -1108,12 +1229,12 @@ impl ColumnIndexBuilder { if let Some(ref rep_lvl_hist) = repetition_level_histogram { let hist = self.repetition_level_histograms.get_or_insert(Vec::new()); hist.reserve(rep_lvl_hist.len()); - hist.extend(rep_lvl_hist); + hist.extend(rep_lvl_hist.values()); } if let Some(ref def_lvl_hist) = definition_level_histogram { let hist = self.definition_level_histograms.get_or_insert(Vec::new()); hist.reserve(def_lvl_hist.len()); - hist.extend(def_lvl_hist); + hist.extend(def_lvl_hist.values()); } } @@ -1358,8 +1479,8 @@ mod tests { .set_column_index_offset(Some(8000)) .set_column_index_length(Some(25)) .set_unencoded_byte_array_data_bytes(Some(2000)) - .set_repetition_level_histogram(Some(vec![100, 100])) - .set_definition_level_histogram(Some(vec![0, 200])) + .set_repetition_level_histogram(Some(LevelHistogram::from(vec![100, 100]))) + .set_definition_level_histogram(Some(LevelHistogram::from(vec![0, 200]))) .build() .unwrap(); diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index e27a414507db..f2e8f74a378c 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -1895,7 +1895,7 @@ mod tests { assert_eq!(file_metadata.row_groups[0].columns.len(), 1); assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some()); - let check_def_hist = |def_hist: &Vec| { + let check_def_hist = |def_hist: &[i64]| { assert_eq!(def_hist.len(), 2); assert_eq!(def_hist[0], 3); assert_eq!(def_hist[1], 7); @@ -1931,7 +1931,7 @@ mod tests { 
assert!(column.definition_level_histogram().is_some()); assert!(column.repetition_level_histogram().is_none()); assert!(column.unencoded_byte_array_data_bytes().is_some()); - check_def_hist(column.definition_level_histogram().unwrap()); + check_def_hist(column.definition_level_histogram().unwrap().values()); assert_eq!( unenc_size, column.unencoded_byte_array_data_bytes().unwrap() @@ -2009,7 +2009,7 @@ mod tests { assert_eq!(file_metadata.row_groups[0].columns.len(), 1); assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some()); - let check_def_hist = |def_hist: &Vec| { + let check_def_hist = |def_hist: &[i64]| { assert_eq!(def_hist.len(), 4); assert_eq!(def_hist[0], 1); assert_eq!(def_hist[1], 1); @@ -2017,7 +2017,7 @@ mod tests { assert_eq!(def_hist[3], 7); }; - let check_rep_hist = |rep_hist: &Vec| { + let check_rep_hist = |rep_hist: &[i64]| { assert_eq!(rep_hist.len(), 2); assert_eq!(rep_hist[0], 5); assert_eq!(rep_hist[1], 5); @@ -2051,8 +2051,8 @@ mod tests { assert!(column.definition_level_histogram().is_some()); assert!(column.repetition_level_histogram().is_some()); assert!(column.unencoded_byte_array_data_bytes().is_none()); - check_def_hist(column.definition_level_histogram().unwrap()); - check_rep_hist(column.repetition_level_histogram().unwrap()); + check_def_hist(column.definition_level_histogram().unwrap().values()); + check_rep_hist(column.repetition_level_histogram().unwrap().values()); // check histogram in column index as well assert!(reader.metadata().column_index().is_some());