Skip to content

Commit b25c441

Browse files
mbrobbel and tustvold authored
Change UnionArray constructors (apache#5623)
* Change `UnionArray` constructors
* Fix a comment
* Clippy and avoid using hashmaps
* Additional test

---------

Co-authored-by: Raphael Taylor-Davies <r.taylordavies@googlemail.com>
1 parent 4045fb5 commit b25c441

File tree

10 files changed

+360
-332
lines changed

10 files changed

+360
-332
lines changed

arrow-array/src/array/union_array.rs

+209-163
Large diffs are not rendered by default.

arrow-array/src/builder/union_builder.rs

+39-39
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,8 @@ use arrow_buffer::{ArrowNativeType, Buffer};
2323
use arrow_data::ArrayDataBuilder;
2424
use arrow_schema::{ArrowError, DataType, Field};
2525
use std::any::Any;
26-
use std::collections::HashMap;
26+
use std::collections::BTreeMap;
27+
use std::sync::Arc;
2728

2829
/// `FieldData` is a helper struct to track the state of the fields in the `UnionBuilder`.
2930
#[derive(Debug)]
@@ -142,7 +143,7 @@ pub struct UnionBuilder {
142143
/// The current number of slots in the array
143144
len: usize,
144145
/// Maps field names to `FieldData` instances which track the builders for that field
145-
fields: HashMap<String, FieldData>,
146+
fields: BTreeMap<String, FieldData>,
146147
/// Builder to keep track of type ids
147148
type_id_builder: Int8BufferBuilder,
148149
/// Builder to keep track of offsets (`None` for sparse unions)
@@ -165,7 +166,7 @@ impl UnionBuilder {
165166
pub fn with_capacity_dense(capacity: usize) -> Self {
166167
Self {
167168
len: 0,
168-
fields: HashMap::default(),
169+
fields: Default::default(),
169170
type_id_builder: Int8BufferBuilder::new(capacity),
170171
value_offset_builder: Some(Int32BufferBuilder::new(capacity)),
171172
initial_capacity: capacity,
@@ -176,7 +177,7 @@ impl UnionBuilder {
176177
pub fn with_capacity_sparse(capacity: usize) -> Self {
177178
Self {
178179
len: 0,
179-
fields: HashMap::default(),
180+
fields: Default::default(),
180181
type_id_builder: Int8BufferBuilder::new(capacity),
181182
value_offset_builder: None,
182183
initial_capacity: capacity,
@@ -274,40 +275,39 @@ impl UnionBuilder {
274275
}
275276

276277
/// Builds this builder creating a new `UnionArray`.
277-
pub fn build(mut self) -> Result<UnionArray, ArrowError> {
278-
let type_id_buffer = self.type_id_builder.finish();
279-
let value_offsets_buffer = self.value_offset_builder.map(|mut b| b.finish());
280-
let mut children = Vec::new();
281-
for (
282-
name,
283-
FieldData {
284-
type_id,
285-
data_type,
286-
mut values_buffer,
287-
slots,
288-
null_buffer_builder: mut bitmap_builder,
289-
},
290-
) in self.fields.into_iter()
291-
{
292-
let buffer = values_buffer.finish();
293-
let arr_data_builder = ArrayDataBuilder::new(data_type.clone())
294-
.add_buffer(buffer)
295-
.len(slots)
296-
.nulls(bitmap_builder.finish());
297-
298-
let arr_data_ref = unsafe { arr_data_builder.build_unchecked() };
299-
let array_ref = make_array(arr_data_ref);
300-
children.push((type_id, (Field::new(name, data_type, false), array_ref)))
301-
}
302-
303-
children.sort_by(|a, b| {
304-
a.0.partial_cmp(&b.0)
305-
.expect("This will never be None as type ids are always i8 values.")
306-
});
307-
let children: Vec<_> = children.into_iter().map(|(_, b)| b).collect();
308-
309-
let type_ids: Vec<i8> = (0_i8..children.len() as i8).collect();
310-
311-
UnionArray::try_new(&type_ids, type_id_buffer, value_offsets_buffer, children)
278+
pub fn build(self) -> Result<UnionArray, ArrowError> {
279+
let mut children = Vec::with_capacity(self.fields.len());
280+
let union_fields = self
281+
.fields
282+
.into_iter()
283+
.map(
284+
|(
285+
name,
286+
FieldData {
287+
type_id,
288+
data_type,
289+
mut values_buffer,
290+
slots,
291+
mut null_buffer_builder,
292+
},
293+
)| {
294+
let array_ref = make_array(unsafe {
295+
ArrayDataBuilder::new(data_type.clone())
296+
.add_buffer(values_buffer.finish())
297+
.len(slots)
298+
.nulls(null_buffer_builder.finish())
299+
.build_unchecked()
300+
});
301+
children.push(array_ref);
302+
(type_id, Arc::new(Field::new(name, data_type, false)))
303+
},
304+
)
305+
.collect();
306+
UnionArray::try_new(
307+
union_fields,
308+
self.type_id_builder.into(),
309+
self.value_offset_builder.map(Into::into),
310+
children,
311+
)
312312
}
313313
}

arrow-cast/src/pretty.rs

+11-7
Original file line number | Diff line number | Diff line change
@@ -142,7 +142,7 @@ mod tests {
142142
use arrow_array::builder::*;
143143
use arrow_array::types::*;
144144
use arrow_array::*;
145-
use arrow_buffer::Buffer;
145+
use arrow_buffer::ScalarBuffer;
146146
use arrow_schema::*;
147147

148148
use crate::display::array_value_to_string;
@@ -851,14 +851,18 @@ mod tests {
851851

852852
// Can't use UnionBuilder with non-primitive types, so manually build outer UnionArray
853853
let a_array = Int32Array::from(vec![None, None, None, Some(1234), Some(23)]);
854-
let type_ids = Buffer::from_slice_ref([1_i8, 1, 0, 0, 1]);
854+
let type_ids = [1, 1, 0, 0, 1].into_iter().collect::<ScalarBuffer<i8>>();
855855

856-
let children: Vec<(Field, Arc<dyn Array>)> = vec![
857-
(Field::new("a", DataType::Int32, true), Arc::new(a_array)),
858-
(inner_field.clone(), Arc::new(inner)),
859-
];
856+
let children = vec![Arc::new(a_array) as Arc<dyn Array>, Arc::new(inner)];
857+
858+
let union_fields = [
859+
(0, Arc::new(Field::new("a", DataType::Int32, true))),
860+
(1, Arc::new(inner_field.clone())),
861+
]
862+
.into_iter()
863+
.collect();
860864

861-
let outer = UnionArray::try_new(&[0, 1], type_ids, None, children).unwrap();
865+
let outer = UnionArray::try_new(union_fields, type_ids, None, children).unwrap();
862866

863867
let schema = Schema::new(vec![Field::new_union(
864868
"Teamsters",

arrow-flight/src/encode.rs

+44-57
Original file line number | Diff line number | Diff line change
@@ -597,20 +597,17 @@ fn hydrate_dictionary(array: &ArrayRef, data_type: &DataType) -> Result<ArrayRef
597597
(DataType::Union(_, UnionMode::Sparse), DataType::Union(fields, UnionMode::Sparse)) => {
598598
let union_arr = array.as_any().downcast_ref::<UnionArray>().unwrap();
599599

600-
let (type_ids, fields): (Vec<i8>, Vec<&FieldRef>) = fields.iter().unzip();
601-
602600
Arc::new(UnionArray::try_new(
603-
&type_ids,
604-
union_arr.type_ids().inner().clone(),
601+
fields.clone(),
602+
union_arr.type_ids().clone(),
605603
None,
606604
fields
607605
.iter()
608-
.enumerate()
609-
.map(|(col, field)| {
610-
Ok((
611-
field.as_ref().clone(),
612-
arrow_cast::cast(union_arr.child(col as i8), field.data_type())?,
613-
))
606+
.map(|(type_id, field)| {
607+
Ok(arrow_cast::cast(
608+
union_arr.child(type_id),
609+
field.data_type(),
610+
)?)
614611
})
615612
.collect::<Result<Vec<_>>>()?,
616613
)?)
@@ -625,10 +622,10 @@ mod tests {
625622
use arrow_array::builder::StringDictionaryBuilder;
626623
use arrow_array::*;
627624
use arrow_array::{cast::downcast_array, types::*};
628-
use arrow_buffer::Buffer;
625+
use arrow_buffer::ScalarBuffer;
629626
use arrow_cast::pretty::pretty_format_batches;
630627
use arrow_ipc::MetadataVersion;
631-
use arrow_schema::UnionMode;
628+
use arrow_schema::{UnionFields, UnionMode};
632629
use std::collections::HashMap;
633630

634631
use crate::decode::{DecodedPayload, FlightDataDecoder};
@@ -849,16 +846,23 @@ mod tests {
849846
true,
850847
)];
851848

852-
let type_ids = vec![0, 1, 2];
853-
let union_fields = vec![
854-
Field::new_list(
855-
"dict_list",
856-
Field::new_dictionary("item", DataType::UInt16, DataType::Utf8, true),
857-
true,
849+
let union_fields = [
850+
(
851+
0,
852+
Arc::new(Field::new_list(
853+
"dict_list",
854+
Field::new_dictionary("item", DataType::UInt16, DataType::Utf8, true),
855+
true,
856+
)),
858857
),
859-
Field::new_struct("struct", struct_fields.clone(), true),
860-
Field::new("string", DataType::Utf8, true),
861-
];
858+
(
859+
1,
860+
Arc::new(Field::new_struct("struct", struct_fields.clone(), true)),
861+
),
862+
(2, Arc::new(Field::new("string", DataType::Utf8, true))),
863+
]
864+
.into_iter()
865+
.collect::<UnionFields>();
862866

863867
let struct_fields = vec![Field::new_list(
864868
"dict_list",
@@ -872,21 +876,15 @@ mod tests {
872876

873877
let arr1 = builder.finish();
874878

875-
let type_id_buffer = Buffer::from_slice_ref([0_i8]);
879+
let type_id_buffer = [0].into_iter().collect::<ScalarBuffer<i8>>();
876880
let arr1 = UnionArray::try_new(
877-
&type_ids,
881+
union_fields.clone(),
878882
type_id_buffer,
879883
None,
880884
vec![
881-
(union_fields[0].clone(), Arc::new(arr1)),
882-
(
883-
union_fields[1].clone(),
884-
new_null_array(union_fields[1].data_type(), 1),
885-
),
886-
(
887-
union_fields[2].clone(),
888-
new_null_array(union_fields[2].data_type(), 1),
889-
),
885+
Arc::new(arr1) as Arc<dyn Array>,
886+
new_null_array(union_fields.iter().nth(1).unwrap().1.data_type(), 1),
887+
new_null_array(union_fields.iter().nth(2).unwrap().1.data_type(), 1),
890888
],
891889
)
892890
.unwrap();
@@ -896,47 +894,36 @@ mod tests {
896894
let arr2 = Arc::new(builder.finish());
897895
let arr2 = StructArray::new(struct_fields.clone().into(), vec![arr2], None);
898896

899-
let type_id_buffer = Buffer::from_slice_ref([1_i8]);
897+
let type_id_buffer = [1].into_iter().collect::<ScalarBuffer<i8>>();
900898
let arr2 = UnionArray::try_new(
901-
&type_ids,
899+
union_fields.clone(),
902900
type_id_buffer,
903901
None,
904902
vec![
905-
(
906-
union_fields[0].clone(),
907-
new_null_array(union_fields[0].data_type(), 1),
908-
),
909-
(union_fields[1].clone(), Arc::new(arr2)),
910-
(
911-
union_fields[2].clone(),
912-
new_null_array(union_fields[2].data_type(), 1),
913-
),
903+
new_null_array(union_fields.iter().next().unwrap().1.data_type(), 1),
904+
Arc::new(arr2),
905+
new_null_array(union_fields.iter().nth(2).unwrap().1.data_type(), 1),
914906
],
915907
)
916908
.unwrap();
917909

918-
let type_id_buffer = Buffer::from_slice_ref([2_i8]);
910+
let type_id_buffer = [2].into_iter().collect::<ScalarBuffer<i8>>();
919911
let arr3 = UnionArray::try_new(
920-
&type_ids,
912+
union_fields.clone(),
921913
type_id_buffer,
922914
None,
923915
vec![
924-
(
925-
union_fields[0].clone(),
926-
new_null_array(union_fields[0].data_type(), 1),
927-
),
928-
(
929-
union_fields[1].clone(),
930-
new_null_array(union_fields[1].data_type(), 1),
931-
),
932-
(
933-
union_fields[2].clone(),
934-
Arc::new(StringArray::from(vec!["e"])),
935-
),
916+
new_null_array(union_fields.iter().next().unwrap().1.data_type(), 1),
917+
new_null_array(union_fields.iter().nth(1).unwrap().1.data_type(), 1),
918+
Arc::new(StringArray::from(vec!["e"])),
936919
],
937920
)
938921
.unwrap();
939922

923+
let (type_ids, union_fields): (Vec<_>, Vec<_>) = union_fields
924+
.iter()
925+
.map(|(type_id, field_ref)| (type_id, (*Arc::clone(field_ref)).clone()))
926+
.unzip();
940927
let schema = Arc::new(Schema::new(vec![Field::new_union(
941928
"union",
942929
type_ids.clone(),

arrow-integration-test/src/lib.rs

+8-15
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@
2121
//!
2222
//! This is not a canonical format, but provides a human-readable way of verifying language implementations
2323
24+
use arrow_buffer::ScalarBuffer;
2425
use hex::decode;
2526
use num::BigInt;
2627
use num::Signed;
@@ -835,26 +836,18 @@ pub fn array_from_json(
835836
));
836837
};
837838

838-
let offset: Option<Buffer> = json_col.offset.map(|offsets| {
839-
let offsets: Vec<i32> =
840-
offsets.iter().map(|v| v.as_i64().unwrap() as i32).collect();
841-
Buffer::from(&offsets.to_byte_slice())
842-
});
839+
let offset: Option<ScalarBuffer<i32>> = json_col
840+
.offset
841+
.map(|offsets| offsets.iter().map(|v| v.as_i64().unwrap() as i32).collect());
843842

844-
let mut children: Vec<(Field, Arc<dyn Array>)> = vec![];
843+
let mut children = Vec::with_capacity(fields.len());
845844
for ((_, field), col) in fields.iter().zip(json_col.children.unwrap()) {
846845
let array = array_from_json(field, col, dictionaries)?;
847-
children.push((field.as_ref().clone(), array));
846+
children.push(array);
848847
}
849848

850-
let field_type_ids = fields.iter().map(|(id, _)| id).collect::<Vec<_>>();
851-
let array = UnionArray::try_new(
852-
&field_type_ids,
853-
Buffer::from(&type_ids.to_byte_slice()),
854-
offset,
855-
children,
856-
)
857-
.unwrap();
849+
let array =
850+
UnionArray::try_new(fields.clone(), type_ids.into(), offset, children).unwrap();
858851
Ok(Arc::new(array))
859852
}
860853
t => Err(ArrowError::JsonError(format!(

arrow-ipc/src/reader.rs

+8-9
Original file line number | Diff line number | Diff line change
@@ -31,7 +31,7 @@ use std::io::{BufReader, Read, Seek, SeekFrom};
3131
use std::sync::Arc;
3232

3333
use arrow_array::*;
34-
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
34+
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer};
3535
use arrow_data::ArrayData;
3636
use arrow_schema::*;
3737

@@ -214,26 +214,25 @@ fn create_array(
214214
reader.next_buffer()?;
215215
}
216216

217-
let type_ids: Buffer = reader.next_buffer()?[..len].into();
217+
let type_ids: ScalarBuffer<i8> = reader.next_buffer()?.slice_with_length(0, len).into();
218218

219219
let value_offsets = match mode {
220220
UnionMode::Dense => {
221-
let buffer = reader.next_buffer()?;
222-
Some(buffer[..len * 4].into())
221+
let offsets: ScalarBuffer<i32> =
222+
reader.next_buffer()?.slice_with_length(0, len * 4).into();
223+
Some(offsets)
223224
}
224225
UnionMode::Sparse => None,
225226
};
226227

227228
let mut children = Vec::with_capacity(fields.len());
228-
let mut ids = Vec::with_capacity(fields.len());
229229

230-
for (id, field) in fields.iter() {
230+
for (_id, field) in fields.iter() {
231231
let child = create_array(reader, field, variadic_counts, require_alignment)?;
232-
children.push((field.as_ref().clone(), child));
233-
ids.push(id);
232+
children.push(child);
234233
}
235234

236-
let array = UnionArray::try_new(&ids, type_ids, value_offsets, children)?;
235+
let array = UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?;
237236
Ok(Arc::new(array))
238237
}
239238
Null => {

0 commit comments

Comments
 (0)