Skip to content

Commit fe2d874

Browse files
authored
feat: add priority to I/O scheduler (lancedb#2315)
This also renames store scheduler to scan scheduler. I'm thinking I don't want to get into the thorny issue of how to prioritize I/O requests across different scans. So, with this approach, if multiple scans are running at the same time, then they will overschedule (and let the OS deal with it). This can be revisited in the future. Closes lancedb#1958
1 parent c6f82fe commit fe2d874

File tree

23 files changed

+339
-63
lines changed

23 files changed

+339
-63
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ http = "0.2.9"
103103
itertools = "0.12"
104104
lazy_static = "1"
105105
log = "0.4"
106+
mockall = { version = "0.12.1" }
106107
mock_instant = { version = "0.3.1", features = ["sync"] }
107108
moka = "0.11"
108109
num-traits = "0.2"

python/src/file.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use lance_file::v2::{
2323
reader::{BufferDescriptor, CachedFileMetadata, FileReader},
2424
writer::{FileWriter, FileWriterOptions},
2525
};
26-
use lance_io::{scheduler::StoreScheduler, ReadBatchParams};
26+
use lance_io::{scheduler::ScanScheduler, ReadBatchParams};
2727
use object_store::path::Path;
2828
use pyo3::{
2929
exceptions::{PyIOError, PyRuntimeError, PyValueError},
@@ -267,7 +267,7 @@ impl LanceFileReader {
267267
let io_parallelism = std::env::var("IO_THREADS")
268268
.map(|val| val.parse::<u32>().unwrap_or(8))
269269
.unwrap_or(8);
270-
let scheduler = StoreScheduler::new(Arc::new(object_store), io_parallelism);
270+
let scheduler = ScanScheduler::new(Arc::new(object_store), io_parallelism);
271271
let file = scheduler.open_file(&path).await.infer_error()?;
272272
let inner = FileReader::try_open(file, None).await.infer_error()?;
273273
Ok(Self {

rust/lance-encoding/src/decoder.rs

+15-3
Original file line numberDiff line numberDiff line change
@@ -537,8 +537,12 @@ impl DecodeBatchScheduler {
537537

538538
let range = range.start as u32..range.end as u32;
539539

540-
self.root_scheduler
541-
.schedule_ranges(&[range.clone()], scheduler, &sink)?;
540+
self.root_scheduler.schedule_ranges(
541+
&[range.clone()],
542+
scheduler,
543+
&sink,
544+
range.start as u64,
545+
)?;
542546

543547
trace!("Finished scheduling of range {:?}", range);
544548
Ok(())
@@ -567,8 +571,11 @@ impl DecodeBatchScheduler {
567571
format!("{}, ..., {}", indices[0], indices[indices.len() - 1])
568572
}
569573
);
574+
if indices.is_empty() {
575+
return Ok(());
576+
}
570577
self.root_scheduler
571-
.schedule_take(indices, scheduler, &sink)?;
578+
.schedule_take(indices, scheduler, &sink, indices[0] as u64)?;
572579
trace!("Finished scheduling take of {} rows", indices.len());
573580
Ok(())
574581
}
@@ -740,10 +747,13 @@ pub trait PhysicalPageScheduler: Send + Sync + std::fmt::Debug {
740747
/// * `range` - the range of row offsets (relative to start of page) requested
741748
/// these must be ordered and must not overlap
742749
/// * `scheduler` - a scheduler to submit the I/O request to
750+
/// * `top_level_row` - the row offset of the top level field currently being
751+
/// scheduled. This can be used to assign priority to I/O requests
743752
fn schedule_ranges(
744753
&self,
745754
ranges: &[Range<u32>],
746755
scheduler: &dyn EncodingsIo,
756+
top_level_row: u64,
747757
) -> BoxFuture<'static, Result<Box<dyn PhysicalPageDecoder>>>;
748758
}
749759

@@ -780,6 +790,7 @@ pub trait LogicalPageScheduler: Send + Sync + std::fmt::Debug {
780790
ranges: &[Range<u32>],
781791
scheduler: &Arc<dyn EncodingsIo>,
782792
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
793+
top_level_row: u64,
783794
) -> Result<()>;
784795
/// Schedules I/O for the requested rows (identified by row offsets from start of page)
785796
/// TODO: implement this using schedule_ranges
@@ -788,6 +799,7 @@ pub trait LogicalPageScheduler: Send + Sync + std::fmt::Debug {
788799
indices: &[u32],
789800
scheduler: &Arc<dyn EncodingsIo>,
790801
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
802+
top_level_row: u64,
791803
) -> Result<()>;
792804
/// The number of rows covered by this page
793805
fn num_rows(&self) -> u32;

rust/lance-encoding/src/encodings/logical/binary.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,12 @@ impl LogicalPageScheduler for BinaryPageScheduler {
4747
ranges: &[std::ops::Range<u32>],
4848
scheduler: &Arc<dyn crate::EncodingsIo>,
4949
sink: &tokio::sync::mpsc::UnboundedSender<Box<dyn crate::decoder::LogicalPageDecoder>>,
50+
top_level_row: u64,
5051
) -> Result<()> {
5152
trace!("Scheduling binary for {} ranges", ranges.len());
5253
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
5354
self.varbin_scheduler
54-
.schedule_ranges(ranges, scheduler, &tx)?;
55+
.schedule_ranges(ranges, scheduler, &tx, top_level_row)?;
5556

5657
while let Some(decoder) = rx.recv().now_or_never() {
5758
let wrapped = BinaryPageDecoder {
@@ -69,6 +70,7 @@ impl LogicalPageScheduler for BinaryPageScheduler {
6970
indices: &[u32],
7071
scheduler: &Arc<dyn crate::EncodingsIo>,
7172
sink: &tokio::sync::mpsc::UnboundedSender<Box<dyn crate::decoder::LogicalPageDecoder>>,
73+
top_level_row: u64,
7274
) -> Result<()> {
7375
trace!("Scheduling binary for {} indices", indices.len());
7476
self.schedule_ranges(
@@ -78,6 +80,7 @@ impl LogicalPageScheduler for BinaryPageScheduler {
7880
.collect::<Vec<_>>(),
7981
scheduler,
8082
sink,
83+
top_level_row,
8184
)
8285
}
8386

rust/lance-encoding/src/encodings/logical/fixed_size_list.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ impl LogicalPageScheduler for FslPageScheduler {
5353
ranges: &[Range<u32>],
5454
scheduler: &Arc<dyn EncodingsIo>,
5555
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
56+
top_level_row: u64,
5657
) -> Result<()> {
5758
let expanded_ranges = ranges
5859
.iter()
@@ -64,7 +65,7 @@ impl LogicalPageScheduler for FslPageScheduler {
6465
);
6566
let (tx, mut rx) = mpsc::unbounded_channel();
6667
self.items_scheduler
67-
.schedule_ranges(&expanded_ranges, scheduler, &tx)?;
68+
.schedule_ranges(&expanded_ranges, scheduler, &tx, top_level_row)?;
6869
let inner_page_decoder = rx.blocking_recv().unwrap();
6970
sink.send(Box::new(FslPageDecoder {
7071
inner: inner_page_decoder,
@@ -79,6 +80,7 @@ impl LogicalPageScheduler for FslPageScheduler {
7980
indices: &[u32],
8081
scheduler: &Arc<dyn EncodingsIo>,
8182
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
83+
top_level_row: u64,
8284
) -> Result<()> {
8385
self.schedule_ranges(
8486
&indices
@@ -87,6 +89,7 @@ impl LogicalPageScheduler for FslPageScheduler {
8789
.collect::<Vec<_>>(),
8890
scheduler,
8991
sink,
92+
top_level_row,
9093
)
9194
}
9295

rust/lance-encoding/src/encodings/logical/list.rs

+27-4
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ impl LogicalPageScheduler for ListPageScheduler {
227227
ranges: &[std::ops::Range<u32>],
228228
scheduler: &Arc<dyn EncodingsIo>,
229229
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
230+
top_level_row: u64,
230231
) -> Result<()> {
231232
// TODO: Shortcut here if the request covers the entire range (can be determined by
232233
// the first_invalid_offset). If this is the case we don't need any indirect I/O. We
@@ -258,7 +259,7 @@ impl LogicalPageScheduler for ListPageScheduler {
258259
// to this page.
259260
let (tx, mut rx) = mpsc::unbounded_channel();
260261
self.offsets_scheduler
261-
.schedule_ranges(&offsets_ranges, scheduler, &tx)?;
262+
.schedule_ranges(&offsets_ranges, scheduler, &tx, top_level_row)?;
262263
let mut scheduled_offsets = rx.try_recv().unwrap();
263264
let items_schedulers = self.items_schedulers.clone();
264265
let ranges = ranges.to_vec();
@@ -319,7 +320,17 @@ impl LogicalPageScheduler for ListPageScheduler {
319320
// All requested items are past this page, continue
320321
row_offset += next_scheduler.num_rows() as u64;
321322
if !next_item_ranges.is_empty() {
322-
next_scheduler.schedule_ranges(&next_item_ranges, &scheduler, &tx)?;
323+
// Note: we are providing the same top_level_row to ALL items pages referenced by
324+
// this offsets page. This gives them higher priority.
325+
// TODO: Ideally we would ALSO have a guarantee from the scheduler that items with
326+
// the same top_level_row are scheduled in FCFS order but I don't think it works
327+
// that way. Still, this is probably good enough for a while
328+
next_scheduler.schedule_ranges(
329+
&next_item_ranges,
330+
&scheduler,
331+
&tx,
332+
top_level_row,
333+
)?;
323334
next_item_ranges.clear();
324335
}
325336
next_scheduler = item_schedulers.pop_front().unwrap();
@@ -342,14 +353,24 @@ impl LogicalPageScheduler for ListPageScheduler {
342353
next_item_ranges.push(page_range);
343354
row_offset += next_scheduler.num_rows() as u64;
344355
if !next_item_ranges.is_empty() {
345-
next_scheduler.schedule_ranges(&next_item_ranges, &scheduler, &tx)?;
356+
next_scheduler.schedule_ranges(
357+
&next_item_ranges,
358+
&scheduler,
359+
&tx,
360+
top_level_row,
361+
)?;
346362
next_item_ranges.clear();
347363
}
348364
next_scheduler = item_schedulers.pop_front().unwrap();
349365
}
350366
}
351367
if !next_item_ranges.is_empty() {
352-
next_scheduler.schedule_ranges(&next_item_ranges, &scheduler, &tx)?;
368+
next_scheduler.schedule_ranges(
369+
&next_item_ranges,
370+
&scheduler,
371+
&tx,
372+
top_level_row,
373+
)?;
353374
}
354375
let mut item_decoders = Vec::new();
355376
drop(tx);
@@ -388,6 +409,7 @@ impl LogicalPageScheduler for ListPageScheduler {
388409
indices: &[u32],
389410
scheduler: &Arc<dyn EncodingsIo>,
390411
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
412+
top_level_row: u64,
391413
) -> Result<()> {
392414
trace!("Scheduling list offsets for {} indices", indices.len());
393415
self.schedule_ranges(
@@ -397,6 +419,7 @@ impl LogicalPageScheduler for ListPageScheduler {
397419
.collect::<Vec<_>>(),
398420
scheduler,
399421
sink,
422+
top_level_row,
400423
)
401424
}
402425
}

rust/lance-encoding/src/encodings/logical/primitive.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,13 @@ impl LogicalPageScheduler for PrimitivePageScheduler {
8080
ranges: &[std::ops::Range<u32>],
8181
scheduler: &Arc<dyn EncodingsIo>,
8282
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
83+
top_level_row: u64,
8384
) -> Result<()> {
8485
let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
8586
trace!("Scheduling ranges {:?} from physical page", ranges);
86-
let physical_decoder = self
87-
.physical_decoder
88-
.schedule_ranges(ranges, scheduler.as_ref());
87+
let physical_decoder =
88+
self.physical_decoder
89+
.schedule_ranges(ranges, scheduler.as_ref(), top_level_row);
8990

9091
let logical_decoder = PrimitiveFieldDecoder {
9192
data_type: self.data_type.clone(),
@@ -104,6 +105,7 @@ impl LogicalPageScheduler for PrimitivePageScheduler {
104105
indices: &[u32],
105106
scheduler: &Arc<dyn EncodingsIo>,
106107
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
108+
top_level_row: u64,
107109
) -> Result<()> {
108110
trace!(
109111
"Scheduling take of {} indices from physical page",
@@ -116,6 +118,7 @@ impl LogicalPageScheduler for PrimitivePageScheduler {
116118
.collect::<Vec<_>>(),
117119
scheduler,
118120
sink,
121+
top_level_row,
119122
)
120123
}
121124
}

rust/lance-encoding/src/encodings/logical/struct.rs

+19-3
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ impl LogicalPageScheduler for SimpleStructScheduler {
110110
ranges: &[Range<u32>],
111111
scheduler: &Arc<dyn EncodingsIo>,
112112
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
113+
top_level_row: u64,
113114
) -> Result<()> {
114115
for range in ranges.iter().cloned() {
115116
let mut rows_to_read = range.end - range.start;
@@ -156,6 +157,8 @@ impl LogicalPageScheduler for SimpleStructScheduler {
156157
// The downside of the current algorithm is that many tiny I/O batches means less opportunity for in-batch coalescing.
157158
// Then again, if our outer batch coalescing is super good then maybe we don't bother
158159

160+
let mut current_top_level_row = top_level_row;
161+
159162
while rows_to_read > 0 {
160163
let mut min_rows_added = u32::MAX;
161164
for (col_idx, field_scheduler) in self.children.iter().enumerate() {
@@ -183,7 +186,12 @@ impl LogicalPageScheduler for SimpleStructScheduler {
183186
page_range_start,
184187
next_page
185188
);
186-
next_page.schedule_ranges(&[page_range], scheduler, sink)?;
189+
next_page.schedule_ranges(
190+
&[page_range],
191+
scheduler,
192+
sink,
193+
current_top_level_row,
194+
)?;
187195

188196
status.rows_queued += rows_to_take;
189197
status.rows_to_take -= rows_to_take;
@@ -199,6 +207,7 @@ impl LogicalPageScheduler for SimpleStructScheduler {
199207
panic!("Error in scheduling logic, panic to avoid infinite loop");
200208
}
201209
rows_to_read -= min_rows_added;
210+
current_top_level_row += min_rows_added as u64;
202211
for field_status in &mut field_status {
203212
field_status.rows_queued -= min_rows_added;
204213
}
@@ -216,6 +225,7 @@ impl LogicalPageScheduler for SimpleStructScheduler {
216225
indices: &[u32],
217226
scheduler: &Arc<dyn EncodingsIo>,
218227
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
228+
top_level_row: u64,
219229
) -> Result<()> {
220230
trace!("Scheduling struct decode of {} indices", indices.len());
221231

@@ -236,7 +246,7 @@ impl LogicalPageScheduler for SimpleStructScheduler {
236246
let mut rows_to_read = indices.len() as u32;
237247

238248
// NOTE: See schedule_range for a description of the scheduling algorithm
239-
249+
let mut current_top_level_row = top_level_row;
240250
while rows_to_read > 0 {
241251
let mut min_rows_added = u32::MAX;
242252
for (col_idx, field_scheduler) in self.children.iter().enumerate() {
@@ -269,7 +279,12 @@ impl LogicalPageScheduler for SimpleStructScheduler {
269279
// We should be guaranteed to get at least one page
270280
let next_page = next_page.unwrap();
271281

272-
next_page.schedule_take(&indices_in_page, scheduler, sink)?;
282+
next_page.schedule_take(
283+
&indices_in_page,
284+
scheduler,
285+
sink,
286+
current_top_level_row,
287+
)?;
273288

274289
let rows_scheduled = indices_in_page.len() as u32;
275290
status.rows_queued += rows_scheduled;
@@ -281,6 +296,7 @@ impl LogicalPageScheduler for SimpleStructScheduler {
281296
panic!("Error in scheduling logic, panic to avoid infinite loop");
282297
}
283298
rows_to_read -= min_rows_added;
299+
current_top_level_row += min_rows_added as u64;
284300
for field_status in &mut field_status {
285301
field_status.rows_queued -= min_rows_added;
286302
}

rust/lance-encoding/src/encodings/physical/basic.rs

+11-2
Original file line numberDiff line numberDiff line change
@@ -123,18 +123,27 @@ impl PhysicalPageScheduler for BasicPageScheduler {
123123
&self,
124124
ranges: &[std::ops::Range<u32>],
125125
scheduler: &dyn EncodingsIo,
126+
top_level_row: u64,
126127
) -> BoxFuture<'static, Result<Box<dyn PhysicalPageDecoder>>> {
127128
let validity_future = match &self.mode {
128129
SchedulerNullStatus::None(_) | SchedulerNullStatus::All => None,
129130
SchedulerNullStatus::Some(schedulers) => {
130131
trace!("Scheduling ranges {:?} from validity", ranges);
131-
Some(schedulers.validity.schedule_ranges(ranges, scheduler))
132+
Some(
133+
schedulers
134+
.validity
135+
.schedule_ranges(ranges, scheduler, top_level_row),
136+
)
132137
}
133138
};
134139

135140
let values_future = if let Some(values_scheduler) = self.mode.values_scheduler() {
136141
trace!("Scheduling range {:?} from values", ranges);
137-
Some(values_scheduler.schedule_ranges(ranges, scheduler).boxed())
142+
Some(
143+
values_scheduler
144+
.schedule_ranges(ranges, scheduler, top_level_row)
145+
.boxed(),
146+
)
138147
} else {
139148
trace!("No values fetch needed since values all null");
140149
None

rust/lance-encoding/src/encodings/physical/bitmap.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ impl PhysicalPageScheduler for DenseBitmapScheduler {
3535
&self,
3636
ranges: &[Range<u32>],
3737
scheduler: &dyn EncodingsIo,
38+
top_level_row: u64,
3839
) -> BoxFuture<'static, Result<Box<dyn PhysicalPageDecoder>>> {
3940
let mut min = u64::MAX;
4041
let mut max = 0;
@@ -62,7 +63,7 @@ impl PhysicalPageScheduler for DenseBitmapScheduler {
6263
min,
6364
max
6465
);
65-
let bytes = scheduler.submit_request(byte_ranges);
66+
let bytes = scheduler.submit_request(byte_ranges, top_level_row);
6667

6768
async move {
6869
let bytes = bytes.await?;

0 commit comments

Comments
 (0)