diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index ab3381dff257..4f0fe7163654 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -150,7 +150,7 @@ The parquet SQL benchmarks can be run with
 cargo bench --bench parquet_query_sql
 ```
 
-These randomly generate a parquet file, and then benchmark queries sourced from [parquet_query_sql.sql](./datafusion/benches/parquet_query_sql.sql) against it. This can therefore be a quick way to add coverage of particular query and/or data paths.
+These randomly generate a parquet file, and then benchmark queries sourced from [parquet_query_sql.sql](./datafusion/core/benches/parquet_query_sql.sql) against it. This can therefore be a quick way to add coverage of particular query and/or data paths.
 
 If the environment variable `PARQUET_FILE` is set, the benchmark will run queries against this file instead of a randomly generated one. This can be useful for performing multiple runs, potentially with different code, against the same source data, or for testing against a custom dataset.
diff --git a/Cargo.toml b/Cargo.toml
index fefd5679a188..297b394b4521 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,8 +17,8 @@
 [workspace]
 members = [
-    "datafusion/core",
     "datafusion/common",
+    "datafusion/core",
     "datafusion/expr",
     "datafusion/jit",
     "datafusion/physical-expr",
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 551d3e59b4b9..6dc8acb3d4d4 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -24,7 +24,7 @@ repository = "https://github.com/apache/arrow-datafusion"
 readme = "../README.md"
 authors = ["Apache Arrow <dev@arrow.apache.org>"]
 license = "Apache-2.0"
-keywords = [ "arrow", "query", "sql" ]
+keywords = ["arrow", "query", "sql"]
 include = [
     "benches/*.rs",
     "src/**/*.rs",
@@ -50,6 +50,8 @@ pyarrow = ["pyo3", "arrow/pyarrow", "datafusion-common/pyarrow"]
 regex_expressions = ["datafusion-physical-expr/regex_expressions"]
 # Used to enable row format experiment
 row = ["datafusion-row"]
+# Used to enable scheduler
+scheduler = ["rayon"]
 simd = ["arrow/simd"]
 unicode_expressions = ["datafusion-physical-expr/regex_expressions"]
@@ -75,9 +77,10 @@ ordered-float = "3.0"
 parking_lot = "0.12"
 parquet = { version = "13", features = ["arrow"] }
 paste = "^1.0"
-pin-project-lite= "^0.2.7"
+pin-project-lite = "^0.2.7"
 pyo3 = { version = "0.16", optional = true }
 rand = "0.8"
+rayon = { version = "1.5", optional = true }
 smallvec = { version = "1.6", features = ["union"] }
 sqlparser = "0.16"
 tempfile = "3"
@@ -88,6 +91,7 @@ uuid = { version = "1.0", features = ["v4"] }
 [dev-dependencies]
 criterion = "0.3"
 doc-comment = "0.3"
+env_logger = "0.9"
 fuzz-utils = { path = "fuzz-utils" }
 
 [[bench]]
@@ -121,6 +125,7 @@ name = "physical_plan"
 
 [[bench]]
 harness = false
 name = "parquet_query_sql"
+required-features = ["scheduler"]
 
 [[bench]]
 harness = false
diff --git a/datafusion/core/benches/parquet_query_sql.rs b/datafusion/core/benches/parquet_query_sql.rs
index 08156fad45e2..26a041fb47e3 100644
--- a/datafusion/core/benches/parquet_query_sql.rs
+++ b/datafusion/core/benches/parquet_query_sql.rs
@@ -24,7 +24,9 @@ use arrow::datatypes::{
 };
 use arrow::record_batch::RecordBatch;
 use criterion::{criterion_group, criterion_main, Criterion};
-use datafusion::prelude::{ParquetReadOptions, SessionContext};
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion::scheduler::Scheduler;
+use futures::stream::StreamExt;
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::{WriterProperties, WriterVersion};
 use rand::distributions::uniform::SampleUniform;
@@ -37,7 +39,6 @@ use std::path::Path;
 use std::sync::Arc;
 use std::time::Instant;
 use tempfile::NamedTempFile;
-use tokio_stream::StreamExt;
 
 /// The number of batches to write
 const NUM_BATCHES: usize = 2048;
@@ -193,15 +194,24 @@ fn criterion_benchmark(c: &mut Criterion) {
     assert!(Path::new(&file_path).exists(), "path not found");
     println!("Using parquet file {}", file_path);
 
-    let context = SessionContext::new();
+    let partitions = 4;
+    let config = SessionConfig::new().with_target_partitions(partitions);
+    let context = SessionContext::with_config(config);
 
-    let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap();
-    rt.block_on(context.register_parquet(
-        "t",
-        file_path.as_str(),
-        ParquetReadOptions::default(),
-    ))
-    .unwrap();
+    let scheduler = Scheduler::new(partitions);
+
+    let local_rt = tokio::runtime::Builder::new_current_thread()
+        .build()
+        .unwrap();
+
+    let query_rt = tokio::runtime::Builder::new_multi_thread()
+        .worker_threads(partitions)
+        .build()
+        .unwrap();
+
+    local_rt
+        .block_on(context.register_parquet("t", file_path.as_str(), Default::default()))
+        .unwrap();
 
     // We read the queries from a file so they can be changed without recompiling the benchmark
     let mut queries_file = File::open("benches/parquet_query_sql.sql").unwrap();
@@ -220,17 +230,42 @@ fn criterion_benchmark(c: &mut Criterion) {
             continue;
         }
 
-        let query = query.as_str();
-        c.bench_function(query, |b| {
+        c.bench_function(&format!("tokio: {}", query), |b| {
             b.iter(|| {
+                let query = query.clone();
                 let context = context.clone();
-                rt.block_on(async move {
-                    let query = context.sql(query).await.unwrap();
+                let (sender, mut receiver) = futures::channel::mpsc::unbounded();
+
+                // Spawn work to a separate tokio thread pool
+                query_rt.spawn(async move {
+                    let query = context.sql(&query).await.unwrap();
                     let mut stream = query.execute_stream().await.unwrap();
-                    while criterion::black_box(stream.next().await).is_some() {}
+
+                    while let Some(next) = stream.next().await {
+                        sender.unbounded_send(next).unwrap();
+                    }
+                });
+
+                local_rt.block_on(async {
+                    while receiver.next().await.transpose().unwrap().is_some() {}
                 })
             });
         });
+
+        c.bench_function(&format!("scheduled: {}", query), |b| {
+            b.iter(|| {
+                let query = query.clone();
+                let context = context.clone();
+
+                local_rt.block_on(async {
+                    let query = context.sql(&query).await.unwrap();
+                    let plan = query.create_physical_plan().await.unwrap();
+                    let mut stream = scheduler
+                        .schedule(plan, context.task_ctx())
+                        .unwrap()
+                        .stream();
+                    while stream.next().await.transpose().unwrap().is_some() {}
+                });
+            });
+        });
     }
 
     // Temporary file must outlive the benchmarks, it is deleted when dropped
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index 055a17f4e907..c598d9a33cef 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -218,6 +218,8 @@ pub mod physical_optimizer;
 pub mod physical_plan;
 pub mod prelude;
 pub mod scalar;
+#[cfg(feature = "scheduler")]
+pub mod scheduler;
 pub mod sql;
 pub mod variable;
diff --git a/datafusion/core/src/scheduler/mod.rs b/datafusion/core/src/scheduler/mod.rs
new file mode 100644
index 000000000000..a765ddf83384
--- /dev/null
+++ b/datafusion/core/src/scheduler/mod.rs
@@ -0,0 +1,454 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A [`Scheduler`] maintains a pool of dedicated worker threads on which
+//! query execution can be scheduled. This is based on the idea of [Morsel-Driven Parallelism]
+//! and is designed to decouple the execution parallelism from the parallelism expressed in
+//! the physical plan as partitions.
+//!
+//! # Implementation
+//!
+//! When provided with an [`ExecutionPlan`] the [`Scheduler`] first breaks it up into smaller
+//! chunks called pipelines. Each pipeline may consist of one or more nodes from the
+//! [`ExecutionPlan`] tree.
+//!
+//! The scheduler then maintains a list of pending [`Task`]s, each of which identifies a
+//! partition within a particular pipeline that may be able to make progress on some "morsel"
+//! of data. These [`Task`]s are then scheduled on the worker pool, with a preference for
+//! scheduling work on a given "morsel" on the same thread that produced it.
+//!
+//! # Rayon
+//!
+//! Under the hood these [`Task`]s are scheduled by [rayon], which is a lightweight,
+//! work-stealing scheduler optimised for CPU-bound workloads. Pipelines may exploit this
+//! fact, and use [rayon]'s structured concurrency primitives to express additional
+//! parallelism that may be exploited if there are idle threads available at runtime.
+//!
+//! # Shutdown
+//!
+//! Queries scheduled on a [`Scheduler`] will run to completion even if the
+//! [`Scheduler`] is dropped.
+//!
+//! [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf
+//! [rayon]: https://docs.rs/rayon/latest/rayon/
+//!
+//! # Example
+//!
+//! ```rust
+//! # use futures::TryStreamExt;
+//! # use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};
+//! # use datafusion::scheduler::Scheduler;
+//! #
+//! # #[tokio::main]
+//! # async fn main() {
+//! let scheduler = Scheduler::new(4);
+//! let config = SessionConfig::new().with_target_partitions(4);
+//! let context = SessionContext::with_config(config);
+//!
+//! context.register_csv("example", "../core/tests/example.csv", CsvReadOptions::new()).await.unwrap();
+//! let plan = context.sql("SELECT MIN(b) FROM example")
+//!     .await
+//!     .unwrap()
+//!     .create_physical_plan()
+//!     .await
+//!     .unwrap();
+//!
+//! let task = context.task_ctx();
+//! let stream = scheduler.schedule(plan, task).unwrap().stream();
+//! let scheduled: Vec<_> = stream.try_collect().await.unwrap();
+//! # }
+//! ```
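+//!
+//! The partitions of a query's output can also be consumed separately; a minimal
+//! sketch, assuming the same setup as the example above:
+//!
+//! ```ignore
+//! // `schedule` returns an `ExecutionResults`, which can yield one stream per
+//! // output partition instead of a single interleaved stream
+//! let results = scheduler.schedule(plan, context.task_ctx()).unwrap();
+//! for mut stream in results.stream_partitioned() {
+//!     // `try_next` is provided by `futures::TryStreamExt`
+//!     while let Some(batch) = stream.try_next().await.unwrap() {
+//!         // process the batches of this partition
+//!     }
+//! }
+//! ```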
+
+use std::sync::Arc;
+
+use futures::stream::BoxStream;
+use log::{debug, error};
+
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_plan::ExecutionPlan;
+
+use plan::{PipelinePlan, PipelinePlanner, RoutablePipeline};
+use task::{spawn_plan, Task};
+
+use rayon::{ThreadPool, ThreadPoolBuilder};
+
+pub use task::ExecutionResults;
+
+mod pipeline;
+mod plan;
+mod task;
+
+/// Builder for a [`Scheduler`]
+#[derive(Debug)]
+pub struct SchedulerBuilder {
+    inner: ThreadPoolBuilder,
+}
+
+impl SchedulerBuilder {
+    /// Create a new [`SchedulerBuilder`] with the provided number of threads
+    pub fn new(num_threads: usize) -> Self {
+        let builder = ThreadPoolBuilder::new()
+            .num_threads(num_threads)
+            .panic_handler(|p| error!("{}", format_worker_panic(p)))
+            .thread_name(|idx| format!("df-worker-{}", idx));
+
+        Self { inner: builder }
+    }
+
+    /// Registers a custom panic handler
+    #[cfg(test)]
+    fn panic_handler<H>(self, panic_handler: H) -> Self
+    where
+        H: Fn(Box<dyn std::any::Any + Send>) + Send + Sync + 'static,
+    {
+        Self {
+            inner: self.inner.panic_handler(panic_handler),
+        }
+    }
+
+    /// Build a new [`Scheduler`]
+    fn build(self) -> Scheduler {
+        Scheduler {
+            pool: Arc::new(self.inner.build().unwrap()),
+        }
+    }
+}
+
+/// A [`Scheduler`] that can be used to schedule [`ExecutionPlan`] on a dedicated thread pool
+pub struct Scheduler {
+    pool: Arc<ThreadPool>,
+}
+
+impl Scheduler {
+    /// Create a new [`Scheduler`] with `num_threads` new threads in a dedicated thread pool
+    pub fn new(num_threads: usize) -> Self {
+        SchedulerBuilder::new(num_threads).build()
+    }
+
+    /// Schedule the provided [`ExecutionPlan`] on this [`Scheduler`].
+    ///
+    /// Returns an [`ExecutionResults`] that can be used to receive results as they are
+    /// produced, as a [`futures::Stream`] of [`RecordBatch`]
+    pub fn schedule(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        context: Arc<TaskContext>,
+    ) -> Result<ExecutionResults> {
+        let plan = PipelinePlanner::new(plan, context).build()?;
+        Ok(self.schedule_plan(plan))
+    }
+
+    /// Schedule the provided [`PipelinePlan`] on this [`Scheduler`].
+    pub(crate) fn schedule_plan(&self, plan: PipelinePlan) -> ExecutionResults {
+        spawn_plan(plan, self.spawner())
+    }
+
+    fn spawner(&self) -> Spawner {
+        Spawner {
+            pool: self.pool.clone(),
+        }
+    }
+}
+
+/// Formats a panic message for a worker
+fn format_worker_panic(panic: Box<dyn std::any::Any + Send>) -> String {
+    let maybe_idx = rayon::current_thread_index();
+    let worker: &dyn std::fmt::Display = match &maybe_idx {
+        Some(idx) => idx,
+        None => &"UNKNOWN",
+    };
+
+    let message = if let Some(msg) = panic.downcast_ref::<&str>() {
+        *msg
+    } else if let Some(msg) = panic.downcast_ref::<String>() {
+        msg.as_str()
+    } else {
+        "UNKNOWN"
+    };
+
+    format!("worker {} panicked with: {}", worker, message)
+}
+
+/// Returns `true` if the current thread is a rayon worker thread
+///
+/// Note: if there are multiple rayon pools, this will return `true` if the current thread
+/// belongs to ANY rayon pool, even if this isn't a worker thread of a [`Scheduler`] instance
+fn is_worker() -> bool {
+    rayon::current_thread_index().is_some()
+}
+
+/// Spawn a [`Task`] onto the local worker's thread pool
+///
+/// There is no guaranteed order of execution, as workers may steal at any time. However,
+/// `spawn_local` will append to the front of the current worker's queue, workers pop tasks
+/// from the front of their queue, and steal tasks from the back of other workers' queues
+///
+/// The effect is that tasks spawned using `spawn_local` will typically be prioritised in
+/// LIFO order; however, this should not be relied upon
+fn spawn_local(task: Task) {
+    // Verify this is being called from a worker thread, as `rayon::spawn` would
+    // otherwise spawn to (and potentially create) the global pool
+    assert!(is_worker(), "must be called from a worker");
+    rayon::spawn(|| task.do_work())
+}
+
+/// Spawn a [`Task`] onto the local worker's thread pool with FIFO ordering
+///
+/// There is no guaranteed order of execution, as workers may steal at any time. However,
+/// `spawn_local_fifo` will append to the back of the current worker's queue, workers pop
+/// tasks from the front of their queue, and steal tasks from the back of other workers'
+/// queues
+///
+/// The effect is that tasks spawned using `spawn_local_fifo` will typically be prioritised
+/// in FIFO order; however, this should not be relied upon
+fn spawn_local_fifo(task: Task) {
+    // Verify this is being called from a worker thread, as `rayon::spawn_fifo` would
+    // otherwise spawn to (and potentially create) the global pool
+    assert!(is_worker(), "must be called from a worker");
+    rayon::spawn_fifo(|| task.do_work())
+}
+
+#[derive(Debug, Clone)]
+pub struct Spawner {
+    pool: Arc<ThreadPool>,
+}
+
+impl Spawner {
+    pub fn spawn(&self, task: Task) {
+        debug!("Spawning {:?} to any worker", task);
+        self.pool.spawn(move || task.do_work());
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::util::pretty::pretty_format_batches;
+    use std::ops::Range;
+    use std::panic::panic_any;
+
+    use futures::{StreamExt, TryStreamExt};
+    use log::info;
+    use rand::distributions::uniform::SampleUniform;
+    use rand::{thread_rng, Rng};
+
+    use crate::arrow::array::{ArrayRef, PrimitiveArray};
+    use crate::arrow::datatypes::{ArrowPrimitiveType, Float64Type, Int32Type};
+    use crate::arrow::record_batch::RecordBatch;
+    use crate::datasource::{MemTable, TableProvider};
+    use crate::physical_plan::displayable;
+    use crate::prelude::{SessionConfig, SessionContext};
+
+    use super::*;
+
+    fn generate_primitive<T, R>(
+        rng: &mut R,
+        len: usize,
+        valid_percent: f64,
+        range: Range<T::Native>,
+    ) -> ArrayRef
+    where
+        T: ArrowPrimitiveType,
+        T::Native: SampleUniform,
+        R: Rng,
+    {
+        Arc::new(PrimitiveArray::<T>::from_iter((0..len).map(|_| {
+            rng.gen_bool(valid_percent)
+                .then(|| rng.gen_range(range.clone()))
+        })))
+    }
+
+    fn generate_batch<R: Rng>(
+        rng: &mut R,
+        row_count: usize,
+        id_offset: i32,
+    ) -> RecordBatch {
+        let id_range = id_offset..(row_count as i32 + id_offset);
+        let a = generate_primitive::<Int32Type, _>(rng, row_count, 0.5, 0..1000);
+        let b = generate_primitive::<Float64Type, _>(rng, row_count, 0.5, 0. ..1000.);
+        let id = PrimitiveArray::<Int32Type>::from_iter_values(id_range);
+
+        RecordBatch::try_from_iter_with_nullable([
+            ("a", a, true),
+            ("b", b, true),
+            ("id", Arc::new(id), false),
+        ])
+        .unwrap()
+    }
+
+    const BATCHES_PER_PARTITION: usize = 20;
+    const ROWS_PER_BATCH: usize = 100;
+    const NUM_PARTITIONS: usize = 2;
+
+    fn make_batches() -> Vec<Vec<RecordBatch>> {
+        let mut rng = thread_rng();
+
+        let mut id_offset = 0;
+
+        (0..NUM_PARTITIONS)
+            .map(|_| {
+                (0..BATCHES_PER_PARTITION)
+                    .map(|_| {
+                        let batch = generate_batch(&mut rng, ROWS_PER_BATCH, id_offset);
+                        id_offset += ROWS_PER_BATCH as i32;
+                        batch
+                    })
+                    .collect()
+            })
+            .collect()
+    }
+
+    fn make_provider() -> Arc<dyn TableProvider> {
+        let batches = make_batches();
+        let schema = batches.first().unwrap().first().unwrap().schema();
+        Arc::new(MemTable::try_new(schema, make_batches()).unwrap())
+    }
+
+    fn init_logging() {
+        let _ = env_logger::builder().is_test(true).try_init();
+    }
+
+    #[tokio::test]
+    async fn test_simple() {
+        init_logging();
+
+        let scheduler = Scheduler::new(4);
+
+        let config = SessionConfig::new().with_target_partitions(4);
+        let context = SessionContext::with_config(config);
+
+        context.register_table("table1", make_provider()).unwrap();
+        context.register_table("table2", make_provider()).unwrap();
+
+        let queries = [
+            "select * from table1 order by id",
+            "select * from table1 where table1.a > 100 order by id",
+            "select distinct a from table1 where table1.b > 100 order by a",
+            "select * from table1 join table2 on table1.id = table2.id order by table1.id",
+            "select id from table1 union all select id from table2 order by id",
+            "select id from table1 union all select id from table2 where a > 100 order by id",
+            "select id, b from (select id, b from table1 union all select id, b from table2 where a > 100 order by id) as t where b > 10 order by id, b",
+            "select id, MIN(b), MAX(b), AVG(b) from table1 group by id order by id",
+            "select count(*) from table1 where table1.a > 4",
+        ];
+
+        for sql in queries {
+            let task = context.task_ctx();
+
+            let query = context.sql(sql).await.unwrap();
+
+            let plan = query.create_physical_plan().await.unwrap();
+
+            info!("Plan: {}", displayable(plan.as_ref()).indent());
+
+            let stream = scheduler.schedule(plan, task).unwrap().stream();
+            let scheduled: Vec<_> = stream.try_collect().await.unwrap();
+            let expected = query.collect().await.unwrap();
+
+            let total_expected = expected.iter().map(|x| x.num_rows()).sum::<usize>();
+            let total_scheduled = scheduled.iter().map(|x| x.num_rows()).sum::<usize>();
+            assert_eq!(total_expected, total_scheduled);
+
+            info!("Query \"{}\" produced {} rows", sql, total_expected);
+
+            let expected = pretty_format_batches(&expected).unwrap().to_string();
+            let scheduled = pretty_format_batches(&scheduled).unwrap().to_string();
+
+            assert_eq!(
+                expected, scheduled,
+                "\n\nexpected:\n\n{}\nactual:\n\n{}\n\n",
+                expected, scheduled
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_partitioned() {
+        init_logging();
+
+        let scheduler = Scheduler::new(4);
+
+        let config = SessionConfig::new().with_target_partitions(4);
+        let context = SessionContext::with_config(config);
+        let plan = context
+            .read_table(make_provider())
+            .unwrap()
+            .create_physical_plan()
+            .await
+            .unwrap();
+
+        assert_eq!(plan.output_partitioning().partition_count(), NUM_PARTITIONS);
+
+        let results = scheduler
+            .schedule(plan.clone(), context.task_ctx())
+            .unwrap();
+
+        let batches = results.stream().try_collect::<Vec<_>>().await.unwrap();
+        assert_eq!(batches.len(), NUM_PARTITIONS * BATCHES_PER_PARTITION);
+
+        for batch in batches {
+            assert_eq!(batch.num_rows(), ROWS_PER_BATCH)
+        }
+
+        let results = scheduler.schedule(plan, context.task_ctx()).unwrap();
+        let streams = results.stream_partitioned();
+
+        let partitions: Vec<Vec<RecordBatch>> =
+            futures::future::try_join_all(streams.into_iter().map(|s| s.try_collect()))
+                .await
+                .unwrap();
+
+        assert_eq!(partitions.len(), NUM_PARTITIONS);
+        for batches in partitions {
+            assert_eq!(batches.len(), BATCHES_PER_PARTITION);
+            for batch in batches {
+                assert_eq!(batch.num_rows(), ROWS_PER_BATCH);
+            }
+        }
+    }
+
+    #[tokio::test]
+    async fn test_panic() {
+        init_logging();
+
+        let do_test = |scheduler: Scheduler| {
+            scheduler.pool.spawn(|| panic!("test"));
+            scheduler.pool.spawn(|| panic!("{}", 1));
+            scheduler.pool.spawn(|| panic_any(21));
+        };
+
+        // The default panic handler should log panics and not abort the process
+        do_test(Scheduler::new(1));
+
+        // Override panic handler and capture panics to test formatting
+        let (sender, receiver) = futures::channel::mpsc::unbounded();
+        let scheduler = SchedulerBuilder::new(1)
+            .panic_handler(move |panic| {
+                let _ = sender.unbounded_send(format_worker_panic(panic));
+            })
+            .build();
+
+        do_test(scheduler);
+
+        // Sort as order not guaranteed
+        let mut buffer: Vec<_> = receiver.collect().await;
+        buffer.sort_unstable();
+
+        assert_eq!(buffer.len(), 3);
+        assert_eq!(buffer[0], "worker 0 panicked with: 1");
+        assert_eq!(buffer[1], "worker 0 panicked with: UNKNOWN");
+        assert_eq!(buffer[2], "worker 0 panicked with: test");
+    }
+}
diff --git a/datafusion/core/src/scheduler/pipeline/execution.rs b/datafusion/core/src/scheduler/pipeline/execution.rs
new file mode 100644
index 000000000000..baf487d98c80
--- /dev/null
+++ b/datafusion/core/src/scheduler/pipeline/execution.rs
@@ -0,0 +1,330 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::collections::VecDeque;
+use std::fmt::Formatter;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll, Waker};
+
+use arrow::error::ArrowError;
+use async_trait::async_trait;
+use futures::{Stream, StreamExt, TryStreamExt};
+use parking_lot::Mutex;
+
+use crate::arrow::datatypes::SchemaRef;
+use crate::arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::metrics::MetricsSet;
+use crate::physical_plan::{
+    displayable, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    RecordBatchStream, SendableRecordBatchStream, Statistics,
+};
+
+use crate::scheduler::pipeline::Pipeline;
+use crate::scheduler::BoxStream;
+
+/// An [`ExecutionPipeline`] wraps a portion of an [`ExecutionPlan`] and
+/// converts it to the push-based [`Pipeline`] interface
+///
+/// Internally [`ExecutionPipeline`] is still pull-based, which limits its parallelism
+/// to that of its output partitioning; however, it provides full compatibility with
+/// [`ExecutionPlan`], allowing full interoperability with the existing ecosystem
+///
+/// Longer term we will likely want to introduce new traits that differentiate between
+/// pipeline-able operators like filters, and pipeline-breakers like aggregations, and
+/// are better aligned with a push-based execution model.
+///
+/// This in turn will allow for [`Pipeline`] implementations that are able to introduce
+/// parallelism beyond that expressed in their partitioning
+pub struct ExecutionPipeline {
+    proxied: Arc<dyn ExecutionPlan>,
+    inputs: Vec<Vec<Arc<Mutex<InputPartition>>>>,
+    outputs: Vec<Mutex<BoxStream<'static, ArrowResult<RecordBatch>>>>,
+}
+
+impl std::fmt::Debug for ExecutionPipeline {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let tree = debug_tree(self.proxied.as_ref());
+        f.debug_tuple("ExecutionNode").field(&tree).finish()
+    }
+}
+
+impl ExecutionPipeline {
+    pub fn new(
+        plan: Arc<dyn ExecutionPlan>,
+        task_context: Arc<TaskContext>,
+        depth: usize,
+    ) -> Result<Self> {
+        // The point in the plan at which to splice the plan graph
+        let mut splice_point = plan;
+        let mut parent_plans = Vec::with_capacity(depth.saturating_sub(1));
+        for _ in 0..depth {
+            let children = splice_point.children();
+            assert_eq!(
+                children.len(),
+                1,
+                "can only group through nodes with a single child"
+            );
+            parent_plans.push(splice_point);
+            splice_point = children.into_iter().next().unwrap();
+        }
+
+        // The children to replace with [`ProxyExecutionPlan`]
+        let children = splice_point.children();
+        let mut inputs = Vec::with_capacity(children.len());
+
+        // The spliced plan with its children replaced with [`ProxyExecutionPlan`]
+        let spliced = if !children.is_empty() {
+            let mut proxies: Vec<Arc<dyn ExecutionPlan>> =
+                Vec::with_capacity(children.len());
+
+            for child in children {
+                let count = child.output_partitioning().partition_count();
+
+                let mut child_inputs = Vec::with_capacity(count);
+                for _ in 0..count {
+                    child_inputs.push(Default::default())
+                }
+
+                inputs.push(child_inputs.clone());
+                proxies.push(Arc::new(ProxyExecutionPlan {
+                    inner: child,
+                    inputs: child_inputs,
+                }));
+            }
+
+            splice_point.with_new_children(proxies)?
+        } else {
+            splice_point.clone()
+        };
+
+        // Reconstruct the parent graph
+        let mut proxied = spliced;
+        for parent in parent_plans.into_iter().rev() {
+            proxied = parent.with_new_children(vec![proxied])?
+        }
+
+        // Construct the output streams
+        let output_count = proxied.output_partitioning().partition_count();
+        let outputs = (0..output_count)
+            .map(|x| {
+                let proxy_captured = proxied.clone();
+                let task_captured = task_context.clone();
+                let fut = async move {
+                    proxy_captured
+                        .execute(x, task_captured)
+                        .await
+                        .map_err(|e| ArrowError::ExternalError(Box::new(e)))
+                };
+
+                // Use futures::stream::once to handle operators that perform computation
+                // within `ExecutionPlan::execute`. If we evaluated these futures here
+                // we could potentially block indefinitely waiting for inputs that will
+                // never arrive as the query isn't scheduled yet
+                Mutex::new(futures::stream::once(fut).try_flatten().boxed())
+            })
+            .collect();
+
+        Ok(Self {
+            proxied,
+            inputs,
+            outputs,
+        })
+    }
+}
+
+impl Pipeline for ExecutionPipeline {
+    /// Push a [`RecordBatch`] to the given input partition
+    fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()> {
+        let mut partition = self.inputs[child][partition].lock();
+        assert!(!partition.is_closed);
+
+        partition.buffer.push_back(input);
+        for waker in partition.wait_list.drain(..) {
+            waker.wake()
+        }
+        Ok(())
+    }
+
+    fn close(&self, child: usize, partition: usize) {
+        let mut partition = self.inputs[child][partition].lock();
+        assert!(!partition.is_closed);
+
+        partition.is_closed = true;
+        for waker in partition.wait_list.drain(..) {
+            waker.wake()
+        }
+    }
+
+    fn output_partitions(&self) -> usize {
+        self.outputs.len()
+    }
+
+    /// Poll an output partition, attempting to get its output
+    fn poll_partition(
+        &self,
+        cx: &mut Context<'_>,
+        partition: usize,
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        self.outputs[partition]
+            .lock()
+            .poll_next_unpin(cx)
+            .map(|opt| opt.map(|r| r.map_err(Into::into)))
+    }
+}
+
+#[derive(Debug, Default)]
+struct InputPartition {
+    buffer: VecDeque<RecordBatch>,
+    wait_list: Vec<Waker>,
+    is_closed: bool,
+}
+
+struct InputPartitionStream {
+    schema: SchemaRef,
+    partition: Arc<Mutex<InputPartition>>,
+}
+
+impl Stream for InputPartitionStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut partition = self.partition.lock();
+        match partition.buffer.pop_front() {
+            Some(batch) => Poll::Ready(Some(Ok(batch))),
+            None if partition.is_closed => Poll::Ready(None),
+            _ => {
+                partition.wait_list.push(cx.waker().clone());
+                Poll::Pending
+            }
+        }
+    }
+}
+
+impl RecordBatchStream for InputPartitionStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+/// This is a hack that allows injecting [`InputPartitionStream`] in place of the
+/// streams yielded by the child of the wrapped [`ExecutionPlan`]
+///
+/// This is hopefully temporary pending reworking [`ExecutionPlan`]
+#[derive(Debug)]
+struct ProxyExecutionPlan {
+    inner: Arc<dyn ExecutionPlan>,
+
+    inputs: Vec<Arc<Mutex<InputPartition>>>,
+}
+
+#[async_trait]
+impl ExecutionPlan for ProxyExecutionPlan {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.inner.schema()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        self.inner.output_partitioning()
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        self.inner.output_ordering()
+    }
+
+    fn required_child_distribution(&self) -> Distribution {
+        self.inner.required_child_distribution()
+    }
+
+    fn relies_on_input_order(&self) -> bool {
+        self.inner.relies_on_input_order()
+    }
+
+    fn maintains_input_order(&self) -> bool {
+        self.inner.maintains_input_order()
+    }
+
+    fn benefits_from_input_partitioning(&self) -> bool {
+        self.inner.benefits_from_input_partitioning()
+    }
+
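+    /// Returns no children: the subtree below the proxied operator is deliberately
+    /// hidden, as its input is instead injected by the scheduler through
+    /// [`InputPartitionStream`] when `execute` is called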
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        unimplemented!()
+    }
+
+    async fn execute(
+        &self,
+        partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        Ok(Box::pin(InputPartitionStream {
+            schema: self.schema(),
+            partition: self.inputs[partition].clone(),
+        }))
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        self.inner.metrics()
+    }
+
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        write!(f, "ProxyExecutionPlan")
+    }
+
+    fn statistics(&self) -> Statistics {
+        self.inner.statistics()
+    }
+}
+
+struct NodeDescriptor {
+    operator: String,
+    children: Vec<NodeDescriptor>,
+}
+
+impl std::fmt::Debug for NodeDescriptor {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct(&self.operator)
+            .field("children", &self.children)
+            .finish()
+    }
+}
+
+fn debug_tree(plan: &dyn ExecutionPlan) -> NodeDescriptor {
+    let operator = format!("{}", displayable(plan).one_line());
+    let children = plan
+        .children()
+        .into_iter()
+        .map(|x| debug_tree(x.as_ref()))
+        .collect();
+
+    NodeDescriptor { operator, children }
+}
diff --git a/datafusion/core/src/scheduler/pipeline/mod.rs b/datafusion/core/src/scheduler/pipeline/mod.rs
new file mode 100644
index 000000000000..824a6950e025
--- /dev/null
+++ b/datafusion/core/src/scheduler/pipeline/mod.rs
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::task::{Context, Poll};
+
+use arrow::record_batch::RecordBatch;
+
+use crate::error::Result;
+
+pub mod execution;
+pub mod repartition;
+
+/// A push-based interface used by the scheduler to drive query execution
+///
+/// A pipeline processes data from one or more input partitions, producing output
+/// to one or more output partitions. As a [`Pipeline`] may draw on input from
+/// more than one upstream [`Pipeline`], input partitions are identified by both
+/// a child index, and a partition index, whereas output partitions are only
+/// identified by a partition index.
+///
+/// This is not intended as an eventual replacement for the physical plan representation
+/// within DataFusion, [`ExecutionPlan`], but rather as a generic interface that
+/// parts of the physical plan are "compiled" into by the scheduler.
+///
+/// # Eager vs Lazy Execution
+///
+/// Whether computation is eagerly done on push, or lazily done on pull, is
+/// intentionally left as an implementation detail of the [`Pipeline`]
+///
+/// This allows flexibility to support the following different patterns, and potentially more:
+///
+/// An eager, push-based pipeline, that processes a batch synchronously in [`Pipeline::push`]
+/// and immediately wakes the corresponding output partition (a minimal sketch of this
+/// pattern appears at the end of this module).
+///
+/// A parallel, push-based pipeline, that enqueues the processing of a batch to the rayon
+/// thread pool in [`Pipeline::push`], and wakes the corresponding output partition when
+/// the job completes. Order and non-order preserving variants are possible
+///
+/// A merge pipeline which combines data from one or more input partitions into one or
+/// more output partitions. [`Pipeline::push`] adds data to an input buffer, and wakes
+/// any output partitions that may now be able to make progress. This may be none if
+/// the operator is waiting on data from a different input partition
+///
+/// An aggregation pipeline which combines data from one or more input partitions into
+/// a single output partition. [`Pipeline::push`] would eagerly update the computed
+/// aggregates, and the final [`Pipeline::close`] trigger flushing these to the output.
+/// It would also be possible to flush once the partial aggregates reach a certain size
+///
+/// A partition-aware aggregation pipeline, which functions similarly to the above, but
+/// computes aggregations per input partition, before combining these prior to flush.
+///
+/// An async input pipeline, which has no inputs, and wakes the output partition
+/// whenever new data is available
+///
+/// A JIT compiled sequence of synchronous operators, that perform multiple operations
+/// from the physical plan as a single [`Pipeline`]. Parallelized implementations
+/// are also possible
+///
+pub trait Pipeline: Send + Sync + std::fmt::Debug {
+    /// Push a [`RecordBatch`] to the given input partition
+    fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()>;
+
+    /// Mark an input partition as exhausted
+    fn close(&self, child: usize, partition: usize);
+
+    /// Returns the number of output partitions
+    fn output_partitions(&self) -> usize;
+
+    /// Attempt to pull out the next value of the given output partition, registering the
+    /// current task for wakeup if the value is not yet available, and returning `None`
+    /// if the output partition is exhausted and will never yield any further values
+    ///
+    /// # Return value
+    ///
+    /// There are several possible return values:
+    ///
+    /// - `Poll::Pending` indicates that this partition's next value is not ready yet.
+    ///   Implementations should use the waker provided by `cx` to notify the scheduler when
+    ///   progress may be able to be made
+    ///
+    /// - `Poll::Ready(Some(Ok(val)))` returns the next value from this output partition,
+    ///   the output partition should be polled again as it may have further values. The returned
+    ///   value will be routed to the next pipeline in the query
+    ///
+    /// - `Poll::Ready(Some(Err(e)))` returns an error that will be routed to the query's output
+    ///   and the query execution aborted.
+    ///
+    /// - `Poll::Ready(None)` indicates that this partition is exhausted and will not produce any
+    ///   further values.
+    ///
+    fn poll_partition(
+        &self,
+        cx: &mut Context<'_>,
+        partition: usize,
+    ) -> Poll<Option<Result<RecordBatch>>>;
+}
diff --git a/datafusion/core/src/scheduler/pipeline/repartition.rs b/datafusion/core/src/scheduler/pipeline/repartition.rs
new file mode 100644
index 000000000000..c35ab909fde8
--- /dev/null
+++ b/datafusion/core/src/scheduler/pipeline/repartition.rs
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::VecDeque;
+use std::task::{Context, Poll, Waker};
+
+use parking_lot::Mutex;
+
+use crate::arrow::record_batch::RecordBatch;
+use crate::error::Result;
+use crate::physical_plan::repartition::BatchPartitioner;
+use crate::physical_plan::Partitioning;
+
+use crate::scheduler::pipeline::Pipeline;
+
+/// A [`Pipeline`] that can repartition its input
+#[derive(Debug)]
+pub struct RepartitionPipeline {
+    output_count: usize,
+    state: Mutex<RepartitionState>,
+}
+
+impl RepartitionPipeline {
+    /// Create a new [`RepartitionPipeline`] with the given `input` and `output` partitioning
+    pub fn try_new(input: Partitioning, output: Partitioning) -> Result<Self> {
+        let input_count = input.partition_count();
+        let output_count = output.partition_count();
+        assert_ne!(input_count, 0);
+        assert_ne!(output_count, 0);
+
+        // TODO: metrics support
+        let partitioner = BatchPartitioner::try_new(output, Default::default())?;
+
+        let state = Mutex::new(RepartitionState {
+            partitioner,
+            partition_closed: vec![false; input_count],
+            input_closed: false,
+            output_buffers: (0..output_count).map(|_| Default::default()).collect(),
+        });
+
+        Ok(Self {
+            state,
+            output_count,
+        })
+    }
+}
+
+struct RepartitionState {
+    partitioner: BatchPartitioner,
+    partition_closed: Vec<bool>,
+    input_closed: bool,
+    output_buffers: Vec<OutputBuffer>,
+}
+
+impl std::fmt::Debug for RepartitionState {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("RepartitionState")
+            .field("partition_closed", &self.partition_closed)
+            .field("input_closed", &self.input_closed)
+            .finish()
+    }
+}
+
+impl Pipeline for RepartitionPipeline {
+    fn push(&self, input: RecordBatch, child: usize, partition: usize) -> Result<()> {
+        assert_eq!(child, 0);
+
+        let mut state = self.state.lock();
+        assert!(
+            !state.partition_closed[partition],
+            "attempt to push to closed partition {} of RepartitionPipeline({:?})",
+            partition, state
+        );
+
+        let state = &mut *state;
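+        // Note: the repartitioning happens eagerly, on the worker thread that
+        // called `push`; this makes [`RepartitionPipeline`] an instance of the
+        // eager, push-based pattern described in the [`Pipeline`] trait docs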
{ + waker.wake() + } + } + } + } + + fn output_partitions(&self) -> usize { + self.output_count + } + + fn poll_partition( + &self, + cx: &mut Context<'_>, + partition: usize, + ) -> Poll>> { + let mut state = self.state.lock(); + let input_closed = state.input_closed; + let buffer = &mut state.output_buffers[partition]; + + match buffer.batches.pop_front() { + Some(batch) => Poll::Ready(Some(Ok(batch))), + None if input_closed => Poll::Ready(None), + _ => { + buffer.wait_list.push(cx.waker().clone()); + Poll::Pending + } + } + } +} + +#[derive(Debug, Default)] +struct OutputBuffer { + batches: VecDeque, + wait_list: Vec, +} + +impl OutputBuffer { + fn push_batch(&mut self, batch: RecordBatch) { + self.batches.push_back(batch); + + for waker in self.wait_list.drain(..) { + waker.wake() + } + } +} diff --git a/datafusion/core/src/scheduler/plan.rs b/datafusion/core/src/scheduler/plan.rs new file mode 100644 index 000000000000..e7d5e1d33176 --- /dev/null +++ b/datafusion/core/src/scheduler/plan.rs @@ -0,0 +1,296 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::SchemaRef; +use std::sync::Arc; + +use crate::error::Result; +use crate::execution::context::TaskContext; +use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use crate::physical_plan::repartition::RepartitionExec; +use crate::physical_plan::{ExecutionPlan, Partitioning}; + +use crate::scheduler::pipeline::{ + execution::ExecutionPipeline, repartition::RepartitionPipeline, Pipeline, +}; + +/// Identifies the [`Pipeline`] within the [`PipelinePlan`] to route output to +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct OutputLink { + /// The index of the [`Pipeline`] in [`PipelinePlan`] to route output to + pub pipeline: usize, + + /// The child of the [`Pipeline`] to route output to + pub child: usize, +} + +/// Combines a [`Pipeline`] with an [`OutputLink`] identifying where to send its output +#[derive(Debug)] +pub struct RoutablePipeline { + /// The pipeline that produces data + pub pipeline: Box, + + /// Where to send output the output of `pipeline` + /// + /// If `None`, the output should be sent to the query output + pub output: Option, +} + +/// [`PipelinePlan`] is the scheduler's representation of the [`ExecutionPlan`] passed to +/// [`super::Scheduler::schedule`]. 
+
+/// [`PipelinePlan`] is the scheduler's representation of the [`ExecutionPlan`] passed to
+/// [`super::Scheduler::schedule`]. It combines the list of [`Pipeline`] with the information
+/// necessary to route output from one stage to the next
+#[derive(Debug)]
+pub struct PipelinePlan {
+    /// Schema of this plan's output
+    pub schema: SchemaRef,
+
+    /// Number of output partitions
+    pub output_partitions: usize,
+
+    /// Pipelines that comprise this plan
+    pub pipelines: Vec<RoutablePipeline>,
+}
+
+/// When converting [`ExecutionPlan`] to [`Pipeline`] we may wish to group
+/// together multiple operators, [`OperatorGroup`] stores this state
+struct OperatorGroup {
+    /// Where to route the output of the eventual [`Pipeline`]
+    output: Option<OutputLink>,
+
+    /// The [`ExecutionPlan`] from which to start recursing
+    root: Arc<dyn ExecutionPlan>,
+
+    /// The number of times to recurse into the [`ExecutionPlan`]'s children
+    depth: usize,
+}
+
+/// A utility struct to assist converting from [`ExecutionPlan`] to [`PipelinePlan`]
+///
+/// The [`ExecutionPlan`] is visited in a depth-first fashion, gradually building
+/// up the [`RoutablePipeline`] for the [`PipelinePlan`]. As nodes are visited depth-first,
+/// a node is visited only after its parent has been.
+pub struct PipelinePlanner {
+    task_context: Arc<TaskContext>,
+
+    /// The schema of this plan
+    schema: SchemaRef,
+
+    /// The number of output partitions of this plan
+    output_partitions: usize,
+
+    /// The current list of completed pipelines
+    completed: Vec<RoutablePipeline>,
+
+    /// A list of [`ExecutionPlan`] still to visit, along with
+    /// where they should route their output
+    to_visit: Vec<(Arc<dyn ExecutionPlan>, Option<OutputLink>)>,
+
+    /// Stores one or more operators to combine
+    /// together into a single [`ExecutionPipeline`]
+    execution_operators: Option<OperatorGroup>,
+}
+
+impl PipelinePlanner {
+    pub fn new(plan: Arc<dyn ExecutionPlan>, task_context: Arc<TaskContext>) -> Self {
+        let schema = plan.schema();
+        let output_partitions = plan.output_partitioning().partition_count();
+        Self {
+            completed: vec![],
+            to_visit: vec![(plan, None)],
+            task_context,
+            execution_operators: None,
+            schema,
+            output_partitions,
+        }
+    }
+
+    /// Flush the current group of operators stored in `execution_operators`
+    /// into a single [`ExecutionPipeline`]
+    fn flush_exec(&mut self) -> Result<usize> {
+        let group = self.execution_operators.take().unwrap();
+        let node_idx = self.completed.len();
+        self.completed.push(RoutablePipeline {
+            pipeline: Box::new(ExecutionPipeline::new(
+                group.root,
+                self.task_context.clone(),
+                group.depth,
+            )?),
+            output: group.output,
+        });
+        Ok(node_idx)
+    }
+
+    /// Visit a non-special cased [`ExecutionPlan`]
+    fn visit_exec(
+        &mut self,
+        plan: Arc<dyn ExecutionPlan>,
+        parent: Option<OutputLink>,
+    ) -> Result<()> {
+        let children = plan.children();
+
+        // Add the operator to the current group of operators to be combined
+        // into a single [`ExecutionPipeline`].
+        //
+        // TODO: More sophisticated policy, just because we can combine them doesn't mean we should
+        match self.execution_operators.as_mut() {
+            Some(buffer) => {
+                assert_eq!(parent, buffer.output, "QueryBuilder out of sync");
+                buffer.depth += 1;
+            }
+            None => {
+                self.execution_operators = Some(OperatorGroup {
+                    output: parent,
+                    root: plan,
+                    depth: 0,
+                })
+            }
+        }
+
+        match children.len() {
+            1 => {
+                // Enqueue the children with the parent of the `OperatorGroup`
+                self.to_visit
+                    .push((children.into_iter().next().unwrap(), parent))
+            }
+            _ => {
+                // We can only recursively group through nodes with a single child, therefore
+                // if this node has multiple children, we now need to flush the buffer and
+                // enqueue its children with this new pipeline as its parent
+                let node = self.flush_exec()?;
+                self.enqueue_children(children, node);
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Add the given list of children to the stack of [`ExecutionPlan`] to visit
+    fn enqueue_children(
+        &mut self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+        parent_node_idx: usize,
+    ) {
+        for (child_idx, child) in children.into_iter().enumerate() {
+            self.to_visit.push((
+                child,
+                Some(OutputLink {
+                    pipeline: parent_node_idx,
+                    child: child_idx,
+                }),
+            ))
+        }
+    }
+
+    /// Push a new [`RoutablePipeline`] and enqueue its children to be visited
+    fn push_pipeline(
+        &mut self,
+        node: RoutablePipeline,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) {
+        let node_idx = self.completed.len();
+        self.completed.push(node);
+        self.enqueue_children(children, node_idx)
+    }
+
+    /// Push a new [`RepartitionPipeline`], first flushing any buffered [`OperatorGroup`]
+    fn push_repartition(
+        &mut self,
+        input: Partitioning,
+        output: Partitioning,
+        parent: Option<OutputLink>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<()> {
+        let parent = match &self.execution_operators {
+            Some(buffer) => {
+                assert_eq!(buffer.output, parent, "QueryBuilder out of sync");
+                Some(OutputLink {
+                    pipeline: self.flush_exec()?,
+                    child: 0, // Must be the only child
+                })
+            }
+            None => parent,
+        };
+
+        let node = Box::new(RepartitionPipeline::try_new(input, output)?);
+        self.push_pipeline(
+            RoutablePipeline {
+                pipeline: node,
+                output: parent,
+            },
+            children,
+        );
+        Ok(())
+    }
+
+    /// Visit an [`ExecutionPlan`] operator and add it to the [`PipelinePlan`] being built
+    fn visit_operator(
+        &mut self,
+        plan: Arc<dyn ExecutionPlan>,
+        parent: Option<OutputLink>,
+    ) -> Result<()> {
+        if let Some(repartition) = plan.as_any().downcast_ref::<RepartitionExec>() {
+            self.push_repartition(
+                repartition.input().output_partitioning(),
+                repartition.output_partitioning(),
+                parent,
+                repartition.children(),
+            )
+        } else if let Some(coalesce) =
+            plan.as_any().downcast_ref::<CoalescePartitionsExec>()
+        {
+            self.push_repartition(
+                coalesce.input().output_partitioning(),
+                Partitioning::RoundRobinBatch(1),
+                parent,
+                coalesce.children(),
+            )
+        } else {
+            self.visit_exec(plan, parent)
+        }
+    }
+
+    /// Build a [`PipelinePlan`] from the [`ExecutionPlan`] provided to [`PipelinePlanner::new`]
+    ///
+    /// This will group as many operators as possible into a single [`ExecutionPipeline`], only
+    /// creating new pipelines when:
+    ///
+    /// - encountering an operator with multiple children
+    /// - encountering a repartitioning operator
+    ///
+    /// This latter case is because currently the repartitioning operators in DataFusion are
+    /// coupled with the non-scheduler-based parallelism story
+    ///
+    /// The above logic is liable to change, is considered an implementation detail of the
+    /// scheduler, and should not be relied upon by operators
+    ///
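+    /// For intuition, consider a hypothetical plan of the following shape, where
+    /// `RepartitionExec` is the only repartitioning operator:
+    ///
+    /// ```text
+    /// ProjectionExec
+    ///   FilterExec
+    ///     RepartitionExec
+    ///       ParquetExec
+    /// ```
+    ///
+    /// This would yield three pipelines: an [`ExecutionPipeline`] grouping
+    /// `ProjectionExec` and `FilterExec`, a [`RepartitionPipeline`], and an
+    /// [`ExecutionPipeline`] for `ParquetExec`, with each child pipeline routing
+    /// its output to its parent via an [`OutputLink`]
+    ///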
+    pub fn build(mut self) -> Result<PipelinePlan> {
+        // We do a depth-first scan of the operator tree, extracting a list of
+        // [`RoutablePipeline`]
+        while let Some((plan, parent)) = self.to_visit.pop() {
+            self.visit_operator(plan, parent)?;
+        }
+
+        if self.execution_operators.is_some() {
+            self.flush_exec()?;
+        }
+
+        Ok(PipelinePlan {
+            schema: self.schema,
+            output_partitions: self.output_partitions,
+            pipelines: self.completed,
+        })
+    }
+}
diff --git a/datafusion/core/src/scheduler/task.rs b/datafusion/core/src/scheduler/task.rs
new file mode 100644
index 000000000000..e91985437d95
--- /dev/null
+++ b/datafusion/core/src/scheduler/task.rs
@@ -0,0 +1,497 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::stream::RecordBatchStreamAdapter;
+use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+use crate::scheduler::{
+    is_worker, plan::PipelinePlan, spawn_local, spawn_local_fifo, RoutablePipeline,
+    Spawner,
+};
+use arrow::datatypes::SchemaRef;
+use arrow::error::ArrowError;
+use arrow::record_batch::RecordBatch;
+use futures::channel::mpsc;
+use futures::task::ArcWake;
+use futures::{ready, Stream, StreamExt};
+use log::{debug, trace};
+use std::pin::Pin;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Weak};
+use std::task::{Context, Poll};
+
+/// Spawns a [`PipelinePlan`] using the provided [`Spawner`]
+pub fn spawn_plan(plan: PipelinePlan, spawner: Spawner) -> ExecutionResults {
+    debug!("Spawning pipeline plan: {:#?}", plan);
+
+    let (senders, receivers) = (0..plan.output_partitions)
+        .map(|_| mpsc::unbounded())
+        .unzip::<_, _, Vec<_>, Vec<_>>();
+
+    let context = Arc::new(ExecutionContext {
+        spawner,
+        pipelines: plan.pipelines,
+        schema: plan.schema,
+        output: senders,
+    });
+
+    for (pipeline_idx, query_pipeline) in context.pipelines.iter().enumerate() {
+        for partition in 0..query_pipeline.pipeline.output_partitions() {
+            context.spawner.spawn(Task {
+                context: context.clone(),
+                waker: Arc::new(TaskWaker {
+                    context: Arc::downgrade(&context),
+                    wake_count: AtomicUsize::new(1),
+                    pipeline: pipeline_idx,
+                    partition,
+                }),
+            });
+        }
+    }
+
+    let partitions = receivers
+        .into_iter()
+        .map(|receiver| ExecutionResultStream {
+            receiver,
+            context: context.clone(),
+        })
+        .collect();
+
+    ExecutionResults {
+        streams: partitions,
+        context,
+    }
+}
+
+/// A [`Task`] identifies an output partition within a given pipeline that may be able to
+/// make progress. The [`Scheduler`][super::Scheduler] maintains a list of outstanding
+/// [`Task`] and distributes them amongst its worker threads.
+pub struct Task {
+    /// Maintain a link to the [`ExecutionContext`]; this is necessary to be able to
+    /// route the output of the partition to its destination
+    context: Arc<ExecutionContext>,
+
+    /// An [`ArcWake`] that can be used to re-schedule this [`Task`] for execution
+    waker: Arc<TaskWaker>,
+}
+
+impl std::fmt::Debug for Task {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let output = &self.context.pipelines[self.waker.pipeline].output;
+
+        f.debug_struct("Task")
+            .field("pipeline", &self.waker.pipeline)
+            .field("partition", &self.waker.partition)
+            .field("output", &output)
+            .finish()
+    }
+}
+
+impl Task {
+    fn handle_error(
+        &self,
+        partition: usize,
+        routable: &RoutablePipeline,
+        error: DataFusionError,
+    ) {
+        self.context.send_query_output(partition, Err(error));
+        if let Some(link) = routable.output {
+            trace!(
+                "Closing pipeline: {:?}, partition: {}, due to error",
+                link,
+                self.waker.partition,
+            );
+
+            self.context.pipelines[link.pipeline]
+                .pipeline
+                .close(link.child, self.waker.partition);
+        }
+    }
+
+    /// Call [`Pipeline::poll_partition`], attempting to make progress on query execution
+    pub fn do_work(self) {
+        assert!(is_worker(), "Task::do_work called outside of worker pool");
+        if self.context.is_cancelled() {
+            return;
+        }
+
+        // Capture the wake count prior to calling [`Pipeline::poll_partition`];
+        // this allows us to detect concurrent wake ups and handle them correctly
+        let wake_count = self.waker.wake_count.load(Ordering::SeqCst);
+
+        let node = self.waker.pipeline;
+        let partition = self.waker.partition;
+
+        let waker = futures::task::waker_ref(&self.waker);
+        let mut cx = Context::from_waker(&*waker);
+
+        let pipelines = &self.context.pipelines;
+        let routable = &pipelines[node];
+        match routable.pipeline.poll_partition(&mut cx, partition) {
+            Poll::Ready(Some(Ok(batch))) => {
+                trace!("Poll {:?}: Ok: {}", self, batch.num_rows());
+                match routable.output {
+                    Some(link) => {
+                        trace!(
+                            "Publishing batch to pipeline {:?} partition {}",
+                            link,
+                            partition
+                        );
+
+                        let r = pipelines[link.pipeline]
+                            .pipeline
+                            .push(batch, link.child, partition);
+
+                        if let Err(e) = r {
+                            self.handle_error(partition, routable, e);
+
+                            // Return without rescheduling this output again
+                            return;
+                        }
+                    }
+                    None => {
+                        trace!("Publishing batch to output");
+                        self.context.send_query_output(partition, Ok(batch))
+                    }
+                }
+
+                // Reschedule this pipeline again
+                //
+                // We want to prioritise running tasks triggered by the most recent
+                // batch, so reschedule with FIFO ordering
+                //
+                // Note: We must schedule after we have routed the batch, otherwise
+                // we introduce a potential ordering race where the newly scheduled
+                // task runs before this task finishes routing the output
+                spawn_local_fifo(self);
+            }
+            Poll::Ready(Some(Err(e))) => {
+                trace!("Poll {:?}: Error: {:?}", self, e);
+                self.handle_error(partition, routable, e)
+            }
+            Poll::Ready(None) => {
+                trace!("Poll {:?}: None", self);
+                match routable.output {
+                    Some(link) => {
+                        trace!("Closing pipeline: {:?}, partition: {}", link, partition);
+                        pipelines[link.pipeline]
+                            .pipeline
+                            .close(link.child, partition)
+                    }
+                    None => self.context.finish(partition),
+                }
+            }
+            Poll::Pending => {
+                trace!("Poll {:?}: Pending", self);
+                // Attempt to reset the wake count with the value obtained prior
+                // to calling [`Pipeline::poll_partition`].
+                //
+                // If this fails it indicates a wakeup was received whilst executing
+                // [`Pipeline::poll_partition`] and we should reschedule the task
+                let reset = self.waker.wake_count.compare_exchange(
+                    wake_count,
+                    0,
+                    Ordering::SeqCst,
+                    Ordering::SeqCst,
+                );
+
+                if reset.is_err() {
+                    trace!("Wakeup triggered whilst polling: {:?}", self);
+                    spawn_local(self);
+                }
+            }
+        }
+    }
+}
+
+/// The results of the execution of a query
+pub struct ExecutionResults {
+    /// [`ExecutionResultStream`] for each partition of this query
+    streams: Vec<ExecutionResultStream>,
+
+    /// Keep a reference to the [`ExecutionContext`] so it isn't dropped early
+    context: Arc<ExecutionContext>,
+}
+
+impl ExecutionResults {
+    /// Returns a [`SendableRecordBatchStream`] of this execution
+    ///
+    /// In the event of multiple output partitions, the output will be interleaved
+    pub fn stream(self) -> SendableRecordBatchStream {
+        let schema = self.context.schema.clone();
+        Box::pin(RecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::select_all(self.streams),
+        ))
+    }
+
+    /// Returns a [`SendableRecordBatchStream`] for each partition of this execution
+    pub fn stream_partitioned(self) -> Vec<SendableRecordBatchStream> {
+        self.streams.into_iter().map(|s| Box::pin(s) as _).collect()
+    }
+}
+
+/// A result stream for the execution of a query
+struct ExecutionResultStream {
+    receiver: mpsc::UnboundedReceiver<Option<Result<RecordBatch>>>,
+
+    /// Keep a reference to the [`ExecutionContext`] so it isn't dropped early
+    context: Arc<ExecutionContext>,
+}
+
+impl Stream for ExecutionResultStream {
+    type Item = arrow::error::Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let opt = ready!(self.receiver.poll_next_unpin(cx)).flatten();
+        Poll::Ready(opt.map(|r| r.map_err(|e| ArrowError::ExternalError(Box::new(e)))))
+    }
+}
+
+impl RecordBatchStream for ExecutionResultStream {
+    fn schema(&self) -> SchemaRef {
+        self.context.schema.clone()
+    }
+}
+
+/// The shared state of all [`Task`] created from the same [`PipelinePlan`]
+#[derive(Debug)]
+struct ExecutionContext {
+    /// Spawner for this query
+    spawner: Spawner,
+
+    /// List of pipelines that belong to this query, pipelines are addressed
+    /// based on their index within this list
+    pipelines: Vec<RoutablePipeline>,
+
+    /// Schema of this plan's output
+    pub schema: SchemaRef,
+
+    /// The output streams, per partition, for this query's execution
+    output: Vec<mpsc::UnboundedSender<Option<Result<RecordBatch>>>>,
+}
+
+impl Drop for ExecutionContext {
+    fn drop(&mut self) {
+        debug!("ExecutionContext dropped");
+    }
+}
+
+impl ExecutionContext {
+    /// Returns `true` if this query has been dropped, specifically if the
+    /// stream returned by [`super::Scheduler::schedule`] has been dropped
+    fn is_cancelled(&self) -> bool {
+        self.output.iter().all(|x| x.is_closed())
+    }
+
+    /// Sends `output` to this query's output stream
+    fn send_query_output(&self, partition: usize, output: Result<RecordBatch>) {
+        let _ = self.output[partition].unbounded_send(Some(output));
+    }
+
+    /// Mark this partition as finished
+    fn finish(&self, partition: usize) {
+        let _ = self.output[partition].unbounded_send(None);
+    }
+}
+
+struct TaskWaker {
+    /// Store a weak reference to the [`ExecutionContext`] to avoid reference cycles if this
+    /// [`Waker`] is stored within a [`Pipeline`] owned by the [`ExecutionContext`]
+    context: Weak<ExecutionContext>,
+
+    /// A counter that stores the number of times this has been awoken
+    ///
+    /// A value > 0 implies the task is either in the ready queue or
+    /// currently being executed
+    ///
+    /// `TaskWaker::wake` always increments the `wake_count`, however, it only
+    /// re-enqueues the [`Task`] if the value prior to increment was 0
+    ///
+    /// This ensures that a given [`Task`] is not enqueued multiple times
+    ///
+    /// We store an integer, as opposed to a boolean, so that wake ups that
+    /// occur during [`Pipeline::poll_partition`] can be detected and handled
+    /// after it has finished executing
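+    ///
+    /// For example (illustrative): if a wakeup arrives while the task is inside
+    /// `poll_partition`, `wake_count` goes from 1 to 2 and `wake` does not
+    /// re-enqueue the task; when the poll then returns `Pending`, the
+    /// `compare_exchange` from the pre-poll value of 1 to 0 fails, and the task
+    /// reschedules itself rather than the wakeup being lost
+    ///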
+    wake_count: AtomicUsize,
+
+    /// The index of the pipeline within `query` to poll
+    pipeline: usize,
+
+    /// The partition of the pipeline within `query` to poll
+    partition: usize,
+}
+
+impl ArcWake for TaskWaker {
+    fn wake(self: Arc<Self>) {
+        if self.wake_count.fetch_add(1, Ordering::SeqCst) != 0 {
+            trace!("Ignoring duplicate wakeup");
+            return;
+        }
+
+        if let Some(context) = self.context.upgrade() {
+            let task = Task {
+                context,
+                waker: self.clone(),
+            };
+
+            trace!("Wakeup {:?}", task);
+
+            // If called from a worker, spawn to the current worker's
+            // local queue, otherwise reschedule on any worker
+            match is_worker() {
+                true => spawn_local(task),
+                false => task.context.spawner.clone().spawn(task),
+            }
+        } else {
+            trace!("Dropped wakeup");
+        }
+    }
+
+    fn wake_by_ref(s: &Arc<Self>) {
+        ArcWake::wake(s.clone())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::error::Result;
+    use crate::scheduler::{pipeline::Pipeline, plan::RoutablePipeline, Scheduler};
+    use arrow::array::{ArrayRef, Int32Array};
+    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow::record_batch::RecordBatch;
+    use futures::{channel::oneshot, ready, FutureExt, StreamExt};
+    use parking_lot::Mutex;
+    use std::fmt::Debug;
+    use std::time::Duration;
+
+    /// Tests that the waker can be sent to a tokio pool
+    #[derive(Debug)]
+    struct TokioPipeline {
+        handle: tokio::runtime::Handle,
+        state: Mutex<State>,
+    }
+
+    #[derive(Debug)]
+    enum State {
+        Init,
+        Wait(oneshot::Receiver<Result<RecordBatch>>),
+        Finished,
+    }
+
+    impl Default for State {
+        fn default() -> Self {
+            Self::Init
+        }
+    }
+
+    impl Pipeline for TokioPipeline {
+        fn push(
+            &self,
+            _input: RecordBatch,
+            _child: usize,
+            _partition: usize,
+        ) -> Result<()> {
+            unreachable!()
+        }
+
+        fn close(&self, _child: usize, _partition: usize) {}
+
+        fn output_partitions(&self) -> usize {
+            1
+        }
+
+        fn poll_partition(
+            &self,
+            cx: &mut Context<'_>,
+            _partition: usize,
+        ) -> Poll<Option<Result<RecordBatch>>> {
+            let mut state = self.state.lock();
+            loop {
+                match &mut *state {
+                    State::Init => {
+                        let (sender, receiver) = oneshot::channel();
+                        self.handle.spawn(async move {
+                            tokio::time::sleep(Duration::from_millis(10)).await;
+                            let array = Int32Array::from_iter_values([1, 2, 3]);
+                            sender.send(
+                                RecordBatch::try_from_iter([(
+                                    "int",
+                                    Arc::new(array) as ArrayRef,
+                                )])
+                                .map_err(DataFusionError::ArrowError),
+                            )
+                        });
+                        *state = State::Wait(receiver)
+                    }
+                    State::Wait(r) => {
+                        let v = ready!(r.poll_unpin(cx)).ok();
+                        *state = State::Finished;
+                        return Poll::Ready(v);
+                    }
+                    State::Finished => return Poll::Ready(None),
+                }
+            }
+        }
+    }
+
+    #[test]
+    fn test_tokio_waker() {
+        let scheduler = Scheduler::new(2);
+
+        // A tokio runtime
+        let runtime = tokio::runtime::Builder::new_current_thread()
+            .enable_time()
+            .build()
+            .unwrap();
+
+        // A pipeline that dispatches to a tokio worker
+        let pipeline = TokioPipeline {
+            handle: runtime.handle().clone(),
+            state: Default::default(),
+        };
+
+        let plan = PipelinePlan {
+            schema: Arc::new(Schema::new(vec![Field::new(
+                "int",
+                DataType::Int32,
+                false,
+            )])),
+            output_partitions: 1,
+            pipelines: vec![RoutablePipeline {
+                pipeline: Box::new(pipeline),
+                output: None,
+            }],
+        };
+
+        let mut receiver = scheduler.schedule_plan(plan).stream();
+
+        runtime.block_on(async move {
+            // Should wait for output
+            let batch = receiver.next().await.unwrap().unwrap();
+            assert_eq!(batch.num_rows(), 3);
+
+            // Next batch should be none
+            assert!(receiver.next().await.is_none());
+        })
+    }
+}