diff --git a/finality-aleph/src/validator_network/manager/legacy.rs b/finality-aleph/src/validator_network/manager/legacy.rs new file mode 100644 index 0000000000..f305c716f8 --- /dev/null +++ b/finality-aleph/src/validator_network/manager/legacy.rs @@ -0,0 +1,318 @@ +use std::{ + collections::{HashMap, HashSet}, + fmt::{Display, Error as FmtError, Formatter}, +}; + +use aleph_primitives::AuthorityId; +use futures::channel::mpsc; + +use crate::{ + network::PeerId, + validator_network::{ + manager::{AddResult, SendError}, + Data, + }, +}; + +/// Network component responsible for holding the list of peers that we +/// want to connect to, and managing the established connections. +pub struct Manager { + addresses: HashMap>, + outgoing: HashMap>, + incoming: HashMap>, +} + +struct ManagerStatus { + wanted_peers: usize, + both_ways_peers: HashSet, + outgoing_peers: HashSet, + incoming_peers: HashSet, + missing_peers: HashSet, +} + +impl ManagerStatus { + fn new(manager: &Manager) -> Self { + let incoming: HashSet<_> = manager + .incoming + .iter() + .filter(|(_, exit)| !exit.is_closed()) + .map(|(k, _)| k.clone()) + .collect(); + let outgoing: HashSet<_> = manager + .outgoing + .iter() + .filter(|(_, exit)| !exit.is_closed()) + .map(|(k, _)| k.clone()) + .collect(); + + let both_ways = incoming.intersection(&outgoing).cloned().collect(); + let incoming: HashSet<_> = incoming.difference(&both_ways).cloned().collect(); + let outgoing: HashSet<_> = outgoing.difference(&both_ways).cloned().collect(); + let missing = manager + .addresses + .keys() + .filter(|a| !both_ways.contains(a) && !incoming.contains(a) && !outgoing.contains(a)) + .cloned() + .collect(); + + ManagerStatus { + wanted_peers: manager.addresses.len(), + both_ways_peers: both_ways, + incoming_peers: incoming, + outgoing_peers: outgoing, + missing_peers: missing, + } + } +} + +impl Display for ManagerStatus { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { + if self.wanted_peers == 0 { + return write!(f, "not maintaining any connections; "); + } + + write!(f, "target - {:?} connections; ", self.wanted_peers)?; + + if self.both_ways_peers.is_empty() && self.incoming_peers.is_empty() { + write!(f, "WARNING! No incoming peers even though we expected tham, maybe connecting to us is impossible; ")?; + } + + if !self.both_ways_peers.is_empty() { + let peers = self + .both_ways_peers + .iter() + .map(|authority_id| authority_id.to_short_string()) + .collect::>() + .join(", "); + write!( + f, + "both ways - {:?} [{}]; ", + self.both_ways_peers.len(), + peers, + )?; + } + + if !self.incoming_peers.is_empty() { + let peers = self + .incoming_peers + .iter() + .map(|authority_id| authority_id.to_short_string()) + .collect::>() + .join(", "); + write!( + f, + "incoming only - {:?} [{}]; ", + self.incoming_peers.len(), + peers + )?; + } + + if !self.outgoing_peers.is_empty() { + let peers = self + .outgoing_peers + .iter() + .map(|authority_id| authority_id.to_short_string()) + .collect::>() + .join(", "); + write!( + f, + "outgoing only - {:?} [{}];", + self.outgoing_peers.len(), + peers + )?; + } + + if !self.missing_peers.is_empty() { + let peers = self + .missing_peers + .iter() + .map(|authority_id| authority_id.to_short_string()) + .collect::>() + .join(", "); + write!(f, "missing - {:?} [{}];", self.missing_peers.len(), peers)?; + } + + Ok(()) + } +} + +impl Manager { + /// Create a new Manager with empty list of peers. 
+ pub fn new() -> Self { + Manager { + addresses: HashMap::new(), + outgoing: HashMap::new(), + incoming: HashMap::new(), + } + } + + /// Add a peer to the list of peers we want to stay connected to, or + /// update the list of addresses if the peer was already added. + /// Returns whether this peer is a new peer. + pub fn add_peer(&mut self, peer_id: AuthorityId, addresses: Vec) -> bool { + self.addresses.insert(peer_id, addresses).is_none() + } + + /// Return Option containing addresses of the given peer, or None if + /// the peer is unknown. + pub fn peer_addresses(&self, peer_id: &AuthorityId) -> Option> { + self.addresses.get(peer_id).cloned() + } + + /// Add an established outgoing connection with a known peer, + /// but only if the peer is on the list of peers that we want to stay connected with. + pub fn add_outgoing( + &mut self, + peer_id: AuthorityId, + data_for_network: mpsc::UnboundedSender, + ) -> AddResult { + use AddResult::*; + if !self.addresses.contains_key(&peer_id) { + return Uninterested; + } + match self.outgoing.insert(peer_id, data_for_network) { + Some(_) => Replaced, + None => Added, + } + } + + /// Add an established incoming connection with a known peer, + /// but only if the peer is on the list of peers that we want to stay connected with. + pub fn add_incoming( + &mut self, + peer_id: AuthorityId, + exit: mpsc::UnboundedSender, + ) -> AddResult { + use AddResult::*; + if !self.addresses.contains_key(&peer_id) { + return Uninterested; + }; + match self.incoming.insert(peer_id, exit) { + Some(_) => Replaced, + None => Added, + } + } + + /// Remove a peer from the list of peers that we want to stay connected with. + /// Close any incoming and outgoing connections that were established. + pub fn remove_peer(&mut self, peer_id: &AuthorityId) { + self.addresses.remove(peer_id); + self.incoming.remove(peer_id); + self.outgoing.remove(peer_id); + } + + /// Send data to a peer. + /// Returns error if there is no outgoing connection to the peer, + /// or if the connection is dead. + pub fn send_to(&mut self, peer_id: &AuthorityId, data: D) -> Result<(), SendError> { + self.outgoing + .get(peer_id) + .ok_or(SendError::PeerNotFound)? + .unbounded_send(data) + .map_err(|_| SendError::ConnectionClosed) + } + + /// A status of the manager, to be displayed somewhere. 
+ pub fn status_report(&self) -> impl Display { + ManagerStatus::new(self) + } +} + +#[cfg(test)] +mod tests { + use futures::{channel::mpsc, StreamExt}; + + use super::{AddResult::*, Manager, SendError}; + use crate::validator_network::mock::key; + + type Data = String; + type Address = String; + + #[tokio::test] + async fn add_remove() { + let mut manager = Manager::::new(); + let (peer_id, _) = key().await; + let (peer_id_b, _) = key().await; + let addresses = vec![ + String::from(""), + String::from("a/b/c"), + String::from("43.43.43.43:43000"), + ]; + // add new peer - returns true + assert!(manager.add_peer(peer_id.clone(), addresses.clone())); + // add known peer - returns false + assert!(!manager.add_peer(peer_id.clone(), addresses.clone())); + // get address + assert_eq!(manager.peer_addresses(&peer_id), Some(addresses)); + // try to get address of an unknown peer + assert_eq!(manager.peer_addresses(&peer_id_b), None); + // remove peer + manager.remove_peer(&peer_id); + // try to get address of removed peer + assert_eq!(manager.peer_addresses(&peer_id), None); + } + + #[tokio::test] + async fn outgoing() { + let mut manager = Manager::::new(); + let data = String::from("DATA"); + let (peer_id, _) = key().await; + let addresses = vec![ + String::from(""), + String::from("a/b/c"), + String::from("43.43.43.43:43000"), + ]; + let (tx, _rx) = mpsc::unbounded(); + // try add unknown peer + manager.add_outgoing(peer_id.clone(), tx); + // sending should fail + assert_eq!( + manager.send_to(&peer_id, data.clone()), + Err(SendError::PeerNotFound) + ); + // add peer, this time for real + assert!(manager.add_peer(peer_id.clone(), addresses.clone())); + let (tx, mut rx) = mpsc::unbounded(); + assert_eq!(manager.add_outgoing(peer_id.clone(), tx), Added); + // send and receive + assert!(manager.send_to(&peer_id, data.clone()).is_ok()); + assert_eq!(data, rx.next().await.expect("should receive")); + // remove peer + manager.remove_peer(&peer_id); + // receiving should fail + assert!(rx.next().await.is_none()); + } + + #[tokio::test] + async fn incoming() { + let mut manager = Manager::::new(); + let (peer_id, _) = key().await; + let addresses = vec![ + String::from(""), + String::from("a/b/c"), + String::from("43.43.43.43:43000"), + ]; + let (tx, mut rx) = mpsc::unbounded(); + // try add unknown peer + assert_eq!(manager.add_incoming(peer_id.clone(), tx), Uninterested); + // rx should fail + assert!(rx.try_next().expect("channel should be closed").is_none()); + // add peer, this time for real + assert!(manager.add_peer(peer_id.clone(), addresses.clone())); + let (tx, mut rx) = mpsc::unbounded(); + // should just add + assert_eq!(manager.add_incoming(peer_id.clone(), tx), Added); + // the exit channel should be open + assert!(rx.try_next().is_err()); + let (tx, mut rx2) = mpsc::unbounded(); + // should replace now + assert_eq!(manager.add_incoming(peer_id.clone(), tx), Replaced); + // receiving should fail on old, but work on new channel + assert!(rx.try_next().expect("channel should be closed").is_none()); + assert!(rx2.try_next().is_err()); + // remove peer + manager.remove_peer(&peer_id); + // receiving should fail + assert!(rx2.try_next().expect("channel should be closed").is_none()); + } +} diff --git a/finality-aleph/src/validator_network/manager/mod.rs b/finality-aleph/src/validator_network/manager/mod.rs index e773104e78..bbfe0641b5 100644 --- a/finality-aleph/src/validator_network/manager/mod.rs +++ b/finality-aleph/src/validator_network/manager/mod.rs @@ -8,8 +8,11 @@ use 
futures::channel::mpsc; use crate::{network::PeerId, validator_network::Data}; -#[allow(dead_code)] mod direction; +mod legacy; + +use direction::DirectedPeers; +pub use legacy::Manager as LegacyManager; /// Error during sending data through the Manager #[derive(Debug, PartialEq, Eq)] @@ -41,180 +44,168 @@ pub enum AddResult { Replaced, } -/// Network component responsible for holding the list of peers that we -/// want to connect to, and managing the established connections. -pub struct Manager { - addresses: HashMap>, - outgoing: HashMap>, - incoming: HashMap>, -} - struct ManagerStatus { - wanted_peers: usize, - both_ways_peers: HashSet, outgoing_peers: HashSet, + missing_outgoing: HashSet, incoming_peers: HashSet, - missing_peers: HashSet, + missing_incoming: HashSet, } impl ManagerStatus { fn new(manager: &Manager) -> Self { - let incoming: HashSet<_> = manager - .incoming - .iter() - .filter(|(_, exit)| !exit.is_closed()) - .map(|(k, _)| k.clone()) - .collect(); - let outgoing: HashSet<_> = manager - .outgoing - .iter() - .filter(|(_, exit)| !exit.is_closed()) - .map(|(k, _)| k.clone()) - .collect(); - - let both_ways = incoming.intersection(&outgoing).cloned().collect(); - let incoming: HashSet<_> = incoming.difference(&both_ways).cloned().collect(); - let outgoing: HashSet<_> = outgoing.difference(&both_ways).cloned().collect(); - let missing = manager - .addresses - .keys() - .filter(|a| !both_ways.contains(a) && !incoming.contains(a) && !outgoing.contains(a)) - .cloned() - .collect(); - + let mut incoming_peers = HashSet::new(); + let mut missing_incoming = HashSet::new(); + let mut outgoing_peers = HashSet::new(); + let mut missing_outgoing = HashSet::new(); + + for peer in manager.wanted.incoming_peers() { + match manager.active_connection(peer) { + true => incoming_peers.insert(peer.clone()), + false => missing_incoming.insert(peer.clone()), + }; + } + for peer in manager.wanted.outgoing_peers() { + match manager.active_connection(peer) { + true => outgoing_peers.insert(peer.clone()), + false => missing_outgoing.insert(peer.clone()), + }; + } ManagerStatus { - wanted_peers: manager.addresses.len(), - both_ways_peers: both_ways, - incoming_peers: incoming, - outgoing_peers: outgoing, - missing_peers: missing, + incoming_peers, + missing_incoming, + outgoing_peers, + missing_outgoing, } } + + fn wanted_incoming(&self) -> usize { + self.incoming_peers.len() + self.missing_incoming.len() + } + + fn wanted_outgoing(&self) -> usize { + self.outgoing_peers.len() + self.missing_outgoing.len() + } +} + +fn pretty_authority_id_set(set: &HashSet) -> String { + set.iter() + .map(|authority_id| authority_id.to_short_string()) + .collect::>() + .join(", ") } impl Display for ManagerStatus { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { - if self.wanted_peers == 0 { + let wanted_incoming = self.wanted_incoming(); + let wanted_outgoing = self.wanted_outgoing(); + if wanted_incoming + wanted_outgoing == 0 { return write!(f, "not maintaining any connections; "); } - write!(f, "target - {:?} connections; ", self.wanted_peers)?; - - if self.both_ways_peers.is_empty() && self.incoming_peers.is_empty() { - write!(f, "WARNING! 
No incoming peers even though we expected tham, maybe connecting to us is impossible; ")?;
-        }
-
-        if !self.both_ways_peers.is_empty() {
-            let peers = self
-                .both_ways_peers
-                .iter()
-                .map(|authority_id| authority_id.to_short_string())
-                .collect::<Vec<_>>()
-                .join(", ");
-            write!(
-                f,
-                "both ways - {:?} [{}]; ",
-                self.both_ways_peers.len(),
-                peers,
-            )?;
-        }
-
-        if !self.incoming_peers.is_empty() {
-            let peers = self
-                .incoming_peers
-                .iter()
-                .map(|authority_id| authority_id.to_short_string())
-                .collect::<Vec<_>>()
-                .join(", ");
-            write!(
-                f,
-                "incoming only - {:?} [{}]; ",
-                self.incoming_peers.len(),
-                peers
-            )?;
-        }
-
-        if !self.outgoing_peers.is_empty() {
-            let peers = self
-                .outgoing_peers
-                .iter()
-                .map(|authority_id| authority_id.to_short_string())
-                .collect::<Vec<_>>()
-                .join(", ");
-            write!(
-                f,
-                "outgoing only - {:?} [{}];",
-                self.outgoing_peers.len(),
-                peers
-            )?;
+        match wanted_incoming {
+            0 => write!(f, "not expecting any incoming connections; ")?,
+            _ => {
+                write!(f, "expecting {:?} incoming connections; ", wanted_incoming)?;
+                match self.incoming_peers.is_empty() {
+                    // We warn about the lack of incoming connections, because this is relatively
+                    // likely to be a common misconfiguration; much less the case for outgoing.
+                    true => write!(f, "WARNING! No incoming peers even though we expected them, maybe connecting to us is impossible; ")?,
+                    false => write!(
+                        f,
+                        "have - {:?} [{}]; ",
+                        self.incoming_peers.len(),
+                        pretty_authority_id_set(&self.incoming_peers),
+                    )?,
+                }
+                if !self.missing_incoming.is_empty() {
+                    write!(
+                        f,
+                        "missing - {:?} [{}]; ",
+                        self.missing_incoming.len(),
+                        pretty_authority_id_set(&self.missing_incoming),
+                    )?;
+                }
+            }
         }

-        if !self.missing_peers.is_empty() {
-            let peers = self
-                .missing_peers
-                .iter()
-                .map(|authority_id| authority_id.to_short_string())
-                .collect::<Vec<_>>()
-                .join(", ");
-            write!(f, "missing - {:?} [{}];", self.missing_peers.len(), peers)?;
+        match wanted_outgoing {
+            0 => write!(f, "not attempting any outgoing connections; ")?,
+            _ => {
+                write!(f, "attempting {:?} outgoing connections; ", wanted_outgoing)?;
+                if !self.outgoing_peers.is_empty() {
+                    write!(
+                        f,
+                        "have - {:?} [{}]; ",
+                        self.outgoing_peers.len(),
+                        pretty_authority_id_set(&self.outgoing_peers),
+                    )?;
+                }
+                if !self.missing_outgoing.is_empty() {
+                    write!(
+                        f,
+                        "missing - {:?} [{}]; ",
+                        self.missing_outgoing.len(),
+                        pretty_authority_id_set(&self.missing_outgoing),
+                    )?;
+                }
+            }
         }

         Ok(())
     }
 }

