Skip to content

Commit 33ae43b

Browse files
authored
feat: add support for empty structs to the 2.0 format (#3499)
1 parent 9211330 commit 33ae43b

File tree

7 files changed

+164
-14
lines changed

7 files changed

+164
-14
lines changed

python/python/tests/test_file.py

+11
Original file line number | Diff line number | Diff line change
@@ -359,6 +359,17 @@ def round_trip(arr):
359359
assert round_tripped.type == dict_arr.type
360360

361361

362+
def test_empty_structs(tmp_path):
    """Round-trip a column of zero-field structs through the file writer and reader."""
    empties_field = pa.field("empties", pa.struct([]))
    expected = pa.table(
        {"empties": [{}, {}, {}]},
        schema=pa.schema([empties_field]),
    )
    file_uri = str(tmp_path / "foo.lance")

    with LanceFileWriter(file_uri) as writer:
        writer.write_batch(expected)

    # Read everything back and compare against what was written.
    actual = LanceFileReader(file_uri).read_all().to_table()
    assert actual == expected
371+
372+
362373
def test_write_read_global_buffer(tmp_path):
363374
table = pa.table({"a": [1, 2, 3]})
364375
path = tmp_path / "foo.lance"

rust/lance-datagen/src/generator.rs

+6
Original file line number | Diff line number | Diff line change
@@ -1183,6 +1183,12 @@ impl ArrayGenerator for RandomStructGenerator {
11831183
length: RowCount,
11841184
rng: &mut rand_xoshiro::Xoshiro256PlusPlus,
11851185
) -> Result<Arc<dyn arrow_array::Array>, ArrowError> {
1186+
if self.child_gens.is_empty() {
1187+
// Have to create empty struct arrays specially to ensure they have the correct
1188+
// row count
1189+
let struct_arr = StructArray::new_empty_fields(length.0 as usize, None);
1190+
return Ok(Arc::new(struct_arr));
1191+
}
11861192
let child_arrays = self
11871193
.child_gens
11881194
.iter_mut()

rust/lance-encoding/src/decoder.rs

+6
Original file line number | Diff line number | Diff line change
@@ -959,6 +959,11 @@ impl CoreFieldDecoderStrategy {
959959
} else {
960960
// use default struct encoding
961961
Self::check_simple_struct(column_info, &field.name).unwrap();
962+
let num_rows = column_info
963+
.page_infos
964+
.iter()
965+
.map(|page| page.num_rows)
966+
.sum();
962967
let mut child_schedulers = Vec::with_capacity(field.children.len());
963968
for field in &field.children {
964969
column_infos.next_top_level();
@@ -971,6 +976,7 @@ impl CoreFieldDecoderStrategy {
971976
Ok(Box::new(SimpleStructScheduler::new(
972977
child_schedulers,
973978
fields,
979+
num_rows,
974980
)))
975981
}
976982
}

rust/lance-encoding/src/encodings/logical/list.rs

+1-1
Original file line number | Diff line number | Diff line change
@@ -365,7 +365,7 @@ async fn indirect_schedule_task(
365365
// Create a new root scheduler, which has one column, which is our items data
366366
let root_fields = Fields::from(vec![Field::new("item", items_type, true)]);
367367
let indirect_root_scheduler =
368-
SimpleStructScheduler::new(vec![items_scheduler], root_fields.clone());
368+
SimpleStructScheduler::new(vec![items_scheduler], root_fields.clone(), num_items);
369369
let mut indirect_scheduler = DecodeBatchScheduler::from_scheduler(
370370
Arc::new(indirect_root_scheduler),
371371
root_fields.clone(),

rust/lance-encoding/src/encodings/logical/struct.rs

+107-4
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,7 @@ use std::{
88
};
99

1010
use arrow_array::{cast::AsArray, Array, ArrayRef, StructArray};
11-
use arrow_schema::{DataType, Fields};
11+
use arrow_schema::{DataType, Field, Fields};
1212
use futures::{
1313
future::BoxFuture,
1414
stream::{FuturesOrdered, FuturesUnordered},
@@ -64,6 +64,89 @@ impl Ord for SchedulingJobWithStatus<'_> {
6464
}
6565
}
6666

67+
#[derive(Debug)]
68+
struct EmptyStructDecodeTask {
69+
num_rows: u64,
70+
}
71+
72+
impl DecodeArrayTask for EmptyStructDecodeTask {
73+
fn decode(self: Box<Self>) -> Result<ArrayRef> {
74+
Ok(Arc::new(StructArray::new_empty_fields(
75+
self.num_rows as usize,
76+
None,
77+
)))
78+
}
79+
}
80+
81+
#[derive(Debug)]
82+
struct EmptyStructDecoder {
83+
num_rows: u64,
84+
rows_drained: u64,
85+
data_type: DataType,
86+
}
87+
88+
impl EmptyStructDecoder {
89+
fn new(num_rows: u64) -> Self {
90+
Self {
91+
num_rows,
92+
rows_drained: 0,
93+
data_type: DataType::Struct(Fields::from(Vec::<Field>::default())),
94+
}
95+
}
96+
}
97+
98+
impl LogicalPageDecoder for EmptyStructDecoder {
99+
fn wait_for_loaded(&mut self, _loaded_need: u64) -> BoxFuture<Result<()>> {
100+
Box::pin(std::future::ready(Ok(())))
101+
}
102+
fn rows_loaded(&self) -> u64 {
103+
self.num_rows
104+
}
105+
fn rows_unloaded(&self) -> u64 {
106+
0
107+
}
108+
fn num_rows(&self) -> u64 {
109+
self.num_rows
110+
}
111+
fn rows_drained(&self) -> u64 {
112+
self.rows_drained
113+
}
114+
fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
115+
self.rows_drained += num_rows;
116+
Ok(NextDecodeTask {
117+
num_rows,
118+
task: Box::new(EmptyStructDecodeTask { num_rows }),
119+
})
120+
}
121+
fn data_type(&self) -> &DataType {
122+
&self.data_type
123+
}
124+
}
125+
126+
#[derive(Debug)]
127+
struct EmptyStructSchedulerJob {
128+
num_rows: u64,
129+
}
130+
131+
impl SchedulingJob for EmptyStructSchedulerJob {
132+
fn schedule_next(
133+
&mut self,
134+
context: &mut SchedulerContext,
135+
_priority: &dyn PriorityRange,
136+
) -> Result<ScheduledScanLine> {
137+
let empty_decoder = Box::new(EmptyStructDecoder::new(self.num_rows));
138+
let struct_decoder = context.locate_decoder(empty_decoder);
139+
Ok(ScheduledScanLine {
140+
decoders: vec![MessageType::DecoderReady(struct_decoder)],
141+
rows_scheduled: self.num_rows,
142+
})
143+
}
144+
145+
fn num_rows(&self) -> u64 {
146+
self.num_rows
147+
}
148+
}
149+
67150
/// Scheduling job for struct data
68151
///
69152
/// The order in which we schedule the children is important. We want to schedule the child
@@ -175,9 +258,15 @@ pub struct SimpleStructScheduler {
175258
}
176259

177260
impl SimpleStructScheduler {
178-
pub fn new(children: Vec<Arc<dyn FieldScheduler>>, child_fields: Fields) -> Self {
179-
debug_assert!(!children.is_empty());
180-
let num_rows = children[0].num_rows();
261+
pub fn new(
262+
children: Vec<Arc<dyn FieldScheduler>>,
263+
child_fields: Fields,
264+
num_rows: u64,
265+
) -> Self {
266+
let num_rows = children
267+
.first()
268+
.map(|child| child.num_rows())
269+
.unwrap_or(num_rows);
181270
debug_assert!(children.iter().all(|child| child.num_rows() == num_rows));
182271
Self {
183272
children,
@@ -193,6 +282,11 @@ impl FieldScheduler for SimpleStructScheduler {
193282
ranges: &[Range<u64>],
194283
filter: &FilterExpression,
195284
) -> Result<Box<dyn SchedulingJob + 'a>> {
285+
if self.children.is_empty() {
286+
return Ok(Box::new(EmptyStructSchedulerJob {
287+
num_rows: ranges.iter().map(|r| r.end - r.start).sum(),
288+
}));
289+
}
196290
let child_schedulers = self
197291
.children
198292
.iter()
@@ -1120,6 +1214,15 @@ mod tests {
11201214
check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
11211215
}
11221216

1217+
#[test_log::test(tokio::test)]
1218+
async fn test_empty_struct() {
1219+
// It's technically legal for a struct to have 0 children, need to
1220+
// make sure we support that
1221+
let data_type = DataType::Struct(Fields::from(Vec::<Field>::default()));
1222+
let field = Field::new("row", data_type, false);
1223+
check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
1224+
}
1225+
11231226
#[test_log::test(tokio::test)]
11241227
async fn test_complicated_struct() {
11251228
let data_type = DataType::Struct(Fields::from(vec![

rust/lance-encoding/src/testing.rs

+18-4
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,7 @@
44
use std::{cmp::Ordering, collections::HashMap, ops::Range, sync::Arc};
55

66
use arrow::array::make_comparator;
7-
use arrow_array::{Array, UInt64Array};
7+
use arrow_array::{Array, StructArray, UInt64Array};
88
use arrow_schema::{DataType, Field, FieldRef, Schema, SortOptions};
99
use arrow_select::concat::concat;
1010
use bytes::{Bytes, BytesMut};
@@ -651,9 +651,23 @@ async fn check_round_trip_encoding_inner(
651651
}
652652
let num_rows = indices.len() as u64;
653653
let indices_arr = UInt64Array::from(indices.clone());
654-
let expected = concat_data
655-
.as_ref()
656-
.map(|concat_data| arrow_select::take::take(&concat_data, &indices_arr, None).unwrap());
654+
655+
// There is a bug in arrow_select::take::take that causes it to return empty arrays
656+
// if the data type is an empty struct. This is a workaround for that.
657+
let is_empty_struct = if let DataType::Struct(fields) = field.data_type() {
658+
fields.is_empty()
659+
} else {
660+
false
661+
};
662+
663+
let expected = if is_empty_struct {
664+
Some(Arc::new(StructArray::new_empty_fields(indices_arr.len(), None)) as Arc<dyn Array>)
665+
} else {
666+
concat_data.as_ref().map(|concat_data| {
667+
arrow_select::take::take(&concat_data, &indices_arr, None).unwrap()
668+
})
669+
};
670+
657671
let scheduler = scheduler.clone();
658672
let indices = indices.clone();
659673
test_decode(

rust/lance-file/src/v2/reader.rs

+15-5
Original file line number | Diff line number | Diff line change
@@ -1525,6 +1525,11 @@ pub mod tests {
15251525
.copied()
15261526
.collect::<BTreeMap<_, _>>();
15271527

1528+
let empty_projection = ReaderProjection {
1529+
column_indices: Vec::default(),
1530+
schema: Arc::new(Schema::default()),
1531+
};
1532+
15281533
for columns in [
15291534
vec!["score"],
15301535
vec!["location"],
@@ -1606,12 +1611,17 @@ pub mod tests {
16061611
})),
16071612
)
16081613
.await;
1609-
}
16101614

1611-
let empty_projection = ReaderProjection {
1612-
column_indices: Vec::default(),
1613-
schema: Arc::new(Schema::default()),
1614-
};
1615+
assert!(file_reader
1616+
.read_stream_projected(
1617+
lance_io::ReadBatchParams::RangeFull,
1618+
1024,
1619+
16,
1620+
empty_projection.clone(),
1621+
FilterExpression::no_filter(),
1622+
)
1623+
.is_err());
1624+
}
16151625

16161626
assert!(FileReader::try_open(
16171627
file_scheduler.clone(),

0 commit comments

Comments
 (0)