Skip to content

Commit

Permalink
perf(swordfish): Parallel expression evaluation (#3593)
Browse files Browse the repository at this point in the history
Addresses: #3389. More
generally, this PR optimizes for projections with many expressions,
particularly memory intensive expressions like UDFs.

**Problem:** 

Currently, swordfish parallelizes projections across morsels, with 1 CPU
per morsel. However, if each projection has many memory intensive
expressions, we could experience a massive inflation in memory because
we will have many materialized morsels living in memory at once.

**Proposed solution:** 

Instead, we can parallelize the expressions within the projection (but
only for expressions that require compute). This way, we still have good
CPU utilization, but we keep a lower number of materialized morsels in
memory.

In the linked issue above, a 128-CPU machine parallelizes morsels across
all of its cores, each morsel running multiple UDFs, resulting in "317GB
allocations and duration 351 secs".

This PR reduces that to 7.8GB peak memory and runtime of 66 seconds.
<img width="1187" alt="Screenshot 2024-12-17 at 3 54 06 PM"
src="https://github.com/user-attachments/assets/88f4ad49-a1d3-4659-b49f-8364214ee146"
/>


**Notes:**
- Found a bug with the loole channels: an `async` send paired with a `sync`
receive did not respect capacity constraints, so sends were allowed to
complete even though the corresponding receive had not happened. Moved over to
https://github.com/fereidani/kanal, which worked much better.

Todos for next time:
- We should also be able to parallelize expression evaluation within a
single expression, since it is a tree. We can calculate max width of the
tree and set that as max parallel tasks.

---------

Co-authored-by: EC2 Default User <ec2-user@ip-172-31-50-162.us-west-2.compute.internal>
Co-authored-by: Colin Ho <colinho@Colins-MBP.localdomain>
Co-authored-by: Colin Ho <colinho@Colins-MacBook-Pro.local>
  • Loading branch information
4 people authored Feb 13, 2025
1 parent 246e3e9 commit 1ae9605
Show file tree
Hide file tree
Showing 16 changed files with 281 additions and 135 deletions.
82 changes: 42 additions & 40 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions src/daft-dsl/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1286,6 +1286,35 @@ impl Expr {
}
}

/// Reports whether evaluating this expression requires compute.
///
/// - Column references, literals, subquery/exists nodes and outer-reference
///   columns report `false`.
/// - Function calls, scalar functions, aggregations, `IsIn`, `Between`,
///   binary operations and list construction report `true`.
/// - Every other variant reports `true` only if at least one of the
///   expressions it wraps does.
pub fn has_compute(&self) -> bool {
    match self {
        // Variants that always report `false`.
        Self::Column(..)
        | Self::Literal(..)
        | Self::Subquery(..)
        | Self::Exists(..)
        | Self::OuterReferenceColumn(..) => false,

        // Variants that always report `true`.
        Self::Function { .. }
        | Self::ScalarFunction(..)
        | Self::Agg(..)
        | Self::IsIn(..)
        | Self::Between(..)
        | Self::BinaryOp { .. }
        | Self::List(..) => true,

        // Single-operand wrappers: the answer is whatever the operand says.
        Self::Alias(inner, ..) => inner.has_compute(),
        Self::Cast(inner, ..) => inner.has_compute(),
        Self::Not(inner) => inner.has_compute(),
        Self::IsNull(inner) => inner.has_compute(),
        Self::NotNull(inner) => inner.has_compute(),
        Self::InSubquery(inner, _) => inner.has_compute(),

        // Multi-operand wrappers: compute is required if any operand needs it.
        Self::FillNull(inner, fill_value) => {
            inner.has_compute() || fill_value.has_compute()
        }
        Self::IfElse {
            if_true,
            if_false,
            predicate,
        } => {
            if if_true.has_compute() {
                return true;
            }
            if if_false.has_compute() {
                return true;
            }
            predicate.has_compute()
        }
    }
}

