Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use unary() for array conversion in Parquet array readers, speed up Decimal128, Decimal256 and Float16 #6252

Merged
merged 9 commits into from
Aug 22, 2024
30 changes: 30 additions & 0 deletions arrow-array/src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,36 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
PrimitiveArray::new(values, Some(nulls))
}

/// Applies a unary infallible function to each value in an array, producing a
/// new primitive array.
///
/// `op` is called once for every index in `0..left.len()`, in order, and its
/// results become the values of the returned array.
///
/// # Null Handling
///
/// See [`Self::unary`] for more information on null handling.
///
/// The null buffer of the result is whatever `left.logical_nulls()` returns.
/// `op` is applied to every slot without consulting that null buffer, so it is
/// also invoked for null slots and must not panic on the value stored there.
///
/// # Example: create an [`Int16Array`] from an [`ArrayAccessor`] with item type `&[u8]`
/// ```
/// use arrow_array::{Array, FixedSizeBinaryArray, Int16Array};
/// let input_arg = vec![ vec![1, 0], vec![2, 0], vec![3, 0] ];
/// let arr = FixedSizeBinaryArray::try_from_iter(input_arg.into_iter()).unwrap();
/// let c = Int16Array::from_unary(&arr, |x| i16::from_le_bytes(x[..2].try_into().unwrap()));
/// assert_eq!(c, Int16Array::from(vec![Some(1i16), Some(2i16), Some(3i16)]));
/// ```
pub fn from_unary<U: ArrayAccessor, F>(left: U, mut op: F) -> Self
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like how this follows the model of BooleanArray::from_unary https://docs.rs/arrow/latest/arrow/array/struct.BooleanArray.html#method.from_unary

where
F: FnMut(U::Item) -> T::Native,
{
// Take the null buffer from the input up front; it is passed through to the
// result unchanged rather than being derived from the mapped values.
let nulls = left.logical_nulls();
let buffer = unsafe {
// SAFETY: i in range 0..left.len(), so `value_unchecked` is never given
// an index past the end of `left`.
let iter = (0..left.len()).map(|i| op(left.value_unchecked(i)));
// SAFETY: upper bound is trusted because `iter` is over a range
Buffer::from_trusted_len_iter(iter)
};

// Pair the freshly computed values with the input's null buffer.
PrimitiveArray::new(buffer.into(), nulls)
}

