Morsel-Driven Parallelism Using Rayon #2199

Closed
tustvold opened this issue Apr 11, 2022 · 4 comments · Fixed by #2226
Labels
enhancement New feature or request

Comments

@tustvold
Contributor

tustvold commented Apr 11, 2022

UPDATE June 2024: DataFusion does not use morsel-driven parallelism; instead, it uses volcano-style pull execution with exchange operators.

You can read more about details and analysis in https://dl.acm.org/doi/10.1145/3626246.3653368
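
For readers unfamiliar with the term, here is a toy sketch of volcano-style pull execution with an exchange operator. It is not DataFusion code: `Filter`, `exchange`, and `run_example` are hypothetical names, batches are plain `Vec<u32>` values, and std threads and channels stand in for whatever the real engine uses. The point it illustrates is that each operator pulls batches from its child, and the degree of parallelism is fixed by the number of partitions fed into the exchange.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical pull-based operator: keeps values >= `threshold`.
struct Filter<I> {
    input: I,
    threshold: u32,
}

impl<I: Iterator<Item = Vec<u32>>> Iterator for Filter<I> {
    type Item = Vec<u32>;

    // The parent calls `next`, which in turn pulls one batch from the child.
    fn next(&mut self) -> Option<Vec<u32>> {
        let batch = self.input.next()?;
        Some(batch.into_iter().filter(|v| *v >= self.threshold).collect())
    }
}

/// Hypothetical exchange: drives each input partition on its own thread and
/// merges the resulting batches into one stream that the parent pulls from.
fn exchange<I>(partitions: Vec<I>) -> mpsc::IntoIter<Vec<u32>>
where
    I: Iterator<Item = Vec<u32>> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    for partition in partitions {
        let tx = tx.clone();
        thread::spawn(move || {
            for batch in partition {
                // Stop early if the consumer has gone away.
                if tx.send(batch).is_err() {
                    break;
                }
            }
        });
    }
    // Drop the original sender so the stream ends once all workers finish.
    drop(tx);
    rx.into_iter()
}

/// Two partitions, each filtered independently, then merged and summed.
fn run_example() -> u32 {
    let p0 = vec![vec![1, 2, 3], vec![4, 5]];
    let p1 = vec![vec![6, 1], vec![2, 7]];
    let partitions: Vec<_> = vec![p0, p1]
        .into_iter()
        .map(|p| Filter { input: p.into_iter(), threshold: 3 })
        .collect();
    exchange(partitions).flatten().sum()
}

fn main() {
    println!("sum of values >= 3: {}", run_example());
}
```

Batch arrival order from the exchange is not deterministic, which is why the example only computes an order-independent sum.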

This is a proposal to reformulate the parallelism story within DataFusion around a morsel-driven approach based on rayon. More details, background, and discussion can be found in the proposal document here; please feel free to comment there.

The key highlights are:

  • Decouples parallelism from the partitioning expressed in the physical plan, allowing for:
    • Better handling of imbalanced partitions
    • Adaptive parallelism based on compute availability at execution time
    • Parallelism within a partition, such as decoding parquet columns in parallel, parallel sort, etc.
  • The first step to reducing the complexity associated with the current futures-based concurrency model
  • Improvements to thread-locality, observability and performance
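
As a rough illustration of the first highlight, the sketch below shows the core idea of morsel-driven scheduling under simplifying assumptions: workers repeatedly claim small fixed-size morsels from a shared cursor, so the amount of work each thread does adapts at run time instead of being fixed by how the input was partitioned. It uses plain std threads and an atomic counter rather than rayon, and `morsel_sum` is a hypothetical helper, not part of the proposal or of DataFusion.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: sums `data` by letting `workers` threads claim
/// morsels of `morsel_size` elements until the input is exhausted.
fn morsel_sum(data: Arc<Vec<u64>>, morsel_size: usize, workers: usize) -> u64 {
    assert!(morsel_size > 0 && workers > 0);
    // Shared cursor: each fetch_add claims the next unprocessed morsel.
    let next = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let data = Arc::clone(&data);
            let next = Arc::clone(&next);
            thread::spawn(move || {
                let mut local = 0u64;
                loop {
                    let start = next.fetch_add(morsel_size, Ordering::Relaxed);
                    if start >= data.len() {
                        break; // no morsels left
                    }
                    let end = (start + morsel_size).min(data.len());
                    local += data[start..end].iter().sum::<u64>();
                }
                local
            })
        })
        .collect();

    // Combine the per-worker partial sums.
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    let data: Arc<Vec<u64>> = Arc::new((1..=10_000).collect());
    let total = morsel_sum(Arc::clone(&data), 256, 4);
    println!("total = {total}"); // prints "total = 50005000"
}
```

A fast worker simply claims more morsels than a slow one, which is how this style copes with imbalanced inputs; a work-stealing pool such as rayon achieves a similar effect with less contention on a single counter.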
@tustvold tustvold added the enhancement New feature or request label Apr 11, 2022
tustvold added a commit to tustvold/arrow-datafusion that referenced this issue Apr 13, 2022
alamb pushed a commit that referenced this issue May 4, 2022
…ayon (#2199) (#2226)

* Morsel-driven Parallelism using rayon (#2199)

* Fix LIFO spawn ordering

* Further docs for ExecutionPipeline

* Deduplicate concurrent wakes

* Add license headers

* Sort Cargo.toml

* Revert accidental change to ParquetExec

* Handle wakeups triggered by other threads

* Use SeqCst memory ordering

* Review feedback

* Add panic handler

* Cleanup structs

Add test of tokio interoperation

* Review feedback

* Use BatchPartitioner

Cleanup error handling

* Clarify shutdown characteristics

* Fix racy test_panic

* Don't overload Query nomenclature

* Rename QueryResults to ExecutionResults

* Further review feedback

* Merge scheduler into datafusion/core

* Review feedback

* Fix partitioned execution

* Format

* Format Cargo.toml

* Fix doc link
@JasonLi-cn
Contributor

  1. binary code
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty::print_batches;
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion::scheduler::Scheduler;
use futures::TryStreamExt;
use std::env;

#[tokio::main]
async fn main() -> Result<()> {
    let name = "test_table";
    let mut args = env::args();
    args.next();
    let table_path = args.next().expect("parquet file");
    let sql = &args.next().expect("sql");
    let using_scheduler = args.next().is_some();

    // create local session context
    let config = SessionConfig::new()
        .with_information_schema(true)
        .with_target_partitions(4);
    let context = SessionContext::with_config(config);

    // register parquet file with the execution context
    context
        .register_parquet(name, &table_path, ParquetReadOptions::default())
        .await?;

    let task = context.task_ctx();
    let query = context.sql(sql).await.unwrap();
    let plan = query.create_physical_plan().await.unwrap();

    println!("Start query, using scheduler {}", using_scheduler);
    let now = std::time::Instant::now();
    let results = if using_scheduler {
        let scheduler = Scheduler::new(4);
        let stream = scheduler.schedule(plan, task).unwrap().stream();
        let results: Vec<RecordBatch> = stream.try_collect().await.unwrap();
        results
    } else {
        // Note: this path plans the query again inside the timed region.
        plan_and_collect(&context, sql).await?
    };
    let elapsed = now.elapsed().as_millis();
    println!("End query, elapsed {} ms", elapsed);
    print_batches(&results)?;
    Ok(())
}

/// Execute sql
async fn plan_and_collect(
    context: &SessionContext,
    sql: &str,
) -> Result<Vec<RecordBatch>> {
    context.sql(sql).await?.collect().await
}
  2. test data
  • format: parquet
  • number of files: 4
  • rows: 16405852 * 4 = 65623408
  • number of columns: 6
  • schema: uint32, uint32, uint32, uint32, string, uint32
  3. test result

SQLs:

select count(distinct column0) from test_table;
select * from test_table order by column5 limit 10;

The performance is similar with and without the Scheduler! Is there a problem with how I am using it?

@tustvold
Contributor Author

Yes that is expected, I've had to park working on this for a bit in favour of some other things. See #2504 for the follow on work

@JasonLi-cn
Contributor

> Yes that is expected, I've had to park working on this for a bit in favour of some other things. See #2504 for the follow on work

Ok, thanks!
By the way, do you know the difference between ClickHouse's query execution pipeline and DataFusion's execution model (which looks like a vectorized volcano model)? And what are the advantages of ClickHouse's approach?

@alamb
Contributor

alamb commented Jun 19, 2024

Updated the description of this ticket to note that DataFusion doesn't use morsel-driven parallelism, and added a link to the paper: https://dl.acm.org/doi/10.1145/3626246.3653368