pub fn eq_null_safe(self: ExprRef, other: ExprRef) -> ExprRef {
binary_op(Operator::EqNullSafe, self, other)
}
Expand Down
2 changes: 1 addition & 1 deletion src/daft-local-execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ futures = {workspace = true}
indexmap = {workspace = true}
indicatif = "0.17.9"
itertools = {workspace = true}
kanal = "0.1.0-pre8"
lazy_static = {workspace = true}
log = {workspace = true}
loole = "0.4.0"
num-format = {workspace = true}
pin-project = "1"
pyo3 = {workspace = true, optional = true}
Expand Down
20 changes: 8 additions & 12 deletions src/daft-local-execution/src/channel.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,25 @@
#[derive(Clone)]
pub(crate) struct Sender<T>(loole::Sender<T>);
pub(crate) struct Sender<T>(kanal::AsyncSender<T>);
impl<T> Sender<T> {
pub(crate) async fn send(&self, val: T) -> Result<(), loole::SendError<T>> {
self.0.send_async(val).await
pub(crate) async fn send(&self, val: T) -> Result<(), kanal::SendError> {
self.0.send(val).await
}
}

#[derive(Clone)]
pub(crate) struct Receiver<T>(loole::Receiver<T>);
pub(crate) struct Receiver<T>(kanal::AsyncReceiver<T>);
impl<T> Receiver<T> {
pub(crate) async fn recv(&self) -> Option<T> {
self.0.recv_async().await.ok()
self.0.recv().await.ok()
}

pub(crate) fn blocking_recv(&self) -> Option<T> {
self.0.recv().ok()
}

pub(crate) fn into_inner(self) -> loole::Receiver<T> {
pub(crate) fn into_inner(self) -> kanal::AsyncReceiver<T> {
self.0
}
}

pub(crate) fn create_channel<T: Clone>(buffer_size: usize) -> (Sender<T>, Receiver<T>) {
let (tx, rx) = loole::bounded(buffer_size);
let (tx, rx) = kanal::bounded_async::<T>(buffer_size);
(Sender(tx), Receiver(rx))
}

Expand All @@ -36,7 +32,7 @@ pub(crate) fn create_ordering_aware_receiver_channel<T: Clone>(
) -> (Vec<Sender<T>>, OrderingAwareReceiver<T>) {
match ordered {
true => {
let (senders, receiver) = (0..buffer_size).map(|_| create_channel::<T>(1)).unzip();
let (senders, receiver) = (0..buffer_size).map(|_| create_channel::<T>(0)).unzip();
(
senders,
OrderingAwareReceiver::InOrder(RoundRobinReceiver::new(receiver)),
Expand Down
4 changes: 2 additions & 2 deletions src/daft-local-execution/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl DispatchSpawner for RoundRobinDispatcher {
runtime_handle: &mut RuntimeHandle,
) -> SpawnedDispatchResult {
let (worker_senders, worker_receivers): (Vec<_>, Vec<_>) =
(0..num_workers).map(|_| create_channel(1)).unzip();
(0..num_workers).map(|_| create_channel(0)).unzip();
let morsel_size = self.morsel_size;
let task = runtime_handle.spawn(async move {
Self::dispatch_inner(worker_senders, input_receivers, morsel_size).await
Expand Down Expand Up @@ -213,7 +213,7 @@ impl DispatchSpawner for PartitionedDispatcher {
runtime_handle: &mut RuntimeHandle,
) -> SpawnedDispatchResult {
let (worker_senders, worker_receivers): (Vec<_>, Vec<_>) =
(0..num_workers).map(|_| create_channel(1)).unzip();
(0..num_workers).map(|_| create_channel(0)).unzip();
let partition_by = self.partition_by.clone();
let dispatch_task = runtime_handle.spawn(async move {
Self::dispatch_inner(worker_senders, input_receivers, partition_by).await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ impl PipelineNode for IntermediateNode {
let num_workers = op.max_concurrency().context(PipelineExecutionSnafu {
node_name: self.name(),
})?;
let (destination_sender, destination_receiver) = create_channel(1);
let (destination_sender, destination_receiver) = create_channel(0);
let counting_sender =
CountingSender::new(destination_sender, self.runtime_stats.clone(), progress_bar);

Expand Down
Loading

0 comments on commit 1ae9605

Please sign in to comment.