From 3e5c76f23f2f5f6ca074935fe242e57de7d8d8bd Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Wed, 14 Aug 2024 03:56:11 -0700 Subject: [PATCH] Remove unnecessary null buffer construction when converting arrays to a different type (#6244) * create primitive array from iter and nulls * clippy * speed up some more decimals * add optimizations for byte_stream_split * decimal256 * Revert "add optimizations for byte_stream_split" This reverts commit 5d4ae0dc09f95ee9079b46b117fb554f63157564. * add comments --- arrow-array/src/array/primitive_array.rs | 14 ++++ .../array_reader/fixed_len_byte_array.rs | 83 +++++++++++-------- .../src/arrow/array_reader/primitive_array.rs | 82 +++++++++++------- 3 files changed, 115 insertions(+), 64 deletions(-) diff --git a/arrow-array/src/array/primitive_array.rs b/arrow-array/src/array/primitive_array.rs index 70a8ceaef800..db14845b08d9 100644 --- a/arrow-array/src/array/primitive_array.rs +++ b/arrow-array/src/array/primitive_array.rs @@ -713,6 +713,20 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> { } } + /// Creates a PrimitiveArray based on an iterator of values with provided nulls + pub fn from_iter_values_with_nulls<I: IntoIterator<Item = T::Native>>( + iter: I, + nulls: Option<NullBuffer>, + ) -> Self { + let val_buf: Buffer = iter.into_iter().collect(); + let len = val_buf.len() / std::mem::size_of::<T::Native>(); + Self { + data_type: T::DATA_TYPE, + values: ScalarBuffer::new(val_buf, 0, len), + nulls, + } + } + /// Creates a PrimitiveArray based on a constant value with `count` elements pub fn from_value(value: T::Native, count: usize) -> Self { unsafe { diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index 670a97f351ba..3b2600c54795 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -27,7 +27,7 @@ use crate::column::reader::decoder::ColumnValueDecoder; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; use 
arrow_array::{ - ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array, + Array, ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array, IntervalDayTimeArray, IntervalYearMonthArray, }; use arrow_buffer::{i256, Buffer, IntervalDayTime}; @@ -165,57 +165,68 @@ impl ArrayReader for FixedLenByteArrayReader { // TODO: An improvement might be to do this conversion on read let array: ArrayRef = match &self.data_type { ArrowType::Decimal128(p, s) => { - let decimal = binary - .iter() - .map(|opt| Some(i128::from_be_bytes(sign_extend_be(opt?)))) - .collect::<Decimal128Array>() + // We can simply reuse the null buffer from `binary` rather than recomputing it + // (as was the case when we simply used `collect` to produce the new array). + // The same applies to the transformations below. + let nulls = binary.nulls().cloned(); + let decimal = binary.iter().map(|o| match o { + Some(b) => i128::from_be_bytes(sign_extend_be(b)), + None => i128::default(), + }); + let decimal = Decimal128Array::from_iter_values_with_nulls(decimal, nulls) .with_precision_and_scale(*p, *s)?; - Arc::new(decimal) } ArrowType::Decimal256(p, s) => { - let decimal = binary - .iter() - .map(|opt| Some(i256::from_be_bytes(sign_extend_be(opt?)))) - .collect::<Decimal256Array>() + let nulls = binary.nulls().cloned(); + let decimal = binary.iter().map(|o| match o { + Some(b) => i256::from_be_bytes(sign_extend_be(b)), + None => i256::default(), + }); + let decimal = Decimal256Array::from_iter_values_with_nulls(decimal, nulls) .with_precision_and_scale(*p, *s)?; - Arc::new(decimal) } ArrowType::Interval(unit) => { + let nulls = binary.nulls().cloned(); // An interval is stored as 3x 32-bit unsigned integers storing months, days, // and milliseconds match unit { - IntervalUnit::YearMonth => Arc::new( - binary - .iter() - .map(|o| o.map(|b| i32::from_le_bytes(b[0..4].try_into().unwrap()))) - .collect::<IntervalYearMonthArray>(), - ) as ArrayRef, - IntervalUnit::DayTime => Arc::new( - binary - .iter() - .map(|o| { - 
o.map(|b| { - IntervalDayTime::new( - i32::from_le_bytes(b[4..8].try_into().unwrap()), - i32::from_le_bytes(b[8..12].try_into().unwrap()), - ) - }) - }) - .collect::<IntervalDayTimeArray>(), - ) as ArrayRef, + IntervalUnit::YearMonth => { + let iter = binary.iter().map(|o| match o { + Some(b) => i32::from_le_bytes(b[0..4].try_into().unwrap()), + None => i32::default(), + }); + let interval = + IntervalYearMonthArray::from_iter_values_with_nulls(iter, nulls); + Arc::new(interval) as ArrayRef + } + IntervalUnit::DayTime => { + let iter = binary.iter().map(|o| match o { + Some(b) => IntervalDayTime::new( + i32::from_le_bytes(b[4..8].try_into().unwrap()), + i32::from_le_bytes(b[8..12].try_into().unwrap()), + ), + None => IntervalDayTime::default(), + }); + let interval = + IntervalDayTimeArray::from_iter_values_with_nulls(iter, nulls); + Arc::new(interval) as ArrayRef + } IntervalUnit::MonthDayNano => { return Err(nyi_err!("MonthDayNano intervals not supported")); } } } - ArrowType::Float16 => Arc::new( - binary - .iter() - .map(|o| o.map(|b| f16::from_le_bytes(b[..2].try_into().unwrap()))) - .collect::<Float16Array>(), - ) as ArrayRef, + ArrowType::Float16 => { + let nulls = binary.nulls().cloned(); + let f16s = binary.iter().map(|o| match o { + Some(b) => f16::from_le_bytes(b[..2].try_into().unwrap()), + None => f16::default(), + }); + let f16s = Float16Array::from_iter_values_with_nulls(f16s, nulls); + Arc::new(f16s) as ArrayRef + } _ => Arc::new(binary) as ArrayRef, }; diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index 07ecc27d9b0b..5e0e09212c7e 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -217,22 +217,35 @@ where arrow_cast::cast(&a, target_type)? } ArrowType::Decimal128(p, s) => { + // We can simply reuse the null buffer from `array` rather than recomputing it + // (as was the case when we simply used `collect` to produce the new array). 
+ let nulls = array.nulls().cloned(); let array = match array.data_type() { - ArrowType::Int32 => array - .as_any() - .downcast_ref::<Int32Array>() - .unwrap() - .iter() - .map(|v| v.map(|v| v as i128)) - .collect::<Decimal128Array>(), + ArrowType::Int32 => { + let decimal = array + .as_any() + .downcast_ref::<Int32Array>() + .unwrap() + .iter() + .map(|v| match v { + Some(i) => i as i128, + None => i128::default(), + }); + Decimal128Array::from_iter_values_with_nulls(decimal, nulls) + } - ArrowType::Int64 => array - .as_any() - .downcast_ref::<Int64Array>() - .unwrap() - .iter() - .map(|v| v.map(|v| v as i128)) - .collect::<Decimal128Array>(), + ArrowType::Int64 => { + let decimal = array + .as_any() + .downcast_ref::<Int64Array>() + .unwrap() + .iter() + .map(|v| match v { + Some(i) => i as i128, + None => i128::default(), + }); + Decimal128Array::from_iter_values_with_nulls(decimal, nulls) + } _ => { return Err(arrow_err!( "Cannot convert {:?} to decimal", @@ -245,22 +258,35 @@ where Arc::new(array) as ArrayRef } ArrowType::Decimal256(p, s) => { + // We can simply reuse the null buffer from `array` rather than recomputing it + // (as was the case when we simply used `collect` to produce the new array). 
+ let nulls = array.nulls().cloned(); let array = match array.data_type() { - ArrowType::Int32 => array - .as_any() - .downcast_ref::<Int32Array>() - .unwrap() - .iter() - .map(|v| v.map(|v| i256::from_i128(v as i128))) - .collect::<Decimal256Array>(), + ArrowType::Int32 => { + let decimal = array + .as_any() + .downcast_ref::<Int32Array>() + .unwrap() + .iter() + .map(|v| match v { + Some(i) => i256::from_i128(i as i128), + None => i256::default(), + }); + Decimal256Array::from_iter_values_with_nulls(decimal, nulls) + } - ArrowType::Int64 => array - .as_any() - .downcast_ref::<Int64Array>() - .unwrap() - .iter() - .map(|v| v.map(|v| i256::from_i128(v as i128))) - .collect::<Decimal256Array>(), + ArrowType::Int64 => { + let decimal = array + .as_any() + .downcast_ref::<Int64Array>() + .unwrap() + .iter() + .map(|v| match v { + Some(i) => i256::from_i128(i as i128), + None => i256::default(), + }); + Decimal256Array::from_iter_values_with_nulls(decimal, nulls) + } _ => { return Err(arrow_err!( "Cannot convert {:?} to decimal",