Skip to content

Commit

Permalink
Refactor arrow-ipc: Rename ArrayReader to RecordBatchDecoder (#7028)
Browse files Browse the repository at this point in the history
* Rename `ArrayReader` to `RecordBatchDecoder`

* Remove alias for `self`
  • Loading branch information
alamb authored Feb 6, 2025
1 parent 7302888 commit 92cfd99
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 56 deletions.
110 changes: 56 additions & 54 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ fn read_buffer(
(false, Some(decompressor)) => decompressor.decompress_to_buffer(&buf_data),
}
}
impl ArrayReader<'_> {
impl RecordBatchDecoder<'_> {
/// Coordinates reading arrays based on data types.
///
/// `variadic_counts` encodes the number of buffers to read for variadic types (e.g., Utf8View, BinaryView)
Expand All @@ -83,18 +83,17 @@ impl ArrayReader<'_> {
field: &Field,
variadic_counts: &mut VecDeque<i64>,
) -> Result<ArrayRef, ArrowError> {
let reader = self;
let data_type = field.data_type();
match data_type {
Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array(
reader.next_node(field)?,
self.next_node(field)?,
data_type,
&[
reader.next_buffer()?,
reader.next_buffer()?,
reader.next_buffer()?,
self.next_buffer()?,
self.next_buffer()?,
self.next_buffer()?,
],
reader.require_alignment,
self.require_alignment,
),
BinaryView | Utf8View => {
let count = variadic_counts
Expand All @@ -104,55 +103,55 @@ impl ArrayReader<'_> {
)))?;
let count = count + 2; // view and null buffer.
let buffers = (0..count)
.map(|_| reader.next_buffer())
.map(|_| self.next_buffer())
.collect::<Result<Vec<_>, _>>()?;
create_primitive_array(
reader.next_node(field)?,
self.next_node(field)?,
data_type,
&buffers,
reader.require_alignment,
self.require_alignment,
)
}
FixedSizeBinary(_) => create_primitive_array(
reader.next_node(field)?,
self.next_node(field)?,
data_type,
&[reader.next_buffer()?, reader.next_buffer()?],
reader.require_alignment,
&[self.next_buffer()?, self.next_buffer()?],
self.require_alignment,
),
List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => {
let list_node = reader.next_node(field)?;
let list_buffers = [reader.next_buffer()?, reader.next_buffer()?];
let values = reader.create_array(list_field, variadic_counts)?;
let list_node = self.next_node(field)?;
let list_buffers = [self.next_buffer()?, self.next_buffer()?];
let values = self.create_array(list_field, variadic_counts)?;
create_list_array(
list_node,
data_type,
&list_buffers,
values,
reader.require_alignment,
self.require_alignment,
)
}
FixedSizeList(ref list_field, _) => {
let list_node = reader.next_node(field)?;
let list_buffers = [reader.next_buffer()?];
let values = reader.create_array(list_field, variadic_counts)?;
let list_node = self.next_node(field)?;
let list_buffers = [self.next_buffer()?];
let values = self.create_array(list_field, variadic_counts)?;
create_list_array(
list_node,
data_type,
&list_buffers,
values,
reader.require_alignment,
self.require_alignment,
)
}
Struct(struct_fields) => {
let struct_node = reader.next_node(field)?;
let null_buffer = reader.next_buffer()?;
let struct_node = self.next_node(field)?;
let null_buffer = self.next_buffer()?;

// read the arrays for each field
let mut struct_arrays = vec![];
// TODO investigate whether just knowing the number of buffers could
// still work
for struct_field in struct_fields {
let child = reader.create_array(struct_field, variadic_counts)?;
let child = self.create_array(struct_field, variadic_counts)?;
struct_arrays.push(child);
}
let null_count = struct_node.null_count() as usize;
Expand All @@ -175,32 +174,32 @@ impl ArrayReader<'_> {
Ok(Arc::new(struct_array))
}
RunEndEncoded(run_ends_field, values_field) => {
let run_node = reader.next_node(field)?;
let run_ends = reader.create_array(run_ends_field, variadic_counts)?;
let values = reader.create_array(values_field, variadic_counts)?;
let run_node = self.next_node(field)?;
let run_ends = self.create_array(run_ends_field, variadic_counts)?;
let values = self.create_array(values_field, variadic_counts)?;

let run_array_length = run_node.length() as usize;
let array_data = ArrayData::builder(data_type.clone())
.len(run_array_length)
.offset(0)
.add_child_data(run_ends.into_data())
.add_child_data(values.into_data())
.align_buffers(!reader.require_alignment)
.align_buffers(!self.require_alignment)
.build()?;

Ok(make_array(array_data))
}
// Create dictionary array from RecordBatch
Dictionary(_, _) => {
let index_node = reader.next_node(field)?;
let index_buffers = [reader.next_buffer()?, reader.next_buffer()?];
let index_node = self.next_node(field)?;
let index_buffers = [self.next_buffer()?, self.next_buffer()?];

#[allow(deprecated)]
let dict_id = field.dict_id().ok_or_else(|| {
ArrowError::ParseError(format!("Field {field} does not have dict id"))
})?;

let value_array = reader.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
let value_array = self.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
ArrowError::ParseError(format!(
"Cannot find a dictionary batch with dict id: {dict_id}"
))
Expand All @@ -211,26 +210,26 @@ impl ArrayReader<'_> {
data_type,
&index_buffers,
value_array.clone(),
reader.require_alignment,
self.require_alignment,
)
}
Union(fields, mode) => {
let union_node = reader.next_node(field)?;
let union_node = self.next_node(field)?;
let len = union_node.length() as usize;

// In V4, union types has validity bitmap
// In V5 and later, union types have no validity bitmap
if reader.version < MetadataVersion::V5 {
reader.next_buffer()?;
if self.version < MetadataVersion::V5 {
self.next_buffer()?;
}

let type_ids: ScalarBuffer<i8> =
reader.next_buffer()?.slice_with_length(0, len).into();
self.next_buffer()?.slice_with_length(0, len).into();

let value_offsets = match mode {
UnionMode::Dense => {
let offsets: ScalarBuffer<i32> =
reader.next_buffer()?.slice_with_length(0, len * 4).into();
self.next_buffer()?.slice_with_length(0, len * 4).into();
Some(offsets)
}
UnionMode::Sparse => None,
Expand All @@ -239,15 +238,15 @@ impl ArrayReader<'_> {
let mut children = Vec::with_capacity(fields.len());

for (_id, field) in fields.iter() {
let child = reader.create_array(field, variadic_counts)?;
let child = self.create_array(field, variadic_counts)?;
children.push(child);
}

let array = UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?;
Ok(Arc::new(array))
}
Null => {
let node = reader.next_node(field)?;
let node = self.next_node(field)?;
let length = node.length();
let null_count = node.null_count();

Expand All @@ -260,17 +259,17 @@ impl ArrayReader<'_> {
let array_data = ArrayData::builder(data_type.clone())
.len(length as usize)
.offset(0)
.align_buffers(!reader.require_alignment)
.align_buffers(!self.require_alignment)
.build()?;

// no buffer increases
Ok(Arc::new(NullArray::from(array_data)))
}
_ => create_primitive_array(
reader.next_node(field)?,
self.next_node(field)?,
data_type,
&[reader.next_buffer()?, reader.next_buffer()?],
reader.require_alignment,
&[self.next_buffer()?, self.next_buffer()?],
self.require_alignment,
),
}
}
Expand Down Expand Up @@ -370,8 +369,11 @@ fn create_dictionary_array(
}
}

