Skip to content

Commit 86a879c

Browse files
committedJan 7, 2025
Add support for repetition index in full-zip encoding
1 parent a6aadaf commit 86a879c

File tree

9 files changed

+1206
-159
lines changed

9 files changed

+1206
-159
lines changed
 

‎rust/lance-encoding/src/buffer.rs

+47
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use std::{ops::Deref, panic::RefUnwindSafe, ptr::NonNull, sync::Arc};
77

88
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer};
9+
use itertools::Either;
910
use snafu::{location, Location};
1011

1112
use lance_core::{utils::bit::is_pwr_two, Error, Result};
@@ -104,6 +105,18 @@ impl LanceBuffer {
104105
hex::encode_upper(self)
105106
}
106107

108+
/// Combine multiple buffers into a single buffer
109+
///
110+
/// This does involve a data copy (and allocation of a new buffer)
111+
pub fn concat(buffers: &[Self]) -> Self {
112+
let total_len = buffers.iter().map(|b| b.len()).sum();
113+
let mut data = Vec::with_capacity(total_len);
114+
for buffer in buffers {
115+
data.extend_from_slice(buffer.as_ref());
116+
}
117+
Self::Owned(data)
118+
}
119+
107120
/// Converts the buffer into a hex string, inserting a space
108121
/// between words
109122
pub fn as_spaced_hex(&self, bytes_per_word: u32) -> String {
@@ -390,6 +403,40 @@ impl From<Buffer> for LanceBuffer {
390403
}
391404
}
392405

406+
// An iterator that keeps a clone of a borrowed LanceBuffer so we
407+
// can have a 'static lifetime
408+
pub struct BorrowedBufferIter {
409+
buffer: arrow_buffer::Buffer,
410+
index: usize,
411+
}
412+
413+
impl Iterator for BorrowedBufferIter {
414+
type Item = u8;
415+
416+
fn next(&mut self) -> Option<Self::Item> {
417+
if self.index >= self.buffer.len() {
418+
None
419+
} else {
420+
// SAFETY: we just checked that index is in bounds
421+
let byte = unsafe { self.buffer.get_unchecked(self.index) };
422+
self.index += 1;
423+
Some(*byte)
424+
}
425+
}
426+
}
427+
428+
impl IntoIterator for LanceBuffer {
429+
type Item = u8;
430+
type IntoIter = Either<std::vec::IntoIter<u8>, BorrowedBufferIter>;
431+
432+
fn into_iter(self) -> Self::IntoIter {
433+
match self {
434+
Self::Borrowed(buffer) => Either::Right(BorrowedBufferIter { buffer, index: 0 }),
435+
Self::Owned(buffer) => Either::Left(buffer.into_iter()),
436+
}
437+
}
438+
}
439+
393440
#[cfg(test)]
394441
mod tests {
395442
use arrow_buffer::Buffer;

‎rust/lance-encoding/src/encoder.rs

+27-5
Original file line numberDiff line numberDiff line change
@@ -1221,15 +1221,14 @@ impl StructuralEncodingStrategy {
12211221
| DataType::LargeUtf8,
12221222
)
12231223
}
1224-
}
12251224

1226-
impl FieldEncodingStrategy for StructuralEncodingStrategy {
1227-
fn create_field_encoder(
1225+
fn do_create_field_encoder(
12281226
&self,
12291227
_encoding_strategy_root: &dyn FieldEncodingStrategy,
12301228
field: &Field,
12311229
column_index: &mut ColumnIndexSequence,
12321230
options: &EncodingOptions,
1231+
root_field_metadata: &HashMap<String, String>,
12331232
) -> Result<Box<dyn FieldEncoder>> {
12341233
let data_type = field.data_type();
12351234
if Self::is_primitive_type(&data_type) {
@@ -1238,16 +1237,18 @@ impl FieldEncodingStrategy for StructuralEncodingStrategy {
12381237
self.compression_strategy.clone(),
12391238
column_index.next_column_index(field.id as u32),
12401239
field.clone(),
1240+
Arc::new(root_field_metadata.clone()),
12411241
)?))
12421242
} else {
12431243
match data_type {
12441244
DataType::List(_) | DataType::LargeList(_) => {
12451245
let child = field.children.first().expect("List should have a child");
1246-
let child_encoder = self.create_field_encoder(
1246+
let child_encoder = self.do_create_field_encoder(
12471247
_encoding_strategy_root,
12481248
child,
12491249
column_index,
12501250
options,
1251+
root_field_metadata,
12511252
)?;
12521253
Ok(Box::new(ListStructuralEncoder::new(child_encoder)))
12531254
}
@@ -1258,17 +1259,19 @@ impl FieldEncodingStrategy for StructuralEncodingStrategy {
12581259
self.compression_strategy.clone(),
12591260
column_index.next_column_index(field.id as u32),
12601261
field.clone(),
1262+
Arc::new(root_field_metadata.clone()),
12611263
)?))
12621264
} else {
12631265
let children_encoders = field
12641266
.children
12651267
.iter()
12661268
.map(|field| {
1267-
self.create_field_encoder(
1269+
self.do_create_field_encoder(
12681270
_encoding_strategy_root,
12691271
field,
12701272
column_index,
12711273
options,
1274+
root_field_metadata,
12721275
)
12731276
})
12741277
.collect::<Result<Vec<_>>>()?;
@@ -1283,6 +1286,7 @@ impl FieldEncodingStrategy for StructuralEncodingStrategy {
12831286
self.compression_strategy.clone(),
12841287
column_index.next_column_index(field.id as u32),
12851288
field.clone(),
1289+
Arc::new(root_field_metadata.clone()),
12861290
)?))
12871291
} else {
12881292
// A dictionary of logical is, itself, logical and we don't support that today
@@ -1299,6 +1303,24 @@ impl FieldEncodingStrategy for StructuralEncodingStrategy {
12991303
}
13001304
}
13011305

