From 2b3684b39621c188b59cfbb801ce885ce38a53b8 Mon Sep 17 00:00:00 2001 From: Clide Stefani <109172241+Monkwire3@users.noreply.github.com> Date: Tue, 5 Mar 2024 13:51:37 -0500 Subject: [PATCH 1/2] Add RecordBatch::schema_ref --- arrow-array/src/record_batch.rs | 5 +++++ arrow-flight/examples/flight_sql_server.rs | 12 ++++++------ arrow-flight/src/encode.rs | 8 ++++---- arrow-flight/tests/encode_decode.rs | 2 +- arrow-flight/tests/flight_sql_client_cli.rs | 8 ++++---- arrow-ipc/src/reader.rs | 8 ++++---- arrow-ipc/src/writer.rs | 6 +++--- parquet/src/arrow/arrow_reader/mod.rs | 2 +- 8 files changed, 28 insertions(+), 23 deletions(-) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index d89020a65681..314445bba617 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -236,6 +236,11 @@ impl RecordBatch { self.schema.clone() } + /// Returns a reference to the [`Schema`] of the record batch. + pub fn schema_ref(&self) -> &SchemaRef { + &self.schema + } + /// Projects the schema onto the specified columns pub fn project(&self, indices: &[usize]) -> Result { let projected_schema = self.schema.project(indices)?; diff --git a/arrow-flight/examples/flight_sql_server.rs b/arrow-flight/examples/flight_sql_server.rs index 85f5c7499346..c0ceb7743e11 100644 --- a/arrow-flight/examples/flight_sql_server.rs +++ b/arrow-flight/examples/flight_sql_server.rs @@ -193,9 +193,9 @@ impl FlightSqlService for FlightSqlServiceImpl { ) -> Result::DoGetStream>, Status> { self.check_token(&request)?; let batch = Self::fake_result().map_err(|e| status!("Could not fake a result", e))?; - let schema = batch.schema(); - let batches = vec![batch]; - let flight_data = batches_to_flight_data(schema.as_ref(), batches) + let schema = batch.schema_ref(); + let batches = vec![batch.clone()]; + let flight_data = batches_to_flight_data(schema, batches) .map_err(|e| status!("Could not convert batches", e))? .into_iter() .map(Ok); @@ -641,9 +641,9 @@ impl FlightSqlService for FlightSqlServiceImpl { request: Request, ) -> Result { self.check_token(&request)?; - let schema = Self::fake_result() - .map_err(|e| status!("Error getting result schema", e))? - .schema(); + let record_batch = + Self::fake_result().map_err(|e| status!("Error getting result schema", e))?; + let schema = record_batch.schema_ref(); let message = SchemaAsIpc::new(&schema, &IpcWriteOptions::default()) .try_into() .map_err(|e| status!("Unable to serialize schema", e))?; diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs index e6ef9994d487..bb0436816209 100644 --- a/arrow-flight/src/encode.rs +++ b/arrow-flight/src/encode.rs @@ -320,7 +320,7 @@ impl FlightDataEncoder { let schema = match &self.schema { Some(schema) => schema.clone(), // encode the schema if this is the first time we have seen it - None => self.encode_schema(&batch.schema()), + None => self.encode_schema(batch.schema_ref()), }; // encode the batch @@ -565,12 +565,12 @@ mod tests { let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c1) as ArrayRef)]) .expect("cannot create record batch"); - let schema = batch.schema(); + let schema = batch.schema_ref(); let (_, baseline_flight_batch) = make_flight_data(&batch, &options); let big_batch = batch.slice(0, batch.num_rows() - 1); - let optimized_big_batch = prepare_batch_for_flight(&big_batch, Arc::clone(&schema), false) + let optimized_big_batch = prepare_batch_for_flight(&big_batch, Arc::clone(schema), false) .expect("failed to optimize"); let (_, optimized_big_flight_batch) = make_flight_data(&optimized_big_batch, &options); @@ -581,7 +581,7 @@ mod tests { let small_batch = batch.slice(0, 1); let optimized_small_batch = - prepare_batch_for_flight(&small_batch, Arc::clone(&schema), false) + prepare_batch_for_flight(&small_batch, Arc::clone(schema), false) .expect("failed to optimize"); let (_, optimized_small_flight_batch) = make_flight_data(&optimized_small_batch, &options); diff --git a/arrow-flight/tests/encode_decode.rs b/arrow-flight/tests/encode_decode.rs index 789233b918d0..224b12500a08 100644 --- a/arrow-flight/tests/encode_decode.rs +++ b/arrow-flight/tests/encode_decode.rs @@ -465,7 +465,7 @@ async fn roundtrip(input: Vec) { /// When is resolved, /// it should be possible to use `roundtrip` async fn roundtrip_dictionary(input: Vec) { - let schema = Arc::new(prepare_schema_for_flight(&input[0].schema())); + let schema = Arc::new(prepare_schema_for_flight(input[0].schema_ref())); let expected_output: Vec<_> = input .iter() .map(|batch| prepare_batch_for_flight(batch, schema.clone()).unwrap()) diff --git a/arrow-flight/tests/flight_sql_client_cli.rs b/arrow-flight/tests/flight_sql_client_cli.rs index a28080450bc2..cc270eeb6186 100644 --- a/arrow-flight/tests/flight_sql_client_cli.rs +++ b/arrow-flight/tests/flight_sql_client_cli.rs @@ -189,7 +189,7 @@ impl FlightSqlServiceImpl { let batch = Self::fake_result()?; Ok(FlightInfo::new() - .try_with_schema(&batch.schema()) + .try_with_schema(batch.schema_ref()) .expect("encoding schema") .with_endpoint( FlightEndpoint::new().with_ticket(Ticket::new( @@ -245,9 +245,9 @@ impl FlightSqlService for FlightSqlServiceImpl { "part_2" => batch.slice(2, 1), ticket => panic!("Invalid ticket: {ticket:?}"), }; - let schema = batch.schema(); - let batches = vec![batch]; - let flight_data = batches_to_flight_data(schema.as_ref(), batches) + let schema = batch.schema_ref(); + let batches = vec![batch.clone()]; + let flight_data = batches_to_flight_data(schema, batches) .unwrap() .into_iter() .map(Ok); diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index 81b8b530734a..361c4f7f67dd 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -1429,7 +1429,7 @@ mod tests { fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch { let mut buf = Vec::new(); - let mut writer = crate::writer::FileWriter::try_new(&mut buf, &rb.schema()).unwrap(); + let mut writer = crate::writer::FileWriter::try_new(&mut buf, rb.schema_ref()).unwrap(); writer.write(rb).unwrap(); writer.finish().unwrap(); drop(writer); @@ -1440,7 +1440,7 @@ mod tests { fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch { let mut buf = Vec::new(); - let mut writer = crate::writer::StreamWriter::try_new(&mut buf, &rb.schema()).unwrap(); + let mut writer = crate::writer::StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap(); writer.write(rb).unwrap(); writer.finish().unwrap(); drop(writer); @@ -1815,7 +1815,7 @@ mod tests { let batch = RecordBatch::new_empty(schema); let mut buf = Vec::new(); - let mut writer = crate::writer::FileWriter::try_new(&mut buf, &batch.schema()).unwrap(); + let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap(); writer.write(&batch).unwrap(); writer.finish().unwrap(); drop(writer); @@ -1842,7 +1842,7 @@ mod tests { let batch = RecordBatch::new_empty(schema); let mut buf = Vec::new(); - let mut writer = crate::writer::FileWriter::try_new(&mut buf, &batch.schema()).unwrap(); + let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap(); writer.write(&batch).unwrap(); writer.finish().unwrap(); drop(writer); diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 1f6bf5f6fa85..99e52e2a7076 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -1436,7 +1436,7 @@ mod tests { use super::*; fn serialize_file(rb: &RecordBatch) -> Vec { - let mut writer = FileWriter::try_new(vec![], &rb.schema()).unwrap(); + let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap(); writer.write(rb).unwrap(); writer.finish().unwrap(); writer.into_inner().unwrap() @@ -1448,7 +1448,7 @@ mod tests { } fn serialize_stream(record: &RecordBatch) -> Vec { - let mut stream_writer = StreamWriter::try_new(vec![], &record.schema()).unwrap(); + let mut stream_writer = StreamWriter::try_new(vec![], record.schema_ref()).unwrap(); stream_writer.write(record).unwrap(); stream_writer.finish().unwrap(); stream_writer.into_inner().unwrap() @@ -1982,7 +1982,7 @@ mod tests { ) .expect("new batch"); - let mut writer = StreamWriter::try_new(vec![], &batch.schema()).expect("new writer"); + let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer"); writer.write(&batch).expect("write"); let outbuf = writer.into_inner().expect("inner"); diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 6b6146042051..7aeb3d127ac3 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -3082,7 +3082,7 @@ mod tests { .unwrap(); let batches = reader.collect::, _>>().unwrap(); - let actual = concat_batches(&batch.schema(), &batches).unwrap(); + let actual = concat_batches(batch.schema_ref(), &batches).unwrap(); assert_eq!(actual.num_rows(), selection.row_count()); let mut batch_offset = 0; From 3d8acaa60e4f20b29832e746ed948bad5b2eeace Mon Sep 17 00:00:00 2001 From: Clide Stefani <109172241+Monkwire3@users.noreply.github.com> Date: Tue, 5 Mar 2024 19:29:34 -0500 Subject: [PATCH 2/2] Fix Clippy errors --- arrow-flight/examples/flight_sql_server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-flight/examples/flight_sql_server.rs b/arrow-flight/examples/flight_sql_server.rs index c0ceb7743e11..efd8b6dec90f 100644 --- a/arrow-flight/examples/flight_sql_server.rs +++ b/arrow-flight/examples/flight_sql_server.rs @@ -644,7 +644,7 @@ impl FlightSqlService for FlightSqlServiceImpl { let record_batch = Self::fake_result().map_err(|e| status!("Error getting result schema", e))?; let schema = record_batch.schema_ref(); - let message = SchemaAsIpc::new(&schema, &IpcWriteOptions::default()) + let message = SchemaAsIpc::new(schema, &IpcWriteOptions::default()) .try_into() .map_err(|e| status!("Unable to serialize schema", e))?; let IpcMessage(schema_bytes) = message;