Skip to content

Commit

Permalink
added a flush method to IPC writers
Browse files Browse the repository at this point in the history
While the writers expose `get_ref` and `get_mut` to access the underlying
`io::Write` writer, there is an internal layer of a `BufWriter` that is not accessible.
Because of that, there is no way to ensure that all messages written thus far to the
`StreamWriter` or `FileWriter` have actually been passed to the underlying writer.

Here we expose a `flush` method that flushes the internal buffer and the underlying writer.

See apache#6099 for the discussion.
  • Loading branch information
V0ldek committed Jul 24, 2024
1 parent af40ea3 commit 24a461c
Showing 1 changed file with 63 additions and 0 deletions.
63 changes: 63 additions & 0 deletions arrow-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,14 @@ impl<W: Write> FileWriter<W> {
self.writer.get_mut()
}

/// Flush the underlying writer.
///
/// Both the BufWriter and the underlying writer are flushed.
pub fn flush(&mut self) -> Result<(), ArrowError> {
self.writer.flush()?;
Ok(())
}

/// Unwraps the BufWriter housed in FileWriter.writer, returning the underlying
/// writer
///
Expand Down Expand Up @@ -1097,6 +1105,14 @@ impl<W: Write> StreamWriter<W> {
self.writer.get_mut()
}

/// Flush the underlying writer.
///
/// Both the BufWriter and the underlying writer are flushed.
pub fn flush(&mut self) -> Result<(), ArrowError> {
self.writer.flush()?;
Ok(())
}

/// Unwraps the BufWriter housed in StreamWriter.writer, returning the underlying
/// writer
///
Expand Down Expand Up @@ -2615,4 +2631,51 @@ mod tests {
offset from expected alignment of 16 by 8"
);
}

#[test]
fn test_flush() {
// We write a schema which is small enough to fit into a buffer and not get flushed,
// and then force the write with .flush().
let num_cols = 2;
let mut fields = Vec::new();
let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
for i in 0..num_cols {
let field = Field::new(&format!("col_{}", i), DataType::Decimal128(38, 10), true);
fields.push(field);
}
let schema = Schema::new(fields);
let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
let mut stream_writer =
StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
.unwrap();
let mut file_writer =
FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();

let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
stream_writer.flush().unwrap();
file_writer.flush().unwrap();
let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
// Finishing a stream writes the continuation bytes in MetadataVersion::V5 (4 bytes)
// and then a length of 0 (4 bytes) for a total of 8 bytes.
// Everything before that should have been flushed in the .flush() call.
let expected_stream_flushed_bytes = stream_out.len() - 8;
// A file write is the same as the stream write except for the leading magic string
// ARROW1 plus padding, which is 8 bytes.
let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;

assert!(
stream_bytes_written_on_new < stream_bytes_written_on_flush,
"this test makes no sense if flush is not actually required"
);
assert!(
file_bytes_written_on_new < file_bytes_written_on_flush,
"this test makes no sense if flush is not actually required"
);
assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
}
}

0 comments on commit 24a461c

Please sign in to comment.