Skip to content

Commit

Permalink
Refactor arrow-ipc: Move create_*_array methods into `RecordBatchDe…
Browse files Browse the repository at this point in the history
…coder` (#7029)

* Move `create_primitive_array` into RecordBatchReader

* Move `create_list-array` into RecordBatchReader

* Move `create_dictionay_array` into RecordBatchReader
  • Loading branch information
alamb authored Feb 8, 2025
1 parent 1081d01 commit 37e408b
Showing 1 changed file with 104 additions and 124 deletions.
228 changes: 104 additions & 124 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,15 @@ impl RecordBatchDecoder<'_> {
) -> Result<ArrayRef, ArrowError> {
let data_type = field.data_type();
match data_type {
Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array(
self.next_node(field)?,
data_type,
&[
Utf8 | Binary | LargeBinary | LargeUtf8 => {
let field_node = self.next_node(field)?;
let buffers = [
self.next_buffer()?,
self.next_buffer()?,
self.next_buffer()?,
],
self.require_alignment,
),
];
self.create_primitive_array(field_node, data_type, &buffers)
}
BinaryView | Utf8View => {
let count = variadic_counts
.pop_front()
Expand All @@ -105,42 +104,25 @@ impl RecordBatchDecoder<'_> {
let buffers = (0..count)
.map(|_| self.next_buffer())
.collect::<Result<Vec<_>, _>>()?;
create_primitive_array(
self.next_node(field)?,
data_type,
&buffers,
self.require_alignment,
)
let field_node = self.next_node(field)?;
self.create_primitive_array(field_node, data_type, &buffers)
}
FixedSizeBinary(_) => {
let field_node = self.next_node(field)?;
let buffers = [self.next_buffer()?, self.next_buffer()?];
self.create_primitive_array(field_node, data_type, &buffers)
}
FixedSizeBinary(_) => create_primitive_array(
self.next_node(field)?,
data_type,
&[self.next_buffer()?, self.next_buffer()?],
self.require_alignment,
),
List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => {
let list_node = self.next_node(field)?;
let list_buffers = [self.next_buffer()?, self.next_buffer()?];
let values = self.create_array(list_field, variadic_counts)?;
create_list_array(
list_node,
data_type,
&list_buffers,
values,
self.require_alignment,
)
self.create_list_array(list_node, data_type, &list_buffers, values)
}
FixedSizeList(ref list_field, _) => {
let list_node = self.next_node(field)?;
let list_buffers = [self.next_buffer()?];
let values = self.create_array(list_field, variadic_counts)?;
create_list_array(
list_node,
data_type,
&list_buffers,
values,
self.require_alignment,
)
self.create_list_array(list_node, data_type, &list_buffers, values)
}
Struct(struct_fields) => {
let struct_node = self.next_node(field)?;
Expand Down Expand Up @@ -205,12 +187,11 @@ impl RecordBatchDecoder<'_> {
))
})?;

create_dictionary_array(
self.create_dictionary_array(
index_node,
data_type,
&index_buffers,
value_array.clone(),
self.require_alignment,
)
}
Union(fields, mode) => {
Expand Down Expand Up @@ -265,107 +246,106 @@ impl RecordBatchDecoder<'_> {
// no buffer increases
Ok(Arc::new(NullArray::from(array_data)))
}
_ => create_primitive_array(
self.next_node(field)?,
data_type,
&[self.next_buffer()?, self.next_buffer()?],
self.require_alignment,
),
_ => {
let field_node = self.next_node(field)?;
let buffers = [self.next_buffer()?, self.next_buffer()?];
self.create_primitive_array(field_node, data_type, &buffers)
}
}
}
}

/// Reads the correct number of buffers based on data type and null_count, and creates a
/// primitive array ref
fn create_primitive_array(
field_node: &FieldNode,
data_type: &DataType,
buffers: &[Buffer],
require_alignment: bool,
) -> Result<ArrayRef, ArrowError> {
let length = field_node.length() as usize;
let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
let builder = match data_type {
Utf8 | Binary | LargeBinary | LargeUtf8 => {
// read 3 buffers: null buffer (optional), offsets buffer and data buffer
ArrayData::builder(data_type.clone())
.len(length)
.buffers(buffers[1..3].to_vec())
.null_bit_buffer(null_buffer)
}
BinaryView | Utf8View => ArrayData::builder(data_type.clone())
.len(length)
.buffers(buffers[1..].to_vec())
.null_bit_buffer(null_buffer),
_ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
// read 2 buffers: null buffer (optional) and data buffer
ArrayData::builder(data_type.clone())
/// Reads the correct number of buffers based on data type and null_count, and creates a
/// primitive array ref
fn create_primitive_array(
&self,
field_node: &FieldNode,
data_type: &DataType,
buffers: &[Buffer],
) -> Result<ArrayRef, ArrowError> {
let length = field_node.length() as usize;
let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
let builder = match data_type {
Utf8 | Binary | LargeBinary | LargeUtf8 => {
// read 3 buffers: null buffer (optional), offsets buffer and data buffer
ArrayData::builder(data_type.clone())
.len(length)
.buffers(buffers[1..3].to_vec())
.null_bit_buffer(null_buffer)
}
BinaryView | Utf8View => ArrayData::builder(data_type.clone())
.len(length)
.add_buffer(buffers[1].clone())
.null_bit_buffer(null_buffer)
}
t => unreachable!("Data type {:?} either unsupported or not primitive", t),
};
.buffers(buffers[1..].to_vec())
.null_bit_buffer(null_buffer),
_ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
// read 2 buffers: null buffer (optional) and data buffer
ArrayData::builder(data_type.clone())
.len(length)
.add_buffer(buffers[1].clone())
.null_bit_buffer(null_buffer)
}
t => unreachable!("Data type {:?} either unsupported or not primitive", t),
};

