diff --git a/finality-aleph/src/network/manager/discovery.rs b/finality-aleph/src/network/manager/discovery.rs index 02b1a22f86..77edad14cc 100644 --- a/finality-aleph/src/network/manager/discovery.rs +++ b/finality-aleph/src/network/manager/discovery.rs @@ -64,20 +64,20 @@ impl Discovery { } } - /// Returns messages that should be sent as part of authority discovery at this moment. + /// Returns a message that should be sent as part of authority discovery at this moment. pub fn discover_authorities( &mut self, handler: &SessionHandler, - ) -> Vec> { + ) -> Option> { let authentication = match handler.authentication() { Some(authentication) => authentication, - None => return Vec::new(), + None => return None, }; let missing_authorities = handler.missing_nodes(); let node_count = handler.node_count(); info!(target: "aleph-network", "{}/{} authorities known for session {}.", node_count.0-missing_authorities.len(), node_count.0, handler.session_id().0); - vec![authentication_broadcast(authentication)] + Some(authentication_broadcast(authentication)) } /// Checks the authentication using the handler and returns the addresses we should be @@ -104,39 +104,37 @@ impl Discovery { &mut self, authentication: Authentication, handler: &mut SessionHandler, - ) -> (Vec, Vec>) { + ) -> (Vec, Option>) { debug!(target: "aleph-network", "Handling broadcast with authentication {:?}.", authentication); let addresses = self.handle_authentication(authentication.clone(), handler); if addresses.is_empty() { - return (Vec::new(), Vec::new()); + return (Vec::new(), None); } let node_id = authentication.0.creator(); - let mut messages = Vec::new(); - if self.should_rebroadcast(&node_id) { - trace!(target: "aleph-network", "Rebroadcasting {:?}.", authentication); - self.last_broadcast.insert(node_id, Instant::now()); - messages.push(authentication_broadcast(authentication)); + if !self.should_rebroadcast(&node_id) { + return (addresses, None); } - (addresses, messages) + trace!(target: "aleph-network", "Rebroadcasting {:?}.", authentication); + self.last_broadcast.insert(node_id, Instant::now()); + (addresses, Some(authentication_broadcast(authentication))) } /// Analyzes the provided message and returns all the new multiaddresses we should - /// be connected to if we want to stay connected to the committee and any messages - /// that we should send as a result of it. + /// be connected to if we want to stay connected to the committee and an optional + /// message that we should send as a result of it. pub fn handle_message( &mut self, message: DiscoveryMessage, handler: &mut SessionHandler, - ) -> (Vec, Vec>) { + ) -> (Vec, Option>) { use DiscoveryMessage::*; match message { AuthenticationBroadcast(authentication) => { self.handle_broadcast(authentication, handler) } - Authentication(authentication) => ( - self.handle_authentication(authentication, handler), - Vec::new(), - ), + Authentication(authentication) => { + (self.handle_authentication(authentication, handler), None) + } } } } @@ -215,11 +213,9 @@ mod tests { for num_nodes in 2..NUM_NODES { let (mut discovery, mut handlers, _) = build_number(num_nodes).await; let handler = &mut handlers[0]; - let mut messages = discovery.discover_authorities(handler); - assert_eq!(messages.len(), 1); - let message = messages.pop().unwrap(); + let message = discovery.discover_authorities(handler); assert_eq!( - message, + message.expect("there is a discovery message"), ( DiscoveryMessage::AuthenticationBroadcast(handler.authentication().unwrap()), DataCommand::Broadcast @@ -231,8 +227,8 @@ mod tests { #[tokio::test] async fn non_validator_discover_authorities_returns_empty_vector() { let (mut discovery, _, non_validator) = build().await; - let messages = discovery.discover_authorities(&non_validator); - assert!(messages.is_empty()); + let message = discovery.discover_authorities(&non_validator); + assert!(message.is_none()); } #[tokio::test] @@ -240,32 +236,30 @@ mod tests { let (mut discovery, mut handlers, _) = build().await; let authentication = handlers[1].authentication().unwrap(); let handler = &mut handlers[0]; - let (addresses, commands) = discovery.handle_message( + let (addresses, command) = discovery.handle_message( DiscoveryMessage::AuthenticationBroadcast(authentication.clone()), handler, ); assert_eq!(addresses, authentication.0.addresses()); - assert_eq!(commands.len(), 1); - assert!(commands.iter().any(|command| matches!(command, ( + assert!(matches!(command, Some(( DiscoveryMessage::AuthenticationBroadcast(rebroadcast_authentication), DataCommand::Broadcast, - ) if rebroadcast_authentication == &authentication))); + )) if rebroadcast_authentication == authentication)); } #[tokio::test] async fn non_validators_rebroadcasts() { let (mut discovery, handlers, mut non_validator) = build().await; let authentication = handlers[1].authentication().unwrap(); - let (addresses, commands) = discovery.handle_message( + let (addresses, command) = discovery.handle_message( DiscoveryMessage::AuthenticationBroadcast(authentication.clone()), &mut non_validator, ); assert_eq!(addresses, authentication.0.addresses()); - assert_eq!(commands.len(), 1); - assert!(commands.iter().any(|command| matches!(command, ( + assert!(matches!(command, Some(( DiscoveryMessage::AuthenticationBroadcast(rebroadcast_authentication), DataCommand::Broadcast, - ) if rebroadcast_authentication == &authentication))); + )) if rebroadcast_authentication == authentication)); } #[tokio::test] @@ -275,12 +269,12 @@ mod tests { let (_, signature) = handlers[2].authentication().unwrap(); let authentication = (auth_data, signature); let handler = &mut handlers[0]; - let (addresses, commands) = discovery.handle_message( + let (addresses, command) = discovery.handle_message( DiscoveryMessage::AuthenticationBroadcast(authentication), handler, ); assert!(addresses.is_empty()); - assert!(commands.is_empty()); + assert!(command.is_none()); } #[tokio::test] @@ -293,15 +287,15 @@ mod tests { handler, ); sleep(Duration::from_millis(MS_COOLDOWN + 5)); - let (addresses, commands) = discovery.handle_message( + let (addresses, command) = discovery.handle_message( DiscoveryMessage::AuthenticationBroadcast(authentication.clone()), handler, ); assert_eq!(addresses, authentication.0.addresses()); - assert!(commands.iter().any(|command| matches!(command, ( + assert!(matches!(command, Some(( DiscoveryMessage::AuthenticationBroadcast(rebroadcast_authentication), DataCommand::Broadcast, - ) if rebroadcast_authentication == &authentication))); + )) if rebroadcast_authentication == authentication)); } #[tokio::test] @@ -310,12 +304,12 @@ mod tests { let expected_address = handlers[1].authentication().unwrap().0.addresses()[0].encode(); let authentication = handlers[1].authentication().unwrap(); let handler = &mut handlers[0]; - let (addresses, commands) = + let (addresses, command) = discovery.handle_message(DiscoveryMessage::Authentication(authentication), handler); assert_eq!(addresses.len(), 1); let address = addresses[0].encode(); assert_eq!(address, expected_address); - assert!(commands.is_empty()); + assert!(command.is_none()); } #[tokio::test] @@ -325,11 +319,11 @@ mod tests { let (_, signature) = handlers[2].authentication().unwrap(); let incorrect_authentication = (auth_data, signature); let handler = &mut handlers[0]; - let (addresses, commands) = discovery.handle_message( + let (addresses, command) = discovery.handle_message( DiscoveryMessage::Authentication(incorrect_authentication), handler, ); assert!(addresses.is_empty()); - assert!(commands.is_empty()); + assert!(command.is_none()); } } diff --git a/finality-aleph/src/network/manager/service.rs b/finality-aleph/src/network/manager/service.rs index ea8bdadd8f..fba4ebd456 100644 --- a/finality-aleph/src/network/manager/service.rs +++ b/finality-aleph/src/network/manager/service.rs @@ -114,14 +114,14 @@ type MessageForNetwork = (NetworkData, DataCommand< { maybe_command: Option>, - data: Vec>, + maybe_data: Option>, } impl ServiceActions { fn noop() -> Self { ServiceActions { maybe_command: None, - data: Vec::new(), + maybe_data: None, } } } @@ -195,29 +195,25 @@ impl Service { fn discover_authorities( &mut self, session_id: &SessionId, - ) -> Vec> { - if let Some(Session { - handler, discovery, .. - }) = self.sessions.get_mut(session_id) - { - discovery - .discover_authorities(handler) - .into_iter() - .map(Self::network_message) - .collect() - } else { - Vec::new() - } + ) -> Option> { + self.sessions.get_mut(session_id).and_then( + |Session { + handler, discovery, .. + }| { + discovery + .discover_authorities(handler) + .map(Self::network_message) + }, + ) } /// Returns all the network messages that should be sent as part of discovery at this moment. pub fn discovery(&mut self) -> Vec> { - let mut result = Vec::new(); let sessions: Vec<_> = self.sessions.keys().cloned().collect(); - for session_id in sessions { - result.append(&mut self.discover_authorities(&session_id)); - } - result + sessions + .iter() + .flat_map(|session_id| self.discover_authorities(session_id)) + .collect() } fn addresses(&self) -> Vec { @@ -235,7 +231,7 @@ impl Service { addresses: Vec, ) -> Result< ( - Vec>, + Option>, mpsc::UnboundedReceiver, ), SessionHandlerError, @@ -276,12 +272,12 @@ impl Service { let session = match self.sessions.get_mut(&pre_session.session_id) { Some(session) => session, None => { - let (data, data_from_network) = + let (maybe_data, data_from_network) = self.start_validator_session(pre_session, addresses).await?; return Ok(( ServiceActions { maybe_command: None, - data, + maybe_data, }, data_from_network, )); @@ -313,7 +309,7 @@ impl Service { Ok(( ServiceActions { maybe_command, - data: self.discover_authorities(&session_id), + maybe_data: self.discover_authorities(&session_id), }, data_from_network, )) @@ -425,7 +421,7 @@ impl Service { } Stop(session_id) => Ok(ServiceActions { maybe_command: self.finish_session(session_id), - data: Vec::new(), + maybe_data: None, }), } } @@ -473,7 +469,7 @@ impl Service { Some(Session { handler, discovery, .. }) => { - let (addresses, responses) = discovery.handle_message(message, handler); + let (addresses, response) = discovery.handle_message(message, handler); let maybe_command = match !addresses.is_empty() && handler.is_validator() { true => { debug!(target: "aleph-network", "Adding addresses for session {:?} to reserved: {:?}", session_id, addresses); @@ -489,7 +485,7 @@ impl Service { }; ServiceActions { maybe_command, - data: responses.into_iter().map(Self::network_message).collect(), + maybe_data: response.map(Self::network_message), } } None => { @@ -671,14 +667,14 @@ impl IO { &self, ServiceActions { maybe_command, - data, + maybe_data, }: ServiceActions, ) -> Result<(), Error> { if let Some(command) = maybe_command { self.send_command(command)?; } - for data_to_send in data { - self.send_data(data_to_send)?; + if let Some(data) = maybe_data { + self.send_data(data)?; } Ok(()) } @@ -796,13 +792,13 @@ mod tests { let session_id = SessionId(43); let ServiceActions { maybe_command, - data, + maybe_data, } = service .on_command(SessionCommand::StartNonvalidator(session_id, verifier)) .await .unwrap(); assert!(maybe_command.is_none()); - assert!(data.is_empty()); + assert!(maybe_data.is_none()); assert_eq!( service.send_session_data(&session_id, -43), Err(Error::NoSession) @@ -818,7 +814,7 @@ mod tests { let (result_for_user, result_from_service) = oneshot::channel(); let ServiceActions { maybe_command, - data, + maybe_data, } = service .on_command(SessionCommand::StartValidator( session_id, @@ -830,10 +826,10 @@ mod tests { .await .unwrap(); assert!(maybe_command.is_none()); - assert_eq!(data.len(), 1); - assert!(data - .iter() - .all(|(_, command)| command == &DataCommand::Broadcast)); + assert_eq!( + maybe_data.expect("there is a message").1, + DataCommand::Broadcast + ); let _data_from_network = result_from_service.await.unwrap(); assert_eq!(service.send_session_data(&session_id, -43), Ok(())); } @@ -847,7 +843,7 @@ mod tests { let (result_for_user, result_from_service) = oneshot::channel(); let ServiceActions { maybe_command, - data, + maybe_data, } = service .on_command(SessionCommand::StartValidator( session_id, @@ -859,22 +855,22 @@ mod tests { .await .unwrap(); assert!(maybe_command.is_none()); - assert_eq!(data.len(), 1); - assert!(data - .iter() - .all(|(_, command)| command == &DataCommand::Broadcast)); + assert_eq!( + maybe_data.expect("there is a message").1, + DataCommand::Broadcast + ); assert_eq!(service.send_session_data(&session_id, -43), Ok(())); let mut data_from_network = result_from_service.await.unwrap(); assert_eq!(data_from_network.next().await, Some(-43)); let ServiceActions { maybe_command, - data, + maybe_data, } = service .on_command(SessionCommand::Stop(session_id)) .await .unwrap(); assert!(maybe_command.is_none()); - assert!(data.is_empty()); + assert!(maybe_data.is_none()); assert_eq!( service.send_session_data(&session_id, -43), Err(Error::NoSession) @@ -900,15 +896,18 @@ mod tests { .unwrap(); let mut other_service = build(); let (node_id, pen) = validator_data[1].clone(); - let ServiceActions { data, .. } = other_service + let ServiceActions { maybe_data, .. } = other_service .on_command(SessionCommand::StartValidator( session_id, verifier, node_id, pen, None, )) .await .unwrap(); - let broadcast = match data[0].clone() { - (NetworkData::Meta(broadcast), DataCommand::Broadcast) => broadcast, - _ => panic!("Expected discovery massage broadcast, got: {:?}", data[0]), + let broadcast = match maybe_data { + Some((NetworkData::Meta(broadcast), DataCommand::Broadcast)) => broadcast, + maybe_data => panic!( + "Expected discovery massage broadcast, got: {:?}", + maybe_data + ), }; let addresses = match &broadcast { DiscoveryMessage::AuthenticationBroadcast((auth_data, _)) => auth_data.addresses(), @@ -916,7 +915,7 @@ mod tests { }; let ServiceActions { maybe_command, - data, + maybe_data, } = service.on_discovery_message(broadcast); assert_eq!( maybe_command, @@ -924,10 +923,10 @@ mod tests { addresses.into_iter().collect() )) ); - assert_eq!(data.len(), 1); - assert!(data - .iter() - .any(|(_, command)| command == &DataCommand::Broadcast)); + assert_eq!( + maybe_data.expect("there is a message").1, + DataCommand::Broadcast + ); } #[tokio::test] @@ -948,15 +947,18 @@ mod tests { .unwrap(); let mut other_service = build(); let (node_id, pen) = validator_data[1].clone(); - let ServiceActions { data, .. } = other_service + let ServiceActions { maybe_data, .. } = other_service .on_command(SessionCommand::StartValidator( session_id, verifier, node_id, pen, None, )) .await .unwrap(); - let broadcast = match data[0].clone() { - (NetworkData::Meta(broadcast), DataCommand::Broadcast) => broadcast, - _ => panic!("Expected discovery massage broadcast, got: {:?}", data[0]), + let broadcast = match maybe_data { + Some((NetworkData::Meta(broadcast), DataCommand::Broadcast)) => broadcast, + maybe_data => panic!( + "Expected discovery massage broadcast, got: {:?}", + maybe_data + ), }; service.on_discovery_message(broadcast); let messages = service.on_user_message(2137, session_id, Recipient::Everyone);