feat: add support for repetition index to the full zip structural encoding #3335

Merged · 1 commit · Jan 7, 2025
47 changes: 47 additions & 0 deletions rust/lance-encoding/src/buffer.rs
@@ -6,6 +6,7 @@
use std::{ops::Deref, panic::RefUnwindSafe, ptr::NonNull, sync::Arc};

use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer};
use itertools::Either;
use snafu::{location, Location};

use lance_core::{utils::bit::is_pwr_two, Error, Result};
@@ -104,6 +105,18 @@ impl LanceBuffer {
hex::encode_upper(self)
}

/// Combine multiple buffers into a single buffer
///
/// This does involve a data copy (and allocation of a new buffer)
pub fn concat(buffers: &[Self]) -> Self {
let total_len = buffers.iter().map(|b| b.len()).sum();
let mut data = Vec::with_capacity(total_len);
for buffer in buffers {
data.extend_from_slice(buffer.as_ref());
}
Self::Owned(data)
}

/// Converts the buffer into a hex string, inserting a space
/// between words
pub fn as_spaced_hex(&self, bytes_per_word: u32) -> String {
@@ -390,6 +403,40 @@ impl From<Buffer> for LanceBuffer {
}
}

// An iterator that keeps a clone of a borrowed LanceBuffer so we
// can have a 'static lifetime
pub struct BorrowedBufferIter {
buffer: arrow_buffer::Buffer,
index: usize,
}

impl Iterator for BorrowedBufferIter {
type Item = u8;

fn next(&mut self) -> Option<Self::Item> {
if self.index >= self.buffer.len() {
None
} else {
// SAFETY: we just checked that index is in bounds
let byte = unsafe { self.buffer.get_unchecked(self.index) };
self.index += 1;
Some(*byte)
}
}
}

impl IntoIterator for LanceBuffer {
type Item = u8;
type IntoIter = Either<std::vec::IntoIter<u8>, BorrowedBufferIter>;

fn into_iter(self) -> Self::IntoIter {
match self {
Self::Borrowed(buffer) => Either::Right(BorrowedBufferIter { buffer, index: 0 }),
Self::Owned(buffer) => Either::Left(buffer.into_iter()),
}
}
}

#[cfg(test)]
mod tests {
use arrow_buffer::Buffer;
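The two `LanceBuffer` additions above compose naturally: `concat` produces a single owned buffer (at the cost of one copy), and the new `IntoIterator` impl lets callers walk the bytes of either variant without caring which one they hold. A minimal usage sketch, assuming the public `lance_encoding::buffer::LanceBuffer` path; the function name and checksum logic are illustrative only:

```rust
use lance_encoding::buffer::LanceBuffer;

// Illustrative only: fold several buffers into one owned buffer, then consume
// it byte by byte. Owned buffers reuse `Vec`'s iterator; borrowed buffers go
// through the new `BorrowedBufferIter`.
fn xor_checksum(buffers: &[LanceBuffer]) -> u8 {
    let combined = LanceBuffer::concat(buffers);
    combined.into_iter().fold(0u8, |acc, byte| acc ^ byte)
}
```

Returning `Either<std::vec::IntoIter<u8>, BorrowedBufferIter>` keeps the iterator type concrete for both variants, avoiding a boxed trait object.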
32 changes: 27 additions & 5 deletions rust/lance-encoding/src/encoder.rs
@@ -1221,15 +1221,14 @@ impl StructuralEncodingStrategy {
| DataType::LargeUtf8,
)
}
}

impl FieldEncodingStrategy for StructuralEncodingStrategy {
fn create_field_encoder(
fn do_create_field_encoder(
&self,
_encoding_strategy_root: &dyn FieldEncodingStrategy,
field: &Field,
column_index: &mut ColumnIndexSequence,
options: &EncodingOptions,
root_field_metadata: &HashMap<String, String>,
) -> Result<Box<dyn FieldEncoder>> {
let data_type = field.data_type();
if Self::is_primitive_type(&data_type) {
@@ -1238,16 +1237,18 @@ impl FieldEncodingStrategy for StructuralEncodingStrategy {
self.compression_strategy.clone(),
column_index.next_column_index(field.id as u32),
field.clone(),
Arc::new(root_field_metadata.clone()),
)?))
} else {
match data_type {
DataType::List(_) | DataType::LargeList(_) => {
let child = field.children.first().expect("List should have a child");
let child_encoder = self.create_field_encoder(
let child_encoder = self.do_create_field_encoder(
_encoding_strategy_root,
child,
column_index,
options,
root_field_metadata,
)?;
Ok(Box::new(ListStructuralEncoder::new(child_encoder)))
}
@@ -1258,17 +1259,19 @@
self.compression_strategy.clone(),
column_index.next_column_index(field.id as u32),
field.clone(),
Arc::new(root_field_metadata.clone()),
)?))
} else {
let children_encoders = field
.children
.iter()
.map(|field| {
self.create_field_encoder(
self.do_create_field_encoder(
_encoding_strategy_root,
field,
column_index,
options,
root_field_metadata,
)
})
.collect::<Result<Vec<_>>>()?;
@@ -1283,6 +1286,7 @@
self.compression_strategy.clone(),
column_index.next_column_index(field.id as u32),
field.clone(),
Arc::new(root_field_metadata.clone()),
)?))
} else {
// A dictionary of a logical type is, itself, logical and we don't support that today
@@ -1299,6 +1303,24 @@
}
}

impl FieldEncodingStrategy for StructuralEncodingStrategy {
fn create_field_encoder(
&self,
encoding_strategy_root: &dyn FieldEncodingStrategy,
field: &Field,
column_index: &mut ColumnIndexSequence,
options: &EncodingOptions,
) -> Result<Box<dyn FieldEncoder>> {
self.do_create_field_encoder(
encoding_strategy_root,
field,
column_index,
options,
&field.metadata,
)
}
}

/// A batch encoder that encodes RecordBatch objects by delegating
/// to field encoders for each top-level field in the batch.
pub struct BatchEncoder {
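Functionally, this refactor separates the trait entry point from the recursive worker: `create_field_encoder` captures the root field's metadata once, and `do_create_field_encoder` threads it through list and struct children, so an encoding choice made on a top-level field reaches every nested primitive encoder as `root_field_metadata`. A hedged sketch of how a caller might populate that metadata, using the `STRUCTURAL_ENCODING_*` constants the tests below import from `lance_core::datatypes` (attaching the map to a field is schema-specific and omitted; the helper name is hypothetical):

```rust
use std::collections::HashMap;

use lance_core::datatypes::{STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY};

// Build the metadata that the encoder now propagates as `root_field_metadata`.
// Setting it on a top-level field opts that field and its children into the
// full-zip structural encoding.
fn full_zip_field_metadata() -> HashMap<String, String> {
    let mut metadata = HashMap::new();
    metadata.insert(
        STRUCTURAL_ENCODING_META_KEY.to_string(),
        STRUCTURAL_ENCODING_FULLZIP.to_string(),
    );
    metadata
}
```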
39 changes: 33 additions & 6 deletions rust/lance-encoding/src/encodings/logical/list.rs
@@ -1465,6 +1465,9 @@ mod tests {
};
use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow_schema::{DataType, Field, Fields};
use lance_core::datatypes::{
STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
};
use rstest::rstest;

use crate::{
@@ -1484,8 +1487,16 @@
#[test_log::test(tokio::test)]
async fn test_list(
#[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
#[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
structural_encoding: &str,
) {
let field = Field::new("", make_list_type(DataType::Int32), true);
let mut field_metadata = HashMap::new();
field_metadata.insert(
STRUCTURAL_ENCODING_META_KEY.to_string(),
structural_encoding.into(),
);
let field =
Field::new("", make_list_type(DataType::Int32), true).with_metadata(field_metadata);
check_round_trip_encoding_random(field, version).await;
}

@@ -1544,6 +1555,8 @@
#[test_log::test(tokio::test)]
async fn test_simple_list(
#[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
#[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
structural_encoding: &str,
) {
let items_builder = Int32Builder::new();
let mut list_builder = ListBuilder::new(items_builder);
@@ -1553,18 +1566,23 @@
list_builder.append_value([Some(6), Some(7), Some(8)]);
let list_array = list_builder.finish();

let mut field_metadata = HashMap::new();
field_metadata.insert(
STRUCTURAL_ENCODING_META_KEY.to_string(),
structural_encoding.into(),
);

let test_cases = TestCases::default()
.with_range(0..2)
.with_range(0..3)
.with_range(1..3)
.with_indices(vec![1, 3])
.with_indices(vec![2])
.with_file_version(version);
check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
.await;
}

#[rstest]
#[test_log::test(tokio::test)]
async fn test_simple_sliced_list() {
let items_builder = Int32Builder::new();
@@ -1587,7 +1605,6 @@
.await;
}

#[rstest]
#[test_log::test(tokio::test)]
async fn test_list_with_garbage_nulls() {
// In Arrow, list nulls are allowed to be non-empty, with masked garbage values
@@ -1613,8 +1630,12 @@
.await;
}

#[rstest]
#[test_log::test(tokio::test)]
async fn test_simple_two_page_list() {
async fn test_simple_two_page_list(
#[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
structural_encoding: &str,
) {
// This is a simple pre-defined list that spans two pages. This test is useful for
// debugging the repetition index
let items_builder = Int64Builder::new();
@@ -1632,14 +1653,20 @@
}
let list_array_2 = list_builder.finish();

let mut metadata = HashMap::new();
metadata.insert(
STRUCTURAL_ENCODING_META_KEY.to_string(),
structural_encoding.into(),
);

let test_cases = TestCases::default()
.with_file_version(LanceFileVersion::V2_1)
.with_page_sizes(vec![100])
.with_range(800..900);
check_round_trip_encoding_of_data(
vec![Arc::new(list_array_1), Arc::new(list_array_2)],
&test_cases,
HashMap::new(),
metadata,
)
.await;
}