Skip to content

Commit

Permalink
Merge branch 'main' into jan/grpc-log-sink
Browse files Browse the repository at this point in the history
  • Loading branch information
jprochazk authored Jan 17, 2025
2 parents e5ca91e + 828e317 commit e4ef1af
Show file tree
Hide file tree
Showing 31 changed files with 326 additions and 396 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/contrib_checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ jobs:
no-codegen-changes:
name: Check if running codegen would produce any changes
runs-on: ubuntu-latest-16-cores
# TODO(andreas): setup-vulkan doesn't work on 24.4 right now due to missing .so
runs-on: ubuntu-22.04-large
steps:
# Note: We explicitly don't override `ref` here. We need to see if changes would be made
# in a context where we have merged with main. Otherwise we might miss changes such as one
Expand All @@ -92,7 +93,8 @@ jobs:

rs-lints:
name: Rust lints (fmt, check, clippy, tests, doc)
runs-on: ubuntu-latest-16-cores
# TODO(andreas): setup-vulkan doesn't work on 24.4 right now due to missing .so
runs-on: ubuntu-22.04-16core
steps:
- uses: actions/checkout@v4
with:
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/reusable_checks_rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ jobs:

rs-lints:
name: Rust lints (fmt, check, clippy, tests, doc)
runs-on: ubuntu-latest-16-cores
# TODO(andreas): setup-vulkan doesn't work on 24.4 right now due to missing .so
runs-on: ubuntu-22.04-large
steps:
- uses: actions/checkout@v4
with:
Expand Down
5 changes: 1 addition & 4 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5605,8 +5605,6 @@ dependencies = [
"hashbrown 0.14.5",
"num-traits",
"rustc_version",
"serde",
"serde_derive",
"simdutf8",
]

