Skip to content

Commit

Permalink
change ipc::reader and writer APIs for consistent buffering
Browse files Browse the repository at this point in the history
Current writer API automatically wraps the supplied std::io::Writer
impl into a BufWriter.
It is cleaner and more idiomatic to have the default be using the
supplied impl directly, as the user might already have a BufWriter
or an impl that doesn't actually benefit from buffering at all.

StreamReader does a similar thing, but it also exposes a `try_new_unbuffered`
that bypasses the internal wrap.

Here we propose a consistent and non-buffered by default API:
- `try_new` does not wrap the passed reader/writer,
- `try_new_buffered` is a convenience function that does wrap
  the reader/writer into a BufReader/BufWriter,
- all four publicly exposed IPC reader/writers follow the above consistently,
  i.e. `StreamReader`, `FileReader`, `StreamWriter`, `FileWriter`.

Those are breaking changes.

An additional tweak: removed the generic type bounds from struct definitions
on the four types, as that is the idiomatic Rust approach (see e.g. stdlib's
HashMap that has no bounds on the struct definition, only the impl requires
Hash + Eq).

See #6099 for the discussion.
  • Loading branch information
V0ldek committed Jul 27, 2024
1 parent 613e93e commit 646d193
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 39 deletions.
51 changes: 36 additions & 15 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1010,8 +1010,8 @@ impl FileReaderBuilder {
}

