diff --git a/Cargo.lock b/Cargo.lock index c67ba07d17..7ce8714f46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -367,7 +367,7 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" dependencies = [ - "event-listener 5.3.1", + "event-listener 5.4.0", "event-listener-strategy", "pin-project-lite", ] @@ -1351,9 +1351,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.23" +version = "4.5.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3135e7ec2ef7b10c6ed8950f0f792ed96ee093fa088608f1c76e569722700c84" +checksum = "9560b07a799281c7e0958b9296854d6fafd4c5f31444a7e5bb1ad6dde5ccf1bd" dependencies = [ "clap_builder", "clap_derive", @@ -1361,9 +1361,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.23" +version = "4.5.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30582fc632330df2bd26877bde0c1f4470d57c582bbc070376afcd04d8cb4838" +checksum = "874e0dd3eb68bf99058751ac9712f622e61e6f393a94f7128fa26e3f02f5c7cd" dependencies = [ "anstream", "anstyle", @@ -1373,9 +1373,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.18" +version = "4.5.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ac6a0c7b1a9e9a5186361f67dfa1b88213572f427fb9ab038efb2bd8c582dab" +checksum = "54b755194d6389280185988721fffba69495eed5ee9feeee9a599b53db80318c" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -2340,9 +2340,9 @@ dependencies = [ "indexmap 2.7.0", "indicatif", "itertools 0.11.0", + "kanal", "lazy_static", "log", - "loole", "num-format", "pin-project", "pyo3", @@ -2518,10 +2518,12 @@ dependencies = [ "common-arrow-ffi", "common-display", "common-error", + "common-runtime", "daft-core", "daft-dsl", "daft-image", "daft-logical-plan", + "futures", "html-escape", "indexmap 2.7.0", "num-traits", @@ -2973,9 +2975,9 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" [[package]] name = "event-listener" -version = "5.3.1" +version = "5.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" +checksum = "3492acde4c3fc54c845eaab3eed8bd00c7a7d881f78bfc801e43a93dec1331ae" dependencies = [ "concurrent-queue", "parking", @@ -2988,7 +2990,7 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c3e4e0dd3673c1139bf041f3008816d9cf2946bbfac2945c09e523b8d7b05b2" dependencies = [ - "event-listener 5.3.1", + "event-listener 5.4.0", "pin-project-lite", ] @@ -3985,9 +3987,9 @@ dependencies = [ [[package]] name = "inventory" -version = "0.3.16" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5d80fade88dd420ce0d9ab6f7c58ef2272dde38db874657950f827d4982c817" +checksum = "3b31349d02fe60f80bbbab1a9402364cad7460626d6030494b08ac4a2075bf81" dependencies = [ "rustversion", ] @@ -4152,6 +4154,16 @@ dependencies = [ "simple_asn1", ] +[[package]] +name = "kanal" +version = "0.1.0-pre8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b05d55519627edaf7fd0f29981f6dc03fb52df3f5b257130eb8d0bf2801ea1d7" +dependencies = [ + "futures-core", + "lock_api", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -4330,9 +4342,9 @@ checksum = "8355be11b20d696c8f18f6cc018c4e372165b1fa8126cef092399c9951984ffa" [[package]] name = "libz-ng-sys" -version = "1.1.20" +version = "1.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f0f7295a34685977acb2e8cc8b08ee4a8dffd6cf278eeccddbe1ed55ba815d5" +checksum = "7cee1488e961a80d172564fd6fcda11d8a4ac6672c06fe008e9213fa60520c2b" dependencies = [ "cmake", "libc", @@ -4340,9 +4352,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.4.14" +version = "0.4.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" [[package]] name = "litemap" @@ -4366,21 +4378,11 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" -[[package]] -name = "loole" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2998397c725c822c6b2ba605fd9eb4c6a7a0810f1629ba3cc232ef4f0308d96" -dependencies = [ - "futures-core", - "futures-sink", -] - [[package]] name = "lz4" -version = "1.28.0" +version = "1.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d1febb2b4a79ddd1980eede06a8f7902197960aa0383ffcfdd62fe723036725" +checksum = "a20b523e860d03443e98350ceaac5e71c6ba89aea7d960769ec3ce37f4de5af4" dependencies = [ "lz4-sys", ] @@ -5838,9 +5840,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.42" +version = "0.38.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f93dc38ecbab2eb790ff964bb77fa94faf256fd3e73285fd7ba0903b76bedb85" +checksum = "a78891ee6bf2340288408954ac787aa063d8e8817e9f53abb37c695c6d834ef6" dependencies = [ "bitflags 2.6.0", "errno", @@ -6024,9 +6026,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.13.0" +version = "2.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1863fd3768cd83c56a7f60faa4dc0d403f1b6df0a38c3c25f44b7894e45370d5" +checksum = "49db231d56a190491cb4aeda9527f1ad45345af50b0851622a7adb8c03b01c32" dependencies = [ "core-foundation-sys", "libc", @@ -6079,9 +6081,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.134" +version = "1.0.135" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d00f4175c42ee48b15416f6193a959ba3a0d67fc699a0db9ad12df9f83991c7d" +checksum = "2b0d7ba2887406110130a978386c4e1befb98c674b4fba677954e4db976630d9" dependencies = [ "indexmap 2.7.0", "itoa", @@ -6564,7 +6566,7 @@ checksum = "257822358c6f206fed78bfe6369cf959063b0644d70f88df6b19f2dadc93423e" dependencies = [ "alloca", "anyhow", - "clap 4.5.23", + "clap 4.5.24", "colorz", "glob-match", "goblin", @@ -6802,9 +6804,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.42.0" +version = "1.43.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cec9b21b0450273377fc97bd4c33a8acffc8c996c987a7c5b319a0083707551" +checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" dependencies = [ "backtrace", "bytes", @@ -6820,9 +6822,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" dependencies = [ "proc-macro2", "quote", diff --git a/src/daft-dsl/src/expr/mod.rs b/src/daft-dsl/src/expr/mod.rs index c1966cfd5a..878cd50082 100644 --- a/src/daft-dsl/src/expr/mod.rs +++ b/src/daft-dsl/src/expr/mod.rs @@ -1286,6 +1286,35 @@ impl Expr { } } + pub fn has_compute(&self) -> bool { + match self { + Self::Column(..) => false, + Self::Literal(..) => false, + Self::Subquery(..) => false, + Self::Exists(..) => false, + Self::OuterReferenceColumn(..) => false, + Self::Function { .. } => true, + Self::ScalarFunction(..) => true, + Self::Agg(_) => true, + Self::IsIn(..) => true, + Self::Between(..) => true, + Self::BinaryOp { .. } => true, + Self::Alias(expr, ..) => expr.has_compute(), + Self::Cast(expr, ..) => expr.has_compute(), + Self::Not(expr) => expr.has_compute(), + Self::IsNull(expr) => expr.has_compute(), + Self::NotNull(expr) => expr.has_compute(), + Self::FillNull(expr, fill_value) => expr.has_compute() || fill_value.has_compute(), + Self::IfElse { + if_true, + if_false, + predicate, + } => if_true.has_compute() || if_false.has_compute() || predicate.has_compute(), + Self::InSubquery(expr, _) => expr.has_compute(), + Self::List(..) => true, + } + } + pub fn eq_null_safe(self: ExprRef, other: ExprRef) -> ExprRef { binary_op(Operator::EqNullSafe, self, other) } diff --git a/src/daft-local-execution/Cargo.toml b/src/daft-local-execution/Cargo.toml index e235b2d667..cacaf11c9d 100644 --- a/src/daft-local-execution/Cargo.toml +++ b/src/daft-local-execution/Cargo.toml @@ -26,9 +26,9 @@ futures = {workspace = true} indexmap = {workspace = true} indicatif = "0.17.9" itertools = {workspace = true} +kanal = "0.1.0-pre8" lazy_static = {workspace = true} log = {workspace = true} -loole = "0.4.0" num-format = {workspace = true} pin-project = "1" pyo3 = {workspace = true, optional = true} diff --git a/src/daft-local-execution/src/channel.rs b/src/daft-local-execution/src/channel.rs index 8adaae0616..9db656fe53 100644 --- a/src/daft-local-execution/src/channel.rs +++ b/src/daft-local-execution/src/channel.rs @@ -1,29 +1,25 @@ #[derive(Clone)] -pub(crate) struct Sender(loole::Sender); +pub(crate) struct Sender(kanal::AsyncSender); impl Sender { - pub(crate) async fn send(&self, val: T) -> Result<(), loole::SendError> { - self.0.send_async(val).await + pub(crate) async fn send(&self, val: T) -> Result<(), kanal::SendError> { + self.0.send(val).await } } #[derive(Clone)] -pub(crate) struct Receiver(loole::Receiver); +pub(crate) struct Receiver(kanal::AsyncReceiver); impl Receiver { pub(crate) async fn recv(&self) -> Option { - self.0.recv_async().await.ok() + self.0.recv().await.ok() } - pub(crate) fn blocking_recv(&self) -> Option { - self.0.recv().ok() - } - - pub(crate) fn into_inner(self) -> loole::Receiver { + pub(crate) fn into_inner(self) -> kanal::AsyncReceiver { self.0 } } pub(crate) fn create_channel(buffer_size: usize) -> (Sender, Receiver) { - let (tx, rx) = loole::bounded(buffer_size); + let (tx, rx) = kanal::bounded_async::(buffer_size); (Sender(tx), Receiver(rx)) } @@ -36,7 +32,7 @@ pub(crate) fn create_ordering_aware_receiver_channel( ) -> (Vec>, OrderingAwareReceiver) { match ordered { true => { - let (senders, receiver) = (0..buffer_size).map(|_| create_channel::(1)).unzip(); + let (senders, receiver) = (0..buffer_size).map(|_| create_channel::(0)).unzip(); ( senders, OrderingAwareReceiver::InOrder(RoundRobinReceiver::new(receiver)), diff --git a/src/daft-local-execution/src/dispatcher.rs b/src/daft-local-execution/src/dispatcher.rs index 9e487028f1..05fdaf0b00 100644 --- a/src/daft-local-execution/src/dispatcher.rs +++ b/src/daft-local-execution/src/dispatcher.rs @@ -95,7 +95,7 @@ impl DispatchSpawner for RoundRobinDispatcher { runtime_handle: &mut RuntimeHandle, ) -> SpawnedDispatchResult { let (worker_senders, worker_receivers): (Vec<_>, Vec<_>) = - (0..num_workers).map(|_| create_channel(1)).unzip(); + (0..num_workers).map(|_| create_channel(0)).unzip(); let morsel_size = self.morsel_size; let task = runtime_handle.spawn(async move { Self::dispatch_inner(worker_senders, input_receivers, morsel_size).await @@ -213,7 +213,7 @@ impl DispatchSpawner for PartitionedDispatcher { runtime_handle: &mut RuntimeHandle, ) -> SpawnedDispatchResult { let (worker_senders, worker_receivers): (Vec<_>, Vec<_>) = - (0..num_workers).map(|_| create_channel(1)).unzip(); + (0..num_workers).map(|_| create_channel(0)).unzip(); let partition_by = self.partition_by.clone(); let dispatch_task = runtime_handle.spawn(async move { Self::dispatch_inner(worker_senders, input_receivers, partition_by).await diff --git a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs index accc41eccb..317fcbc2ed 100644 --- a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs +++ b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs @@ -241,7 +241,7 @@ impl PipelineNode for IntermediateNode { let num_workers = op.max_concurrency().context(PipelineExecutionSnafu { node_name: self.name(), })?; - let (destination_sender, destination_receiver) = create_channel(1); + let (destination_sender, destination_receiver) = create_channel(0); let counting_sender = CountingSender::new(destination_sender, self.runtime_stats.clone(), progress_bar); diff --git a/src/daft-local-execution/src/intermediate_ops/project.rs b/src/daft-local-execution/src/intermediate_ops/project.rs index c41977e829..e5763b8493 100644 --- a/src/daft-local-execution/src/intermediate_ops/project.rs +++ b/src/daft-local-execution/src/intermediate_ops/project.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{cmp::max, sync::Arc}; use common_error::{DaftError, DaftResult}; use daft_dsl::{functions::python::get_resource_request, ExprRef}; @@ -12,21 +12,74 @@ use super::intermediate_op::{ }; use crate::{ExecutionTaskSpawner, NUM_CPUS}; +fn num_parallel_exprs(projection: &[ExprRef]) -> usize { + max( + projection.iter().filter(|expr| expr.has_compute()).count(), + 1, + ) +} + pub struct ProjectOperator { projection: Arc>, + max_concurrency: usize, + parallel_exprs: usize, memory_request: u64, } impl ProjectOperator { - pub fn new(projection: Vec) -> Self { + pub fn new(projection: Vec) -> DaftResult { let memory_request = get_resource_request(&projection) .and_then(|req| req.memory_bytes()) .map(|m| m as u64) .unwrap_or(0); - Self { + let (max_concurrency, parallel_exprs) = Self::get_optimal_allocation(&projection)?; + Ok(Self { projection: Arc::new(projection), memory_request, - } + max_concurrency, + parallel_exprs, + }) + } + + // This function is used to determine the optimal allocation of concurrency and expression parallelism + fn get_optimal_allocation(projection: &[ExprRef]) -> DaftResult<(usize, usize)> { + let resource_request = get_resource_request(projection); + // The number of CPUs available for the operator. + let available_cpus = match resource_request { + // If the resource request specifies a number of CPUs, the available cpus is the number of actual CPUs + // divided by the requested number of CPUs, clamped to (1, NUM_CPUS). + // E.g. if the resource request specifies 2 CPUs and NUM_CPUS is 4, the number of available cpus is 2. + Some(resource_request) if resource_request.num_cpus().is_some() => { + let requested_num_cpus = resource_request.num_cpus().unwrap(); + if requested_num_cpus > *NUM_CPUS as f64 { + Err(DaftError::ValueError(format!( + "Requested {} CPUs but found only {} available", + requested_num_cpus, *NUM_CPUS + ))) + } else { + Ok( + (*NUM_CPUS as f64 / requested_num_cpus).clamp(1.0, *NUM_CPUS as f64) + as usize, + ) + } + } + _ => Ok(*NUM_CPUS), + }?; + + let max_parallel_exprs = num_parallel_exprs(projection); + + // Calculate optimal concurrency using ceiling division + // Example: For 128 CPUs and 60 parallel expressions: + // max_concurrency = 128.div_ceil(60) = 3 concurrent tasks + let max_concurrency = available_cpus.div_ceil(max_parallel_exprs); + + // Calculate actual parallel expressions per task using floor division + // Example: For 128 CPUs and 3 concurrent tasks: + // num_parallel_exprs = 128 / 3 = 42 parallel expressions per task + // This ensures even distribution across concurrent tasks + let num_parallel_exprs = available_cpus / max_concurrency; + + Ok((max_concurrency, num_parallel_exprs)) } } @@ -39,12 +92,19 @@ impl IntermediateOperator for ProjectOperator { task_spawner: &ExecutionTaskSpawner, ) -> IntermediateOpExecuteResult { let projection = self.projection.clone(); + let num_parallel_exprs = self.parallel_exprs; let memory_request = self.memory_request; task_spawner .spawn_with_memory_request( memory_request, async move { - let out = input.eval_expression_list(&projection)?; + let out = if num_parallel_exprs > 1 { + input + .par_eval_expression_list(&projection, num_parallel_exprs) + .await? + } else { + input.eval_expression_list(&projection)? + }; Ok(( state, IntermediateOperatorResult::NeedMoreInput(Some(Arc::new(out))), @@ -78,26 +138,6 @@ impl IntermediateOperator for ProjectOperator { } fn max_concurrency(&self) -> DaftResult { - let resource_request = get_resource_request(&self.projection); - match resource_request { - // If the resource request specifies a number of CPUs, the max concurrency is the number of CPUs - // divided by the requested number of CPUs, clamped to (1, NUM_CPUS). - // E.g. if the resource request specifies 2 CPUs and NUM_CPUS is 4, the max concurrency is 2. - Some(resource_request) if resource_request.num_cpus().is_some() => { - let requested_num_cpus = resource_request.num_cpus().unwrap(); - if requested_num_cpus > *NUM_CPUS as f64 { - Err(DaftError::ValueError(format!( - "Requested {} CPUs but found only {} available", - requested_num_cpus, *NUM_CPUS - ))) - } else { - Ok( - (*NUM_CPUS as f64 / requested_num_cpus).clamp(1.0, *NUM_CPUS as f64) - as usize, - ) - } - } - _ => Ok(*NUM_CPUS), - } + Ok(self.max_concurrency) } } diff --git a/src/daft-local-execution/src/pipeline.rs b/src/daft-local-execution/src/pipeline.rs index 49844f2c61..54aa623d48 100644 --- a/src/daft-local-execution/src/pipeline.rs +++ b/src/daft-local-execution/src/pipeline.rs @@ -140,7 +140,11 @@ pub fn physical_plan_to_pipeline( stats_state, .. }) => { - let proj_op = ProjectOperator::new(projection.clone()); + let proj_op = ProjectOperator::new(projection.clone()).with_context(|_| { + PipelineCreationSnafu { + plan_name: physical_plan.name(), + } + })?; let child_node = physical_plan_to_pipeline(input, psets, cfg)?; IntermediateNode::new(Arc::new(proj_op), vec![child_node], stats_state.clone()).boxed() } diff --git a/src/daft-local-execution/src/run.rs b/src/daft-local-execution/src/run.rs index cddcf28248..a3b486125c 100644 --- a/src/daft-local-execution/src/run.rs +++ b/src/daft-local-execution/src/run.rs @@ -16,8 +16,7 @@ use daft_micropartition::{ partitioning::{InMemoryPartitionSetCache, MicroPartitionSet, PartitionSetCache}, MicroPartition, MicroPartitionRef, }; -use futures::{FutureExt, Stream}; -use loole::RecvFuture; +use futures::Stream; use tokio_util::sync::CancellationToken; #[cfg(feature = "python")] use { @@ -201,7 +200,7 @@ impl NativeExecutor { refresh_chrome_trace(); let cancel = self.cancel.clone(); let pipeline = physical_plan_to_pipeline(&physical_plan, psets, &cfg)?; - let (tx, rx) = create_channel(results_buffer_size.unwrap_or(1)); + let (tx, rx) = create_channel(results_buffer_size.unwrap_or(0)); let rt = self.runtime.clone(); let pb_manager = self.pb_manager.clone(); @@ -357,7 +356,7 @@ fn should_enable_progress_bar() -> bool { } pub struct ExecutionEngineReceiverIterator { - receiver: Receiver>, + receiver: kanal::Receiver>, handle: Option>>, } @@ -365,7 +364,7 @@ impl Iterator for ExecutionEngineReceiverIterator { type Item = DaftResult>; fn next(&mut self) -> Option { - match self.receiver.blocking_recv() { + match self.receiver.recv().ok() { Some(part) => Some(Ok(part)), None => { if self.handle.is_some() { @@ -387,41 +386,6 @@ impl Iterator for ExecutionEngineReceiverIterator { } } -pub struct ExecutionEngineReceiverStream { - receive_fut: RecvFuture>, - handle: Option>>, -} - -impl Stream for ExecutionEngineReceiverStream { - type Item = DaftResult>; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - match self.receive_fut.poll_unpin(cx) { - std::task::Poll::Ready(Ok(part)) => std::task::Poll::Ready(Some(Ok(part))), - std::task::Poll::Ready(Err(_)) => { - if self.handle.is_some() { - let join_result = self - .handle - .take() - .unwrap() - .join() - .expect("Execution engine thread panicked"); - match join_result { - Ok(()) => std::task::Poll::Ready(None), - Err(e) => std::task::Poll::Ready(Some(Err(e))), - } - } else { - std::task::Poll::Ready(None) - } - } - std::task::Poll::Pending => std::task::Poll::Pending, - } - } -} - pub struct ExecutionEngineResult { handle: std::thread::JoinHandle>, receiver: Receiver>, @@ -429,10 +393,37 @@ pub struct ExecutionEngineResult { impl ExecutionEngineResult { pub fn into_stream(self) -> impl Stream>> { - ExecutionEngineReceiverStream { - receive_fut: self.receiver.into_inner().recv_async(), - handle: Some(self.handle), + struct StreamState { + receiver: Receiver>, + handle: Option>>, } + + let state = StreamState { + receiver: self.receiver, + handle: Some(self.handle), + }; + + futures::stream::unfold(state, |mut state| async { + match state.receiver.recv().await { + Some(part) => Some((Ok(part), state)), + None => { + if state.handle.is_some() { + let join_result = state + .handle + .take() + .unwrap() + .join() + .expect("Execution engine thread panicked"); + match join_result { + Ok(()) => None, + Err(e) => Some((Err(e), state)), + } + } else { + None + } + } + } + }) } } @@ -442,7 +433,7 @@ impl IntoIterator for ExecutionEngineResult { fn into_iter(self) -> Self::IntoIter { ExecutionEngineReceiverIterator { - receiver: self.receiver, + receiver: self.receiver.into_inner().to_sync(), handle: Some(self.handle), } } diff --git a/src/daft-local-execution/src/runtime_stats.rs b/src/daft-local-execution/src/runtime_stats.rs index 835997c5b7..7cc6e77e2a 100644 --- a/src/daft-local-execution/src/runtime_stats.rs +++ b/src/daft-local-execution/src/runtime_stats.rs @@ -9,7 +9,7 @@ use std::{ }; use daft_micropartition::MicroPartition; -use loole::SendError; +use kanal::SendError; use tracing::{instrument::Instrumented, Instrument}; use crate::{ @@ -174,10 +174,7 @@ impl CountingSender { } } #[inline] - pub(crate) async fn send( - &self, - v: Arc, - ) -> Result<(), SendError>> { + pub(crate) async fn send(&self, v: Arc) -> Result<(), SendError> { self.rt.mark_rows_emitted(v.len() as u64); if let Some(ref pb) = self.progress_bar { pb.render(); diff --git a/src/daft-local-execution/src/sinks/blocking_sink.rs b/src/daft-local-execution/src/sinks/blocking_sink.rs index efb09c2be5..ed68042be7 100644 --- a/src/daft-local-execution/src/sinks/blocking_sink.rs +++ b/src/daft-local-execution/src/sinks/blocking_sink.rs @@ -184,7 +184,7 @@ impl PipelineNode for BlockingSinkNode { progress_bar.clone(), ); - let (destination_sender, destination_receiver) = create_channel(1); + let (destination_sender, destination_receiver) = create_channel(0); let counting_sender = CountingSender::new(destination_sender, self.runtime_stats.clone(), progress_bar); diff --git a/src/daft-local-execution/src/sinks/streaming_sink.rs b/src/daft-local-execution/src/sinks/streaming_sink.rs index d054be142b..b8dd330195 100644 --- a/src/daft-local-execution/src/sinks/streaming_sink.rs +++ b/src/daft-local-execution/src/sinks/streaming_sink.rs @@ -234,7 +234,7 @@ impl PipelineNode for StreamingSinkNode { )); } - let (destination_sender, destination_receiver) = create_channel(1); + let (destination_sender, destination_receiver) = create_channel(0); let counting_sender = CountingSender::new(destination_sender, self.runtime_stats.clone(), progress_bar); diff --git a/src/daft-local-execution/src/sources/source.rs b/src/daft-local-execution/src/sources/source.rs index 6fad499f57..bad39d4070 100644 --- a/src/daft-local-execution/src/sources/source.rs +++ b/src/daft-local-execution/src/sources/source.rs @@ -117,7 +117,7 @@ impl PipelineNode for SourceNode { ); let source = self.source.clone(); let io_stats = self.io_stats.clone(); - let (destination_sender, destination_receiver) = create_channel(1); + let (destination_sender, destination_receiver) = create_channel(0); let counting_sender = CountingSender::new(destination_sender, self.runtime_stats.clone(), progress_bar); runtime_handle.spawn( diff --git a/src/daft-micropartition/src/ops/eval_expressions.rs b/src/daft-micropartition/src/ops/eval_expressions.rs index 14baff4c37..29a6be0531 100644 --- a/src/daft-micropartition/src/ops/eval_expressions.rs +++ b/src/daft-micropartition/src/ops/eval_expressions.rs @@ -54,6 +54,36 @@ impl MicroPartition { )) } + pub async fn par_eval_expression_list( + &self, + exprs: &[ExprRef], + num_parallel_tasks: usize, + ) -> DaftResult { + let io_stats = IOStatsContext::new("MicroPartition::eval_expression_list"); + + let expected_schema = infer_schema(exprs, &self.schema)?; + + let tables = self.tables_or_read(io_stats)?; + + let evaluated_table_futs = tables + .iter() + .map(|table| table.par_eval_expression_list(exprs, num_parallel_tasks)); + + let evaluated_tables = futures::future::try_join_all(evaluated_table_futs).await?; + + let eval_stats = self + .statistics + .as_ref() + .map(|table_statistics| table_statistics.eval_expression_list(exprs, &expected_schema)) + .transpose()?; + + Ok(Self::new_loaded( + expected_schema.into(), + Arc::new(evaluated_tables), + eval_stats, + )) + } + pub fn explode(&self, exprs: &[ExprRef]) -> DaftResult { let io_stats = IOStatsContext::new("MicroPartition::explode"); diff --git a/src/daft-recordbatch/Cargo.toml b/src/daft-recordbatch/Cargo.toml index c6dc83e282..5770e04d10 100644 --- a/src/daft-recordbatch/Cargo.toml +++ b/src/daft-recordbatch/Cargo.toml @@ -4,10 +4,12 @@ comfy-table = {workspace = true} common-arrow-ffi = {path = "../common/arrow-ffi", default-features = false} common-display = {path = "../common/display", default-features = false} common-error = {path = "../common/error", default-features = false} +common-runtime = {path = "../common/runtime", default-features = false} daft-core = {path = "../daft-core", default-features = false} daft-dsl = {path = "../daft-dsl", default-features = false} daft-image = {path = "../daft-image", default-features = false} daft-logical-plan = {path = "../daft-logical-plan", default-features = false} +futures = {workspace = true} html-escape = {workspace = true} indexmap = {workspace = true} num-traits = {workspace = true} diff --git a/src/daft-recordbatch/src/lib.rs b/src/daft-recordbatch/src/lib.rs index 8107c2df5b..44cb9dcc5d 100644 --- a/src/daft-recordbatch/src/lib.rs +++ b/src/daft-recordbatch/src/lib.rs @@ -13,6 +13,7 @@ use std::{ use arrow2::array::Array; use common_display::table_display::{make_comfy_table, StrValue}; use common_error::{DaftError, DaftResult}; +use common_runtime::get_compute_runtime; use daft_core::{ array::ops::{ full::FullNull, DaftApproxCountDistinctAggable, DaftHllSketchAggable, GroupIndices, @@ -24,6 +25,7 @@ use daft_dsl::{ LiteralValue, SketchType, }; use daft_logical_plan::FileInfos; +use futures::{StreamExt, TryStreamExt}; use num_traits::ToPrimitive; #[cfg(feature = "python")] pub mod ffi; @@ -675,6 +677,59 @@ impl RecordBatch { .map(|e| self.eval_expression(e)) .try_collect()?; + self.process_eval_results(exprs, result_series) + } + + pub async fn par_eval_expression_list( + &self, + exprs: &[ExprRef], + num_parallel_tasks: usize, + ) -> DaftResult { + // Partition the expressions into compute and non-compute + let (compute_exprs, non_compute_exprs): (Vec<_>, Vec<_>) = exprs + .iter() + .cloned() + .enumerate() + .partition(|(_, e)| e.has_compute()); + + // Evaluate non-compute expressions + let non_compute_results = non_compute_exprs + .into_iter() + .map(|(i, e)| (i, self.eval_expression(&e))) + .collect::>(); + + // Spawn tasks for the compute expressions + let compute_runtime = get_compute_runtime(); + let compute_futures = compute_exprs.into_iter().map(|(i, e)| { + let table = self.clone(); + compute_runtime.spawn(async move { (i, table.eval_expression(&e)) }) + }); + + // Collect the results of the compute expressions + let compute_results = futures::stream::iter(compute_futures) + .buffered(num_parallel_tasks) + .try_collect::>() + .await?; + + // Combine and sort by original index + let mut all_results = non_compute_results; + all_results.extend(compute_results); + all_results.sort_by_key(|(i, _)| *i); + + // Extract just the results in order + let result_series = all_results + .into_iter() + .map(|(_, result)| result) + .collect::>>()?; + + self.process_eval_results(exprs, result_series) + } + + fn process_eval_results( + &self, + exprs: &[ExprRef], + result_series: Vec, + ) -> DaftResult { let fields: Vec<_> = result_series.iter().map(|s| s.field().clone()).collect(); let mut seen = HashSet::new();