/// State for decoding arrays from an encoded [`RecordBatch`]
struct ArrayReader<'a> {
/// State for decoding Arrow arrays from an [IPC RecordBatch] structure to
/// [`RecordBatch`]
///
/// [IPC RecordBatch]: crate::RecordBatch
struct RecordBatchDecoder<'a> {
/// The flatbuffers encoded record batch
batch: crate::RecordBatch<'a>,
/// The output schema
Expand All @@ -389,14 +391,14 @@ struct ArrayReader<'a> {
/// The buffers comprising this array
buffers: VectorIter<'a, crate::Buffer>,
/// Projection (subset of columns) to read, if any
/// See [`ArrayReader::with_projection`] for details
/// See [`RecordBatchDecoder::with_projection`] for details
projection: Option<&'a [usize]>,
/// Are buffers required to already be aligned? See
/// [`ArrayReader::with_require_alignment`] for details
/// [`RecordBatchDecoder::with_require_alignment`] for details
require_alignment: bool,
}

impl<'a> ArrayReader<'a> {
impl<'a> RecordBatchDecoder<'a> {
/// Create a reader for decoding arrays from an encoded [`RecordBatch`]
fn try_new(
buf: &'a Buffer,
Expand Down Expand Up @@ -604,7 +606,7 @@ pub fn read_record_batch(
projection: Option<&[usize]>,
metadata: &MetadataVersion,
) -> Result<RecordBatch, ArrowError> {
ArrayReader::try_new(buf, batch, schema, dictionaries_by_id, metadata)?
RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)?
.with_projection(projection)
.with_require_alignment(false)
.read_record_batch()
Expand Down Expand Up @@ -652,7 +654,7 @@ fn read_dictionary_impl(
let value = value_type.as_ref().clone();
let schema = Schema::new(vec![Field::new("", value, true)]);
// Read a single column
let record_batch = ArrayReader::try_new(
let record_batch = RecordBatchDecoder::try_new(
buf,
batch.data().unwrap(),
Arc::new(schema),
Expand Down Expand Up @@ -876,7 +878,7 @@ impl FileDecoder {
ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
})?;
// read the block that makes up the record batch into a buffer
ArrayReader::try_new(
RecordBatchDecoder::try_new(
&buf.slice(block.metaDataLength() as _),
batch,
self.schema.clone(),
Expand Down Expand Up @@ -1426,7 +1428,7 @@ impl<R: Read> StreamReader<R> {
let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
self.reader.read_exact(&mut buf)?;

ArrayReader::try_new(
RecordBatchDecoder::try_new(
&buf.into(),
batch,
self.schema(),
Expand Down Expand Up @@ -2277,7 +2279,7 @@ mod tests {
assert_ne!(b.as_ptr().align_offset(8), 0);

let ipc_batch = message.header_as_record_batch().unwrap();
let roundtrip = ArrayReader::try_new(
let roundtrip = RecordBatchDecoder::try_new(
&b,
ipc_batch,
batch.schema(),
Expand Down Expand Up @@ -2316,7 +2318,7 @@ mod tests {
assert_ne!(b.as_ptr().align_offset(8), 0);

let ipc_batch = message.header_as_record_batch().unwrap();
let result = ArrayReader::try_new(
let result = RecordBatchDecoder::try_new(
&b,
ipc_batch,
batch.schema(),
Expand Down
4 changes: 2 additions & 2 deletions arrow-ipc/src/reader/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use arrow_buffer::{Buffer, MutableBuffer};
use arrow_schema::{ArrowError, SchemaRef};

use crate::convert::MessageBuffer;
use crate::reader::{read_dictionary_impl, ArrayReader};
use crate::reader::{read_dictionary_impl, RecordBatchDecoder};
use crate::{MessageHeader, CONTINUATION_MARKER};

/// A low-level interface for reading [`RecordBatch`] data from a stream of bytes
Expand Down Expand Up @@ -211,7 +211,7 @@ impl StreamDecoder {
let schema = self.schema.clone().ok_or_else(|| {
ArrowError::IpcError("Missing schema".to_string())
})?;
let batch = ArrayReader::try_new(
let batch = RecordBatchDecoder::try_new(
&body,
batch,
schema,
Expand Down

0 comments on commit 92cfd99

Please sign in to comment.