diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index 1f83200d65f8..237e5b304672 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -1010,8 +1010,8 @@ impl FileReaderBuilder { } /// Arrow File reader -pub struct FileReader { - /// Buffered file reader that supports reading and seeking +pub struct FileReader { + /// File reader that supports reading and seeking reader: R, /// The decoder @@ -1032,7 +1032,7 @@ pub struct FileReader { custom_metadata: HashMap, } -impl fmt::Debug for FileReader { +impl fmt::Debug for FileReader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { f.debug_struct("FileReader") .field("decoder", &self.decoder) @@ -1043,6 +1043,15 @@ impl fmt::Debug for FileReader { } } +impl FileReader> { + /// Try to create a new file reader with the reader wrapped in a BufReader. + /// + /// See [`FileReader::try_new`] for an unbuffered version. + pub fn try_new_buffered(reader: R, projection: Option>) -> Result { + Self::try_new(BufReader::new(reader), projection) + } +} + impl FileReader { /// Try to create a new file reader /// @@ -1129,7 +1138,7 @@ impl RecordBatchReader for FileReader { } /// Arrow Stream reader -pub struct StreamReader { +pub struct StreamReader { /// Stream reader reader: R, @@ -1150,10 +1159,10 @@ pub struct StreamReader { projection: Option<(Vec, Schema)>, } -impl fmt::Debug for StreamReader { +impl fmt::Debug for StreamReader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> { f.debug_struct("StreamReader") - .field("reader", &"BufReader<..>") + .field("reader", &"R") .field("schema", &self.schema) .field("dictionaries_by_id", &self.dictionaries_by_id) .field("finished", &self.finished) @@ -1163,21 +1172,24 @@ impl fmt::Debug for StreamReader { } impl StreamReader> { - /// Try to create a new stream reader with the reader wrapped in a BufReader + /// Try to create a new stream reader with the reader wrapped in a BufReader. /// - /// The first message in the stream is the schema, the reader will fail if it does not - /// encounter a schema. - /// To check if the reader is done, use `is_finished(self)` - pub fn try_new(reader: R, projection: Option>) -> Result { - Self::try_new_unbuffered(BufReader::new(reader), projection) + /// See [`StreamReader::try_new`] for an unbuffered version. + pub fn try_new_buffered(reader: R, projection: Option>) -> Result { + Self::try_new(BufReader::new(reader), projection) } } impl StreamReader { - /// Try to create a new stream reader but do not wrap the reader in a BufReader. + /// Try to create a new stream reader. /// - /// Unless you need the StreamReader to be unbuffered you likely want to use `StreamReader::try_new` instead. - pub fn try_new_unbuffered( + /// The first message in the stream is the schema, the reader will fail if it does not + /// encounter a schema. + /// To check if the reader is done, use [`is_finished(self)`](StreamReader::is_finished). + /// + /// There is no internal buffering. If buffered reads are needed you likely want to use + /// [`StreamReader::try_new_buffered`] instead. + pub fn try_new( mut reader: R, projection: Option>, ) -> Result, ArrowError> { @@ -1224,6 +1236,15 @@ impl StreamReader { }) } + /// Deprecated, use [`StreamReader::try_new`] instead. + #[deprecated(since = "53.0.0", note = "use `try_new` instead")] + pub fn try_new_unbuffered( + reader: R, + projection: Option>, + ) -> Result { + Self::try_new(reader, projection) + } + /// Return the schema of the stream pub fn schema(&self) -> SchemaRef { self.schema.clone() diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 5a8adb31b038..de40008a41f4 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -821,9 +821,9 @@ impl DictionaryTracker { } /// Writer for an IPC file -pub struct FileWriter { +pub struct FileWriter { /// The object to write to - writer: BufWriter, + writer: W, /// IPC write options write_options: IpcWriteOptions, /// A reference to the schema, used in validating record batches @@ -844,6 +844,15 @@ pub struct FileWriter { data_gen: IpcDataGenerator, } +impl FileWriter> { + /// Try to create a new file writer with the writer wrapped in a BufWriter. + /// + /// See [`FileWriter::try_new`] for an unbuffered version. + pub fn try_new_buffered(writer: W, schema: &Schema) -> Result { + Self::try_new(BufWriter::new(writer), schema) + } +} + impl FileWriter { /// Try to create a new writer, with the schema written as part of the header pub fn try_new(writer: W, schema: &Schema) -> Result { @@ -853,12 +862,11 @@ impl FileWriter { /// Try to create a new writer with IpcWriteOptions pub fn try_new_with_options( - writer: W, + mut writer: W, schema: &Schema, write_options: IpcWriteOptions, ) -> Result { let data_gen = IpcDataGenerator::default(); - let mut writer = BufWriter::new(writer); // write magic to header aligned on alignment boundary let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len()); let header_size = super::ARROW_MAGIC.len() + pad_len; @@ -972,14 +980,14 @@ impl FileWriter { /// Gets a reference to the underlying writer. pub fn get_ref(&self) -> &W { - self.writer.get_ref() + &self.writer } /// Gets a mutable reference to the underlying writer. /// /// It is inadvisable to directly write to the underlying writer. pub fn get_mut(&mut self) -> &mut W { - self.writer.get_mut() + &mut self.writer } /// Flush the underlying writer. @@ -990,16 +998,20 @@ impl FileWriter { Ok(()) } - /// Unwraps the BufWriter housed in FileWriter.writer, returning the underlying - /// writer + /// Unwraps the the underlying writer. + /// + /// The writer is flushed and the FileWriter is finished before returning. /// - /// The buffer is flushed and the FileWriter is finished before returning the - /// writer. + /// # Errors + /// + /// An ['Err'](Result::Err) may be returned if an error occurs while finishing the StreamWriter + /// or while flushing the writer. pub fn into_inner(mut self) -> Result { if !self.finished { + // `finish` flushes the writer. self.finish()?; } - self.writer.into_inner().map_err(ArrowError::from) + Ok(self.writer) } } @@ -1014,9 +1026,9 @@ impl RecordBatchWriter for FileWriter { } /// Writer for an IPC stream -pub struct StreamWriter { +pub struct StreamWriter { /// The object to write to - writer: BufWriter, + writer: W, /// IPC write options write_options: IpcWriteOptions, /// Whether the writer footer has been written, and the writer is finished @@ -1027,6 +1039,15 @@ pub struct StreamWriter { data_gen: IpcDataGenerator, } +impl StreamWriter> { + /// Try to create a new stream writer with the writer wrapped in a BufWriter. + /// + /// See [`StreamWriter::try_new`] for an unbuffered version. + pub fn try_new_buffered(writer: W, schema: &Schema) -> Result { + Self::try_new(BufWriter::new(writer), schema) + } +} + impl StreamWriter { /// Try to create a new writer, with the schema written as part of the header pub fn try_new(writer: W, schema: &Schema) -> Result { @@ -1035,12 +1056,11 @@ impl StreamWriter { } pub fn try_new_with_options( - writer: W, + mut writer: W, schema: &Schema, write_options: IpcWriteOptions, ) -> Result { let data_gen = IpcDataGenerator::default(); - let mut writer = BufWriter::new(writer); // write the schema, set the written bytes to the schema let encoded_message = data_gen.schema_to_bytes(schema, &write_options); write_message(&mut writer, encoded_message, &write_options)?; @@ -1095,14 +1115,14 @@ impl StreamWriter { /// Gets a reference to the underlying writer. pub fn get_ref(&self) -> &W { - self.writer.get_ref() + &self.writer } /// Gets a mutable reference to the underlying writer. /// /// It is inadvisable to directly write to the underlying writer. pub fn get_mut(&mut self) -> &mut W { - self.writer.get_mut() + &mut self.writer } /// Flush the underlying writer. @@ -1113,16 +1133,14 @@ impl StreamWriter { Ok(()) } - /// Unwraps the BufWriter housed in StreamWriter.writer, returning the underlying - /// writer + /// Unwraps the the underlying writer. /// - /// The buffer is flushed and the StreamWriter is finished before returning the - /// writer. + /// The writer is flushed and the StreamWriter is finished before returning. /// /// # Errors /// - /// An ['Err'] may be returned if an error occurs while finishing the StreamWriter - /// or while flushing the buffer. + /// An ['Err'](Result::Err) may be returned if an error occurs while finishing the StreamWriter + /// or while flushing the writer. /// /// # Example /// @@ -1154,9 +1172,10 @@ impl StreamWriter { /// ``` pub fn into_inner(mut self) -> Result { if !self.finished { + // `finish` flushes. self.finish()?; } - self.writer.into_inner().map_err(ArrowError::from) + Ok(self.writer) } }