From 40cb3a52d61cb7ebfea7899124e19aa246df9f26 Mon Sep 17 00:00:00 2001 From: Johan Peltenburg Date: Tue, 25 Jun 2024 15:27:15 +0200 Subject: [PATCH] feat: Add support for variable size binary interop with arrow-rs (#191) Signed-off-by: Matthijs Brobbel Co-authored-by: Matthijs Brobbel --- examples/parquet.rs | 9 +- src/array/mod.rs | 44 +++++ src/array/variable_size_binary.rs | 32 ++-- src/arrow/array/mod.rs | 1 + src/arrow/array/variable_size_binary.rs | 232 ++++++++++++++++++++++++ src/length.rs | 12 ++ src/offset.rs | 15 ++ 7 files changed, 330 insertions(+), 15 deletions(-) create mode 100644 src/arrow/array/variable_size_binary.rs diff --git a/examples/parquet.rs b/examples/parquet.rs index 5af19400..de40a804 100644 --- a/examples/parquet.rs +++ b/examples/parquet.rs @@ -3,7 +3,11 @@ fn main() { use arrow_array::RecordBatch; use arrow_cast::pretty; use bytes::Bytes; - use narrow::{array::StructArray, arrow::buffer::ScalarBuffer, ArrayType}; + use narrow::{ + array::{StructArray, VariableSizeBinary}, + arrow::buffer::ScalarBuffer, + ArrayType, + }; use parquet::arrow::{arrow_reader::ParquetRecordBatchReader, ArrowWriter}; use uuid::Uuid; @@ -20,6 +24,7 @@ fn main() { f: Bar, g: [u8; 8], h: Uuid, + i: VariableSizeBinary, } let input = [ Foo { @@ -31,6 +36,7 @@ fn main() { f: Bar(Some(true)), g: [1, 2, 3, 4, 5, 6, 7, 8], h: Uuid::from_u128(1234), + i: vec![1, 3, 3, 7].into(), }, Foo { a: 42, @@ -41,6 +47,7 @@ fn main() { f: Bar(None), g: [9, 10, 11, 12, 13, 14, 15, 16], h: Uuid::from_u128(42), + i: vec![4, 2].into(), }, ]; diff --git a/src/array/mod.rs b/src/array/mod.rs index 27687fe3..b3be8877 100644 --- a/src/array/mod.rs +++ b/src/array/mod.rs @@ -3,6 +3,7 @@ use crate::{ buffer::BufferType, offset::{self, OffsetElement}, + Length, }; use std::{collections::VecDeque, marker::PhantomData}; @@ -163,6 +164,49 @@ impl From> for [u8; N] { } } +/// A byte vector wrapper that maps to [`VariableSizeBinaryArray`] via its +/// [`ArrayType`] implementation. 
Used for example to map `Vec` to +/// a [`VariableSizeBinaryArray`] instead of a [`VariableSizeListArray`]. +#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct VariableSizeBinary(Vec); + +impl ArrayType for VariableSizeBinary { + type Array = + VariableSizeBinaryArray; +} + +impl ArrayType for Option { + type Array = + VariableSizeBinaryArray; +} + +impl From> for VariableSizeBinary { + fn from(value: Vec) -> Self { + Self(value) + } +} + +impl From for Vec { + fn from(value: VariableSizeBinary) -> Self { + value.0 + } +} + +impl Length for VariableSizeBinary { + fn len(&self) -> usize { + self.0.len() + } +} + +impl IntoIterator for VariableSizeBinary { + type Item = u8; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} + impl, const N: usize> ArrayType<[T; N]> for [T; N] { type Array = FixedSizeListArray< diff --git a/src/array/variable_size_binary.rs b/src/array/variable_size_binary.rs index dbb798a8..a3d78954 100644 --- a/src/array/variable_size_binary.rs +++ b/src/array/variable_size_binary.rs @@ -107,12 +107,13 @@ where impl FromIterator for VariableSizeBinaryArray where + T: IntoIterator, ::Buffer: Validity, Offset, NULLABLE, OffsetItem, Buffer>: - FromIterator, + FromIterator<::IntoIter>, { fn from_iter>(iter: I) -> Self { - Self(iter.into_iter().collect()) + Self(iter.into_iter().map(IntoIterator::into_iter).collect()) } } @@ -217,17 +218,26 @@ impl ValidityBitmap #[cfg(test)] mod tests { use super::*; - use crate::buffer::BufferRef; + use crate::{array::VariableSizeBinary, buffer::BufferRef}; use std::mem; #[test] - fn from_iter() { - let input: [&[u8]; 4] = [&[1], &[2, 3], &[4, 5, 6], &[7, 8, 9, 0]]; + fn from_variable_size_binary() { + let input: [Vec; 4] = [vec![0, 1, 2], vec![3], vec![], vec![4, 5]]; let array = input .into_iter() - .map(<[u8]>::to_vec) + .map(VariableSizeBinary) .collect::(); assert_eq!(array.len(), 4); + assert_eq!(array.0.data.0, &[0, 1, 2, 
3, 4, 5]); + assert_eq!(array.0.offsets, &[0, 3, 4, 4, 6]); + } + + #[test] + fn from_iter() { + let input: [&[u8]; 4] = [&[1], &[2, 3], &[4, 5, 6], &[7, 8, 9, 0]]; + let array = input.into_iter().collect::(); + assert_eq!(array.len(), 4); assert_eq!(array.0.data.0, &[1, 2, 3, 4, 5, 6, 7, 8, 9, 0]); assert_eq!(array.0.offsets, &[0, 1, 3, 6, 10]); @@ -241,10 +251,7 @@ mod tests { #[test] fn from_iter_nullable() { let input: [Option<&[u8]>; 4] = [Some(&[1]), None, Some(&[4, 5, 6]), Some(&[7, 8, 9, 0])]; - let array = input - .into_iter() - .map(|x| x.map(<[u8]>::to_vec)) - .collect::>(); + let array = input.into_iter().collect::>(); assert_eq!(array.len(), 4); assert_eq!(array.0.data.0, &[1, 4, 5, 6, 7, 8, 9, 0]); assert_eq!(array.0.offsets.as_ref(), &[0, 1, 1, 4, 8]); @@ -271,10 +278,7 @@ mod tests { #[test] fn index() { let input: [&[u8]; 4] = [&[1], &[2, 3], &[4, 5, 6], &[7, 8, 9, 0]]; - let array = input - .into_iter() - .map(<[u8]>::to_vec) - .collect::(); + let array = input.into_iter().collect::(); assert_eq!(array.index_checked(0), &[1]); assert_eq!(array.index_checked(1), &[2, 3]); assert_eq!(array.index_checked(2), &[4, 5, 6]); diff --git a/src/arrow/array/mod.rs b/src/arrow/array/mod.rs index c21aa8fd..7ba88727 100644 --- a/src/arrow/array/mod.rs +++ b/src/arrow/array/mod.rs @@ -11,4 +11,5 @@ mod logical; mod null; mod union; pub use union::UnionArrayTypeFields; +mod variable_size_binary; mod variable_size_list; diff --git a/src/arrow/array/variable_size_binary.rs b/src/arrow/array/variable_size_binary.rs new file mode 100644 index 00000000..480f792d --- /dev/null +++ b/src/arrow/array/variable_size_binary.rs @@ -0,0 +1,232 @@ +//! Interop with [`arrow-rs`] binary array. 
+ +use std::sync::Arc; + +use arrow_array::OffsetSizeTrait; +use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer}; +use arrow_schema::{DataType, Field}; + +use crate::{ + array::{FixedSizePrimitiveArray, VariableSizeBinaryArray}, + arrow::OffsetElement, + bitmap::Bitmap, + buffer::BufferType, + nullable::Nullable, + offset::Offset, + validity::{Nullability, Validity}, +}; + +impl + crate::arrow::Array for VariableSizeBinaryArray +where + ::Buffer: Validity, + Vec: Nullability, +{ + type Array = arrow_array::GenericBinaryArray; + + fn as_field(name: &str) -> arrow_schema::Field { + Field::new( + name, + if OffsetItem::LARGE { + DataType::LargeBinary + } else { + DataType::Binary + }, + NULLABLE, + ) + } +} + +impl + From> for VariableSizeBinaryArray +where + ::Buffer: Validity, + Self: From>, +{ + fn from(value: Arc) -> Self { + Self::from(arrow_array::GenericBinaryArray::::from( + value.to_data(), + )) + } +} + +impl + From> for Arc +where + ::Buffer: Into>, + FixedSizePrimitiveArray: Into>, +{ + fn from(value: VariableSizeBinaryArray) -> Self { + let array: arrow_array::GenericBinaryArray = value.into(); + Arc::new(array) + } +} + +impl + From> for Arc +where + ::Buffer: Into>, + FixedSizePrimitiveArray: Into>, + Bitmap: Into, +{ + fn from(value: VariableSizeBinaryArray) -> Self { + let array: arrow_array::GenericBinaryArray = value.into(); + Arc::new(array) + } +} + +impl + From> + for arrow_array::GenericBinaryArray +where + ::Buffer: Into>, + FixedSizePrimitiveArray: Into>, +{ + fn from(value: VariableSizeBinaryArray) -> Self { + arrow_array::GenericBinaryArray::new( + // Safety: + // - The narrow offset buffer contains valid offset data + unsafe { OffsetBuffer::new_unchecked(value.0.offsets.into()) }, + value.0.data.into().into_inner(), + None, + ) + } +} + +impl + From> + for arrow_array::GenericBinaryArray +where + ::Buffer: Into>, + FixedSizePrimitiveArray: Into>, + Bitmap: Into, +{ + fn from(value: VariableSizeBinaryArray) -> Self { + 
arrow_array::GenericBinaryArray::new( + // Safety: + // - The narrow offset buffer contains valid offset data + unsafe { OffsetBuffer::new_unchecked(value.0.offsets.data.into()) }, + value.0.data.into().into_inner(), + Some(value.0.offsets.validity.into()), + ) + } +} + +/// Panics when the array has a null buffer +impl + From> + for VariableSizeBinaryArray +where + FixedSizePrimitiveArray: From>, + ::Buffer: From>, +{ + fn from(value: arrow_array::GenericBinaryArray) -> Self { + let (offsets, values, nulls_opt) = value.into_parts(); + match nulls_opt { + Some(_) => panic!("expected array without a null buffer"), + None => VariableSizeBinaryArray(Offset { + data: ScalarBuffer::from(values).into(), + offsets: offsets.into_inner().into(), + }), + } + } +} + +/// Panics when the array has no null buffer +impl + From> + for VariableSizeBinaryArray +where + FixedSizePrimitiveArray: From>, + ::Buffer: From>, + Bitmap: From, +{ + fn from(value: arrow_array::GenericBinaryArray) -> Self { + let (offsets, values, nulls_opt) = value.into_parts(); + match nulls_opt { + Some(null_buffer) => VariableSizeBinaryArray(Offset { + data: ScalarBuffer::from(values).into(), + offsets: Nullable { + data: offsets.into_inner().into(), + validity: null_buffer.into(), + }, + }), + None => panic!("expected array with a null buffer"), + } + } +} + +#[cfg(test)] +mod tests { + use crate::array::VariableSizeBinaryArray; + + fn input() -> [Vec; 3] { + [vec![0, 1, 2], vec![3], vec![]] + } + + fn input_nullable() -> [Option>; 3] { + [Some(vec![0, 1, 2]), Some(vec![3]), None] + } + + #[test] + fn from() { + let vsb_array = input().into_iter().collect::(); + assert_eq!( + arrow_array::BinaryArray::from(vsb_array) + .into_iter() + .flatten() + .collect::>(), + input() + ); + + let vsb_array_nullable = input_nullable() + .into_iter() + .collect::>(); + assert_eq!( + arrow_array::GenericBinaryArray::::from(vsb_array_nullable) + .into_iter() + .map(|o| o.map(<[u8]>::to_vec)) + .collect::>(), + input_nullable() + ); + } + + 
#[test] + #[should_panic(expected = "expected array with a null buffer")] + fn into_nullable() { + let vsb_array = input() + .into_iter() + .map(Option::Some) + .collect::(); + let _: VariableSizeBinaryArray = + vsb_array.into(); + } + + #[test] + #[should_panic(expected = "expected array without a null buffer")] + fn into_non_nullable() { + let vsb_array_nullable = input_nullable() + .into_iter() + .collect::(); + let _: VariableSizeBinaryArray = + vsb_array_nullable.into(); + } + + #[test] + fn into() { + let vsb_array = input() + .into_iter() + .map(Option::Some) + .collect::(); + let _: VariableSizeBinaryArray = + vsb_array.into(); + // todo(mbrobbel): intoiterator for Binaryarray + + let vsb_array_nullable = input_nullable() + .into_iter() + .collect::(); + let _: VariableSizeBinaryArray = + vsb_array_nullable.into(); + // todo(mbrobbel): intoiterator for Binaryarray + } +} diff --git a/src/length.rs b/src/length.rs index b3f4f9a2..b66dc1e9 100644 --- a/src/length.rs +++ b/src/length.rs @@ -94,3 +94,15 @@ impl Length for Option { self.as_ref().map_or(0, Length::len) } } + +impl Length for std::vec::IntoIter { + fn len(&self) -> usize { + ExactSizeIterator::len(self) + } +} + +impl Length for std::slice::Iter<'_, T> { + fn len(&self) -> usize { + ExactSizeIterator::len(self) + } +} diff --git a/src/offset.rs b/src/offset.rs index 196a0a51..c88b2e03 100644 --- a/src/offset.rs +++ b/src/offset.rs @@ -345,6 +345,21 @@ where } } +impl + FromIterator> for Offset +where + Self: Default, + T: Extend<::Item>, + <::Buffer as Validity>::Storage: + Extend<(bool, OffsetItem)>, +{ + fn from_iter>>(iter: I) -> Self { + let mut offset = Self::default(); + offset.extend(iter.into_iter().map(|mut v| v.next())); + offset + } +} + /// An iterator over items in an offset. pub struct OffsetSlice<'a, T, const NULLABLE: bool, OffsetItem: OffsetElement, Buffer: BufferType> where