From 116fcbbb766ec1a56eb5cb7c51375b903cce622d Mon Sep 17 00:00:00 2001 From: yangjiang Date: Fri, 3 Jun 2022 21:00:10 +0800 Subject: [PATCH 01/13] add range and rowRanges --- parquet/src/file/metadata.rs | 1 + parquet/src/file/page_index/mod.rs | 1 + parquet/src/file/page_index/range.rs | 235 ++++++++++++++++++++++++++ parquet/src/file/serialized_reader.rs | 1 + 4 files changed, 238 insertions(+) create mode 100644 parquet/src/file/page_index/range.rs diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs index 2a385be515d4..c672daa230b8 100644 --- a/parquet/src/file/metadata.rs +++ b/parquet/src/file/metadata.rs @@ -223,6 +223,7 @@ pub struct RowGroupMetaData { num_rows: i64, total_byte_size: i64, schema_descr: SchemaDescPtr, + // Todo and filter row range } impl RowGroupMetaData { diff --git a/parquet/src/file/page_index/mod.rs b/parquet/src/file/page_index/mod.rs index dcc1120fc4e3..2855586ff87b 100644 --- a/parquet/src/file/page_index/mod.rs +++ b/parquet/src/file/page_index/mod.rs @@ -17,3 +17,4 @@ pub mod index; pub mod index_reader; +pub mod range; diff --git a/parquet/src/file/page_index/range.rs b/parquet/src/file/page_index/range.rs new file mode 100644 index 000000000000..2b7662e957c8 --- /dev/null +++ b/parquet/src/file/page_index/range.rs @@ -0,0 +1,235 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::cmp::Ordering; +use std::collections::VecDeque; + +/// A row range +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct Range { + /// Its start + pub from: usize, + /// Its end + pub to: usize, +} + +impl Range { + // Creates a range of [from, to] (from and to are both inclusive) + pub fn new(from: usize, to: usize) -> Self { + assert!(from <= to); + Self { from, to } + } + + pub fn count(&self) -> usize { + self.to - self.from + 1 + } + + pub fn is_before(&self, other: &Range) -> bool { + self.to < other.from + } + + pub fn is_after(&self, other: &Range) -> bool { + self.from > other.to + } + + /// Return the union of the two ranges, + /// Return `None` if there are hole between them. + pub fn union(left: &Range, right: &Range) -> Option { + if left.from <= right.from { + if left.to + 1 >= right.from { + return Some(Range { + from: left.from, + to: std::cmp::max(left.to, right.to), + }); + } + } else if right.to + 1 >= left.from { + return Some(Range { + from: right.from, + to: std::cmp::max(left.to, right.to), + }); + } + None + } + + /// Returns the intersection of the two ranges, + /// return null if they are not overlapped. + pub fn intersection(left: &Range, right: &Range) -> Option { + if left.from <= right.from { + if left.to >= right.from { + return Some(Range { + from: right.from, + to: std::cmp::min(left.to, right.to), + }); + } + } else if right.to >= left.from { + return Some(Range { + from: left.from, + to: std::cmp::min(left.to, right.to), + } + ); + } + None + } +} + +///Struct representing row ranges in a row-group. These row ranges are calculated as a result of using +///the column index on the filtering. +#[derive(Debug, Clone)] +pub struct RowRanges { + pub ranges: VecDeque, +} + +impl RowRanges { + //create an empty RowRanges + pub fn new() -> Self { + RowRanges { + ranges: VecDeque::new(), + } + } + + pub fn count(&self) -> usize { + self.ranges.len() + } + + //Adds a range to the end of the list of ranges. It maintains the disjunctive ascending order of the ranges by + //trying to union the specified range to the last ranges in the list. The specified range shall be larger than + //the last one or might be overlapped with some of the last ones. + // [a, b] < [c, d] if b < c + pub fn add(&mut self, range: Range) { + let mut to_add = range; + let f: i32 = (self.count() as i32) - 1; + if f >= 0 { + for i in f as usize..0 { + let last = self.ranges.get(i).unwrap(); + assert!(!last.is_after(&range)); + // try to merge range + match Range::union(last, &to_add) { + None => { + break; + } + Some(r) => { + to_add = r; + self.ranges.remove(i); + } + } + } + } + self.ranges.push_back(to_add); + } + + /// Calculates the union of the two specified RowRanges object. The union of two range is calculated if there are no + /// elements between them. Otherwise, the two disjunctive ranges are stored separately. + /// For example: + /// [113, 241] ∪ [221, 340] = [113, 330] + /// [113, 230] ∪ [231, 340] = [113, 340] + /// while + /// [113, 230] ∪ [232, 340] = [113, 230], [232, 340] + /// + /// The result RowRanges object will contain all the row indexes that were contained in one of the specified objects. + pub fn union(mut left: RowRanges, mut right: RowRanges) -> RowRanges { + let v1 = &mut left.ranges; + let v2 = &mut right.ranges; + let mut result = RowRanges::new(); + if v2.is_empty() { + left.clone() + } else { + let mut range2 = v2.pop_front().unwrap(); + while !v1.is_empty() { + let range1 = v1.pop_front().unwrap(); + if range1.is_after(&range2) { + result.add(range2); + range2 = range1; + std::mem::swap(v1,v2); + } else { + result.add(range1); + } + } + if !v2.is_empty() { result.ranges.append( v2) } + + result + } + } + + /// Calculates the intersection of the two specified RowRanges object. Two ranges intersect if they have common + /// elements otherwise the result is empty. + /// For example: + /// [113, 241] ∩ [221, 340] = [221, 241] + /// while + /// [113, 230] ∩ [231, 340] = + /// + /// The result RowRanges object will contain all the row indexes there were contained in both of the specified objects + #[allow(clippy::mut_range_bound)] + pub fn intersection(left: RowRanges, right: RowRanges) -> RowRanges { + let mut result = RowRanges::new(); + let mut right_index = 0; + for l in left.ranges.iter() { + for i in right_index..right.ranges.len() { + let r = right.ranges.get(i).unwrap(); + if l.is_before(r) { + break; + } else if l.is_after(r) { + right_index = i + 1; + continue; + } + if let Some(ra) = Range::intersection(l, r) { + result.add(ra); + } + } + } + result + } + + pub fn row_count(&self) -> usize { + self.ranges.iter().map(|x| x.count()).sum() + } + + pub fn is_overlapping(&self, x: &Range) -> bool { + self.ranges.binary_search_by(|y| -> Ordering { + if y.is_before(x) { + Ordering::Less + } else if y.is_after(x) { + Ordering::Greater + } else { + Ordering::Equal + } + }).is_ok() + } +} + + +#[cfg(test)] +mod tests { + use crate::file::page_index::range::{Range, RowRanges}; + + #[test] + fn test_binary_search_overlap() { + let mut ranges = RowRanges::new(); + ranges.add(Range { from: 1, to: 3 }); + ranges.add(Range { from: 6, to: 7 }); + + assert!(ranges.is_overlapping(&Range { from: 1, to: 2 })); + // include both [start, end] + assert!(ranges.is_overlapping(&Range { from: 0, to: 1 })); + assert!(ranges.is_overlapping(&Range { from: 0, to: 3 })); + + assert!(ranges.is_overlapping(&Range { from: 0, to: 7 })); + assert!(ranges.is_overlapping(&Range { from: 2, to: 7 })); + + assert!(!ranges.is_overlapping(&Range { from: 4, to: 5 })); + } + +} \ No newline at end of file diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 1dd374ef85c3..f775779f9799 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -317,6 +317,7 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<' fn get_column_page_reader(&self, i: usize) -> Result> { let col = self.metadata.column(i); let (col_start, col_length) = col.byte_range(); + //Todo filter with multi start range let file_chunk = self.chunk_reader.get_read(col_start, col_length as usize)?; let page_reader = SerializedPageReader::new( file_chunk, From 0f23f207bdc5b1fdea8c505c3a52c3869e26e316 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Sat, 4 Jun 2022 23:17:33 +0800 Subject: [PATCH 02/13] add some tests for range --- parquet/src/file/page_index/range.rs | 142 ++++++++++++++++++++++++--- 1 file changed, 129 insertions(+), 13 deletions(-) diff --git a/parquet/src/file/page_index/range.rs b/parquet/src/file/page_index/range.rs index 2b7662e957c8..1e867c338d65 100644 --- a/parquet/src/file/page_index/range.rs +++ b/parquet/src/file/page_index/range.rs @@ -95,7 +95,7 @@ pub struct RowRanges { impl RowRanges { //create an empty RowRanges - pub fn new() -> Self { + pub fn new_empty() -> Self { RowRanges { ranges: VecDeque::new(), } @@ -105,17 +105,18 @@ impl RowRanges { self.ranges.len() } - //Adds a range to the end of the list of ranges. It maintains the disjunctive ascending order of the ranges by + //Add a range to the end of the list of ranges. It maintains the disjunctive ascending order of the ranges by //trying to union the specified range to the last ranges in the list. The specified range shall be larger than //the last one or might be overlapped with some of the last ones. // [a, b] < [c, d] if b < c pub fn add(&mut self, range: Range) { let mut to_add = range; - let f: i32 = (self.count() as i32) - 1; - if f >= 0 { - for i in f as usize..0 { - let last = self.ranges.get(i).unwrap(); - assert!(!last.is_after(&range)); + let count = self.count(); + if count > 0 { + for i in 1..(count + 1) { + let index = count - i; + let last = self.ranges.get(index).unwrap(); + assert!(!last.is_after(&range), "Must add range in ascending!"); // try to merge range match Range::union(last, &to_add) { None => { @@ -123,7 +124,7 @@ impl RowRanges { } Some(r) => { to_add = r; - self.ranges.remove(i); + self.ranges.remove(index); } } } @@ -143,7 +144,7 @@ impl RowRanges { pub fn union(mut left: RowRanges, mut right: RowRanges) -> RowRanges { let v1 = &mut left.ranges; let v2 = &mut right.ranges; - let mut result = RowRanges::new(); + let mut result = RowRanges::new_empty(); if v2.is_empty() { left.clone() } else { @@ -153,12 +154,16 @@ impl RowRanges { if range1.is_after(&range2) { result.add(range2); range2 = range1; - std::mem::swap(v1,v2); + std::mem::swap(v1, v2); } else { result.add(range1); } } - if !v2.is_empty() { result.ranges.append( v2) } + + result.add(range2); + while !v2.is_empty() { + result.add(v2.pop_front().unwrap()) + } result } @@ -174,7 +179,7 @@ impl RowRanges { /// The result RowRanges object will contain all the row indexes there were contained in both of the specified objects #[allow(clippy::mut_range_bound)] pub fn intersection(left: RowRanges, right: RowRanges) -> RowRanges { - let mut result = RowRanges::new(); + let mut result = RowRanges::new_empty(); let mut right_index = 0; for l in left.ranges.iter() { for i in right_index..right.ranges.len() { @@ -217,7 +222,7 @@ mod tests { #[test] fn test_binary_search_overlap() { - let mut ranges = RowRanges::new(); + let mut ranges = RowRanges::new_empty(); ranges.add(Range { from: 1, to: 3 }); ranges.add(Range { from: 6, to: 7 }); @@ -232,4 +237,115 @@ mod tests { assert!(!ranges.is_overlapping(&Range { from: 4, to: 5 })); } + #[test] + fn test_add_func_ascending_disjunctive() { + let mut ranges_1 = RowRanges::new_empty(); + ranges_1.add(Range { from: 1, to: 3 }); + ranges_1.add(Range { from: 5, to: 6 }); + ranges_1.add(Range { from: 8, to: 9 }); + assert_eq!(ranges_1.count(), 3); + } + + #[test] + fn test_add_func_ascending_merge() { + let mut ranges_1 = RowRanges::new_empty(); + ranges_1.add(Range { from: 1, to: 3 }); + ranges_1.add(Range { from: 4, to: 5 }); + ranges_1.add(Range { from: 6, to: 7 }); + assert_eq!(ranges_1.count(), 1); + } + + #[test] + #[should_panic(expected = "Must add range in ascending!")] + fn test_add_func_not_ascending() { + let mut ranges_1 = RowRanges::new_empty(); + ranges_1.add(Range { from: 6, to: 7 }); + ranges_1.add(Range { from: 1, to: 3 }); + ranges_1.add(Range { from: 4, to: 5 }); + assert_eq!(ranges_1.count(), 1); + } + + #[test] + fn test_union_func() { + let mut ranges_1 = RowRanges::new_empty(); + ranges_1.add(Range { from: 1, to: 2 }); + ranges_1.add(Range { from: 3, to: 4 }); + ranges_1.add(Range { from: 5, to: 6 }); + + let mut ranges_2 = RowRanges::new_empty(); + ranges_2.add(Range { from: 2, to: 3 }); + ranges_2.add(Range { from: 4, to: 5 }); + ranges_2.add(Range { from: 6, to: 7 }); + + let ranges = RowRanges::union(ranges_1, ranges_2); + assert_eq!(ranges.count(), 1); + let range = ranges.ranges.get(0).unwrap(); + assert_eq!(range.from, 1); + assert_eq!(range.to, 7); + + let mut ranges_a = RowRanges::new_empty(); + ranges_a.add(Range { from: 1, to: 3 }); + ranges_a.add(Range { from: 5, to: 8 }); + ranges_a.add(Range { from: 11, to: 12 }); + + let mut ranges_b = RowRanges::new_empty(); + ranges_b.add(Range { from: 0, to: 2 }); + ranges_b.add(Range { from: 6, to: 7 }); + ranges_b.add(Range { from: 10, to: 11 }); + + let ranges = RowRanges::union(ranges_a, ranges_b); + assert_eq!(ranges.count(), 3); + + let range_1 = ranges.ranges.get(0).unwrap(); + assert_eq!(range_1.from, 0); + assert_eq!(range_1.to, 3); + let range_2 = ranges.ranges.get(1).unwrap(); + assert_eq!(range_2.from, 5); + assert_eq!(range_2.to, 8); + let range_3 = ranges.ranges.get(2).unwrap(); + assert_eq!(range_3.from, 10); + assert_eq!(range_3.to, 12); + } + + #[test] + fn test_intersection_func() { + let mut ranges_1 = RowRanges::new_empty(); + ranges_1.add(Range { from: 1, to: 2 }); + ranges_1.add(Range { from: 3, to: 4 }); + ranges_1.add(Range { from: 5, to: 6 }); + + let mut ranges_2 = RowRanges::new_empty(); + ranges_2.add(Range { from: 2, to: 3 }); + ranges_2.add(Range { from: 4, to: 5 }); + ranges_2.add(Range { from: 6, to: 7 }); + + let ranges = RowRanges::intersection(ranges_1, ranges_2); + assert_eq!(ranges.count(), 1); + let range = ranges.ranges.get(0).unwrap(); + assert_eq!(range.from, 2); + assert_eq!(range.to, 6); + + let mut ranges_a = RowRanges::new_empty(); + ranges_a.add(Range { from: 1, to: 3 }); + ranges_a.add(Range { from: 5, to: 8 }); + ranges_a.add(Range { from: 11, to: 12 }); + + let mut ranges_b = RowRanges::new_empty(); + ranges_b.add(Range { from: 0, to: 2 }); + ranges_b.add(Range { from: 6, to: 7 }); + ranges_b.add(Range { from: 10, to: 11 }); + + let ranges = RowRanges::intersection(ranges_a, ranges_b); + assert_eq!(ranges.count(), 3); + + let range_1 = ranges.ranges.get(0).unwrap(); + assert_eq!(range_1.from, 1); + assert_eq!(range_1.to, 2); + let range_2 = ranges.ranges.get(1).unwrap(); + assert_eq!(range_2.from, 6); + assert_eq!(range_2.to, 7); + let range_3 = ranges.ranges.get(2).unwrap(); + assert_eq!(range_3.from, 11); + assert_eq!(range_3.to, 11); + } } \ No newline at end of file From e9c98c096f2fcec03cf2cfe2c60abee5ceb90140 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Sat, 4 Jun 2022 23:58:26 +0800 Subject: [PATCH 03/13] add filter logic --- parquet/src/file/page_index/range.rs | 47 ++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/parquet/src/file/page_index/range.rs b/parquet/src/file/page_index/range.rs index 1e867c338d65..ff684be7acbe 100644 --- a/parquet/src/file/page_index/range.rs +++ b/parquet/src/file/page_index/range.rs @@ -17,6 +17,8 @@ use std::cmp::Ordering; use std::collections::VecDeque; +use parquet_format::PageLocation; +use crate::errors::ParquetError; /// A row range #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -105,6 +107,22 @@ impl RowRanges { self.ranges.len() } + pub fn filter_with_mask(&self, mask: &[bool]) -> Result { + if self.ranges.len() != mask.len() { + return Err(ParquetError::General(format!("Mask size{} is not equal to page size {}", mask.len(), self.count()))); + } + let vec_range = mask.iter().zip(self.ranges.clone()).filter_map( + |(&f, r)| { + if f { + Some(r) + } else { + None + } + } + ).collect(); + Ok(RowRanges { ranges: vec_range }) + } + //Add a range to the end of the list of ranges. It maintains the disjunctive ascending order of the ranges by //trying to union the specified range to the last ranges in the list. The specified range shall be larger than //the last one or might be overlapped with some of the last ones. @@ -215,6 +233,35 @@ impl RowRanges { } } +/// Return the row ranges `Vec(start, len)` of all the selected pages +pub fn compute_row_ranges( + mask: &[bool], + locations: &[PageLocation], + total_rows: usize, +) -> Result { + let row_ranges = page_locations_to_row_ranges(locations, total_rows)?; + row_ranges.filter_with_mask(mask) +} + +fn page_locations_to_row_ranges( + locations: &[PageLocation], + total_rows: usize, +) -> Result { + if locations.is_empty() { + return Ok(RowRanges::new_empty()); + } + + let mut vec_range: VecDeque = locations.windows(2).map(|x| { + let start = x[0].first_row_index as usize; + let end = (x[1].first_row_index - 1) as usize; + Range { from: start, to: end } + }).collect(); + + let last = Range { from: locations.last().unwrap().first_row_index as usize, to: total_rows }; + vec_range.push_back(last); + + Ok(RowRanges { ranges: vec_range }) +} #[cfg(test)] mod tests { From 761c9294f9c11ecb1fe50552d65c120054828c46 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Sun, 5 Jun 2022 12:16:22 +0800 Subject: [PATCH 04/13] fix fmt --- parquet/src/file/page_index/range.rs | 142 ++++++++++++++++++++------- 1 file changed, 109 insertions(+), 33 deletions(-) diff --git a/parquet/src/file/page_index/range.rs b/parquet/src/file/page_index/range.rs index ff684be7acbe..fe592919fd64 100644 --- a/parquet/src/file/page_index/range.rs +++ b/parquet/src/file/page_index/range.rs @@ -14,11 +14,10 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. - +use crate::errors::ParquetError; +use parquet_format::PageLocation; use std::cmp::Ordering; use std::collections::VecDeque; -use parquet_format::PageLocation; -use crate::errors::ParquetError; /// A row range #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -81,8 +80,7 @@ impl Range { return Some(Range { from: left.from, to: std::cmp::min(left.to, right.to), - } - ); + }); } None } @@ -109,17 +107,17 @@ impl RowRanges { pub fn filter_with_mask(&self, mask: &[bool]) -> Result { if self.ranges.len() != mask.len() { - return Err(ParquetError::General(format!("Mask size{} is not equal to page size {}", mask.len(), self.count()))); + return Err(ParquetError::General(format!( + "Mask size{} is not equal to page size {}", + mask.len(), + self.count() + ))); } - let vec_range = mask.iter().zip(self.ranges.clone()).filter_map( - |(&f, r)| { - if f { - Some(r) - } else { - None - } - } - ).collect(); + let vec_range = mask + .iter() + .zip(self.ranges.clone()) + .filter_map(|(&f, r)| if f { Some(r) } else { None }) + .collect(); Ok(RowRanges { ranges: vec_range }) } @@ -221,15 +219,17 @@ impl RowRanges { } pub fn is_overlapping(&self, x: &Range) -> bool { - self.ranges.binary_search_by(|y| -> Ordering { - if y.is_before(x) { - Ordering::Less - } else if y.is_after(x) { - Ordering::Greater - } else { - Ordering::Equal - } - }).is_ok() + self.ranges + .binary_search_by(|y| -> Ordering { + if y.is_before(x) { + Ordering::Less + } else if y.is_after(x) { + Ordering::Greater + } else { + Ordering::Equal + } + }) + .is_ok() } } @@ -251,13 +251,22 @@ fn page_locations_to_row_ranges( return Ok(RowRanges::new_empty()); } - let mut vec_range: VecDeque = locations.windows(2).map(|x| { - let start = x[0].first_row_index as usize; - let end = (x[1].first_row_index - 1) as usize; - Range { from: start, to: end } - }).collect(); - - let last = Range { from: locations.last().unwrap().first_row_index as usize, to: total_rows }; + let mut vec_range: VecDeque = locations + .windows(2) + .map(|x| { + let start = x[0].first_row_index as usize; + let end = (x[1].first_row_index - 1) as usize; + Range { + from: start, + to: end, + }; + }) + .collect(); + + let last = Range { + from: locations.last().unwrap().first_row_index as usize, + to: total_rows, + }; vec_range.push_back(last); Ok(RowRanges { ranges: vec_range }) @@ -265,7 +274,10 @@ fn page_locations_to_row_ranges( #[cfg(test)] mod tests { - use crate::file::page_index::range::{Range, RowRanges}; + use crate::basic::Type::INT32; + use crate::file::page_index::index::{NativeIndex, PageIndex}; + use crate::file::page_index::range::{compute_row_ranges, Range, RowRanges}; + use parquet_format::{BoundaryOrder, PageLocation}; #[test] fn test_binary_search_overlap() { @@ -395,4 +407,68 @@ mod tests { assert_eq!(range_3.from, 11); assert_eq!(range_3.to, 11); } -} \ No newline at end of file + + #[test] + fn test_compute_one() { + let locations = &[PageLocation { + offset: 50, + compressed_page_size: 10, + first_row_index: 0, + }]; + let total_rows = 10; + + let row_ranges = compute_row_ranges(&[true], locations, total_rows).unwrap(); + assert_eq!(row_ranges.count(), 1); + assert_eq!( + row_ranges.ranges.get(0).unwrap(), + &Range { from: 0, to: 10 } + ); + } + + #[test] + fn test_compute_multi() { + let index: NativeIndex = NativeIndex { + physical_type: INT32, + indexes: vec![ + PageIndex { + min: Some(0), + max: Some(10), + null_count: Some(0), + }, + PageIndex { + min: Some(15), + max: Some(20), + null_count: Some(0), + }, + ], + boundary_order: BoundaryOrder::Ascending, + }; + let locations = &[ + PageLocation { + offset: 100, + compressed_page_size: 10, + first_row_index: 0, + }, + PageLocation { + offset: 200, + compressed_page_size: 20, + first_row_index: 11, + }, + ]; + let total_rows = 20; + + //filter `x < 11` + let filter = + |page: &PageIndex| page.max.as_ref().map(|&x| x < 11).unwrap_or(false); + + let mask = index.indexes.iter().map(filter).collect::>(); + + let row_ranges = compute_row_ranges(&mask, locations, total_rows).unwrap(); + + assert_eq!(row_ranges.count(), 1); + assert_eq!( + row_ranges.ranges.get(0).unwrap(), + &Range { from: 0, to: 10 } + ); + } +} From be388291036512b925796fdff50b39afdbceee20 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Sun, 5 Jun 2022 12:37:34 +0800 Subject: [PATCH 05/13] fix todo --- parquet/src/file/metadata.rs | 2 +- parquet/src/file/serialized_reader.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs index c672daa230b8..5a5b8f430757 100644 --- a/parquet/src/file/metadata.rs +++ b/parquet/src/file/metadata.rs @@ -223,7 +223,7 @@ pub struct RowGroupMetaData { num_rows: i64, total_byte_size: i64, schema_descr: SchemaDescPtr, - // Todo and filter row range + // Todo add filter result -> row range } impl RowGroupMetaData { diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index f775779f9799..543a8c591220 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -317,7 +317,7 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<' fn get_column_page_reader(&self, i: usize) -> Result> { let col = self.metadata.column(i); let (col_start, col_length) = col.byte_range(); - //Todo filter with multi start range + //Todo filter with multi row range let file_chunk = self.chunk_reader.get_read(col_start, col_length as usize)?; let page_reader = SerializedPageReader::new( file_chunk, From a4c9e6059fc2fb2374f0880cbbeb787e3b1bea52 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Sun, 5 Jun 2022 12:56:33 +0800 Subject: [PATCH 06/13] fix test --- parquet/src/file/page_index/range.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/file/page_index/range.rs b/parquet/src/file/page_index/range.rs index fe592919fd64..9c7d0c9daddd 100644 --- a/parquet/src/file/page_index/range.rs +++ b/parquet/src/file/page_index/range.rs @@ -259,7 +259,7 @@ fn page_locations_to_row_ranges( Range { from: start, to: end, - }; + } }) .collect(); From f57d7e0c38dc3a8a363e0dc00f8b8abd97c29cca Mon Sep 17 00:00:00 2001 From: Yang Jiang Date: Sun, 5 Jun 2022 14:04:34 +0800 Subject: [PATCH 07/13] Apply suggestions from code review Co-authored-by: Liang-Chi Hsieh --- parquet/src/file/page_index/range.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/parquet/src/file/page_index/range.rs b/parquet/src/file/page_index/range.rs index 9c7d0c9daddd..f9519f8985a9 100644 --- a/parquet/src/file/page_index/range.rs +++ b/parquet/src/file/page_index/range.rs @@ -86,8 +86,8 @@ impl Range { } } -///Struct representing row ranges in a row-group. These row ranges are calculated as a result of using -///the column index on the filtering. +/// Struct representing row ranges in a row-group. These row ranges are calculated as a result of using +/// the column index on the filtering. #[derive(Debug, Clone)] pub struct RowRanges { pub ranges: VecDeque, @@ -108,7 +108,7 @@ impl RowRanges { pub fn filter_with_mask(&self, mask: &[bool]) -> Result { if self.ranges.len() != mask.len() { return Err(ParquetError::General(format!( - "Mask size{} is not equal to page size {}", + "Mask size {} is not equal to number of pages {}", mask.len(), self.count() ))); From 8329f2135bc158f5aef74d92d6a8ced712d03094 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Sun, 5 Jun 2022 15:11:09 +0800 Subject: [PATCH 08/13] fix compute_row_ranges --- parquet/src/file/page_index/range.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/parquet/src/file/page_index/range.rs b/parquet/src/file/page_index/range.rs index f9519f8985a9..81df1308bd9a 100644 --- a/parquet/src/file/page_index/range.rs +++ b/parquet/src/file/page_index/range.rs @@ -265,7 +265,7 @@ fn page_locations_to_row_ranges( let last = Range { from: locations.last().unwrap().first_row_index as usize, - to: total_rows, + to: total_rows - 1, }; vec_range.push_back(last); @@ -419,10 +419,7 @@ mod tests { let row_ranges = compute_row_ranges(&[true], locations, total_rows).unwrap(); assert_eq!(row_ranges.count(), 1); - assert_eq!( - row_ranges.ranges.get(0).unwrap(), - &Range { from: 0, to: 10 } - ); + assert_eq!(row_ranges.ranges.get(0).unwrap(), &Range { from: 0, to: 9 }); } #[test] From d84b337b0f4d19d0fb9368763121a87995e70073 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Sun, 5 Jun 2022 15:22:07 +0800 Subject: [PATCH 09/13] fix annotation --- parquet/src/file/page_index/range.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/parquet/src/file/page_index/range.rs b/parquet/src/file/page_index/range.rs index 81df1308bd9a..bc8f4e3f1612 100644 --- a/parquet/src/file/page_index/range.rs +++ b/parquet/src/file/page_index/range.rs @@ -233,7 +233,8 @@ impl RowRanges { } } -/// Return the row ranges `Vec(start, len)` of all the selected pages +/// Return the `RowRanges` of all the selected pages +/// which represents a sequence of ranges of all the selected pages pub fn compute_row_ranges( mask: &[bool], locations: &[PageLocation], From adc48cfb1dc46f3747a8e7364839e397a63bac33 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Mon, 6 Jun 2022 14:35:33 +0800 Subject: [PATCH 10/13] change to use std:ops:RangeInclusive --- parquet/src/file/page_index/mod.rs | 2 +- parquet/src/file/page_index/range.rs | 245 +++++++++++++-------------- 2 files changed, 118 insertions(+), 129 deletions(-) diff --git a/parquet/src/file/page_index/mod.rs b/parquet/src/file/page_index/mod.rs index 2855586ff87b..fc87ef20448f 100644 --- a/parquet/src/file/page_index/mod.rs +++ b/parquet/src/file/page_index/mod.rs @@ -17,4 +17,4 @@ pub mod index; pub mod index_reader; -pub mod range; +pub(crate) mod range; diff --git a/parquet/src/file/page_index/range.rs b/parquet/src/file/page_index/range.rs index bc8f4e3f1612..1d67181a0ae3 100644 --- a/parquet/src/file/page_index/range.rs +++ b/parquet/src/file/page_index/range.rs @@ -18,72 +18,68 @@ use crate::errors::ParquetError; use parquet_format::PageLocation; use std::cmp::Ordering; use std::collections::VecDeque; +use std::ops::RangeInclusive; -/// A row range -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub struct Range { - /// Its start - pub from: usize, - /// Its end - pub to: usize, -} +type Range = RangeInclusive; -impl Range { - // Creates a range of [from, to] (from and to are both inclusive) - pub fn new(from: usize, to: usize) -> Self { - assert!(from <= to); - Self { from, to } - } +pub trait RangeOps { + fn is_before(&self, other: &Range) -> bool; - pub fn count(&self) -> usize { - self.to - self.from + 1 + fn is_after(&self, other: &Range) -> bool; + + fn count(&self) -> usize; +} + +impl RangeOps for Range { + fn is_before(&self, other: &Range) -> bool { + self.end() < other.start() } - pub fn is_before(&self, other: &Range) -> bool { - self.to < other.from + fn is_after(&self, other: &Range) -> bool { + self.start() > other.end() } - pub fn is_after(&self, other: &Range) -> bool { - self.from > other.to + fn count(&self) -> usize { + self.end() - self.start() + 1 } +} - /// Return the union of the two ranges, - /// Return `None` if there are hole between them. - pub fn union(left: &Range, right: &Range) -> Option { - if left.from <= right.from { - if left.to + 1 >= right.from { - return Some(Range { - from: left.from, - to: std::cmp::max(left.to, right.to), - }); - } - } else if right.to + 1 >= left.from { - return Some(Range { - from: right.from, - to: std::cmp::max(left.to, right.to), - }); +/// Return the union of the two ranges, +/// Return `None` if there are hole between them. +pub fn union(left: &Range, right: &Range) -> Option { + if left.start() <= right.start() { + if left.end() + 1 >= *right.start() { + return Some(Range::new( + *left.start(), + std::cmp::max(*left.end(), *right.end()), + )); } - None + } else if right.end() + 1 >= *left.start() { + return Some(Range::new( + *right.start(), + std::cmp::max(*left.end(), *right.end()), + )); } + None +} - /// Returns the intersection of the two ranges, - /// return null if they are not overlapped. - pub fn intersection(left: &Range, right: &Range) -> Option { - if left.from <= right.from { - if left.to >= right.from { - return Some(Range { - from: right.from, - to: std::cmp::min(left.to, right.to), - }); - } - } else if right.to >= left.from { - return Some(Range { - from: left.from, - to: std::cmp::min(left.to, right.to), - }); +/// Returns the intersection of the two ranges, +/// return null if they are not overlapped. +pub fn intersection(left: &Range, right: &Range) -> Option { + if left.start() <= right.start() { + if left.end() >= right.start() { + return Some(Range::new( + *right.start(), + std::cmp::min(*left.end(), *right.end()), + )); } - None + } else if right.end() >= left.start() { + return Some(Range::new( + *left.start(), + std::cmp::min(*left.end(), *right.end()), + )); } + None } /// Struct representing row ranges in a row-group. These row ranges are calculated as a result of using @@ -125,8 +121,7 @@ impl RowRanges { //trying to union the specified range to the last ranges in the list. The specified range shall be larger than //the last one or might be overlapped with some of the last ones. // [a, b] < [c, d] if b < c - pub fn add(&mut self, range: Range) { - let mut to_add = range; + pub fn add(&mut self, mut range: Range) { let count = self.count(); if count > 0 { for i in 1..(count + 1) { @@ -134,18 +129,18 @@ impl RowRanges { let last = self.ranges.get(index).unwrap(); assert!(!last.is_after(&range), "Must add range in ascending!"); // try to merge range - match Range::union(last, &to_add) { + match union(last, &range) { None => { break; } Some(r) => { - to_add = r; + range = r; self.ranges.remove(index); } } } } - self.ranges.push_back(to_add); + self.ranges.push_back(range); } /// Calculates the union of the two specified RowRanges object. The union of two range is calculated if there are no @@ -206,7 +201,7 @@ impl RowRanges { right_index = i + 1; continue; } - if let Some(ra) = Range::intersection(l, r) { + if let Some(ra) = intersection(l, r) { result.add(ra); } } @@ -257,17 +252,14 @@ fn page_locations_to_row_ranges( .map(|x| { let start = x[0].first_row_index as usize; let end = (x[1].first_row_index - 1) as usize; - Range { - from: start, - to: end, - } + Range::new(start, end) }) .collect(); - let last = Range { - from: locations.last().unwrap().first_row_index as usize, - to: total_rows - 1, - }; + let last = Range::new( + locations.last().unwrap().first_row_index as usize, + total_rows - 1, + ); vec_range.push_back(last); Ok(RowRanges { ranges: vec_range }) @@ -283,35 +275,35 @@ mod tests { #[test] fn test_binary_search_overlap() { let mut ranges = RowRanges::new_empty(); - ranges.add(Range { from: 1, to: 3 }); - ranges.add(Range { from: 6, to: 7 }); + ranges.add(Range::new(1, 3)); + ranges.add(Range::new(6, 7)); - assert!(ranges.is_overlapping(&Range { from: 1, to: 2 })); + assert!(ranges.is_overlapping(&Range::new(1, 2))); // include both [start, end] - assert!(ranges.is_overlapping(&Range { from: 0, to: 1 })); - assert!(ranges.is_overlapping(&Range { from: 0, to: 3 })); + assert!(ranges.is_overlapping(&Range::new(0, 1))); + assert!(ranges.is_overlapping(&Range::new(0, 3))); - assert!(ranges.is_overlapping(&Range { from: 0, to: 7 })); - assert!(ranges.is_overlapping(&Range { from: 2, to: 7 })); + assert!(ranges.is_overlapping(&Range::new(0, 7))); + assert!(ranges.is_overlapping(&Range::new(2, 7))); - assert!(!ranges.is_overlapping(&Range { from: 4, to: 5 })); + assert!(!ranges.is_overlapping(&Range::new(4, 5))); } #[test] fn test_add_func_ascending_disjunctive() { let mut ranges_1 = RowRanges::new_empty(); - ranges_1.add(Range { from: 1, to: 3 }); - ranges_1.add(Range { from: 5, to: 6 }); - ranges_1.add(Range { from: 8, to: 9 }); + ranges_1.add(Range::new(1, 3)); + ranges_1.add(Range::new(5, 6)); + ranges_1.add(Range::new(8, 9)); assert_eq!(ranges_1.count(), 3); } #[test] fn test_add_func_ascending_merge() { let mut ranges_1 = RowRanges::new_empty(); - ranges_1.add(Range { from: 1, to: 3 }); - ranges_1.add(Range { from: 4, to: 5 }); - ranges_1.add(Range { from: 6, to: 7 }); + ranges_1.add(Range::new(1, 3)); + ranges_1.add(Range::new(4, 5)); + ranges_1.add(Range::new(6, 7)); assert_eq!(ranges_1.count(), 1); } @@ -319,94 +311,94 @@ mod tests { #[should_panic(expected = "Must add range in ascending!")] fn test_add_func_not_ascending() { let mut ranges_1 = RowRanges::new_empty(); - ranges_1.add(Range { from: 6, to: 7 }); - ranges_1.add(Range { from: 1, to: 3 }); - ranges_1.add(Range { from: 4, to: 5 }); + ranges_1.add(Range::new(6, 7)); + ranges_1.add(Range::new(1, 3)); + ranges_1.add(Range::new(4, 5)); assert_eq!(ranges_1.count(), 1); } #[test] fn test_union_func() { let mut ranges_1 = RowRanges::new_empty(); - ranges_1.add(Range { from: 1, to: 2 }); - ranges_1.add(Range { from: 3, to: 4 }); - ranges_1.add(Range { from: 5, to: 6 }); + ranges_1.add(Range::new(1, 2)); + ranges_1.add(Range::new(3, 4)); + ranges_1.add(Range::new(5, 6)); let mut ranges_2 = RowRanges::new_empty(); - ranges_2.add(Range { from: 2, to: 3 }); - ranges_2.add(Range { from: 4, to: 5 }); - ranges_2.add(Range { from: 6, to: 7 }); + ranges_2.add(Range::new(2, 3)); + ranges_2.add(Range::new(4, 5)); + ranges_2.add(Range::new(6, 7)); let ranges = RowRanges::union(ranges_1, ranges_2); assert_eq!(ranges.count(), 1); let range = ranges.ranges.get(0).unwrap(); - assert_eq!(range.from, 1); - assert_eq!(range.to, 7); + assert_eq!(*range.start(), 1); + assert_eq!(*range.end(), 7); let mut ranges_a = RowRanges::new_empty(); - ranges_a.add(Range { from: 1, to: 3 }); - ranges_a.add(Range { from: 5, to: 8 }); - ranges_a.add(Range { from: 11, to: 12 }); + ranges_a.add(Range::new(1, 3)); + ranges_a.add(Range::new(5, 8)); + ranges_a.add(Range::new(11, 12)); let mut ranges_b = RowRanges::new_empty(); - ranges_b.add(Range { from: 0, to: 2 }); - ranges_b.add(Range { from: 6, to: 7 }); - ranges_b.add(Range { from: 10, to: 11 }); + ranges_b.add(Range::new(0, 2)); + ranges_b.add(Range::new(6, 7)); + ranges_b.add(Range::new(10, 11)); let ranges = RowRanges::union(ranges_a, ranges_b); assert_eq!(ranges.count(), 3); let range_1 = ranges.ranges.get(0).unwrap(); - assert_eq!(range_1.from, 0); - assert_eq!(range_1.to, 3); + assert_eq!(*range_1.start(), 0); + assert_eq!(*range_1.end(), 3); let range_2 = ranges.ranges.get(1).unwrap(); - assert_eq!(range_2.from, 5); - assert_eq!(range_2.to, 8); + assert_eq!(*range_2.start(), 5); + assert_eq!(*range_2.end(), 8); let range_3 = ranges.ranges.get(2).unwrap(); - assert_eq!(range_3.from, 10); - assert_eq!(range_3.to, 12); + assert_eq!(*range_3.start(), 10); + assert_eq!(*range_3.end(), 12); } #[test] fn test_intersection_func() { let mut ranges_1 = RowRanges::new_empty(); - ranges_1.add(Range { from: 1, to: 2 }); - ranges_1.add(Range { from: 3, to: 4 }); - ranges_1.add(Range { from: 5, to: 6 }); + ranges_1.add(Range::new(1, 2)); + ranges_1.add(Range::new(3, 4)); + ranges_1.add(Range::new(5, 6)); let mut ranges_2 = RowRanges::new_empty(); - ranges_2.add(Range { from: 2, to: 3 }); - ranges_2.add(Range { from: 4, to: 5 }); - ranges_2.add(Range { from: 6, to: 7 }); + ranges_2.add(Range::new(2, 3)); + ranges_2.add(Range::new(4, 5)); + ranges_2.add(Range::new(6, 7)); let ranges = RowRanges::intersection(ranges_1, ranges_2); assert_eq!(ranges.count(), 1); let range = ranges.ranges.get(0).unwrap(); - assert_eq!(range.from, 2); - assert_eq!(range.to, 6); + assert_eq!(*range.start(), 2); + assert_eq!(*range.end(), 6); let mut ranges_a = RowRanges::new_empty(); - ranges_a.add(Range { from: 1, to: 3 }); - ranges_a.add(Range { from: 5, to: 8 }); - ranges_a.add(Range { from: 11, to: 12 }); + ranges_a.add(Range::new(1, 3)); + ranges_a.add(Range::new(5, 8)); + ranges_a.add(Range::new(11, 12)); let mut ranges_b = RowRanges::new_empty(); - ranges_b.add(Range { from: 0, to: 2 }); - ranges_b.add(Range { from: 6, to: 7 }); - ranges_b.add(Range { from: 10, to: 11 }); + ranges_b.add(Range::new(0, 2)); + ranges_b.add(Range::new(6, 7)); + ranges_b.add(Range::new(10, 11)); let ranges = RowRanges::intersection(ranges_a, ranges_b); assert_eq!(ranges.count(), 3); let range_1 = ranges.ranges.get(0).unwrap(); - assert_eq!(range_1.from, 1); - assert_eq!(range_1.to, 2); + assert_eq!(*range_1.start(), 1); + assert_eq!(*range_1.end(), 2); let range_2 = ranges.ranges.get(1).unwrap(); - assert_eq!(range_2.from, 6); - assert_eq!(range_2.to, 7); + assert_eq!(*range_2.start(), 6); + assert_eq!(*range_2.end(), 7); let range_3 = ranges.ranges.get(2).unwrap(); - assert_eq!(range_3.from, 11); - assert_eq!(range_3.to, 11); + assert_eq!(*range_3.start(), 11); + assert_eq!(*range_3.end(), 11); } #[test] @@ -420,7 +412,7 @@ mod tests { let row_ranges = compute_row_ranges(&[true], locations, total_rows).unwrap(); assert_eq!(row_ranges.count(), 1); - assert_eq!(row_ranges.ranges.get(0).unwrap(), &Range { from: 0, to: 9 }); + assert_eq!(row_ranges.ranges.get(0).unwrap(), &Range::new(0, 9)); } #[test] @@ -464,9 +456,6 @@ mod tests { let row_ranges = compute_row_ranges(&mask, locations, total_rows).unwrap(); assert_eq!(row_ranges.count(), 1); - assert_eq!( - row_ranges.ranges.get(0).unwrap(), - &Range { from: 0, to: 10 } - ); + assert_eq!(row_ranges.ranges.get(0).unwrap(), &Range::new(0, 10)); } } From fc61155bae929235cd4c12d29de1217b72151107 Mon Sep 17 00:00:00 2001 From: Yang Jiang Date: Mon, 6 Jun 2022 20:27:37 +0800 Subject: [PATCH 11/13] Apply suggestions from code review Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> --- parquet/src/file/page_index/range.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/parquet/src/file/page_index/range.rs b/parquet/src/file/page_index/range.rs index 1d67181a0ae3..f3613ef84346 100644 --- a/parquet/src/file/page_index/range.rs +++ b/parquet/src/file/page_index/range.rs @@ -23,9 +23,9 @@ use std::ops::RangeInclusive; type Range = RangeInclusive; pub trait RangeOps { - fn is_before(&self, other: &Range) -> bool; + fn is_before(&self, other: &Self) -> bool; - fn is_after(&self, other: &Range) -> bool; + fn is_after(&self, other: &Self) -> bool; fn count(&self) -> usize; } @@ -40,7 +40,7 @@ impl RangeOps for Range { } fn count(&self) -> usize { - self.end() - self.start() + 1 + self.end() + 1 - self.start() } } @@ -228,10 +228,10 @@ impl RowRanges { } } -/// Return the `RowRanges` of all the selected pages -/// which represents a sequence of ranges of all the selected pages +/// Takes an array of [`PageLocation`], and a total number of rows, and based on the provided `page_mask` +/// returns the corresponding [`RowRanges`] to scan pub fn compute_row_ranges( - mask: &[bool], + page_mask: &[bool], locations: &[PageLocation], total_rows: usize, ) -> Result { @@ -243,7 +243,7 @@ fn page_locations_to_row_ranges( locations: &[PageLocation], total_rows: usize, ) -> Result { - if locations.is_empty() { + if locations.is_empty() || total_rows == 0 { return Ok(RowRanges::new_empty()); } From ce8aec248e26574e24660c415f86e0b7236e393a Mon Sep 17 00:00:00 2001 From: yangjiang Date: Mon, 6 Jun 2022 20:43:32 +0800 Subject: [PATCH 12/13] fix --- parquet/src/file/page_index/range.rs | 71 ++++++++++++++++------------ 1 file changed, 42 insertions(+), 29 deletions(-) diff --git a/parquet/src/file/page_index/range.rs b/parquet/src/file/page_index/range.rs index f3613ef84346..252a371964d3 100644 --- a/parquet/src/file/page_index/range.rs +++ b/parquet/src/file/page_index/range.rs @@ -28,6 +28,10 @@ pub trait RangeOps { fn is_after(&self, other: &Self) -> bool; fn count(&self) -> usize; + + fn union(left: &Range, right: &Range) -> Option; + + fn intersection(left: &Range, right: &Range) -> Option; } impl RangeOps for Range { @@ -42,44 +46,44 @@ impl RangeOps for Range { fn count(&self) -> usize { self.end() + 1 - self.start() } -} -/// Return the union of the two ranges, -/// Return `None` if there are hole between them. -pub fn union(left: &Range, right: &Range) -> Option { - if left.start() <= right.start() { - if left.end() + 1 >= *right.start() { + /// Return the union of the two ranges, + /// Return `None` if there are hole between them. + fn union(left: &Range, right: &Range) -> Option { + if left.start() <= right.start() { + if left.end() + 1 >= *right.start() { + return Some(Range::new( + *left.start(), + std::cmp::max(*left.end(), *right.end()), + )); + } + } else if right.end() + 1 >= *left.start() { return Some(Range::new( - *left.start(), + *right.start(), std::cmp::max(*left.end(), *right.end()), )); } - } else if right.end() + 1 >= *left.start() { - return Some(Range::new( - *right.start(), - std::cmp::max(*left.end(), *right.end()), - )); + None } - None -} -/// Returns the intersection of the two ranges, -/// return null if they are not overlapped. -pub fn intersection(left: &Range, right: &Range) -> Option { - if left.start() <= right.start() { - if left.end() >= right.start() { + /// Returns the intersection of the two ranges, + /// return null if they are not overlapped. + fn intersection(left: &Range, right: &Range) -> Option { + if left.start() <= right.start() { + if left.end() >= right.start() { + return Some(Range::new( + *right.start(), + std::cmp::min(*left.end(), *right.end()), + )); + } + } else if right.end() >= left.start() { return Some(Range::new( - *right.start(), + *left.start(), std::cmp::min(*left.end(), *right.end()), )); } - } else if right.end() >= left.start() { - return Some(Range::new( - *left.start(), - std::cmp::min(*left.end(), *right.end()), - )); + None } - None } /// Struct representing row ranges in a row-group. These row ranges are calculated as a result of using @@ -129,7 +133,7 @@ impl RowRanges { let last = self.ranges.get(index).unwrap(); assert!(!last.is_after(&range), "Must add range in ascending!"); // try to merge range - match union(last, &range) { + match Range::union(last, &range) { None => { break; } @@ -201,7 +205,7 @@ impl RowRanges { right_index = i + 1; continue; } - if let Some(ra) = intersection(l, r) { + if let Some(ra) = Range::intersection(l, r) { result.add(ra); } } @@ -235,8 +239,15 @@ pub fn compute_row_ranges( locations: &[PageLocation], total_rows: usize, ) -> Result { + if page_mask.len() != locations.len() { + return Err(ParquetError::General(format!( + "Page_mask size {} is not equal to number of locations {}", + page_mask.len(), + locations.len(), + ))); + } let row_ranges = page_locations_to_row_ranges(locations, total_rows)?; - row_ranges.filter_with_mask(mask) + row_ranges.filter_with_mask(page_mask) } fn page_locations_to_row_ranges( @@ -247,6 +258,8 @@ fn page_locations_to_row_ranges( return Ok(RowRanges::new_empty()); } + // If we read directly from parquet pageIndex to construct locations, + // the location index should be continuous let mut vec_range: VecDeque = locations .windows(2) .map(|x| { From 81292d556ed14074c209a668ef72d259a9e8aa00 Mon Sep 17 00:00:00 2001 From: Yang Jiang Date: Mon, 6 Jun 2022 21:47:38 +0800 Subject: [PATCH 13/13] Update parquet/src/file/page_index/range.rs Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> --- parquet/src/file/page_index/range.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/parquet/src/file/page_index/range.rs b/parquet/src/file/page_index/range.rs index 252a371964d3..06c06553ccd5 100644 --- a/parquet/src/file/page_index/range.rs +++ b/parquet/src/file/page_index/range.rs @@ -121,10 +121,10 @@ impl RowRanges { Ok(RowRanges { ranges: vec_range }) } - //Add a range to the end of the list of ranges. It maintains the disjunctive ascending order of the ranges by - //trying to union the specified range to the last ranges in the list. The specified range shall be larger than - //the last one or might be overlapped with some of the last ones. - // [a, b] < [c, d] if b < c + /// Add a range to the end of the list of ranges. It maintains the disjunctive ascending order of the ranges by + /// trying to union the specified range to the last ranges in the list. The specified range shall be larger than + /// the last one or might be overlapped with some of the last ones. + /// [a, b] < [c, d] if b < c pub fn add(&mut self, mut range: Range) { let count = self.count(); if count > 0 {