Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Robustify operations propagation #3086

Merged
merged 23 commits into from
Oct 3, 2022
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 39 additions & 22 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion massa-network-exports/src/network_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl NetworkCommandSender {
/// # Returns
/// Can return a `[NetworkError::ChannelError]` that must be managed by the direct caller of the
/// function.
pub async fn send_operations_batch(
pub async fn announce_operations(
&self,
to_node: NodeId,
batch: OperationPrefixIds,
Expand Down
4 changes: 4 additions & 0 deletions massa-node/base_config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,14 @@
# Maximum number of batches in the memory buffer.
# Dismiss the new batches if overflow
operation_batch_buffer_capacity = 10024
# Immediately announce ops if overflow
operation_announcement_buffer_capacity = 10024
# Start processing batches in the buffer each `operation_batch_proc_period` in millisecond
operation_batch_proc_period = 500
# All operations asked are prune each `operation_asked_pruning_period` millisecond
asked_operations_pruning_period = 100000
# Interval at which operations are announced in batches.
operation_announcement_interval = 150
# Max number of operation per message, same as network param but can be smaller
max_operations_per_message = 1024
# Time threshold after which operation are not propagated
Expand Down
4 changes: 4 additions & 0 deletions massa-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,12 @@ async fn launch(
.max_simultaneous_ask_blocks_per_node,
max_send_wait: SETTINGS.protocol.max_send_wait,
operation_batch_buffer_capacity: SETTINGS.protocol.operation_batch_buffer_capacity,
operation_announcement_buffer_capacity: SETTINGS
.protocol
.operation_announcement_buffer_capacity,
operation_batch_proc_period: SETTINGS.protocol.operation_batch_proc_period,
asked_operations_pruning_period: SETTINGS.protocol.asked_operations_pruning_period,
operation_announcement_interval: SETTINGS.protocol.operation_announcement_interval,
max_operations_per_message: SETTINGS.protocol.max_operations_per_message,
max_serialized_operations_size_per_block: MAX_BLOCK_SIZE as usize,
controller_channel_size: PROTOCOL_CONTROLLER_CHANNEL_SIZE,
Expand Down
5 changes: 5 additions & 0 deletions massa-node/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,15 @@ pub struct ProtocolSettings {
/// Maximum number of batches in the memory buffer.
/// Dismiss the new batches if overflow
pub operation_batch_buffer_capacity: usize,
/// Maximum number of operations in the announcement buffer.
/// Immedidately announce if overflow.
pub operation_announcement_buffer_capacity: usize,
/// Start processing batches in the buffer each `operation_batch_proc_period` in millisecond
pub operation_batch_proc_period: MassaTime,
/// All operations asked are prune each `operation_asked_pruning_period` millisecond
pub asked_operations_pruning_period: MassaTime,
/// Interval at which operations are announced in batches.
pub operation_announcement_interval: MassaTime,
/// Maximum of operations sent in one message.
pub max_operations_per_message: u64,
/// Time threshold after which operation are not propagated
Expand Down
5 changes: 5 additions & 0 deletions massa-protocol-exports/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,15 @@ pub struct ProtocolConfig {
/// Maximum number of batches in the memory buffer.
/// Dismiss the new batches if overflow
pub operation_batch_buffer_capacity: usize,
/// Maximum number of operations in the announcement buffer.
/// Immedidately announce if overflow.
pub operation_announcement_buffer_capacity: usize,
/// Start processing batches in the buffer each `operation_batch_proc_period` in millisecond
pub operation_batch_proc_period: MassaTime,
/// All operations asked are prune each `operation_asked_pruning_period` millisecond
pub asked_operations_pruning_period: MassaTime,
/// Interval at which operations are announced in batches.
pub operation_announcement_interval: MassaTime,
/// Maximum of operations sent in one message.
pub max_operations_per_message: u64,
/// Maximum size in bytes of all serialized operations size in a block
Expand Down
2 changes: 2 additions & 0 deletions massa-protocol-exports/src/tests/tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,10 @@ pub fn create_protocol_config() -> ProtocolConfig {
max_known_endorsements_size: 1000,
max_node_known_endorsements_size: 1000,
operation_batch_buffer_capacity: 1000,
operation_announcement_buffer_capacity: 1000,
operation_batch_proc_period: 200.into(),
asked_operations_pruning_period: 500.into(),
operation_announcement_interval: 150.into(),
max_operations_per_message: 1024,
thread_count: 32,
max_serialized_operations_size_per_block: 1024,
Expand Down
2 changes: 2 additions & 0 deletions massa-protocol-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ edition = "2021"
serde_json = "1.0"
tokio = { version = "1.21", features = ["full"] }
tracing = "0.1"
rayon = "1.5"
# custom modules
massa_hash = { path = "../massa-hash" }
massa_logging = { path = "../massa-logging" }
Expand All @@ -20,6 +21,7 @@ massa_protocol_exports = { path = "../massa-protocol-exports" }
massa_serialization = { path = "../massa-serialization" }
massa_storage = { path = "../massa-storage" }
massa_time = { path = "../massa-time" }
massa_signature = { path = "../massa-signature" }

[dev-dependencies]
lazy_static = "1.4"
Expand Down
1 change: 1 addition & 0 deletions massa-protocol-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub use protocol_worker::start_protocol_controller;
mod checked_operations;
mod node_info;
mod protocol_network;
mod sig_verifier;

#[cfg(test)]
pub mod tests;
8 changes: 4 additions & 4 deletions massa-protocol-worker/src/node_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ impl NodeInfo {
self.known_endorsements.contains(endorsement_id)
}

pub fn insert_known_ops(&mut self, ops: PreHashSet<OperationId>, max_ops_nb: usize) {
for operation_id in ops.into_iter() {
if self.known_operations.insert(operation_id) {
self.known_operations_queue.push_back(operation_id);
pub fn insert_known_ops(&mut self, ops: &[OperationId], max_ops_nb: usize) {
for operation_id in ops.iter() {
if self.known_operations.insert(*operation_id) {
self.known_operations_queue.push_back(*operation_id);
while self.known_operations_queue.len() > max_ops_nb {
if let Some(op_id) = self.known_operations_queue.pop_front() {
self.known_operations.remove(&op_id);
Expand Down
5 changes: 1 addition & 4 deletions massa-protocol-worker/src/protocol_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,7 @@ impl ProtocolWorker {

// add to known ops
if let Some(node_info) = self.active_nodes.get_mut(&from_node_id) {
node_info.insert_known_ops(
operation_ids_set.clone(),
self.config.max_node_known_ops_size,
);
node_info.insert_known_ops(&operation_ids, self.config.max_node_known_ops_size);
}

let info = if let Some(info) = self.block_wishlist.get_mut(&block_id) {
Expand Down
Loading