Add benchmarks for BYTE_STREAM_SPLIT encoded Parquet `FIXED_LEN_BYTE_ARRAY` data (#6204)

* save type_width for fixed_len_byte_array

* add decimal128 and float16 byte_stream_split benches

* add f16

* add decimal128 flba(16) bench
etseidl authored Aug 8, 2024
1 parent bd75582 commit ace1401
Showing 3 changed files with 261 additions and 7 deletions.
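
For context, BYTE_STREAM_SPLIT stores fixed-width values by scattering byte i of every value into stream i and concatenating the streams, which usually compresses better and vectorizes well on decode. A minimal sketch of the transform (illustrative helper functions only, not the parquet crate's implementation):

fn byte_stream_split_encode(values: &[Vec<u8>], width: usize) -> Vec<u8> {
    // Stream b holds byte b of every value: out[b * n + v] = values[v][b].
    let n = values.len();
    let mut out = vec![0u8; n * width];
    for (v, value) in values.iter().enumerate() {
        assert_eq!(value.len(), width);
        for (b, &byte) in value.iter().enumerate() {
            out[b * n + v] = byte;
        }
    }
    out
}

fn byte_stream_split_decode(encoded: &[u8], width: usize) -> Vec<Vec<u8>> {
    // Inverse transform: gather byte b of value v back out of stream b.
    let n = encoded.len() / width;
    (0..n)
        .map(|v| (0..width).map(|b| encoded[b * n + v]).collect())
        .collect()
}

For FIXED_LEN_BYTE_ARRAY columns the stream count comes from the declared type length (2 bytes for Float16, 16 for Decimal128 below), which is why this change also threads the column's type_width through the test page builder.
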
235 changes: 234 additions & 1 deletion parquet/benches/arrow_reader.rs
@@ -20,6 +20,7 @@ use arrow::datatypes::DataType;
use arrow_schema::Field;
use criterion::measurement::WallTime;
use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion};
use half::f16;
use num::FromPrimitive;
use num_bigint::BigInt;
use parquet::arrow::array_reader::{
@@ -65,6 +66,8 @@ fn build_test_schema() -> SchemaDescPtr {
}
REQUIRED BYTE_ARRAY mandatory_binary_leaf;
OPTIONAL BYTE_ARRAY optional_binary_leaf;
REQUIRED FIXED_LEN_BYTE_ARRAY (2) mandatory_f16_leaf (Float16);
OPTIONAL FIXED_LEN_BYTE_ARRAY (2) optional_f16_leaf (Float16);
}
";
parse_message_type(message_type)
@@ -84,6 +87,64 @@ pub fn seedable_rng() -> StdRng {
StdRng::seed_from_u64(42)
}

// Build in-memory data pages of Float16 values stored as FIXED_LEN_BYTE_ARRAY(2);
// `null_density` controls the fraction of null slots.
fn build_encoded_f16_bytes_page_iterator<T>(
column_desc: ColumnDescPtr,
null_density: f32,
encoding: Encoding,
min: f32,
max: f32,
) -> impl PageIterator + Clone
where
T: parquet::data_type::DataType,
T::T: From<Vec<u8>>,
{
let max_def_level = column_desc.max_def_level();
let max_rep_level = column_desc.max_rep_level();
let rep_levels = vec![0; VALUES_PER_PAGE];
let mut rng = seedable_rng();
let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
for _i in 0..NUM_ROW_GROUPS {
let mut column_chunk_pages = Vec::new();
for _j in 0..PAGES_PER_GROUP {
// generate page
let mut values = Vec::with_capacity(VALUES_PER_PAGE);
let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
for _k in 0..VALUES_PER_PAGE {
let def_level = if rng.gen::<f32>() < null_density {
max_def_level - 1
} else {
max_def_level
};
if def_level == max_def_level {
// create the Float16 value
let value = f16::from_f32(rng.gen_range(min..max));
// Float16 in parquet is stored little-endian
let bytes = match column_desc.physical_type() {
Type::FIXED_LEN_BYTE_ARRAY => {
// Float16 annotates FIXED_LEN_BYTE_ARRAY(2)
assert_eq!(column_desc.type_length(), 2);
value.to_le_bytes().to_vec()
}
_ => unimplemented!(),
};
let value = T::T::from(bytes);
values.push(value);
}
def_levels.push(def_level);
}
let mut page_builder =
DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
page_builder.add_rep_levels(max_rep_level, &rep_levels);
page_builder.add_def_levels(max_def_level, &def_levels);
page_builder.add_values::<T>(encoding, &values);
column_chunk_pages.push(page_builder.consume());
}
pages.push(column_chunk_pages);
}
InMemoryPageIterator::new(pages)
}

// support byte array for decimal
fn build_encoded_decimal_bytes_page_iterator<T>(
column_desc: ColumnDescPtr,
@@ -494,6 +555,19 @@ fn create_primitive_array_reader(
}
}

fn create_f16_by_bytes_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
let physical_type = column_desc.physical_type();
match physical_type {
Type::FIXED_LEN_BYTE_ARRAY => {
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
}
_ => unimplemented!(),
}
}

fn create_decimal_by_bytes_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
@@ -616,6 +690,131 @@ fn bench_byte_decimal<T>(
});
}

fn bench_byte_stream_split_f16<T>(
group: &mut BenchmarkGroup<WallTime>,
mandatory_column_desc: &ColumnDescPtr,
optional_column_desc: &ColumnDescPtr,
min: f32,
max: f32,
) where
T: parquet::data_type::DataType,
T::T: From<Vec<u8>>,
{
let mut count: usize = 0;

// byte_stream_split encoded, no NULLs
let data = build_encoded_f16_bytes_page_iterator::<T>(
mandatory_column_desc.clone(),
0.0,
Encoding::BYTE_STREAM_SPLIT,
min,
max,
);
group.bench_function("byte_stream_split encoded, mandatory, no NULLs", |b| {
b.iter(|| {
let array_reader =
create_f16_by_bytes_reader(data.clone(), mandatory_column_desc.clone());
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

let data = build_encoded_f16_bytes_page_iterator::<T>(
optional_column_desc.clone(),
0.0,
Encoding::BYTE_STREAM_SPLIT,
min,
max,
);
group.bench_function("byte_stream_split encoded, optional, no NULLs", |b| {
b.iter(|| {
let array_reader =
create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone());
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

let data = build_encoded_f16_bytes_page_iterator::<T>(
optional_column_desc.clone(),
0.5,
Encoding::BYTE_STREAM_SPLIT,
min,
max,
);
group.bench_function("byte_stream_split encoded, optional, half NULLs", |b| {
b.iter(|| {
let array_reader =
create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone());
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
}

fn bench_byte_stream_split_decimal<T>(
group: &mut BenchmarkGroup<WallTime>,
mandatory_column_desc: &ColumnDescPtr,
optional_column_desc: &ColumnDescPtr,
min: i128,
max: i128,
) where
T: parquet::data_type::DataType,
T::T: From<Vec<u8>>,
{
let mut count: usize = 0;

// byte_stream_split encoded, no NULLs
let data = build_encoded_decimal_bytes_page_iterator::<T>(
mandatory_column_desc.clone(),
0.0,
Encoding::BYTE_STREAM_SPLIT,
min,
max,
);
group.bench_function("byte_stream_split encoded, mandatory, no NULLs", |b| {
b.iter(|| {
let array_reader =
create_decimal_by_bytes_reader(data.clone(), mandatory_column_desc.clone());
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

let data = build_encoded_decimal_bytes_page_iterator::<T>(
optional_column_desc.clone(),
0.0,
Encoding::BYTE_STREAM_SPLIT,
min,
max,
);
group.bench_function("byte_stream_split encoded, optional, no NULLs", |b| {
b.iter(|| {
let array_reader =
create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone());
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

// half null
let data = build_encoded_decimal_bytes_page_iterator::<T>(
optional_column_desc.clone(),
0.5,
Encoding::BYTE_STREAM_SPLIT,
min,
max,
);
group.bench_function("byte_stream_split encoded, optional, half NULLs", |b| {
b.iter(|| {
let array_reader =
create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone());
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
}

fn bench_primitive<T>(
group: &mut BenchmarkGroup<WallTime>,
mandatory_column_desc: &ColumnDescPtr,
@@ -797,6 +996,35 @@ fn bench_primitive<T>(
});
}

fn byte_stream_split_benches(c: &mut Criterion) {
let schema = build_test_schema();

let mut group = c.benchmark_group("arrow_array_reader/BYTE_STREAM_SPLIT/Decimal128Array");
let mandatory_decimal4_leaf_desc = schema.column(12);
let optional_decimal4_leaf_desc = schema.column(13);
bench_byte_stream_split_decimal::<FixedLenByteArrayType>(
&mut group,
&mandatory_decimal4_leaf_desc,
&optional_decimal4_leaf_desc,
// precision is 16: the max is 9999999999999999
9999999999999000,
9999999999999999,
);
group.finish();

let mut group = c.benchmark_group("arrow_array_reader/BYTE_STREAM_SPLIT/Float16Array");
let mandatory_f16_leaf_desc = schema.column(17);
let optional_f16_leaf_desc = schema.column(18);
bench_byte_stream_split_f16::<FixedLenByteArrayType>(
&mut group,
&mandatory_f16_leaf_desc,
&optional_f16_leaf_desc,
-1.0,
1.0,
);
group.finish();
}

fn decimal_benches(c: &mut Criterion) {
let schema = build_test_schema();
// parquet int32, logical type decimal(8,2)
@@ -1334,5 +1562,10 @@ fn add_benches(c: &mut Criterion) {
});
}

criterion_group!(benches, add_benches, decimal_benches,);
criterion_group!(
benches,
add_benches,
decimal_benches,
byte_stream_split_benches,
);
criterion_main!(benches);
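
With byte_stream_split_benches registered in the criterion group above, the new cases should be runnable on their own, presumably with something like `cargo bench --bench arrow_reader BYTE_STREAM_SPLIT` from the parquet crate (the bench target name is assumed from the file name).
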
27 changes: 23 additions & 4 deletions parquet/benches/encoding.rs
@@ -16,15 +16,23 @@
// under the License.

use criterion::*;
use half::f16;
use parquet::basic::Encoding;
use parquet::data_type::{DataType, DoubleType, FloatType};
use parquet::data_type::{
DataType, DoubleType, FixedLenByteArray, FixedLenByteArrayType, FloatType,
};
use parquet::decoding::{get_decoder, Decoder};
use parquet::encoding::get_encoder;
use parquet::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type};
use rand::prelude::*;
use std::sync::Arc;

fn bench_typed<T: DataType>(c: &mut Criterion, values: &[T::T], encoding: Encoding) {
fn bench_typed<T: DataType>(
c: &mut Criterion,
values: &[T::T],
encoding: Encoding,
type_length: i32,
) {
let name = format!(
"dtype={}, encoding={:?}",
std::any::type_name::<T::T>(),
@@ -33,6 +41,7 @@ fn bench_typed<T: DataType>(c: &mut Criterion, values: &[T::T], encoding: Encoding) {
let column_desc_ptr = ColumnDescPtr::new(ColumnDescriptor::new(
Arc::new(
Type::primitive_type_builder("", T::get_physical_type())
.with_length(type_length)
.build()
.unwrap(),
),
@@ -68,15 +77,25 @@ fn criterion_benchmark(c: &mut Criterion) {
let mut rng = StdRng::seed_from_u64(0);
let n = 16 * 1024;

let mut f16s = Vec::new();
let mut f32s = Vec::new();
let mut f64s = Vec::new();
let mut d128s = Vec::new();
for _ in 0..n {
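// Float16 in Parquet is a 2-byte FIXED_LEN_BYTE_ARRAY stored little-endian, hence to_le_bytes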
f16s.push(FixedLenByteArray::from(
f16::from_f32(rng.gen::<f32>()).to_le_bytes().to_vec(),
));
f32s.push(rng.gen::<f32>());
f64s.push(rng.gen::<f64>());
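// Decimals stored as FIXED_LEN_BYTE_ARRAY are big-endian two's complement, hence to_be_bytes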
d128s.push(FixedLenByteArray::from(
rng.gen::<i128>().to_be_bytes().to_vec(),
));
}

bench_typed::<FloatType>(c, &f32s, Encoding::BYTE_STREAM_SPLIT);
bench_typed::<DoubleType>(c, &f64s, Encoding::BYTE_STREAM_SPLIT);
bench_typed::<FloatType>(c, &f32s, Encoding::BYTE_STREAM_SPLIT, 0);
bench_typed::<DoubleType>(c, &f64s, Encoding::BYTE_STREAM_SPLIT, 0);
bench_typed::<FixedLenByteArrayType>(c, &f16s, Encoding::BYTE_STREAM_SPLIT, 2);
bench_typed::<FixedLenByteArrayType>(c, &d128s, Encoding::BYTE_STREAM_SPLIT, 16);
}

criterion_group!(benches, criterion_benchmark);
6 changes: 4 additions & 2 deletions parquet/src/util/test_common/page_util.rs
@@ -51,20 +51,22 @@ pub struct DataPageBuilderImpl {
rep_levels_byte_len: u32,
def_levels_byte_len: u32,
datapage_v2: bool,
type_width: i32,
}

impl DataPageBuilderImpl {
// `num_values` is the number of non-null values to put in the data page.
// `datapage_v2` flag is used to indicate if the generated data page should use V2
// format or not.
pub fn new(_desc: ColumnDescPtr, num_values: u32, datapage_v2: bool) -> Self {
pub fn new(desc: ColumnDescPtr, num_values: u32, datapage_v2: bool) -> Self {
DataPageBuilderImpl {
encoding: None,
num_values,
buffer: vec![],
rep_levels_byte_len: 0,
def_levels_byte_len: 0,
datapage_v2,
type_width: desc.type_length(),
}
}

@@ -111,7 +113,7 @@ impl DataPageBuilder for DataPageBuilderImpl {
// Create test column descriptor.
let desc = {
let ty = SchemaType::primitive_type_builder("t", T::get_physical_type())
.with_length(0)
.with_length(self.type_width)
.build()
.unwrap();
Arc::new(ColumnDescriptor::new(