+/// Network component responsible for holding the list of peers that we
+/// want to connect to or let them connect to us, and managing the established
+/// connections.
+pub struct Manager<A: Data, D: Data> {
+    // Which peers we want to be connected with, and which way.
+    wanted: DirectedPeers<A>,
+    // The peers we are connected with. We ensure that this is always a subset of what we want.
+    have: HashMap<AuthorityId, mpsc::UnboundedSender<D>>,
+}
+
 impl<A: Data, D: Data> Manager<A, D> {
     /// Create a new Manager with empty list of peers.
-    pub fn new() -> Self {
+    pub fn new(own_id: AuthorityId) -> Self {
         Manager {
-            addresses: HashMap::new(),
-            outgoing: HashMap::new(),
-            incoming: HashMap::new(),
+            wanted: DirectedPeers::new(own_id),
+            have: HashMap::new(),
         }
     }

+    fn active_connection(&self, peer_id: &AuthorityId) -> bool {
+        self.have
+            .get(peer_id)
+            .map(|sender| !sender.is_closed())
+            .unwrap_or(false)
+    }
+
     /// Add a peer to the list of peers we want to stay connected to, or
     /// update the list of addresses if the peer was already added.
-    /// Returns whether this peer is a new peer.
+    /// Returns whether we should start attempts at connecting with the peer, which depends on the
+    /// coordinated pseudorandom decision on the direction of the connection and whether this was
+    /// added for the first time.
     pub fn add_peer(&mut self, peer_id: AuthorityId, addresses: Vec<A>) -> bool {
-        self.addresses.insert(peer_id, addresses).is_none()
+        self.wanted.add_peer(peer_id, addresses)
     }

-    /// Return Option containing addresses of the given peer, or None if
-    /// the peer is unknown.
+    /// Return the addresses of the given peer, or None if we shouldn't attempt connecting with the peer.
     pub fn peer_addresses(&self, peer_id: &AuthorityId) -> Option<Vec<A>> {
-        self.addresses.get(peer_id).cloned()
+        self.wanted.peer_addresses(peer_id)
     }

-    /// Add an established outgoing connection with a known peer,
-    /// but only if the peer is on the list of peers that we want to stay connected with.
-    pub fn add_outgoing(
+    /// Add an established connection with a known peer, but only if the peer is among the peers we want to be connected to.
+    pub fn add_connection(
         &mut self,
         peer_id: AuthorityId,
         data_for_network: mpsc::UnboundedSender<D>,
     ) -> AddResult {
         use AddResult::*;
-        if !self.addresses.contains_key(&peer_id) {
+        if !self.wanted.interested(&peer_id) {
             return Uninterested;
         }
-        match self.outgoing.insert(peer_id, data_for_network) {
-            Some(_) => Replaced,
-            None => Added,
-        }
-    }
-
-    /// Add an established incoming connection with a known peer,
-    /// but only if the peer is on the list of peers that we want to stay connected with.
-    pub fn add_incoming(
-        &mut self,
-        peer_id: AuthorityId,
-        exit: mpsc::UnboundedSender<D>,
-    ) -> AddResult {
-        use AddResult::*;
-        if !self.addresses.contains_key(&peer_id) {
-            return Uninterested;
-        };
-        match self.incoming.insert(peer_id, exit) {
+        match self.have.insert(peer_id, data_for_network) {
             Some(_) => Replaced,
             None => Added,
         }
@@ -223,16 +214,15 @@ impl<A: Data, D: Data> Manager<A, D> {
     /// Remove a peer from the list of peers that we want to stay connected with.
     /// Close any incoming and outgoing connections that were established.
     pub fn remove_peer(&mut self, peer_id: &AuthorityId) {
-        self.addresses.remove(peer_id);
-        self.incoming.remove(peer_id);
-        self.outgoing.remove(peer_id);
+        self.wanted.remove_peer(peer_id);
+        self.have.remove(peer_id);
     }

     /// Send data to a peer.
     /// Returns error if there is no outgoing connection to the peer,
     /// or if the connection is dead.
     pub fn send_to(&mut self, peer_id: &AuthorityId, data: D) -> Result<(), SendError> {
-        self.outgoing
+        self.have
             .get(peer_id)
             .ok_or(SendError::PeerNotFound)?
.unbounded_send(data) @@ -257,7 +247,8 @@ mod tests { #[tokio::test] async fn add_remove() { - let mut manager = Manager::::new(); + let (own_id, _) = key().await; + let mut manager = Manager::::new(own_id); let (peer_id, _) = key().await; let (peer_id_b, _) = key().await; let addresses = vec![ @@ -265,12 +256,15 @@ mod tests { String::from("a/b/c"), String::from("43.43.43.43:43000"), ]; - // add new peer - returns true - assert!(manager.add_peer(peer_id.clone(), addresses.clone())); - // add known peer - returns false + // add new peer - might return either true or false, depending on the ids + let attempting_connections = manager.add_peer(peer_id.clone(), addresses.clone()); + // add known peer - always returns false assert!(!manager.add_peer(peer_id.clone(), addresses.clone())); // get address - assert_eq!(manager.peer_addresses(&peer_id), Some(addresses)); + match attempting_connections { + true => assert_eq!(manager.peer_addresses(&peer_id), Some(addresses)), + false => assert_eq!(manager.peer_addresses(&peer_id), None), + } // try to get address of an unknown peer assert_eq!(manager.peer_addresses(&peer_id_b), None); // remove peer @@ -284,10 +278,12 @@ mod tests { } #[tokio::test] - async fn outgoing() { - let mut manager = Manager::::new(); + async fn send_receive() { + let (mut connecting_id, _) = key().await; + let mut connecting_manager = Manager::::new(connecting_id.clone()); + let (mut listening_id, _) = key().await; + let mut listening_manager = Manager::::new(listening_id.clone()); let data = String::from("DATA"); - let (peer_id, _) = key().await; let addresses = vec![ String::from(""), String::from("a/b/c"), @@ -295,55 +291,54 @@ mod tests { ]; let (tx, _rx) = mpsc::unbounded(); // try add unknown peer - manager.add_outgoing(peer_id.clone(), tx); + assert_eq!( + connecting_manager.add_connection(listening_id.clone(), tx), + Uninterested + ); // sending should fail assert_eq!( - manager.send_to(&peer_id, data.clone()), + connecting_manager.send_to(&listening_id, data.clone()), Err(SendError::PeerNotFound) ); // add peer, this time for real - assert!(manager.add_peer(peer_id.clone(), addresses.clone())); + if connecting_manager.add_peer(listening_id.clone(), addresses.clone()) { + assert!(!listening_manager.add_peer(connecting_id.clone(), addresses.clone())) + } else { + // We need to switch the names around, because the connection was randomly the + // other way around. 
+ let temp_id = connecting_id; + connecting_id = listening_id; + listening_id = temp_id; + let temp_manager = connecting_manager; + connecting_manager = listening_manager; + listening_manager = temp_manager; + assert!(connecting_manager.add_peer(listening_id.clone(), addresses.clone())); + } + // add outgoing to connecting let (tx, mut rx) = mpsc::unbounded(); - assert_eq!(manager.add_outgoing(peer_id.clone(), tx), Added); - // send and receive - assert!(manager.send_to(&peer_id, data.clone()).is_ok()); + assert_eq!( + connecting_manager.add_connection(listening_id.clone(), tx), + Added + ); + // send and receive connecting + assert!(connecting_manager + .send_to(&listening_id, data.clone()) + .is_ok()); assert_eq!(data, rx.next().await.expect("should receive")); - // remove peer - manager.remove_peer(&peer_id); - // receiving should fail - assert!(rx.next().await.is_none()); - } - - #[tokio::test] - async fn incoming() { - let mut manager = Manager::::new(); - let (peer_id, _) = key().await; - let addresses = vec![ - String::from(""), - String::from("a/b/c"), - String::from("43.43.43.43:43000"), - ]; + // add incoming to listening let (tx, mut rx) = mpsc::unbounded(); - // try add unknown peer - assert_eq!(manager.add_incoming(peer_id.clone(), tx), Uninterested); - // rx should fail - assert!(rx.try_next().expect("channel should be closed").is_none()); - // add peer, this time for real - assert!(manager.add_peer(peer_id.clone(), addresses.clone())); - let (tx, mut rx) = mpsc::unbounded(); - // should just add - assert_eq!(manager.add_incoming(peer_id.clone(), tx), Added); - // the exit channel should be open - assert!(rx.try_next().is_err()); - let (tx, mut rx2) = mpsc::unbounded(); - // should replace now - assert_eq!(manager.add_incoming(peer_id.clone(), tx), Replaced); - // receiving should fail on old, but work on new channel - assert!(rx.try_next().expect("channel should be closed").is_none()); - assert!(rx2.try_next().is_err()); + assert_eq!( + listening_manager.add_connection(connecting_id.clone(), tx), + Added + ); + // send and receive listening + assert!(listening_manager + .send_to(&connecting_id, data.clone()) + .is_ok()); + assert_eq!(data, rx.next().await.expect("should receive")); // remove peer - manager.remove_peer(&peer_id); + listening_manager.remove_peer(&connecting_id); // receiving should fail - assert!(rx2.try_next().expect("channel should be closed").is_none()); + assert!(rx.next().await.is_none()); } } diff --git a/finality-aleph/src/validator_network/outgoing.rs b/finality-aleph/src/validator_network/outgoing.rs index 37ea94b668..d22858497b 100644 --- a/finality-aleph/src/validator_network/outgoing.rs +++ b/finality-aleph/src/validator_network/outgoing.rs @@ -96,8 +96,10 @@ pub async fn outgoing>( { info!(target: "validator-network", "Outgoing connection to {} {:?} failed: {}, will retry after {}s.", peer_id, addresses, e, RETRY_DELAY.as_secs()); sleep(RETRY_DELAY).await; + // we send the "new" connection type, because we always assume it's new until proven + // otherwise, and here we did not even get the chance to attempt negotiating a protocol if result_for_parent - .unbounded_send((peer_id, None, ConnectionType::LegacyOutgoing)) + .unbounded_send((peer_id, None, ConnectionType::New)) .is_err() { debug!(target: "validator-network", "Could not send the closing message, we've probably been terminated by the parent service."); diff --git a/finality-aleph/src/validator_network/protocols/mod.rs b/finality-aleph/src/validator_network/protocols/mod.rs index 
c1a6c998a9..d16430d205 100644 --- a/finality-aleph/src/validator_network/protocols/mod.rs +++ b/finality-aleph/src/validator_network/protocols/mod.rs @@ -14,7 +14,6 @@ use crate::{ mod handshake; mod negotiation; mod v0; -#[allow(dead_code)] mod v1; use handshake::HandshakeError; @@ -46,6 +45,9 @@ pub type ResultForService = ( pub enum Protocol { /// The first version of the protocol, with unidirectional connections. V0, + /// The current version of the protocol, with pseudorandom connection direction and + /// multiplexing. + V1, } /// Protocol error. @@ -102,7 +104,7 @@ impl Protocol { const MIN_VERSION: Version = 0; /// Maximal supported protocol version. - const MAX_VERSION: Version = 0; + const MAX_VERSION: Version = 1; /// Launches the proper variant of the protocol (receiver half). pub async fn manage_incoming( @@ -115,6 +117,7 @@ impl Protocol { use Protocol::*; match self { V0 => v0::incoming(stream, authority_pen, result_for_service, data_for_user).await, + V1 => v1::incoming(stream, authority_pen, result_for_service, data_for_user).await, } } @@ -125,11 +128,21 @@ impl Protocol { authority_pen: AuthorityPen, peer_id: AuthorityId, result_for_service: mpsc::UnboundedSender>, - _data_for_user: mpsc::UnboundedSender, + data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { use Protocol::*; match self { V0 => v0::outgoing(stream, authority_pen, peer_id, result_for_service).await, + V1 => { + v1::outgoing( + stream, + authority_pen, + peer_id, + result_for_service, + data_for_user, + ) + .await + } } } } @@ -140,6 +153,7 @@ impl TryFrom for Protocol { fn try_from(version: Version) -> Result { match version { 0 => Ok(Protocol::V0), + 1 => Ok(Protocol::V1), unknown_version => Err(unknown_version), } } diff --git a/finality-aleph/src/validator_network/protocols/negotiation.rs b/finality-aleph/src/validator_network/protocols/negotiation.rs index aa6ff97f89..52f0f1203a 100644 --- a/finality-aleph/src/validator_network/protocols/negotiation.rs +++ b/finality-aleph/src/validator_network/protocols/negotiation.rs @@ -145,7 +145,7 @@ mod tests { fn correct_negotiation(result: Result<(S, Protocol), ProtocolNegotiationError>) { match result { - Ok((_stream, protocol)) => assert_eq!(Protocol::V0, protocol), + Ok((_stream, protocol)) => assert_eq!(Protocol::V1, protocol), Err(e) => panic!("Unexpected error: {:?}", e), } } diff --git a/finality-aleph/src/validator_network/service.rs b/finality-aleph/src/validator_network/service.rs index 75b9198e98..86a2aca44c 100644 --- a/finality-aleph/src/validator_network/service.rs +++ b/finality-aleph/src/validator_network/service.rs @@ -1,20 +1,20 @@ -use std::fmt::Debug; +use std::{collections::HashSet, fmt::Debug}; use aleph_primitives::AuthorityId; use futures::{ channel::{mpsc, oneshot}, StreamExt, }; -use log::{info, trace, warn}; +use log::{debug, info, trace, warn}; use tokio::time; use crate::{ crypto::AuthorityPen, validator_network::{ incoming::incoming, - manager::{AddResult, Manager}, + manager::{AddResult, LegacyManager, Manager}, outgoing::outgoing, - protocols::ResultForService, + protocols::{ConnectionType, ResultForService}, Data, Dialer, Listener, Network, }, SpawnTaskHandle, STATUS_REPORT_INTERVAL, @@ -74,7 +74,7 @@ impl Network for ServiceInterface { } /// A service that has to be run for the validator network to work. 
-pub struct Service, NL: Listener> { +pub struct Service, NL: Listener> { commands_from_interface: mpsc::UnboundedReceiver>, next_to_interface: mpsc::UnboundedSender, manager: Manager, @@ -82,6 +82,9 @@ pub struct Service, NL: Listener> { listener: NL, spawn_handle: SpawnTaskHandle, authority_pen: AuthorityPen, + // Backwards compatibility with the one-sided connections, remove when no longer needed. + legacy_connected: HashSet, + legacy_manager: LegacyManager, } impl, NL: Listener> Service { @@ -100,11 +103,13 @@ impl, NL: Listener> Service, NL: Listener> Service, NL: Listener> Service Option> { + match self.legacy_connected.contains(peer_id) { + true => self.legacy_manager.peer_addresses(peer_id), + false => self.manager.peer_addresses(peer_id), + } + } + + fn add_connection( + &mut self, + peer_id: AuthorityId, + data_for_network: mpsc::UnboundedSender, + connection_type: ConnectionType, + ) -> AddResult { + use ConnectionType::*; + match connection_type { + New => { + // If we are adding a non-legacy connection we want to ensure it's not marked as + // such. This should only matter if a peer initially used the legacy protocol but + // now upgraded, otherwise this is unnecessary busywork, but what can you do. + self.unmark_legacy(&peer_id); + self.manager.add_connection(peer_id, data_for_network) + } + LegacyIncoming => self.legacy_manager.add_incoming(peer_id, data_for_network), + LegacyOutgoing => self.legacy_manager.add_outgoing(peer_id, data_for_network), + } + } + + // Mark a peer as legacy and return whether it is the first time we do so. + fn mark_legacy(&mut self, peer_id: &AuthorityId) -> bool { + self.manager.remove_peer(peer_id); + self.legacy_connected.insert(peer_id.clone()) + } + + // Unmark a peer as legacy, putting it back in the normal set. + fn unmark_legacy(&mut self, peer_id: &AuthorityId) { + self.legacy_connected.remove(peer_id); + // Put it back if we still want to be connected. + if let Some(addresses) = self.legacy_manager.peer_addresses(peer_id) { + self.manager.add_peer(peer_id.clone(), addresses); + } + } + + // Checks whether this peer should now be marked as one using the legacy protocol and handled + // accordingly. Returns whether we should spawn a new connection worker because of that. + fn check_for_legacy(&mut self, peer_id: &AuthorityId, connection_type: ConnectionType) -> bool { + use ConnectionType::*; + match connection_type { + LegacyIncoming => self.mark_legacy(peer_id), + LegacyOutgoing => { + self.mark_legacy(peer_id); + false + } + // We don't unmark here, because we always return New when a connection + // fails early, and in such cases we want to keep the previous guess as to + // how we want to connect. We unmark once we successfully negotiate and add + // a connection. + New => false, + } + } + /// Run the service until a signal from exit. 
pub async fn run(mut self, mut exit: oneshot::Receiver<()>) { let mut status_ticker = time::interval(STATUS_REPORT_INTERVAL); - // channel used to receive tuple (peer_id, exit_handle) from a spawned worker - // that has just established an incoming connection - // exit_handle may be used to kill the worker later - let (incoming_result_for_parent, mut incoming_workers) = mpsc::unbounded(); - // channel used to receive information about failure from a spawned worker - // that managed an outgoing connection - // the received peer_id can be used to spawn another worker - let (outgoing_result_for_parent, mut outgoing_workers) = mpsc::unbounded(); + let (result_for_parent, mut worker_results) = mpsc::unbounded(); use ServiceCommand::*; loop { tokio::select! { // got new incoming connection from the listener - spawn an incoming worker maybe_stream = self.listener.accept() => match maybe_stream { - Ok(stream) => self.spawn_new_incoming(stream, incoming_result_for_parent.clone()), + Ok(stream) => self.spawn_new_incoming(stream, result_for_parent.clone()), Err(e) => warn!(target: "validator-network", "Listener failed to accept connection: {}", e), }, // got a new command from the interface @@ -174,52 +231,65 @@ impl, NL: Listener> Service { + // we add all the peers to the legacy manager so we don't lose the + // addresses, but only care about its opinion when it turns out we have to + // in particular the first time we add a peer we never know whether it + // requires legacy connecting, so we only attempt to connect to it if the + // new criterion is satisfied, otherwise we wait for it to connect to us + self.legacy_manager.add_peer(peer_id.clone(), addresses.clone()); if self.manager.add_peer(peer_id.clone(), addresses.clone()) { - self.spawn_new_outgoing(peer_id, addresses, outgoing_result_for_parent.clone()); + self.spawn_new_outgoing(peer_id, addresses, result_for_parent.clone()); }; }, // remove the peer from the manager all workers will be killed automatically, due to closed channels DelConnection(peer_id) => { self.manager.remove_peer(&peer_id); + self.legacy_manager.remove_peer(&peer_id); + self.legacy_connected.remove(&peer_id); }, // pass the data to the manager SendData(data, peer_id) => { - match self.manager.send_to(&peer_id, data) { - Ok(_) => trace!(target: "validator-network", "Sending data to {}.", peer_id), - Err(e) => trace!(target: "validator-network", "Failed sending to {}: {}", peer_id, e), + match self.legacy_connected.contains(&peer_id) { + true => match self.legacy_manager.send_to(&peer_id, data) { + Ok(_) => trace!(target: "validator-network", "Sending data to {} through legacy.", peer_id), + Err(e) => trace!(target: "validator-network", "Failed sending to {} through legacy: {}", peer_id, e), + }, + false => match self.manager.send_to(&peer_id, data) { + Ok(_) => trace!(target: "validator-network", "Sending data to {}.", peer_id), + Err(e) => trace!(target: "validator-network", "Failed sending to {}: {}", peer_id, e), + }, } }, }, - // received tuple (peer_id, exit_handle) from a spawned worker - // that has just established an incoming connection - // pass the tuple to the manager to register the connection - // the manager will be responsible for killing the worker if necessary - Some((peer_id, Some(exit), _)) = incoming_workers.next() => { - use AddResult::*; - match self.manager.add_incoming(peer_id.clone(), exit) { - Uninterested => info!(target: "validator-network", "Peer {} connected to us despite out lack of interest.", peer_id), - Added => info!(target: 
"validator-network", "New incoming connection for peer {}.", peer_id), - Replaced => info!(target: "validator-network", "Replaced incoming connection for peer {}.", peer_id), - } - }, - // received information from a spawned worker managing an outgoing connection + // received information from a spawned worker managing a connection // check if we still want to be connected to the peer, and if so, spawn a new worker or actually add proper connection - Some((peer_id, maybe_data_for_network, _)) = outgoing_workers.next() => { - use AddResult::*; - if let Some(addresses) = self.manager.peer_addresses(&peer_id) { - match maybe_data_for_network { - Some(data_for_network) => match self.manager.add_outgoing(peer_id.clone(), data_for_network) { - Uninterested => warn!(target: "validator-network", "We connected to peer {} for unknown reasons.", peer_id), - Added => info!(target: "validator-network", "New outgoing connection to peer {}.", peer_id), - Replaced => info!(target: "validator-network", "Replaced outgoing connection to peer {}.", peer_id), + Some((peer_id, maybe_data_for_network, connection_type)) = worker_results.next() => { + if self.check_for_legacy(&peer_id, connection_type) { + match self.legacy_manager.peer_addresses(&peer_id) { + Some(addresses) => self.spawn_new_outgoing(peer_id.clone(), addresses, result_for_parent.clone()), + None => { + // We received a result from a worker we are no longer interested + // in. + self.legacy_connected.remove(&peer_id); }, - None => self.spawn_new_outgoing(peer_id, addresses, outgoing_result_for_parent.clone()), } - }; + } + use AddResult::*; + match maybe_data_for_network { + Some(data_for_network) => match self.add_connection(peer_id.clone(), data_for_network, connection_type) { + Uninterested => warn!(target: "validator-network", "Established connection with peer {} for unknown reasons.", peer_id), + Added => info!(target: "validator-network", "New connection with peer {}.", peer_id), + Replaced => info!(target: "validator-network", "Replaced connection with peer {}.", peer_id), + }, + None => if let Some(addresses) = self.peer_addresses(&peer_id) { + self.spawn_new_outgoing(peer_id, addresses, result_for_parent.clone()); + } + } }, // periodically reporting what we are trying to do _ = status_ticker.tick() => { - info!(target: "validator-network", "Validator Network status: {}", self.manager.status_report()) + info!(target: "validator-network", "Validator Network status: {}", self.manager.status_report()); + debug!(target: "validator-network", "Validator Network legacy status: {}", self.legacy_manager.status_report()); } // received exit signal, stop the network // all workers will be killed automatically after the manager gets dropped