From ace14018ed3f6571b313e5e34761128242853fda Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Thu, 8 Aug 2024 04:45:12 -0700 Subject: [PATCH] Add benchmarks for `BYTE_STREAM_SPLIT` encoded Parquet `FIXED_LEN_BYTE_ARRAY` data (#6204) * save type_width for fixed_len_byte_array * add decimal128 and float16 byte_stream_split benches * add f16 * add decimal128 flba(16) bench --- parquet/benches/arrow_reader.rs | 235 +++++++++++++++++++++- parquet/benches/encoding.rs | 27 ++- parquet/src/util/test_common/page_util.rs | 6 +- 3 files changed, 261 insertions(+), 7 deletions(-) diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs index 814e75c249bf..18e16f0a4297 100644 --- a/parquet/benches/arrow_reader.rs +++ b/parquet/benches/arrow_reader.rs @@ -20,6 +20,7 @@ use arrow::datatypes::DataType; use arrow_schema::Field; use criterion::measurement::WallTime; use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion}; +use half::f16; use num::FromPrimitive; use num_bigint::BigInt; use parquet::arrow::array_reader::{ @@ -65,6 +66,8 @@ fn build_test_schema() -> SchemaDescPtr { } REQUIRED BYTE_ARRAY mandatory_binary_leaf; OPTIONAL BYTE_ARRAY optional_binary_leaf; + REQUIRED FIXED_LEN_BYTE_ARRAY (2) mandatory_f16_leaf (Float16); + OPTIONAL FIXED_LEN_BYTE_ARRAY (2) optional_f16_leaf (Float16); } "; parse_message_type(message_type) @@ -84,6 +87,64 @@ pub fn seedable_rng() -> StdRng { StdRng::seed_from_u64(42) } +// support byte array for float16 +fn build_encoded_f16_bytes_page_iterator( + column_desc: ColumnDescPtr, + null_density: f32, + encoding: Encoding, + min: f32, + max: f32, +) -> impl PageIterator + Clone +where + T: parquet::data_type::DataType, + T::T: From>, +{ + let max_def_level = column_desc.max_def_level(); + let max_rep_level = column_desc.max_rep_level(); + let rep_levels = vec![0; VALUES_PER_PAGE]; + let mut rng = seedable_rng(); + let mut pages: Vec> = Vec::new(); + for _i in 0..NUM_ROW_GROUPS { + let mut column_chunk_pages = Vec::new(); + for _j in 0..PAGES_PER_GROUP { + // generate page + let mut values = Vec::with_capacity(VALUES_PER_PAGE); + let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE); + for _k in 0..VALUES_PER_PAGE { + let def_level = if rng.gen::() < null_density { + max_def_level - 1 + } else { + max_def_level + }; + if def_level == max_def_level { + // create the Float16 value + let value = f16::from_f32(rng.gen_range(min..max)); + // Float16 in parquet is stored little-endian + let bytes = match column_desc.physical_type() { + Type::FIXED_LEN_BYTE_ARRAY => { + // Float16 annotates FIXED_LEN_BYTE_ARRAY(2) + assert_eq!(column_desc.type_length(), 2); + value.to_le_bytes().to_vec() + } + _ => unimplemented!(), + }; + let value = T::T::from(bytes); + values.push(value); + } + def_levels.push(def_level); + } + let mut page_builder = + DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true); + page_builder.add_rep_levels(max_rep_level, &rep_levels); + page_builder.add_def_levels(max_def_level, &def_levels); + page_builder.add_values::(encoding, &values); + column_chunk_pages.push(page_builder.consume()); + } + pages.push(column_chunk_pages); + } + InMemoryPageIterator::new(pages) +} + // support byte array for decimal fn build_encoded_decimal_bytes_page_iterator( column_desc: ColumnDescPtr, @@ -494,6 +555,19 @@ fn create_primitive_array_reader( } } +fn create_f16_by_bytes_reader( + page_iterator: impl PageIterator + 'static, + column_desc: ColumnDescPtr, +) -> Box { + let physical_type = column_desc.physical_type(); + match physical_type { + Type::FIXED_LEN_BYTE_ARRAY => { + make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap() + } + _ => unimplemented!(), + } +} + fn create_decimal_by_bytes_reader( page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr, @@ -616,6 +690,131 @@ fn bench_byte_decimal( }); } +fn bench_byte_stream_split_f16( + group: &mut BenchmarkGroup, + mandatory_column_desc: &ColumnDescPtr, + optional_column_desc: &ColumnDescPtr, + min: f32, + max: f32, +) where + T: parquet::data_type::DataType, + T::T: From>, +{ + let mut count: usize = 0; + + // byte_stream_split encoded, no NULLs + let data = build_encoded_f16_bytes_page_iterator::( + mandatory_column_desc.clone(), + 0.0, + Encoding::BYTE_STREAM_SPLIT, + min, + max, + ); + group.bench_function("byte_stream_split encoded, mandatory, no NULLs", |b| { + b.iter(|| { + let array_reader = + create_f16_by_bytes_reader(data.clone(), mandatory_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); + + let data = build_encoded_f16_bytes_page_iterator::( + optional_column_desc.clone(), + 0.0, + Encoding::BYTE_STREAM_SPLIT, + min, + max, + ); + group.bench_function("byte_stream_split encoded, optional, no NULLs", |b| { + b.iter(|| { + let array_reader = + create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); + + let data = build_encoded_f16_bytes_page_iterator::( + optional_column_desc.clone(), + 0.5, + Encoding::BYTE_STREAM_SPLIT, + min, + max, + ); + group.bench_function("byte_stream_split encoded, optional, half NULLs", |b| { + b.iter(|| { + let array_reader = + create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); +} + +fn bench_byte_stream_split_decimal( + group: &mut BenchmarkGroup, + mandatory_column_desc: &ColumnDescPtr, + optional_column_desc: &ColumnDescPtr, + min: i128, + max: i128, +) where + T: parquet::data_type::DataType, + T::T: From>, +{ + let mut count: usize = 0; + + // byte_stream_split encoded, no NULLs + let data = build_encoded_decimal_bytes_page_iterator::( + mandatory_column_desc.clone(), + 0.0, + Encoding::BYTE_STREAM_SPLIT, + min, + max, + ); + group.bench_function("byte_stream_split encoded, mandatory, no NULLs", |b| { + b.iter(|| { + let array_reader = + create_decimal_by_bytes_reader(data.clone(), mandatory_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); + + let data = build_encoded_decimal_bytes_page_iterator::( + optional_column_desc.clone(), + 0.0, + Encoding::BYTE_STREAM_SPLIT, + min, + max, + ); + group.bench_function("byte_stream_split encoded, optional, no NULLs", |b| { + b.iter(|| { + let array_reader = + create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); + + // half null + let data = build_encoded_decimal_bytes_page_iterator::( + optional_column_desc.clone(), + 0.5, + Encoding::BYTE_STREAM_SPLIT, + min, + max, + ); + group.bench_function("byte_stream_split encoded, optional, half NULLs", |b| { + b.iter(|| { + let array_reader = + create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); +} + fn bench_primitive( group: &mut BenchmarkGroup, mandatory_column_desc: &ColumnDescPtr, @@ -797,6 +996,35 @@ fn bench_primitive( }); } +fn byte_stream_split_benches(c: &mut Criterion) { + let schema = build_test_schema(); + + let mut group = c.benchmark_group("arrow_array_reader/BYTE_STREAM_SPLIT/Decimal128Array"); + let mandatory_decimal4_leaf_desc = schema.column(12); + let optional_decimal4_leaf_desc = schema.column(13); + bench_byte_stream_split_decimal::( + &mut group, + &mandatory_decimal4_leaf_desc, + &optional_decimal4_leaf_desc, + // precision is 16: the max is 9999999999999999 + 9999999999999000, + 9999999999999999, + ); + group.finish(); + + let mut group = c.benchmark_group("arrow_array_reader/BYTE_STREAM_SPLIT/Float16Array"); + let mandatory_f16_leaf_desc = schema.column(17); + let optional_f16_leaf_desc = schema.column(18); + bench_byte_stream_split_f16::( + &mut group, + &mandatory_f16_leaf_desc, + &optional_f16_leaf_desc, + -1.0, + 1.0, + ); + group.finish(); +} + fn decimal_benches(c: &mut Criterion) { let schema = build_test_schema(); // parquet int32, logical type decimal(8,2) @@ -1334,5 +1562,10 @@ fn add_benches(c: &mut Criterion) { }); } -criterion_group!(benches, add_benches, decimal_benches,); +criterion_group!( + benches, + add_benches, + decimal_benches, + byte_stream_split_benches, +); criterion_main!(benches); diff --git a/parquet/benches/encoding.rs b/parquet/benches/encoding.rs index 80befe8dadff..bc18a49da2a4 100644 --- a/parquet/benches/encoding.rs +++ b/parquet/benches/encoding.rs @@ -16,15 +16,23 @@ // under the License. use criterion::*; +use half::f16; use parquet::basic::Encoding; -use parquet::data_type::{DataType, DoubleType, FloatType}; +use parquet::data_type::{ + DataType, DoubleType, FixedLenByteArray, FixedLenByteArrayType, FloatType, +}; use parquet::decoding::{get_decoder, Decoder}; use parquet::encoding::get_encoder; use parquet::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type}; use rand::prelude::*; use std::sync::Arc; -fn bench_typed(c: &mut Criterion, values: &[T::T], encoding: Encoding) { +fn bench_typed( + c: &mut Criterion, + values: &[T::T], + encoding: Encoding, + type_length: i32, +) { let name = format!( "dtype={}, encoding={:?}", std::any::type_name::(), @@ -33,6 +41,7 @@ fn bench_typed(c: &mut Criterion, values: &[T::T], encoding: Encodi let column_desc_ptr = ColumnDescPtr::new(ColumnDescriptor::new( Arc::new( Type::primitive_type_builder("", T::get_physical_type()) + .with_length(type_length) .build() .unwrap(), ), @@ -68,15 +77,25 @@ fn criterion_benchmark(c: &mut Criterion) { let mut rng = StdRng::seed_from_u64(0); let n = 16 * 1024; + let mut f16s = Vec::new(); let mut f32s = Vec::new(); let mut f64s = Vec::new(); + let mut d128s = Vec::new(); for _ in 0..n { + f16s.push(FixedLenByteArray::from( + f16::from_f32(rng.gen::()).to_le_bytes().to_vec(), + )); f32s.push(rng.gen::()); f64s.push(rng.gen::()); + d128s.push(FixedLenByteArray::from( + rng.gen::().to_be_bytes().to_vec(), + )); } - bench_typed::(c, &f32s, Encoding::BYTE_STREAM_SPLIT); - bench_typed::(c, &f64s, Encoding::BYTE_STREAM_SPLIT); + bench_typed::(c, &f32s, Encoding::BYTE_STREAM_SPLIT, 0); + bench_typed::(c, &f64s, Encoding::BYTE_STREAM_SPLIT, 0); + bench_typed::(c, &f16s, Encoding::BYTE_STREAM_SPLIT, 2); + bench_typed::(c, &d128s, Encoding::BYTE_STREAM_SPLIT, 16); } criterion_group!(benches, criterion_benchmark); diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs index 3db43aef0eec..a1709efa92b3 100644 --- a/parquet/src/util/test_common/page_util.rs +++ b/parquet/src/util/test_common/page_util.rs @@ -51,13 +51,14 @@ pub struct DataPageBuilderImpl { rep_levels_byte_len: u32, def_levels_byte_len: u32, datapage_v2: bool, + type_width: i32, } impl DataPageBuilderImpl { // `num_values` is the number of non-null values to put in the data page. // `datapage_v2` flag is used to indicate if the generated data page should use V2 // format or not. - pub fn new(_desc: ColumnDescPtr, num_values: u32, datapage_v2: bool) -> Self { + pub fn new(desc: ColumnDescPtr, num_values: u32, datapage_v2: bool) -> Self { DataPageBuilderImpl { encoding: None, num_values, @@ -65,6 +66,7 @@ impl DataPageBuilderImpl { rep_levels_byte_len: 0, def_levels_byte_len: 0, datapage_v2, + type_width: desc.type_length(), } } @@ -111,7 +113,7 @@ impl DataPageBuilder for DataPageBuilderImpl { // Create test column descriptor. let desc = { let ty = SchemaType::primitive_type_builder("t", T::get_physical_type()) - .with_length(0) + .with_length(self.type_width) .build() .unwrap(); Arc::new(ColumnDescriptor::new(