Less use of TransportChunk (#8707)
* Part of #3741
emilk authored Jan 16, 2025
1 parent e99a507 commit 7e6b79d
Showing 7 changed files with 31 additions and 36 deletions.
10 changes: 0 additions & 10 deletions crates/store/re_chunk/src/transport.rs
@@ -356,16 +356,6 @@ impl TransportChunk {
})
}

#[inline]
pub fn fields_and_columns(&self) -> impl Iterator<Item = (&ArrowField, &ArrowArrayRef)> + '_ {
self.fields().iter().enumerate().filter_map(|(i, field)| {
self.batch
.columns()
.get(i)
.map(|column| (field.as_ref(), column))
})
}

/// Iterates all control columns present in this chunk.
#[inline]
pub fn controls(&self) -> impl Iterator<Item = (&ArrowField, &ArrowArrayRef)> {
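The removed `fields_and_columns` helper paired each schema field with its column; callers can get the same pairing straight off an `arrow::array::RecordBatch`. A minimal sketch in plain arrow-rs (not code from this commit):

```rust
use arrow::array::RecordBatch;

/// Sketch: iterate (field, column) pairs directly on a RecordBatch,
/// which is what `fields_and_columns` used to provide on TransportChunk.
fn print_fields_and_columns(batch: &RecordBatch) {
    let schema = batch.schema(); // Arc<Schema>, kept alive for the loop below
    for (field, column) in schema.fields().iter().zip(batch.columns()) {
        println!("{}: {} rows of {}", field.name(), column.len(), field.data_type());
    }

    // Single-column lookup by name also works directly:
    if let Some(column) = batch.column_by_name("rerun_recording_id") {
        println!("found recording id column with {} rows", column.len());
    }
}
```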
2 changes: 1 addition & 1 deletion crates/store/re_dataframe/src/query.rs
@@ -1262,7 +1262,7 @@ impl<E: StorageEngineLike> QueryHandle<E> {

/// Calls [`Self::next_row`] and wraps the result in a [`ArrowRecordBatch`].
///
/// Only use this if you absolutely need a [`RecordBatch`] as this adds a
/// Only use this if you absolutely need an [`ArrowRecordBatch`] as this adds
/// some overhead for schema validation.
///
/// See [`Self::next_row`] for more information.
9 changes: 5 additions & 4 deletions crates/store/re_grpc_client/src/lib.rs
@@ -218,7 +218,8 @@ async fn stream_recording_async(
}));
}

let store_info = store_info_from_catalog_chunk(&resp[0], &recording_id)?;
let store_info =
store_info_from_catalog_chunk(&TransportChunk::from(resp[0].clone()), &recording_id)?;
let store_id = store_info.store_id.clone();

re_log::debug!("Fetching {recording_id}…");
@@ -255,8 +256,8 @@ async fn stream_recording_async(

re_log::info!("Starting to read...");
while let Some(result) = resp.next().await {
let tc = result.map_err(TonicStatusError)?;
let chunk = Chunk::from_transport(&tc)?;
let batch = result.map_err(TonicStatusError)?;
let chunk = Chunk::from_record_batch(batch)?;

if tx
.send(LogMsg::ArrowMsg(store_id.clone(), chunk.to_arrow_msg()?))
@@ -391,7 +392,7 @@ async fn stream_catalog_async(

re_log::info!("Starting to read...");
while let Some(result) = resp.next().await {
let input = result.map_err(TonicStatusError)?;
let input = TransportChunk::from(result.map_err(TonicStatusError)?);

// Catalog received from the ReDap server isn't suitable for direct conversion to a Rerun Chunk:
// - conversion expects "data" columns to be ListArrays, hence we need to convert any individual row column data to ListArray
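In the read loop above, each streamed message now arrives as an Arrow record batch and is converted with `Chunk::from_record_batch` instead of going through `TransportChunk`. A minimal sketch of that step, with error handling reduced to `unwrap` for brevity (signatures as used in this diff):

```rust
use arrow::array::RecordBatch as ArrowRecordBatch;
use re_chunk::Chunk;

/// Sketch of the new per-message decode step from the read loop above.
fn record_batch_to_chunk(batch: ArrowRecordBatch) -> Chunk {
    // Before this commit the batch was first wrapped:
    //   let tc = TransportChunk::from(batch);
    //   Chunk::from_transport(&tc).unwrap()
    Chunk::from_record_batch(batch).unwrap()
}
```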
18 changes: 10 additions & 8 deletions crates/store/re_log_encoding/src/codec/wire/decoder.rs
@@ -1,36 +1,38 @@
use crate::codec::arrow::read_arrow_from_bytes;
use crate::codec::CodecError;
use re_chunk::TransportChunk;
use arrow::array::RecordBatch as ArrowRecordBatch;

use re_protos::common::v0::RerunChunk;
use re_protos::remote_store::v0::DataframePart;

use crate::codec::arrow::read_arrow_from_bytes;
use crate::codec::CodecError;

/// Decode transport data from a byte stream.
fn decode(
version: re_protos::common::v0::EncoderVersion,
data: &[u8],
) -> Result<TransportChunk, CodecError> {
) -> Result<ArrowRecordBatch, CodecError> {
match version {
re_protos::common::v0::EncoderVersion::V0 => {
let mut reader = std::io::Cursor::new(data);
let batch = read_arrow_from_bytes(&mut reader)?;
Ok(TransportChunk::from(batch))
Ok(batch)
}
}
}

/// Decode an object from its wire (protobuf) representation.
pub trait Decode {
fn decode(&self) -> Result<TransportChunk, CodecError>;
fn decode(&self) -> Result<ArrowRecordBatch, CodecError>;
}

impl Decode for DataframePart {
fn decode(&self) -> Result<TransportChunk, CodecError> {
fn decode(&self) -> Result<ArrowRecordBatch, CodecError> {
decode(self.encoder_version(), &self.payload)
}
}

impl Decode for RerunChunk {
fn decode(&self) -> Result<TransportChunk, CodecError> {
fn decode(&self) -> Result<ArrowRecordBatch, CodecError> {
decode(self.encoder_version(), &self.payload)
}
}
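Both `DataframePart` and `RerunChunk` now decode straight to an Arrow record batch. A sketch of a caller, assuming the module path and error type visibility match the file layout above:

```rust
use re_log_encoding::codec::wire::decoder::Decode as _;

/// Sketch: decode a protobuf-encoded dataframe part into an Arrow record batch.
/// The `re_log_encoding::codec::wire::decoder` path is assumed from the file location above.
fn decode_part(
    part: &re_protos::remote_store::v0::DataframePart,
) -> Result<arrow::array::RecordBatch, re_log_encoding::codec::CodecError> {
    part.decode()
}
```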
2 changes: 1 addition & 1 deletion crates/store/re_log_encoding/src/codec/wire/mod.rs
@@ -62,7 +62,7 @@ mod tests {
.unwrap();

let decoded = encoded.decode().unwrap();
let decoded_chunk = Chunk::from_transport(&decoded).unwrap();
let decoded_chunk = Chunk::from_record_batch(decoded).unwrap();

assert_eq!(expected_chunk, decoded_chunk);
}
6 changes: 5 additions & 1 deletion crates/store/re_log_types/Cargo.toml
@@ -58,7 +58,11 @@ re_types_core.workspace = true
ahash.workspace = true
anyhow.workspace = true
arrow = { workspace = true, features = ["ipc"] }
arrow2 = { workspace = true, features = ["io_print", "compute_concatenate"] }
arrow2 = { workspace = true, features = [
"arrow",
"io_print",
"compute_concatenate",
] }
backtrace.workspace = true
bytemuck.workspace = true
clean-path.workspace = true
20 changes: 9 additions & 11 deletions rerun_py/src/remote.rs
@@ -130,7 +130,10 @@ impl PyStorageNodeClient {
));
}

re_grpc_client::store_info_from_catalog_chunk(&resp[0], id)
re_grpc_client::store_info_from_catalog_chunk(
&re_chunk::TransportChunk::from(resp[0].clone()),
id,
)
})
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;

@@ -173,7 +176,7 @@ impl PyStorageNodeClient {
.unwrap_or_else(|| ArrowSchema::empty().into());

Ok(RecordBatchIterator::new(
batches.into_iter().map(|tc| Ok(tc.into())),
batches.into_iter().map(Ok),
schema,
))
});
@@ -234,10 +237,7 @@ impl PyStorageNodeClient {
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;

let record_batches: Vec<Result<RecordBatch, arrow::error::ArrowError>> =
transport_chunks
.into_iter()
.map(|tc| Ok(tc.into()))
.collect();
transport_chunks.into_iter().map(Ok).collect();

// TODO(jleibs): surfacing this schema is awkward. This should be more explicit in
// the gRPC APIs somehow.
@@ -346,9 +346,7 @@ impl PyStorageNodeClient {
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;

let recording_id = metadata
.fields_and_columns()
.find(|(field, _data)| field.name() == "rerun_recording_id")
.map(|(_field, data)| data)
.column_by_name("rerun_recording_id")
.ok_or(PyRuntimeError::new_err("No rerun_recording_id"))?
.downcast_array_ref::<arrow::array::StringArray>()
.ok_or(PyRuntimeError::new_err("Recording Id is not a string"))?
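The recording-id lookup above now goes through `RecordBatch::column_by_name` plus a downcast. The same lookup in plain arrow-rs (without the `downcast_array_ref` helper used in the diff) looks roughly like this:

```rust
use arrow::array::{RecordBatch, StringArray};

/// Sketch: fetch the "rerun_recording_id" column by name and read its first value.
fn recording_id(batch: &RecordBatch) -> Option<String> {
    let column = batch.column_by_name("rerun_recording_id")?;
    let strings = column.as_any().downcast_ref::<StringArray>()?;
    (!strings.is_empty()).then(|| strings.value(0).to_owned())
}
```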
@@ -480,13 +478,13 @@ impl PyStorageNodeClient {

while let Some(result) = resp.next().await {
let response = result.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
let tc = match response.decode() {
let batch = match response.decode() {
Ok(tc) => tc,
Err(err) => {
return Err(PyRuntimeError::new_err(err.to_string()));
}
};
let chunk = Chunk::from_transport(&tc)
let chunk = Chunk::from_record_batch(batch)
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;

store