Less use of TransportChunk #8707

Merged (2 commits) on Jan 16, 2025
10 changes: 0 additions & 10 deletions crates/store/re_chunk/src/transport.rs
@@ -356,16 +356,6 @@ impl TransportChunk {
})
}

#[inline]
pub fn fields_and_columns(&self) -> impl Iterator<Item = (&ArrowField, &ArrowArrayRef)> + '_ {
self.fields().iter().enumerate().filter_map(|(i, field)| {
self.batch
.columns()
.get(i)
.map(|column| (field.as_ref(), column))
})
}

/// Iterates all control columns present in this chunk.
#[inline]
pub fn controls(&self) -> impl Iterator<Item = (&ArrowField, &ArrowArrayRef)> {
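Note on the removed helper: `fields_and_columns` only zipped the schema's fields with the batch's columns, which callers can now do directly on the underlying Arrow `RecordBatch`. A minimal sketch of that equivalent, assuming the caller already holds the `RecordBatch` and an arrow-rs version that provides `schema_ref()`:

```rust
use arrow::array::{ArrayRef, RecordBatch};
use arrow::datatypes::Field;

// Equivalent of the removed `TransportChunk::fields_and_columns`, expressed directly
// on an Arrow RecordBatch: fields and columns are parallel, so zipping them yields
// the same (field, column) pairs.
fn fields_and_columns(batch: &RecordBatch) -> impl Iterator<Item = (&Field, &ArrayRef)> + '_ {
    batch
        .schema_ref() // borrowed &SchemaRef, so the iterator can outlive this call
        .fields()
        .iter()
        .map(|field| field.as_ref())
        .zip(batch.columns().iter())
}
```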
2 changes: 1 addition & 1 deletion crates/store/re_dataframe/src/query.rs
@@ -1262,7 +1262,7 @@ impl<E: StorageEngineLike> QueryHandle<E> {

/// Calls [`Self::next_row`] and wraps the result in a [`ArrowRecordBatch`].
///
/// Only use this if you absolutely need a [`RecordBatch`] as this adds a
/// Only use this if you absolutely need a [`ArrowRecordBatch`] as this adds a
/// some overhead for schema validation.
///
/// See [`Self::next_row`] for more information.
9 changes: 5 additions & 4 deletions crates/store/re_grpc_client/src/lib.rs
@@ -218,7 +218,8 @@ async fn stream_recording_async(
}));
}

let store_info = store_info_from_catalog_chunk(&resp[0], &recording_id)?;
let store_info =
store_info_from_catalog_chunk(&TransportChunk::from(resp[0].clone()), &recording_id)?;
let store_id = store_info.store_id.clone();

re_log::debug!("Fetching {recording_id}…");
@@ -255,8 +256,8 @@ async fn stream_recording_async(

re_log::info!("Starting to read...");
while let Some(result) = resp.next().await {
let tc = result.map_err(TonicStatusError)?;
let chunk = Chunk::from_transport(&tc)?;
let batch = result.map_err(TonicStatusError)?;
let chunk = Chunk::from_record_batch(batch)?;

if tx
.send(LogMsg::ArrowMsg(store_id.clone(), chunk.to_arrow_msg()?))
@@ -391,7 +392,7 @@ async fn stream_catalog_async(

re_log::info!("Starting to read...");
while let Some(result) = resp.next().await {
let input = result.map_err(TonicStatusError)?;
let input = TransportChunk::from(result.map_err(TonicStatusError)?);

// Catalog received from the ReDap server isn't suitable for direct conversion to a Rerun Chunk:
// - conversion expects "data" columns to be ListArrays, hence we need to convert any individual row column data to ListArray
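The comment above is why the catalog path still goes through `TransportChunk`: each per-row "data" column has to be re-wrapped as a `ListArray` before `Chunk` conversion. A rough sketch of that wrapping in plain arrow-rs (not the PR's code; the helper name is made up for illustration):

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int64Array, ListArray};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::Field;

// Wrap every row of `column` into a one-element list, producing the ListArray layout
// that the Chunk conversion expects for "data" columns.
fn wrap_in_list_array(column: ArrayRef) -> ListArray {
    let field = Arc::new(Field::new("item", column.data_type().clone(), true));
    // Offsets [0, 1, 2, ..., len]: exactly one child value per list entry.
    let offsets = OffsetBuffer::from_lengths(std::iter::repeat(1).take(column.len()));
    ListArray::new(field, offsets, column, None)
}

fn main() {
    let col: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let wrapped = wrap_in_list_array(col);
    assert_eq!(wrapped.len(), 3); // three rows, each now a single-element list
}
```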
18 changes: 10 additions & 8 deletions crates/store/re_log_encoding/src/codec/wire/decoder.rs
@@ -1,36 +1,38 @@
use crate::codec::arrow::read_arrow_from_bytes;
use crate::codec::CodecError;
use re_chunk::TransportChunk;
use arrow::array::RecordBatch as ArrowRecordBatch;

use re_protos::common::v0::RerunChunk;
use re_protos::remote_store::v0::DataframePart;

use crate::codec::arrow::read_arrow_from_bytes;
use crate::codec::CodecError;

/// Decode transport data from a byte stream.
fn decode(
version: re_protos::common::v0::EncoderVersion,
data: &[u8],
) -> Result<TransportChunk, CodecError> {
) -> Result<ArrowRecordBatch, CodecError> {
match version {
re_protos::common::v0::EncoderVersion::V0 => {
let mut reader = std::io::Cursor::new(data);
let batch = read_arrow_from_bytes(&mut reader)?;
Ok(TransportChunk::from(batch))
Ok(batch)
}
}
}

/// Decode an object from a its wire (protobuf) representation.
pub trait Decode {
fn decode(&self) -> Result<TransportChunk, CodecError>;
fn decode(&self) -> Result<ArrowRecordBatch, CodecError>;
}

impl Decode for DataframePart {
fn decode(&self) -> Result<TransportChunk, CodecError> {
fn decode(&self) -> Result<ArrowRecordBatch, CodecError> {
decode(self.encoder_version(), &self.payload)
}
}

impl Decode for RerunChunk {
fn decode(&self) -> Result<TransportChunk, CodecError> {
fn decode(&self) -> Result<ArrowRecordBatch, CodecError> {
decode(self.encoder_version(), &self.payload)
}
}
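With the trait now returning a plain `ArrowRecordBatch`, a caller's decode path no longer touches `TransportChunk` at all. A hedged sketch of that usage (the `Decode` import path and the `Box<dyn Error>` plumbing are assumptions, not part of this diff):

```rust
use arrow::array::RecordBatch as ArrowRecordBatch;
use re_chunk::Chunk;
use re_log_encoding::codec::wire::Decode as _; // exact re-export path assumed
use re_protos::remote_store::v0::DataframePart;

// Decode a wire payload into an Arrow RecordBatch, then build a Rerun Chunk from it,
// mirroring how this PR's call sites use `Chunk::from_record_batch`.
fn part_to_chunk(part: &DataframePart) -> Result<Chunk, Box<dyn std::error::Error>> {
    let batch: ArrowRecordBatch = part.decode()?; // wire bytes -> RecordBatch
    let chunk = Chunk::from_record_batch(batch)?; // RecordBatch -> Chunk
    Ok(chunk)
}
```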
2 changes: 1 addition & 1 deletion crates/store/re_log_encoding/src/codec/wire/mod.rs
@@ -62,7 +62,7 @@ mod tests {
.unwrap();

let decoded = encoded.decode().unwrap();
let decoded_chunk = Chunk::from_transport(&decoded).unwrap();
let decoded_chunk = Chunk::from_record_batch(decoded).unwrap();

assert_eq!(expected_chunk, decoded_chunk);
}
6 changes: 5 additions & 1 deletion crates/store/re_log_types/Cargo.toml
@@ -58,7 +58,11 @@ re_types_core.workspace = true
ahash.workspace = true
anyhow.workspace = true
arrow = { workspace = true, features = ["ipc"] }
arrow2 = { workspace = true, features = ["io_print", "compute_concatenate"] }
arrow2 = { workspace = true, features = [
"arrow",
"io_print",
"compute_concatenate",
] }
backtrace.workspace = true
bytemuck.workspace = true
clean-path.workspace = true
20 changes: 9 additions & 11 deletions rerun_py/src/remote.rs
@@ -130,7 +130,10 @@ impl PyStorageNodeClient {
));
}

re_grpc_client::store_info_from_catalog_chunk(&resp[0], id)
re_grpc_client::store_info_from_catalog_chunk(
&re_chunk::TransportChunk::from(resp[0].clone()),
id,
)
})
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;

@@ -173,7 +176,7 @@ impl PyStorageNodeClient {
.unwrap_or_else(|| ArrowSchema::empty().into());

Ok(RecordBatchIterator::new(
batches.into_iter().map(|tc| Ok(tc.into())),
batches.into_iter().map(Ok),
schema,
))
});
@@ -234,10 +237,7 @@ impl PyStorageNodeClient {
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;

let record_batches: Vec<Result<RecordBatch, arrow::error::ArrowError>> =
transport_chunks
.into_iter()
.map(|tc| Ok(tc.into()))
.collect();
transport_chunks.into_iter().map(Ok).collect();

// TODO(jleibs): surfacing this schema is awkward. This should be more explicit in
// the gRPC APIs somehow.
@@ -346,9 +346,7 @@ impl PyStorageNodeClient {
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;

let recording_id = metadata
.fields_and_columns()
.find(|(field, _data)| field.name() == "rerun_recording_id")
.map(|(_field, data)| data)
.column_by_name("rerun_recording_id")
.ok_or(PyRuntimeError::new_err("No rerun_recording_id"))?
.downcast_array_ref::<arrow::array::StringArray>()
.ok_or(PyRuntimeError::new_err("Recording Id is not a string"))?
@@ -480,13 +478,13 @@ impl PyStorageNodeClient {

while let Some(result) = resp.next().await {
let response = result.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
let tc = match response.decode() {
let batch = match response.decode() {
Ok(tc) => tc,
Err(err) => {
return Err(PyRuntimeError::new_err(err.to_string()));
}
};
let chunk = Chunk::from_transport(&tc)
let chunk = Chunk::from_record_batch(batch)
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;

store
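The `rerun_recording_id` lookup above is the main beneficiary of dropping `fields_and_columns`: `RecordBatch::column_by_name` plus a downcast does the same job. A standalone sketch of that pattern in plain arrow-rs (`downcast_array_ref` in the diff is a Rerun helper; the stock `as_any().downcast_ref()` is used here instead):

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, RecordBatch, StringArray};
use arrow::datatypes::{DataType, Field, Schema};

// Look up a column by name and read its first value as a string, the same shape of
// lookup the Python bindings now perform for `rerun_recording_id`.
fn first_recording_id(metadata: &RecordBatch) -> Option<String> {
    metadata
        .column_by_name("rerun_recording_id")? // None if the column is missing
        .as_any()
        .downcast_ref::<StringArray>()? // None if it is not a utf8 column
        .iter()
        .next()
        .flatten()
        .map(ToOwned::to_owned)
}

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "rerun_recording_id",
        DataType::Utf8,
        false,
    )]));
    let ids: ArrayRef = Arc::new(StringArray::from(vec!["my-recording"]));
    let batch = RecordBatch::try_new(schema, vec![ids]).unwrap();
    assert_eq!(first_recording_id(&batch).as_deref(), Some("my-recording"));
}
```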