Skip to content

Commit

Permalink
Parallel merge sort (apache#6162)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed May 9, 2023
1 parent e4bf3fa commit 8733405
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 30 deletions.
26 changes: 26 additions & 0 deletions datafusion/core/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
use crate::execution::memory_pool::MemoryReservation;
use crate::physical_plan::metrics::MemTrackingMetrics;
use crate::physical_plan::stream::RecordBatchReceiverStream;
use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statistics};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
Expand Down Expand Up @@ -180,6 +181,31 @@ pub(crate) fn spawn_execution(
})
}

/// Drives `input` from a dedicated tokio task so that it can make progress
/// concurrently with its consumer.
///
/// Items produced by `input` are forwarded through a bounded channel whose
/// capacity is `buffer`; the returned stream reads from the other end of that
/// channel and reports the same schema as `input`.
///
/// When no tokio runtime is active on the calling thread, nothing is spawned
/// and `input` is handed back unchanged.
pub(crate) fn spawn_buffered(
    mut input: SendableRecordBatchStream,
    buffer: usize,
) -> SendableRecordBatchStream {
    // Use tokio only if running from a tokio context (#2201)
    let runtime = match tokio::runtime::Handle::try_current() {
        Err(_) => return input,
        Ok(runtime) => runtime,
    };

    // Capture the schema up front: `input` is moved into the task below.
    let schema = input.schema();
    let (tx, rx) = mpsc::channel(buffer);

    let task = runtime.spawn(async move {
        loop {
            let batch = match input.next().await {
                Some(batch) => batch,
                // Input exhausted: dropping `tx` at task end closes the channel.
                None => break,
            };

            // A failed send means nobody is listening any more, so stop
            // polling the input instead of producing batches that are discarded.
            if tx.send(batch).await.is_err() {
                break;
            }
        }
    });

    RecordBatchReceiverStream::create(&schema, rx, task)
}

/// Computes the statistics for an in-memory RecordBatch
///
/// Only computes statistics that are in arrows metadata (num rows, byte size and nulls)
Expand Down
8 changes: 5 additions & 3 deletions datafusion/core/src/physical_plan/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::execution::memory_pool::{
human_readable_size, MemoryConsumer, MemoryReservation,
};
use crate::execution::runtime_env::RuntimeEnv;
use crate::physical_plan::common::{batch_byte_size, IPCWriter};
use crate::physical_plan::common::{batch_byte_size, spawn_buffered, IPCWriter};
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::metrics::{
BaselineMetrics, CompositeMetricsSet, MemTrackingMetrics, MetricsSet,
Expand Down Expand Up @@ -284,11 +284,13 @@ impl ExternalSorter {
self.partition_id,
&self.runtime.memory_pool,
);
sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics)
Ok(spawn_buffered(
sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics)?,
1,
))
})
.collect::<Result<_>>()?;

// TODO: Run batch sorts concurrently (#6162)
// TODO: Pushdown fetch to streaming merge (#6000)

streaming_merge(
Expand Down
36 changes: 9 additions & 27 deletions datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,17 @@ use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use log::{debug, trace};
use tokio::sync::mpsc;

use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
use crate::physical_plan::common::spawn_buffered;
use crate::physical_plan::metrics::{
ExecutionPlanMetricsSet, MemTrackingMetrics, MetricsSet,
};
use crate::physical_plan::sorts::streaming_merge;
use crate::physical_plan::stream::RecordBatchReceiverStream;
use crate::physical_plan::{
common::spawn_execution, expressions::PhysicalSortExpr, DisplayFormatType,
Distribution, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
expressions::PhysicalSortExpr, DisplayFormatType, Distribution, ExecutionPlan,
Partitioning, SendableRecordBatchStream, Statistics,
};
use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement};

Expand Down Expand Up @@ -181,29 +180,12 @@ impl ExecutionPlan for SortPreservingMergeExec {
result
}
_ => {
// Use tokio only if running from a tokio context (#2201)
let receivers = match tokio::runtime::Handle::try_current() {
Ok(_) => (0..input_partitions)
.map(|part_i| {
let (sender, receiver) = mpsc::channel(1);
let join_handle = spawn_execution(
self.input.clone(),
sender,
part_i,
context.clone(),
);

RecordBatchReceiverStream::create(
&schema,
receiver,
join_handle,
)
})
.collect(),
Err(_) => (0..input_partitions)
.map(|partition| self.input.execute(partition, context.clone()))
.collect::<Result<_>>()?,
};
let receivers = (0..input_partitions)
.map(|partition| {
let stream = self.input.execute(partition, context.clone())?;
Ok(spawn_buffered(stream, 1))
})
.collect::<Result<_>>()?;

debug!("Done setting up sender-receiver for SortPreservingMergeExec::execute");

Expand Down

0 comments on commit 8733405

Please sign in to comment.