Skip to content

Commit

Permalink
Morsel-driven Parallelism using rayon (apache#2199)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Apr 13, 2022
1 parent 8058fbb commit f43b114
Show file tree
Hide file tree
Showing 12 changed files with 1,355 additions and 22 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ The parquet SQL benchmarks can be run with
cargo bench --bench parquet_query_sql
```

These randomly generate a parquet file, and then benchmark queries sourced from [parquet_query_sql.sql](./datafusion/benches/parquet_query_sql.sql) against it. This can therefore be a quick way to add coverage of particular query and/or data paths.
These randomly generate a parquet file, and then benchmark queries sourced from [parquet_query_sql.sql](./datafusion/scheduler/benches/parquet_query_sql.sql) against it. This can therefore be a quick way to add coverage of particular query and/or data paths.

If the environment variable `PARQUET_FILE` is set, the benchmark will run queries against this file instead of a randomly generated one. This can be useful for performing multiple runs, potentially with different code, against the same source data, or for testing against a custom dataset.

Expand Down
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@

[workspace]
members = [
"datafusion/core",
"datafusion/common",
"datafusion/core",
"datafusion/expr",
"datafusion/jit",
"datafusion/physical-expr",
"datafusion/proto",
"datafusion/scheduler",
"datafusion-examples",
"benchmarks",
"ballista/rust/client",
Expand Down
4 changes: 0 additions & 4 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,6 @@ name = "scalar"
harness = false
name = "physical_plan"

[[bench]]
harness = false
name = "parquet_query_sql"

[[bench]]
harness = false
name = "jit"
Expand Down
56 changes: 56 additions & 0 deletions datafusion/scheduler/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Crate manifest for the DataFusion scheduler: morsel-driven parallel
# query execution built on rayon (see the commit this file was added in).
# Version is kept in lockstep with the main `datafusion` crate (7.0.0).
[package]
name = "datafusion-scheduler"
description = "Scheduling for DataFusion query engine"
version = "7.0.0"
homepage = "https://github.com/apache/arrow-datafusion"
repository = "https://github.com/apache/arrow-datafusion"
readme = "../README.md"
authors = ["Apache Arrow <dev@arrow.apache.org>"]
license = "Apache-2.0"
keywords = ["arrow", "query", "sql"]
edition = "2021"
rust-version = "1.58"

# Library target name uses an underscore (Rust identifier) while the
# package name above uses a hyphen (crates.io convention).
[lib]
name = "datafusion_scheduler"
path = "src/lib.rs"

# No optional features yet; section kept as a placeholder.
[features]

[dependencies]
# default-features disabled to avoid pulling in ahash's runtime RNG deps
ahash = { version = "0.7", default-features = false }
arrow = { version = "11" }
async-trait = "0.1"
# depends on the core crate by path so workspace builds use the local copy
datafusion = { path = "../core", version = "7.0.0" }
futures = "0.3"
log = "0.4"
parking_lot = "0.12"
# rayon supplies the work-stealing thread pool used by the scheduler
rayon = "1.5"

[dev-dependencies]
criterion = "0.3"
rand = "0.8"
# tokio is test-only here: benchmarks drive async plan creation/registration
tokio = { version = "1.0", features = ["macros", "rt"] }
parquet = "11.0"
tempfile = "3"

# Benchmark moved here from datafusion/core (it was removed from
# core's Cargo.toml in the same change); harness = false because
# criterion provides its own main().
[[bench]]
harness = false
name = "parquet_query_sql"
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use arrow::datatypes::{
};
use arrow::record_batch::RecordBatch;
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_scheduler::Scheduler;
use futures::stream::StreamExt;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::{WriterProperties, WriterVersion};
use rand::distributions::uniform::SampleUniform;
Expand All @@ -37,7 +39,6 @@ use std::path::Path;
use std::sync::Arc;
use std::time::Instant;
use tempfile::NamedTempFile;
use tokio_stream::StreamExt;

/// The number of batches to write
const NUM_BATCHES: usize = 2048;
Expand Down Expand Up @@ -193,15 +194,24 @@ fn criterion_benchmark(c: &mut Criterion) {
assert!(Path::new(&file_path).exists(), "path not found");
println!("Using parquet file {}", file_path);

let context = SessionContext::new();
let partitions = 4;
let config = SessionConfig::new().with_target_partitions(partitions);
let mut context = SessionContext::with_config(config);

let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap();
rt.block_on(context.register_parquet(
"t",
file_path.as_str(),
ParquetReadOptions::default(),
))
.unwrap();
let scheduler = Scheduler::new(partitions);

let local_rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();

let query_rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(partitions)
.build()
.unwrap();

local_rt
.block_on(context.register_parquet("t", file_path.as_str()))
.unwrap();

// We read the queries from a file so they can be changed without recompiling the benchmark
let mut queries_file = File::open("benches/parquet_query_sql.sql").unwrap();
Expand All @@ -220,17 +230,42 @@ fn criterion_benchmark(c: &mut Criterion) {
continue;
}

let query = query.as_str();
c.bench_function(query, |b| {
c.bench_function(&format!("tokio: {}", query), |b| {
b.iter(|| {
let context = context.clone();
rt.block_on(async move {
let query = context.sql(query).await.unwrap();
let query = query.clone();
let mut context = context.clone();
let (sender, mut receiver) = futures::channel::mpsc::unbounded();

// Spawn work to a separate tokio thread pool
query_rt.spawn(async move {
let query = context.sql(&query).await.unwrap();
let mut stream = query.execute_stream().await.unwrap();
while criterion::black_box(stream.next().await).is_some() {}

while let Some(next) = stream.next().await {
sender.unbounded_send(next).unwrap();
}
});

local_rt.block_on(async {
while receiver.next().await.transpose().unwrap().is_some() {}
})
});
});

c.bench_function(&format!("scheduled: {}", query), |b| {
b.iter(|| {
let query = query.clone();
let mut context = context.clone();

local_rt.block_on(async {
let query = context.sql(&query).await.unwrap();
let plan = query.create_physical_plan().await.unwrap();
let mut stream =
scheduler.schedule(plan, context.task_ctx()).unwrap();
while stream.next().await.transpose().unwrap().is_some() {}
});
});
});
}

// Temporary file must outlive the benchmarks, it is deleted when dropped
Expand Down
Loading

0 comments on commit f43b114

Please sign in to comment.