Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add documentation for SyncIoBridge with examples and alternatives #6815

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 163 additions & 0 deletions tokio-util/src/io/sync_bridge.rs
Nathy-bajo marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,169 @@ use tokio::io::{

/// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or
/// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`].
///
/// # Alternatives
///
/// In many cases, there are better alternatives to using `SyncIoBridge`, especially if you
/// want to avoid blocking the async runtime. Consider the following scenarios:
///
/// ## Example 1: Hashing Data
///
/// When hashing data, using `SyncIoBridge` can lead to suboptimal performance and might not fully leverage the async capabilities of the system.
Nathy-bajo marked this conversation as resolved.
Show resolved Hide resolved
///
/// ### Why It Matters:
/// `SyncIoBridge` allows you to use synchronous I/O operations in an asynchronous context by blocking the current thread. However, this can be inefficient because:
/// Blocking: The use of `SyncIoBridge` may block a valuable async runtime thread, which could otherwise be used to handle more tasks concurrently.
/// Thread Pool Saturation: If many threads are blocked using `SyncIoBridge`, it can exhaust the async runtime's thread pool, leading to increased latency and reduced throughput.
/// Lack of Parallelism: By blocking on synchronous operations, you may miss out on the benefits of running tasks concurrently, especially in I/O-bound operations where async tasks could be interleaved.
///
/// Instead, consider reading the data into memory and then hashing it, or processing the data in chunks.
Nathy-bajo marked this conversation as resolved.
Show resolved Hide resolved
///
/// ```rust
/// use tokio::io::AsyncReadExt;
/// # mod blake3 { pub fn hash(_: &[u8]) {} }
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mut reader = tokio::io::empty();
///
/// // Read all data from the reader into a Vec<u8>.
/// let mut data = Vec::new();
/// reader.read_to_end(&mut data).await?;
///
/// // Hash the data using the blake3 hashing function.
/// let hash = blake3::hash(&data);
///
/// Ok(())
/// }
/// ```
///
/// Or, for more complex cases:
///
/// ```rust
/// use tokio::io::AsyncReadExt;
/// # struct Hasher;
/// # impl Hasher { pub fn update(&mut self, _: &[u8]) {} pub fn finalize(&self) {} }
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mut reader = tokio::io::empty();
Nathy-bajo marked this conversation as resolved.
Show resolved Hide resolved
/// let mut hasher = Hasher;
///
/// // Create a buffer to read data into, sized for performance.
/// let mut data = vec![0; 64 * 1024];
/// loop {
/// //Read data from the reader into the buffer.
/// let len = reader.read(&mut data).await?;
/// if len == 0 { break; } // Exit loop if no more data.
///
/// // Update the hash with the data read.
/// hasher.update(&data[..len]);
/// }
///
/// // Finalize the hash after all data has been processed.
/// let hash = hasher.finalize();
///
/// Ok(())
/// }
/// ```
///
/// ## Example 2: Compressing Data
///
/// When compressing data, avoid using `SyncIoBridge` with non-async compression libraries, as it may lead to inefficient and blocking code.
/// Instead, use an async compression library such as the [`async-compression`](https://docs.rs/async-compression/latest/async_compression/) crate,
/// which is designed to work with asynchronous data streams.
///
/// ```ignore
/// use async_compression::tokio::write::GzipEncoder;
/// use tokio::io::AsyncReadExt;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mut reader = tokio::io::empty();
/// let mut writer = tokio::io::sink();
///
/// // Create a Gzip encoder that wraps the writer.
/// let mut encoder = GzipEncoder::new(writer);
///
/// // Copy data from the reader to the encoder, compressing it.
/// tokio::io::copy(&mut reader, &mut encoder).await?;
///
/// Ok(())
/// }
/// ```
///
/// ## Example 3: Parsing `JSON`
Nathy-bajo marked this conversation as resolved.
Show resolved Hide resolved
///
/// When parsing serialization formats such as `JSON`, avoid using `SyncIoBridge` with functions that
/// `deserialize` data from a type implementing `std::io::Read`, such as `serde_json::from_reader`.
///
/// ```rust,no_run
/// use tokio::io::AsyncReadExt;
/// # mod serde {
/// # pub trait DeserializeOwned: 'static {}
/// # impl<T: 'static> DeserializeOwned for T {}
/// # }
/// # mod serde_json {
/// # use super::serde::DeserializeOwned;
/// # pub fn from_slice<T: DeserializeOwned>(_: &[u8]) -> Result<T, std::io::Error> {
/// # unimplemented!()
/// # }
/// # }
/// # #[derive(Debug)] struct MyStruct;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mut reader = tokio::io::empty();
///
/// // Read all data from the reader into a Vec<u8>.
/// let mut data = Vec::new();
/// reader.read_to_end(&mut data).await?;
///
/// // Deserialize the data from the Vec<u8> into a MyStruct instance.
/// let value: MyStruct = serde_json::from_slice(&data)?;
///
/// Ok(())
/// }
/// ```
///
/// ## Correct Usage of `SyncIoBridge` inside `spawn_blocking`
///
/// `SyncIoBridge` is mainly useful when you need to interface with synchronous libraries from an asynchronous context.
/// Here is how you can do it correctly:
///
/// ```rust
/// use tokio::task::spawn_blocking;
/// use tokio_util::io::SyncIoBridge;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///
/// let reader = tokio::io::empty();
///
/// // Wrap the async reader with SyncIoBridge to allow synchronous reading.
/// let mut sync_reader = SyncIoBridge::new(reader);
///
/// // Spawn a blocking task to perform synchronous I/O operations.
/// let result = spawn_blocking(move || {
/// // Create an in-memory buffer to hold the copied data.
/// let mut buffer = Vec::new();
///
/// // Copy data from the sync_reader to the buffer.
/// std::io::copy(&mut sync_reader, &mut buffer)?;
///
/// // Return the buffer containing the copied data.
/// Ok::<_, std::io::Error>(buffer)
/// })
/// .await??;
///
/// // You can use `result` here as needed.
/// // `result` contains the data read into the buffer.
///
/// Ok(())
/// }
/// ```
///
#[derive(Debug)]
pub struct SyncIoBridge<T> {
src: T,
Expand Down
Loading