From 73fba63c0b405d1613c9aceccefd48876aa3b80a Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Tue, 4 Jun 2024 09:05:07 +0100 Subject: [PATCH] [experiment] Try threadpool instead of rayon for io pool --- Cargo.lock | 11 ++++++++++- crates/rocksdb/Cargo.toml | 2 +- crates/rocksdb/src/db_manager.rs | 26 ++++++++++++-------------- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a562097301..7896cd439c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5950,7 +5950,6 @@ dependencies = [ "metrics", "once_cell", "parking_lot", - "rayon", "restate-core", "restate-errors", "restate-serde-util", @@ -5962,6 +5961,7 @@ dependencies = [ "strum 0.26.2", "strum_macros 0.26.2", "thiserror", + "threadpool", "tokio", "tracing", ] @@ -7295,6 +7295,15 @@ dependencies = [ "once_cell", ] +[[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + [[package]] name = "thrift" version = "0.17.0" diff --git a/crates/rocksdb/Cargo.toml b/crates/rocksdb/Cargo.toml index 0b929d88c4..c9343abe4e 100644 --- a/crates/rocksdb/Cargo.toml +++ b/crates/rocksdb/Cargo.toml @@ -29,13 +29,13 @@ futures-util = { workspace = true } metrics = {workspace = true } once_cell = { workspace = true } parking_lot = { workspace = true } -rayon = { workspace = true } rocksdb = { workspace = true } smartstring = { workspace = true } static_assertions = { workspace = true } strum = { workspace = true } strum_macros = { workspace = true } thiserror = { workspace = true } +threadpool = { version = "1.8" } tokio = { workspace = true } tracing = { workspace = true } diff --git a/crates/rocksdb/src/db_manager.rs b/crates/rocksdb/src/db_manager.rs index b6a667b6a2..7e86115599 100644 --- a/crates/rocksdb/src/db_manager.rs +++ b/crates/rocksdb/src/db_manager.rs @@ -49,8 +49,8 @@ pub struct RocksDbManager { dbs: RwLock>>, watchdog_tx: mpsc::UnboundedSender, shutting_down: AtomicBool, - high_pri_pool: rayon::ThreadPool, - low_pri_pool: rayon::ThreadPool, + high_pri_pool: threadpool::ThreadPool, + low_pri_pool: threadpool::ThreadPool, } impl Debug for RocksDbManager { @@ -95,17 +95,15 @@ impl RocksDbManager { env.set_background_threads(opts.rocksdb_bg_threads().get() as i32); // Create our own storage thread pools - let high_pri_pool = rayon::ThreadPoolBuilder::new() - .thread_name(|i| format!("rs:io-hi-{}", i)) + let high_pri_pool = threadpool::Builder::new() + .thread_name("rs:io-hi".to_owned()) .num_threads(opts.storage_high_priority_bg_threads().into()) - .build() - .expect("storage high priority thread pool to be created"); + .build(); - let low_pri_pool = rayon::ThreadPoolBuilder::new() - .thread_name(|i| format!("rs:io-lo-{}", i)) + let low_pri_pool = threadpool::Builder::new() + .thread_name("rs:io-lo".to_owned()) .num_threads(opts.storage_low_priority_bg_threads().into()) - .build() - .expect("storage low priority thread pool to be created"); + .build(); let dbs = RwLock::default(); @@ -377,8 +375,8 @@ impl RocksDbManager { let (tx, rx) = tokio::sync::oneshot::channel(); let priority = task.priority; match priority { - Priority::High => self.high_pri_pool.spawn(task.into_async_runner(tx)), - Priority::Low => self.low_pri_pool.spawn(task.into_async_runner(tx)), + Priority::High => self.high_pri_pool.execute(task.into_async_runner(tx)), + Priority::Low => self.low_pri_pool.execute(task.into_async_runner(tx)), } rx.await.map_err(|_| ShutdownError) } @@ -403,8 +401,8 @@ impl RocksDbManager { OP: FnOnce() + Send + 'static, { match task.priority { - Priority::High => self.high_pri_pool.spawn(task.into_runner()), - Priority::Low => self.low_pri_pool.spawn(task.into_runner()), + Priority::High => self.high_pri_pool.execute(task.into_runner()), + Priority::Low => self.low_pri_pool.execute(task.into_runner()), } } }