diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index 1f83200d65f8..2b1d09dc9588 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -1010,8 +1010,8 @@ impl FileReaderBuilder { } /// Arrow File reader -pub struct FileReader { - /// Buffered file reader that supports reading and seeking +pub struct FileReader { + /// File reader that supports reading and seeking reader: R, /// The decoder @@ -1032,7 +1032,7 @@ pub struct FileReader { custom_metadata: HashMap, } -impl fmt::Debug for FileReader { +impl fmt::Debug for FileReader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { f.debug_struct("FileReader") .field("decoder", &self.decoder) @@ -1043,10 +1043,26 @@ impl fmt::Debug for FileReader { } } +impl FileReader> { + /// Try to create a new file reader with the reader wrapped in a BufReader. + /// + /// See [`FileReader::try_new`] for an unbuffered version. + pub fn try_new_buffered(reader: R, projection: Option>) -> Result { + Self::try_new(BufReader::new(reader), projection) + } +} + impl FileReader { - /// Try to create a new file reader + /// Try to create a new file reader. /// - /// Returns errors if the file does not meet the Arrow Format footer requirements + /// There is no internal buffering. If buffered reads are needed you likely want to use + /// [`FileReader::try_new_buffered`] instead. + /// + /// # Errors + /// + /// An ['Err'](Result::Err) may be returned if: + /// - the file does not meet the Arrow Format footer requirements, or + /// - file endianness does not match the target endianness. pub fn try_new(reader: R, projection: Option>) -> Result { let builder = FileReaderBuilder { projection, @@ -1129,7 +1145,7 @@ impl RecordBatchReader for FileReader { } /// Arrow Stream reader -pub struct StreamReader { +pub struct StreamReader { /// Stream reader reader: R, @@ -1150,10 +1166,10 @@ pub struct StreamReader { projection: Option<(Vec, Schema)>, } -impl fmt::Debug for StreamReader { +impl fmt::Debug for StreamReader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> { f.debug_struct("StreamReader") - .field("reader", &"BufReader<..>") + .field("reader", &"R") .field("schema", &self.schema) .field("dictionaries_by_id", &self.dictionaries_by_id) .field("finished", &self.finished) @@ -1163,21 +1179,27 @@ impl fmt::Debug for StreamReader { } impl StreamReader> { - /// Try to create a new stream reader with the reader wrapped in a BufReader + /// Try to create a new stream reader with the reader wrapped in a BufReader. /// - /// The first message in the stream is the schema, the reader will fail if it does not - /// encounter a schema. - /// To check if the reader is done, use `is_finished(self)` - pub fn try_new(reader: R, projection: Option>) -> Result { - Self::try_new_unbuffered(BufReader::new(reader), projection) + /// See [`StreamReader::try_new`] for an unbuffered version. + pub fn try_new_buffered(reader: R, projection: Option>) -> Result { + Self::try_new(BufReader::new(reader), projection) } } impl StreamReader { - /// Try to create a new stream reader but do not wrap the reader in a BufReader. + /// Try to create a new stream reader. /// - /// Unless you need the StreamReader to be unbuffered you likely want to use `StreamReader::try_new` instead. - pub fn try_new_unbuffered( + /// To check if the reader is done, use [`is_finished(self)`](StreamReader::is_finished). + /// + /// There is no internal buffering. If buffered reads are needed you likely want to use + /// [`StreamReader::try_new_buffered`] instead. + /// + /// # Errors + /// + /// An ['Err'](Result::Err) may be returned if the reader does not encounter a schema + /// as the first message in the stream. + pub fn try_new( mut reader: R, projection: Option>, ) -> Result, ArrowError> { @@ -1224,6 +1246,15 @@ impl StreamReader { }) } + /// Deprecated, use [`StreamReader::try_new`] instead. + #[deprecated(since = "53.0.0", note = "use `try_new` instead")] + pub fn try_new_unbuffered( + reader: R, + projection: Option>, + ) -> Result { + Self::try_new(reader, projection) + } + /// Return the schema of the stream pub fn schema(&self) -> SchemaRef { self.schema.clone() diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 5a8adb31b038..ade902f7cafd 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -821,9 +821,9 @@ impl DictionaryTracker { } /// Writer for an IPC file -pub struct FileWriter { +pub struct FileWriter { /// The object to write to - writer: BufWriter, + writer: W, /// IPC write options write_options: IpcWriteOptions, /// A reference to the schema, used in validating record batches @@ -844,21 +844,41 @@ pub struct FileWriter { data_gen: IpcDataGenerator, } +impl FileWriter> { + /// Try to create a new file writer with the writer wrapped in a BufWriter. + /// + /// See [`FileWriter::try_new`] for an unbuffered version. + pub fn try_new_buffered(writer: W, schema: &Schema) -> Result { + Self::try_new(BufWriter::new(writer), schema) + } +} + impl FileWriter { /// Try to create a new writer, with the schema written as part of the header + /// + /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details. + /// + /// # Errors + /// + /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails. pub fn try_new(writer: W, schema: &Schema) -> Result { let write_options = IpcWriteOptions::default(); Self::try_new_with_options(writer, schema, write_options) } /// Try to create a new writer with IpcWriteOptions + /// + /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details. + /// + /// # Errors + /// + /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails. pub fn try_new_with_options( - writer: W, + mut writer: W, schema: &Schema, write_options: IpcWriteOptions, ) -> Result { let data_gen = IpcDataGenerator::default(); - let mut writer = BufWriter::new(writer); // write magic to header aligned on alignment boundary let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len()); let header_size = super::ARROW_MAGIC.len() + pad_len; @@ -972,14 +992,14 @@ impl FileWriter { /// Gets a reference to the underlying writer. pub fn get_ref(&self) -> &W { - self.writer.get_ref() + &self.writer } /// Gets a mutable reference to the underlying writer. /// /// It is inadvisable to directly write to the underlying writer. pub fn get_mut(&mut self) -> &mut W { - self.writer.get_mut() + &mut self.writer } /// Flush the underlying writer. @@ -990,16 +1010,20 @@ impl FileWriter { Ok(()) } - /// Unwraps the BufWriter housed in FileWriter.writer, returning the underlying - /// writer + /// Unwraps the the underlying writer. + /// + /// The writer is flushed and the FileWriter is finished before returning. /// - /// The buffer is flushed and the FileWriter is finished before returning the - /// writer. + /// # Errors + /// + /// An ['Err'](Result::Err) may be returned if an error occurs while finishing the StreamWriter + /// or while flushing the writer. pub fn into_inner(mut self) -> Result { if !self.finished { + // `finish` flushes the writer. self.finish()?; } - self.writer.into_inner().map_err(ArrowError::from) + Ok(self.writer) } } @@ -1014,9 +1038,9 @@ impl RecordBatchWriter for FileWriter { } /// Writer for an IPC stream -pub struct StreamWriter { +pub struct StreamWriter { /// The object to write to - writer: BufWriter, + writer: W, /// IPC write options write_options: IpcWriteOptions, /// Whether the writer footer has been written, and the writer is finished @@ -1027,20 +1051,39 @@ pub struct StreamWriter { data_gen: IpcDataGenerator, } +impl StreamWriter> { + /// Try to create a new stream writer with the writer wrapped in a BufWriter. + /// + /// See [`StreamWriter::try_new`] for an unbuffered version. + pub fn try_new_buffered(writer: W, schema: &Schema) -> Result { + Self::try_new(BufWriter::new(writer), schema) + } +} + impl StreamWriter { - /// Try to create a new writer, with the schema written as part of the header + /// Try to create a new writer, with the schema written as part of the header. + /// + /// Note that there is no internal buffering. See also [`StreamWriter::try_new_buffered`]. + /// + /// # Errors + /// + /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails. pub fn try_new(writer: W, schema: &Schema) -> Result { let write_options = IpcWriteOptions::default(); Self::try_new_with_options(writer, schema, write_options) } + /// Try to create a new writer with [`IpcWriteOptions`]. + /// + /// # Errors + /// + /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails. pub fn try_new_with_options( - writer: W, + mut writer: W, schema: &Schema, write_options: IpcWriteOptions, ) -> Result { let data_gen = IpcDataGenerator::default(); - let mut writer = BufWriter::new(writer); // write the schema, set the written bytes to the schema let encoded_message = data_gen.schema_to_bytes(schema, &write_options); write_message(&mut writer, encoded_message, &write_options)?; @@ -1095,14 +1138,14 @@ impl StreamWriter { /// Gets a reference to the underlying writer. pub fn get_ref(&self) -> &W { - self.writer.get_ref() + &self.writer } /// Gets a mutable reference to the underlying writer. /// /// It is inadvisable to directly write to the underlying writer. pub fn get_mut(&mut self) -> &mut W { - self.writer.get_mut() + &mut self.writer } /// Flush the underlying writer. @@ -1113,16 +1156,14 @@ impl StreamWriter { Ok(()) } - /// Unwraps the BufWriter housed in StreamWriter.writer, returning the underlying - /// writer + /// Unwraps the the underlying writer. /// - /// The buffer is flushed and the StreamWriter is finished before returning the - /// writer. + /// The writer is flushed and the StreamWriter is finished before returning. /// /// # Errors /// - /// An ['Err'] may be returned if an error occurs while finishing the StreamWriter - /// or while flushing the buffer. + /// An ['Err'](Result::Err) may be returned if an error occurs while finishing the StreamWriter + /// or while flushing the writer. /// /// # Example /// @@ -1154,9 +1195,10 @@ impl StreamWriter { /// ``` pub fn into_inner(mut self) -> Result { if !self.finished { + // `finish` flushes. self.finish()?; } - self.writer.into_inner().map_err(ArrowError::from) + Ok(self.writer) } }