Expand Down Expand Up @@ -5748,7 +5746,6 @@ dependencies = [
"re_tracing",
"re_types",
"re_types_core",
"serde_json",
"similar-asserts",
"thiserror 1.0.65",
"tinyvec",
Expand Down Expand Up @@ -6235,6 +6232,7 @@ version = "0.22.0-alpha.1+dev"
dependencies = [
"ahash",
"anyhow",
"arrow",
"backtrace",
"bytemuck",
"criterion",
Expand Down Expand Up @@ -7420,7 +7418,6 @@ dependencies = [
"pyo3",
"pyo3-build-config",
"rand",
"re_arrow2",
"re_arrow_util",
"re_build_info",
"re_build_tools",
Expand Down
54 changes: 16 additions & 38 deletions crates/store/re_chunk/src/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use arrow::array::ArrayRef;
use arrow2::array::Array as Arrow2Array;
use arrow::array::ArrayRef as ArrowArrayRef;
use arrow2::array::Array as _;

use re_log_types::{TimeInt, Timeline};
use re_types_core::{Component, ComponentName};
Expand All @@ -21,26 +21,13 @@ impl Chunk {
&self,
component_name: &ComponentName,
row_index: usize,
) -> Option<ChunkResult<ArrayRef>> {
self.component_batch_raw_arrow2(component_name, row_index)
.map(|res| res.map(|array| array.into()))
}

/// Returns the raw data for the specified component.
///
/// Returns an error if the row index is out of bounds.
#[inline]
fn component_batch_raw_arrow2(
&self,
component_name: &ComponentName,
row_index: usize,
) -> Option<ChunkResult<Box<dyn Arrow2Array>>> {
) -> Option<ChunkResult<ArrowArrayRef>> {
self.get_first_component(component_name)
.and_then(|list_array| {
if list_array.len() > row_index {
list_array
.is_valid(row_index)
.then(|| Ok(list_array.value(row_index)))
.then(|| Ok(list_array.value(row_index).into()))
} else {
Some(Err(crate::ChunkError::IndexOutOfBounds {
kind: "row".to_owned(),
Expand Down Expand Up @@ -78,7 +65,7 @@ impl Chunk {
component_name: &ComponentName,
row_index: usize,
instance_index: usize,
) -> Option<ChunkResult<ArrayRef>> {
) -> Option<ChunkResult<ArrowArrayRef>> {
let res = self.component_batch_raw(component_name, row_index)?;

let array = match res {
Expand Down Expand Up @@ -131,7 +118,7 @@ impl Chunk {
&self,
component_name: &ComponentName,
row_index: usize,
) -> Option<ChunkResult<ArrayRef>> {
) -> Option<ChunkResult<ArrowArrayRef>> {
let res = self.component_batch_raw(component_name, row_index)?;

let array = match res {
Expand Down Expand Up @@ -276,20 +263,11 @@ impl UnitChunkShared {

/// Returns the raw data for the specified component.
#[inline]
pub fn component_batch_raw(&self, component_name: &ComponentName) -> Option<ArrayRef> {
self.component_batch_raw_arrow2(component_name)
.map(|array| array.into())
}

/// Returns the raw data for the specified component.
#[inline]
pub fn component_batch_raw_arrow2(
&self,
component_name: &ComponentName,
) -> Option<Box<dyn Arrow2Array>> {
pub fn component_batch_raw(&self, component_name: &ComponentName) -> Option<ArrowArrayRef> {
debug_assert!(self.num_rows() == 1);
self.get_first_component(component_name)
.and_then(|list_array| list_array.is_valid(0).then(|| list_array.value(0)))
.map(|array| array.into())
}

/// Returns the deserialized data for the specified component.
Expand All @@ -311,10 +289,10 @@ impl UnitChunkShared {
&self,
component_name: &ComponentName,
instance_index: usize,
) -> Option<ChunkResult<Box<dyn Arrow2Array>>> {
let array = self.component_batch_raw_arrow2(component_name)?;
) -> Option<ChunkResult<ArrowArrayRef>> {
let array = self.component_batch_raw(component_name)?;
if array.len() > instance_index {
Some(Ok(array.sliced(instance_index, 1)))
Some(Ok(array.slice(instance_index, 1)))
} else {
Some(Err(crate::ChunkError::IndexOutOfBounds {
kind: "instance".to_owned(),
Expand All @@ -335,7 +313,7 @@ impl UnitChunkShared {
let res = self.component_instance_raw(&C::name(), instance_index)?;

let array = match res {
Ok(array) => ArrayRef::from(array),
Ok(array) => array,
Err(err) => return Some(Err(err)),
};

Expand All @@ -354,10 +332,10 @@ impl UnitChunkShared {
pub fn component_mono_raw(
&self,
component_name: &ComponentName,
) -> Option<ChunkResult<Box<dyn Arrow2Array>>> {
let array = self.component_batch_raw_arrow2(component_name)?;
) -> Option<ChunkResult<ArrowArrayRef>> {
let array = self.component_batch_raw(component_name)?;
if array.len() == 1 {
Some(Ok(array.sliced(0, 1)))
Some(Ok(array.slice(0, 1)))
} else {
Some(Err(crate::ChunkError::IndexOutOfBounds {
kind: "mono".to_owned(),
Expand All @@ -375,7 +353,7 @@ impl UnitChunkShared {
let res = self.component_mono_raw(&C::name())?;

let array = match res {
Ok(array) => ArrayRef::from(array),
Ok(array) => array,
Err(err) => return Some(Err(err)),
};

Expand Down
36 changes: 19 additions & 17 deletions crates/store/re_chunk/src/slice.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use arrow::array::ArrayRef as ArrowArrayRef;
use arrow2::array::{
Array as Arrow2Array, BooleanArray as Arrow2BooleanArray, ListArray as Arrow2ListArray,
};

use itertools::Itertools;
use nohash_hasher::IntSet;

use re_arrow_util::arrow2_util;
use re_arrow_util::arrow_util;
use re_arrow_util::Arrow2ArrayDowncastRef as _;
use re_arrow_util::{arrow2_util, arrow_util, Arrow2ArrayDowncastRef as _};
use re_log_types::Timeline;
use re_types_core::{ComponentDescriptor, ComponentName};

Expand All @@ -28,7 +26,7 @@ impl Chunk {
&self,
row_id: RowId,
component_desc: &ComponentDescriptor,
) -> Option<Box<dyn Arrow2Array>> {
) -> Option<ArrowArrayRef> {
let list_array = self
.components
.get(&component_desc.component_name)
Expand All @@ -51,11 +49,15 @@ impl Chunk {
let found_it =
times.get(index) == Some(&row_id_time_ns) && incs.get(index) == Some(&row_id_inc);

(found_it && list_array.is_valid(index)).then(|| list_array.value(index))
(found_it && list_array.is_valid(index)).then(|| list_array.value(index).into())
} else {
self.row_ids()
.find_position(|id| *id == row_id)
.and_then(|(index, _)| list_array.is_valid(index).then(|| list_array.value(index)))
.and_then(|(index, _)| {
list_array
.is_valid(index)
.then(|| list_array.value(index).into())
})
}
}

Expand Down Expand Up @@ -1002,8 +1004,8 @@ mod tests {

assert!(!chunk.is_sorted());
for (row_id, component_desc, expected) in expectations {
let expected = expected
.and_then(|expected| re_types_core::LoggableBatch::to_arrow2(expected).ok());
let expected =
expected.and_then(|expected| re_types_core::LoggableBatch::to_arrow(expected).ok());
eprintln!("{component_desc} @ {row_id}");
similar_asserts::assert_eq!(expected, chunk.cell(*row_id, component_desc));
}
Expand All @@ -1012,8 +1014,8 @@ mod tests {
assert!(chunk.is_sorted());

for (row_id, component_desc, expected) in expectations {
let expected = expected
.and_then(|expected| re_types_core::LoggableBatch::to_arrow2(expected).ok());
let expected =
expected.and_then(|expected| re_types_core::LoggableBatch::to_arrow(expected).ok());
eprintln!("{component_desc} @ {row_id}");
similar_asserts::assert_eq!(expected, chunk.cell(*row_id, component_desc));
}
Expand Down Expand Up @@ -1131,7 +1133,7 @@ mod tests {

for (row_id, component_desc, expected) in expectations {
let expected = expected
.and_then(|expected| re_types_core::LoggableBatch::to_arrow2(expected).ok());
.and_then(|expected| re_types_core::LoggableBatch::to_arrow(expected).ok());
eprintln!("{component_desc} @ {row_id}");
similar_asserts::assert_eq!(expected, chunk.cell(*row_id, component_desc));
}
Expand Down Expand Up @@ -1162,7 +1164,7 @@ mod tests {

for (row_id, component_desc, expected) in expectations {
let expected = expected
.and_then(|expected| re_types_core::LoggableBatch::to_arrow2(expected).ok());
.and_then(|expected| re_types_core::LoggableBatch::to_arrow(expected).ok());
eprintln!("{component_desc} @ {row_id}");
similar_asserts::assert_eq!(expected, chunk.cell(*row_id, component_desc));
}
Expand Down Expand Up @@ -1258,7 +1260,7 @@ mod tests {

for (row_id, component_name, expected) in expectations {
let expected = expected
.and_then(|expected| re_types_core::LoggableBatch::to_arrow2(expected).ok());
.and_then(|expected| re_types_core::LoggableBatch::to_arrow(expected).ok());
eprintln!("{component_name} @ {row_id}");
similar_asserts::assert_eq!(expected, chunk.cell(*row_id, component_name));
}
Expand All @@ -1277,7 +1279,7 @@ mod tests {

for (row_id, component_name, expected) in expectations {
let expected = expected
.and_then(|expected| re_types_core::LoggableBatch::to_arrow2(expected).ok());
.and_then(|expected| re_types_core::LoggableBatch::to_arrow(expected).ok());
eprintln!("{component_name} @ {row_id}");
similar_asserts::assert_eq!(expected, chunk.cell(*row_id, component_name));
}
Expand Down Expand Up @@ -1404,7 +1406,7 @@ mod tests {

for (row_id, component_name, expected) in expectations {
let expected = expected
.and_then(|expected| re_types_core::LoggableBatch::to_arrow2(expected).ok());
.and_then(|expected| re_types_core::LoggableBatch::to_arrow(expected).ok());
eprintln!("{component_name} @ {row_id}");
similar_asserts::assert_eq!(expected, chunk.cell(*row_id, component_name));
}
Expand Down Expand Up @@ -1551,7 +1553,7 @@ mod tests {

for (row_id, component_name, expected) in expectations {
let expected = expected
.and_then(|expected| re_types_core::LoggableBatch::to_arrow2(expected).ok());
.and_then(|expected| re_types_core::LoggableBatch::to_arrow(expected).ok());
eprintln!("{component_name} @ {row_id}");
similar_asserts::assert_eq!(expected, chunk.cell(*row_id, component_name));
}
Expand Down
3 changes: 1 addition & 2 deletions crates/store/re_chunk_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,13 @@ re_types_core.workspace = true
ahash.workspace = true
anyhow.workspace = true
arrow.workspace = true
arrow2 = { workspace = true, features = ["compute_concatenate", "serde_types"] }
arrow2 = { workspace = true, features = ["compute_concatenate"] }
document-features.workspace = true
indent.workspace = true
itertools.workspace = true
nohash-hasher.workspace = true
once_cell.workspace = true
parking_lot = { workspace = true, features = ["arc_lock"] }
serde_json.workspace = true
thiserror.workspace = true
web-time.workspace = true

Expand Down
Loading

0 comments on commit e4ef1af

Please sign in to comment.