diff --git a/Cargo.lock b/Cargo.lock index 88544f30d3e..822af0fa039 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -634,9 +634,9 @@ dependencies = [ [[package]] name = "darling" -version = "0.13.4" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c" +checksum = "4529658bdda7fd6769b8614be250cdcfc3aeb0ee72fe66f9e41e5e5eb73eac02" dependencies = [ "darling_core", "darling_macro", @@ -644,9 +644,9 @@ dependencies = [ [[package]] name = "darling_core" -version = "0.13.4" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "859d65a907b6852c9361e3185c862aae7fafd2887876799fa55f5f99dc40d610" +checksum = "649c91bc01e8b1eac09fb91e8dbc7d517684ca6be8ebc75bb9cafc894f9fdb6f" dependencies = [ "fnv", "ident_case", @@ -657,9 +657,9 @@ dependencies = [ [[package]] name = "darling_macro" -version = "0.13.4" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835" +checksum = "ddfc69c5bfcbd2fc09a0f38451d2daf0e372e367986a83906d1b0dbc88134fb5" dependencies = [ "darling_core", "quote", @@ -865,18 +865,18 @@ dependencies = [ [[package]] name = "enumset" -version = "1.0.11" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4799cdb24d48f1f8a7a98d06b7fde65a85a2d1e42b25a889f5406aa1fbefe074" +checksum = "19be8061a06ab6f3a6cf21106c873578bf01bd42ad15e0311a9c76161cb1c753" dependencies = [ "enumset_derive", ] [[package]] name = "enumset_derive" -version = "0.6.0" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea83a3fbdc1d999ccfbcbee717eab36f8edf2d71693a23ce0d7cca19e085304c" +checksum = "03e7b551eba279bf0fa88b83a46330168c1560a52a94f5126f892f0b364ab3e0" dependencies = [ "darling", "proc-macro2", @@ -1283,9 +1283,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754" +checksum = "4217ad341ebadf8d8e724e264f13e593e0648f5b3e94b3896a5df283be015ecc" [[package]] name = "jobserver" @@ -2050,12 +2050,10 @@ dependencies = [ name = "massa_pool_exports" version = "0.1.0" dependencies = [ - "displaydoc", "massa_models", "massa_storage", "massa_time", "serde 1.0.145", - "thiserror", ] [[package]] @@ -2070,6 +2068,7 @@ dependencies = [ "massa_storage", "num", "parking_lot", + "tracing", ] [[package]] @@ -2357,6 +2356,16 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num" version = "0.4.0" @@ -2573,6 +2582,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking_lot" version = "0.12.1" @@ -3312,9 +3327,9 @@ dependencies = [ [[package]] name = "signature" -version = "1.6.3" +version = "1.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "deb766570a2825fa972bceff0d195727876a9cdf2460ab2e52d455dc2de47fd9" +checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" [[package]] name = "slab" @@ -3327,9 +3342,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fd0db749597d91ff862fd1d55ea87f7855a744a8425a64695b6fca237d1dad1" +checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" [[package]] name = "socket2" @@ -3411,9 +3426,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.101" +version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e90cde112c4b9690b8cbe810cba9ddd8bc1d7472e2cae317b69e9438c1cba7d2" +checksum = "3fcd952facd492f9be3ef0d0b7032a6e442ee9b361d4acc2b1d0c4aaa5f613a1" dependencies = [ "proc-macro2", "quote", @@ -3528,9 +3543,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.14" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c3f9a28b618c3a6b9251b6908e9c99e04b9e5c02e6581ccbb67d59c34ef7f9b" +checksum = "d634a985c4d4238ec39cacaed2e7ae552fbd3c476b552c1deac3021b7d7eaf0c" dependencies = [ "itoa", "libc", @@ -3632,9 +3647,9 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.36" +version = "0.1.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fce9567bd60a67d08a16488756721ba392f24f29006402881e43b19aac64307" +checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if 1.0.0", "log", @@ -3645,9 +3660,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.22" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11c75893af559bc8e10716548bdef5cb2b983f8e637db9d0e15126b61b484ee2" +checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" dependencies = [ "proc-macro2", "quote", @@ -3656,9 +3671,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.29" +version = "0.1.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aeea4303076558a00714b823f9ad67d58a3bbda1df83d8827d21193156e22f7" +checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a" dependencies = [ "once_cell", "valuable", @@ -3677,11 +3692,11 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.15" +version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60db860322da191b40952ad9affe65ea23e7dd6a5c442c2c42865810c6ab8e6b" +checksum = "a6176eae26dd70d0c919749377897b54a9276bd7061339665dd68777926b5a70" dependencies = [ - "ansi_term", + "nu-ansi-term", "sharded-slab", "smallvec", "thread_local", @@ -3854,9 +3869,9 @@ checksum = "1c38c045535d93ec4f0b4defec448e4291638ee608530863b1e2ba115d4fff7f" [[package]] name = "wasm-encoder" -version = "0.17.0" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e7ca71c70a6de5b10968ae4d298e548366d9cd9588176e6ff8866f3c49c96ee" +checksum = "c64ac98d5d61192cc45c701b7e4bd0b9aff91e2edfc7a088406cfe2288581e2c" dependencies = [ "leb128", ] @@ -4131,9 +4146,9 @@ checksum = "718ed7c55c2add6548cca3ddd6383d738cd73b892df400e96b9aa876f0141d7a" [[package]] name = "wast" -version = "47.0.0" +version = "47.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "117ccfc4262e62a28a13f0548a147f19ffe71e8a08be802af23ae4ea0bedad73" +checksum = "02b98502f3978adea49551e801a6687678e6015317d7d9470a67fe813393f2a8" dependencies = [ "leb128", "memchr", diff --git a/massa-api/src/error.rs b/massa-api/src/error.rs index 6ad62228b2d..ef55848cc9d 100644 --- a/massa-api/src/error.rs +++ b/massa-api/src/error.rs @@ -6,7 +6,6 @@ use massa_execution_exports::ExecutionError; use massa_hash::MassaHashError; use massa_models::error::ModelsError; use massa_network_exports::NetworkError; -use massa_pool_exports::PoolError; use massa_protocol_exports::ProtocolError; use massa_time::TimeError; use massa_wallet::WalletError; @@ -15,8 +14,6 @@ use thiserror::Error; #[non_exhaustive] #[derive(Display, Error, Debug)] pub enum ApiError { - /// pool error: {0} - PoolError(#[from] PoolError), /// too many arguments error: {0} TooManyArguments(String), /// send channel error: {0} diff --git a/massa-consensus-exports/src/error.rs b/massa-consensus-exports/src/error.rs index 2e6907cdd12..f74321ae503 100644 --- a/massa-consensus-exports/src/error.rs +++ b/massa-consensus-exports/src/error.rs @@ -43,8 +43,6 @@ pub enum ConsensusError { SendChannelError(String), /// Receive channel error : {0} ReceiveChannelError(String), - /// pool error : {0} - PoolError(#[from] massa_pool_exports::PoolError), /// io error {0} IOError(#[from] std::io::Error), /// missing block {0} diff --git a/massa-models/src/config/constants.rs b/massa-models/src/config/constants.rs index 1a91126dafc..3e75c0b02a3 100644 --- a/massa-models/src/config/constants.rs +++ b/massa-models/src/config/constants.rs @@ -179,12 +179,13 @@ pub const BOOTSTRAP_RANDOMNESS_SIZE_BYTES: usize = 32; /// Max size of the printed error pub const MAX_BOOTSTRAP_ERROR_LENGTH: u32 = 10000; -// Protocol constants - -/// Controller channel size +/// Protocol controller channel size pub const PROTOCOL_CONTROLLER_CHANNEL_SIZE: usize = 1024; -/// Event channel size +/// Protocol event channel size pub const PROTOCOL_EVENT_CHANNEL_SIZE: usize = 1024; +/// Pool controller channel size +pub const POOL_CONTROLLER_CHANNEL_SIZE: usize = 1024; + // *********************** // Constants used for execution module (injected from ConsensusConfig) // diff --git a/massa-node/src/main.rs b/massa-node/src/main.rs index cd0a0d6bfbf..31688a5d5ed 100644 --- a/massa-node/src/main.rs +++ b/massa-node/src/main.rs @@ -42,10 +42,11 @@ use massa_models::config::constants::{ PROTOCOL_CONTROLLER_CHANNEL_SIZE, PROTOCOL_EVENT_CHANNEL_SIZE, ROLL_PRICE, T0, THREAD_COUNT, VERSION, }; +use massa_models::config::POOL_CONTROLLER_CHANNEL_SIZE; use massa_network_exports::{Establisher, NetworkConfig, NetworkManager}; use massa_network_worker::start_network_controller; -use massa_pool_exports::{PoolConfig, PoolController}; -use massa_pool_worker::start_pool; +use massa_pool_exports::{PoolConfig, PoolManager}; +use massa_pool_worker::start_pool_controller; use massa_pos_exports::{SelectorConfig, SelectorManager}; use massa_pos_worker::start_selector_worker; use massa_protocol_exports::{ProtocolConfig, ProtocolManager}; @@ -54,7 +55,7 @@ use massa_storage::Storage; use massa_time::MassaTime; use massa_wallet::Wallet; use parking_lot::RwLock; -use std::{mem, path::PathBuf}; +use std::path::PathBuf; use std::{path::Path, process, sync::Arc}; use structopt::StructOpt; use tokio::signal; @@ -72,7 +73,7 @@ async fn launch( ConsensusManager, Box, Box, - Box, + Box, ProtocolManager, NetworkManager, Box, @@ -317,9 +318,10 @@ async fn launch( operation_validity_periods: OPERATION_VALIDITY_PERIODS, max_operation_pool_size_per_thread: SETTINGS.pool.max_pool_size_per_thread, max_endorsements_pool_size_per_thread: SETTINGS.pool.max_pool_size_per_thread, + channels_size: POOL_CONTROLLER_CHANNEL_SIZE, }; - let pool_controller = start_pool(pool_config, &shared_storage, execution_controller.clone()); - let pool_manager: Box = Box::new(pool_controller.clone()); + let (pool_manager, pool_controller) = + start_pool_controller(pool_config, &shared_storage, execution_controller.clone()); // launch protocol controller let protocol_config = ProtocolConfig { @@ -357,7 +359,7 @@ async fn launch( protocol_config, network_command_sender.clone(), network_event_receiver, - pool_manager.clone(), + pool_controller.clone(), shared_storage.clone(), ) .await @@ -394,7 +396,7 @@ async fn launch( execution_controller: execution_controller.clone(), protocol_command_sender: protocol_command_sender.clone(), protocol_event_receiver, - pool_command_sender: pool_manager.clone(), + pool_command_sender: pool_controller.clone(), selector_controller: selector_controller.clone(), }, bootstrap_state.graph, @@ -417,7 +419,7 @@ async fn launch( let factory_channels = FactoryChannels { selector: selector_controller.clone(), consensus: consensus_command_sender.clone(), - pool: pool_manager.clone(), + pool: pool_controller.clone(), protocol: protocol_command_sender.clone(), storage: shared_storage.clone(), }; @@ -467,7 +469,7 @@ async fn launch( api_config, selector_controller.clone(), consensus_config, - pool_manager.clone(), + pool_controller.clone(), protocol_command_sender.clone(), network_config, *VERSION, @@ -524,7 +526,7 @@ struct Managers { consensus_manager: ConsensusManager, execution_manager: Box, selector_manager: Box, - pool_manager: Box, + pool_manager: Box, protocol_manager: ProtocolManager, network_manager: NetworkManager, factory_manager: Box, @@ -537,7 +539,7 @@ async fn stop( mut execution_manager, consensus_manager, mut selector_manager, - pool_manager, + mut pool_manager, protocol_manager, network_manager, mut factory_manager, @@ -568,8 +570,7 @@ async fn stop( .expect("consensus shutdown failed"); // stop pool - //TODO make a proper manager - mem::drop(pool_manager); + pool_manager.stop(); // stop execution controller execution_manager.stop(); diff --git a/massa-pool-exports/Cargo.toml b/massa-pool-exports/Cargo.toml index d22e838b2c3..6eaab6f8577 100644 --- a/massa-pool-exports/Cargo.toml +++ b/massa-pool-exports/Cargo.toml @@ -5,9 +5,7 @@ authors = ["Massa Labs "] edition = "2021" [dependencies] -displaydoc = "0.2" serde = { version = "1.0", features = ["derive"] } -thiserror = "1.0" # custom modules massa_models = { path = "../massa-models" } massa_storage = { path = "../massa-storage" } diff --git a/massa-pool-exports/src/config.rs b/massa-pool-exports/src/config.rs index 1175b7dbf8f..af41e3bddbe 100644 --- a/massa-pool-exports/src/config.rs +++ b/massa-pool-exports/src/config.rs @@ -22,4 +22,6 @@ pub struct PoolConfig { pub max_endorsements_pool_size_per_thread: usize, /// max number of endorsements per block pub max_block_endorsement_count: u32, + /// operations and endorsements communication channels size + pub channels_size: usize, } diff --git a/massa-pool-exports/src/controller_traits.rs b/massa-pool-exports/src/controller_traits.rs index bbd558e495c..bef47bf3022 100644 --- a/massa-pool-exports/src/controller_traits.rs +++ b/massa-pool-exports/src/controller_traits.rs @@ -7,19 +7,19 @@ use massa_storage::Storage; /// Trait defining a pool controller pub trait PoolController: Send + Sync { - /// add operations to pool + /// Asynchronously add operations to pool. Simply print a warning on failure. fn add_operations(&mut self, ops: Storage); - /// add endorsements to pool + /// Asynchronously add endorsements to pool. Simply print a warning on failure. fn add_endorsements(&mut self, endorsements: Storage); - /// notify of new consensus final periods + /// Asynchronously notify of new consensus final periods. Simply print a warning on failure. fn notify_final_cs_periods(&mut self, final_cs_periods: &[u64]); - /// get operations for block creation + /// Get operations for block creation. fn get_block_operations(&self, slot: &Slot) -> (Vec, Storage); - /// get endorsements for a block + /// Get endorsements for a block. fn get_block_endorsements( &self, target_block: &BlockId, @@ -50,3 +50,9 @@ impl Clone for Box { self.clone_box() } } + +/// Pool manager trait +pub trait PoolManager: Send + Sync { + /// Stops the worker + fn stop(&mut self); +} diff --git a/massa-pool-exports/src/error.rs b/massa-pool-exports/src/error.rs deleted file mode 100644 index f3d699179a1..00000000000 --- a/massa-pool-exports/src/error.rs +++ /dev/null @@ -1,19 +0,0 @@ -//! Copyright (c) 2022 MASSA LABS - -use displaydoc::Display; -use massa_models::error::ModelsError; -use thiserror::Error; - -/// pool error -#[non_exhaustive] -#[derive(Display, Error, Debug)] -pub enum PoolError { - /// there was an inconsistency between containers - ContainerInconsistency(String), - /// channel error : {0} - ChannelError(String), - /// models error: {0} - ModelsError(#[from] ModelsError), - /// missing operation error: {0} - MissingOperation(String), -} diff --git a/massa-pool-exports/src/lib.rs b/massa-pool-exports/src/lib.rs index cf7ad661a27..ca03fe37179 100644 --- a/massa-pool-exports/src/lib.rs +++ b/massa-pool-exports/src/lib.rs @@ -8,11 +8,9 @@ mod config; mod controller_traits; -mod error; pub use config::PoolConfig; -pub use controller_traits::PoolController; -pub use error::PoolError; +pub use controller_traits::{PoolController, PoolManager}; /// Test utils #[cfg(feature = "testing")] diff --git a/massa-pool-exports/src/test_exports/config.rs b/massa-pool-exports/src/test_exports/config.rs index 3784e8634e4..5f40d6972b5 100644 --- a/massa-pool-exports/src/test_exports/config.rs +++ b/massa-pool-exports/src/test_exports/config.rs @@ -18,6 +18,7 @@ impl Default for PoolConfig { max_operation_pool_size_per_thread: 1000, max_endorsements_pool_size_per_thread: 1000, max_block_endorsement_count: ENDORSEMENT_COUNT, + channels_size: 1024, } } } diff --git a/massa-pool-worker/Cargo.toml b/massa-pool-worker/Cargo.toml index 158d0060fe6..dcbd3334ad2 100644 --- a/massa-pool-worker/Cargo.toml +++ b/massa-pool-worker/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] num = "0.4" +tracing = "0.1" # custom modules parking_lot = { version = "0.12", features = ["deadlock_detection"] } massa_models = { path = "../massa-models" } diff --git a/massa-pool-worker/src/controller_impl.rs b/massa-pool-worker/src/controller_impl.rs index 078772e9fd3..67254720bc7 100644 --- a/massa-pool-worker/src/controller_impl.rs +++ b/massa-pool-worker/src/controller_impl.rs @@ -1,38 +1,110 @@ +// Copyright (c) 2022 MASSA LABS + +//! Pool controller implementation + use massa_models::{ block::BlockId, endorsement::EndorsementId, operation::OperationId, slot::Slot, }; -use massa_pool_exports::{PoolConfig, PoolController}; +use massa_pool_exports::{PoolConfig, PoolController, PoolManager}; use massa_storage::Storage; use parking_lot::RwLock; -use std::sync::Arc; +use std::sync::mpsc::TrySendError; +use std::sync::{mpsc::SyncSender, Arc}; +use tracing::{info, warn}; use crate::{endorsement_pool::EndorsementPool, operation_pool::OperationPool}; + +/// A generic command to send commands to a pool +pub enum Command { + /// Add items to the pool + AddItems(Storage), + /// Notify of new final consensus periods + NotifyFinalCsPeriods(Vec), + /// Stop the worker + Stop, +} + +/// Pool controller #[derive(Clone)] pub struct PoolControllerImpl { + /// Config pub(crate) _config: PoolConfig, + /// Shared reference to the operation pool pub(crate) operation_pool: Arc>, + /// Shared reference to the endorsement pool pub(crate) endorsement_pool: Arc>, + /// Operation write worker command sender + pub(crate) operations_input_sender: SyncSender, + /// Endorsement write worker command sender + pub(crate) endorsements_input_sender: SyncSender, } impl PoolController for PoolControllerImpl { - /// add operations to pool + /// Asynchronously add operations to pool. Simply print a warning on failure. fn add_operations(&mut self, ops: Storage) { - self.operation_pool.write().add_operations(ops); + match self + .operations_input_sender + .try_send(Command::AddItems(ops)) + { + Err(TrySendError::Disconnected(_)) => { + warn!("Could not add operations to pool: worker is unreachable."); + } + Err(TrySendError::Full(_)) => { + warn!("Could not add operations to pool: worker channel is full."); + } + Ok(_) => {} + } } - /// add endorsements to pool + /// Asynchronously add endorsements to pool. Simply print a warning on failure. fn add_endorsements(&mut self, endorsements: Storage) { - self.endorsement_pool.write().add_endorsements(endorsements); + match self + .endorsements_input_sender + .try_send(Command::AddItems(endorsements)) + { + Err(TrySendError::Disconnected(_)) => { + warn!("Could not add endorsements to pool: worker is unreachable."); + } + Err(TrySendError::Full(_)) => { + warn!("Could not add endorsements to pool: worker channel is full."); + } + Ok(_) => {} + } } - /// notify of new final consensus periods (1 per thread) + /// Asynchronously notify of new final consensus periods. Simply print a warning on failure. fn notify_final_cs_periods(&mut self, final_cs_periods: &[u64]) { - self.operation_pool - .write() - .notify_final_cs_periods(final_cs_periods); - self.endorsement_pool - .write() - .notify_final_cs_periods(final_cs_periods); + match self + .operations_input_sender + .try_send(Command::NotifyFinalCsPeriods(final_cs_periods.to_vec())) + { + Err(TrySendError::Disconnected(_)) => { + warn!("Could not notify operation pool of new final slots: worker is unreachable."); + } + Err(TrySendError::Full(_)) => { + warn!( + "Could not notify operation pool of new final slots: worker channel is full." + ); + } + Ok(_) => {} + } + + match self + .endorsements_input_sender + .try_send(Command::NotifyFinalCsPeriods(final_cs_periods.to_vec())) + { + Err(TrySendError::Disconnected(_)) => { + warn!( + "Could not notify endorsement pool of new final slots: worker is unreachable." + ); + } + Err(TrySendError::Full(_)) => { + warn!( + "Could not notify endorsement pool of new final slots: worker channel is full." + ); + } + Ok(_) => {} + } } /// get operations for block creation @@ -79,3 +151,37 @@ impl PoolController for PoolControllerImpl { operations.iter().map(|id| lck.contains(id)).collect() } } + +/// Implementation of the pool manager. +/// +/// Contains the operations and endorsements thread handles. +pub struct PoolManagerImpl { + /// Handle used to join the operation thread + pub(crate) operations_thread_handle: Option>, + /// Handle used to join the endorsement thread + pub(crate) endorsements_thread_handle: Option>, + /// Operations input data mpsc (used to stop the pool thread) + pub(crate) operations_input_sender: SyncSender, + /// Endorsements input data mpsc (used to stop the pool thread) + pub(crate) endorsements_input_sender: SyncSender, +} + +impl PoolManager for PoolManagerImpl { + /// Stops the worker + fn stop(&mut self) { + info!("stopping pool workers..."); + let _ = self.operations_input_sender.send(Command::Stop); + let _ = self.endorsements_input_sender.send(Command::Stop); + if let Some(join_handle) = self.operations_thread_handle.take() { + join_handle + .join() + .expect("operations pool thread panicked on try to join"); + } + if let Some(join_handle) = self.endorsements_thread_handle.take() { + join_handle + .join() + .expect("endorsements pool thread panicked on try to join"); + } + info!("pool workers stopped"); + } +} diff --git a/massa-pool-worker/src/endorsement_pool.rs b/massa-pool-worker/src/endorsement_pool.rs index a23d427eda4..80f1e7a57ef 100644 --- a/massa-pool-worker/src/endorsement_pool.rs +++ b/massa-pool-worker/src/endorsement_pool.rs @@ -50,7 +50,7 @@ impl EndorsementPool { } /// notify of new final CS periods - pub fn notify_final_cs_periods(&mut self, final_cs_periods: &[u64]) { + pub(crate) fn notify_final_cs_periods(&mut self, final_cs_periods: &[u64]) { // update internal final CS period counter self.last_cs_final_periods = final_cs_periods.to_vec(); @@ -75,7 +75,7 @@ impl EndorsementPool { } /// Add a list of endorsements to the pool - pub fn add_endorsements(&mut self, mut endorsement_storage: Storage) { + pub(crate) fn add_endorsements(&mut self, mut endorsement_storage: Storage) { let items = endorsement_storage .get_endorsement_refs() .iter() diff --git a/massa-pool-worker/src/lib.rs b/massa-pool-worker/src/lib.rs index 2e15461ae4f..7864ee318b8 100644 --- a/massa-pool-worker/src/lib.rs +++ b/massa-pool-worker/src/lib.rs @@ -12,10 +12,10 @@ mod controller_impl; mod endorsement_pool; mod operation_pool; -mod run; mod types; +mod worker; -pub use run::start_pool; +pub use worker::start_pool_controller; #[cfg(test)] mod tests; diff --git a/massa-pool-worker/src/operation_pool.rs b/massa-pool-worker/src/operation_pool.rs index 0ee14081983..661873c7a1a 100644 --- a/massa-pool-worker/src/operation_pool.rs +++ b/massa-pool-worker/src/operation_pool.rs @@ -65,7 +65,7 @@ impl OperationPool { } /// notify of new final slot - pub fn notify_final_cs_periods(&mut self, final_cs_periods: &[u64]) { + pub(crate) fn notify_final_cs_periods(&mut self, final_cs_periods: &[u64]) { // update internal final slot counter self.last_cs_final_periods = final_cs_periods.to_vec(); @@ -98,7 +98,7 @@ impl OperationPool { } /// Add a list of operations to the pool - pub fn add_operations(&mut self, mut ops_storage: Storage) { + pub(crate) fn add_operations(&mut self, mut ops_storage: Storage) { let items = ops_storage .get_op_refs() .iter() diff --git a/massa-pool-worker/src/run.rs b/massa-pool-worker/src/run.rs deleted file mode 100644 index a405ff0870b..00000000000 --- a/massa-pool-worker/src/run.rs +++ /dev/null @@ -1,30 +0,0 @@ -use crate::operation_pool::OperationPool; -use crate::{controller_impl::PoolControllerImpl, endorsement_pool::EndorsementPool}; -use massa_execution_exports::ExecutionController; -use massa_pool_exports::PoolConfig; -use massa_storage::Storage; -use parking_lot::RwLock; -use std::sync::Arc; - -/// Starts the pool system and returns a controller -pub fn start_pool( - config: PoolConfig, - storage: &Storage, - execution_controller: Box, -) -> PoolControllerImpl { - // start operation pool - let operation_pool = Arc::new(RwLock::new(OperationPool::init( - config, - storage, - execution_controller, - ))); - - // start endorsement pool - let endorsement_pool = Arc::new(RwLock::new(EndorsementPool::init(config, storage))); - - PoolControllerImpl { - _config: config, - operation_pool, - endorsement_pool, - } -} diff --git a/massa-pool-worker/src/tests/add_to_pool.rs b/massa-pool-worker/src/tests/add_to_pool.rs deleted file mode 100644 index 72f4c6170b2..00000000000 --- a/massa-pool-worker/src/tests/add_to_pool.rs +++ /dev/null @@ -1,2 +0,0 @@ -#[test] -fn test_pool() {} diff --git a/massa-pool-worker/src/tests/scenario.rs b/massa-pool-worker/src/tests/scenario.rs index c0846b3e881..1eb7ac2b91d 100644 --- a/massa-pool-worker/src/tests/scenario.rs +++ b/massa-pool-worker/src/tests/scenario.rs @@ -49,7 +49,7 @@ fn test_simple_get_operations() { let config = PoolConfig::default(); pool_test( config, - |mut pool_controller, execution_receiver, mut storage| { + |mut pool_manager, mut pool_controller, execution_receiver, mut storage| { let keypair = KeyPair::generate(); storage.store_operations(create_some_operations(10, &keypair, 1)); @@ -96,6 +96,8 @@ fn test_simple_get_operations() { .get_block_operations(&Slot::new(1, creator_thread)) .1; + pool_manager.stop(); + assert_eq!(block_operations_storage.get_op_refs().len(), 10); }, ); @@ -160,7 +162,7 @@ fn test_get_operations_overflow() { let creator_thread = creator_address.get_thread(config.thread_count); pool_test( config, - |mut pool_controller, execution_receiver, mut storage| { + |mut pool_manager, mut pool_controller, execution_receiver, mut storage| { storage.store_operations(operations); let unexecuted_ops = storage.get_op_refs().clone(); @@ -177,6 +179,8 @@ fn test_get_operations_overflow() { .get_block_operations(&Slot::new(1, creator_thread)) .1; + pool_manager.stop(); + assert_eq!(block_operations_storage.get_op_refs().len(), MAX_OP_LEN); }, ); diff --git a/massa-pool-worker/src/tests/tools.rs b/massa-pool-worker/src/tests/tools.rs index c2487fe77d2..5aa101d79fc 100644 --- a/massa-pool-worker/src/tests/tools.rs +++ b/massa-pool-worker/src/tests/tools.rs @@ -1,6 +1,6 @@ // Copyright (c) 2022 MASSA LABS -use crate::{operation_pool::OperationPool, start_pool}; +use crate::{operation_pool::OperationPool, start_pool_controller}; use massa_execution_exports::test_exports::{ MockExecutionController, MockExecutionControllerMessage, }; @@ -14,7 +14,7 @@ use massa_models::{ slot::Slot, wrapped::WrappedContent, }; -use massa_pool_exports::{PoolConfig, PoolController}; +use massa_pool_exports::{PoolConfig, PoolController, PoolManager}; use massa_signature::{KeyPair, PublicKey}; use massa_storage::Storage; use std::collections::BTreeMap; @@ -54,14 +54,20 @@ pub fn create_some_operations( pub fn pool_test(cfg: PoolConfig, test: F) where - F: FnOnce(Box, Receiver, Storage), + F: FnOnce( + Box, + Box, + Receiver, + Storage, + ), { let storage: Storage = Storage::create_root(); let (execution_controller, execution_receiver) = MockExecutionController::new_with_receiver(); - let pool_controller = start_pool(cfg, &storage, execution_controller); + let (pool_manager, pool_controller) = + start_pool_controller(cfg, &storage, execution_controller); - test(Box::new(pool_controller), execution_receiver, storage) + test(pool_manager, pool_controller, execution_receiver, storage) } pub fn operation_pool_test(cfg: PoolConfig, test: F) diff --git a/massa-pool-worker/src/worker.rs b/massa-pool-worker/src/worker.rs new file mode 100644 index 00000000000..442e24c8ab8 --- /dev/null +++ b/massa-pool-worker/src/worker.rs @@ -0,0 +1,138 @@ +//! Copyright (c) 2022 MASSA LABS + +//! Write worker for the pools, allowing asynchronous writes. + +use crate::controller_impl::{Command, PoolManagerImpl}; +use crate::operation_pool::OperationPool; +use crate::{controller_impl::PoolControllerImpl, endorsement_pool::EndorsementPool}; +use massa_execution_exports::ExecutionController; +use massa_pool_exports::PoolConfig; +use massa_pool_exports::{PoolController, PoolManager}; +use massa_storage::Storage; +use parking_lot::RwLock; +use std::sync::mpsc::RecvError; +use std::{ + sync::mpsc::{sync_channel, Receiver}, + sync::Arc, + thread::JoinHandle, +}; + +/// Endorsement pool write thread instance +pub(crate) struct EndorsementPoolThread { + /// Command reception channel + receiver: Receiver, + /// Shared reference to the pool + endorsement_pool: Arc>, +} + +impl EndorsementPoolThread { + /// Spawns a pool writer thread, returning a join handle. + pub(crate) fn spawn( + receiver: Receiver, + endorsement_pool: Arc>, + ) -> JoinHandle<()> { + std::thread::spawn(|| { + let this = Self { + receiver, + endorsement_pool, + }; + this.run() + }) + } + + /// Runs the thread + fn run(self) { + loop { + match self.receiver.recv() { + Err(RecvError) => break, + Ok(Command::Stop) => break, + Ok(Command::AddItems(endorsements)) => { + self.endorsement_pool.write().add_endorsements(endorsements) + } + Ok(Command::NotifyFinalCsPeriods(final_cs_periods)) => self + .endorsement_pool + .write() + .notify_final_cs_periods(&final_cs_periods), + } + } + } +} + +/// Operation pool writer thread. +pub(crate) struct OperationPoolThread { + /// Command reception channel + receiver: Receiver, + /// Shared reference to the operation pool + operation_pool: Arc>, +} + +impl OperationPoolThread { + /// Spawns a pool writer thread, returning a join handle. + pub(crate) fn spawn( + receiver: Receiver, + operation_pool: Arc>, + ) -> JoinHandle<()> { + std::thread::spawn(|| { + let this = Self { + receiver, + operation_pool, + }; + this.run() + }) + } + + /// Run the thread. + fn run(self) { + loop { + match self.receiver.recv() { + Err(RecvError) => break, + Ok(Command::Stop) => break, + Ok(Command::AddItems(operations)) => { + self.operation_pool.write().add_operations(operations) + } + Ok(Command::NotifyFinalCsPeriods(final_cs_periods)) => self + .operation_pool + .write() + .notify_final_cs_periods(&final_cs_periods), + }; + } + } +} + +/// Start pool manager and controller +#[allow(clippy::type_complexity)] +pub fn start_pool_controller( + config: PoolConfig, + storage: &Storage, + execution_controller: Box, +) -> (Box, Box) { + let (operations_input_sender, operations_input_receiver) = sync_channel(config.channels_size); + let (endorsements_input_sender, endorsements_input_receiver) = + sync_channel(config.channels_size); + let operation_pool = Arc::new(RwLock::new(OperationPool::init( + config, + storage, + execution_controller, + ))); + let endorsement_pool = Arc::new(RwLock::new(EndorsementPool::init(config, storage))); + let controller = PoolControllerImpl { + _config: config, + operation_pool: operation_pool.clone(), + endorsement_pool: endorsement_pool.clone(), + operations_input_sender: operations_input_sender.clone(), + endorsements_input_sender: endorsements_input_sender.clone(), + }; + + let operations_thread_handle = + OperationPoolThread::spawn(operations_input_receiver, operation_pool); + let endorsements_thread_handle = + EndorsementPoolThread::spawn(endorsements_input_receiver, endorsement_pool); + + let manager = PoolManagerImpl { + operations_thread_handle: Some(operations_thread_handle), + endorsements_thread_handle: Some(endorsements_thread_handle), + operations_input_sender, + endorsements_input_sender, + }; + (Box::new(manager), Box::new(controller)) +} diff --git a/massa-protocol-exports/Cargo.toml b/massa-protocol-exports/Cargo.toml index 8dd370a7f79..2f9d48039a5 100644 --- a/massa-protocol-exports/Cargo.toml +++ b/massa-protocol-exports/Cargo.toml @@ -25,7 +25,6 @@ massa_signature = { path = "../massa-signature" } massa_time = { path = "../massa-time" } massa_storage = { path = "../massa-storage" } - # for more information on what are the following features used for, see the cargo.toml at workspace level [features] sandbox = ["massa_models/sandbox"]