1306+
impl FieldEncodingStrategy for StructuralEncodingStrategy {
1307+
fn create_field_encoder(
1308+
&self,
1309+
encoding_strategy_root: &dyn FieldEncodingStrategy,
1310+
field: &Field,
1311+
column_index: &mut ColumnIndexSequence,
1312+
options: &EncodingOptions,
1313+
) -> Result<Box<dyn FieldEncoder>> {
1314+
self.do_create_field_encoder(
1315+
encoding_strategy_root,
1316+
field,
1317+
column_index,
1318+
options,
1319+
&field.metadata,
1320+
)
1321+
}
1322+
}
1323+
13021324
/// A batch encoder that encodes RecordBatch objects by delegating
13031325
/// to field encoders for each top-level field in the batch.
13041326
pub struct BatchEncoder {

‎rust/lance-encoding/src/encodings/logical/list.rs

+33-6
Original file line numberDiff line numberDiff line change
@@ -1465,6 +1465,9 @@ mod tests {
14651465
};
14661466
use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
14671467
use arrow_schema::{DataType, Field, Fields};
1468+
use lance_core::datatypes::{
1469+
STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
1470+
};
14681471
use rstest::rstest;
14691472

14701473
use crate::{
@@ -1484,8 +1487,16 @@ mod tests {
14841487
#[test_log::test(tokio::test)]
14851488
async fn test_list(
14861489
#[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
1490+
#[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
1491+
structural_encoding: &str,
14871492
) {
1488-
let field = Field::new("", make_list_type(DataType::Int32), true);
1493+
let mut field_metadata = HashMap::new();
1494+
field_metadata.insert(
1495+
STRUCTURAL_ENCODING_META_KEY.to_string(),
1496+
structural_encoding.into(),
1497+
);
1498+
let field =
1499+
Field::new("", make_list_type(DataType::Int32), true).with_metadata(field_metadata);
14891500
check_round_trip_encoding_random(field, version).await;
14901501
}
14911502

@@ -1544,6 +1555,8 @@ mod tests {
15441555
#[test_log::test(tokio::test)]
15451556
async fn test_simple_list(
15461557
#[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
1558+
#[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
1559+
structural_encoding: &str,
15471560
) {
15481561
let items_builder = Int32Builder::new();
15491562
let mut list_builder = ListBuilder::new(items_builder);
@@ -1553,18 +1566,23 @@ mod tests {
15531566
list_builder.append_value([Some(6), Some(7), Some(8)]);
15541567
let list_array = list_builder.finish();
15551568

1569+
let mut field_metadata = HashMap::new();
1570+
field_metadata.insert(
1571+
STRUCTURAL_ENCODING_META_KEY.to_string(),
1572+
structural_encoding.into(),
1573+
);
1574+
15561575
let test_cases = TestCases::default()
15571576
.with_range(0..2)
15581577
.with_range(0..3)
15591578
.with_range(1..3)
15601579
.with_indices(vec![1, 3])
15611580
.with_indices(vec![2])
15621581
.with_file_version(version);
1563-
check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
1582+
check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
15641583
.await;
15651584
}
15661585

1567-
#[rstest]
15681586
#[test_log::test(tokio::test)]
15691587
async fn test_simple_sliced_list() {
15701588
let items_builder = Int32Builder::new();
@@ -1587,7 +1605,6 @@ mod tests {
15871605
.await;
15881606
}
15891607

1590-
#[rstest]
15911608
#[test_log::test(tokio::test)]
15921609
async fn test_list_with_garbage_nulls() {
15931610
// In Arrow, list nulls are allowed to be non-empty, with masked garbage values
@@ -1613,8 +1630,12 @@ mod tests {
16131630
.await;
16141631
}
16151632

1633+
#[rstest]
16161634
#[test_log::test(tokio::test)]
1617-
async fn test_simple_two_page_list() {
1635+
async fn test_simple_two_page_list(
1636+
#[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
1637+
structural_encoding: &str,
1638+
) {
16181639
// This is a simple pre-defined list that spans two pages. This test is useful for
16191640
// debugging the repetition index
16201641
let items_builder = Int64Builder::new();
@@ -1632,14 +1653,20 @@ mod tests {
16321653
}
16331654
let list_array_2 = list_builder.finish();
16341655

1656+
let mut metadata = HashMap::new();
1657+
metadata.insert(
1658+
STRUCTURAL_ENCODING_META_KEY.to_string(),
1659+
structural_encoding.into(),
1660+
);
1661+
16351662
let test_cases = TestCases::default()
16361663
.with_file_version(LanceFileVersion::V2_1)
16371664
.with_page_sizes(vec![100])
16381665
.with_range(800..900);
16391666
check_round_trip_encoding_of_data(
16401667
vec![Arc::new(list_array_1), Arc::new(list_array_2)],
16411668
&test_cases,
1642-
HashMap::new(),
1669+
metadata,
16431670
)
16441671
.await;
16451672
}

0 commit comments

Comments
 (0)