diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/connector.rs index 52355b37d91..253ebed5e76 100644 --- a/zebra-network/src/peer/connector.rs +++ b/zebra-network/src/peer/connector.rs @@ -27,7 +27,7 @@ where S::Future: Send, C: ChainTip + Clone + Send + 'static, { - handshaker: Handshake, + handshaker: Handshake, } impl Clone for Connector @@ -49,7 +49,7 @@ where S::Future: Send, C: ChainTip + Clone + Send + 'static, { - pub fn new(handshaker: Handshake) -> Self { + pub fn new(handshaker: Handshake) -> Self { Connector { handshaker } } } @@ -87,15 +87,14 @@ where connection_tracker, }: OutboundConnectorRequest = req; - let mut hs = self.handshaker.clone(); + let hs = self.handshaker.clone(); let connected_addr = ConnectedAddr::new_outbound_direct(addr); let connector_span = info_span!("connector", peer = ?connected_addr); async move { let tcp_stream = TcpStream::connect(addr).await?; - hs.ready().await?; let client = hs - .call(HandshakeRequest:: { + .oneshot(HandshakeRequest:: { data_stream: tcp_stream, connected_addr, connection_tracker, diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index be3076b55f9..47cbfaaacf2 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -1,9 +1,10 @@ +//! Initial [`Handshake`s] with Zebra peers over a `PeerTransport`. + use std::{ cmp::min, collections::HashSet, fmt, future::Future, - marker::PhantomData, net::{IpAddr, Ipv4Addr, SocketAddr}, pin::Pin, sync::Arc, @@ -54,12 +55,11 @@ use crate::{ /// To avoid hangs, each handshake (or its connector) should be: /// - launched in a separate task, and /// - wrapped in a timeout. -pub struct Handshake +pub struct Handshake where S: Service + Clone + Send + 'static, S::Future: Send, C: ChainTip + Clone + Send + 'static, - PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static, { config: Config, user_agent: String, @@ -73,16 +73,13 @@ where nonces: Arc>>, parent_span: Span, - - _phantom_data: PhantomData, } -impl Clone for Handshake +impl Clone for Handshake where S: Service + Clone + Send + 'static, S::Future: Send, C: ChainTip + Clone + Send + 'static, - PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static, { fn clone(&self) -> Self { Self { @@ -96,7 +93,6 @@ where minimum_peer_version: self.minimum_peer_version.clone(), nonces: self.nonces.clone(), parent_span: self.parent_span.clone(), - _phantom_data: self._phantom_data, } } } @@ -340,12 +336,11 @@ impl fmt::Debug for ConnectedAddr { } /// A builder for `Handshake`. -pub struct Builder +pub struct Builder where S: Service + Clone + Send + 'static, S::Future: Send, C: ChainTip + Clone + Send + 'static, - PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static, { config: Option, our_services: Option, @@ -356,16 +351,13 @@ where address_book_updater: Option>, inv_collector: Option>, latest_chain_tip: C, - - _phantom_data: PhantomData, } -impl Builder +impl Builder where S: Service + Clone + Send + 'static, S::Future: Send, C: ChainTip + Clone + Send + 'static, - PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static, { /// Provide a config. Mandatory. pub fn with_config(mut self, config: Config) -> Self { @@ -425,10 +417,7 @@ where /// constant over network upgrade activations. /// /// Use [`NoChainTip`] to explicitly provide no chain tip. - pub fn with_latest_chain_tip( - self, - latest_chain_tip: NewC, - ) -> Builder + pub fn with_latest_chain_tip(self, latest_chain_tip: NewC) -> Builder where NewC: ChainTip + Clone + Send + 'static, { @@ -443,7 +432,6 @@ where user_agent: self.user_agent, relay: self.relay, inv_collector: self.inv_collector, - _phantom_data: self._phantom_data, } } @@ -458,7 +446,7 @@ where /// Consume this builder and produce a [`Handshake`]. /// /// Returns an error only if any mandatory field was unset. - pub fn finish(self) -> Result, &'static str> { + pub fn finish(self) -> Result, &'static str> { let config = self.config.ok_or("did not specify config")?; let inbound_service = self .inbound_service @@ -491,19 +479,17 @@ where minimum_peer_version, nonces, parent_span: Span::current(), - _phantom_data: self._phantom_data, }) } } -impl Handshake +impl Handshake where S: Service + Clone + Send + 'static, S::Future: Send, - PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static, { /// Create a builder that configures a [`Handshake`] service. - pub fn builder() -> Builder { + pub fn builder() -> Builder { // We don't derive `Default` because the derive inserts a `where S: // Default` bound even though `Option` implements `Default` even if // `S` does not. @@ -516,7 +502,6 @@ where address_book_updater: None, inv_collector: None, latest_chain_tip: NoChainTip, - _phantom_data: PhantomData::default(), } } } @@ -745,8 +730,7 @@ where pub connection_tracker: ConnectionTracker, } -impl Service> - for Handshake +impl Service> for Handshake where S: Service + Clone + Send + 'static, S::Future: Send,