diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 639641e75cf..0b7573997d5 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,5 +1,9 @@ # 0.25.0 [unreleased] +- Permit a configuration override for the substream upgrade protocol + to use for all (outbound) substreams. + [PR 1858](https://github.com/libp2p/rust-libp2p/pull/1858). + - Changed parameters for connection limits from `usize` to `u32`. Connection limits are now configured via `SwarmBuilder::connection_limits()`. diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index b3e0d4deeaf..af3ccb71bcb 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -120,7 +120,7 @@ use libp2p_core::{ NetworkConfig, peer::ConnectedPeer, }, - upgrade::ProtocolName, + upgrade::{ProtocolName}, }; use registry::{Addresses, AddressIntoIter}; use smallvec::SmallVec; @@ -286,7 +286,10 @@ where /// Pending event to be delivered to connection handlers /// (or dropped if the peer disconnected) before the `behaviour` /// can be polled again. - pending_event: Option<(PeerId, PendingNotifyHandler, TInEvent)> + pending_event: Option<(PeerId, PendingNotifyHandler, TInEvent)>, + + /// The configured override for substream protocol upgrades, if any. + substream_upgrade_protocol_override: Option, } impl Deref for @@ -357,8 +360,10 @@ where TBehaviour: NetworkBehaviour, /// Initiates a new dialing attempt to the given address. pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), ConnectionLimit> { - let handler = me.behaviour.new_handler(); - me.network.dial(&addr, handler.into_node_handler_builder()).map(|_id| ()) + let handler = me.behaviour.new_handler() + .into_node_handler_builder() + .with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override); + me.network.dial(&addr, handler).map(|_id| ()) } /// Initiates a new dialing attempt to the given peer. @@ -375,7 +380,9 @@ where TBehaviour: NetworkBehaviour, let result = if let Some(first) = addrs.next() { - let handler = me.behaviour.new_handler().into_node_handler_builder(); + let handler = me.behaviour.new_handler() + .into_node_handler_builder() + .with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override); me.network.peer(peer_id.clone()) .dial(first, addrs, handler) .map(|_| ()) @@ -546,10 +553,12 @@ where TBehaviour: NetworkBehaviour, }); }, Poll::Ready(NetworkEvent::IncomingConnection { connection, .. }) => { - let handler = this.behaviour.new_handler(); + let handler = this.behaviour.new_handler() + .into_node_handler_builder() + .with_substream_upgrade_protocol_override(this.substream_upgrade_protocol_override); let local_addr = connection.local_addr.clone(); let send_back_addr = connection.send_back_addr.clone(); - if let Err(e) = this.network.accept(connection, handler.into_node_handler_builder()) { + if let Err(e) = this.network.accept(connection, handler) { log::warn!("Incoming connection rejected: {:?}", e); } return Poll::Ready(SwarmEvent::IncomingConnection { @@ -962,6 +971,7 @@ pub struct SwarmBuilder { transport: transport::Boxed<(PeerId, StreamMuxerBox)>, behaviour: TBehaviour, network_config: NetworkConfig, + substream_upgrade_protocol_override: Option, } impl SwarmBuilder @@ -980,6 +990,7 @@ where TBehaviour: NetworkBehaviour, transport: transport, behaviour, network_config: Default::default(), + substream_upgrade_protocol_override: None, } } @@ -1040,6 +1051,21 @@ where TBehaviour: NetworkBehaviour, self } + /// Configures an override for the substream upgrade protocol to use. + /// + /// The subtream upgrade protocol is the multistream-select protocol + /// used for protocol negotiation on substreams. Since a listener + /// supports all existing versions, the choice of upgrade protocol + /// only effects the "dialer", i.e. the peer opening a substream. + /// + /// > **Note**: If configured, specific upgrade protocols for + /// > individual [`SubstreamProtocol`]s emitted by the `NetworkBehaviour` + /// > are ignored. + pub fn substream_upgrade_protocol_override(mut self, v: libp2p_core::upgrade::Version) -> Self { + self.substream_upgrade_protocol_override = Some(v); + self + } + /// Builds a `Swarm` with the current configuration. pub fn build(mut self) -> Swarm { let supported_protocols = self.behaviour @@ -1075,7 +1101,8 @@ where TBehaviour: NetworkBehaviour, listened_addrs: SmallVec::new(), external_addrs: Addresses::default(), banned_peers: HashSet::new(), - pending_event: None + pending_event: None, + substream_upgrade_protocol_override: self.substream_upgrade_protocol_override, } } } diff --git a/swarm/src/protocols_handler/node_handler.rs b/swarm/src/protocols_handler/node_handler.rs index a0d2f53d6c9..ce4b6b313fb 100644 --- a/swarm/src/protocols_handler/node_handler.rs +++ b/swarm/src/protocols_handler/node_handler.rs @@ -49,6 +49,8 @@ use wasm_timer::{Delay, Instant}; pub struct NodeHandlerWrapperBuilder { /// The underlying handler. handler: TIntoProtoHandler, + /// The substream upgrade protocol override, if any. + substream_upgrade_protocol_override: Option, } impl NodeHandlerWrapperBuilder @@ -59,8 +61,17 @@ where pub(crate) fn new(handler: TIntoProtoHandler) -> Self { NodeHandlerWrapperBuilder { handler, + substream_upgrade_protocol_override: None, } } + + pub(crate) fn with_substream_upgrade_protocol_override( + mut self, + version: Option + ) -> Self { + self.substream_upgrade_protocol_override = version; + self + } } impl IntoConnectionHandler @@ -79,6 +90,7 @@ where queued_dial_upgrades: Vec::new(), unique_dial_upgrade_id: 0, shutdown: Shutdown::None, + substream_upgrade_protocol_override: self.substream_upgrade_protocol_override, } } } @@ -109,6 +121,8 @@ where unique_dial_upgrade_id: u64, /// The currently planned connection & handler shutdown. shutdown: Shutdown, + /// The substream upgrade protocol override, if any. + substream_upgrade_protocol_override: Option, } struct SubstreamUpgrade { @@ -254,7 +268,13 @@ where } }; - let (_, (version, upgrade)) = self.queued_dial_upgrades.remove(pos); + let (_, (mut version, upgrade)) = self.queued_dial_upgrades.remove(pos); + if let Some(v) = self.substream_upgrade_protocol_override { + if v != version { + log::debug!("Substream upgrade protocol override: {:?} -> {:?}", version, v); + version = v; + } + } let upgrade = upgrade::apply_outbound(substream, upgrade, version); let timeout = Delay::new(timeout); self.negotiating_out.push(SubstreamUpgrade {