/// Returns a `PrimitiveBuilder` for this array, suitable for mutating values
/// in place.
///
Expand Down
26 changes: 14 additions & 12 deletions parquet/src/arrow/array_reader/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,23 +123,25 @@ impl<I: OffsetSizeTrait> ArrayReader for ByteArrayReader<I> {
ArrowType::Decimal128(p, s) => {
let array = buffer.into_array(null_buffer, ArrowType::Binary);
let binary = array.as_any().downcast_ref::<BinaryArray>().unwrap();
let decimal = binary
.iter()
.map(|opt| Some(i128::from_be_bytes(sign_extend_be(opt?))))
.collect::<Decimal128Array>()
.with_precision_and_scale(p, s)?;

// Null slots will have 0 length, so we need to check for that in the lambda
// or sign_extend_be will panic.
let decimal = Decimal128Array::from_unary(binary, |x| match x.len() {
0 => i128::default(),
_ => i128::from_be_bytes(sign_extend_be(x)),
})
.with_precision_and_scale(p, s)?;
Arc::new(decimal)
}
ArrowType::Decimal256(p, s) => {
let array = buffer.into_array(null_buffer, ArrowType::Binary);
let binary = array.as_any().downcast_ref::<BinaryArray>().unwrap();
let decimal = binary
.iter()
.map(|opt| Some(i256::from_be_bytes(sign_extend_be(opt?))))
.collect::<Decimal256Array>()
.with_precision_and_scale(p, s)?;

// Null slots will have 0 length, so we need to check for that in the lambda
// or sign_extend_be will panic.
let decimal = Decimal256Array::from_unary(binary, |x| match x.len() {
0 => i256::default(),
_ => i256::from_be_bytes(sign_extend_be(x)),
})
.with_precision_and_scale(p, s)?;
Arc::new(decimal)
}
_ => buffer.into_array(null_buffer, self.data_type.clone()),
Expand Down
61 changes: 17 additions & 44 deletions parquet/src/arrow/array_reader/fixed_len_byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::column::reader::decoder::ColumnValueDecoder;
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use arrow_array::{
Array, ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array,
ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array,
IntervalDayTimeArray, IntervalYearMonthArray,
};
use arrow_buffer::{i256, Buffer, IntervalDayTime};
Expand Down Expand Up @@ -165,67 +165,40 @@ impl ArrayReader for FixedLenByteArrayReader {
// TODO: An improvement might be to do this conversion on read
let array: ArrayRef = match &self.data_type {
ArrowType::Decimal128(p, s) => {
// We can simply reuse the null buffer from `binary` rather than recomputing it
// (as was the case when we simply used `collect` to produce the new array).
// The same applies to the transformations below.
let nulls = binary.nulls().cloned();
let decimal = binary.iter().map(|o| match o {
Some(b) => i128::from_be_bytes(sign_extend_be(b)),
None => i128::default(),
});
let decimal = Decimal128Array::from_iter_values_with_nulls(decimal, nulls)
.with_precision_and_scale(*p, *s)?;
Arc::new(decimal)
let f = |b: &[u8]| i128::from_be_bytes(sign_extend_be(b));
Arc::new(Decimal128Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
as ArrayRef
}
ArrowType::Decimal256(p, s) => {
let nulls = binary.nulls().cloned();
let decimal = binary.iter().map(|o| match o {
Some(b) => i256::from_be_bytes(sign_extend_be(b)),
None => i256::default(),
});
let decimal = Decimal256Array::from_iter_values_with_nulls(decimal, nulls)
.with_precision_and_scale(*p, *s)?;
Arc::new(decimal)
let f = |b: &[u8]| i256::from_be_bytes(sign_extend_be(b));
Arc::new(Decimal256Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
as ArrayRef
}
ArrowType::Interval(unit) => {
let nulls = binary.nulls().cloned();
// An interval is stored as 3x 32-bit unsigned integers storing months, days,
// and milliseconds
match unit {
IntervalUnit::YearMonth => {
let iter = binary.iter().map(|o| match o {
Some(b) => i32::from_le_bytes(b[0..4].try_into().unwrap()),
None => i32::default(),
});
let interval =
IntervalYearMonthArray::from_iter_values_with_nulls(iter, nulls);
Arc::new(interval) as ArrayRef
let f = |b: &[u8]| i32::from_le_bytes(b[0..4].try_into().unwrap());
Arc::new(IntervalYearMonthArray::from_unary(&binary, f)) as ArrayRef
}
IntervalUnit::DayTime => {
let iter = binary.iter().map(|o| match o {
Some(b) => IntervalDayTime::new(
let f = |b: &[u8]| {
IntervalDayTime::new(
i32::from_le_bytes(b[4..8].try_into().unwrap()),
i32::from_le_bytes(b[8..12].try_into().unwrap()),
),
None => IntervalDayTime::default(),
});
let interval =
IntervalDayTimeArray::from_iter_values_with_nulls(iter, nulls);
Arc::new(interval) as ArrayRef
)
};
Arc::new(IntervalDayTimeArray::from_unary(&binary, f)) as ArrayRef
}
IntervalUnit::MonthDayNano => {
return Err(nyi_err!("MonthDayNano intervals not supported"));
}
}
}
ArrowType::Float16 => {
let nulls = binary.nulls().cloned();
let f16s = binary.iter().map(|o| match o {
Some(b) => f16::from_le_bytes(b[..2].try_into().unwrap()),
None => f16::default(),
});
let f16s = Float16Array::from_iter_values_with_nulls(f16s, nulls);
Arc::new(f16s) as ArrayRef
let f = |b: &[u8]| f16::from_le_bytes(b[..2].try_into().unwrap());
Arc::new(Float16Array::from_unary(&binary, f)) as ArrayRef
}
_ => Arc::new(binary) as ArrayRef,
};
Expand Down Expand Up @@ -488,8 +461,8 @@ mod tests {
use crate::arrow::ArrowWriter;
use arrow::datatypes::Field;
use arrow::error::Result as ArrowResult;
use arrow_array::RecordBatch;
use arrow_array::{Array, ListArray};
use arrow_array::{Decimal256Array, RecordBatch};
use bytes::Bytes;
use std::sync::Arc;

Expand Down
80 changes: 24 additions & 56 deletions parquet/src/arrow/array_reader/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,35 +217,19 @@ where
arrow_cast::cast(&a, target_type)?
}
ArrowType::Decimal128(p, s) => {
// We can simply reuse the null buffer from `array` rather than recomputing it
// (as was the case when we simply used `collect` to produce the new array).
let nulls = array.nulls().cloned();
let array = match array.data_type() {
ArrowType::Int32 => {
let decimal = array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.iter()
.map(|v| match v {
Some(i) => i as i128,
None => i128::default(),
});
Decimal128Array::from_iter_values_with_nulls(decimal, nulls)
}

ArrowType::Int64 => {
let decimal = array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.iter()
.map(|v| match v {
Some(i) => i as i128,
None => i128::default(),
});
Decimal128Array::from_iter_values_with_nulls(decimal, nulls)
}
ArrowType::Int32 => array
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we add a comment explaining the rationale for not checking nulls -- something like

// Apply conversion to all elements regardless of null slots as the conversion
// to `i128` is infallible. This improves performance by avoiding a branch in 
// the inner loop

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!

.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.unary(|i| i as i128)
as Decimal128Array,
ArrowType::Int64 => array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.unary(|i| i as i128)
as Decimal128Array,
_ => {
return Err(arrow_err!(
"Cannot convert {:?} to decimal",
Expand All @@ -258,35 +242,19 @@ where
Arc::new(array) as ArrayRef
}
ArrowType::Decimal256(p, s) => {
// We can simply reuse the null buffer from `array` rather than recomputing it
// (as was the case when we simply used `collect` to produce the new array).
let nulls = array.nulls().cloned();
let array = match array.data_type() {
ArrowType::Int32 => {
let decimal = array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.iter()
.map(|v| match v {
Some(i) => i256::from_i128(i as i128),
None => i256::default(),
});
Decimal256Array::from_iter_values_with_nulls(decimal, nulls)
}

ArrowType::Int64 => {
let decimal = array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.iter()
.map(|v| match v {
Some(i) => i256::from_i128(i as i128),
None => i256::default(),
});
Decimal256Array::from_iter_values_with_nulls(decimal, nulls)
}
ArrowType::Int32 => array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.unary(|i| i256::from_i128(i as i128))
as Decimal256Array,
ArrowType::Int64 => array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.unary(|i| i256::from_i128(i as i128))
as Decimal256Array,
_ => {
return Err(arrow_err!(
"Cannot convert {:?} to decimal",
Expand Down
Loading