Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operations and Endorsements pool threads #3132

Merged
merged 16 commits into from
Oct 8, 2022
85 changes: 50 additions & 35 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions massa-api/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use massa_execution_exports::ExecutionError;
use massa_hash::MassaHashError;
use massa_models::error::ModelsError;
use massa_network_exports::NetworkError;
use massa_pool_exports::PoolError;
use massa_protocol_exports::ProtocolError;
use massa_time::TimeError;
use massa_wallet::WalletError;
Expand All @@ -15,8 +14,6 @@ use thiserror::Error;
#[non_exhaustive]
#[derive(Display, Error, Debug)]
pub enum ApiError {
/// pool error: {0}
PoolError(#[from] PoolError),
/// too many arguments error: {0}
TooManyArguments(String),
/// send channel error: {0}
Expand Down
2 changes: 0 additions & 2 deletions massa-consensus-exports/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ pub enum ConsensusError {
SendChannelError(String),
/// Receive channel error : {0}
ReceiveChannelError(String),
/// pool error : {0}
PoolError(#[from] massa_pool_exports::PoolError),
/// io error {0}
IOError(#[from] std::io::Error),
/// missing block {0}
Expand Down
9 changes: 5 additions & 4 deletions massa-models/src/config/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,13 @@ pub const BOOTSTRAP_RANDOMNESS_SIZE_BYTES: usize = 32;
/// Max size of the printed error
pub const MAX_BOOTSTRAP_ERROR_LENGTH: u32 = 10000;

// Protocol constants

/// Controller channel size
/// Protocol controller channel size
pub const PROTOCOL_CONTROLLER_CHANNEL_SIZE: usize = 1024;
/// Event channel size
/// Protocol event channel size
pub const PROTOCOL_EVENT_CHANNEL_SIZE: usize = 1024;
/// Pool controller channel size
pub const POOL_CONTROLLER_CHANNEL_SIZE: usize = 1024;

// ***********************
// Constants used for execution module (injected from ConsensusConfig)
//
Expand Down
29 changes: 15 additions & 14 deletions massa-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ use massa_models::config::constants::{
PROTOCOL_CONTROLLER_CHANNEL_SIZE, PROTOCOL_EVENT_CHANNEL_SIZE, ROLL_PRICE, T0, THREAD_COUNT,
VERSION,
};
use massa_models::config::POOL_CONTROLLER_CHANNEL_SIZE;
use massa_network_exports::{Establisher, NetworkConfig, NetworkManager};
use massa_network_worker::start_network_controller;
use massa_pool_exports::{PoolConfig, PoolController};
use massa_pool_worker::start_pool;
use massa_pool_exports::{PoolConfig, PoolManager};
use massa_pool_worker::start_pool_controller;
use massa_pos_exports::{SelectorConfig, SelectorManager};
use massa_pos_worker::start_selector_worker;
use massa_protocol_exports::{ProtocolConfig, ProtocolManager};
Expand All @@ -54,7 +55,7 @@ use massa_storage::Storage;
use massa_time::MassaTime;
use massa_wallet::Wallet;
use parking_lot::RwLock;
use std::{mem, path::PathBuf};
use std::path::PathBuf;
use std::{path::Path, process, sync::Arc};
use structopt::StructOpt;
use tokio::signal;
Expand All @@ -72,7 +73,7 @@ async fn launch(
ConsensusManager,
Box<dyn ExecutionManager>,
Box<dyn SelectorManager>,
Box<dyn PoolController>,
Box<dyn PoolManager>,
ProtocolManager,
NetworkManager,
Box<dyn FactoryManager>,
Expand Down Expand Up @@ -317,9 +318,10 @@ async fn launch(
operation_validity_periods: OPERATION_VALIDITY_PERIODS,
max_operation_pool_size_per_thread: SETTINGS.pool.max_pool_size_per_thread,
max_endorsements_pool_size_per_thread: SETTINGS.pool.max_pool_size_per_thread,
channels_size: POOL_CONTROLLER_CHANNEL_SIZE,
};
let pool_controller = start_pool(pool_config, &shared_storage, execution_controller.clone());
let pool_manager: Box<dyn PoolController> = Box::new(pool_controller.clone());
let (pool_manager, pool_controller) =
start_pool_controller(pool_config, &shared_storage, execution_controller.clone());

// launch protocol controller
let protocol_config = ProtocolConfig {
Expand Down Expand Up @@ -357,7 +359,7 @@ async fn launch(
protocol_config,
network_command_sender.clone(),
network_event_receiver,
pool_manager.clone(),
pool_controller.clone(),
shared_storage.clone(),
)
.await
Expand Down Expand Up @@ -394,7 +396,7 @@ async fn launch(
execution_controller: execution_controller.clone(),
protocol_command_sender: protocol_command_sender.clone(),
protocol_event_receiver,
pool_command_sender: pool_manager.clone(),
pool_command_sender: pool_controller.clone(),
selector_controller: selector_controller.clone(),
},
bootstrap_state.graph,
Expand All @@ -417,7 +419,7 @@ async fn launch(
let factory_channels = FactoryChannels {
selector: selector_controller.clone(),
consensus: consensus_command_sender.clone(),
pool: pool_manager.clone(),
pool: pool_controller.clone(),
protocol: protocol_command_sender.clone(),
storage: shared_storage.clone(),
};
Expand Down Expand Up @@ -467,7 +469,7 @@ async fn launch(
api_config,
selector_controller.clone(),
consensus_config,
pool_manager.clone(),
pool_controller.clone(),
protocol_command_sender.clone(),
network_config,
*VERSION,
Expand Down Expand Up @@ -524,7 +526,7 @@ struct Managers {
consensus_manager: ConsensusManager,
execution_manager: Box<dyn ExecutionManager>,
selector_manager: Box<dyn SelectorManager>,
pool_manager: Box<dyn PoolController>,
pool_manager: Box<dyn PoolManager>,
protocol_manager: ProtocolManager,
network_manager: NetworkManager,
factory_manager: Box<dyn FactoryManager>,
Expand All @@ -537,7 +539,7 @@ async fn stop(
mut execution_manager,
consensus_manager,
mut selector_manager,
pool_manager,
mut pool_manager,
protocol_manager,
network_manager,
mut factory_manager,
Expand Down Expand Up @@ -568,8 +570,7 @@ async fn stop(
.expect("consensus shutdown failed");

// stop pool
//TODO make a proper manager
mem::drop(pool_manager);
pool_manager.stop();

// stop execution controller
execution_manager.stop();
Expand Down
Loading