diff --git a/massa-protocol-worker/src/protocol_worker.rs b/massa-protocol-worker/src/protocol_worker.rs index 4455011463..003cdd0e34 100644 --- a/massa-protocol-worker/src/protocol_worker.rs +++ b/massa-protocol-worker/src/protocol_worker.rs @@ -25,6 +25,7 @@ use massa_protocol_exports::{ use massa_storage::Storage; use massa_time::{MassaTime, TimeError}; use std::collections::{HashMap, HashSet}; +use std::mem; use tokio::{ sync::mpsc, sync::mpsc::error::SendTimeoutError, @@ -158,6 +159,8 @@ pub struct ProtocolWorker { pub(crate) op_batch_buffer: OperationBatchBuffer, /// Shared storage. pub(crate) storage: Storage, + /// Operations to announce at the next interval. + operations_to_announce: Vec, } /// channels used by the protocol worker @@ -213,6 +216,7 @@ impl ProtocolWorker { config.operation_batch_buffer_capacity, ), storage, + operations_to_announce: Default::default(), } } @@ -292,9 +296,14 @@ impl ProtocolWorker { self.update_ask_block(&mut block_ask_timer).await?; } - // operation ask timer + // operation ask, and announce, timer _ = &mut operation_batch_proc_period_timer => { - massa_trace!("protocol.protocol_worker.run_loop.operation_ask_timer", { }); + massa_trace!("protocol.protocol_worker.run_loop.operation_ask_and_announce_timer", { }); + + // Announce operations. + self.announce_ops().await; + + // Update operations to ask. self.update_ask_operation(&mut operation_batch_proc_period_timer).await?; } // operation prune timer @@ -311,9 +320,10 @@ impl ProtocolWorker { /// Announce a set of operations to active nodes who do not know about it yet. /// Side effect: notes nodes as knowing about those operations from now on. - async fn announce_ops(&mut self, operations: &[OperationId]) { + async fn announce_ops(&mut self) { + let operations = mem::take(&mut self.operations_to_announce); massa_trace!("protocol.protocol_worker.propagate_operations.begin", { - "operation": operations + "operations": operations }); for (node, node_info) in self.active_nodes.iter_mut() { let new_ops: Vec = operations @@ -334,6 +344,18 @@ impl ProtocolWorker { } } + /// Add an list of operations to a queue pending for announcement. + async fn note_operations_to_announce(&mut self, operations: &[OperationId]) { + // If we have too many operations to announce, + // announce them immediately, clearing the data at the same time. + if self.operations_to_announce.len() > self.config.max_known_ops_size { + self.announce_ops().await; + } + + // Add the operations to a list for announcement at the next interval. + self.operations_to_announce.extend_from_slice(operations); + } + async fn propagate_endorsements(&mut self, storage: &Storage) { massa_trace!( "protocol.protocol_worker.process_command.propagate_endorsements.begin", @@ -482,7 +504,7 @@ impl ProtocolWorker { // Announce operations to active nodes not knowing about it. let to_announce: Vec = operation_ids.iter().copied().collect(); - self.announce_ops(&to_announce).await; + self.note_operations_to_announce(&to_announce).await; } ProtocolCommand::PropagateEndorsements(endorsements) => { self.propagate_endorsements(&endorsements).await; @@ -987,7 +1009,7 @@ impl ProtocolWorker { ops_to_propagate.drop_operation_refs(&operations_to_not_propagate); let to_announce: Vec = ops_to_propagate.get_op_refs().iter().copied().collect(); - self.announce_ops(&to_announce).await; + self.note_operations_to_announce(&to_announce).await; // Add to pool self.pool_controller.add_operations(ops);