Skip to content

Commit

Permalink
Chunk store: support for overlapped range queries (#7586)
Browse files Browse the repository at this point in the history
Turns out I never got around to implementing support for overlapped
chunks in range queries in the `ChunkStore`.

I only realized after hunting down a crazy bug in the dataframe APIs for
way too long...

The fix is the exact same as the one used for latest-at queries. In fact
it is a copy paste.
  • Loading branch information
teh-cmc authored Oct 4, 2024
1 parent 6ffcc4f commit 82eb7b0
Show file tree
Hide file tree
Showing 2 changed files with 191 additions and 0 deletions.
21 changes: 21 additions & 0 deletions crates/store/re_chunk_store/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,27 @@ impl ChunkStore {
query.range.max()
};

// Overlapped chunks
// =================
//
// To deal with potentially overlapping chunks, we keep track of the longest
// interval in the entire map, which gives us an upper bound on how much we
// would need to walk backwards in order to find all potential overlaps.
//
// This is a fairly simple solution that scales much better than interval-tree
// based alternatives, both in terms of complexity and performance, in the normal
// case where most chunks in a collection have similar lengths.
//
// The most degenerate case -- a single chunk overlaps everything else -- results
// in `O(n)` performance, which gets amortized by the query cache.
// If that turns out to be a problem in practice, we can experiment with more
// complex solutions then.
let query_min = TimeInt::new_temporal(
query_min
.as_i64()
.saturating_sub(temporal_chunk_ids_per_time.max_interval_length as _),
);

let start_time = temporal_chunk_ids_per_time
.per_start_time
.range(..=query_min)
Expand Down
170 changes: 170 additions & 0 deletions crates/store/re_chunk_store/tests/reads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -893,3 +893,173 @@ fn range() -> anyhow::Result<()> {

Ok(())
}

#[test]
fn range_overlapped_chunks() -> anyhow::Result<()> {
re_log::setup_logging();

let mut store = ChunkStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
ChunkStoreConfig::COMPACTION_DISABLED,
);

let entity_path = EntityPath::from("this/that");

let frame0 = TimeInt::new_temporal(0);
let frame1 = TimeInt::new_temporal(1);
let frame2 = TimeInt::new_temporal(2);
let frame3 = TimeInt::new_temporal(3);
let frame4 = TimeInt::new_temporal(4);
let frame5 = TimeInt::new_temporal(5);
let frame6 = TimeInt::new_temporal(6);
let frame7 = TimeInt::new_temporal(7);
let frame8 = TimeInt::new_temporal(8);

let points1 = MyPoint::from_iter(0..1);
let points2 = MyPoint::from_iter(1..2);
let points3 = MyPoint::from_iter(2..3);
let points4 = MyPoint::from_iter(3..4);
let points5 = MyPoint::from_iter(4..5);
let points7_1 = MyPoint::from_iter(6..7);
let points7_2 = MyPoint::from_iter(7..8);
let points7_3 = MyPoint::from_iter(8..9);

let row_id1_1 = RowId::new();
let row_id1_3 = RowId::new();
let row_id1_5 = RowId::new();
let row_id1_7_1 = RowId::new();
let row_id1_7_2 = RowId::new();
let row_id1_7_3 = RowId::new();
let chunk1_1 = Chunk::builder(entity_path.clone())
.with_sparse_component_batches(
row_id1_1,
[build_frame_nr(frame1)],
[(MyPoint::name(), Some(&points1 as _))],
)
.with_sparse_component_batches(
row_id1_3,
[build_frame_nr(frame3)],
[(MyPoint::name(), Some(&points3 as _))],
)
.with_sparse_component_batches(
row_id1_5,
[build_frame_nr(frame5)],
[(MyPoint::name(), Some(&points5 as _))],
)
.with_sparse_component_batches(
row_id1_7_1,
[build_frame_nr(frame7)],
[(MyPoint::name(), Some(&points7_1 as _))],
)
.with_sparse_component_batches(
row_id1_7_2,
[build_frame_nr(frame7)],
[(MyPoint::name(), Some(&points7_2 as _))],
)
.with_sparse_component_batches(
row_id1_7_3,
[build_frame_nr(frame7)],
[(MyPoint::name(), Some(&points7_3 as _))],
)
.build()?;

let chunk1_1 = Arc::new(chunk1_1);
store.insert_chunk(&chunk1_1)?;
let chunk1_2 = Arc::new(chunk1_1.clone_as(ChunkId::new(), RowId::new()));
store.insert_chunk(&chunk1_2)?; // x2 !
let chunk1_3 = Arc::new(chunk1_1.clone_as(ChunkId::new(), RowId::new()));
store.insert_chunk(&chunk1_3)?; // x3 !!

let row_id2_2 = RowId::new();
let row_id2_3 = RowId::new();
let row_id2_4 = RowId::new();
let chunk2 = Chunk::builder(entity_path.clone())
.with_sparse_component_batches(
row_id2_2,
[build_frame_nr(frame2)],
[(MyPoint::name(), Some(&points2 as _))],
)
.with_sparse_component_batches(
row_id2_3,
[build_frame_nr(frame3)],
[(MyPoint::name(), Some(&points3 as _))],
)
.with_sparse_component_batches(
row_id2_4,
[build_frame_nr(frame4)],
[(MyPoint::name(), Some(&points4 as _))],
)
.build()?;

let chunk2 = Arc::new(chunk2);
store.insert_chunk(&chunk2)?;

let assert_range_chunk = |time_range: ResolvedTimeRange,
mut expected_chunk_ids: Vec<ChunkId>| {
let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);

eprintln!("--- {time_range:?} ---");
let mut chunk_ids = store
.range_relevant_chunks_for_all_components(
&RangeQuery::new(timeline_frame_nr, time_range),
&entity_path,
)
.into_iter()
.map(|chunk| {
eprintln!("{chunk}");
chunk.id()
})
.collect_vec();
chunk_ids.sort();

expected_chunk_ids.sort();

similar_asserts::assert_eq!(expected_chunk_ids, chunk_ids);
};

