Skip to content

Commit

Permalink
docs: add an example for RecordBatchReceiverStreamBuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
SteveLauC committed Jan 17, 2024
1 parent 3f219bc commit 6466168
Showing 1 changed file with 59 additions and 9 deletions.
68 changes: 59 additions & 9 deletions datafusion/physical-plan/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,62 @@ impl<O: Send + 'static> ReceiverStreamBuilder<O> {
}
}

/// Builder for [`RecordBatchReceiverStream`] that propagates errors
/// Builder for `RecordBatchReceiverStream` that propagates errors
/// and panics correctly.
///
/// [`RecordBatchReceiverStream`] is used to spawn one or more tasks
/// that produce `RecordBatch`es and send them to a single
/// [`RecordBatchReceiverStreamBuilder`] is used to spawn one or more tasks
/// that produce [`RecordBatch`]es and send them to a single
/// `Receiver` which can improve parallelism.
///
/// This also handles propagating panics and canceling the tasks.
///
/// # Example
///
/// The following example spawns 2 tasks that will write [`RecordBatch`]es to
/// the `tx` end of the builder. After building the stream, we can receive
/// those batches by calling `.next()`.
///
/// ```
/// # use std::sync::Arc;
/// # use datafusion_common::arrow::datatypes::{Schema, Field, DataType};
/// # use datafusion_common::arrow::array::RecordBatch;
/// # use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
/// # use futures::stream::StreamExt;
/// # use tokio::runtime::Builder;
/// # let rt = Builder::new_current_thread().build().unwrap();
/// #
/// # rt.block_on(async {
/// let schema = Arc::new(Schema::new(vec![Field::new("foo", DataType::Int8, false)]));
/// let mut builder = RecordBatchReceiverStreamBuilder::new(Arc::clone(&schema), 10);
///
/// // task 1
/// let tx_1 = builder.tx();
/// let schema_1 = Arc::clone(&schema);
/// builder.spawn(async move {
/// // Your task needs to send batches to the tx
/// tx_1.send(Ok(RecordBatch::new_empty(Arc::clone(&schema_1)))).await.unwrap();
///
/// Ok(())
/// });
///
/// // task 2
/// let tx_2 = builder.tx();
/// let schema_2 = Arc::clone(&schema);
/// builder.spawn(async move {
/// // Your task needs to send batches to the tx
/// tx_2.send(Ok(RecordBatch::new_empty(Arc::clone(&schema_2)))).await.unwrap();
///
/// Ok(())
/// });
///
/// let mut stream = builder.build();
/// while let Some(res_batch) = stream.next().await {
/// // `res_batch` can come from either task 1 or task 2
///
/// // do something with `res_batch`
/// }
/// # });
/// ```
pub struct RecordBatchReceiverStreamBuilder {
schema: SchemaRef,
inner: ReceiverStreamBuilder<RecordBatch>,
Expand All @@ -186,8 +234,9 @@ impl RecordBatchReceiverStreamBuilder {
/// Spawn a task that will be aborted if this builder (or the stream
/// built from it) is dropped
///
/// this is often used to spawn tasks that write to the sender
/// retrieved from `Self::tx`
/// This is often used to spawn tasks that write to the sender
/// retrieved from [`Self::tx`]. For an example, see the documentation
/// of this type.
pub fn spawn<F>(&mut self, task: F)
where
F: Future<Output = Result<()>>,
Expand All @@ -199,8 +248,9 @@ impl RecordBatchReceiverStreamBuilder {
/// Spawn a blocking task that will be aborted if this builder (or the stream
/// built from it) is dropped
///
/// this is often used to spawn tasks that write to the sender
/// retrieved from `Self::tx`
/// This is often used to spawn tasks that write to the sender
/// retrieved from [`Self::tx`]. For an example, see the documentation
/// of this type.
pub fn spawn_blocking<F>(&mut self, f: F)
where
F: FnOnce() -> Result<()>,
Expand All @@ -209,7 +259,7 @@ impl RecordBatchReceiverStreamBuilder {
self.inner.spawn_blocking(f)
}

/// runs the input_partition of the `input` ExecutionPlan on the
/// Runs the `partition` of the `input` ExecutionPlan on the
/// tokio threadpool and writes its outputs to this stream
///
/// If the input partition produces an error, the error will be
Expand Down Expand Up @@ -339,7 +389,7 @@ where
}
}

/// EmptyRecordBatchStream can be used to create a RecordBatchStream
/// `EmptyRecordBatchStream` can be used to create a [`RecordBatchStream`]
/// that will produce no results
pub struct EmptyRecordBatchStream {
/// Schema wrapped by Arc
Expand Down

0 comments on commit 6466168

Please sign in to comment.