diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index eb110563983..eea59d5b161 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -3202,6 +3202,9 @@ where NetworkBehaviourAction::ReportObservedAddr { address, score } => { NetworkBehaviourAction::ReportObservedAddr { address, score } } + NetworkBehaviourAction::CloseConnection { peer_id, connection } => { + NetworkBehaviourAction::CloseConnection { peer_id, connection } + } }); } diff --git a/protocols/request-response/src/throttled.rs b/protocols/request-response/src/throttled.rs index 95edd106bee..611331f4f52 100644 --- a/protocols/request-response/src/throttled.rs +++ b/protocols/request-response/src/throttled.rs @@ -657,7 +657,9 @@ where | NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } => NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }, | NetworkBehaviourAction::ReportObservedAddr { address, score } => - NetworkBehaviourAction::ReportObservedAddr { address, score } + NetworkBehaviourAction::ReportObservedAddr { address, score }, + | NetworkBehaviourAction::CloseConnection { peer_id, connection } => + NetworkBehaviourAction::CloseConnection { peer_id, connection } }; return Poll::Ready(event) diff --git a/swarm-derive/src/lib.rs b/swarm-derive/src/lib.rs index 351dd902ef3..a5cdf4900ca 100644 --- a/swarm-derive/src/lib.rs +++ b/swarm-derive/src/lib.rs @@ -479,6 +479,9 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { std::task::Poll::Ready(#network_behaviour_action::ReportObservedAddr { address, score }) => { return std::task::Poll::Ready(#network_behaviour_action::ReportObservedAddr { address, score }); } + std::task::Poll::Ready(#network_behaviour_action::CloseConnection { peer_id, connection }) => { + return std::task::Poll::Ready(#network_behaviour_action::CloseConnection { peer_id, connection }); + } std::task::Poll::Pending => break, } } diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 103f88d2891..0f01b314987 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -15,7 +15,12 @@ See [PR 2100] for details. +- Add `ExpandedSwarm::disconnect_peer_id` and + `NetworkBehaviourAction::CloseConnection` to close connections to a specific + peer via an `ExpandedSwarm` or `NetworkBehaviour`. See [PR 2110] for details. + [PR 2100]: https://github.com/libp2p/rust-libp2p/pull/2100 +[PR 2110]: https://github.com/libp2p/rust-libp2p/pull/2110/ # 0.29.0 [2021-04-13] diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 58c066486a4..432cef9514b 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -293,6 +293,23 @@ pub enum NetworkBehaviourAction { /// relative to other observed addresses. score: AddressScore, }, + + /// Instructs the `Swarm` to initiate a graceful close of one or all connections + /// with the given peer. + /// + /// Note: Closing a connection via + /// [`NetworkBehaviourAction::CloseConnection`] does not inform the + /// corresponding [`ProtocolsHandler`]. + /// Closing a connection via a [`ProtocolsHandler`] can be done + /// either in a collaborative manner across [`ProtocolsHandler`]s + /// with [`ProtocolsHandler::connection_keep_alive`] or directly with + /// [`ProtocolsHandlerEvent::Close`](crate::ProtocolsHandlerEvent::Close). + CloseConnection { + /// The peer to disconnect. + peer_id: PeerId, + /// Whether to close a specific or all connections to the given peer. + connection: CloseConnection, + } } impl NetworkBehaviourAction { @@ -312,7 +329,9 @@ impl NetworkBehaviourAction { event: f(event) }, NetworkBehaviourAction::ReportObservedAddr { address, score } => - NetworkBehaviourAction::ReportObservedAddr { address, score } + NetworkBehaviourAction::ReportObservedAddr { address, score }, + NetworkBehaviourAction::CloseConnection { peer_id, connection } => + NetworkBehaviourAction::CloseConnection { peer_id, connection } } } @@ -328,7 +347,9 @@ impl NetworkBehaviourAction { NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } => NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }, NetworkBehaviourAction::ReportObservedAddr { address, score } => - NetworkBehaviourAction::ReportObservedAddr { address, score } + NetworkBehaviourAction::ReportObservedAddr { address, score }, + NetworkBehaviourAction::CloseConnection { peer_id, connection } => + NetworkBehaviourAction::CloseConnection { peer_id, connection } } } } @@ -373,3 +394,18 @@ impl Default for DialPeerCondition { DialPeerCondition::Disconnected } } + +/// The options which connections to close. +#[derive(Debug, Clone)] +pub enum CloseConnection { + /// Disconnect a particular connection. + One(ConnectionId), + /// Disconnect all connections. + All, +} + +impl Default for CloseConnection { + fn default() -> Self { + CloseConnection::All + } +} diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index edefe05e758..37895a5c8ca 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -68,7 +68,8 @@ pub use behaviour::{ NetworkBehaviourEventProcess, PollParameters, NotifyHandler, - DialPeerCondition + DialPeerCondition, + CloseConnection }; pub use protocols_handler::{ IntoProtocolsHandler, @@ -463,6 +464,25 @@ where TBehaviour: NetworkBehaviour, self.banned_peers.remove(&peer_id); } + /// Disconnects a peer by its peer ID, closing all connections to said peer. + /// + /// Returns `Ok(())` if there was one or more established connections to the peer. + /// + /// Note: Closing a connection via [`ExpandedSwarm::disconnect_peer_id`] does + /// not inform the corresponding [`ProtocolsHandler`]. + /// Closing a connection via a [`ProtocolsHandler`] can be done either in a + /// collaborative manner across [`ProtocolsHandler`]s + /// with [`ProtocolsHandler::connection_keep_alive`] or directly with + /// [`ProtocolsHandlerEvent::Close`]. + pub fn disconnect_peer_id(&mut self, peer_id: PeerId) -> Result<(), ()> { + if let Some(peer) = self.network.peer(peer_id).into_connected() { + peer.disconnect(); + return Ok(()); + } + + Err(()) + } + /// Checks whether the [`Network`] has an established connection to a peer. pub fn is_connected(&self, peer_id: &PeerId) -> bool { self.network.is_connected(peer_id) @@ -737,6 +757,20 @@ where TBehaviour: NetworkBehaviour, this.add_external_address(addr, score); } }, + Poll::Ready(NetworkBehaviourAction::CloseConnection { peer_id, connection }) => { + if let Some(mut peer) = this.network.peer(peer_id).into_connected() { + match connection { + CloseConnection::One(connection_id) => { + if let Some(conn) = peer.connection(connection_id) { + conn.start_close(); + } + } + CloseConnection::All => { + peer.disconnect(); + } + } + } + }, } } } @@ -838,7 +872,7 @@ where TBehaviour: NetworkBehaviour, THandler: IntoProtocolsHandler + Send + 'static, TInEvent: Send + 'static, TOutEvent: Send + 'static, - THandler::Handler: + THandler::Handler: ProtocolsHandler, THandleErr: error::Error + Send + 'static, { @@ -1135,6 +1169,13 @@ mod tests { use libp2p_noise as noise; use super::*; + // Test execution state. + // Connection => Disconnecting => Connecting. + enum State { + Connecting, + Disconnecting, + } + fn new_test_swarm(handler_proto: T) -> Swarm>> where T: ProtocolsHandler + Clone, @@ -1153,7 +1194,53 @@ mod tests { SwarmBuilder::new(transport, behaviour, pubkey.into()).build() } - /// Establishes a number of connections between two peers, + fn swarms_connected( + swarm1: &Swarm>, + swarm2: &Swarm>, + num_connections: usize, + ) -> bool + where + TBehaviour: NetworkBehaviour, + <::Handler as ProtocolsHandler>::OutEvent: Clone, + { + for s in &[swarm1, swarm2] { + if s.behaviour.inject_connection_established.len() > 0 { + assert_eq!(s.behaviour.inject_connected.len(), 1); + } else { + assert_eq!(s.behaviour.inject_connected.len(), 0); + } + assert!(s.behaviour.inject_connection_closed.is_empty()); + assert!(s.behaviour.inject_disconnected.is_empty()); + } + [swarm1, swarm2] + .iter() + .all(|s| s.behaviour.inject_connection_established.len() == num_connections) + } + + fn swarms_disconnected( + swarm1: &Swarm>, + swarm2: &Swarm>, + num_connections: usize, + ) -> bool + where + TBehaviour: NetworkBehaviour, + <::Handler as ProtocolsHandler>::OutEvent: Clone + { + for s in &[swarm1, swarm2] { + if s.behaviour.inject_connection_closed.len() < num_connections { + assert_eq!(s.behaviour.inject_disconnected.len(), 0); + } else { + assert_eq!(s.behaviour.inject_disconnected.len(), 1); + } + assert_eq!(s.behaviour.inject_connection_established.len(), 0); + assert_eq!(s.behaviour.inject_connected.len(), 0); + } + [swarm1, swarm2] + .iter() + .all(|s| s.behaviour.inject_connection_closed.len() == num_connections) + } + + /// Establishes multiple connections between two peers, /// after which one peer bans the other. /// /// The test expects both behaviours to be notified via pairs of @@ -1163,8 +1250,7 @@ mod tests { fn test_connect_disconnect_ban() { // Since the test does not try to open any substreams, we can // use the dummy protocols handler. - let mut handler_proto = DummyProtocolsHandler::default(); - handler_proto.keep_alive = KeepAlive::Yes; + let handler_proto = DummyProtocolsHandler { keep_alive: KeepAlive::Yes }; let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()); let mut swarm2 = new_test_swarm::<_, ()>(handler_proto); @@ -1175,12 +1261,6 @@ mod tests { swarm1.listen_on(addr1.clone().into()).unwrap(); swarm2.listen_on(addr2.clone().into()).unwrap(); - // Test execution state. Connection => Disconnecting => Connecting. - enum State { - Connecting, - Disconnecting, - } - let swarm1_id = *swarm1.local_peer_id(); let mut banned = false; @@ -1188,7 +1268,7 @@ mod tests { let num_connections = 10; - for _ in 0 .. num_connections { + for _ in 0..num_connections { swarm1.dial_addr(addr2.clone()).unwrap(); } let mut state = State::Connecting; @@ -1199,18 +1279,7 @@ mod tests { let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx); match state { State::Connecting => { - for s in &[&swarm1, &swarm2] { - if s.behaviour.inject_connection_established.len() > 0 { - assert_eq!(s.behaviour.inject_connected.len(), 1); - } else { - assert_eq!(s.behaviour.inject_connected.len(), 0); - } - assert!(s.behaviour.inject_connection_closed.len() == 0); - assert!(s.behaviour.inject_disconnected.len() == 0); - } - if [&swarm1, &swarm2].iter().all(|s| { - s.behaviour.inject_connection_established.len() == num_connections - }) { + if swarms_connected(&swarm1, &swarm2, num_connections) { if banned { return Poll::Ready(()) } @@ -1222,18 +1291,7 @@ mod tests { } } State::Disconnecting => { - for s in &[&swarm1, &swarm2] { - if s.behaviour.inject_connection_closed.len() < num_connections { - assert_eq!(s.behaviour.inject_disconnected.len(), 0); - } else { - assert_eq!(s.behaviour.inject_disconnected.len(), 1); - } - assert_eq!(s.behaviour.inject_connection_established.len(), 0); - assert_eq!(s.behaviour.inject_connected.len(), 0); - } - if [&swarm1, &swarm2].iter().all(|s| { - s.behaviour.inject_connection_closed.len() == num_connections - }) { + if swarms_disconnected(&swarm1, &swarm2, num_connections) { if unbanned { return Poll::Ready(()) } @@ -1242,7 +1300,77 @@ mod tests { swarm1.behaviour.reset(); swarm2.behaviour.reset(); unbanned = true; - for _ in 0 .. num_connections { + for _ in 0..num_connections { + swarm2.dial_addr(addr1.clone()).unwrap(); + } + state = State::Connecting; + } + } + } + + if poll1.is_pending() && poll2.is_pending() { + return Poll::Pending + } + } + })) + } + + /// Establishes multiple connections between two peers, + /// after which one peer disconnects the other using [`ExpandedSwarm::disconnect_peer_id`]. + /// + /// The test expects both behaviours to be notified via pairs of + /// inject_connected / inject_disconnected as well as + /// inject_connection_established / inject_connection_closed calls. + #[test] + fn test_swarm_disconnect() { + // Since the test does not try to open any substreams, we can + // use the dummy protocols handler. + let handler_proto = DummyProtocolsHandler { keep_alive: KeepAlive::Yes }; + + let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()); + let mut swarm2 = new_test_swarm::<_, ()>(handler_proto); + + let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); + let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); + + swarm1.listen_on(addr1.clone().into()).unwrap(); + swarm2.listen_on(addr2.clone().into()).unwrap(); + + let swarm1_id = *swarm1.local_peer_id(); + + let mut reconnected = false; + let num_connections = 10; + + for _ in 0..num_connections { + swarm1.dial_addr(addr2.clone()).unwrap(); + } + let mut state = State::Connecting; + + executor::block_on(future::poll_fn(move |cx| { + loop { + let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx); + let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx); + match state { + State::Connecting => { + if swarms_connected(&swarm1, &swarm2, num_connections) { + if reconnected { + return Poll::Ready(()) + } + swarm2.disconnect_peer_id(swarm1_id.clone()).expect("Error disconnecting"); + swarm1.behaviour.reset(); + swarm2.behaviour.reset(); + state = State::Disconnecting; + } + } + State::Disconnecting => { + if swarms_disconnected(&swarm1, &swarm2, num_connections) { + if reconnected { + return Poll::Ready(()) + } + reconnected = true; + swarm1.behaviour.reset(); + swarm2.behaviour.reset(); + for _ in 0..num_connections { swarm2.dial_addr(addr1.clone()).unwrap(); } state = State::Connecting; @@ -1256,4 +1384,161 @@ mod tests { } })) } + + /// Establishes multiple connections between two peers, + /// after which one peer disconnects the other + /// using [`NetworkBehaviourAction::CloseConnection`] returned by a [`NetworkBehaviour`]. + /// + /// The test expects both behaviours to be notified via pairs of + /// inject_connected / inject_disconnected as well as + /// inject_connection_established / inject_connection_closed calls. + #[test] + fn test_behaviour_disconnect_all() { + // Since the test does not try to open any substreams, we can + // use the dummy protocols handler. + let handler_proto = DummyProtocolsHandler { keep_alive: KeepAlive::Yes }; + + let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()); + let mut swarm2 = new_test_swarm::<_, ()>(handler_proto); + + let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); + let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); + + swarm1.listen_on(addr1.clone().into()).unwrap(); + swarm2.listen_on(addr2.clone().into()).unwrap(); + + let swarm1_id = *swarm1.local_peer_id(); + + let mut reconnected = false; + let num_connections = 10; + + for _ in 0..num_connections { + swarm1.dial_addr(addr2.clone()).unwrap(); + } + let mut state = State::Connecting; + + executor::block_on(future::poll_fn(move |cx| { + loop { + let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx); + let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx); + match state { + State::Connecting => { + if swarms_connected(&swarm1, &swarm2, num_connections) { + if reconnected { + return Poll::Ready(()) + } + swarm2 + .behaviour + .inner() + .next_action + .replace(NetworkBehaviourAction::CloseConnection { + peer_id: swarm1_id.clone(), + connection: CloseConnection::All, + }); + swarm1.behaviour.reset(); + swarm2.behaviour.reset(); + state = State::Disconnecting; + } + } + State::Disconnecting => { + if swarms_disconnected(&swarm1, &swarm2, num_connections) { + if reconnected { + return Poll::Ready(()) + } + reconnected = true; + swarm1.behaviour.reset(); + swarm2.behaviour.reset(); + for _ in 0..num_connections { + swarm2.dial_addr(addr1.clone()).unwrap(); + } + state = State::Connecting; + } + } + } + + if poll1.is_pending() && poll2.is_pending() { + return Poll::Pending + } + } + })) + } + + /// Establishes multiple connections between two peers, + /// after which one peer closes a single connection + /// using [`NetworkBehaviourAction::CloseConnection`] returned by a [`NetworkBehaviour`]. + /// + /// The test expects both behaviours to be notified via pairs of + /// inject_connected / inject_disconnected as well as + /// inject_connection_established / inject_connection_closed calls. + #[test] + fn test_behaviour_disconnect_one() { + // Since the test does not try to open any substreams, we can + // use the dummy protocols handler. + let handler_proto = DummyProtocolsHandler { keep_alive: KeepAlive::Yes }; + + let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()); + let mut swarm2 = new_test_swarm::<_, ()>(handler_proto); + + let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); + let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); + + swarm1.listen_on(addr1.clone().into()).unwrap(); + swarm2.listen_on(addr2.clone().into()).unwrap(); + + let swarm1_id = *swarm1.local_peer_id(); + + let num_connections = 10; + + for _ in 0..num_connections { + swarm1.dial_addr(addr2.clone()).unwrap(); + } + let mut state = State::Connecting; + let mut disconnected_conn_id = None; + + executor::block_on(future::poll_fn(move |cx| { + loop { + let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx); + let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx); + match state { + State::Connecting => { + if swarms_connected(&swarm1, &swarm2, num_connections) { + disconnected_conn_id = { + let conn_id = swarm2.behaviour.inject_connection_established[num_connections / 2].1; + swarm2 + .behaviour + .inner() + .next_action + .replace(NetworkBehaviourAction::CloseConnection { + peer_id: swarm1_id.clone(), + connection: CloseConnection::One(conn_id), + }); + Some(conn_id) + }; + swarm1.behaviour.reset(); + swarm2.behaviour.reset(); + state = State::Disconnecting; + } + } + State::Disconnecting => { + for s in &[&swarm1, &swarm2] { + assert_eq!(s.behaviour.inject_disconnected.len(), 0); + assert_eq!(s.behaviour.inject_connection_established.len(), 0); + assert_eq!(s.behaviour.inject_connected.len(), 0); + } + if [&swarm1, &swarm2].iter().all(|s| { + s.behaviour.inject_connection_closed.len() == 1 + }) { + let conn_id = swarm2.behaviour.inject_connection_closed[0].1; + assert_eq!(Some(conn_id), disconnected_conn_id); + return Poll::Ready(()); + } + } + } + + if poll1.is_pending() && poll2.is_pending() { + return Poll::Pending + } + } + })) + } } diff --git a/swarm/src/test.rs b/swarm/src/test.rs index c34eb417627..6bb11a34cc0 100644 --- a/swarm/src/test.rs +++ b/swarm/src/test.rs @@ -167,6 +167,8 @@ where self.inject_listener_closed = Vec::new(); self.poll = 0; } + + pub fn inner(&mut self) -> &mut TInner { &mut self.inner } } impl NetworkBehaviour for CallTraceBehaviour