Improve documentation (and ASCII art) about streaming execution, and thread pools #13423

Merged · 9 commits · Nov 18, 2024

189 changes: 175 additions & 14 deletions in datafusion/core/src/lib.rs

@@ -382,14 +382,14 @@
//!
//! Calling [`execute`] produces 1 or more partitions of data,
//! as a [`SendableRecordBatchStream`], which implements a pull based execution
//! API. Calling [`next()`]`.await` will incrementally compute and return the next
//! [`RecordBatch`]. Balanced parallelism is achieved using [Volcano style]
//! "Exchange" operations implemented by [`RepartitionExec`].
//!
//! While some recent research such as [Morsel-Driven Parallelism] describes challenges
//! with the pull style Volcano execution model on NUMA architectures, in practice DataFusion achieves
//! similar scalability to systems that use push-driven schedulers [such as DuckDB].
//! See the [DataFusion paper in SIGMOD 2024] for more details.
//!
//! [`execute`]: physical_plan::ExecutionPlan::execute
//! [`SendableRecordBatchStream`]: crate::physical_plan::SendableRecordBatchStream
@@ -403,22 +403,183 @@
//! [`RepartitionExec`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/repartition/struct.RepartitionExec.html
//! [Volcano style]: https://w6113.github.io/files/papers/volcanoparallelism-89.pdf
//! [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf
//! [DataFusion paper in SIGMOD 2024]: https://github.com/apache/datafusion/files/15149988/DataFusion_Query_Engine___SIGMOD_2024-FINAL-mk4.pdf
//! [such as DuckDB]: https://github.com/duckdb/duckdb/issues/1583
//! [implementors of `ExecutionPlan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#implementors
//!
//! ## Streaming Execution
//!
//! DataFusion is a "streaming" query engine which means `ExecutionPlan`s incrementally
//! read from their input(s) and compute output one [`RecordBatch`] at a time
//! by continually polling [`SendableRecordBatchStream`]s. Output and
//! intermediate `RecordBatch`s each have approximately `batch_size` rows,
//! which amortizes per-batch overhead of execution.
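//!
//! The target batch size is controlled by the `batch_size` setting on
//! `SessionConfig`. A small sketch (4096 is an arbitrary example value;
//! the default is 8192):
//!
//! ```rust
//! use datafusion::prelude::*;
//!
//! // output and intermediate RecordBatches will have roughly 4096 rows
//! let config = SessionConfig::new().with_batch_size(4096);
//! let ctx = SessionContext::new_with_config(config);
//! ```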
//!
//! Note that certain operations, sometimes called "pipeline breakers"
//! (for example, full sorts or hash aggregations), are fundamentally non-streaming
//! and must read their input fully before producing **any** output. As much as
//! possible, other operators read a single [`RecordBatch`] from their input to
//! produce a single `RecordBatch` as output.
//!
//! For example, given this SQL query:
//!
//! ```sql
//! SELECT date_trunc('month', time) FROM data WHERE id IN (10,20,30);
//! ```
//!
//! The diagram below shows the call sequence when a consumer calls [`next()`] to
//! get the next `RecordBatch` of output. While it is possible that some
//! steps run on different threads, typically tokio will use the same thread
//! that called `next()` to read from the input, apply the filter, and
//! return the results without interleaving any other operations. This results
//! in excellent cache locality as the same CPU core that produces the data often
//! consumes it immediately as well.
//!
//! ```text
//!
//! Step 3: FilterExec calls next() Step 2: ProjectionExec calls
//! on input Stream next() on input Stream
//! ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
//! │ Step 1: Consumer
//! ▼ ▼ │ calls next()
//! ┏━━━━━━━━━━━━━━┓ ┏━━━━━┻━━━━━━━━━━━━━┓ ┏━━━━━━━━━━━━━━━━━━━━━━━━┓
//! ┃ ┃ ┃ ┃ ┃ ◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
//! ┃ DataSource ┃ ┃ ┃ ┃ ┃
//! ┃ (e.g. ┃ ┃ FilterExec ┃ ┃ ProjectionExec ┃
//! ┃ ParquetExec) ┃ ┃id IN (10, 20, 30) ┃ ┃date_trunc('month',time)┃
//! ┃ ┃ ┃ ┃ ┃ ┣ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▶
//! ┃ ┃ ┃ ┃ ┃ ┃
//! ┗━━━━━━━━━━━━━━┛ ┗━━━━━━━━━━━┳━━━━━━━┛ ┗━━━━━━━━━━━━━━━━━━━━━━━━┛
//! │ ▲ ▲ Step 6: ProjectionExec
//! │ │ │ computes date_trunc into a
//! └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ new RecordBatch returned
//! ┌─────────────────────┐ ┌─────────────┐ to the caller
//! │ RecordBatch │ │ RecordBatch │
//! └─────────────────────┘ └─────────────┘
//!
//! Step 4: DataSource returns a Step 5: FilterExec returns a new
//! single RecordBatch RecordBatch with only matching rows
//! ```
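//!
//! The same pipeline can be driven end to end from SQL. A sketch, assuming a
//! table named `data` (with `id` and `time` columns) has already been
//! registered with the `SessionContext`:
//!
//! ```rust
//! use datafusion::error::Result;
//! use datafusion::prelude::*;
//! use futures::StreamExt;
//!
//! async fn stream_results(ctx: &SessionContext) -> Result<()> {
//!     // planning only: no data is read yet
//!     let df = ctx
//!         .sql("SELECT date_trunc('month', time) FROM data WHERE id IN (10,20,30)")
//!         .await?;
//!     // `execute_stream` returns a SendableRecordBatchStream; each
//!     // `next().await` pulls one RecordBatch through the pipeline above
//!     let mut stream = df.execute_stream().await?;
//!     while let Some(batch) = stream.next().await {
//!         println!("{} rows", batch?.num_rows());
//!     }
//!     Ok(())
//! }
//! ```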
//!
//! [`next()`]: futures::StreamExt::next
//!
//! ## Thread Scheduling, CPU / IO Thread Pools, and [Tokio] [`Runtime`]s
//!
//! DataFusion automatically runs each plan with multiple CPU cores using
//! a [Tokio] [`Runtime`] as a thread pool. While tokio is most commonly used
//! for asynchronous network I/O, the combination of an efficient, work-stealing
//! scheduler and first class compiler support for automatic continuation
//! generation (`async`) also makes it a compelling choice for CPU intensive
//! applications as explained in the [Using Rustlang’s Async Tokio
//! Runtime for CPU-Bound Tasks] blog.
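//!
//! For instance, a standalone program might create the `Runtime` explicitly
//! (a sketch; the common `#[tokio::main]` attribute expands to roughly the
//! same thing):
//!
//! ```rust
//! use tokio::runtime::Builder;
//!
//! fn main() -> std::io::Result<()> {
//!     // the worker threads act as the thread pool that runs DataFusion plans
//!     let runtime = Builder::new_multi_thread()
//!         .enable_all()
//!         .build()?;
//!     runtime.block_on(async {
//!         // ... create a SessionContext and execute queries here ...
//!     });
//!     Ok(())
//! }
//! ```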
Comment on lines +473 to +474

Contributor Author: yes -- I think it does (line 584)
//!
//! The number of cores used is determined by the `target_partitions`
//! configuration setting, which defaults to the number of CPU cores.
//! During execution, DataFusion creates this many distinct `async` [`Stream`]s and
//! this many distinct [Tokio] [`task`]s, which drive the `Stream`s
Comment on lines +478 to +479

Contributor: I was aware that `target_partitions` and the number of Streams matched the number of CPU cores, but I was not aware that we cap the number of Tokio tasks as well. Can you expand on that point? Are we getting all the benefits of the tokio runtime (work stealing, etc.) by keeping that number (relatively) low? For context, the local task queue size per thread is 256 tasks.

Contributor Author: I think it is somewhat nuanced -- certain operations (like `RepartitionExec`) spawn tasks to read from their inputs. As written, I now see this sentence is somewhat misleading. I will try to clarify in a follow-on PR.

Contributor Author: Tried to clarify in #13474

//! using threads managed by the `Runtime`. Many DataFusion `Stream`s perform
//! CPU intensive processing.
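//!
//! A sketch of overriding the setting (8 is an arbitrary example value):
//!
//! ```rust
//! use datafusion::prelude::*;
//!
//! // plans will be executed with up to 8 concurrent Streams / tasks
//! let config = SessionConfig::new().with_target_partitions(8);
//! let ctx = SessionContext::new_with_config(config);
//! ```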
//!
//! Using `async` for CPU intensive tasks makes it easy for [`TableProvider`]s
//! to perform network I/O using standard Rust `async` during execution.
//! However, this design also makes it very easy to mix CPU intensive and latency
//! sensitive I/O work on the same thread pool ([`Runtime`]).
//! Using the same (default) `Runtime` is convenient, and often works well for
//! initial development and processing local files, but it can lead to problems
//! under load and/or when reading from network sources such as AWS S3.
//!
//! If your system does not fully utilize either the CPU or network bandwidth
//! during execution, or you see significantly higher tail (e.g. p99) latencies
//! responding to network requests, **it is likely you need to use a different
//! `Runtime` for CPU intensive DataFusion plans**. This effect can be especially
//! pronounced when running several queries concurrently.
//!
//! As shown in the following figure, using the same `Runtime` for both CPU
//! intensive processing and network requests can introduce significant
//! delays in responding to those network requests. Delays in processing network
//! requests can and do cause network flow control to throttle the available
//! bandwidth in response.
//!
//! ```text
//! Legend
//!
//! ┏━━━━━━┓
//! Processing network request ┃ ┃ CPU bound work
//! is delayed due to processing ┗━━━━━━┛
//! CPU bound work ┌─┐
//! │ │ Network request
//! ││ └─┘ processing
//!
//! ││
//! ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
//! │ │
//!
//! ▼ ▼
//! ┌─────────────┐ ┌─┐┌─┐┏━━━━━━━━━━━━━━━━━━━┓┏━━━━━━━━━━━━━━━━━━━┓┌─┐
//! │ │thread 1 │ ││ │┃ Decoding ┃┃ Filtering ┃│ │
//! │ │ └─┘└─┘┗━━━━━━━━━━━━━━━━━━━┛┗━━━━━━━━━━━━━━━━━━━┛└─┘
//! │ │ ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
//! │Tokio Runtime│thread 2 ┃ Decoding ┃ Filtering ┃ Decoding ┃ ...
//! │(thread pool)│ ┗━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━┛
//! │ │ ... ...
//! │ │ ┏━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┓┌─┐ ┏━━━━━━━━━━━━━━┓
//! │ │thread N ┃ Decoding ┃ Filtering ┃│ │ ┃ Decoding ┃
//! └─────────────┘ ┗━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━━┛└─┘ ┗━━━━━━━━━━━━━━┛
//! ─────────────────────────────────────────────────────────────▶
//! time
//! ```
//!
//! The bottleneck resulting from network throttling can be avoided
//! by using separate [`Runtime`]s for the different types of work, as shown
//! in the diagram below.
//!
//! ```text
//! A separate thread pool processes network Legend
//! requests, reducing the latency for
//! processing each request ┏━━━━━━┓
//! ┃ ┃ CPU bound work
//! │ ┗━━━━━━┛
//! │ ┌─┐
//! ┌ ─ ─ ─ ─ ┘ │ │ Network request
//! ┌ ─ ─ ─ ┘ └─┘ processing
//! │
//! ▼ ▼
//! ┌─────────────┐ ┌─┐┌─┐┌─┐
//! │ │thread 1 │ ││ ││ │
//! │ │ └─┘└─┘└─┘
//! │Tokio Runtime│ ...
//! │(thread pool)│thread 2
//! │ │
//! │"IO Runtime" │ ...
//! │ │ ┌─┐
//! │ │thread N │ │
//! └─────────────┘ └─┘
//! ─────────────────────────────────────────────────────────────▶
//! time
//!
//! ┌─────────────┐ ┏━━━━━━━━━━━━━━━━━━━┓┏━━━━━━━━━━━━━━━━━━━┓
//! │ │thread 1 ┃ Decoding ┃┃ Filtering ┃
//! │ │ ┗━━━━━━━━━━━━━━━━━━━┛┗━━━━━━━━━━━━━━━━━━━┛
//! │Tokio Runtime│ ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
//! │(thread pool)│thread 2 ┃ Decoding ┃ Filtering ┃ Decoding ┃ ...
//! │ │ ┗━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━┛
//! │ CPU Runtime │ ... ...
//! │ │ ┏━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
//! │ │thread N ┃ Decoding ┃ Filtering ┃ Decoding ┃
//! └─────────────┘ ┗━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━┛
//! ─────────────────────────────────────────────────────────────▶
//! time
//! ```
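//!
//! One way to structure this is to build a second `Runtime` for IO at startup
//! and hand its `Handle` to the code that performs network requests. This is a
//! sketch, not an API DataFusion provides: `IoRuntime`, the thread count, and
//! `fetch_object` are all hypothetical names.
//!
//! ```rust
//! use tokio::runtime::{Builder, Handle, Runtime};
//!
//! /// Owns a dedicated runtime whose threads only service network IO
//! /// (hypothetical struct, not part of DataFusion)
//! struct IoRuntime {
//!     runtime: Runtime,
//! }
//!
//! impl IoRuntime {
//!     fn new() -> std::io::Result<Self> {
//!         let runtime = Builder::new_multi_thread()
//!             .worker_threads(2) // hypothetical sizing
//!             .thread_name("io-runtime")
//!             .enable_all()
//!             .build()?;
//!         Ok(Self { runtime })
//!     }
//!
//!     fn handle(&self) -> Handle {
//!         self.runtime.handle().clone()
//!     }
//! }
//!
//! /// Run a (latency sensitive) request on the IO runtime and await its
//! /// completion from a task running on the CPU runtime
//! async fn fetch_object(io: Handle) -> Vec<u8> {
//!     io.spawn(async {
//!         // ... perform the network request (e.g. to object storage) ...
//!         Vec::new()
//!     })
//!     .await
//!     .expect("io task panicked")
//! }
//! ```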
//!
//! Note that DataFusion does not use [`tokio::task::spawn_blocking`] for
//! CPU-bound work, because `spawn_blocking` is designed for blocking **IO**,
//! not for CPU-bound tasks. Among other challenges, spawned blocking
//! tasks can't yield waiting for input (can't call `await`), so they
//! can't be used to limit the number of concurrent CPU-bound tasks or
//! keep the processing pipeline on the same core.
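//!
//! By contrast, CPU-bound work is spawned as an ordinary task, which can
//! `await` its input and therefore yields whenever that input is not ready.
//! A sketch (the helper name `spawn_cpu_work` is illustrative):
//!
//! ```rust
//! use datafusion::physical_plan::SendableRecordBatchStream;
//! use futures::StreamExt;
//! use tokio::task::JoinHandle;
//!
//! // An ordinary task can `.await` its input (a `spawn_blocking` closure
//! // could not), so it naturally yields whenever input is not ready
//! fn spawn_cpu_work(mut input: SendableRecordBatchStream) -> JoinHandle<()> {
//!     tokio::task::spawn(async move {
//!         while let Some(batch) = input.next().await {
//!             // ... CPU intensive, per-batch computation goes here ...
//!             drop(batch);
//!         }
//!     })
//! }
//! ```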
//!
//! [Tokio]: https://tokio.rs
//! [`Runtime`]: tokio::runtime::Runtime
//! [`task`]: tokio::task
//! [Using Rustlang’s Async Tokio Runtime for CPU-Bound Tasks]: https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
//!