Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RecordBatch::schema_ref #5474

Merged
merged 2 commits into the base branch from the contributor's branch
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions arrow-array/src/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ impl RecordBatch {
self.schema.clone()
}

/// Returns a reference to the [`Schema`] of the record batch.
///
/// Unlike [`RecordBatch::schema`], which returns an owned `SchemaRef`
/// by cloning the internal `Arc` (see `self.schema.clone()` above), this
/// borrows the schema without touching the reference count. Callers that
/// need an owned handle can still `Arc::clone` the returned reference.
pub fn schema_ref(&self) -> &SchemaRef {
    &self.schema
}

/// Projects the schema onto the specified columns
pub fn project(&self, indices: &[usize]) -> Result<RecordBatch, ArrowError> {
let projected_schema = self.schema.project(indices)?;
Expand Down
14 changes: 7 additions & 7 deletions arrow-flight/examples/flight_sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,9 @@ impl FlightSqlService for FlightSqlServiceImpl {
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
self.check_token(&request)?;
let batch = Self::fake_result().map_err(|e| status!("Could not fake a result", e))?;
let schema = batch.schema();
let batches = vec![batch];
let flight_data = batches_to_flight_data(schema.as_ref(), batches)
let schema = batch.schema_ref();
let batches = vec![batch.clone()];
let flight_data = batches_to_flight_data(schema, batches)
.map_err(|e| status!("Could not convert batches", e))?
.into_iter()
.map(Ok);
Expand Down Expand Up @@ -641,10 +641,10 @@ impl FlightSqlService for FlightSqlServiceImpl {
request: Request<Action>,
) -> Result<ActionCreatePreparedStatementResult, Status> {
self.check_token(&request)?;
let schema = Self::fake_result()
.map_err(|e| status!("Error getting result schema", e))?
.schema();
let message = SchemaAsIpc::new(&schema, &IpcWriteOptions::default())
let record_batch =
Self::fake_result().map_err(|e| status!("Error getting result schema", e))?;
let schema = record_batch.schema_ref();
let message = SchemaAsIpc::new(schema, &IpcWriteOptions::default())
.try_into()
.map_err(|e| status!("Unable to serialize schema", e))?;
let IpcMessage(schema_bytes) = message;
Expand Down
8 changes: 4 additions & 4 deletions arrow-flight/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ impl FlightDataEncoder {
let schema = match &self.schema {
Some(schema) => schema.clone(),
// encode the schema if this is the first time we have seen it
None => self.encode_schema(&batch.schema()),
None => self.encode_schema(batch.schema_ref()),
};

// encode the batch
Expand Down Expand Up @@ -565,12 +565,12 @@ mod tests {

let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c1) as ArrayRef)])
.expect("cannot create record batch");
let schema = batch.schema();
let schema = batch.schema_ref();

let (_, baseline_flight_batch) = make_flight_data(&batch, &options);

let big_batch = batch.slice(0, batch.num_rows() - 1);
let optimized_big_batch = prepare_batch_for_flight(&big_batch, Arc::clone(&schema), false)
let optimized_big_batch = prepare_batch_for_flight(&big_batch, Arc::clone(schema), false)
.expect("failed to optimize");
let (_, optimized_big_flight_batch) = make_flight_data(&optimized_big_batch, &options);

Expand All @@ -581,7 +581,7 @@ mod tests {

let small_batch = batch.slice(0, 1);
let optimized_small_batch =
prepare_batch_for_flight(&small_batch, Arc::clone(&schema), false)
prepare_batch_for_flight(&small_batch, Arc::clone(schema), false)
.expect("failed to optimize");
let (_, optimized_small_flight_batch) = make_flight_data(&optimized_small_batch, &options);

Expand Down
2 changes: 1 addition & 1 deletion arrow-flight/tests/encode_decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ async fn roundtrip(input: Vec<RecordBatch>) {
/// When <https://github.com/apache/arrow-rs/issues/3389> is resolved,
/// it should be possible to use `roundtrip`
async fn roundtrip_dictionary(input: Vec<RecordBatch>) {
let schema = Arc::new(prepare_schema_for_flight(&input[0].schema()));
let schema = Arc::new(prepare_schema_for_flight(input[0].schema_ref()));
let expected_output: Vec<_> = input
.iter()
.map(|batch| prepare_batch_for_flight(batch, schema.clone()).unwrap())
Expand Down
8 changes: 4 additions & 4 deletions arrow-flight/tests/flight_sql_client_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ impl FlightSqlServiceImpl {
let batch = Self::fake_result()?;

Ok(FlightInfo::new()
.try_with_schema(&batch.schema())
.try_with_schema(batch.schema_ref())
.expect("encoding schema")
.with_endpoint(
FlightEndpoint::new().with_ticket(Ticket::new(
Expand Down Expand Up @@ -245,9 +245,9 @@ impl FlightSqlService for FlightSqlServiceImpl {
"part_2" => batch.slice(2, 1),
ticket => panic!("Invalid ticket: {ticket:?}"),
};
let schema = batch.schema();
let batches = vec![batch];
let flight_data = batches_to_flight_data(schema.as_ref(), batches)
let schema = batch.schema_ref();
let batches = vec![batch.clone()];
let flight_data = batches_to_flight_data(schema, batches)
.unwrap()
.into_iter()
.map(Ok);
Expand Down
8 changes: 4 additions & 4 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1429,7 +1429,7 @@ mod tests {

fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch {
let mut buf = Vec::new();
let mut writer = crate::writer::FileWriter::try_new(&mut buf, &rb.schema()).unwrap();
let mut writer = crate::writer::FileWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
writer.write(rb).unwrap();
writer.finish().unwrap();
drop(writer);
Expand All @@ -1440,7 +1440,7 @@ mod tests {

fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
let mut buf = Vec::new();
let mut writer = crate::writer::StreamWriter::try_new(&mut buf, &rb.schema()).unwrap();
let mut writer = crate::writer::StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
writer.write(rb).unwrap();
writer.finish().unwrap();
drop(writer);
Expand Down Expand Up @@ -1815,7 +1815,7 @@ mod tests {
let batch = RecordBatch::new_empty(schema);

let mut buf = Vec::new();
let mut writer = crate::writer::FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
writer.write(&batch).unwrap();
writer.finish().unwrap();
drop(writer);
Expand All @@ -1842,7 +1842,7 @@ mod tests {
let batch = RecordBatch::new_empty(schema);

let mut buf = Vec::new();
let mut writer = crate::writer::FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
writer.write(&batch).unwrap();
writer.finish().unwrap();
drop(writer);
Expand Down
6 changes: 3 additions & 3 deletions arrow-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1436,7 +1436,7 @@ mod tests {
use super::*;

fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
let mut writer = FileWriter::try_new(vec![], &rb.schema()).unwrap();
let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
writer.write(rb).unwrap();
writer.finish().unwrap();
writer.into_inner().unwrap()
Expand All @@ -1448,7 +1448,7 @@ mod tests {
}

fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
let mut stream_writer = StreamWriter::try_new(vec![], &record.schema()).unwrap();
let mut stream_writer = StreamWriter::try_new(vec![], record.schema_ref()).unwrap();
stream_writer.write(record).unwrap();
stream_writer.finish().unwrap();
stream_writer.into_inner().unwrap()
Expand Down Expand Up @@ -1982,7 +1982,7 @@ mod tests {
)
.expect("new batch");

let mut writer = StreamWriter::try_new(vec![], &batch.schema()).expect("new writer");
let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
writer.write(&batch).expect("write");
let outbuf = writer.into_inner().expect("inner");

Expand Down
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3082,7 +3082,7 @@ mod tests {
.unwrap();

let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
let actual = concat_batches(&batch.schema(), &batches).unwrap();
let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
assert_eq!(actual.num_rows(), selection.row_count());

let mut batch_offset = 0;
Expand Down
Loading