From 5ad40e157ca425b160836d9e1eeb311e8d15371b Mon Sep 17 00:00:00 2001 From: Henry de Valence Date: Tue, 26 Nov 2019 22:42:42 -0800 Subject: [PATCH 1/2] Refactor protocol into internal, external modules. This commit just moves things around and patches import paths. --- zebra-network/src/constants.rs | 2 +- zebra-network/src/network.rs | 2 +- zebra-network/src/peer/connector.rs | 2 +- zebra-network/src/peer/error.rs | 2 +- zebra-network/src/peer/handshake.rs | 5 +- zebra-network/src/peer/server.rs | 3 +- zebra-network/src/protocol.rs | 16 ++--- zebra-network/src/protocol/external.rs | 12 ++++ .../src/protocol/{ => external}/codec.rs | 0 .../src/protocol/{ => external}/inv.rs | 0 .../src/protocol/{ => external}/message.rs | 0 zebra-network/src/protocol/external/types.rs | 55 +++++++++++++++++ zebra-network/src/protocol/internal.rs | 55 ++--------------- .../src/protocol/internal/request.rs | 20 ++++++ .../src/protocol/internal/response.rs | 28 +++++++++ zebra-network/src/protocol/types.rs | 61 ++----------------- 16 files changed, 137 insertions(+), 126 deletions(-) create mode 100644 zebra-network/src/protocol/external.rs rename zebra-network/src/protocol/{ => external}/codec.rs (100%) rename zebra-network/src/protocol/{ => external}/inv.rs (100%) rename zebra-network/src/protocol/{ => external}/message.rs (100%) create mode 100644 zebra-network/src/protocol/external/types.rs create mode 100644 zebra-network/src/protocol/internal/request.rs create mode 100644 zebra-network/src/protocol/internal/response.rs diff --git a/zebra-network/src/constants.rs b/zebra-network/src/constants.rs index ec1a3b86e14..7a54b8a2678 100644 --- a/zebra-network/src/constants.rs +++ b/zebra-network/src/constants.rs @@ -3,7 +3,7 @@ use std::time::Duration; // XXX should these constants be split into protocol also? -use crate::protocol::types::*; +use crate::protocol::external::types::*; /// The timeout for requests made to a remote peer. pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); diff --git a/zebra-network/src/network.rs b/zebra-network/src/network.rs index febbb1f2191..d5cfdc478e7 100644 --- a/zebra-network/src/network.rs +++ b/zebra-network/src/network.rs @@ -1,4 +1,4 @@ -use crate::{constants::magics, protocol::types::Magic}; +use crate::{constants::magics, protocol::external::types::Magic}; /// An enum describing the possible network choices. #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/connector.rs index aae7f5f134f..a9845a68b38 100644 --- a/zebra-network/src/peer/connector.rs +++ b/zebra-network/src/peer/connector.rs @@ -54,6 +54,6 @@ where let client = hs.call((stream, addr)).await?; Ok(Change::Insert(addr, client)) } - .boxed() + .boxed() } } diff --git a/zebra-network/src/peer/error.rs b/zebra-network/src/peer/error.rs index 76958c3e3e0..379da4c4f4e 100644 --- a/zebra-network/src/peer/error.rs +++ b/zebra-network/src/peer/error.rs @@ -68,7 +68,7 @@ impl ErrorSlot { pub enum HandshakeError { /// The remote peer sent an unexpected message during the handshake. #[error("The remote peer sent an unexpected message: {0:?}")] - UnexpectedMessage(crate::protocol::message::Message), + UnexpectedMessage(crate::protocol::external::Message), /// The peer connector detected handshake nonce reuse, possibly indicating self-connection. #[error("Detected nonce reuse, possible self-connection")] NonceReuse, diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index c6cf5d78145..d76e9638ca1 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -17,7 +17,10 @@ use zebra_chain::types::BlockHeight; use crate::{ constants, - protocol::{codec::*, internal::*, message::*, types::*}, + protocol::{ + external::{types::*, Codec, Message}, + internal::{Request, Response}, + }, types::MetaAddr, BoxedStdError, Config, }; diff --git a/zebra-network/src/peer/server.rs b/zebra-network/src/peer/server.rs index aa333b57ba4..dc32c487ccc 100644 --- a/zebra-network/src/peer/server.rs +++ b/zebra-network/src/peer/server.rs @@ -16,9 +16,8 @@ use zebra_chain::{serialization::SerializationError, transaction::TransactionHas use crate::{ constants, protocol::{ + external::{InventoryHash, Message}, internal::{Request, Response}, - inv::InventoryHash, - message::Message, }, BoxedStdError, }; diff --git a/zebra-network/src/protocol.rs b/zebra-network/src/protocol.rs index 38ac65dbb8e..303665d8fb8 100644 --- a/zebra-network/src/protocol.rs +++ b/zebra-network/src/protocol.rs @@ -1,14 +1,8 @@ //! Zcash network protocol handling. -pub mod codec; -pub mod message; -pub mod types; - -pub mod inv; - -// XXX at some later point the above should move to an `external` submodule, so -// that we have -// - protocol::external::{all_bitcoin_zcash_types}; -// - protocol::internal::{all_internal_req_rsp_types}; - +/// The external Bitcoin-based protocol. +pub mod external; +/// The internal request/response protocol. pub mod internal; +/// Newtype wrappers giving semantic meaning to primitive datatypes. +pub mod types; diff --git a/zebra-network/src/protocol/external.rs b/zebra-network/src/protocol/external.rs new file mode 100644 index 00000000000..befe8861e12 --- /dev/null +++ b/zebra-network/src/protocol/external.rs @@ -0,0 +1,12 @@ +/// A Tokio codec that transforms an `AsyncRead` into a `Stream` of `Message`s. +mod codec; +/// Inventory items. +mod inv; +/// An enum of all supported Bitcoin message types. +mod message; +/// Newtype wrappers for primitive types. +pub mod types; + +pub use codec::Codec; +pub use inv::InventoryHash; +pub use message::Message; diff --git a/zebra-network/src/protocol/codec.rs b/zebra-network/src/protocol/external/codec.rs similarity index 100% rename from zebra-network/src/protocol/codec.rs rename to zebra-network/src/protocol/external/codec.rs diff --git a/zebra-network/src/protocol/inv.rs b/zebra-network/src/protocol/external/inv.rs similarity index 100% rename from zebra-network/src/protocol/inv.rs rename to zebra-network/src/protocol/external/inv.rs diff --git a/zebra-network/src/protocol/message.rs b/zebra-network/src/protocol/external/message.rs similarity index 100% rename from zebra-network/src/protocol/message.rs rename to zebra-network/src/protocol/external/message.rs diff --git a/zebra-network/src/protocol/external/types.rs b/zebra-network/src/protocol/external/types.rs new file mode 100644 index 00000000000..022cbdbe497 --- /dev/null +++ b/zebra-network/src/protocol/external/types.rs @@ -0,0 +1,55 @@ +use hex; +use std::fmt; + +/// A magic number identifying the network. +#[derive(Copy, Clone, Eq, PartialEq)] +pub struct Magic(pub [u8; 4]); + +impl fmt::Debug for Magic { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_tuple("Magic").field(&hex::encode(&self.0)).finish() + } +} + +/// A protocol version number. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub struct Version(pub u32); + +bitflags! { + /// A bitflag describing services advertised by a node in the network. + /// + /// Note that bits 24-31 are reserved for temporary experiments; other + /// service bits should be allocated via the ZIP process. + #[derive(Default)] + pub struct PeerServices: u64 { + /// NODE_NETWORK means that the node is a full node capable of serving + /// blocks, as opposed to a light client that makes network requests but + /// does not provide network services. + const NODE_NETWORK = (1 << 0); + /// NODE_BLOOM means that the node supports bloom-filtered connections. + const NODE_BLOOM = (1 << 2); + } +} + +/// A nonce used in the networking layer to identify messages. +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] +pub struct Nonce(pub u64); + +impl Default for Nonce { + fn default() -> Self { + use rand::{thread_rng, Rng}; + Self(thread_rng().gen()) + } +} + +#[cfg(test)] +mod tests { + + use crate::constants::magics; + + #[test] + fn magic_debug() { + assert_eq!(format!("{:?}", magics::MAINNET), "Magic(\"24e92764\")"); + assert_eq!(format!("{:?}", magics::TESTNET), "Magic(\"fa1af9bf\")"); + } +} diff --git a/zebra-network/src/protocol/internal.rs b/zebra-network/src/protocol/internal.rs index 186d2fb4fcf..e97891a1f77 100644 --- a/zebra-network/src/protocol/internal.rs +++ b/zebra-network/src/protocol/internal.rs @@ -1,52 +1,5 @@ -//! Message types for the internal request/response protocol. -//! -//! These are currently defined just as enums with all possible requests and -//! responses, so that we have unified types to pass around. No serialization -//! is performed as these are only internal types. +mod request; +mod response; -use std::error::Error; - -use zebra_chain::transaction::Transaction; - -use crate::meta_addr::MetaAddr; - -use super::types::Nonce; - -/// A network request, represented in internal format. -#[derive(Clone, Debug, Eq, PartialEq)] -pub enum Request { - /// Requests additional peers from the server. - GetPeers, - /// Advertises peers to the remote server. - PushPeers(Vec), - /// Heartbeats triggered on peer connection start. - // This is included as a bit of a hack, it should only be used - // internally for connection management. You should not expect to - // be firing or handling `Ping` requests or `Pong` responses. - Ping(Nonce), - /// Requests the transactions the remote server has verified but - /// not yet confirmed. - GetMempool, -} - -/// A response to a network request, represented in internal format. -#[derive(Clone, Debug, Eq, PartialEq)] -pub enum Response { - /// Generic success. - Ok, - /// Generic error. - Error, - /// A list of peers, used to respond to `GetPeers`. - Peers(Vec), - /// A list of transactions, such as in response to `GetMempool`. - Transactions(Vec), -} - -impl From for Response -where - E: Error, -{ - fn from(_e: E) -> Self { - Self::Error - } -} +pub use request::Request; +pub use response::Response; diff --git a/zebra-network/src/protocol/internal/request.rs b/zebra-network/src/protocol/internal/request.rs new file mode 100644 index 00000000000..bfeb5bff17b --- /dev/null +++ b/zebra-network/src/protocol/internal/request.rs @@ -0,0 +1,20 @@ +use crate::meta_addr::MetaAddr; + +use super::super::types::Nonce; + +/// A network request, represented in internal format. +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum Request { + /// Requests additional peers from the server. + GetPeers, + /// Advertises peers to the remote server. + PushPeers(Vec), + /// Heartbeats triggered on peer connection start. + // This is included as a bit of a hack, it should only be used + // internally for connection management. You should not expect to + // be firing or handling `Ping` requests or `Pong` responses. + Ping(Nonce), + /// Requests the transactions the remote server has verified but + /// not yet confirmed. + GetMempool, +} diff --git a/zebra-network/src/protocol/internal/response.rs b/zebra-network/src/protocol/internal/response.rs new file mode 100644 index 00000000000..8441e507221 --- /dev/null +++ b/zebra-network/src/protocol/internal/response.rs @@ -0,0 +1,28 @@ +use std::error::Error; + +// XXX clean module layout of zebra_chain +use zebra_chain::transaction::Transaction; + +use crate::meta_addr::MetaAddr; + +/// A response to a network request, represented in internal format. +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum Response { + /// Generic success. + Ok, + /// Generic error. + Error, + /// A list of peers, used to respond to `GetPeers`. + Peers(Vec), + /// A list of transactions, such as in response to `GetMempool`. + Transactions(Vec), +} + +impl From for Response +where + E: Error, +{ + fn from(_e: E) -> Self { + Self::Error + } +} diff --git a/zebra-network/src/protocol/types.rs b/zebra-network/src/protocol/types.rs index 6af2b78634d..26121bfb08f 100644 --- a/zebra-network/src/protocol/types.rs +++ b/zebra-network/src/protocol/types.rs @@ -1,57 +1,4 @@ -//! Newtype wrappers assigning semantic meaning to primitive types. - -use hex; -use std::fmt; - -/// A magic number identifying the network. -#[derive(Copy, Clone, Eq, PartialEq)] -pub struct Magic(pub [u8; 4]); - -impl fmt::Debug for Magic { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_tuple("Magic").field(&hex::encode(&self.0)).finish() - } -} - -/// A protocol version number. -#[derive(Copy, Clone, Debug, Eq, PartialEq)] -pub struct Version(pub u32); - -bitflags! { - /// A bitflag describing services advertised by a node in the network. - /// - /// Note that bits 24-31 are reserved for temporary experiments; other - /// service bits should be allocated via the ZIP process. - #[derive(Default)] - pub struct PeerServices: u64 { - /// NODE_NETWORK means that the node is a full node capable of serving - /// blocks, as opposed to a light client that makes network requests but - /// does not provide network services. - const NODE_NETWORK = (1 << 0); - /// NODE_BLOOM means that the node supports bloom-filtered connections. - const NODE_BLOOM = (1 << 2); - } -} - -/// A nonce used in the networking layer to identify messages. -#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] -pub struct Nonce(pub u64); - -impl Default for Nonce { - fn default() -> Self { - use rand::{thread_rng, Rng}; - Self(thread_rng().gen()) - } -} - -#[cfg(test)] -mod tests { - - use crate::constants::magics; - - #[test] - fn magic_debug() { - assert_eq!(format!("{:?}", magics::MAINNET), "Magic(\"24e92764\")"); - assert_eq!(format!("{:?}", magics::TESTNET), "Magic(\"fa1af9bf\")"); - } -} +// Because of the `ping` hack, `Nonce` is included in `Request`s. +pub use super::external::types::Nonce; +// The services flag is used in `MetaAddr`s. +pub use super::external::types::PeerServices; From 4a58b939f18203779e91925808721b6071cb1479 Mon Sep 17 00:00:00 2001 From: Henry de Valence Date: Tue, 26 Nov 2019 23:04:05 -0800 Subject: [PATCH 2/2] Move PeerSet initialization into a submodule. --- zebra-network/src/peer_set.rs | 280 +--------------------- zebra-network/src/peer_set/initialize.rs | 281 +++++++++++++++++++++++ 2 files changed, 283 insertions(+), 278 deletions(-) create mode 100644 zebra-network/src/peer_set/initialize.rs diff --git a/zebra-network/src/peer_set.rs b/zebra-network/src/peer_set.rs index 44ac0255be8..4c39df4cc13 100644 --- a/zebra-network/src/peer_set.rs +++ b/zebra-network/src/peer_set.rs @@ -1,285 +1,9 @@ -//! A peer set whose size is dynamically determined by resource constraints. - -// Portions of this submodule were adapted from tower-balance, -// which is (c) 2019 Tower Contributors (MIT licensed). - -// XXX these imports should go in a peer_set::initialize submodule - -use std::{ - net::SocketAddr, - sync::{Arc, Mutex}, - time::Duration, -}; - -use futures::{ - channel::mpsc, - future::{self, Future, FutureExt}, - sink::SinkExt, - stream::{FuturesUnordered, StreamExt}, -}; -use tokio::net::{TcpListener, TcpStream}; -use tower::{ - buffer::Buffer, - discover::{Change, ServiceStream}, - layer::Layer, - Service, ServiceExt, -}; -use tower_load::{peak_ewma::PeakEwmaDiscover, NoInstrument}; - -use crate::{ - peer::{PeerClient, PeerConnector, PeerHandshake}, - timestamp_collector::TimestampCollector, - AddressBook, BoxedStdError, Config, Request, Response, -}; - mod candidate_set; +mod initialize; mod set; mod unready_service; use candidate_set::CandidateSet; use set::PeerSet; -type PeerChange = Result, BoxedStdError>; - -/// Initialize a peer set with the given `config`, forwarding peer requests to the `inbound_service`. -pub async fn init( - config: Config, - inbound_service: S, -) -> ( - impl Service< - Request, - Response = Response, - Error = BoxedStdError, - Future = impl Future> + Send, - > + Send - + Clone - + 'static, - Arc>, -) -where - S: Service + Clone + Send + 'static, - S::Future: Send + 'static, -{ - let (address_book, timestamp_collector) = TimestampCollector::spawn(); - - // Construct services that handle inbound handshakes and perform outbound - // handshakes. These use the same handshake service internally to detect - // self-connection attempts. Both are decorated with a tower TimeoutLayer to - // enforce timeouts as specified in the Config. - let (listener, connector) = { - use tower::timeout::TimeoutLayer; - let hs_timeout = TimeoutLayer::new(config.handshake_timeout); - let hs = PeerHandshake::new(config.clone(), inbound_service, timestamp_collector); - ( - hs_timeout.layer(hs.clone()), - hs_timeout.layer(PeerConnector::new(hs)), - ) - }; - - // Create an mpsc channel for peer changes, with a generous buffer. - let (peerset_tx, peerset_rx) = mpsc::channel::(100); - // Create an mpsc channel for peerset demand signaling. - let (demand_tx, demand_rx) = mpsc::channel::<()>(100); - - // Connect the rx end to a PeerSet, wrapping new peers in load instruments. - let peer_set = Buffer::new( - PeerSet::new( - PeakEwmaDiscover::new( - ServiceStream::new( - // ServiceStream interprets an error as stream termination, - // so discard any errored connections... - peerset_rx.filter(|result| future::ready(result.is_ok())), - ), - config.ewma_default_rtt, - config.ewma_decay_time, - NoInstrument, - ), - demand_tx, - ), - config.peerset_request_buffer_size, - ); - - // Connect the tx end to the 3 peer sources: - - // 1. Initial peers, specified in the config. - tokio::spawn(add_initial_peers( - config.initial_peers(), - connector.clone(), - peerset_tx.clone(), - )); - - // 2. Incoming peer connections, via a listener. - tokio::spawn( - listen(config.listen_addr, listener, peerset_tx.clone()).map(|result| { - if let Err(e) = result { - error!(%e); - } - }), - ); - - // 3. Outgoing peers we connect to in response to load. - - let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone()); - - // We need to await candidates.update() here, because Zcashd only sends one - // `addr` message per connection, and if we only have one initial peer we - // need to ensure that its `addr` message is used by the crawler. - // XXX this should go in CandidateSet::new, but we need init() -> Result<_,_> - let _ = candidates.update().await; - - info!("Sending initial request for peers"); - tokio::spawn( - crawl_and_dial( - config.new_peer_interval, - demand_rx, - candidates, - connector, - peerset_tx, - ) - .map(|result| { - if let Err(e) = result { - error!(%e); - } - }), - ); - - (peer_set, address_book) -} - -/// Use the provided `handshaker` to connect to `initial_peers`, then send -/// the results over `tx`. -#[instrument(skip(initial_peers, connector, tx))] -async fn add_initial_peers( - initial_peers: Vec, - connector: S, - mut tx: mpsc::Sender, -) where - S: Service, Error = BoxedStdError> - + Clone, - S::Future: Send + 'static, -{ - info!(?initial_peers, "Connecting to initial peer set"); - use tower::util::CallAllUnordered; - let addr_stream = futures::stream::iter(initial_peers.into_iter()); - let mut handshakes = CallAllUnordered::new(connector, addr_stream); - while let Some(handshake_result) = handshakes.next().await { - let _ = tx.send(handshake_result).await; - } -} - -/// Bind to `addr`, listen for peers using `handshaker`, then send the -/// results over `tx`. -#[instrument(skip(tx, handshaker))] -async fn listen( - addr: SocketAddr, - mut handshaker: S, - tx: mpsc::Sender, -) -> Result<(), BoxedStdError> -where - S: Service<(TcpStream, SocketAddr), Response = PeerClient, Error = BoxedStdError> + Clone, - S::Future: Send + 'static, -{ - let mut listener = TcpListener::bind(addr).await?; - loop { - if let Ok((tcp_stream, addr)) = listener.accept().await { - debug!(?addr, "got incoming connection"); - handshaker.ready().await?; - // Construct a handshake future but do not drive it yet.... - let handshake = handshaker.call((tcp_stream, addr)); - // ... instead, spawn a new task to handle this connection - let mut tx2 = tx.clone(); - tokio::spawn(async move { - if let Ok(client) = handshake.await { - let _ = tx2.send(Ok(Change::Insert(addr, client))).await; - } - }); - } - } -} - -/// Given a channel that signals a need for new peers, try to connect to a peer -/// and send the resulting `PeerClient` through a channel. -/// -#[instrument(skip(new_peer_interval, demand_signal, candidates, connector, success_tx))] -async fn crawl_and_dial( - new_peer_interval: Duration, - demand_signal: mpsc::Receiver<()>, - mut candidates: CandidateSet, - mut connector: C, - mut success_tx: mpsc::Sender, -) -> Result<(), BoxedStdError> -where - C: Service, Error = BoxedStdError> - + Clone, - C::Future: Send + 'static, - S: Service, - S::Future: Send + 'static, -{ - use futures::TryFutureExt; - - // On creation, we are likely to have very few peers, so try to get more - // connections quickly by concurrently connecting to a large number of - // candidates. - let mut handshakes = FuturesUnordered::new(); - for _ in 0..50usize { - if let Some(candidate) = candidates.next() { - connector.ready().await?; - handshakes.push( - connector - .call(candidate.addr) - // Use map_err to tag failed connections with the MetaAddr, - // so they can be reported to the CandidateSet. - .map_err(move |_| candidate), - ) - } - } - while let Some(handshake) = handshakes.next().await { - match handshake { - Ok(change) => { - debug!("Successfully dialed new peer, sending to peerset"); - success_tx.send(Ok(change)).await?; - } - Err(candidate) => { - debug!(?candidate.addr, "marking address as failed"); - candidates.report_failed(candidate); - } - } - } - - use tokio::timer::Interval; - let mut connect_signal = futures::stream::select( - Interval::new_interval(new_peer_interval).map(|_| ()), - demand_signal, - ); - while let Some(()) = connect_signal.next().await { - debug!("got demand signal from peer set, updating candidates"); - candidates.update().await?; - loop { - let candidate = match candidates.next() { - Some(candidate) => candidate, - None => { - warn!("got demand for more peers but no available candidates"); - break; - } - }; - - connector.ready().await?; - match connector - .call(candidate.addr) - .map_err(move |_| candidate) - .await - { - Ok(change) => { - debug!("Successfully dialed new peer, sending to peerset"); - success_tx.send(Ok(change)).await?; - break; - } - Err(candidate) => { - debug!(?candidate.addr, "marking address as failed"); - candidates.report_failed(candidate); - } - } - } - } - Ok(()) -} +pub use initialize::init; diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs new file mode 100644 index 00000000000..bc0b4d6afea --- /dev/null +++ b/zebra-network/src/peer_set/initialize.rs @@ -0,0 +1,281 @@ +//! A peer set whose size is dynamically determined by resource constraints. + +// Portions of this submodule were adapted from tower-balance, +// which is (c) 2019 Tower Contributors (MIT licensed). + +// XXX these imports should go in a peer_set::initialize submodule + +use std::{ + net::SocketAddr, + sync::{Arc, Mutex}, + time::Duration, +}; + +use futures::{ + channel::mpsc, + future::{self, Future, FutureExt}, + sink::SinkExt, + stream::{FuturesUnordered, StreamExt}, +}; +use tokio::net::{TcpListener, TcpStream}; +use tower::{ + buffer::Buffer, + discover::{Change, ServiceStream}, + layer::Layer, + Service, ServiceExt, +}; +use tower_load::{peak_ewma::PeakEwmaDiscover, NoInstrument}; + +use crate::{ + peer::{PeerClient, PeerConnector, PeerHandshake}, + timestamp_collector::TimestampCollector, + AddressBook, BoxedStdError, Config, Request, Response, +}; + +use super::CandidateSet; +use super::PeerSet; + +type PeerChange = Result, BoxedStdError>; + +/// Initialize a peer set with the given `config`, forwarding peer requests to the `inbound_service`. +pub async fn init( + config: Config, + inbound_service: S, +) -> ( + impl Service< + Request, + Response = Response, + Error = BoxedStdError, + Future = impl Future> + Send, + > + Send + + Clone + + 'static, + Arc>, +) +where + S: Service + Clone + Send + 'static, + S::Future: Send + 'static, +{ + let (address_book, timestamp_collector) = TimestampCollector::spawn(); + + // Construct services that handle inbound handshakes and perform outbound + // handshakes. These use the same handshake service internally to detect + // self-connection attempts. Both are decorated with a tower TimeoutLayer to + // enforce timeouts as specified in the Config. + let (listener, connector) = { + use tower::timeout::TimeoutLayer; + let hs_timeout = TimeoutLayer::new(config.handshake_timeout); + let hs = PeerHandshake::new(config.clone(), inbound_service, timestamp_collector); + ( + hs_timeout.layer(hs.clone()), + hs_timeout.layer(PeerConnector::new(hs)), + ) + }; + + // Create an mpsc channel for peer changes, with a generous buffer. + let (peerset_tx, peerset_rx) = mpsc::channel::(100); + // Create an mpsc channel for peerset demand signaling. + let (demand_tx, demand_rx) = mpsc::channel::<()>(100); + + // Connect the rx end to a PeerSet, wrapping new peers in load instruments. + let peer_set = Buffer::new( + PeerSet::new( + PeakEwmaDiscover::new( + ServiceStream::new( + // ServiceStream interprets an error as stream termination, + // so discard any errored connections... + peerset_rx.filter(|result| future::ready(result.is_ok())), + ), + config.ewma_default_rtt, + config.ewma_decay_time, + NoInstrument, + ), + demand_tx, + ), + config.peerset_request_buffer_size, + ); + + // Connect the tx end to the 3 peer sources: + + // 1. Initial peers, specified in the config. + tokio::spawn(add_initial_peers( + config.initial_peers(), + connector.clone(), + peerset_tx.clone(), + )); + + // 2. Incoming peer connections, via a listener. + tokio::spawn( + listen(config.listen_addr, listener, peerset_tx.clone()).map(|result| { + if let Err(e) = result { + error!(%e); + } + }), + ); + + // 3. Outgoing peers we connect to in response to load. + + let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone()); + + // We need to await candidates.update() here, because Zcashd only sends one + // `addr` message per connection, and if we only have one initial peer we + // need to ensure that its `addr` message is used by the crawler. + // XXX this should go in CandidateSet::new, but we need init() -> Result<_,_> + let _ = candidates.update().await; + + info!("Sending initial request for peers"); + tokio::spawn( + crawl_and_dial( + config.new_peer_interval, + demand_rx, + candidates, + connector, + peerset_tx, + ) + .map(|result| { + if let Err(e) = result { + error!(%e); + } + }), + ); + + (peer_set, address_book) +} + +/// Use the provided `handshaker` to connect to `initial_peers`, then send +/// the results over `tx`. +#[instrument(skip(initial_peers, connector, tx))] +async fn add_initial_peers( + initial_peers: Vec, + connector: S, + mut tx: mpsc::Sender, +) where + S: Service, Error = BoxedStdError> + + Clone, + S::Future: Send + 'static, +{ + info!(?initial_peers, "Connecting to initial peer set"); + use tower::util::CallAllUnordered; + let addr_stream = futures::stream::iter(initial_peers.into_iter()); + let mut handshakes = CallAllUnordered::new(connector, addr_stream); + while let Some(handshake_result) = handshakes.next().await { + let _ = tx.send(handshake_result).await; + } +} + +/// Bind to `addr`, listen for peers using `handshaker`, then send the +/// results over `tx`. +#[instrument(skip(tx, handshaker))] +async fn listen( + addr: SocketAddr, + mut handshaker: S, + tx: mpsc::Sender, +) -> Result<(), BoxedStdError> +where + S: Service<(TcpStream, SocketAddr), Response = PeerClient, Error = BoxedStdError> + Clone, + S::Future: Send + 'static, +{ + let mut listener = TcpListener::bind(addr).await?; + loop { + if let Ok((tcp_stream, addr)) = listener.accept().await { + debug!(?addr, "got incoming connection"); + handshaker.ready().await?; + // Construct a handshake future but do not drive it yet.... + let handshake = handshaker.call((tcp_stream, addr)); + // ... instead, spawn a new task to handle this connection + let mut tx2 = tx.clone(); + tokio::spawn(async move { + if let Ok(client) = handshake.await { + let _ = tx2.send(Ok(Change::Insert(addr, client))).await; + } + }); + } + } +} + +/// Given a channel that signals a need for new peers, try to connect to a peer +/// and send the resulting `PeerClient` through a channel. +/// +#[instrument(skip(new_peer_interval, demand_signal, candidates, connector, success_tx))] +async fn crawl_and_dial( + new_peer_interval: Duration, + demand_signal: mpsc::Receiver<()>, + mut candidates: CandidateSet, + mut connector: C, + mut success_tx: mpsc::Sender, +) -> Result<(), BoxedStdError> +where + C: Service, Error = BoxedStdError> + + Clone, + C::Future: Send + 'static, + S: Service, + S::Future: Send + 'static, +{ + use futures::TryFutureExt; + + // On creation, we are likely to have very few peers, so try to get more + // connections quickly by concurrently connecting to a large number of + // candidates. + let mut handshakes = FuturesUnordered::new(); + for _ in 0..50usize { + if let Some(candidate) = candidates.next() { + connector.ready().await?; + handshakes.push( + connector + .call(candidate.addr) + // Use map_err to tag failed connections with the MetaAddr, + // so they can be reported to the CandidateSet. + .map_err(move |_| candidate), + ) + } + } + while let Some(handshake) = handshakes.next().await { + match handshake { + Ok(change) => { + debug!("Successfully dialed new peer, sending to peerset"); + success_tx.send(Ok(change)).await?; + } + Err(candidate) => { + debug!(?candidate.addr, "marking address as failed"); + candidates.report_failed(candidate); + } + } + } + + use tokio::timer::Interval; + let mut connect_signal = futures::stream::select( + Interval::new_interval(new_peer_interval).map(|_| ()), + demand_signal, + ); + while let Some(()) = connect_signal.next().await { + debug!("got demand signal from peer set, updating candidates"); + candidates.update().await?; + loop { + let candidate = match candidates.next() { + Some(candidate) => candidate, + None => { + warn!("got demand for more peers but no available candidates"); + break; + } + }; + + connector.ready().await?; + match connector + .call(candidate.addr) + .map_err(move |_| candidate) + .await + { + Ok(change) => { + debug!("Successfully dialed new peer, sending to peerset"); + success_tx.send(Ok(change)).await?; + break; + } + Err(candidate) => { + debug!(?candidate.addr, "marking address as failed"); + candidates.report_failed(candidate); + } + } + } + } + Ok(()) +}