let array_data = builder.align_buffers(!require_alignment).build()?;
let array_data = builder.align_buffers(!self.require_alignment).build()?;

Ok(make_array(array_data))
}
Ok(make_array(array_data))
}

/// Reads the correct number of buffers based on list type and null_count, and creates a
/// list array ref
fn create_list_array(
field_node: &FieldNode,
data_type: &DataType,
buffers: &[Buffer],
child_array: ArrayRef,
require_alignment: bool,
) -> Result<ArrayRef, ArrowError> {
let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
let length = field_node.length() as usize;
let child_data = child_array.into_data();
let builder = match data_type {
List(_) | LargeList(_) | Map(_, _) => ArrayData::builder(data_type.clone())
.len(length)
.add_buffer(buffers[1].clone())
.add_child_data(child_data)
.null_bit_buffer(null_buffer),

FixedSizeList(_, _) => ArrayData::builder(data_type.clone())
.len(length)
.add_child_data(child_data)
.null_bit_buffer(null_buffer),

_ => unreachable!("Cannot create list or map array from {:?}", data_type),
};
/// Reads the correct number of buffers based on list type and null_count, and creates a
/// list array ref
fn create_list_array(
&self,
field_node: &FieldNode,
data_type: &DataType,
buffers: &[Buffer],
child_array: ArrayRef,
) -> Result<ArrayRef, ArrowError> {
let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
let length = field_node.length() as usize;
let child_data = child_array.into_data();
let builder = match data_type {
List(_) | LargeList(_) | Map(_, _) => ArrayData::builder(data_type.clone())
.len(length)
.add_buffer(buffers[1].clone())
.add_child_data(child_data)
.null_bit_buffer(null_buffer),

let array_data = builder.align_buffers(!require_alignment).build()?;
FixedSizeList(_, _) => ArrayData::builder(data_type.clone())
.len(length)
.add_child_data(child_data)
.null_bit_buffer(null_buffer),

Ok(make_array(array_data))
}
_ => unreachable!("Cannot create list or map array from {:?}", data_type),
};

/// Reads the correct number of buffers based on list type and null_count, and creates a
/// list array ref
fn create_dictionary_array(
field_node: &FieldNode,
data_type: &DataType,
buffers: &[Buffer],
value_array: ArrayRef,
require_alignment: bool,
) -> Result<ArrayRef, ArrowError> {
if let Dictionary(_, _) = *data_type {
let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
let array_data = ArrayData::builder(data_type.clone())
.len(field_node.length() as usize)
.add_buffer(buffers[1].clone())
.add_child_data(value_array.into_data())
.null_bit_buffer(null_buffer)
.align_buffers(!require_alignment)
.build()?;
let array_data = builder.align_buffers(!self.require_alignment).build()?;

Ok(make_array(array_data))
} else {
unreachable!("Cannot create dictionary array from {:?}", data_type)
}

/// Reads the correct number of buffers based on list type and null_count, and creates a
/// list array ref
fn create_dictionary_array(
&self,
field_node: &FieldNode,
data_type: &DataType,
buffers: &[Buffer],
value_array: ArrayRef,
) -> Result<ArrayRef, ArrowError> {
if let Dictionary(_, _) = *data_type {
let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
let array_data = ArrayData::builder(data_type.clone())
.len(field_node.length() as usize)
.add_buffer(buffers[1].clone())
.add_child_data(value_array.into_data())
.null_bit_buffer(null_buffer)
.align_buffers(!self.require_alignment)
.build()?;

Ok(make_array(array_data))
} else {
unreachable!("Cannot create dictionary array from {:?}", data_type)
}
}
}

Expand Down

0 comments on commit 37e408b

Please sign in to comment.