diff --git a/zebra-network/src/address_book.rs b/zebra-network/src/address_book.rs index 63b0c0c1522..fdbad1c21b9 100644 --- a/zebra-network/src/address_book.rs +++ b/zebra-network/src/address_book.rs @@ -1,4 +1,4 @@ -//! The addressbook manages information about what peers exist, when they were +//! The `AddressBook` manages information about what peers exist, when they were //! seen, and what services they provide. use std::{ @@ -13,8 +13,39 @@ use tracing::Span; use crate::{constants, types::MetaAddr, PeerAddrState}; -/// A database of peers, their advertised services, and information on when they -/// were last seen. +/// A database of peer listener addresses, their advertised services, and +/// information on when they were last seen. +/// +/// # Security +/// +/// Address book state must be based on outbound connections to peers. +/// +/// If the address book is updated incorrectly: +/// - malicious peers can interfere with other peers' `AddressBook` state, +/// or +/// - Zebra can advertise unreachable addresses to its own peers. +/// +/// ## Adding Addresses +/// +/// The address book should only contain Zcash listener port addresses from peers +/// on the configured network. These addresses can come from: +/// - DNS seeders +/// - addresses gossiped by other peers +/// - the canonical address (`Version.address_from`) provided by each peer, +/// particularly peers on inbound connections. +/// +/// The remote addresses of inbound connections must not be added to the address +/// book, because they contain ephemeral outbound ports, not listener ports. +/// +/// Isolated connections must not add addresses or update the address book. +/// +/// ## Updating Address State +/// +/// Updates to address state must be based on outbound connections to peers. +/// +/// Updates must not be based on: +/// - the remote addresses of inbound connections, or +/// - the canonical address of any connection. #[derive(Clone, Debug)] pub struct AddressBook { /// Each known peer address has a matching `MetaAddr` @@ -33,8 +64,11 @@ pub struct AddressMetrics { /// The number of addresses in the `Responded` state. responded: usize, - /// The number of addresses in the `NeverAttempted` state. - never_attempted: usize, + /// The number of addresses in the `NeverAttemptedGossiped` state. + never_attempted_gossiped: usize, + + /// The number of addresses in the `NeverAttemptedAlternate` state. + never_attempted_alternate: usize, /// The number of addresses in the `Failed` state. failed: usize, @@ -93,9 +127,10 @@ impl AddressBook { /// Add `new` to the address book, updating the previous entry if `new` is /// more recent or discarding `new` if it is stale. /// - /// ## Note + /// # Correctness /// - /// All changes should go through `update` or `take`, to ensure accurate metrics. + /// All new addresses should go through `update`, so that the address book + /// only contains valid outbound addresses. pub fn update(&mut self, new: MetaAddr) { let _guard = self.span.enter(); trace!( @@ -104,6 +139,14 @@ impl AddressBook { recent_peers = self.recently_live_peers().count(), ); + // Drop any unspecified or client addresses. + // + // Communication with these addresses can be monitored via Zebra's + // metrics. (The address book is for valid peer addresses.) + if !new.is_valid_for_outbound() { + return; + } + if let Some(prev) = self.get_by_addr(new.addr) { if prev.get_last_seen() > new.get_last_seen() { return; @@ -117,9 +160,10 @@ impl AddressBook { /// Removes the entry with `addr`, returning it if it exists /// - /// ## Note + /// # Note /// - /// All changes should go through `update` or `take`, to ensure accurate metrics. + /// All address removals should go through `take`, so that the address + /// book metrics are accurate. fn take(&mut self, removed_addr: SocketAddr) -> Option { let _guard = self.span.enter(); trace!( @@ -254,7 +298,12 @@ impl AddressBook { /// Returns metrics for the addresses in this address book. pub fn address_metrics(&self) -> AddressMetrics { let responded = self.state_peers(PeerAddrState::Responded).count(); - let never_attempted = self.state_peers(PeerAddrState::NeverAttempted).count(); + let never_attempted_gossiped = self + .state_peers(PeerAddrState::NeverAttemptedGossiped) + .count(); + let never_attempted_alternate = self + .state_peers(PeerAddrState::NeverAttemptedAlternate) + .count(); let failed = self.state_peers(PeerAddrState::Failed).count(); let attempt_pending = self.state_peers(PeerAddrState::AttemptPending).count(); @@ -265,7 +314,8 @@ impl AddressBook { AddressMetrics { responded, - never_attempted, + never_attempted_gossiped, + never_attempted_alternate, failed, attempt_pending, recently_live, @@ -281,7 +331,11 @@ impl AddressBook { // TODO: rename to address_book.[state_name] metrics::gauge!("candidate_set.responded", m.responded as f64); - metrics::gauge!("candidate_set.gossiped", m.never_attempted as f64); + metrics::gauge!("candidate_set.gossiped", m.never_attempted_gossiped as f64); + metrics::gauge!( + "candidate_set.alternate", + m.never_attempted_alternate as f64 + ); metrics::gauge!("candidate_set.failed", m.failed as f64); metrics::gauge!("candidate_set.pending", m.attempt_pending as f64); @@ -327,7 +381,12 @@ impl AddressBook { self.last_address_log = Some(Instant::now()); // if all peers have failed - if m.responded + m.attempt_pending + m.never_attempted == 0 { + if m.responded + + m.attempt_pending + + m.never_attempted_gossiped + + m.never_attempted_alternate + == 0 + { warn!( address_metrics = ?m, "all peer addresses have failed. Hint: check your network connection" diff --git a/zebra-network/src/isolated.rs b/zebra-network/src/isolated.rs index 18e8be2a32e..f1a7872ffe0 100644 --- a/zebra-network/src/isolated.rs +++ b/zebra-network/src/isolated.rs @@ -15,6 +15,7 @@ use tower::{ }; use crate::{peer, BoxError, Config, Request, Response}; +use peer::ConnectedAddr; /// Use the provided TCP connection to create a Zcash connection completely /// isolated from all other node state. @@ -57,13 +58,11 @@ pub fn connect_isolated( .finish() .expect("provided mandatory builder parameters"); - // We can't get the remote addr from conn, because it might be a tcp - // connection through a socks proxy, not directly to the remote. But it - // doesn't seem like zcashd cares if we give a bogus one, and Zebra doesn't - // touch it at all. - let remote_addr = "0.0.0.0:8233".parse().unwrap(); + // Don't send any metadata about the connection + let connected_addr = ConnectedAddr::new_isolated(); - Oneshot::new(handshake, (conn, remote_addr)).map_ok(|client| BoxService::new(Wrapper(client))) + Oneshot::new(handshake, (conn, connected_addr)) + .map_ok(|client| BoxService::new(Wrapper(client))) } // This can be deleted when a new version of Tower with map_err is released. diff --git a/zebra-network/src/meta_addr.rs b/zebra-network/src/meta_addr.rs index 04f2521ff36..700b608fcc4 100644 --- a/zebra-network/src/meta_addr.rs +++ b/zebra-network/src/meta_addr.rs @@ -44,7 +44,13 @@ pub enum PeerAddrState { /// The peer's address has just been fetched from a DNS seeder, or via peer /// gossip, but we haven't attempted to connect to it yet. - NeverAttempted, + NeverAttemptedGossiped, + + /// The peer's address has just been received as part of a `Version` message, + /// so we might already be connected to this peer. + /// + /// Alternate addresses are attempted after gossiped addresses. + NeverAttemptedAlternate, /// The peer's TCP connection failed, or the peer sent us an unexpected /// Zcash protocol message, so we failed the connection. @@ -54,9 +60,11 @@ pub enum PeerAddrState { AttemptPending, } +// non-test code should explicitly specify the peer address state +#[cfg(test)] impl Default for PeerAddrState { fn default() -> Self { - NeverAttempted + NeverAttemptedGossiped } } @@ -66,19 +74,23 @@ impl Ord for PeerAddrState { /// /// See [`CandidateSet`] and [`MetaAddr::cmp`] for more details. fn cmp(&self, other: &Self) -> Ordering { + use Ordering::*; match (self, other) { (Responded, Responded) - | (NeverAttempted, NeverAttempted) | (Failed, Failed) - | (AttemptPending, AttemptPending) => Ordering::Equal, + | (NeverAttemptedGossiped, NeverAttemptedGossiped) + | (NeverAttemptedAlternate, NeverAttemptedAlternate) + | (AttemptPending, AttemptPending) => Equal, // We reconnect to `Responded` peers that have stopped sending messages, // then `NeverAttempted` peers, then `Failed` peers - (Responded, _) => Ordering::Less, - (_, Responded) => Ordering::Greater, - (NeverAttempted, _) => Ordering::Less, - (_, NeverAttempted) => Ordering::Greater, - (Failed, _) => Ordering::Less, - (_, Failed) => Ordering::Greater, + (Responded, _) => Less, + (_, Responded) => Greater, + (NeverAttemptedGossiped, _) => Less, + (_, NeverAttemptedGossiped) => Greater, + (NeverAttemptedAlternate, _) => Less, + (_, NeverAttemptedAlternate) => Greater, + (Failed, _) => Less, + (_, Failed) => Greater, // AttemptPending is covered by the other cases } } @@ -124,8 +136,8 @@ pub struct MetaAddr { } impl MetaAddr { - /// Create a new `MetaAddr` from the deserialized fields in an `Addr` - /// message. + /// Create a new `MetaAddr` from the deserialized fields in a gossiped + /// peer `Addr` message. pub fn new_gossiped( addr: &SocketAddr, services: &PeerServices, @@ -136,11 +148,19 @@ impl MetaAddr { services: *services, last_seen: *last_seen, // the state is Zebra-specific, it isn't part of the Zcash network protocol - last_connection_state: NeverAttempted, + last_connection_state: NeverAttemptedGossiped, } } /// Create a new `MetaAddr` for a peer that has just `Responded`. + /// + /// # Security + /// + /// This address must be the remote address from an outbound connection. + /// Otherwise: + /// - malicious peers could interfere with other peers' `AddressBook` state, + /// or + /// - Zebra could advertise unreachable addresses to its own peers. pub fn new_responded(addr: &SocketAddr, services: &PeerServices) -> MetaAddr { MetaAddr { addr: *addr, @@ -160,6 +180,17 @@ impl MetaAddr { } } + /// Create a new `MetaAddr` for a peer's alternate address, received via a + /// `Version` message. + pub fn new_alternate(addr: &SocketAddr, services: &PeerServices) -> MetaAddr { + MetaAddr { + addr: *addr, + services: *services, + last_seen: Utc::now(), + last_connection_state: NeverAttemptedAlternate, + } + } + /// Create a new `MetaAddr` for a peer that has just had an error. pub fn new_errored(addr: &SocketAddr, services: &PeerServices) -> MetaAddr { MetaAddr { @@ -195,6 +226,13 @@ impl MetaAddr { self.last_seen } + /// Is this address valid for outbound connections? + pub fn is_valid_for_outbound(&self) -> bool { + self.services.contains(PeerServices::NODE_NETWORK) + && !self.addr.ip().is_unspecified() + && self.addr.port() != 0 + } + /// Return a sanitized version of this `MetaAddr`, for sending to a remote peer. pub fn sanitize(&self) -> MetaAddr { let interval = crate::constants::TIMESTAMP_TRUNCATION_SECONDS; @@ -207,7 +245,7 @@ impl MetaAddr { services: self.services, last_seen, // the state isn't sent to the remote peer, but sanitize it anyway - last_connection_state: Default::default(), + last_connection_state: NeverAttemptedGossiped, } } } @@ -222,6 +260,7 @@ impl Ord for MetaAddr { /// See [`CandidateSet`] for more details. fn cmp(&self, other: &Self) -> Ordering { use std::net::IpAddr::{V4, V6}; + use Ordering::*; let oldest_first = self.get_last_seen().cmp(&other.get_last_seen()); let newest_first = oldest_first.reverse(); @@ -229,22 +268,23 @@ impl Ord for MetaAddr { let connection_state = self.last_connection_state.cmp(&other.last_connection_state); let reconnection_time = match self.last_connection_state { Responded => oldest_first, - NeverAttempted => newest_first, + NeverAttemptedGossiped => newest_first, + NeverAttemptedAlternate => newest_first, Failed => oldest_first, AttemptPending => oldest_first, }; let ip_numeric = match (self.addr.ip(), other.addr.ip()) { (V4(a), V4(b)) => a.octets().cmp(&b.octets()), (V6(a), V6(b)) => a.octets().cmp(&b.octets()), - (V4(_), V6(_)) => Ordering::Less, - (V6(_), V4(_)) => Ordering::Greater, + (V4(_), V6(_)) => Less, + (V6(_), V4(_)) => Greater, }; connection_state .then(reconnection_time) // The remainder is meaningless as an ordering, but required so that we // have a total order on `MetaAddr` values: self and other must compare - // as Ordering::Equal iff they are equal. + // as Equal iff they are equal. .then(ip_numeric) .then(self.addr.port().cmp(&other.addr.port())) .then(self.services.bits().cmp(&other.services.bits())) diff --git a/zebra-network/src/peer.rs b/zebra-network/src/peer.rs index 1711ee9f7aa..b87c1eb8202 100644 --- a/zebra-network/src/peer.rs +++ b/zebra-network/src/peer.rs @@ -21,4 +21,4 @@ pub use client::Client; pub use connection::Connection; pub use connector::Connector; pub use error::{HandshakeError, PeerError, SharedPeerError}; -pub use handshake::Handshake; +pub use handshake::{ConnectedAddr, Handshake, HandshakeRequest}; diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/connector.rs index 46023b6a7c0..547152e593b 100644 --- a/zebra-network/src/peer/connector.rs +++ b/zebra-network/src/peer/connector.rs @@ -11,7 +11,7 @@ use tower::{discover::Change, Service, ServiceExt}; use crate::{BoxError, Request, Response}; -use super::{Client, Handshake}; +use super::{Client, ConnectedAddr, Handshake}; /// A wrapper around [`peer::Handshake`] that opens a TCP connection before /// forwarding to the inner handshake service. Writing this as its own @@ -53,7 +53,8 @@ where async move { let stream = TcpStream::connect(addr).await?; hs.ready_and().await?; - let client = hs.call((stream, addr)).await?; + let connected_addr = ConnectedAddr::new_outbound_direct(addr); + let client = hs.call((stream, connected_addr)).await?; Ok(Change::Insert(addr, client)) } .boxed() diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index d24d282f2bf..6ff7f4009b3 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -1,7 +1,8 @@ use std::{ collections::HashSet, + fmt, future::Future, - net::SocketAddr, + net::{IpAddr, Ipv4Addr, SocketAddr}, pin::Pin, sync::Arc, task::{Context, Poll}, @@ -12,6 +13,7 @@ use futures::{ channel::{mpsc, oneshot}, future, FutureExt, SinkExt, StreamExt, }; +use lazy_static::lazy_static; use tokio::{net::TcpStream, sync::broadcast, task::JoinError, time::timeout}; use tokio_util::codec::Framed; use tower::Service; @@ -53,6 +55,191 @@ pub struct Handshake { parent_span: Span, } +/// The peer address that we are handshaking with. +/// +/// Typically, we can rely on outbound addresses, but inbound addresses don't +/// give us enough information to reconnect to that peer. +#[derive(Copy, Clone, PartialEq)] +pub enum ConnectedAddr { + /// The address we used to make a direct outbound connection. + /// + /// In an honest network, a Zcash peer is listening on this exact address + /// and port. + OutboundDirect { addr: SocketAddr }, + + /// The address we received from the OS, when a remote peer directly + /// connected to our Zcash listener port. + /// + /// In an honest network, a Zcash peer might be listening on this address, + /// if its outbound address is the same as its listener address. But the port + /// is an ephemeral outbound TCP port, not a listener port. + InboundDirect { + maybe_ip: IpAddr, + transient_port: u16, + }, + + /// The proxy address we used to make an outbound connection. + /// + /// The proxy address can be used by many connections, but our own ephemeral + /// outbound address and port can be used as an identifier for the duration + /// of this connection. + OutboundProxy { + proxy_addr: SocketAddr, + transient_local_addr: SocketAddr, + }, + + /// The address we received from the OS, when a remote peer connected via an + /// inbound proxy. + /// + /// The proxy's ephemeral outbound address can be used as an identifier for + /// the duration of this connection. + InboundProxy { transient_addr: SocketAddr }, + + /// An isolated connection, where we deliberately don't connect any metadata. + Isolated, + // + // TODO: handle Tor onion addresses +} + +lazy_static! { + /// An unspecified IPv4 address + pub static ref UNSPECIFIED_IPV4_ADDR: SocketAddr = + (Ipv4Addr::UNSPECIFIED, 0).into(); +} + +use ConnectedAddr::*; + +impl ConnectedAddr { + /// Returns a new outbound directly connected addr. + pub fn new_outbound_direct(addr: SocketAddr) -> ConnectedAddr { + OutboundDirect { addr } + } + + /// Returns a new inbound directly connected addr. + pub fn new_inbound_direct(addr: SocketAddr) -> ConnectedAddr { + InboundDirect { + maybe_ip: addr.ip(), + transient_port: addr.port(), + } + } + + /// Returns a new outbound connected addr via `proxy`. + /// + /// `local_addr` is the ephemeral local address of the connection. + #[allow(unused)] + pub fn new_outbound_proxy(proxy: SocketAddr, local_addr: SocketAddr) -> ConnectedAddr { + OutboundProxy { + proxy_addr: proxy, + transient_local_addr: local_addr, + } + } + + /// Returns a new inbound connected addr from `proxy`. + // + // TODO: distinguish between direct listeners and proxy listeners in the + // rest of zebra-network + #[allow(unused)] + pub fn new_inbound_proxy(proxy: SocketAddr) -> ConnectedAddr { + InboundProxy { + transient_addr: proxy, + } + } + + /// Returns a new isolated connected addr, with no metadata. + pub fn new_isolated() -> ConnectedAddr { + Isolated + } + + /// Returns a `SocketAddr` that can be used to track this connection in the + /// `AddressBook`. + /// + /// `None` for inbound connections, proxy connections, and isolated + /// connections. + /// + /// # Correctness + /// + /// This address can be used for reconnection attempts, or as a permanent + /// identifier. + /// + /// # Security + /// + /// This address must not depend on the canonical address from the `Version` + /// message. Otherwise, malicious peers could interfere with other peers + /// `AddressBook` state. + pub fn get_address_book_addr(&self) -> Option { + match self { + OutboundDirect { addr } => Some(*addr), + // TODO: consider using the canonical address of the peer to track + // outbound proxy connections + InboundDirect { .. } | OutboundProxy { .. } | InboundProxy { .. } | Isolated => None, + } + } + + /// Returns a `SocketAddr` that can be used to temporarily identify a + /// connection. + /// + /// Isolated connections must not change Zebra's peer set or address book + /// state, so they do not have an identifier. + /// + /// # Correctness + /// + /// The returned address is only valid while the original connection is + /// open. It must not be used in the `AddressBook`, for outbound connection + /// attempts, or as a permanent identifier. + /// + /// # Security + /// + /// This address must not depend on the canonical address from the `Version` + /// message. Otherwise, malicious peers could interfere with other peers' + /// `PeerSet` state. + pub fn get_transient_addr(&self) -> Option { + match self { + OutboundDirect { addr } => Some(*addr), + InboundDirect { + maybe_ip, + transient_port, + } => Some(SocketAddr::new(*maybe_ip, *transient_port)), + OutboundProxy { + transient_local_addr, + .. + } => Some(*transient_local_addr), + InboundProxy { transient_addr } => Some(*transient_addr), + Isolated => None, + } + } + + /// Returns the metrics label for this connection's address. + pub fn get_transient_addr_label(&self) -> String { + self.get_transient_addr() + .map_or_else(|| "isolated".to_string(), |addr| addr.to_string()) + } + + /// Returns a short label for the kind of connection. + pub fn get_short_kind_label(&self) -> &'static str { + match self { + OutboundDirect { .. } => "Out", + InboundDirect { .. } => "In", + OutboundProxy { .. } => "ProxOut", + InboundProxy { .. } => "ProxIn", + Isolated => "Isol", + } + } +} + +impl fmt::Debug for ConnectedAddr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let kind = self.get_short_kind_label(); + let addr = self.get_transient_addr_label(); + + if matches!(self, Isolated) { + f.write_str(kind) + } else { + f.debug_tuple(kind).field(&addr).finish() + } + } +} + +/// A builder for `Handshake`. pub struct Builder { config: Option, inbound_service: Option, @@ -81,6 +268,9 @@ where } /// Provide a channel for registering inventory advertisements. Optional. + /// + /// This channel takes transient remote addresses, which the `PeerSet` uses + /// to look up peers that have specific inventory. pub fn with_inventory_collector( mut self, inv_collector: broadcast::Sender<(InventoryHash, SocketAddr)>, @@ -91,7 +281,8 @@ where /// Provide a hook for timestamp collection. Optional. /// - /// If this is unset, timestamps will not be collected. + /// This channel takes `MetaAddr`s, permanent addresses which can be used to + /// make outbound connections to peers. pub fn with_timestamp_collector(mut self, timestamp_collector: mpsc::Sender) -> Self { self.timestamp_collector = Some(timestamp_collector); self @@ -181,19 +372,19 @@ where } /// Negotiate the Zcash network protocol version with the remote peer -/// at `addr`, using the connection `peer_conn`. +/// at `connected_addr`, using the connection `peer_conn`. /// /// We split `Handshake` into its components before calling this function, /// to avoid infectious `Sync` bounds on the returned future. pub async fn negotiate_version( peer_conn: &mut Framed, - addr: &SocketAddr, + connected_addr: &ConnectedAddr, config: Config, nonces: Arc>>, user_agent: String, our_services: PeerServices, relay: bool, -) -> Result<(Version, PeerServices), HandshakeError> { +) -> Result<(Version, PeerServices, SocketAddr), HandshakeError> { // Create a random nonce for this connection let local_nonce = Nonce::default(); // # Correctness @@ -227,7 +418,12 @@ pub async fn negotiate_version( version: constants::CURRENT_VERSION, services: our_services, timestamp, - address_recv: (PeerServices::NODE_NETWORK, *addr), + address_recv: ( + PeerServices::NODE_NETWORK, + connected_addr + .get_transient_addr() + .unwrap_or_else(|| *UNSPECIFIED_IPV4_ADDR), + ), // TODO: detect external address (#1893) address_from: (our_services, config.listen_addr), nonce: local_nonce, @@ -248,17 +444,28 @@ pub async fn negotiate_version( // Check that we got a Version and destructure its fields into the local scope. debug!(?remote_msg, "got message from remote peer"); - let (remote_nonce, remote_services, remote_version) = if let Message::Version { - nonce, - services, - version, - .. - } = remote_msg - { - (nonce, services, version) - } else { - Err(HandshakeError::UnexpectedMessage(Box::new(remote_msg)))? - }; + let (remote_nonce, remote_services, remote_version, remote_canonical_addr) = + if let Message::Version { + version, + services, + address_from, + nonce, + .. + } = remote_msg + { + let (address_services, canonical_addr) = address_from; + if address_services != services { + info!( + ?services, + ?address_services, + "peer with inconsistent version services and version address services" + ); + } + + (nonce, services, version, canonical_addr) + } else { + Err(HandshakeError::UnexpectedMessage(Box::new(remote_msg)))? + }; // Check for nonce reuse, indicating self-connection // @@ -317,10 +524,12 @@ pub async fn negotiate_version( Err(HandshakeError::ObsoleteVersion(remote_version))?; } - Ok((remote_version, remote_services)) + Ok((remote_version, remote_services, remote_canonical_addr)) } -impl Service<(TcpStream, SocketAddr)> for Handshake +pub type HandshakeRequest = (TcpStream, ConnectedAddr); + +impl Service for Handshake where S: Service + Clone + Send + 'static, S::Future: Send, @@ -334,14 +543,15 @@ where Poll::Ready(Ok(())) } - fn call(&mut self, req: (TcpStream, SocketAddr)) -> Self::Future { - let (tcp_stream, addr) = req; + fn call(&mut self, req: HandshakeRequest) -> Self::Future { + let (tcp_stream, connected_addr) = req; - let connector_span = span!(Level::INFO, "connector", ?addr); + let negotiator_span = span!(Level::INFO, "negotiator", peer = ?connected_addr); // set the peer connection span's parent to the global span, as it // should exist independently of its creation source (inbound // connection, crawler, initial peer, ...) - let connection_span = span!(parent: &self.parent_span, Level::INFO, "peer", ?addr); + let connection_span = + span!(parent: &self.parent_span, Level::INFO, "", peer = ?connected_addr); // Clone these upfront, so they can be moved into the future. let nonces = self.nonces.clone(); @@ -354,7 +564,10 @@ where let relay = self.relay; let fut = async move { - debug!(?addr, "negotiating protocol version with remote peer"); + debug!( + addr = ?connected_addr, + "negotiating protocol version with remote peer" + ); // CORRECTNESS // @@ -364,16 +577,16 @@ where tcp_stream, Codec::builder() .for_network(config.network) - .with_metrics_label(addr.ip().to_string()) + .with_metrics_addr_label(connected_addr.get_transient_addr_label()) .finish(), ); // Wrap the entire initial connection setup in a timeout. - let (remote_version, remote_services) = timeout( + let (remote_version, remote_services, _remote_canonical_addr) = timeout( constants::HANDSHAKE_TIMEOUT, negotiate_version( &mut peer_conn, - &addr, + &connected_addr, config, nonces, user_agent, @@ -418,7 +631,7 @@ where "zcash.net.out.messages", 1, "command" => msg.to_string(), - "addr" => addr.to_string(), + "addr" => connected_addr.get_transient_addr_label(), ); // We need to use future::ready rather than an async block here, // because we need the sink to be Unpin, and the With @@ -445,24 +658,30 @@ where "zcash.net.in.messages", 1, "command" => msg.to_string(), - "addr" => addr.to_string(), + "addr" => connected_addr.get_transient_addr_label(), ); - // the collector doesn't depend on network activity, - // so this await should not hang - let _ = inbound_ts_collector - .send(MetaAddr::new_responded(&addr, &remote_services)) - .await; + + if let Some(book_addr) = connected_addr.get_address_book_addr() { + // the collector doesn't depend on network activity, + // so this await should not hang + let _ = inbound_ts_collector + .send(MetaAddr::new_responded(&book_addr, &remote_services)) + .await; + } } Err(err) => { metrics::counter!( "zebra.net.in.errors", 1, "error" => err.to_string(), - "addr" => addr.to_string(), + "addr" => connected_addr.get_transient_addr_label(), ); - let _ = inbound_ts_collector - .send(MetaAddr::new_errored(&addr, &remote_services)) - .await; + + if let Some(book_addr) = connected_addr.get_address_book_addr() { + let _ = inbound_ts_collector + .send(MetaAddr::new_errored(&book_addr, &remote_services)) + .await; + } } } msg @@ -472,7 +691,9 @@ where let inv_collector = inv_collector.clone(); let span = debug_span!("inventory_filter"); async move { - if let Ok(Message::Inv(hashes)) = &msg { + if let (Ok(Message::Inv(hashes)), Some(transient_addr)) = + (&msg, connected_addr.get_transient_addr()) + { // We ignore inventory messages with more than one // block, because they are most likely replies to a // query, rather than a newly gossiped block. @@ -487,13 +708,15 @@ where // merged inv messages into separate inv messages. (#1799) match hashes.as_slice() { [hash @ InventoryHash::Block(_)] => { - let _ = inv_collector.send((*hash, addr)); + let _ = inv_collector.send((*hash, transient_addr)); } [hashes @ ..] => { for hash in hashes { if matches!(hash, InventoryHash::Tx(_)) { debug!(?hash, "registering Tx inventory hash"); - let _ = inv_collector.send((*hash, addr)); + // The peer set and inv collector use the peer's remote + // address as an identifier + let _ = inv_collector.send((*hash, transient_addr)); } else { trace!(?hash, "ignoring non Tx inventory hash") } @@ -562,10 +785,12 @@ where Either::Left(_) ) { tracing::trace!("shutting down due to Client shut down"); - // awaiting a local task won't hang - let _ = timestamp_collector - .send(MetaAddr::new_shutdown(&addr, &remote_services)) - .await; + if let Some(book_addr) = connected_addr.get_address_book_addr() { + // awaiting a local task won't hang + let _ = timestamp_collector + .send(MetaAddr::new_shutdown(&book_addr, &remote_services)) + .await; + } return; } @@ -579,7 +804,7 @@ where if heartbeat_timeout( heartbeat, &mut timestamp_collector, - &addr, + &connected_addr, &remote_services, ) .await @@ -597,7 +822,7 @@ where }; // Spawn a new task to drive this handshake. - tokio::spawn(fut.instrument(connector_span)) + tokio::spawn(fut.instrument(negotiator_span)) .map(|x: Result, JoinError>| Ok(x??)) .boxed() } @@ -652,7 +877,7 @@ async fn send_one_heartbeat(server_tx: &mut mpsc::Sender) -> Resu async fn heartbeat_timeout( fut: F, timestamp_collector: &mut mpsc::Sender, - addr: &SocketAddr, + connected_addr: &ConnectedAddr, remote_services: &PeerServices, ) -> Result where @@ -660,21 +885,33 @@ where { let t = match timeout(constants::HEARTBEAT_INTERVAL, fut).await { Ok(inner_result) => { - handle_heartbeat_error(inner_result, timestamp_collector, addr, remote_services).await? + handle_heartbeat_error( + inner_result, + timestamp_collector, + connected_addr, + remote_services, + ) + .await? } Err(elapsed) => { - handle_heartbeat_error(Err(elapsed), timestamp_collector, addr, remote_services).await? + handle_heartbeat_error( + Err(elapsed), + timestamp_collector, + connected_addr, + remote_services, + ) + .await? } }; Ok(t) } -/// If `result.is_err()`, mark `addr` as failed using `timestamp_collector`. +/// If `result.is_err()`, mark `connected_addr` as failed using `timestamp_collector`. async fn handle_heartbeat_error( result: Result, timestamp_collector: &mut mpsc::Sender, - addr: &SocketAddr, + connected_addr: &ConnectedAddr, remote_services: &PeerServices, ) -> Result where @@ -685,10 +922,11 @@ where Err(err) => { tracing::debug!(?err, "heartbeat error, shutting down"); - let _ = timestamp_collector - .send(MetaAddr::new_errored(&addr, &remote_services)) - .await; - + if let Some(book_addr) = connected_addr.get_address_book_addr() { + let _ = timestamp_collector + .send(MetaAddr::new_errored(&book_addr, &remote_services)) + .await; + } Err(err) } } diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 44abfff2dab..c8dffc0a1d0 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -12,11 +12,7 @@ use futures::{ stream::{FuturesUnordered, StreamExt}, TryFutureExt, }; -use tokio::{ - net::{TcpListener, TcpStream}, - sync::broadcast, - time::Instant, -}; +use tokio::{net::TcpListener, sync::broadcast, time::Instant}; use tower::{ buffer::Buffer, discover::Change, layer::Layer, load::peak_ewma::PeakEwmaDiscover, util::BoxService, Service, ServiceExt, @@ -75,7 +71,7 @@ where // handshakes. These use the same handshake service internally to detect // self-connection attempts. Both are decorated with a tower TimeoutLayer to // enforce timeouts as specified in the Config. - let (listener, connector) = { + let (listen_handshaker, outbound_connector) = { use tower::timeout::TimeoutLayer; let hs_timeout = TimeoutLayer::new(constants::HANDSHAKE_TIMEOUT); use crate::protocol::external::types::PeerServices; @@ -136,18 +132,19 @@ where } let listen_guard = tokio::spawn( - listen(config.listen_addr, listener, peerset_tx.clone()).instrument(Span::current()), + listen(config.listen_addr, listen_handshaker, peerset_tx.clone()) + .instrument(Span::current()), ); // 2. Initial peers, specified in the config. let initial_peers_fut = { let config = config.clone(); - let connector = connector.clone(); + let outbound_connector = outbound_connector.clone(); let peerset_tx = peerset_tx.clone(); async move { let initial_peers = config.initial_peers().await; // Connect the tx end to the 3 peer sources: - add_initial_peers(initial_peers, connector, peerset_tx).await + add_initial_peers(initial_peers, outbound_connector, peerset_tx).await } .boxed() }; @@ -175,7 +172,7 @@ where demand_tx, demand_rx, candidates, - connector, + outbound_connector, peerset_tx, ) .instrument(Span::current()), @@ -190,10 +187,10 @@ where /// Use the provided `handshaker` to connect to `initial_peers`, then send /// the results over `tx`. -#[instrument(skip(initial_peers, connector, tx))] +#[instrument(skip(initial_peers, outbound_connector, tx))] async fn add_initial_peers( initial_peers: std::collections::HashSet, - connector: S, + outbound_connector: S, mut tx: mpsc::Sender, ) -> Result<(), BoxError> where @@ -209,7 +206,7 @@ where // single `CallAll` to completion, and handshakes have a short timeout. use tower::util::CallAllUnordered; let addr_stream = futures::stream::iter(initial_peers.into_iter()); - let mut handshakes = CallAllUnordered::new(connector, addr_stream); + let mut handshakes = CallAllUnordered::new(outbound_connector, addr_stream); while let Some(handshake_result) = handshakes.next().await { // this is verbose, but it's better than just hanging with no output @@ -222,8 +219,11 @@ where Ok(()) } -/// Bind to `addr`, listen for peers using `handshaker`, then send the -/// results over `tx`. +/// Listens for peer connections on `addr`, then sets up each connection as a +/// Zcash peer. +/// +/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends +/// the `Client` result over `tx`. #[instrument(skip(tx, handshaker))] async fn listen( addr: SocketAddr, @@ -231,7 +231,7 @@ async fn listen( tx: mpsc::Sender, ) -> Result<(), BoxError> where - S: Service<(TcpStream, SocketAddr), Response = peer::Client, Error = BoxError> + Clone, + S: Service + Clone, S::Future: Send + 'static, { info!("Trying to open Zcash protocol endpoint at {}...", addr); @@ -253,8 +253,10 @@ where if let Ok((tcp_stream, addr)) = listener.accept().await { debug!(?addr, "got incoming connection"); handshaker.ready_and().await?; + // TODO: distinguish between proxied listeners and direct listeners + let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr); // Construct a handshake future but do not drive it yet.... - let handshake = handshaker.call((tcp_stream, addr)); + let handshake = handshaker.call((tcp_stream, connected_addr)); // ... instead, spawn a new task to handle this connection let mut tx2 = tx.clone(); tokio::spawn(async move { @@ -292,7 +294,7 @@ enum CrawlerAction { /// /// Crawl for new peers every `crawl_new_peer_interval`, and whenever there is /// demand, but no new peers in `candidates`. After crawling, try to connect to -/// one new peer using `connector`. +/// one new peer using `outbound_connector`. /// /// If a handshake fails, restore the unused demand signal by sending it to /// `demand_tx`. @@ -300,13 +302,13 @@ enum CrawlerAction { /// The crawler terminates when `candidates.update()` or `success_tx` returns a /// permanent internal error. Transient errors and individual peer errors should /// be handled within the crawler. -#[instrument(skip(demand_tx, demand_rx, candidates, connector, success_tx))] +#[instrument(skip(demand_tx, demand_rx, candidates, outbound_connector, success_tx))] async fn crawl_and_dial( crawl_new_peer_interval: std::time::Duration, mut demand_tx: mpsc::Sender<()>, mut demand_rx: mpsc::Receiver<()>, mut candidates: CandidateSet, - connector: C, + outbound_connector: C, mut success_tx: mpsc::Sender, ) -> Result<(), BoxError> where @@ -380,10 +382,12 @@ where // spawn each handshake into an independent task, so it can make // progress independently of the crawls let hs_join = - tokio::spawn(dial(candidate, connector.clone())).map(move |res| match res { - Ok(crawler_action) => crawler_action, - Err(e) => { - panic!("panic during handshaking with {:?}: {:?} ", candidate, e); + tokio::spawn(dial(candidate, outbound_connector.clone())).map(move |res| { + match res { + Ok(crawler_action) => crawler_action, + Err(e) => { + panic!("panic during handshaking with {:?}: {:?} ", candidate, e); + } } }); handshakes.push(Box::pin(hs_join)); @@ -431,12 +435,12 @@ where } } -/// Try to connect to `candidate` using `connector`. +/// Try to connect to `candidate` using `outbound_connector`. /// /// Returns a `HandshakeConnected` action on success, and a /// `HandshakeFailed` action on error. -#[instrument(skip(connector,))] -async fn dial(candidate: MetaAddr, mut connector: C) -> CrawlerAction +#[instrument(skip(outbound_connector,))] +async fn dial(candidate: MetaAddr, mut outbound_connector: C) -> CrawlerAction where C: Service, Error = BoxError> + Clone @@ -453,10 +457,13 @@ where debug!(?candidate.addr, "attempting outbound connection in response to demand"); // the connector is always ready, so this can't hang - let connector = connector.ready_and().await.expect("connector never errors"); + let outbound_connector = outbound_connector + .ready_and() + .await + .expect("outbound connector never errors"); // the handshake has timeouts, so it shouldn't hang - connector + outbound_connector .call(candidate.addr) .map_err(|e| (candidate, e)) .map(Into::into) diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 44d6a878c80..5b5044cf605 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -40,6 +40,17 @@ use super::{ /// A [`tower::Service`] that abstractly represents "the rest of the network". /// +/// # Security +/// +/// The `Discover::Key` must be the transient remote address of each peer. This +/// address may only be valid for the duration of a single connection. (For +/// example, inbound connections have an ephemeral remote port, and proxy +/// connections have an ephemeral local or proxy port.) +/// +/// Otherwise, malicious peers could interfere with other peers' `PeerSet` state. +/// +/// # Implementation +/// /// This implementation is adapted from the one in `tower-balance`, and as /// described in that crate's documentation, it /// diff --git a/zebra-network/src/protocol/external/codec.rs b/zebra-network/src/protocol/external/codec.rs index e64f5c9e42d..43929211444 100644 --- a/zebra-network/src/protocol/external/codec.rs +++ b/zebra-network/src/protocol/external/codec.rs @@ -45,8 +45,8 @@ pub struct Builder { version: Version, /// The maximum allowable message length. max_len: usize, - /// An optional label to use for reporting metrics. - metrics_label: Option, + /// An optional address label, to use for reporting metrics. + metrics_addr_label: Option, } impl Codec { @@ -56,7 +56,7 @@ impl Codec { network: Network::Mainnet, version: constants::CURRENT_VERSION, max_len: MAX_PROTOCOL_MESSAGE_LEN, - metrics_label: None, + metrics_addr_label: None, } } @@ -95,9 +95,9 @@ impl Builder { self } - /// Configure the codec for the given peer address. - pub fn with_metrics_label(mut self, metrics_label: String) -> Self { - self.metrics_label = Some(metrics_label); + /// Configure the codec with a label corresponding to the peer address. + pub fn with_metrics_addr_label(mut self, metrics_addr_label: String) -> Self { + self.metrics_addr_label = Some(metrics_addr_label); self } } @@ -116,8 +116,10 @@ impl Encoder for Codec { return Err(Parse("body length exceeded maximum size")); } - if let Some(label) = self.builder.metrics_label.clone() { - metrics::counter!("zcash.net.out.bytes.total", (body_length + HEADER_LEN) as u64, "addr" => label); + if let Some(addr_label) = self.builder.metrics_addr_label.clone() { + metrics::counter!("zcash.net.out.bytes.total", + (body_length + HEADER_LEN) as u64, + "addr" => addr_label); } use Message::*; @@ -367,7 +369,7 @@ impl Decoder for Codec { return Err(Parse("body length exceeded maximum size")); } - if let Some(label) = self.builder.metrics_label.clone() { + if let Some(label) = self.builder.metrics_addr_label.clone() { metrics::counter!("zcash.net.in.bytes.total", (body_len + HEADER_LEN) as u64, "addr" => label); }