Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add explicit threads names #3110

Merged
merged 6 commits into from
Oct 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion massa-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ fn serve(api: impl Endpoints, url: &SocketAddr) -> StopHandle {
.expect("Unable to start RPC server");

let close_handle = server.close_handle();
let join_handle = thread::spawn(|| server.wait());
let thread_builder = thread::Builder::new().name("rpc-server".into());
let join_handle = thread_builder
.spawn(|| server.wait())
.expect("failed to spawn thread : rpc-server");

StopHandle {
close_handle,
Expand Down
18 changes: 16 additions & 2 deletions massa-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use massa_wallet::Wallet;
use serde::Serialize;
use std::net::IpAddr;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use structopt::StructOpt;

mod cmds;
Expand Down Expand Up @@ -80,8 +81,21 @@ fn ask_password(wallet_path: &Path) -> String {
}

#[paw::main]
#[tokio::main]
async fn main(args: Args) -> Result<()> {
fn main(args: Args) -> anyhow::Result<()> {
let tokio_rt = tokio::runtime::Builder::new_multi_thread()
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("tokio-client-{}", id)
})
.enable_all()
.build()
.unwrap();

tokio_rt.block_on(run(args))
}

async fn run(args: Args) -> Result<()> {
// TODO: move settings loading in another crate ... see #1277
let settings = SETTINGS.clone();
let address = match args.ip {
Expand Down
11 changes: 7 additions & 4 deletions massa-execution-worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use massa_storage::Storage;
use massa_time::MassaTime;
use parking_lot::{Condvar, Mutex, RwLock};
use std::sync::Arc;
use std::thread;
use tracing::debug;

/// Structure gathering all elements needed by the execution thread
Expand Down Expand Up @@ -261,10 +262,12 @@ pub fn start_execution_worker(

// launch the execution thread
let input_data_clone = input_data.clone();
let thread_handle = std::thread::spawn(move || {
ExecutionThread::new(config, input_data_clone, execution_state, selector).main_loop();
});

let thread_builder = thread::Builder::new().name("execution".into());
let thread_handle = thread_builder
.spawn(move || {
ExecutionThread::new(config, input_data_clone, execution_state, selector).main_loop();
})
.expect("failed to spawn thread : execution");
// create a manager
let manager = ExecutionManagerImpl {
input_data,
Expand Down
4 changes: 2 additions & 2 deletions massa-factory-worker/src/block_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl BlockFactoryWorker {
factory_receiver: mpsc::Receiver<()>,
) -> thread::JoinHandle<()> {
thread::Builder::new()
.name("block factory worker".into())
.name("block-factory".into())
.spawn(|| {
let mut this = Self {
cfg,
Expand All @@ -48,7 +48,7 @@ impl BlockFactoryWorker {
};
this.run();
})
.expect("could not spawn block factory worker thread")
.expect("failed to spawn thread : block-factory")
}

/// Gets the next slot and the instant when it will happen.
Expand Down
4 changes: 2 additions & 2 deletions massa-factory-worker/src/endorsement_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl EndorsementFactoryWorker {
factory_receiver: mpsc::Receiver<()>,
) -> thread::JoinHandle<()> {
thread::Builder::new()
.name("endorsement factory worker".into())
.name("endorsement-factory".into())
.spawn(|| {
let mut this = Self {
half_t0: cfg
Expand All @@ -54,7 +54,7 @@ impl EndorsementFactoryWorker {
};
this.run();
})
.expect("could not spawn endorsement factory worker thread")
.expect("failed to spawn thread : endorsement-factory")
}

/// Gets the next slot and the instant when the corresponding endorsements should be made.
Expand Down
52 changes: 35 additions & 17 deletions massa-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use massa_time::MassaTime;
use massa_wallet::Wallet;
use parking_lot::RwLock;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{path::Path, process, sync::Arc};
use structopt::StructOpt;
use tokio::signal;
Expand Down Expand Up @@ -487,23 +488,27 @@ async fn launch(
use std::thread;
use std::time::Duration;
// Create a background thread which checks for deadlocks every 10s
let handler2 = thread::spawn(move || loop {
thread::sleep(Duration::from_secs(10));
let deadlocks = deadlock::check_deadlock();
println!("deadlocks check");
if deadlocks.is_empty() {
continue;
}
let thread_builder = thread::Builder::new().name("deadlock-detection".into());
thread_builder
.spawn(move || loop {
thread::sleep(Duration::from_secs(10));
let deadlocks = deadlock::check_deadlock();
println!("deadlocks check");

if deadlocks.is_empty() {
continue;
}

println!("{} deadlocks detected", deadlocks.len());
for (i, threads) in deadlocks.iter().enumerate() {
println!("Deadlock #{}", i);
for t in threads {
println!("Thread Id {:#?}", t.thread_id());
println!("{:#?}", t.backtrace());
println!("{} deadlocks detected", deadlocks.len());
for (i, threads) in deadlocks.iter().enumerate() {
println!("Deadlock #{}", i);
for t in threads {
println!("Thread Id {:#?}", t.thread_id());
println!("{:#?}", t.backtrace());
}
}
}
});
})
.expect("failed to spawn thread : deadlock-detection");
}
(
consensus_event_receiver,
Expand Down Expand Up @@ -629,8 +634,21 @@ fn load_wallet(password: Option<String>, path: &Path) -> anyhow::Result<Arc<RwLo
}

#[paw::main]
#[tokio::main]
async fn main(args: Args) -> anyhow::Result<()> {
fn main(args: Args) -> anyhow::Result<()> {
let tokio_rt = tokio::runtime::Builder::new_multi_thread()
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("tokio-node-{}", id)
})
.enable_all()
.build()
.unwrap();

tokio_rt.block_on(run(args))
}

async fn run(args: Args) -> anyhow::Result<()> {
use tracing_subscriber::prelude::*;
// spawn the console server in the background, returning a `Layer`:
let tracing_layer = tracing_subscriber::fmt::layer()
Expand Down
35 changes: 21 additions & 14 deletions massa-pool-worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use massa_pool_exports::{PoolController, PoolManager};
use massa_storage::Storage;
use parking_lot::RwLock;
use std::sync::mpsc::RecvError;
use std::thread;
use std::{
sync::mpsc::{sync_channel, Receiver},
sync::Arc,
Expand All @@ -31,13 +32,16 @@ impl EndorsementPoolThread {
receiver: Receiver<Command>,
endorsement_pool: Arc<RwLock<EndorsementPool>>,
) -> JoinHandle<()> {
std::thread::spawn(|| {
let this = Self {
receiver,
endorsement_pool,
};
this.run()
})
let thread_builder = thread::Builder::new().name("endorsement-pool".into());
thread_builder
.spawn(|| {
let this = Self {
receiver,
endorsement_pool,
};
this.run()
})
.expect("failed to spawn thread : endorsement-pool")
}

/// Runs the thread
Expand Down Expand Up @@ -72,13 +76,16 @@ impl OperationPoolThread {
receiver: Receiver<Command>,
operation_pool: Arc<RwLock<OperationPool>>,
) -> JoinHandle<()> {
std::thread::spawn(|| {
let this = Self {
receiver,
operation_pool,
};
this.run()
})
let thread_builder = thread::Builder::new().name("operation-pool".into());
thread_builder
.spawn(|| {
let this = Self {
receiver,
operation_pool,
};
this.run()
})
.expect("failed to spawn thread : operation-pool")
}

/// Run the thread.
Expand Down
20 changes: 12 additions & 8 deletions massa-pos-worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::VecDeque;
use std::sync::mpsc::sync_channel;
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;

/// Structure gathering all elements needed by the selector thread
Expand All @@ -38,14 +39,17 @@ impl SelectorThread {
cache: DrawCachePtr,
cfg: SelectorConfig,
) -> JoinHandle<PosResult<()>> {
std::thread::spawn(|| {
let this = Self {
input_mpsc,
cache,
cfg,
};
this.run()
})
let thread_builder = thread::Builder::new().name("selector".into());
thread_builder
.spawn(|| {
let this = Self {
input_mpsc,
cache,
cfg,
};
this.run()
})
.expect("failed to spawn thread : selector")
}

/// process the result of a draw
Expand Down
2 changes: 1 addition & 1 deletion massa-protocol-worker/src/sig_verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use massa_hash::Hash;
use massa_protocol_exports::ProtocolError;
use massa_signature::{verify_signature_batch, PublicKey, Signature};
use rayon::prelude::*;
use rayon::{prelude::ParallelIterator, slice::ParallelSlice};

/// Limit for small batch optimization
const SMALL_BATCH_LIMIT: usize = 2;
Expand Down