/// Arrow File reader
pub struct FileReader<R: Read + Seek> {
/// Buffered file reader that supports reading and seeking
pub struct FileReader<R> {
/// File reader that supports reading and seeking
reader: R,

/// The decoder
Expand All @@ -1032,7 +1032,7 @@ pub struct FileReader<R: Read + Seek> {
custom_metadata: HashMap<String, String>,
}

impl<R: Read + Seek> fmt::Debug for FileReader<R> {
impl<R> fmt::Debug for FileReader<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("FileReader<R>")
.field("decoder", &self.decoder)
Expand All @@ -1043,6 +1043,15 @@ impl<R: Read + Seek> fmt::Debug for FileReader<R> {
}
}

impl<R: Read + Seek> FileReader<BufReader<R>> {
/// Try to create a new file reader with the reader wrapped in a BufReader.
///
/// See [`FileReader::try_new`] for an unbuffered version.
pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
Self::try_new(BufReader::new(reader), projection)
}
}

impl<R: Read + Seek> FileReader<R> {
/// Try to create a new file reader
///
Expand Down Expand Up @@ -1129,7 +1138,7 @@ impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
}

/// Arrow Stream reader
pub struct StreamReader<R: Read> {
pub struct StreamReader<R> {
/// Stream reader
reader: R,

Expand All @@ -1150,10 +1159,10 @@ pub struct StreamReader<R: Read> {
projection: Option<(Vec<usize>, Schema)>,
}

impl<R: Read> fmt::Debug for StreamReader<R> {
impl<R> fmt::Debug for StreamReader<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
f.debug_struct("StreamReader<R>")
.field("reader", &"BufReader<..>")
.field("reader", &"R")
.field("schema", &self.schema)
.field("dictionaries_by_id", &self.dictionaries_by_id)
.field("finished", &self.finished)
Expand All @@ -1163,21 +1172,24 @@ impl<R: Read> fmt::Debug for StreamReader<R> {
}

impl<R: Read> StreamReader<BufReader<R>> {
/// Try to create a new stream reader with the reader wrapped in a BufReader
/// Try to create a new stream reader with the reader wrapped in a BufReader.
///
/// The first message in the stream is the schema, the reader will fail if it does not
/// encounter a schema.
/// To check if the reader is done, use `is_finished(self)`
pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
Self::try_new_unbuffered(BufReader::new(reader), projection)
/// See [`StreamReader::try_new`] for an unbuffered version.
pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
Self::try_new(BufReader::new(reader), projection)
}
}

impl<R: Read> StreamReader<R> {
/// Try to create a new stream reader but do not wrap the reader in a BufReader.
/// Try to create a new stream reader.
///
/// Unless you need the StreamReader to be unbuffered you likely want to use `StreamReader::try_new` instead.
pub fn try_new_unbuffered(
/// The first message in the stream is the schema, the reader will fail if it does not
/// encounter a schema.
/// To check if the reader is done, use [`is_finished(self)`](StreamReader::is_finished).
///
/// There is no internal buffering. If buffered reads are needed you likely want to use
/// [`StreamReader::try_new_buffered`] instead.
pub fn try_new(
mut reader: R,
projection: Option<Vec<usize>>,
) -> Result<StreamReader<R>, ArrowError> {
Expand Down Expand Up @@ -1224,6 +1236,15 @@ impl<R: Read> StreamReader<R> {
})
}

/// Deprecated, use [`StreamReader::try_new`] instead.
#[deprecated(since = "53.0.0", note = "use `try_new` instead")]
pub fn try_new_unbuffered(
reader: R,
projection: Option<Vec<usize>>,
) -> Result<Self, ArrowError> {
Self::try_new(reader, projection)
}

/// Return the schema of the stream
pub fn schema(&self) -> SchemaRef {
self.schema.clone()
Expand Down
67 changes: 43 additions & 24 deletions arrow-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -821,9 +821,9 @@ impl DictionaryTracker {
}

/// Writer for an IPC file
pub struct FileWriter<W: Write> {
pub struct FileWriter<W> {
/// The object to write to
writer: BufWriter<W>,
writer: W,
/// IPC write options
write_options: IpcWriteOptions,
/// A reference to the schema, used in validating record batches
Expand All @@ -844,6 +844,15 @@ pub struct FileWriter<W: Write> {
data_gen: IpcDataGenerator,
}

impl<W: Write> FileWriter<BufWriter<W>> {
/// Try to create a new file writer with the writer wrapped in a BufWriter.
///
/// See [`FileWriter::try_new`] for an unbuffered version.
pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
Self::try_new(BufWriter::new(writer), schema)
}
}

impl<W: Write> FileWriter<W> {
/// Try to create a new writer, with the schema written as part of the header
pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
Expand All @@ -853,12 +862,11 @@ impl<W: Write> FileWriter<W> {

/// Try to create a new writer with IpcWriteOptions
pub fn try_new_with_options(
writer: W,
mut writer: W,
schema: &Schema,
write_options: IpcWriteOptions,
) -> Result<Self, ArrowError> {
let data_gen = IpcDataGenerator::default();
let mut writer = BufWriter::new(writer);
// write magic to header aligned on alignment boundary
let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
let header_size = super::ARROW_MAGIC.len() + pad_len;
Expand Down Expand Up @@ -972,14 +980,14 @@ impl<W: Write> FileWriter<W> {

/// Gets a reference to the underlying writer.
pub fn get_ref(&self) -> &W {
self.writer.get_ref()
&self.writer
}

/// Gets a mutable reference to the underlying writer.
///
/// It is inadvisable to directly write to the underlying writer.
pub fn get_mut(&mut self) -> &mut W {
self.writer.get_mut()
&mut self.writer
}

/// Flush the underlying writer.
Expand All @@ -990,16 +998,20 @@ impl<W: Write> FileWriter<W> {
Ok(())
}

/// Unwraps the BufWriter housed in FileWriter.writer, returning the underlying
/// writer
/// Unwraps the the underlying writer.
///
/// The writer is flushed and the FileWriter is finished before returning.
///
/// The buffer is flushed and the FileWriter is finished before returning the
/// writer.
/// # Errors
///
/// An ['Err'](Result::Err) may be returned if an error occurs while finishing the StreamWriter
/// or while flushing the writer.
pub fn into_inner(mut self) -> Result<W, ArrowError> {
if !self.finished {
// `finish` flushes the writer.
self.finish()?;
}
self.writer.into_inner().map_err(ArrowError::from)
Ok(self.writer)
}
}

Expand All @@ -1014,9 +1026,9 @@ impl<W: Write> RecordBatchWriter for FileWriter<W> {
}

/// Writer for an IPC stream
pub struct StreamWriter<W: Write> {
pub struct StreamWriter<W> {
/// The object to write to
writer: BufWriter<W>,
writer: W,
/// IPC write options
write_options: IpcWriteOptions,
/// Whether the writer footer has been written, and the writer is finished
Expand All @@ -1027,6 +1039,15 @@ pub struct StreamWriter<W: Write> {
data_gen: IpcDataGenerator,
}

impl<W: Write> StreamWriter<BufWriter<W>> {
/// Try to create a new stream writer with the writer wrapped in a BufWriter.
///
/// See [`StreamWriter::try_new`] for an unbuffered version.
pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
Self::try_new(BufWriter::new(writer), schema)
}
}

impl<W: Write> StreamWriter<W> {
/// Try to create a new writer, with the schema written as part of the header
pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
Expand All @@ -1035,12 +1056,11 @@ impl<W: Write> StreamWriter<W> {
}

pub fn try_new_with_options(
writer: W,
mut writer: W,
schema: &Schema,
write_options: IpcWriteOptions,
) -> Result<Self, ArrowError> {
let data_gen = IpcDataGenerator::default();
let mut writer = BufWriter::new(writer);
// write the schema, set the written bytes to the schema
let encoded_message = data_gen.schema_to_bytes(schema, &write_options);
write_message(&mut writer, encoded_message, &write_options)?;
Expand Down Expand Up @@ -1095,14 +1115,14 @@ impl<W: Write> StreamWriter<W> {

/// Gets a reference to the underlying writer.
pub fn get_ref(&self) -> &W {
self.writer.get_ref()
&self.writer
}

/// Gets a mutable reference to the underlying writer.
///
/// It is inadvisable to directly write to the underlying writer.
pub fn get_mut(&mut self) -> &mut W {
self.writer.get_mut()
&mut self.writer
}

/// Flush the underlying writer.
Expand All @@ -1113,16 +1133,14 @@ impl<W: Write> StreamWriter<W> {
Ok(())
}

/// Unwraps the BufWriter housed in StreamWriter.writer, returning the underlying
/// writer
/// Unwraps the the underlying writer.
///
/// The buffer is flushed and the StreamWriter is finished before returning the
/// writer.
/// The writer is flushed and the StreamWriter is finished before returning.
///
/// # Errors
///
/// An ['Err'] may be returned if an error occurs while finishing the StreamWriter
/// or while flushing the buffer.
/// An ['Err'](Result::Err) may be returned if an error occurs while finishing the StreamWriter
/// or while flushing the writer.
///
/// # Example
///
Expand Down Expand Up @@ -1154,9 +1172,10 @@ impl<W: Write> StreamWriter<W> {
/// ```
pub fn into_inner(mut self) -> Result<W, ArrowError> {
if !self.finished {
// `finish` flushes.
self.finish()?;
}
self.writer.into_inner().map_err(ArrowError::from)
Ok(self.writer)
}
}

Expand Down

0 comments on commit 646d193

Please sign in to comment.