Append Row to Rows (apache#4466)
tustvold committed Jun 30, 2023
1 parent 3354a4c · commit 3a2a151
Showing 1 changed file with 56 additions and 6 deletions.
62 changes: 56 additions & 6 deletions arrow-row/src/lib.rs
@@ -458,7 +458,7 @@ impl Codec {
let nulls = converter.convert_columns(&[null_array])?;

let owned = OwnedRow {
-    data: nulls.buffer,
+    data: nulls.buffer.into(),
    config: nulls.config,
};
Ok(Self::DictionaryValues(converter, owned))
@@ -496,7 +496,7 @@ impl Codec {

let nulls = converter.convert_columns(&nulls)?;
let owned = OwnedRow {
-    data: nulls.buffer,
+    data: nulls.buffer.into(),
    config: nulls.config,
};

@@ -756,6 +756,45 @@ impl RowConverter {
    unsafe { self.convert_raw(&mut rows, validate_utf8) }
}

/// Returns an empty [`Rows`] with capacity for `row_capacity` rows with
/// a total length of `data_capacity`
///
/// This can be used to efficiently buffer a selection of [`Row`]
///
/// ```
/// # use std::sync::Arc;
/// # use std::collections::HashSet;
/// # use arrow_array::cast::AsArray;
/// # use arrow_array::StringArray;
/// # use arrow_row::{Row, RowConverter, SortField};
/// # use arrow_schema::DataType;
/// #
/// let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
/// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
///
/// let converted = converter.convert_columns(&[Arc::new(array)]).unwrap();
/// let mut distinct_rows = converter.empty_rows(3, 100);
/// let mut dedup: HashSet<Row> = HashSet::with_capacity(3);
/// converted.iter().filter(|row| dedup.insert(*row)).for_each(|row| distinct_rows.push(row));
///
/// let distinct = converter.convert_rows(&distinct_rows).unwrap();
/// let values: Vec<_> = distinct[0].as_string::<i32>().iter().map(Option::unwrap).collect();
/// assert_eq!(&values, &["hello", "world", "a"]);
/// ```
pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
    let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
    offsets.push(0);

    Rows {
        offsets,
        buffer: Vec::with_capacity(data_capacity),
        config: RowConfig {
            fields: self.fields.clone(),
            validate_utf8: false,
        },
    }
}

/// Convert raw bytes into [`ArrayRef`]
///
/// # Safety
@@ -832,14 +871,25 @@ struct RowConfig {
#[derive(Debug)]
pub struct Rows {
    /// Underlying row bytes
-    buffer: Box<[u8]>,
+    buffer: Vec<u8>,
    /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
-    offsets: Box<[usize]>,
+    offsets: Vec<usize>,
    /// The config for these rows
    config: RowConfig,
}

impl Rows {
    /// Append a [`Row`] to this [`Rows`]
    pub fn push(&mut self, row: Row<'_>) {
        assert!(
            Arc::ptr_eq(&row.config.fields, &self.config.fields),
            "row was not produced by this RowConverter"
        );
        self.config.validate_utf8 |= row.config.validate_utf8;
        self.buffer.extend_from_slice(row.data);
        self.offsets.push(self.buffer.len())
    }

    pub fn row(&self, row: usize) -> Row<'_> {
        let end = self.offsets[row + 1];
        let start = self.offsets[row];
@@ -1384,9 +1434,9 @@ mod tests {
    .unwrap();
let rows = converter.convert_columns(&cols).unwrap();

-assert_eq!(rows.offsets.as_ref(), &[0, 8, 16, 24, 32, 40, 48, 56]);
+assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
assert_eq!(
-    rows.buffer.as_ref(),
+    rows.buffer,
    &[
        1, 128, 1, //
        1, 191, 166, 102, 102, //
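The sketch below is not part of this commit; it shows one way the new `RowConverter::empty_rows` and `Rows::push` APIs from the diff above might be combined to buffer a filtered selection of rows. The column values, capacities, and the `main` wrapper are illustrative assumptions.

// A standalone sketch (not part of this commit): buffer a filtered
// selection of rows using `empty_rows` and `push`. Values and
// capacities here are illustrative assumptions.
use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::StringArray;
use arrow_row::{RowConverter, SortField};
use arrow_schema::DataType;

fn main() {
    let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    let values = ["banana", "apple", "cherry", "apricot"];
    let array = StringArray::from(values.to_vec());
    let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();

    // Pre-allocate space for up to 4 rows and roughly 128 bytes of row data,
    // then append only the rows whose value starts with 'a'.
    let mut selection = converter.empty_rows(4, 128);
    for (row, value) in rows.iter().zip(values) {
        if value.starts_with('a') {
            selection.push(row);
        }
    }

    // Convert the buffered selection back into an Arrow column.
    let columns = converter.convert_rows(&selection).unwrap();
    let selected: Vec<_> = columns[0].as_string::<i32>().iter().flatten().collect();
    assert_eq!(selected, ["apple", "apricot"]);
}

Note that `push` asserts, via `Arc::ptr_eq` on the field configuration, that each appended row was produced by the same `RowConverter`, so rows from different converters cannot be mixed into one `Rows`.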