// Unit ranges
assert_range_chunk(ResolvedTimeRange::new(frame0, frame0), vec![]);
assert_range_chunk(
ResolvedTimeRange::new(frame1, frame1),
vec![chunk1_1.id(), chunk1_2.id(), chunk1_3.id()],
);
assert_range_chunk(
ResolvedTimeRange::new(frame2, frame2),
vec![chunk1_1.id(), chunk1_2.id(), chunk1_3.id(), chunk2.id()],
);
assert_range_chunk(
ResolvedTimeRange::new(frame3, frame3),
vec![chunk1_1.id(), chunk1_2.id(), chunk1_3.id(), chunk2.id()],
);
assert_range_chunk(
ResolvedTimeRange::new(frame4, frame4),
vec![chunk1_1.id(), chunk1_2.id(), chunk1_3.id(), chunk2.id()],
);
assert_range_chunk(
ResolvedTimeRange::new(frame5, frame5),
vec![chunk1_1.id(), chunk1_2.id(), chunk1_3.id()],
);
assert_range_chunk(
ResolvedTimeRange::new(frame6, frame6),
vec![chunk1_1.id(), chunk1_2.id(), chunk1_3.id()],
);
assert_range_chunk(
ResolvedTimeRange::new(frame7, frame7),
vec![chunk1_1.id(), chunk1_2.id(), chunk1_3.id()],
);
assert_range_chunk(ResolvedTimeRange::new(frame8, frame8), vec![]);

// Full range
assert_range_chunk(
ResolvedTimeRange::new(frame1, frame5),
vec![chunk1_1.id(), chunk1_2.id(), chunk1_3.id(), chunk2.id()],
);

// Infinite range
assert_range_chunk(
ResolvedTimeRange::EVERYTHING,
vec![chunk1_1.id(), chunk1_2.id(), chunk1_3.id(), chunk2.id()],
);

Ok(())
}

0 comments on commit 82eb7b0

Please sign in to comment.