diff --git a/zebra-network/src/address_book.rs b/zebra-network/src/address_book.rs index 8fa4864a304..22c1d93772b 100644 --- a/zebra-network/src/address_book.rs +++ b/zebra-network/src/address_book.rs @@ -1,17 +1,15 @@ -//! The `AddressBook` manages information about what peers exist, when they were +//! The [`AddressBook`] manages information about what peers exist, when they were //! seen, and what services they provide. -use std::{ - collections::{BTreeSet, HashMap}, - iter::Extend, - net::SocketAddr, - time::Instant, -}; +use std::{collections::HashMap, iter::Extend, net::SocketAddr, sync::Arc, time::Instant}; use chrono::{DateTime, Utc}; use tracing::Span; -use crate::{constants, types::MetaAddr, Config, PeerAddrState}; +use crate::{constants, meta_addr::MetaAddrChange, types::MetaAddr, Config, PeerAddrState}; + +#[cfg(any(test, feature = "proptest-impl"))] +use proptest_derive::Arbitrary; /// A database of peer listener addresses, their advertised services, and /// information on when they were last seen. @@ -21,7 +19,7 @@ use crate::{constants, types::MetaAddr, Config, PeerAddrState}; /// Address book state must be based on outbound connections to peers. /// /// If the address book is updated incorrectly: -/// - malicious peers can interfere with other peers' `AddressBook` state, +/// - malicious peers can interfere with other peers' [`AddressBook`] state, /// or /// - Zebra can advertise unreachable addresses to its own peers. /// @@ -29,9 +27,9 @@ use crate::{constants, types::MetaAddr, Config, PeerAddrState}; /// /// The address book should only contain Zcash listener port addresses from peers /// on the configured network. These addresses can come from: -/// - DNS seeders +/// - the initial seed peers config /// - addresses gossiped by other peers -/// - the canonical address (`Version.address_from`) provided by each peer, +/// - the canonical address ([`Version.address_from`]) provided by each peer, /// particularly peers on inbound connections. /// /// The remote addresses of inbound connections must not be added to the address @@ -46,15 +44,22 @@ use crate::{constants, types::MetaAddr, Config, PeerAddrState}; /// Updates must not be based on: /// - the remote addresses of inbound connections, or /// - the canonical address of any connection. +/// +/// See the [`CandidateSet`] for a detailed peer state diagram. #[derive(Clone, Debug)] +#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] pub struct AddressBook { - /// Each known peer address has a matching `MetaAddr`. + /// Each known peer address has a matching [`MetaAddr`]. by_addr: HashMap, /// The local listener address. local_listener: SocketAddr, /// The span for operations on this address book. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(value = "Span::current()") + )] span: Span, /// The last time we logged a message about the address metrics. @@ -62,34 +67,42 @@ pub struct AddressBook { } /// Metrics about the states of the addresses in an [`AddressBook`]. -#[derive(Debug)] +#[derive(Copy, Clone, Debug)] pub struct AddressMetrics { - /// The number of addresses in the `Responded` state. + /// The number of addresses in the [`Responded`] state. responded: usize, - /// The number of addresses in the `NeverAttemptedGossiped` state. + /// The number of addresses in the [`NeverAttemptedSeed`] state. + never_attempted_seed: usize, + + /// The number of addresses in the [`NeverAttemptedGossiped`] state. 
never_attempted_gossiped: usize, - /// The number of addresses in the `NeverAttemptedAlternate` state. + /// The number of addresses in the [`NeverAttemptedAlternate`] state. never_attempted_alternate: usize, - /// The number of addresses in the `Failed` state. + /// The number of addresses in the [`Failed`] state. failed: usize, - /// The number of addresses in the `AttemptPending` state. + /// The number of addresses in the [`AttemptPending`] state. attempt_pending: usize, - /// The number of `Responded` addresses within the liveness limit. + /// The number of peers that we've tried to connect to recently. + recently_attempted: usize, + + /// The number of peers that have recently sent us messages. recently_live: usize, - /// The number of `Responded` addresses outside the liveness limit. - recently_stopped_responding: usize, + /// The number of peers that have failed recently. + recently_failed: usize, + + /// The number of peers that are connection candidates. + connection_candidates: usize, } -#[allow(clippy::len_without_is_empty)] impl AddressBook { - /// Construct an `AddressBook` with the given `config` and [`tracing::Span`]. - pub fn new(config: &Config, span: Span) -> AddressBook { + /// Construct an [`AddressBook`] with the given `config` and [`tracing::Span`]. + pub fn new(config: Config, span: Span) -> AddressBook { let constructor_span = span.clone(); let _guard = constructor_span.enter(); @@ -104,82 +117,98 @@ impl AddressBook { new_book } - /// Get the local listener address. - pub fn get_local_listener(&self) -> MetaAddr { - MetaAddr::new_local_listener(&self.local_listener) + /// Returns a Change that adds or updates the local listener address in an + /// [`AddressBook`]. + /// + /// Our inbound listener port can be advertised to peers by applying this + /// change to the inbound request address book. + /// + /// # Correctness + /// + /// Avoid inserting this address into the local [`AddressBook`]. + /// (If peers gossip our address back to us, the handshake nonce will + /// protect us from self-connections.) + pub fn get_local_listener(&self) -> MetaAddrChange { + MetaAddr::new_local_listener(self.local_listener) } /// Get the contents of `self` in random order with sanitized timestamps. + /// + /// Skips peers with missing fields. pub fn sanitized(&self) -> Vec { use rand::seq::SliceRandom; let _guard = self.span.enter(); let mut peers = self - .peers() - .map(|a| MetaAddr::sanitize(&a)) + .peers_unordered() + .filter_map(|a| MetaAddr::sanitize(&a)) .collect::>(); peers.shuffle(&mut rand::thread_rng()); peers } /// Returns true if the address book has an entry for `addr`. - pub fn contains_addr(&self, addr: &SocketAddr) -> bool { + pub fn contains_addr(&self, addr: SocketAddr) -> bool { let _guard = self.span.enter(); - self.by_addr.contains_key(addr) + self.by_addr.contains_key(&addr) } - /// Returns the entry corresponding to `addr`, or `None` if it does not exist. + /// Returns the entry corresponding to `addr`, or [`None`] if it does not exist. pub fn get_by_addr(&self, addr: SocketAddr) -> Option { let _guard = self.span.enter(); self.by_addr.get(&addr).cloned() } - /// Add `new` to the address book, updating the previous entry if `new` is - /// more recent or discarding `new` if it is stale. + /// Apply `change` to the address book. + /// + /// If an entry was added or updated, return it. /// /// # Correctness /// - /// All new addresses should go through `update`, so that the address book - /// only contains valid outbound addresses. 
- pub fn update(&mut self, new: MetaAddr) { + /// All address book changes should go through `update`, so that the + /// address book only contains valid outbound addresses. + pub fn update(&mut self, change: MetaAddrChange) -> Option { let _guard = self.span.enter(); trace!( - ?new, + ?change, total_peers = self.by_addr.len(), recent_peers = self.recently_live_peers().count(), ); + let addr = change.get_addr(); + let book_entry = self.by_addr.get(&addr).cloned(); // If a node that we are directly connected to has changed to a client, // remove it from the address book. - if new.is_direct_client() && self.contains_addr(&new.addr) { + if change.is_direct_client(book_entry) && self.contains_addr(addr) { std::mem::drop(_guard); - self.take(new.addr); - return; + self.take(addr); + self.update_metrics(); + return None; } // Never add unspecified addresses or client services. // // Communication with these addresses can be monitored via Zebra's // metrics. (The address book is for valid peer addresses.) - if !new.is_valid_for_outbound() { - return; + if !change.is_valid_for_outbound(book_entry) { + return None; } - if let Some(prev) = self.get_by_addr(new.addr) { - if prev.get_last_seen() > new.get_last_seen() { - return; - } + let new_entry = change.into_meta_addr(book_entry); + if let Some(new_entry) = new_entry { + self.by_addr.insert(addr, new_entry); + std::mem::drop(_guard); + self.update_metrics(); + Some(new_entry) + } else { + None } - - self.by_addr.insert(new.addr, new); - std::mem::drop(_guard); - self.update_metrics(); } /// Removes the entry with `addr`, returning it if it exists /// /// # Note /// - /// All address removals should go through `take`, so that the address + /// All address removals should go through [`take`], so that the address /// book metrics are accurate. fn take(&mut self, removed_addr: SocketAddr) -> Option { let _guard = self.span.enter(); @@ -214,97 +243,122 @@ impl AddressBook { } /// Returns true if the given [`SocketAddr`] has recently sent us a message. - pub fn recently_live_addr(&self, addr: &SocketAddr) -> bool { + pub fn recently_live_addr(&self, addr: SocketAddr) -> bool { + let _guard = self.span.enter(); + match self.by_addr.get(&addr) { + None => false, + // Responded peers are the only peers that can be live + Some(peer) => { + peer.get_last_success().unwrap_or(chrono::MIN_DATETIME) + > AddressBook::liveness_cutoff_time() + } + } + } + + /// Returns true if the given [`SocketAddr`] had a recent connection attempt. + pub fn recently_attempted_addr(&self, addr: SocketAddr) -> bool { let _guard = self.span.enter(); - match self.by_addr.get(addr) { + match self.by_addr.get(&addr) { None => false, - // NeverAttempted, Failed, and AttemptPending peers should never be live Some(peer) => { - peer.last_connection_state == PeerAddrState::Responded - && peer.get_last_seen() > AddressBook::liveness_cutoff_time() + peer.get_last_attempt().unwrap_or(chrono::MIN_DATETIME) + > AddressBook::liveness_cutoff_time() } } } - /// Returns true if the given [`SocketAddr`] is pending a reconnection - /// attempt. - pub fn pending_reconnection_addr(&self, addr: &SocketAddr) -> bool { + /// Returns true if the given [`SocketAddr`] recently failed. 
+ pub fn recently_failed_addr(&self, addr: SocketAddr) -> bool { let _guard = self.span.enter(); - match self.by_addr.get(addr) { + match self.by_addr.get(&addr) { None => false, - Some(peer) => peer.last_connection_state == PeerAddrState::AttemptPending, + Some(peer) => { + peer.get_last_failed().unwrap_or(chrono::MIN_DATETIME) + > AddressBook::liveness_cutoff_time() + } } } - /// Returns true if the given [`SocketAddr`] might be connected to a node - /// feeding timestamps into this address book. - pub fn maybe_connected_addr(&self, addr: &SocketAddr) -> bool { - self.recently_live_addr(addr) || self.pending_reconnection_addr(addr) + /// Returns true if the given [`SocketAddr`] had recent attempts, successes, + /// or failures. + pub fn recently_used_addr(&self, addr: SocketAddr) -> bool { + self.recently_live_addr(addr) + || self.recently_attempted_addr(addr) + || self.recently_failed_addr(addr) } - /// Return an iterator over all peers. - /// - /// Returns peers in reconnection attempt order, then recently live peers in - /// an arbitrary order. - pub fn peers(&'_ self) -> impl Iterator + '_ { + /// Return an unordered iterator over all peers. + fn peers_unordered(&'_ self) -> impl Iterator + '_ { let _guard = self.span.enter(); - self.reconnection_peers() - .chain(self.maybe_connected_peers()) + self.by_addr.values() } - /// Return an iterator over peers that are due for a reconnection attempt, - /// in reconnection attempt order. - pub fn reconnection_peers(&'_ self) -> impl Iterator + '_ { + /// Return an iterator over peers that we've recently tried to connect to, + /// in arbitrary order. + fn recently_attempted_peers(&'_ self) -> impl Iterator + '_ { let _guard = self.span.enter(); - // TODO: optimise, if needed, or get rid of older peers + self.peers_unordered() + .filter(move |peer| self.recently_attempted_addr(peer.addr)) + .cloned() + } + + /// Return an iterator over peers that have recently sent us messages, + /// in arbitrary order. + pub fn recently_live_peers(&'_ self) -> impl Iterator + '_ { + let _guard = self.span.enter(); - // Skip live peers, and peers pending a reconnect attempt, then sort using BTreeSet - self.by_addr - .values() - .filter(move |peer| !self.maybe_connected_addr(&peer.addr)) - .collect::>() - .into_iter() + self.peers_unordered() + .filter(move |peer| self.recently_live_addr(peer.addr)) .cloned() } - /// Return an iterator over all the peers in `state`, in arbitrary order. - pub fn state_peers(&'_ self, state: PeerAddrState) -> impl Iterator + '_ { + /// Return an iterator over peers that have recently failed, + /// in arbitrary order. + fn recently_failed_peers(&'_ self) -> impl Iterator + '_ { let _guard = self.span.enter(); - self.by_addr - .values() - .filter(move |peer| peer.last_connection_state == state) + self.peers_unordered() + .filter(move |peer| self.recently_failed_addr(peer.addr)) .cloned() } - /// Return an iterator over peers that might be connected, in arbitrary - /// order. - pub fn maybe_connected_peers(&'_ self) -> impl Iterator + '_ { + /// Return an iterator over peers that had recent attempts, successes, or failures, + /// in arbitrary order. + pub fn recently_used_peers(&'_ self) -> impl Iterator + '_ { let _guard = self.span.enter(); - self.by_addr - .values() - .filter(move |peer| self.maybe_connected_addr(&peer.addr)) + self.peers_unordered() + .filter(move |peer| self.recently_used_addr(peer.addr)) .cloned() } - /// Return an iterator over peers we've seen recently, in arbitrary order. 
- pub fn recently_live_peers(&'_ self) -> impl Iterator + '_ { + /// Return an iterator over candidate peers, in arbitrary order. + /// + /// Candidate peers have not had recent attempts, successes, or failures. + fn candidate_peers(&'_ self) -> impl Iterator + '_ { let _guard = self.span.enter(); - self.by_addr - .values() - .filter(move |peer| self.recently_live_addr(&peer.addr)) + // Skip recently used peers (including live peers) + self.peers_unordered() + .filter(move |peer| !self.recently_used_addr(peer.addr)) .cloned() } - /// Returns an iterator that drains entries from the address book. + /// Return the next peer that is due for a connection attempt. + pub fn next_candidate_peer(&self) -> Option { + let _guard = self.span.enter(); + + self.candidate_peers().min() + } + + /// Return the number of candidate peers. /// - /// Removes entries in reconnection attempt then arbitrary order, - /// see [`peers`] for details. - pub fn drain(&'_ mut self) -> impl Iterator + '_ { - Drain { book: self } + /// This number can change over time as recently used peers expire. + pub fn candidate_peer_count(&self) -> usize { + let _guard = self.span.enter(); + + self.candidate_peers().count() } /// Returns the number of entries in this address book. @@ -312,31 +366,74 @@ impl AddressBook { self.by_addr.len() } + /// Is this address book empty? + pub fn is_empty(&self) -> bool { + self.by_addr.is_empty() + } + /// Returns metrics for the addresses in this address book. pub fn address_metrics(&self) -> AddressMetrics { - let responded = self.state_peers(PeerAddrState::Responded).count(); + let responded = self + .peers_unordered() + .filter(|peer| matches!(peer.last_connection_state, PeerAddrState::Responded { .. })) + .count(); + let never_attempted_seed = self + .peers_unordered() + .filter(|peer| { + matches!( + peer.last_connection_state, + PeerAddrState::NeverAttemptedSeed + ) + }) + .count(); let never_attempted_gossiped = self - .state_peers(PeerAddrState::NeverAttemptedGossiped) + .peers_unordered() + .filter(|peer| { + matches!( + peer.last_connection_state, + PeerAddrState::NeverAttemptedGossiped { .. } + ) + }) .count(); let never_attempted_alternate = self - .state_peers(PeerAddrState::NeverAttemptedAlternate) + .peers_unordered() + .filter(|peer| { + matches!( + peer.last_connection_state, + PeerAddrState::NeverAttemptedAlternate { .. } + ) + }) + .count(); + let failed = self + .peers_unordered() + .filter(|peer| matches!(peer.last_connection_state, PeerAddrState::Failed { .. })) + .count(); + let attempt_pending = self + .peers_unordered() + .filter(|peer| { + matches!( + peer.last_connection_state, + PeerAddrState::AttemptPending { .. 
} + ) + }) .count(); - let failed = self.state_peers(PeerAddrState::Failed).count(); - let attempt_pending = self.state_peers(PeerAddrState::AttemptPending).count(); + let recently_attempted = self.recently_attempted_peers().count(); let recently_live = self.recently_live_peers().count(); - let recently_stopped_responding = responded - .checked_sub(recently_live) - .expect("all recently live peers must have responded"); + let recently_failed = self.recently_failed_peers().count(); + let connection_candidates = self.len() - self.recently_used_peers().count(); AddressMetrics { responded, + never_attempted_seed, never_attempted_gossiped, never_attempted_alternate, failed, attempt_pending, + recently_attempted, recently_live, - recently_stopped_responding, + recently_failed, + connection_candidates, } } @@ -346,30 +443,38 @@ impl AddressBook { let m = self.address_metrics(); + // States // TODO: rename to address_book.[state_name] - metrics::gauge!("candidate_set.responded", m.responded as f64); + metrics::gauge!("candidate_set.seed", m.never_attempted_seed as f64); metrics::gauge!("candidate_set.gossiped", m.never_attempted_gossiped as f64); metrics::gauge!( "candidate_set.alternate", m.never_attempted_alternate as f64 ); + metrics::gauge!("candidate_set.responded", m.responded as f64); metrics::gauge!("candidate_set.failed", m.failed as f64); metrics::gauge!("candidate_set.pending", m.attempt_pending as f64); - // TODO: rename to address_book.responded.recently_live + // Times + metrics::gauge!( + "candidate_set.recently_attempted", + m.recently_attempted as f64 + ); metrics::gauge!("candidate_set.recently_live", m.recently_live as f64); - // TODO: rename to address_book.responded.stopped_responding + metrics::gauge!("candidate_set.recently_failed", m.recently_failed as f64); + + // Candidates (state and time based) metrics::gauge!( - "candidate_set.disconnected", - m.recently_stopped_responding as f64 + "candidate_set.connection_candidates", + m.connection_candidates as f64 ); std::mem::drop(_guard); - self.log_metrics(&m); + self.log_metrics(m); } /// Log metrics for this address book - fn log_metrics(&mut self, m: &AddressMetrics) { + fn log_metrics(&mut self, m: AddressMetrics) { let _guard = self.span.enter(); trace!( @@ -417,26 +522,41 @@ impl AddressBook { } } -impl Extend for AddressBook { +impl Extend for AddressBook { fn extend(&mut self, iter: T) where - T: IntoIterator, + T: IntoIterator, { - for meta in iter.into_iter() { - self.update(meta); + for change in iter.into_iter() { + self.update(change); } } } -struct Drain<'a> { - book: &'a mut AddressBook, -} - -impl<'a> Iterator for Drain<'a> { - type Item = MetaAddr; - - fn next(&mut self) -> Option { - let next_item_addr = self.book.peers().next()?.addr; - self.book.take(next_item_addr) - } +/// Run `f(address_book.lock())` in a thread dedicated to blocking tasks. +/// +/// This avoids blocking code running concurrently in the same task, and other +/// async tasks on the same thread. +/// +/// For details, see [`tokio::task::spawn_blocking`]. +/// +/// # Panics +/// +/// If `f` panics, or if a previous task panicked while holding the mutex. 
+pub async fn spawn_blocking<F, R>(address_book: &Arc<std::sync::Mutex<AddressBook>>, f: F) -> R +where + F: FnOnce(&mut AddressBook) -> R + Send + 'static, + R: Send + 'static, +{ + let address_book = address_book.clone(); + let lock_query_fn = move || { + let mut address_book_guard = address_book + .lock() + .expect("unexpected panic when mutex was previously locked"); + f(&mut address_book_guard) + }; + + tokio::task::spawn_blocking(lock_query_fn) + .await + .expect("unexpected panic in spawned address book task") +} diff --git a/zebra-network/src/config.rs b/zebra-network/src/config.rs index 3c40801b2b3..8f73cdf871a 100644 --- a/zebra-network/src/config.rs +++ b/zebra-network/src/config.rs @@ -46,6 +46,19 @@ pub struct Config { /// If you have a slow network connection, and Zebra is having trouble /// syncing, try reducing the peer set size. You can also reduce the peer /// set size to reduce Zebra's bandwidth usage. + /// + /// Note: changing this config can make Zebra slower or less reliable. + /// (See ticket #2193.) + /// + /// # SECURITY + /// + /// This config controls the inbound download buffer size. + /// Large values are a memory denial of service risk, because the + /// inbound buffer is before semantic verification, which checks + /// proof of work. + /// + /// This buffer is high risk, because it accepts partially-validated blocks + /// from any peer. pub peerset_initial_target_size: usize, /// How frequently we attempt to crawl the network to discover new peer diff --git a/zebra-network/src/constants.rs b/zebra-network/src/constants.rs index 8b593ab57ac..ae8842f2d01 100644 --- a/zebra-network/src/constants.rs +++ b/zebra-network/src/constants.rs @@ -10,6 +10,42 @@ use crate::protocol::external::types::*; use zebra_chain::parameters::NetworkUpgrade; +/// The maximum number of [`GetAddr`] requests sent when crawling for new peers. +/// This is also the default fanout limit. +/// +/// ## SECURITY +/// +/// The fanout should be greater than 2, so that Zebra avoids getting a majority +/// of its initial address book entries from a single peer. +/// +/// Zebra regularly crawls for new peers, initiating a new crawl every +/// [`crawl_new_peer_interval`]. +/// +/// TODO: limit the number of addresses that Zebra uses from a single peer +/// response (#1869) +/// +/// [`GetAddr`]: [`crate::protocol::external::message::Message::GetAddr`] +/// [`crawl_new_peer_interval`](crate::config::Config.crawl_new_peer_interval). +/// +/// Note: Zebra is currently very sensitive to fanout changes (#2193) +pub const MAX_GET_ADDR_FANOUT: usize = 3; + +/// The fraction of live peers used for [`GetAddr`] fanouts. +/// +/// `zcashd` rate-limits [`Addr`] responses. To avoid choking the peer set, we +/// only want to send requests to a small fraction of peers. +/// +/// [`GetAddr`]: [`crate::protocol::external::message::Message::GetAddr`] +/// [`Addr`]: [`crate::protocol::external::message::Message::Addr`] +/// +/// Note: Zebra is currently very sensitive to fanout changes (#2193) +pub const GET_ADDR_FANOUT_LIVE_PEERS_DIVISOR: usize = 10; + +/// Controls the number of peers used for each ObtainTips and ExtendTips request. +/// +/// Note: Zebra is currently very sensitive to fanout changes (#2193) +pub const SYNC_FANOUT: usize = 4; + /// The buffer size for the peer set. /// /// This should be greater than 1 to avoid sender contention, but also reasonably @@ -17,21 +53,70 @@ use zebra_chain::parameters::NetworkUpgrade; /// of in-flight block downloads can choke a constrained local network /// connection, or a small peer set on testnet.)
/// +/// This should also be greater than [`MAX_GET_ADDR_FANOUT`], to avoid contention +/// during [`CandidateSet::update`]. +/// /// We assume that Zebra nodes have at least 10 Mbps bandwidth. Therefore, a /// maximum-sized block can take up to 2 seconds to download. So the peer set -/// buffer adds up to 6 seconds worth of blocks to the queue. -pub const PEERSET_BUFFER_SIZE: usize = 3; +/// buffer adds up to `PEERSET_BUFFER_SIZE*2` seconds worth of blocks to the +/// queue. +/// +/// We want enough buffer to support concurrent fanouts, a sync download, +/// an inbound download, and some extra slots to avoid contention. +/// +/// Note: Zebra is currently very sensitive to buffer size changes (#2193) +pub const PEERSET_BUFFER_SIZE: usize = MAX_GET_ADDR_FANOUT + SYNC_FANOUT + 3; -/// The timeout for requests made to a remote peer. -pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(20); +/// The timeout for DNS lookups. +/// +/// [6.1.3.3 Efficient Resource Usage] from [RFC 1123: Requirements for Internet Hosts] +/// suggest no less than 5 seconds for resolving timeout. +/// +/// [RFC 1123: Requirements for Internet Hosts]: https://tools.ietf.org/rfcmarkup?doc=1123 +/// [6.1.3.3 Efficient Resource Usage]: https://tools.ietf.org/rfcmarkup?doc=1123#page-77 +pub const DNS_LOOKUP_TIMEOUT: Duration = Duration::from_secs(5); + +/// The minimum time between connections to initial or candidate peers. +/// +/// ## Security +/// +/// Zebra resists distributed denial of service attacks by making sure that new peer connections +/// are initiated at least `MIN_PEER_CONNECTION_INTERVAL` apart. +pub const MIN_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(100); /// The timeout for handshakes when connecting to new peers. /// /// This timeout should remain small, because it helps stop slow peers getting -/// into the peer set. This is particularly important for network-constrained -/// nodes, and on testnet. +/// into the peer set. There is a tradeoff for network-constrained nodes and on +/// testnet: +/// - we want to avoid very slow nodes, but +/// - we want to have as many nodes as possible. +/// +/// Note: Zebra is currently very sensitive to timing changes (#2193) pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(4); +/// The timeout for crawler [`GetAddr`] requests. +/// +/// This timeout is a tradeoff between: +/// - ignored late responses because the timeout is too low, and +/// - peer set congestion from [`GetAddr`] requests dropped by peers. +/// +/// Test changes to this timeout against `zcashd` peers on [`Testnet`], or +/// another small network. +/// +/// [`GetAddr`]: [`crate::protocol::external::message::Message::GetAddr`] +/// [`Testnet`]: [`zebra_chain::parameters::Network::Testnet`] +/// +/// Note: Zebra is currently very sensitive to timing changes (#2193) +pub const GET_ADDR_TIMEOUT: Duration = Duration::from_secs(8); + +/// The maximum timeout for requests made to a remote peer. +/// +/// [`PeerSet`] callers can set lower timeouts using [`tower::timeout`]. +/// +/// Note: Zebra is currently very sensitive to timing changes (#2193) +pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(20); + /// We expect to receive a message from a live peer at least once in this time duration. /// /// This is the sum of: @@ -49,20 +134,6 @@ pub const LIVE_PEER_DURATION: Duration = Duration::from_secs(60 + 20 + 20 + 20); /// connected peer. 
pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(60); -/// The number of GetAddr requests sent when crawling for new peers. -/// -/// ## SECURITY -/// -/// The fanout should be greater than 2, so that Zebra avoids getting a majority -/// of its initial address book entries from a single peer. -/// -/// Zebra regularly crawls for new peers, initiating a new crawl every -/// [`crawl_new_peer_interval`](crate::config::Config.crawl_new_peer_interval). -/// -/// TODO: limit the number of addresses that Zebra uses from a single peer -/// response (#1869) -pub const GET_ADDR_FANOUT: usize = 3; - /// Truncate timestamps in outbound address messages to this time interval. /// /// ## SECURITY @@ -71,6 +142,14 @@ pub const GET_ADDR_FANOUT: usize = 3; /// messages from each of our peers. pub const TIMESTAMP_TRUNCATION_SECONDS: i64 = 30 * 60; +/// The maximum queue length for the timestamp collector. Further changes will +/// block until the task processes them. +/// +/// This parameter provides a performance/memory tradeoff. It doesn't have much +/// impact on latency, because all queued changes are processed each time the +/// collector runs. +pub const TIMESTAMP_WORKER_BUFFER_SIZE: usize = 100; + /// The User-Agent string provided by the node. /// /// This must be a valid [BIP 14] user agent. @@ -123,15 +202,6 @@ lazy_static! { }.expect("regex is valid"); } -/// The timeout for DNS lookups. -/// -/// [6.1.3.3 Efficient Resource Usage] from [RFC 1123: Requirements for Internet Hosts] -/// suggest no less than 5 seconds for resolving timeout. -/// -/// [RFC 1123: Requirements for Internet Hosts] https://tools.ietf.org/rfcmarkup?doc=1123 -/// [6.1.3.3 Efficient Resource Usage] https://tools.ietf.org/rfcmarkup?doc=1123#page-77 -pub const DNS_LOOKUP_TIMEOUT: Duration = Duration::from_secs(5); - /// Magic numbers used to identify different Zcash networks. pub mod magics { use super::*; @@ -145,6 +215,18 @@ pub mod magics { mod tests { use super::*; + use std::cmp::min; + + /// Make sure that the fanout and buffer sizes are consistent with each other. + #[test] + fn ensure_buffers_consistent() { + zebra_test::init(); + + assert!( + PEERSET_BUFFER_SIZE >= min(MAX_GET_ADDR_FANOUT, SYNC_FANOUT), + "Zebra's peerset buffer should hold the smallest fanout" + ); + } /// This assures that the `Duration` value we are computing for /// LIVE_PEER_DURATION actually matches the other const values it @@ -164,9 +246,19 @@ mod tests { fn ensure_timeouts_consistent() { zebra_test::init(); + // Specific requests can't have timeouts longer than the maximum timeout assert!(HANDSHAKE_TIMEOUT <= REQUEST_TIMEOUT, - "Handshakes are requests, so the handshake timeout can't be longer than the timeout for all requests."); - // This check is particularly important on testnet, which has a small + "Handshakes are requests, so their timeout can't be longer than the timeout for all requests."); + assert!(GET_ADDR_TIMEOUT <= REQUEST_TIMEOUT, + "GetAddrs are requests, so their timeout can't be longer than the timeout for all requests."); + + // Other requests shouldn't have timeouts shorter than the handshake timeout + assert!(GET_ADDR_TIMEOUT >= HANDSHAKE_TIMEOUT, + "GetAddrs require a successful handshake, so their timeout shouldn't be shorter than the handshake timeout."); + + // Basic EWMA checks + + // The RTT check is particularly important on testnet, which has a small // number of peers, which are often slow. 
assert!(EWMA_DEFAULT_RTT > REQUEST_TIMEOUT, "The default EWMA RTT should be higher than the request timeout, so new peers are required to prove they are fast, before we prefer them to other peers."); diff --git a/zebra-network/src/lib.rs b/zebra-network/src/lib.rs index fbcdee69319..eb39aba1d87 100644 --- a/zebra-network/src/lib.rs +++ b/zebra-network/src/lib.rs @@ -80,7 +80,7 @@ pub use crate::{ config::Config, isolated::connect_isolated, meta_addr::PeerAddrState, - peer_set::init, + peer_set::{init, spawn_fanout}, policies::{RetryErrors, RetryLimit}, protocol::internal::{Request, Response}, }; diff --git a/zebra-network/src/meta_addr.rs b/zebra-network/src/meta_addr.rs index 005669d4973..51d2fd1333a 100644 --- a/zebra-network/src/meta_addr.rs +++ b/zebra-network/src/meta_addr.rs @@ -1,4 +1,6 @@ //! An address-with-metadata type used in Bitcoin networking. +//! +//! In Zebra, [`MetaAddr`]s also track Zebra-specific peer state. use std::{ cmp::{Ord, Ordering}, @@ -17,83 +19,388 @@ use zebra_chain::serialization::{ use crate::protocol::{external::MAX_PROTOCOL_MESSAGE_LEN, types::PeerServices}; +use MetaAddrChange::*; use PeerAddrState::*; +#[cfg(any(test, feature = "proptest-impl"))] +use proptest::option; #[cfg(any(test, feature = "proptest-impl"))] use proptest_derive::Arbitrary; + #[cfg(any(test, feature = "proptest-impl"))] -mod arbitrary; +use zebra_chain::serialization::arbitrary::{datetime_full, datetime_u32}; #[cfg(test)] mod tests; /// Peer connection state, based on our interactions with the peer. /// -/// Zebra also tracks how recently a peer has sent us messages, and derives peer -/// liveness based on the current time. This derived state is tracked using -/// [`AddressBook::maybe_connected_peers`] and -/// [`AddressBook::reconnection_peers`]. -#[derive(Copy, Clone, Debug, Eq, PartialEq)] +/// Zebra also tracks how recently we've had different kinds of interactions with +/// each peer, and derives peer usage based on the current time. This derived state +/// is tracked using [`AddressBook::recently_used_peers`] and +/// [`AddressBook::candidate_peers`]. +/// +/// To avoid depending on untrusted or default data, Zebra tracks the required +/// and optional data in each state. State updates are applied using +/// [`MetaAddrChange`]s. +/// +/// See the [`CandidateSet`] for a detailed peer state diagram. +#[derive(Copy, Clone, Debug)] #[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] pub enum PeerAddrState { - /// The peer has sent us a valid message. + /// The peer's address is a seed address. /// - /// Peers remain in this state, even if they stop responding to requests. - /// (Peer liveness is derived from the `last_seen` timestamp, and the current - /// time.) - Responded, + /// Seed addresses can come from: + /// - hard-coded IP addresses in the config, + /// - DNS names that resolve to a single peer, or + /// - DNS seeders, which resolve to a dynamic list of peers. + /// + /// Zebra attempts to connect to all seed peers on startup. As part of these + /// connection attempts, the initial peer connector updates all seed peers to + /// [`AttemptPending`]. + /// + /// If any seed addresses remain in the address book, they are attempted after + /// disconnected [`Responded`] peers. + NeverAttemptedSeed, - /// The peer's address has just been fetched from a DNS seeder, or via peer - /// gossip, but we haven't attempted to connect to it yet. 
- NeverAttemptedGossiped, + /// The peer's address has just been fetched via peer gossip, but we haven't + /// attempted to connect to it yet. + /// + /// Gossiped addresses are attempted after seed addresses. + NeverAttemptedGossiped { + /// The time that another node claims to have connected to this peer. + /// See [`get_untrusted_last_seen`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "datetime_u32()") + )] + untrusted_last_seen: DateTime, + /// The services another node claims this peer advertises. + /// See [`get_services`] for details. + untrusted_services: PeerServices, + }, - /// The peer's address has just been received as part of a `Version` message, + /// The peer's address has just been received as part of a [`Version`] message, /// so we might already be connected to this peer. /// /// Alternate addresses are attempted after gossiped addresses. - NeverAttemptedAlternate, + NeverAttemptedAlternate { + /// The last time another node gave us this address as their canonical + /// address. + /// See [`get_untrusted_last_seen`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "datetime_full()") + )] + untrusted_last_seen: DateTime, + /// The services another node gave us along with their canonical address. + /// See [`get_services`] for details. + untrusted_services: PeerServices, + }, + + /// We are about to start a connection attempt to this peer. + /// + /// Pending addresses are retried last. + AttemptPending { + /// The last time we made an outbound attempt to this peer. + /// See [`get_last_attempt`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "datetime_full()") + )] + last_attempt: DateTime, + /// The last time we made a successful outbound connection to this peer. + /// See [`get_last_success`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "option::of(datetime_full())") + )] + last_success: Option>, + /// The last time an outbound connection to this peer failed. + /// See [`get_last_failed`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "option::of(datetime_full())") + )] + last_failed: Option>, + /// The last time another node claimed this peer was valid. + /// See [`get_untrusted_last_seen`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "option::of(datetime_full())") + )] + untrusted_last_seen: Option>, + /// The services another node claims this peer advertises. + /// See [`get_services`] for details. + untrusted_services: Option, + }, + + /// The peer has sent us a valid message. + /// + /// Peers remain in this state, even if they stop responding to requests. + /// (Peer liveness is derived from the [`last_success`] time, and the current + /// time.) + /// + /// Disconnected responded peers are retried first. + Responded { + /// The last time we made an outbound attempt to this peer. + /// See [`get_last_attempt`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "datetime_full()") + )] + last_attempt: DateTime, + /// The last time we made a successful outbound connection to this peer. + /// See [`get_last_success`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "datetime_full()") + )] + last_success: DateTime, + /// The last time an outbound connection to this peer failed. + /// See [`get_last_failed`] for details. 
+ #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "option::of(datetime_full())") + )] + last_failed: Option>, + /// The last time another node claimed this peer was valid. + /// See [`get_untrusted_last_seen`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "option::of(datetime_full())") + )] + untrusted_last_seen: Option>, + /// The services advertised by this directly connected peer. + /// See [`get_services`] for details. + services: PeerServices, + }, /// The peer's TCP connection failed, or the peer sent us an unexpected /// Zcash protocol message, so we failed the connection. - Failed, + /// + /// Failed peers are retried after disconnected [`Responded`] and never + /// attempted peers. + Failed { + /// The last time we made an outbound attempt to this peer. + /// See [`get_last_attempt`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "datetime_full()") + )] + last_attempt: DateTime, + /// The last time we made a successful outbound connection to this peer. + /// See [`get_last_success`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "option::of(datetime_full())") + )] + last_success: Option>, + /// The last time an outbound connection to this peer failed. + /// See [`get_last_failed`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "datetime_full()") + )] + last_failed: DateTime, + /// The last time another node claimed this peer was valid. + /// See [`get_untrusted_last_seen`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "option::of(datetime_full())") + )] + untrusted_last_seen: Option>, + /// The services claimed by another peer or advertised by this peer. + /// See [`get_services`] for details. + // + // TODO: do we need to distinguish direct and untrusted services? + untrusted_services: Option, + }, +} - /// We just started a connection attempt to this peer. - AttemptPending, +impl PeerAddrState { + /// The last time we attempted to make a direct outbound connection to the + /// address of this peer. + /// + /// Only updated by the [`AttemptPending`] state. + /// Also present in [`Responded`] and [`Failed`]. + pub fn get_last_attempt(&self) -> Option> { + match self { + NeverAttemptedSeed => None, + NeverAttemptedGossiped { .. } => None, + NeverAttemptedAlternate { .. } => None, + AttemptPending { last_attempt, .. } => Some(*last_attempt), + Responded { last_attempt, .. } => Some(*last_attempt), + Failed { last_attempt, .. } => Some(*last_attempt), + } + } + + /// The last time we successfully made a direct outbound connection to the + /// address of this peer. + /// + /// Only updated by the [`Responded`] state. + /// Also optionally present in [`Failed`] and [`AttemptPending`]. + pub fn get_last_success(&self) -> Option> { + match self { + NeverAttemptedSeed => None, + NeverAttemptedGossiped { .. } => None, + NeverAttemptedAlternate { .. } => None, + AttemptPending { last_success, .. } => *last_success, + Responded { last_success, .. } => Some(*last_success), + Failed { last_success, .. } => *last_success, + } + } + + /// The last time a direct outbound connection to the address of this peer + /// failed. + /// + /// Only updated by the [`Failed`] state. + /// Also optionally present in [`AttemptPending`] and [`Responded`]. + pub fn get_last_failed(&self) -> Option> { + match self { + NeverAttemptedSeed => None, + NeverAttemptedGossiped { .. 
} => None, + NeverAttemptedAlternate { .. } => None, + AttemptPending { last_failed, .. } => *last_failed, + Responded { last_failed, .. } => *last_failed, + Failed { last_failed, .. } => Some(*last_failed), + } + } + + /// The last time another peer successfully connected to this peer. + /// + /// Only updated by the [`NeverAttemptedGossiped`] and + /// [`NeverAttemptedAlternate`] states. + /// + /// Optionally present in the [`AttemptPending`], [`Responded`], and [`Failed`] + /// states. + pub fn get_untrusted_last_seen(&self) -> Option> { + match self { + NeverAttemptedSeed => None, + NeverAttemptedGossiped { + untrusted_last_seen, + .. + } => Some(*untrusted_last_seen), + NeverAttemptedAlternate { + untrusted_last_seen, + .. + } => Some(*untrusted_last_seen), + AttemptPending { + untrusted_last_seen, + .. + } => *untrusted_last_seen, + Responded { + untrusted_last_seen, + .. + } => *untrusted_last_seen, + Failed { + untrusted_last_seen, + .. + } => *untrusted_last_seen, + } + } + + /// Only updated by the [`NeverAttemptedGossiped`] and + /// [`NeverAttemptedAlternate`] states. + /// + /// Optionally present in the [`AttemptPending`], [`Responded`], and [`Failed`] + /// states. + pub fn get_untrusted_services(&self) -> Option { + match self { + NeverAttemptedSeed => None, + NeverAttemptedGossiped { + untrusted_services, .. + } => Some(*untrusted_services), + NeverAttemptedAlternate { + untrusted_services, .. + } => Some(*untrusted_services), + AttemptPending { + untrusted_services, .. + } => *untrusted_services, + Responded { services, .. } => Some(*services), + Failed { + untrusted_services, .. + } => *untrusted_services, + } + } } // non-test code should explicitly specify the peer address state #[cfg(test)] impl Default for PeerAddrState { fn default() -> Self { - NeverAttemptedGossiped + NeverAttemptedGossiped { + untrusted_last_seen: Utc::now(), + untrusted_services: PeerServices::NODE_NETWORK, + } } } impl Ord for PeerAddrState { - /// `PeerAddrState`s are sorted in approximate reconnection attempt + /// [`PeerAddrState`]s are sorted in approximate reconnection attempt /// order, ignoring liveness. /// /// See [`CandidateSet`] and [`MetaAddr::cmp`] for more details. fn cmp(&self, other: &Self) -> Ordering { use Ordering::*; - match (self, other) { - (Responded, Responded) - | (Failed, Failed) - | (NeverAttemptedGossiped, NeverAttemptedGossiped) - | (NeverAttemptedAlternate, NeverAttemptedAlternate) - | (AttemptPending, AttemptPending) => Equal, + let responded_never_failed_pending = match (self, other) { + // If the states are the same, use the times to choose an order + (Responded { .. }, Responded { .. }) + | (Failed { .. }, Failed { .. }) + | (NeverAttemptedGossiped { .. }, NeverAttemptedGossiped { .. }) + | (NeverAttemptedAlternate { .. }, NeverAttemptedAlternate { .. }) + | (NeverAttemptedSeed, NeverAttemptedSeed) + | (AttemptPending { .. }, AttemptPending { .. }) => Equal, + // We reconnect to `Responded` peers that have stopped sending messages, - // then `NeverAttempted` peers, then `Failed` peers - (Responded, _) => Less, - (_, Responded) => Greater, - (NeverAttemptedGossiped, _) => Less, - (_, NeverAttemptedGossiped) => Greater, - (NeverAttemptedAlternate, _) => Less, - (_, NeverAttemptedAlternate) => Greater, - (Failed, _) => Less, - (_, Failed) => Greater, + // then `NeverAttempted...` peers, then `Failed` peers + (Responded { .. }, _) => Less, + (_, Responded { .. 
}) => Greater, + (NeverAttemptedSeed, _) => Less, + (_, NeverAttemptedSeed) => Greater, + (NeverAttemptedGossiped { .. }, _) => Less, + (_, NeverAttemptedGossiped { .. }) => Greater, + (NeverAttemptedAlternate { .. }, _) => Less, + (_, NeverAttemptedAlternate { .. }) => Greater, + (Failed { .. }, _) => Less, + (_, Failed { .. }) => Greater, // AttemptPending is covered by the other cases - } + }; + + // Prioritise successful peers: + // - try the most recent successful peers first + // - try the oldest failed peers before re-trying the same peer + // - re-try the oldest attempted peers first + // - try the most recent last seen times first (untrusted - from remote peers) + // - use the services as a tie-breaker + // + // `None` is earlier than any `Some(time)`, which means: + // - peers that have never succeeded or with no last seen times sort last + // - peers that have never failed or attempted sort first + // + // # Security + // + // Ignore untrusted times if we have any local times. + let recent_successful = self + .get_last_success() + .cmp(&other.get_last_success()) + .reverse(); + let older_failed = self.get_last_failed().cmp(&other.get_last_failed()); + let older_attempted = self.get_last_attempt().cmp(&other.get_last_attempt()); + let recent_untrusted_last_seen = self + .get_untrusted_last_seen() + .cmp(&other.get_untrusted_last_seen()) + .reverse(); + let services_tie_breaker = self + .get_untrusted_services() + .cmp(&other.get_untrusted_services()); + + responded_never_failed_pending + .then(recent_successful) + .then(older_failed) + .then(older_attempted) + .then(recent_untrusted_last_seen) + .then(services_tie_breaker) } } @@ -103,40 +410,394 @@ impl PartialOrd for PeerAddrState { } } +impl PartialEq for PeerAddrState { + fn eq(&self, other: &Self) -> bool { + use Ordering::*; + self.cmp(other) == Equal + } +} + +impl Eq for PeerAddrState {} + +/// A change to a [`MetaAddr`] in an [`AddressBook`]. +/// +/// Most [`PeerAddrState`]s have a corresponding [`Change`]: +/// - `New...` changes create a new address book entry, or add fields to an +/// existing `NeverAttempted...` address book entry. +/// - `Update...` changes update the state, and add or update fields in an +/// existing address book entry. +/// The [`UpdateShutdown`] preserves the [`Responded`] state, but changes all +/// other states to [`Failed`]. +/// +/// See the [`CandidateSet`] for a detailed peer state diagram. +#[derive(Copy, Clone, Debug)] +#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] +pub enum MetaAddrChange { + /// A new seed peer [`MetaAddr`]. + /// + /// The initial peers config provides the address. + /// Gossiped or alternate changes can provide missing services. + /// (But they can't update an existing service value.) + /// Existing services can be updated by future handshakes with this peer. + /// + /// Sets the untrusted last seen time and services to [`None`]. + NewSeed { addr: SocketAddr }, + + /// A new gossiped peer [`MetaAddr`]. + /// + /// [`Addr`] messages provide an address, services, and a last seen time. + /// The services can be updated by future handshakes with this peer. + /// The untrusted last seen time is overridden by any last success time. + NewGossiped { + addr: SocketAddr, + untrusted_services: PeerServices, + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "datetime_u32()") + )] + untrusted_last_seen: DateTime, + }, + + /// A new alternate peer [`MetaAddr`]. 
+ /// + /// [`Version`] messages provide the canonical address and its services. + /// The services can be updated by future handshakes with this peer. + /// + /// Sets the untrusted last seen time to the current time. + /// (Because a connected peer just gave us this address.) + NewAlternate { + addr: SocketAddr, + untrusted_services: PeerServices, + }, + + /// We have started a connection attempt to this peer. + /// + /// Sets the last attempt time to the current time. + UpdateAttempt { addr: SocketAddr }, + + /// A peer has sent us a message, after a successful handshake on an outbound + /// connection. + /// + /// The services are updated based on the handshake with this peer. + /// + /// Sets the last success time to the current time. + UpdateResponded { + addr: SocketAddr, + services: PeerServices, + }, + + /// A connection to this peer has failed. + /// + /// If the handshake with this peer succeeded, update its services. + /// + /// Sets the last failed time to the current time. + UpdateFailed { + addr: SocketAddr, + services: Option<PeerServices>, + }, + + /// A connection to this peer has shut down. + /// + /// If the handshake with this peer succeeded, update its services. + /// + /// If the peer is in the [`Responded`] state, do nothing. + /// Otherwise, mark the peer as [`Failed`], and set the last failed time to the + /// current time. + UpdateShutdown { + addr: SocketAddr, + services: Option<PeerServices>, + }, +} + +impl MetaAddrChange { + /// Return the address for the change. + pub fn get_addr(&self) -> SocketAddr { + match self { + NewSeed { addr, .. } => *addr, + NewGossiped { addr, .. } => *addr, + NewAlternate { addr, .. } => *addr, + UpdateAttempt { addr } => *addr, + UpdateResponded { addr, .. } => *addr, + UpdateFailed { addr, .. } => *addr, + UpdateShutdown { addr, .. } => *addr, + } + } + + /// Return the services for the change, if available. + pub fn get_untrusted_services(&self) -> Option<PeerServices> { + match self { + NewSeed { .. } => None, + NewGossiped { + untrusted_services, .. + } => Some(*untrusted_services), + NewAlternate { + untrusted_services, .. + } => Some(*untrusted_services), + UpdateAttempt { .. } => None, + UpdateResponded { services, .. } => Some(*services), + UpdateFailed { services, .. } => *services, + UpdateShutdown { services, .. } => *services, + } + } + + /// Return the untrusted last seen time for the change, if available. + pub fn get_untrusted_last_seen(&self) -> Option<DateTime<Utc>> { + match self { + NewGossiped { + untrusted_last_seen, + .. + } => Some(*untrusted_last_seen), + NewSeed { .. } + | NewAlternate { .. } + | UpdateAttempt { .. } + | UpdateResponded { .. } + | UpdateFailed { .. } + | UpdateShutdown { .. } => None, + } + } + + /// Is this address valid for outbound connections? + /// + /// `book_entry` is the entry for [`self.get_addr`] in the address book. + /// + /// Assumes that missing fields or entries are valid. + pub fn is_valid_for_outbound(&self, book_entry: Option<MetaAddr>) -> bool { + // Use the latest valid info we have + self.into_meta_addr(book_entry) + .or(book_entry) + .map(|meta_addr| meta_addr.is_valid_for_outbound()) + .unwrap_or(true) + } + + /// Is this address a directly connected client? + /// + /// `book_entry` is the entry for [`self.get_addr`] in the address book. + /// + /// Assumes that missing fields or entries are not clients.
+ pub fn is_direct_client(&self, book_entry: Option<MetaAddr>) -> bool { + // Use the latest valid info we have + self.into_meta_addr(book_entry) + .or(book_entry) + .map(|meta_addr| meta_addr.is_direct_client()) + .unwrap_or(false) + } + + /// Apply this change to `old_entry`, returning an updated entry. + /// + /// [`None`] means "no update". + /// + /// Ignores out-of-order updates: + /// - the same address can arrive from multiple sources, + /// - messages are sent by multiple tasks, and + /// - multiple tasks read the address book, + /// so message ordering is not guaranteed. + /// (But the address book should eventually be consistent.) + pub fn into_meta_addr(self, old_entry: Option<MetaAddr>) -> Option<MetaAddr> { + let has_been_attempted = old_entry + .map(|old| old.has_been_attempted()) + .unwrap_or(false); + + let old_last_attempt = old_entry.map(|old| old.get_last_attempt()).flatten(); + let old_last_success = old_entry.map(|old| old.get_last_success()).flatten(); + let old_last_failed = old_entry.map(|old| old.get_last_failed()).flatten(); + let old_untrusted_last_seen = old_entry.map(|old| old.get_untrusted_last_seen()).flatten(); + + let old_untrusted_services = old_entry.map(|old| old.get_untrusted_services()).flatten(); + + match self { + // New seed, not in address book: + NewSeed { addr } if old_entry.is_none() => { + Some(MetaAddr::new(addr, NeverAttemptedSeed)) + } + + // New gossiped or alternate, not attempted: + // Update state and services, but ignore updates to untrusted last seen + NewGossiped { + addr, + untrusted_services, + untrusted_last_seen, + } if !has_been_attempted => Some(MetaAddr::new( + addr, + NeverAttemptedGossiped { + untrusted_services, + // Keep the first untrusted last seen time we got + untrusted_last_seen: old_untrusted_last_seen.unwrap_or(untrusted_last_seen), + }, + )), + NewAlternate { + addr, + untrusted_services, + } if !has_been_attempted => Some(MetaAddr::new( + addr, + NeverAttemptedAlternate { + untrusted_services, + // Keep the first untrusted last seen time we got + untrusted_last_seen: old_untrusted_last_seen.unwrap_or_else(Utc::now), + }, + )), + + // New entry, but already existing (seed) or attempted (others): + // Skip the update entirely + NewSeed { .. } | NewGossiped { .. } | NewAlternate { .. } => None, + + // Attempt: + // Update last_attempt + UpdateAttempt { addr } => Some(MetaAddr::new( + addr, + AttemptPending { + last_attempt: Utc::now(), + last_success: old_last_success, + last_failed: old_last_failed, + untrusted_last_seen: old_untrusted_last_seen, + untrusted_services: old_untrusted_services, + }, + )), + + // Responded: + // Update last_success and services + UpdateResponded { addr, services } => Some(MetaAddr::new( + addr, + Responded { + // When the attempt message arrives, it will replace this default. + // (But it's a reasonable default anyway.) + last_attempt: old_last_attempt.unwrap_or_else(Utc::now), + last_success: Utc::now(), + last_failed: old_last_failed, + untrusted_last_seen: old_untrusted_last_seen, + services, + }, + )), + + // Failed: + // Update last_failed and services if present + UpdateFailed { addr, services } => Some(MetaAddr::new( + addr, + Failed { + // When the attempt message arrives, it will replace this default. + // (But it's a reasonable default anyway.)
+ last_attempt: old_last_attempt.unwrap_or_else(Utc::now), + last_success: old_last_success, + last_failed: Utc::now(), + untrusted_last_seen: old_untrusted_last_seen, + // replace old services with new services if present + untrusted_services: services.or(old_untrusted_services), + }, + )), + + // Shutdown: + UpdateShutdown { + addr, + services: new_services, + } => { + match old_entry.map(|old| old.last_connection_state) { + // Responded: + // Keep as responded, update services if present + Some(Responded { + last_attempt, + last_success, + last_failed, + untrusted_last_seen, + services: old_services, + }) => Some(MetaAddr::new( + addr, + Responded { + last_attempt, + last_success, + last_failed, + untrusted_last_seen, + // this could be redundant, but it handles multiple + // connections and update reordering better + services: new_services.unwrap_or(old_services), + }, + )), + + // Attempted or Failed: + // Change to Failed + // Update last_failed and services if present + Some(AttemptPending { .. }) + | Some(Failed { .. }) + // Unexpected states: change to Failed anyway + | Some(NeverAttemptedSeed) + | Some(NeverAttemptedGossiped { .. }) + | Some(NeverAttemptedAlternate { .. }) + | None => Some(MetaAddr::new( + addr, + Failed { + // When the attempt message arrives, it will replace this default. + // (But it's a reasonable default anyway.) + last_attempt: old_last_attempt.unwrap_or_else(Utc::now), + last_success: old_last_success, + last_failed: Utc::now(), + untrusted_last_seen: old_untrusted_last_seen, + untrusted_services: new_services.or(old_untrusted_services), + }, + )), + } + } + } + } +} + /// An address with metadata on its advertised services and last-seen time. /// /// [Bitcoin reference](https://en.bitcoin.it/wiki/Protocol_documentation#Network_address) -#[derive(Copy, Clone, Debug, Eq, PartialEq)] +#[derive(Copy, Clone, Debug)] +#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] pub struct MetaAddr { /// The peer's address. - pub addr: SocketAddr, - - /// The services advertised by the peer. /// - /// The exact meaning depends on `last_connection_state`: - /// - `Responded`: the services advertised by this peer, the last time we - /// performed a handshake with it - /// - `NeverAttempted`: the unverified services provided by the remote peer - /// that sent us this address - /// - `Failed` or `AttemptPending`: unverified services via another peer, - /// or services advertised in a previous handshake + /// The exact meaning depends on [`last_connection_state`]: + /// - [`Responded`]: the address we used to make a direct outbound connection + /// to this peer + /// - [`NeverAttemptedSeed: an unverified address provided by the seed config + /// - [`NeverAttemptedGossiped`]: an unverified address and services provided + /// by a remote peer + /// - [`NeverAttemptedAlternate`]: a directly connected peer claimed that + /// this address was its canonical address in its [`Version`] message, + /// (and provided services). But either: + /// - the peer made an inbound connection to us, or + /// - the address we used to make a direct outbound connection was + /// different from the canonical address + /// - [`Failed`] or [`AttemptPending`]: an unverified seeder, gossiped or + /// alternate address, or an address from a previous direct outbound + /// connection /// /// ## Security /// - /// `services` from `NeverAttempted` peers may be invalid due to outdated - /// records, older peer versions, or buggy or malicious peers. 
- pub services: PeerServices, + /// `addr`s from non-`Responded` peers may be invalid due to outdated + /// records, or buggy or malicious peers. + // + // TODO: make the addr private to MetaAddr and AddressBook + pub(super) addr: SocketAddr, - /// The last time we interacted with this peer. + /// The outcome of our most recent direct outbound connection to this peer. /// - /// See `get_last_seen` for details. - last_seen: DateTime, - - /// The outcome of our most recent communication attempt with this peer. - pub last_connection_state: PeerAddrState, + /// If we haven't made a direct connection to this peer, contains untrusted + /// information provided by other peers (or seeds). + // + // TODO: make the state private to MetaAddr and AddressBook + pub(super) last_connection_state: PeerAddrState, } impl MetaAddr { + /// Create a new [`MetaAddr`] from its parts. + /// + /// This function should only be used by the [`meta_addr`] and [`address_book`] + /// modules. Other callers should use a more specific [`MetaAddr`] or + /// [`MetaAddrChange`] constructor. + fn new(addr: SocketAddr, last_connection_state: PeerAddrState) -> MetaAddr { + MetaAddr { + addr, + last_connection_state, + } + } + + /// Add or update an [`AddressBook`] entry, based on a configured seed + /// address. + pub fn new_seed(addr: SocketAddr) -> MetaAddrChange { + NewSeed { addr } + } + /// Create a new gossiped [`MetaAddr`], based on the deserialized fields from /// a gossiped peer [`Addr`][crate::protocol::external::Message::Addr] message. pub fn new_gossiped_meta_addr( @@ -146,14 +807,52 @@ impl MetaAddr { ) -> MetaAddr { MetaAddr { addr, - services: untrusted_services, - last_seen: untrusted_last_seen, - // the state is Zebra-specific, it isn't part of the Zcash network protocol - last_connection_state: NeverAttemptedGossiped, + last_connection_state: NeverAttemptedGossiped { + untrusted_last_seen, + untrusted_services, + }, } } - /// Create a new `MetaAddr` for a peer that has just `Responded`. + /// Add or update an [`AddressBook`] entry, based on a gossiped peer [`Addr`] + /// message. + /// + /// Panics unless `meta_addr` is in the [`NeverAttemptedGossiped`] state. + pub fn new_gossiped_change(meta_addr: MetaAddr) -> MetaAddrChange { + if let NeverAttemptedGossiped { + untrusted_last_seen, + untrusted_services, + } = meta_addr.last_connection_state + { + NewGossiped { + addr: meta_addr.addr, + untrusted_last_seen, + untrusted_services, + } + } else { + panic!( + "unexpected non-NeverAttemptedGossiped state: {:?}", + meta_addr + ) + } + } + + /// Add or update an [`AddressBook`] entry, based on the canonical address in a + /// peer's [`Version`] message. + pub fn new_alternate(addr: SocketAddr, untrusted_services: PeerServices) -> MetaAddrChange { + NewAlternate { + addr, + untrusted_services, + } + } + + /// Update an [`AddressBook`] entry when we start connecting to a peer. + pub fn update_attempt(addr: SocketAddr) -> MetaAddrChange { + UpdateAttempt { addr } + } + + /// Update an [`AddressBook`] entry when a peer sends a message after a + /// successful handshake. /// /// # Security /// @@ -161,120 +860,191 @@ impl MetaAddr { /// and the services must be the services from that peer's handshake. /// /// Otherwise: - /// - malicious peers could interfere with other peers' `AddressBook` state, + /// - malicious peers could interfere with other peers' [`AddressBook`] state, /// or /// - Zebra could advertise unreachable addresses to its own peers. 
- pub fn new_responded(addr: &SocketAddr, services: &PeerServices) -> MetaAddr { - MetaAddr { - addr: *addr, - services: *services, - last_seen: Utc::now(), - last_connection_state: Responded, - } + pub fn update_responded(addr: SocketAddr, services: PeerServices) -> MetaAddrChange { + UpdateResponded { addr, services } } - /// Create a new `MetaAddr` for a peer that we want to reconnect to. - pub fn new_reconnect(addr: &SocketAddr, services: &PeerServices) -> MetaAddr { - MetaAddr { - addr: *addr, - services: *services, - last_seen: Utc::now(), - last_connection_state: AttemptPending, - } + /// Update an [`AddressBook`] entry when a peer connection fails. + pub fn update_failed(addr: SocketAddr, services: Option) -> MetaAddrChange { + UpdateFailed { addr, services } } - /// Create a new `MetaAddr` for a peer's alternate address, received via a - /// `Version` message. - pub fn new_alternate(addr: &SocketAddr, services: &PeerServices) -> MetaAddr { - MetaAddr { - addr: *addr, - services: *services, - last_seen: Utc::now(), - last_connection_state: NeverAttemptedAlternate, - } + /// Update an [`AddressBook`] entry when a peer connection shuts down. + pub fn update_shutdown(addr: SocketAddr, services: Option) -> MetaAddrChange { + UpdateShutdown { addr, services } } - /// Create a new `MetaAddr` for our own listener address. - pub fn new_local_listener(addr: &SocketAddr) -> MetaAddr { - MetaAddr { - addr: *addr, + /// Add or update our local listener address in an [`AddressBook`]. + /// + /// See [`AddressBook::get_local_listener`] for details. + pub fn new_local_listener(addr: SocketAddr) -> MetaAddrChange { + NewAlternate { + addr, + // Note: in this unique case, the services are actually trusted + // // TODO: create a "local services" constant - services: PeerServices::NODE_NETWORK, - last_seen: Utc::now(), - last_connection_state: Responded, + untrusted_services: PeerServices::NODE_NETWORK, } } - /// Create a new `MetaAddr` for a peer that has just had an error. - pub fn new_errored(addr: &SocketAddr, services: &PeerServices) -> MetaAddr { - MetaAddr { - addr: *addr, - services: *services, - last_seen: Utc::now(), - last_connection_state: Failed, - } + /// The services advertised by the peer. + /// + /// The exact meaning depends on [`last_connection_state`]: + /// - [`Responded`]: the services advertised by this peer, the last time we + /// performed a handshake with it + /// - [`NeverAttemptedGossiped`]: the unverified services provided by the + /// remote peer that sent us this address + /// - [`NeverAttemptedAlternate`]: the services provided by the directly + /// connected peer that claimed that this address was its canonical + /// address + /// - [`NeverAttemptedSeed`]: the seed config doesn't have any service info, + /// so this field is [`None`] + /// - [`Failed`] or [`AttemptPending`]: unverified services via another peer, + /// or services advertised in a previous handshake + /// + /// ## Security + /// + /// `services` from non-`Responded` peers may be invalid due to outdated + /// records, older peer versions, or buggy or malicious peers. + /// The last time another peer successfully connected to this peer. + pub fn get_untrusted_services(&self) -> Option { + self.last_connection_state.get_untrusted_services() } - /// Create a new `MetaAddr` for a peer that has just shut down. - pub fn new_shutdown(addr: &SocketAddr, services: &PeerServices) -> MetaAddr { - // TODO: if the peer shut down in the Responded state, preserve that - // state. 
All other states should be treated as (timeout) errors. - MetaAddr::new_errored(addr, services) + /// The last time we attempted to make a direct outbound connection to the + /// address of this peer. + /// + /// See [`PeerAddrState::get_last_attempt`] for details. + pub fn get_last_attempt(&self) -> Option> { + self.last_connection_state.get_last_attempt() } - /// The last time we interacted with this peer. + /// The last time we successfully made a direct outbound connection to the + /// address of this peer. /// - /// The exact meaning depends on `last_connection_state`: - /// - `Responded`: the last time we processed a message from this peer - /// - `NeverAttempted`: the unverified time provided by the remote peer - /// that sent us this address - /// - `Failed`: the last time we marked the peer as failed - /// - `AttemptPending`: the last time we queued the peer for a reconnection - /// attempt + /// See [`PeerAddrState::get_last_success`] for details. + pub fn get_last_success(&self) -> Option> { + self.last_connection_state.get_last_success() + } + + /// The last time a direct outbound connection to the address of this peer + /// failed. + /// + /// See [`PeerAddrState::get_last_failed`] for details. + pub fn get_last_failed(&self) -> Option> { + self.last_connection_state.get_last_failed() + } + + /// The last time another node claimed this peer was valid. + /// + /// The exact meaning depends on [`last_connection_state`]: + /// - [`NeverAttemptedSeed`]: the seed config doesn't have any last seen info, + /// so this field is [`None`] + /// - [`NeverAttemptedGossiped`]: the unverified time provided by the remote + /// peer that sent us this address + /// - [`NeverAttemptedAlternate`]: the local time we received the [`Version`] + /// message containing this address from a peer + /// - [`Failed`] and [`AttemptPending`]: these states do not update this field /// /// ## Security /// - /// `last_seen` times from `NeverAttempted` peers may be invalid due to + /// last seen times from non-`Responded` peers may be invalid due to /// clock skew, or buggy or malicious peers. - pub fn get_last_seen(&self) -> DateTime { - self.last_seen + /// + /// Typically, this field should be ignored, unless the peer is in a + /// never attempted state. + pub fn get_untrusted_last_seen(&self) -> Option> { + self.last_connection_state.get_untrusted_last_seen() + } + + /// The last time we successfully made a direct outbound connection to this + /// peer, or another node claimed this peer was valid. + /// + /// Clamped to a [`u32`] number of seconds. + /// + /// [`None`] if the address was supplied as a seed. + /// + /// ## Security + /// + /// last seen times from non-`Responded` peers may be invalid due to + /// clock skew, or buggy or malicious peers. + /// + /// Use [`get_last_success`] if you need a trusted, unclamped value. + pub fn get_last_success_or_untrusted(&self) -> Option> { + // Use the best time we have, if any + let seconds = self + .get_last_success() + .or_else(|| self.get_untrusted_last_seen())? + .timestamp(); + + // Convert to a DateTime + let seconds = seconds.clamp(u32::MIN.into(), u32::MAX.into()); + let time = Utc + .timestamp_opt(seconds, 0) + .single() + .expect("unexpected invalid time: all u32 values should be valid"); + + Some(time) + } + + /// Has this peer ever been attempted? + pub fn has_been_attempted(&self) -> bool { + self.get_last_attempt().is_some() } /// Is this address a directly connected client? 
+ /// + /// If we don't have enough information, assume that it is not a client. pub fn is_direct_client(&self) -> bool { match self.last_connection_state { - Responded => !self.services.contains(PeerServices::NODE_NETWORK), - NeverAttemptedGossiped | NeverAttemptedAlternate | Failed | AttemptPending => false, + Responded { services, .. } => !services.contains(PeerServices::NODE_NETWORK), + NeverAttemptedSeed + | NeverAttemptedGossiped { .. } + | NeverAttemptedAlternate { .. } + | Failed { .. } + | AttemptPending { .. } => false, } } /// Is this address valid for outbound connections? + /// + /// If we don't have enough information, assume that it is valid. pub fn is_valid_for_outbound(&self) -> bool { - self.services.contains(PeerServices::NODE_NETWORK) + self.get_untrusted_services() + .map(|services| services.contains(PeerServices::NODE_NETWORK)) + .unwrap_or(true) && !self.addr.ip().is_unspecified() && self.addr.port() != 0 } - /// Return a sanitized version of this `MetaAddr`, for sending to a remote peer. - pub fn sanitize(&self) -> MetaAddr { + /// Return a sanitized version of this [`MetaAddr`], for sending to a remote peer. + /// + /// If [`None`], this address should not be sent to remote peers. + pub fn sanitize(&self) -> Option { + // Sanitize the time let interval = crate::constants::TIMESTAMP_TRUNCATION_SECONDS; - let ts = self.get_last_seen().timestamp(); - let last_seen = Utc.timestamp(ts - ts.rem_euclid(interval), 0); - MetaAddr { + let ts = self.get_last_success_or_untrusted()?.timestamp(); + let last_seen_maybe_untrusted = Utc.timestamp(ts - ts.rem_euclid(interval), 0); + + Some(MetaAddr { addr: self.addr, - // services are sanitized during parsing, or set to a fixed valued by - // new_local_listener, so we don't need to sanitize here - services: self.services, - last_seen, - // the state isn't sent to the remote peer, but sanitize it anyway - last_connection_state: NeverAttemptedGossiped, - } + // the exact state variant isn't sent to the remote peer, but sanitize it anyway + last_connection_state: NeverAttemptedGossiped { + untrusted_last_seen: last_seen_maybe_untrusted, + // services are sanitized during parsing, or set to a fixed value by + // new_local_listener, so we don't need to sanitize here + untrusted_services: self.get_untrusted_services()?, + }, + }) } } impl Ord for MetaAddr { - /// `MetaAddr`s are sorted in approximate reconnection attempt order, but - /// with `Responded` peers sorted first as a group. + /// [`MetaAddr`]s are sorted in approximate reconnection attempt order, but + /// with [`Responded`] peers sorted first as a group. /// /// This order should not be used for reconnection attempts: use /// [`AddressBook::reconnection_peers`] instead. 
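The sanitization above only ever exposes a timestamp that has been rounded down to a multiple of `TIMESTAMP_TRUNCATION_SECONDS`. A minimal standalone sketch of that rounding arithmetic, assuming a 30-minute interval for illustration (the real constant lives in `crate::constants` and is not shown in this diff):

```rust
// Standalone sketch of the truncation used by `MetaAddr::sanitize`.
// The interval value below is illustrative only.
fn truncate_timestamp(ts_seconds: i64, interval: i64) -> i64 {
    // Round down to the nearest multiple of `interval`, so remote peers only
    // learn the last-seen time to within one interval.
    ts_seconds - ts_seconds.rem_euclid(interval)
}

fn main() {
    let interval = 30 * 60; // assumed 30-minute truncation interval
    let ts = 1_620_000_123;
    let truncated = truncate_timestamp(ts, interval);

    // The sanitized time is a multiple of the interval, and within one
    // interval of the original time.
    assert_eq!(truncated % interval, 0);
    assert!(ts - truncated < interval);
    println!("{ts} -> {truncated}");
}
```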
@@ -284,17 +1054,8 @@ impl Ord for MetaAddr { use std::net::IpAddr::{V4, V6}; use Ordering::*; - let oldest_first = self.get_last_seen().cmp(&other.get_last_seen()); - let newest_first = oldest_first.reverse(); - let connection_state = self.last_connection_state.cmp(&other.last_connection_state); - let reconnection_time = match self.last_connection_state { - Responded => oldest_first, - NeverAttemptedGossiped => newest_first, - NeverAttemptedAlternate => newest_first, - Failed => oldest_first, - AttemptPending => oldest_first, - }; + let ip_numeric = match (self.addr.ip(), other.addr.ip()) { (V4(a), V4(b)) => a.octets().cmp(&b.octets()), (V6(a), V6(b)) => a.octets().cmp(&b.octets()), @@ -303,13 +1064,11 @@ impl Ord for MetaAddr { }; connection_state - .then(reconnection_time) // The remainder is meaningless as an ordering, but required so that we // have a total order on `MetaAddr` values: self and other must compare // as Equal iff they are equal. .then(ip_numeric) .then(self.addr.port().cmp(&other.addr.port())) - .then(self.services.bits().cmp(&other.services.bits())) } } @@ -319,15 +1078,29 @@ impl PartialOrd for MetaAddr { } } +impl PartialEq for MetaAddr { + fn eq(&self, other: &Self) -> bool { + use Ordering::*; + self.cmp(other) == Equal + } +} + +impl Eq for MetaAddr {} + impl ZcashSerialize for MetaAddr { fn zcash_serialize(&self, mut writer: W) -> Result<(), std::io::Error> { writer.write_u32::( - self.get_last_seen() + self.get_last_success_or_untrusted() + .expect("unexpected serialization of MetaAddr with missing fields") .timestamp() .try_into() .expect("time is in range"), )?; - writer.write_u64::(self.services.bits())?; + writer.write_u64::( + self.get_untrusted_services() + .expect("unexpected serialization of MetaAddr with missing fields") + .bits(), + )?; writer.write_socket_addr(self.addr)?; Ok(()) } diff --git a/zebra-network/src/meta_addr/arbitrary.rs b/zebra-network/src/meta_addr/arbitrary.rs deleted file mode 100644 index 3c2231afb5a..00000000000 --- a/zebra-network/src/meta_addr/arbitrary.rs +++ /dev/null @@ -1,31 +0,0 @@ -use proptest::{arbitrary::any, arbitrary::Arbitrary, prelude::*}; - -use super::{MetaAddr, PeerAddrState, PeerServices}; - -use chrono::{TimeZone, Utc}; -use std::net::SocketAddr; - -impl Arbitrary for MetaAddr { - type Parameters = (); - - fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy { - ( - any::(), - any::(), - any::(), - any::(), - ) - .prop_map( - |(addr, services, last_seen, last_connection_state)| MetaAddr { - addr, - services, - // This can't panic, because all u32 values are valid `Utc.timestamp`s - last_seen: Utc.timestamp(last_seen.into(), 0), - last_connection_state, - }, - ) - .boxed() - } - - type Strategy = BoxedStrategy; -} diff --git a/zebra-network/src/meta_addr/tests/check.rs b/zebra-network/src/meta_addr/tests/check.rs index 061854d5fea..6de8861d7e3 100644 --- a/zebra-network/src/meta_addr/tests/check.rs +++ b/zebra-network/src/meta_addr/tests/check.rs @@ -1,45 +1,57 @@ //! Shared test checks for MetaAddr -use super::super::MetaAddr; +use super::super::{MetaAddr, PeerAddrState::NeverAttemptedGossiped}; use crate::constants::TIMESTAMP_TRUNCATION_SECONDS; /// Make sure that the sanitize function reduces time and state metadata /// leaks. 
pub(crate) fn sanitize_avoids_leaks(entry: &MetaAddr) { - let sanitized = entry.sanitize(); + let sanitized = match entry.sanitize() { + Some(sanitized) => sanitized, + // Skip addresses that will never be sent to peers + None => { + return; + } + }; // We want the sanitized timestamp to: // - be a multiple of the truncation interval, // - have a zero nanoseconds component, and // - be within the truncation interval of the original timestamp. - assert_eq!( - sanitized.get_last_seen().timestamp() % TIMESTAMP_TRUNCATION_SECONDS, - 0 - ); - assert_eq!(sanitized.get_last_seen().timestamp_subsec_nanos(), 0); - // handle underflow and overflow by skipping the check - // the other check will ensure correctness - let lowest_time = entry - .get_last_seen() - .timestamp() - .checked_sub(TIMESTAMP_TRUNCATION_SECONDS); - let highest_time = entry - .get_last_seen() - .timestamp() - .checked_add(TIMESTAMP_TRUNCATION_SECONDS); - if let Some(lowest_time) = lowest_time { - assert!(sanitized.get_last_seen().timestamp() > lowest_time); - } - if let Some(highest_time) = highest_time { - assert!(sanitized.get_last_seen().timestamp() < highest_time); + if let Some(sanitized_last) = sanitized.get_last_success_or_untrusted() { + assert_eq!(sanitized_last.timestamp() % TIMESTAMP_TRUNCATION_SECONDS, 0); + assert_eq!(sanitized_last.timestamp_subsec_nanos(), 0); + // handle underflow and overflow by skipping the check + // the other check will ensure correctness + let lowest_time = entry + .get_last_success_or_untrusted() + .map(|t| t.timestamp().checked_sub(TIMESTAMP_TRUNCATION_SECONDS)) + .flatten(); + let highest_time = entry + .get_last_success_or_untrusted() + .map(|t| t.timestamp().checked_add(TIMESTAMP_TRUNCATION_SECONDS)) + .flatten(); + if let Some(lowest_time) = lowest_time { + assert!(sanitized_last.timestamp() > lowest_time); + } + if let Some(highest_time) = highest_time { + assert!(sanitized_last.timestamp() < highest_time); + } } - // Sanitize to the the default state, even though it's not serialized - assert_eq!(sanitized.last_connection_state, Default::default()); + // Sanitize to the the gossiped state, even though it's not serialized + assert!(matches!( + sanitized.last_connection_state, + NeverAttemptedGossiped { .. } + )); + // We want the other fields to be unmodified assert_eq!(sanitized.addr, entry.addr); // Services are sanitized during parsing, so we don't need to make // any changes in sanitize() - assert_eq!(sanitized.services, entry.services); + assert_eq!( + sanitized.get_untrusted_services(), + entry.get_untrusted_services() + ); } diff --git a/zebra-network/src/meta_addr/tests/preallocate.rs b/zebra-network/src/meta_addr/tests/preallocate.rs index c9fb9dbb440..3c7ea002200 100644 --- a/zebra-network/src/meta_addr/tests/preallocate.rs +++ b/zebra-network/src/meta_addr/tests/preallocate.rs @@ -9,9 +9,12 @@ use std::convert::TryInto; proptest! { /// Confirm that each MetaAddr takes exactly META_ADDR_SIZE bytes when serialized. - /// This verifies that our calculated `TrustedPreallocate::max_allocation()` is indeed an upper bound. + /// This verifies that our calculated [`TrustedPreallocate::max_allocation`] is indeed an upper bound. #[test] fn meta_addr_size_is_correct(addr in MetaAddr::arbitrary()) { + // TODO: make a strategy that has no None fields + prop_assume!(addr.sanitize().is_some()); + let serialized = addr .zcash_serialize_to_vec() .expect("Serialization to vec must succeed"); @@ -23,6 +26,9 @@ proptest! { /// 2. 
The largest allowed vector is small enough to fit in a legal Zcash message #[test] fn meta_addr_max_allocation_is_correct(addr in MetaAddr::arbitrary()) { + // TODO: make a strategy that has no None fields + prop_assume!(addr.sanitize().is_some()); + let max_allocation: usize = MetaAddr::max_allocation().try_into().unwrap(); let mut smallest_disallowed_vec = Vec::with_capacity(max_allocation + 1); for _ in 0..(MetaAddr::max_allocation() + 1) { diff --git a/zebra-network/src/meta_addr/tests/vectors.rs b/zebra-network/src/meta_addr/tests/vectors.rs index 538ada7b8c7..2f93c95c5b1 100644 --- a/zebra-network/src/meta_addr/tests/vectors.rs +++ b/zebra-network/src/meta_addr/tests/vectors.rs @@ -1,28 +1,59 @@ //! Test vectors for MetaAddr. -use super::{super::MetaAddr, check}; +use super::{ + super::{MetaAddr, PeerAddrState}, + check, +}; use chrono::{MAX_DATETIME, MIN_DATETIME}; /// Make sure that the sanitize function handles minimum and maximum times. #[test] fn sanitize_extremes() { + use PeerAddrState::*; + zebra_test::init(); - let min_time_entry = MetaAddr { + let min_time_untrusted = MetaAddr { + addr: "127.0.0.1:8233".parse().unwrap(), + last_connection_state: NeverAttemptedGossiped { + untrusted_last_seen: MIN_DATETIME, + untrusted_services: Default::default(), + }, + }; + + let min_time_local = MetaAddr { + addr: "127.0.0.1:8233".parse().unwrap(), + last_connection_state: Responded { + last_attempt: MIN_DATETIME, + last_success: MIN_DATETIME, + last_failed: Some(MIN_DATETIME), + untrusted_last_seen: Some(MIN_DATETIME), + services: Default::default(), + }, + }; + + let max_time_untrusted = MetaAddr { addr: "127.0.0.1:8233".parse().unwrap(), - services: Default::default(), - last_seen: MIN_DATETIME, - last_connection_state: Default::default(), + last_connection_state: NeverAttemptedGossiped { + untrusted_last_seen: MAX_DATETIME, + untrusted_services: Default::default(), + }, }; - let max_time_entry = MetaAddr { + let max_time_local = MetaAddr { addr: "127.0.0.1:8233".parse().unwrap(), - services: Default::default(), - last_seen: MAX_DATETIME, - last_connection_state: Default::default(), + last_connection_state: Responded { + last_attempt: MAX_DATETIME, + last_success: MAX_DATETIME, + last_failed: Some(MAX_DATETIME), + untrusted_last_seen: Some(MAX_DATETIME), + services: Default::default(), + }, }; - check::sanitize_avoids_leaks(&min_time_entry); - check::sanitize_avoids_leaks(&max_time_entry); + check::sanitize_avoids_leaks(&min_time_untrusted); + check::sanitize_avoids_leaks(&min_time_local); + check::sanitize_avoids_leaks(&max_time_untrusted); + check::sanitize_avoids_leaks(&max_time_local); } diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index 0d51ae5c6d7..9e329120ccf 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -6,6 +6,8 @@ //! This module contains a lot of undocumented state, assumptions and invariants. //! And it's unclear if these assumptions match the `zcashd` implementation. //! It should be refactored into a cleaner set of request/response pairs (#1515). +//! +//! Zebra is also very sensitive to order, timing, and buffer size changes (#2193). use std::{collections::HashSet, sync::Arc}; @@ -328,7 +330,7 @@ pub struct Connection { /// State so that we can move the future out of it independently of /// other state handling. 
pub(super) request_timer: Option, - pub(super) svc: S, + pub(super) inbound_service: S, /// A `mpsc::Receiver` that converts its results to /// `InProgressClientRequest` pub(super) client_rx: ClientRequestReceiver, @@ -904,30 +906,38 @@ where /// of connected peers. async fn drive_peer_request(&mut self, req: Request) { trace!(?req); + use futures::TryFutureExt; + use tokio::sync::oneshot::error::TryRecvError; use tower::{load_shed::error::Overloaded, ServiceExt}; - if self.svc.ready_and().await.is_err() { - // Treat all service readiness errors as Overloaded - // TODO: treat `TryRecvError::Closed` in `Inbound::poll_ready` as a fatal error (#1655) - self.fail_with(PeerError::Overloaded); - return; - } - - let rsp = match self.svc.call(req).await { + let rsp = match self + .inbound_service + .ready_and() + .and_then(|service| service.call(req.clone())) + .await + { Err(e) => { - if e.is::() { - tracing::warn!("inbound service is overloaded, closing connection"); - metrics::counter!("pool.closed.loadshed", 1); - self.fail_with(PeerError::Overloaded); + if e.is::() { + panic!( + "unexpected initialization error in inbound service: {:?}", + e + ); + } else if e.is::() { + // TODO: rate-limit closing connections during inbound service overload (#2107) + // TODO: should we send a reject here? + tracing::debug!(%req, "inbound service is overloaded, dropping request"); + metrics::counter!("pool.overloaded", 1); } else { // We could send a reject to the remote peer, but that might cause // them to disconnect, and we might be using them to sync blocks. // For similar reasons, we don't want to fail_with() here - we // only close the connection if the peer is doing something wrong. - error!(%e, - connection_state = ?self.state, - client_receiver = ?self.client_rx, - "error processing peer request"); + error!( + %e, + connection_state = ?self.state, + client_receiver = ?self.client_rx, + "error processing peer request" + ); } return; } diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 99ce1f7b08e..1b41231951e 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -23,11 +23,11 @@ use zebra_chain::{block, parameters::Network}; use crate::{ constants, + meta_addr::{MetaAddr, MetaAddrChange}, protocol::{ external::{types::*, Codec, InventoryHash, Message}, internal::{Request, Response}, }, - types::MetaAddr, BoxError, Config, }; @@ -45,7 +45,7 @@ use super::{Client, ClientRequest, Connection, ErrorSlot, HandshakeError, PeerEr pub struct Handshake { config: Config, inbound_service: S, - timestamp_collector: mpsc::Sender, + timestamp_collector: mpsc::Sender, inv_collector: broadcast::Sender<(InventoryHash, SocketAddr)>, nonces: Arc>>, user_agent: String, @@ -296,7 +296,7 @@ impl fmt::Debug for ConnectedAddr { pub struct Builder { config: Option, inbound_service: Option, - timestamp_collector: Option>, + timestamp_collector: Option>, our_services: Option, user_agent: Option, relay: Option, @@ -334,9 +334,12 @@ where /// Provide a hook for timestamp collection. Optional. /// - /// This channel takes `MetaAddr`s, permanent addresses which can be used to - /// make outbound connections to peers. - pub fn with_timestamp_collector(mut self, timestamp_collector: mpsc::Sender) -> Self { + /// This channel takes [`MetaAddrChange`]s, which contain permanent addresses + /// that be used to make outbound connections to peers. 
+ pub fn with_timestamp_collector( + mut self, + timestamp_collector: mpsc::Sender, + ) -> Self { self.timestamp_collector = Some(timestamp_collector); self } @@ -670,8 +673,8 @@ where // `Version` messages. let alternate_addrs = connected_addr.get_alternate_addrs(remote_canonical_addr); for alt_addr in alternate_addrs { - let alt_addr = MetaAddr::new_alternate(&alt_addr, &remote_services); - if alt_addr.is_valid_for_outbound() { + let alt_addr = MetaAddr::new_alternate(alt_addr, remote_services); + if alt_addr.is_valid_for_outbound(None) { tracing::info!( ?alt_addr, "sending valid alternate peer address to the address book" @@ -758,7 +761,10 @@ where // the collector doesn't depend on network activity, // so this await should not hang let _ = inbound_ts_collector - .send(MetaAddr::new_responded(&book_addr, &remote_services)) + .send(MetaAddr::update_responded( + book_addr, + remote_services, + )) .await; } } @@ -772,7 +778,10 @@ where if let Some(book_addr) = connected_addr.get_address_book_addr() { let _ = inbound_ts_collector - .send(MetaAddr::new_errored(&book_addr, &remote_services)) + .send(MetaAddr::update_failed( + book_addr, + Some(remote_services), + )) .await; } } @@ -827,7 +836,7 @@ where use super::connection; let server = Connection { state: connection::State::AwaitingRequest, - svc: inbound_service, + inbound_service, client_rx: server_rx.into(), error_slot: slot, peer_tx, @@ -882,7 +891,10 @@ where if let Some(book_addr) = connected_addr.get_address_book_addr() { // awaiting a local task won't hang let _ = timestamp_collector - .send(MetaAddr::new_shutdown(&book_addr, &remote_services)) + .send(MetaAddr::update_shutdown( + book_addr, + Some(remote_services), + )) .await; } return; @@ -970,7 +982,7 @@ async fn send_one_heartbeat(server_tx: &mut mpsc::Sender) -> Resu /// `handle_heartbeat_error`. async fn heartbeat_timeout( fut: F, - timestamp_collector: &mut mpsc::Sender, + timestamp_collector: &mut mpsc::Sender, connected_addr: &ConnectedAddr, remote_services: &PeerServices, ) -> Result @@ -1004,7 +1016,7 @@ where /// If `result.is_err()`, mark `connected_addr` as failed using `timestamp_collector`. 
async fn handle_heartbeat_error( result: Result, - timestamp_collector: &mut mpsc::Sender, + timestamp_collector: &mut mpsc::Sender, connected_addr: &ConnectedAddr, remote_services: &PeerServices, ) -> Result @@ -1018,7 +1030,7 @@ where if let Some(book_addr) = connected_addr.get_address_book_addr() { let _ = timestamp_collector - .send(MetaAddr::new_errored(&book_addr, &remote_services)) + .send(MetaAddr::update_failed(book_addr, Some(*remote_services))) .await; } Err(err) diff --git a/zebra-network/src/peer_set.rs b/zebra-network/src/peer_set.rs index 412d7cafb80..6be2afd362f 100644 --- a/zebra-network/src/peer_set.rs +++ b/zebra-network/src/peer_set.rs @@ -9,3 +9,4 @@ use inventory_registry::InventoryRegistry; use set::PeerSet; pub use initialize::init; +pub use set::spawn_fanout; diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs index be51be482da..13768ff0886 100644 --- a/zebra-network/src/peer_set/candidate_set.rs +++ b/zebra-network/src/peer_set/candidate_set.rs @@ -1,149 +1,157 @@ -use std::{cmp::min, mem, sync::Arc, time::Duration}; +use std::{ + cmp::{max, min}, + net::SocketAddr, + sync::Arc, +}; use chrono::{DateTime, Utc}; -use futures::stream::{FuturesUnordered, StreamExt}; -use tokio::time::{sleep, sleep_until, timeout, Sleep}; -use tower::{Service, ServiceExt}; +use futures::stream::StreamExt; +use tokio::time::{sleep_until, Instant}; +use tower::Service; -use crate::{constants, types::MetaAddr, AddressBook, BoxError, Request, Response}; +use crate::{ + address_book::{spawn_blocking, AddressMetrics}, + constants, peer_set, + types::MetaAddr, + AddressBook, BoxError, Request, Response, +}; -/// The `CandidateSet` manages the `PeerSet`'s peer reconnection attempts. +/// The [`CandidateSet`] manages outbound peer connection attempts. +/// Successful connections become peers in the [`PeerSet`]. /// -/// It divides the set of all possible candidate peers into disjoint subsets, -/// using the `PeerAddrState`: +/// The candidate set divides the set of all possible outbound peers into +/// disjoint subsets, using the [`PeerAddrState`]: +/// +/// 1. [`Responded`] peers, which we previously had outbound connections to. +/// 2. [`NeverAttemptedSeed`] peers, which we learned about from our seed config, +/// but have never connected to; +/// 3. [`NeverAttemptedGossiped`] peers, which we learned about from other peers +/// but have never connected to; +/// 4. [`NeverAttemptedAlternate`] peers, which we learned from the [`Version`] +/// messages of directly connected peers, but have never connected to; +/// 5. [`Failed`] peers, to whom we attempted to connect but were unable to; +/// 6. [`AttemptPending`] peers, which we've recently queued for a connection. +/// +/// Never attempted peers are always available for connection. +/// +/// If a peer's attempted, success, or failed time is recent +/// (within the liveness limit), we avoid reconnecting to it. +/// Otherwise, we assume that it has disconnected or hung, +/// and attempt reconnection. /// -/// 1. `Responded` peers, which we previously had inbound or outbound connections -/// to. If we have not received any messages from a `Responded` peer within a -/// cutoff time, we assume that it has disconnected or hung, and attempt -/// reconnection; -/// 2. `NeverAttempted` peers, which we learned about from other peers or a DNS -/// seeder, but have never connected to; -/// 3. `Failed` peers, to whom we attempted to connect but were unable to; -/// 4. 
`AttemptPending` peers, which we've recently queued for reconnection. /// /// ```ascii,no_run /// ┌──────────────────┐ -/// │ PeerSet │ -/// │GetPeers Responses│ -/// └──────────────────┘ -/// │ -/// │ -/// │ -/// │ +/// │ Config / DNS │ +/// ┌───────────│ Seed │───────────┐ +/// │ │ Addresses │ │ +/// │ └──────────────────┘ │ +/// │ │ untrusted_last_seen │ +/// │ │ is unknown │ +/// ▼ │ ▼ +/// ┌──────────────────┐ │ ┌──────────────────┐ +/// │ Handshake │ │ │ Peer Set │ +/// │ Canonical │──────────┼──────────│ Gossiped │ +/// │ Addresses │ │ │ Addresses │ +/// └──────────────────┘ │ └──────────────────┘ +/// untrusted_last_seen │ provides +/// set to now │ untrusted_last_seen /// ▼ -/// filter by Λ -/// !contains_addr ╱ ╲ -/// ┌────────────────────────────▶▕ ▏ -/// │ ╲ ╱ -/// │ V -/// │ │ -/// │ │ -/// │ │ -/// │ ┌──────────────────┐ │ -/// │ │ Inbound │ │ -/// │ │ Peer Connections │ │ -/// │ └──────────────────┘ │ -/// │ │ │ -/// ├──────────┼────────────────────┼───────────────────────────────┐ -/// │ PeerSet ▼ AddressBook ▼ │ -/// │ ┌─────────────┐ ┌────────────────┐ ┌─────────────┐ │ -/// │ │ Possibly │ │`NeverAttempted`│ │ `Failed` │ │ -/// │ │Disconnected │ │ Peers │ │ Peers │◀┼┐ -/// │ │ `Responded` │ │ │ │ │ ││ -/// │ │ Peers │ │ │ │ │ ││ -/// │ └─────────────┘ └────────────────┘ └─────────────┘ ││ -/// │ │ │ │ ││ -/// │ #1 oldest_first #2 newest_first #3 oldest_first ││ -/// │ │ │ │ ││ -/// │ ├──────────────────────┴──────────────────────┘ ││ -/// │ │ disjoint `PeerAddrState`s ││ -/// ├────────┼──────────────────────────────────────────────────────┘│ -/// │ ▼ │ -/// │ Λ │ -/// │ ╱ ╲ filter by │ -/// └─────▶▕ ▏!is_potentially_connected │ -/// ╲ ╱ to remove live │ -/// V `Responded` peers │ -/// │ │ -/// │ Try outbound connection │ -/// ▼ │ -/// ┌────────────────┐ │ -/// │`AttemptPending`│ │ -/// │ Peers │ │ -/// │ │ │ -/// └────────────────┘ │ -/// │ │ -/// │ │ -/// ▼ │ -/// Λ │ -/// ╱ ╲ │ -/// ▕ ▏─────────────────────────────────────────────────────┘ -/// ╲ ╱ connection failed, update last_seen to now() -/// V -/// │ -/// │ -/// ▼ -/// ┌────────────┐ -/// │ send │ -/// │peer::Client│ -/// │to Discover │ -/// └────────────┘ -/// │ -/// │ -/// ▼ -/// ┌───────────────────────────────────────┐ -/// │ every time we receive a peer message: │ -/// │ * update state to `Responded` │ -/// │ * update last_seen to now() │ +/// Λ ignore changes if ever attempted +/// ╱ ╲ (including `Responded` and `Failed`) +/// ▕ ▏ otherwise, if never attempted: +/// ╲ ╱ update services only +/// V +/// ┌───────────────────────────────┼───────────────────────────────┐ +/// │ AddressBook │ │ +/// │ disjoint `PeerAddrState`s ▼ │ +/// │ ┌─────────────┐ ┌─────────────────────────┐ ┌─────────────┐ │ +/// │ │ `Responded` │ │ `NeverAttemptedSeed` │ │ `Failed` │ │ +/// ┌┼▶│ Peers │ │`NeverAttemptedGossiped` │ │ Peers │◀┼┐ +/// ││ │ │ │`NeverAttemptedAlternate`│ │ │ ││ +/// ││ │ │ │ Peers │ │ │ ││ +/// ││ └─────────────┘ └─────────────────────────┘ └─────────────┘ ││ +/// ││ │ │ │ ││ +/// ││ #1 oldest_first #2 newest_first #3 oldest_first ││ +/// ││ ├──────────────────────┴──────────────────────┘ ││ +/// ││ ▼ ││ +/// ││ Λ ││ +/// ││ ╱ ╲ filter by ││ +/// ││ ▕ ▏ !recently_used_addr ││ +/// ││ ╲ ╱ to remove recent `Responded`, ││ +/// ││ V `AttemptPending`, and `Failed` peers ││ +/// ││ │ ││ +/// ││ │ try outbound connection, ││ +/// ││ ▼ update last_attempted to now() ││ +/// ││┌────────────────┐ ││ +/// │││`AttemptPending`│ ││ +/// │││ Peers │ ││ +/// │││ │ ││ +/// ││└────────────────┘ ││ +/// 
│└────────┼──────────────────────────────────────────────────────┘│ +/// │ ▼ │ +/// │ Λ │ +/// │ ╱ ╲ │ +/// │ ▕ ▏─────────────────────────────────────────────────────┘ +/// │ ╲ ╱ connection failed, update last_failed to now() +/// │ V (`AttemptPending` and `Responded` peers) +/// │ │ +/// │ │ connection succeeded +/// │ ▼ +/// │ ┌────────────┐ +/// │ │ send │ +/// │ │peer::Client│ +/// │ │to Discover │ +/// │ └────────────┘ +/// │ │ +/// │ ▼ +/// │┌───────────────────────────────────────┐ +/// ││ every time we receive a peer message: │ +/// └│ * update state to `Responded` │ +/// │ * update last_success to now() │ /// └───────────────────────────────────────┘ -/// /// ``` // TODO: -// * draw arrow from the "peer message" box into the `Responded` state box -// * make the "disjoint states" box include `AttemptPending` -pub(super) struct CandidateSet { +// * show all possible transitions between Attempt/Responded/Failed, +// except Failed -> Responded is invalid, must go through Attempt +// * show that seed peers that transition to other never attempted +// states are already in the address book +#[derive(Clone, Debug)] +pub(super) struct CandidateSet

+where
+    P: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
+    P::Future: Send + 'static,
+{
     pub(super) address_book: Arc<std::sync::Mutex<AddressBook>>,
-    pub(super) peer_service: S,
-    next_peer_min_wait: Sleep,
+    pub(super) peer_service: tower::timeout::Timeout<P>,
+    next_peer_sleep_until: Arc<futures::lock::Mutex<Instant>>,
 }
 
-impl<S> CandidateSet<S>
+impl<P> CandidateSet<P>
 where
-    S: Service<Request, Response = Response, Error = BoxError>,
-    S::Future: Send + 'static,
+    P: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
+    P::Future: Send + 'static,
 {
-    /// The minimum time between successive calls to `CandidateSet::next()`.
-    ///
-    /// ## Security
-    ///
-    /// Zebra resists distributed denial of service attacks by making sure that new peer connections
-    /// are initiated at least `MIN_PEER_CONNECTION_INTERVAL` apart.
-    const MIN_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(100);
-
     /// Uses `address_book` and `peer_service` to manage a [`CandidateSet`] of peers.
     pub fn new(
         address_book: Arc<std::sync::Mutex<AddressBook>>,
-        peer_service: S,
-    ) -> CandidateSet<S> {
+        peer_service: P,
+    ) -> CandidateSet<P>
{ CandidateSet { address_book, - peer_service, - next_peer_min_wait: sleep(Duration::from_secs(0)), + peer_service: tower::timeout::Timeout::new(peer_service, constants::GET_ADDR_TIMEOUT), + next_peer_sleep_until: Arc::new(futures::lock::Mutex::new(Instant::now())), } } - /// Update the peer set from the network, using the default fanout limit. - /// - /// See [`update_initial`][Self::update_initial] for details. - pub async fn update(&mut self) -> Result<(), BoxError> { - self.update_timeout(None).await - } - /// Update the peer set from the network, limiting the fanout to /// `fanout_limit`. /// - /// - Ask a few live [`Responded`] peers to send us more peers. - /// - Process all completed peer responses, adding new peers in the + /// Returns the number of new candidate peers after the update. + /// + /// Implementation: + /// - Asks a few live [`Responded`] peers to send us more peers. + /// - Processes peer responses, adding new peers in the /// [`NeverAttemptedGossiped`] state. /// /// ## Correctness @@ -165,62 +173,33 @@ where /// [`NeverAttemptedGossiped`]: crate::PeerAddrState::NeverAttemptedGossiped /// [`Failed`]: crate::PeerAddrState::Failed /// [`AttemptPending`]: crate::PeerAddrState::AttemptPending - pub async fn update_initial(&mut self, fanout_limit: usize) -> Result<(), BoxError> { - self.update_timeout(Some(fanout_limit)).await - } - - /// Update the peer set from the network, limiting the fanout to - /// `fanout_limit`, and imposing a timeout on the entire fanout. - /// - /// See [`update_initial`][Self::update_initial] for details. - async fn update_timeout(&mut self, fanout_limit: Option) -> Result<(), BoxError> { - // CORRECTNESS - // - // Use a timeout to avoid deadlocks when there are no connected - // peers, and: - // - we're waiting on a handshake to complete so there are peers, or - // - another task that handles or adds peers is waiting on this task - // to complete. - if let Ok(fanout_result) = - timeout(constants::REQUEST_TIMEOUT, self.update_fanout(fanout_limit)).await - { - fanout_result?; - } else { - // update must only return an error for permanent failures - info!("timeout waiting for the peer service to become ready"); - } - - Ok(()) + pub async fn update(&mut self, fanout_limit: usize) -> usize { + let fanout_limit = min(fanout_limit, constants::MAX_GET_ADDR_FANOUT); + self.update_fanout(fanout_limit).await } /// Update the peer set from the network, limiting the fanout to /// `fanout_limit`. /// /// See [`update_initial`][Self::update_initial] for details. - /// - /// # Correctness - /// - /// This function does not have a timeout. - /// Use [`update_timeout`][Self::update_timeout] instead. - async fn update_fanout(&mut self, fanout_limit: Option) -> Result<(), BoxError> { + #[instrument(skip(self))] + async fn update_fanout(&mut self, fanout_limit: usize) -> usize { + let before_candidates = self.candidate_peer_count().await; + // Opportunistically crawl the network on every update call to ensure // we're actively fetching peers. Continue independently of whether we // actually receive any peers, but always ask the network for more. - // - // Because requests are load-balanced across existing peers, we can make - // multiple requests concurrently, which will be randomly assigned to - // existing peers, but we don't make too many because update may be - // called while the peer set is already loaded. 
- let mut responses = FuturesUnordered::new(); - let fanout_limit = fanout_limit - .map(|fanout_limit| min(fanout_limit, constants::GET_ADDR_FANOUT)) - .unwrap_or(constants::GET_ADDR_FANOUT); - debug!(?fanout_limit, "sending GetPeers requests"); - // TODO: launch each fanout in its own task (might require tokio 1.6) - for _ in 0..fanout_limit { - let peer_service = self.peer_service.ready_and().await?; - responses.push(peer_service.call(Request::Peers)); - } + debug!(?before_candidates, "sending GetPeers requests"); + + let mut responses = peer_set::spawn_fanout( + self.peer_service.clone(), + Request::Peers, + fanout_limit, + constants::REQUEST_TIMEOUT, + ); + + // There's no need to spawn concurrent threads for each response, + // because address book updates are not concurrent. while let Some(rsp) = responses.next().await { match rsp { Ok(Response::Peers(addrs)) => { @@ -230,7 +209,7 @@ where "got response to GetPeers" ); let addrs = validate_addrs(addrs, Utc::now()); - self.send_addrs(addrs); + self.send_addrs(addrs).await; } Err(e) => { // since we do a fanout, and new updates are triggered by @@ -241,83 +220,204 @@ where } } - Ok(()) + // Calculate the change in the number of candidates. + // Since the address book is concurrent, this can be negative. + let after_candidates = self.candidate_peer_count().await; + let new_candidates = after_candidates.saturating_sub(before_candidates); + debug!( + ?new_candidates, + ?before_candidates, + ?after_candidates, + "updated candidate set" + ); + + new_candidates } /// Add new `addrs` to the address book. - fn send_addrs(&self, addrs: impl IntoIterator) { + async fn send_addrs(&mut self, addrs: Vec) { + debug!(addrs_len = ?addrs.len(), + "sending validated addresses to the address book"); + trace!(?addrs, "full validated address list"); + + // Turn the addresses into "new gossiped" changes + let addrs = addrs.into_iter().map(MetaAddr::new_gossiped_change); + // # Correctness // - // Briefly hold the address book threaded mutex, to extend - // the address list. + // Briefly hold the address book threaded mutex on a blocking thread. // // Extend handles duplicate addresses internally. - self.address_book.lock().unwrap().extend(addrs); + spawn_blocking(&self.address_book.clone(), |address_book| { + address_book.extend(addrs) + }) + .await; } /// Returns the next candidate for a connection attempt, if any are available. /// - /// Returns peers in this order: - /// - oldest `Responded` that are not live - /// - newest `NeverAttempted` - /// - oldest `Failed` + /// Returns peers in [`MetaAddr::cmp`] order, lowest first: + /// - oldest [`Responded`] that are not live + /// - [`NeverAttemptedSeed`], if any + /// - newest [`NeverAttemptedGossiped`] + /// - newest [`NeverAttemptedAlternate`] + /// - oldest [`Failed`] that are not recent + /// - oldest [`AttemptPending`] that are not recent /// - /// Skips `AttemptPending` peers and live `Responded` peers. + /// Skips peers that have recently been attempted, connected, or failed. /// /// ## Correctness /// - /// `AttemptPending` peers will become `Responded` if they respond, or - /// become `Failed` if they time out or provide a bad response. + /// [`AttemptPending`] peers will become [`Responded`] if they respond, or + /// become [`Failed`] if they time out or provide a bad response. /// - /// Live `Responded` peers will stay live if they keep responding, or - /// become a reconnection candidate if they stop responding. 
+ /// Live [`Responded`] peers will stay live if they keep responding, or + /// become a connection candidate if they stop responding. /// /// ## Security /// /// Zebra resists distributed denial of service attacks by making sure that /// new peer connections are initiated at least - /// `MIN_PEER_CONNECTION_INTERVAL` apart. + /// [`MIN_PEER_CONNECTION_INTERVAL`] apart. + #[instrument(skip(self))] pub async fn next(&mut self) -> Option { - let current_deadline = self.next_peer_min_wait.deadline(); - let mut sleep = sleep_until(current_deadline + Self::MIN_PEER_CONNECTION_INTERVAL); - mem::swap(&mut self.next_peer_min_wait, &mut sleep); - // # Correctness // - // In this critical section, we hold the address mutex, blocking the - // current thread, and all async tasks scheduled on that thread. + // Briefly hold the address book threaded mutex on a blocking thread. // // To avoid deadlocks, the critical section: - // - must not acquire any other locks - // - must not await any futures + // - must not acquire any other locks, and + // - must not await any futures. // // To avoid hangs, any computation in the critical section should // be kept to a minimum. - let reconnect = { - let mut guard = self.address_book.lock().unwrap(); + let next_peer = spawn_blocking(&self.address_book.clone(), |address_book| { // It's okay to return without sleeping here, because we're returning // `None`. We only need to sleep before yielding an address. - let reconnect = guard.reconnection_peers().next()?; + let next_peer = address_book.next_candidate_peer()?; - let reconnect = MetaAddr::new_reconnect(&reconnect.addr, &reconnect.services); - guard.update(reconnect); - reconnect - }; + let change = MetaAddr::update_attempt(next_peer.addr); + // if the update fails, we return, to avoid a possible infinite loop + let updated_next_peer = address_book.update(change); + debug!( + ?next_peer, + ?updated_next_peer, + "updated next peer in address book" + ); + updated_next_peer + }) + .await?; - // SECURITY: rate-limit new candidate connections - sleep.await; + // # Security + // + // Rate-limit new candidate connections to avoid denial of service. + // + // Update the deadline before sleeping, so we handle concurrent requests + // correctly. + // + // # Correctness + // + // In this critical section, we hold the next sleep time mutex, blocking + // the current async task. (There is one task per handshake.) + // + // To avoid deadlocks, the critical section: + // - must not acquire any other locks, and + // - must not await any other futures. + let current_deadline; + let now; + { + let mut next_sleep_until_guard = self.next_peer_sleep_until.lock().await; + current_deadline = *next_sleep_until_guard; + now = Instant::now(); + // If we recently had a connection, base the next time off our sleep + // time. Otherwise, use the current time, to avoid large bursts. + *next_sleep_until_guard = + max(current_deadline, now) + constants::MIN_PEER_CONNECTION_INTERVAL; + } + + // Now that the next deadline has been updated, actually do the sleep + if current_deadline > now { + debug!(?current_deadline, + ?now, + next_peer_sleep_until = ?self.next_peer_sleep_until, + ?next_peer, + "sleeping to rate-limit new peer connections"); + } + sleep_until(current_deadline).await; + debug!(?next_peer, "returning next candidate peer"); - Some(reconnect) + Some(next_peer) } /// Mark `addr` as a failed peer. 
- pub fn report_failed(&mut self, addr: &MetaAddr) { - let addr = MetaAddr::new_errored(&addr.addr, &addr.services); + #[instrument(skip(self))] + pub async fn report_failed(&mut self, addr: SocketAddr) { + let addr = MetaAddr::update_failed(addr, None); + // # Correctness + // + // Briefly hold the address book threaded mutex on a blocking thread. + spawn_blocking(&self.address_book.clone(), move |address_book| { + address_book.update(addr) + }) + .await; + } + + /// Return the number of candidate outbound peers. + /// + /// This number can change over time as recently used peers expire. + pub async fn candidate_peer_count(&mut self) -> usize { + // # Correctness + // + // Briefly hold the address book threaded mutex on a blocking thread. + spawn_blocking(&self.address_book.clone(), |address_book| { + address_book.candidate_peer_count() + }) + .await + } + + /// Return the number of recently live outbound peers. + /// + /// This number can change over time as responded peers expire. + // + // TODO: get this information from the peer set, which tracks + // inbound connections as well as outbound connections (#1552) + pub async fn recently_live_peer_count(&mut self) -> usize { + // # Correctness + // + // Briefly hold the address book threaded mutex on a blocking thread. + spawn_blocking(&self.address_book.clone(), |address_book| { + address_book.recently_live_peers().count() + }) + .await + } + + /// Return the number of recently used outbound peers. + /// + /// This number can change over time as recently used peers expire. + pub async fn recently_used_peer_count(&mut self) -> usize { + // # Correctness + // + // Briefly hold the address book threaded mutex on a blocking thread. + spawn_blocking(&self.address_book.clone(), |address_book| { + address_book.recently_used_peers().count() + }) + .await + } + + /// Return the [`AddressBook`] metrics. + /// + /// # Performance + /// + /// Calculating these metrics can be expensive for large address books. + /// Don't call this function more than once every few seconds. + pub async fn address_metrics(&mut self) -> AddressMetrics { // # Correctness // - // Briefly hold the address book threaded mutex, to update the state for - // a single address. - self.address_book.lock().unwrap().update(addr); + // Briefly hold the address book threaded mutex on a blocking thread. + spawn_blocking(&self.address_book.clone(), |address_book| { + address_book.address_metrics() + }) + .await } } @@ -335,7 +435,7 @@ where fn validate_addrs( addrs: impl IntoIterator, last_seen_limit: DateTime, -) -> impl IntoIterator { +) -> Vec { // Note: The address book handles duplicate addresses internally, // so we don't need to de-duplicate addresses here. @@ -348,5 +448,5 @@ fn validate_addrs( // - Zebra should limit the number of addresses it uses from a single Addrs // response (#1869) - addrs + addrs.into_iter().collect() } diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 26e9190ebd2..aeba0a92dcf 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -3,16 +3,19 @@ // Portions of this submodule were adapted from tower-balance, // which is (c) 2019 Tower Contributors (MIT licensed). 
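The `CandidateSet` methods above all follow the same pattern: clone the `Arc`'d address book, move a closure onto a blocking thread, and hold the threaded mutex only inside that closure. The real helper is imported from `crate::address_book` and its exact signature is not shown in this diff; the following is a hedged sketch of what such a helper could look like:

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical sketch of a `spawn_blocking` helper over a threaded mutex:
/// the std mutex is only locked on Tokio's blocking thread pool, so the async
/// caller never blocks its executor thread while holding the lock.
async fn spawn_blocking<T, R>(
    data: &Arc<Mutex<T>>,
    task: impl FnOnce(&mut T) -> R + Send + 'static,
) -> R
where
    T: Send + 'static,
    R: Send + 'static,
{
    let data = Arc::clone(data);
    tokio::task::spawn_blocking(move || {
        // Hold the lock only for the duration of `task`.
        let mut guard = data.lock().expect("mutex should not be poisoned");
        task(&mut guard)
    })
    .await
    .expect("blocking task should not panic")
}
```

Under this sketch, a call like `spawn_blocking(&self.address_book.clone(), |address_book| address_book.candidate_peer_count()).await` keeps the critical section short and lock-only, matching the correctness comments above.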
-use std::{net::SocketAddr, sync::Arc}; +use std::{cmp::max, net::SocketAddr, sync::Arc}; use futures::{ channel::mpsc, future::{self, FutureExt}, sink::SinkExt, stream::{FuturesUnordered, StreamExt}, - TryFutureExt, }; -use tokio::{net::TcpListener, sync::broadcast, time::Instant}; +use tokio::{ + net::TcpListener, + sync::{broadcast, oneshot}, + time::Instant, +}; use tower::{ buffer::Buffer, discover::Change, layer::Layer, load::peak_ewma::PeakEwmaDiscover, util::BoxService, Service, ServiceExt, @@ -21,15 +24,20 @@ use tracing::Span; use tracing_futures::Instrument; use crate::{ - constants, meta_addr::MetaAddr, peer, timestamp_collector::TimestampCollector, AddressBook, - BoxError, Config, Request, Response, + constants, + meta_addr::{MetaAddr, MetaAddrChange}, + peer, + timestamp_collector::TimestampCollector, + types::PeerServices, + AddressBook, BoxError, Config, Request, Response, }; use zebra_chain::parameters::Network; use super::CandidateSet; use super::PeerSet; -use peer::Client; + +use CrawlerAction::*; type PeerChange = Result, BoxError>; @@ -64,7 +72,7 @@ where S: Service + Clone + Send + 'static, S::Future: Send + 'static, { - let (address_book, timestamp_collector) = TimestampCollector::spawn(&config); + let (address_book, timestamp_collector) = TimestampCollector::spawn(config.clone()); let (inv_sender, inv_receiver) = broadcast::channel(100); // Construct services that handle inbound handshakes and perform outbound @@ -74,12 +82,11 @@ where let (listen_handshaker, outbound_connector) = { use tower::timeout::TimeoutLayer; let hs_timeout = TimeoutLayer::new(constants::HANDSHAKE_TIMEOUT); - use crate::protocol::external::types::PeerServices; let hs = peer::Handshake::builder() .with_config(config.clone()) .with_inbound_service(inbound_service) .with_inventory_collector(inv_sender) - .with_timestamp_collector(timestamp_collector) + .with_timestamp_collector(timestamp_collector.clone()) .with_advertised_services(PeerServices::NODE_NETWORK) .with_user_agent(crate::constants::USER_AGENT.to_string()) .want_transactions(true) @@ -95,7 +102,7 @@ where let (peerset_tx, peerset_rx) = mpsc::channel::(100); // Create an mpsc channel for peerset demand signaling. let (mut demand_tx, demand_rx) = mpsc::channel::<()>(100); - let (handle_tx, handle_rx) = tokio::sync::oneshot::channel(); + let (handle_tx, handle_rx) = oneshot::channel(); // Connect the rx end to a PeerSet, wrapping new peers in load instruments. let peer_set = PeerSet::new( @@ -114,6 +121,8 @@ where ); let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE); + // Connect the tx end to the 3 peer sources: + // // 1. Incoming peer connections, via a listener. // Warn if we're configured using the wrong network port. @@ -136,94 +145,83 @@ where .instrument(Span::current()), ); - // 2. Initial peers, specified in the config. - let (initial_peer_count_tx, initial_peer_count_rx) = tokio::sync::oneshot::channel(); + // 2. Initial peer connections, as specified in the config. 
let initial_peers_fut = { let config = config.clone(); - let outbound_connector = outbound_connector.clone(); - let peerset_tx = peerset_tx.clone(); async move { let initial_peers = config.initial_peers().await; - let _ = initial_peer_count_tx.send(initial_peers.len()); - // Connect the tx end to the 3 peer sources: - add_initial_peers(initial_peers, outbound_connector, peerset_tx).await + add_initial_peers(initial_peers, timestamp_collector).await } .boxed() }; - let add_guard = tokio::spawn(initial_peers_fut.instrument(Span::current())); + let initial_peers_guard = tokio::spawn(initial_peers_fut.instrument(Span::current())); // 3. Outgoing peers we connect to in response to load. - let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone()); - - // We need to await candidates.update() here, because zcashd only sends one - // `addr` message per connection, and if we only have one initial peer we - // need to ensure that its `addr` message is used by the crawler. - - info!("Sending initial request for peers"); - let _ = candidates - .update_initial(initial_peer_count_rx.await.expect("value sent before drop")) - .await; + let candidate_set = CandidateSet::new(address_book.clone(), peer_set.clone()); for _ in 0..config.peerset_initial_target_size { let _ = demand_tx.try_send(()); } + // Wait for the initial peers to be sent to the address book. + // This a workaround for zcashd's per-connection `addr` message rate limit. + info!("waiting for initial seed peer info"); + initial_peers_guard + .await + .expect("unexpected panic in initial peers task") + .expect("unexpected error sending initial peers"); + + let (initial_crawl_tx, initial_crawl_rx) = oneshot::channel(); let crawl_guard = tokio::spawn( crawl_and_dial( config.crawl_new_peer_interval, + config.peerset_initial_target_size, demand_tx, demand_rx, - candidates, + candidate_set, outbound_connector, peerset_tx, + initial_crawl_tx, ) .instrument(Span::current()), ); - handle_tx - .send(vec![add_guard, listen_guard, crawl_guard]) - .unwrap(); + // Wait for the initial crawls from the crawler. + // This a workaround for zcashd's per-connection `addr` message rate limit. + info!("waiting for initial seed peer getaddr crawls"); + initial_crawl_rx + .await + .expect("initial crawl oneshot unexpectedly dropped"); + + handle_tx.send(vec![listen_guard, crawl_guard]).unwrap(); (peer_set, address_book) } -/// Use the provided `handshaker` to connect to `initial_peers`, then send -/// the results over `tx`. -#[instrument(skip(initial_peers, outbound_connector, tx))] -async fn add_initial_peers( +/// Use the provided `outbound_connector` to connect to `initial_peers`, then +/// send the results over `peerset_tx`. +/// +/// Also adds those peers to the [`AddressBook`] using `timestamp_collector`, +/// and updates `success_count_tx` with the number of successful peers. +/// +/// Stops trying peers once we've had `peerset_initial_target_size` successful +/// handshakes. +#[instrument(skip( + initial_peers, + timestamp_collector, +), fields(initial_peers_len = %initial_peers.len()))] +async fn add_initial_peers( initial_peers: std::collections::HashSet, - outbound_connector: S, - mut tx: mpsc::Sender, -) -> Result<(), BoxError> -where - S: Service, Error = BoxError> + Clone, - S::Future: Send + 'static, -{ - info!(?initial_peers, "connecting to initial peer set"); - // ## Correctness: - // - // Each `CallAll` can hold one `Buffer` or `Batch` reservation for - // an indefinite period. 
We can use `CallAllUnordered` without filling - // the underlying `Inbound` buffer, because we immediately drive this - // single `CallAll` to completion, and handshakes have a short timeout. - let mut handshakes: FuturesUnordered<_> = initial_peers - .into_iter() - .map(|addr| { - outbound_connector - .clone() - .oneshot(addr) - .map_err(move |e| (addr, e)) - }) - .collect(); - - while let Some(handshake_result) = handshakes.next().await { - // this is verbose, but it's better than just hanging with no output - if let Err((addr, ref e)) = handshake_result { - info!(?addr, ?e, "an initial peer connection failed"); - } - tx.send(handshake_result.map_err(|(_addr, e)| e)).await?; - } + mut timestamp_collector: mpsc::Sender, +) -> Result<(), BoxError> { + info!(?initial_peers, "sending seed peers to the address book"); + + // Note: these address book updates are sent to a channel, so they might be + // applied after updates from concurrent tasks. + let seed_changes = initial_peers.into_iter().map(MetaAddr::new_seed); + let mut seed_changes = futures::stream::iter(seed_changes).map(Result::Ok); + timestamp_collector.send_all(&mut seed_changes).await?; Ok(()) } @@ -232,12 +230,12 @@ where /// Zcash peer. /// /// Uses `handshaker` to perform a Zcash network protocol handshake, and sends -/// the `Client` result over `tx`. -#[instrument(skip(tx, handshaker))] +/// the [`peer::Client`] result over `peerset_tx`. +#[instrument(skip(peerset_tx, handshaker))] async fn listen( addr: SocketAddr, mut handshaker: S, - tx: mpsc::Sender, + peerset_tx: mpsc::Sender, ) -> Result<(), BoxError> where S: Service + Clone, @@ -271,11 +269,11 @@ where // Construct a handshake future but do not drive it yet.... let handshake = handshaker.call((tcp_stream, connected_addr)); // ... instead, spawn a new task to handle this connection - let mut tx2 = tx.clone(); + let mut peerset_tx2 = peerset_tx.clone(); tokio::spawn( async move { if let Ok(client) = handshake.await { - let _ = tx2.send(Ok(Change::Insert(addr, client))).await; + let _ = peerset_tx2.send(Ok(Change::Insert(addr, client))).await; } } .instrument(handshaker_span), @@ -285,47 +283,65 @@ where } /// An action that the peer crawler can take. -#[allow(dead_code)] enum CrawlerAction { /// Drop the demand signal because there are too many pending handshakes. DemandDrop, - /// Initiate a handshake to `candidate` in response to demand. - DemandHandshake { candidate: MetaAddr }, + /// Initiate a handshake to the next candidate in response to demand. + DemandHandshake, /// Crawl existing peers for more peers in response to demand, because there /// are no available candidates. DemandCrawl, /// Crawl existing peers for more peers in response to a timer `tick`. TimerCrawl { tick: Instant }, - /// Handle a successfully connected handshake `peer_set_change`. - HandshakeConnected { - peer_set_change: Change, - }, + /// Handle a successfully connected handshake to `addr`. + HandshakeConnected { addr: SocketAddr }, /// Handle a handshake failure to `failed_addr`. - HandshakeFailed { failed_addr: MetaAddr }, + HandshakeFailed { + failed_addr: MetaAddr, + error: BoxError, + }, + /// Handle a completed crawl that produced `new_candidates`, based on a + /// `fanout_limit`. + CrawlCompleted { + new_candidates: usize, + fanout_limit: usize, + }, } /// Given a channel `demand_rx` that signals a need for new peers, try to find -/// and connect to new peers, and send the resulting `peer::Client`s through the -/// `success_tx` channel. 
+/// and connect to new peers, and send the resulting [`peer::Client`]s through the +/// `peerset_tx` channel. /// /// Crawl for new peers every `crawl_new_peer_interval`, and whenever there is -/// demand, but no new peers in `candidates`. After crawling, try to connect to +/// demand, but no new peers in `candidate_set`. After crawling, try to connect to /// one new peer using `outbound_connector`. /// /// If a handshake fails, restore the unused demand signal by sending it to /// `demand_tx`. /// -/// The crawler terminates when `candidates.update()` or `success_tx` returns a +/// When the seed peer crawls are completed, notifies `initial_crawl_rx`. +/// +/// The crawler terminates when [`CandidateSet::update`] or `peerset_tx` returns a /// permanent internal error. Transient errors and individual peer errors should /// be handled within the crawler. -#[instrument(skip(demand_tx, demand_rx, candidates, outbound_connector, success_tx))] -async fn crawl_and_dial( +#[instrument(skip( + demand_tx, + demand_rx, + candidate_set, + outbound_connector, + peerset_tx, + initial_crawl_tx +))] +#[allow(clippy::too_many_arguments)] +async fn crawl_and_dial( crawl_new_peer_interval: std::time::Duration, + peerset_initial_target_size: usize, mut demand_tx: mpsc::Sender<()>, mut demand_rx: mpsc::Receiver<()>, - mut candidates: CandidateSet, + mut candidate_set: CandidateSet
<P>
, outbound_connector: C, - mut success_tx: mpsc::Sender, + peerset_tx: mpsc::Sender, + initial_crawl_tx: oneshot::Sender<()>, ) -> Result<(), BoxError> where C: Service, Error = BoxError> @@ -333,55 +349,85 @@ where + Send + 'static, C::Future: Send + 'static, - S: Service, - S::Future: Send + 'static, + P: Service + Clone + Send + 'static, + P::Future: Send + 'static, { - use CrawlerAction::*; - // CORRECTNESS // // To avoid hangs and starvation, the crawler must: // - spawn a separate task for each crawl and handshake, so they can make // progress independently (and avoid deadlocking each other) // - use the `select!` macro for all actions, because the `select` function - // is biased towards the first ready future + // is biased towards its first argument if multiple futures are ready + // - perform all `await`s in the `select!` or in spawned tasks + // - perform all threaded mutex locks in spawned tasks + // TODO: replace threaded mutexes with services (#1976) + + let mut recently_live_peers = candidate_set.recently_live_peer_count().await; + + // These variables aren't updated, so drop them before the loop + { + let candidate_peers = candidate_set.candidate_peer_count().await; + let recently_used_peers = candidate_set.recently_used_peer_count().await; + info!( + ?candidate_peers, + ?recently_used_peers, + ?recently_live_peers, + "starting the peer crawler" + ); + } let mut handshakes = FuturesUnordered::new(); - // returns None when empty. - // Keeping an unresolved future in the pool means the stream - // never terminates. - // We could use StreamExt::select_next_some and StreamExt::fuse, but `fuse` - // prevents us from adding items to the stream and checking its length. - handshakes.push(future::pending().boxed()); + let mut crawls = FuturesUnordered::new(); - let mut crawl_timer = - tokio::time::interval(crawl_new_peer_interval).map(|tick| TimerCrawl { tick }); + let crawl_start = Instant::now() + crawl_new_peer_interval; + let mut crawl_timer = tokio::time::interval_at(crawl_start, crawl_new_peer_interval) + .map(|tick| TimerCrawl { tick }) + .fuse(); - loop { - metrics::gauge!( - "crawler.in_flight_handshakes", - handshakes - .len() - .checked_sub(1) - .expect("the pool always contains an unresolved future") as f64 - ); + // Zebra has just started, and we're the only task that makes connections, + // so the number of recently live peers must be zero here. + // + // TODO: remove this check when recently live peers uses the PeerSet, + // so we don't panic on early inbound connections + assert_eq!( + recently_live_peers, 0, + "Unexpected recently live peers: the crawler should be the only task that makes outbound connections" + ); + + // Start by doing a crawl after every successful handshake. + // + // If we only have a small number of initial peers, we need to ensure their + // `Addrs` responses used by the crawler. (zcashd has a per-connection + // `GetAddr` response rate limit.) + let mut initial_crawl_tx = Some(initial_crawl_tx); + let mut initial_crawls_left: usize = 2; - let crawler_action = tokio::select! { - next_handshake_res = handshakes.next() => next_handshake_res.expect( - "handshakes never terminates, because it contains a future that never resolves" - ), - next_timer = crawl_timer.next() => next_timer.expect("timers never terminate"), + loop { + // Do we need a crawl() after processing this message? 
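// A minimal, self-contained sketch of the `select!`-over-`FuturesUnordered`
// pattern described in the correctness comment above, using toy string
// "actions" instead of `CrawlerAction`. `select_next_some()` skips a pool
// while it is empty, and new work can be pushed onto either pool from inside
// the loop. (Illustrative only; assumes the `futures` and `tokio` crates.)
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};

#[tokio::main]
async fn main() {
    let mut handshakes = FuturesUnordered::new();
    let mut crawls = FuturesUnordered::new();

    handshakes.push(async { "handshake done" }.boxed());

    for _ in 0..3 {
        let action = futures::select! {
            next_handshake = handshakes.select_next_some() => next_handshake,
            next_crawl = crawls.select_next_some() => next_crawl,
            // runs only if every other select! argument is terminated
            complete => break,
        };
        println!("crawler action: {}", action);

        // pushing new work from inside the loop is fine
        if crawls.is_empty() {
            crawls.push(async { "crawl done" }.boxed());
        }
    }
}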
+ let mut needs_crawl = false; + + metrics::gauge!("crawler.in_flight_handshakes", handshakes.len() as f64); + metrics::gauge!("crawler.in_flight_crawls", crawls.len() as f64); + metrics::gauge!("crawler.initial_crawls_left", initial_crawls_left as f64); + + // If multiple futures are ready, the `select!` macro chooses one at random. + // + // TODO: after we upgrade to tokio 1.6, use `select { biased; ... }` + // to prioritise handshakes, then crawls, then demand if multiple futures + // are ready. + // (Handshakes enable crawls, and both stop when there is no demand.) + let crawler_action = futures::select! { + next_handshake = handshakes.select_next_some() => next_handshake, + next_crawl = crawls.select_next_some() => next_crawl, + next_timer = crawl_timer.select_next_some() => next_timer, // turn the demand into an action, based on the crawler's current state _ = demand_rx.next() => { - if handshakes.len() > 50 { - // Too many pending handshakes already + // Too many pending handshakes and crawls already + if handshakes.len() + crawls.len() >= peerset_initial_target_size { DemandDrop - } else if let Some(candidate) = candidates.next().await { - // candidates.next has a short delay, and briefly holds the address - // book lock, so it shouldn't hang - DemandHandshake { candidate } - } else { - DemandCrawl + } else { + DemandHandshake } } }; @@ -391,77 +437,167 @@ where // This is set to trace level because when the peerset is // congested it can generate a lot of demand signal very // rapidly. - trace!("too many in-flight handshakes, dropping demand signal"); + trace!("too many in-flight handshakes and crawls, dropping demand signal"); continue; } - DemandHandshake { candidate } => { + DemandHandshake => { // spawn each handshake into an independent task, so it can make // progress independently of the crawls - let hs_join = tokio::spawn(dial(candidate, outbound_connector.clone())) - .map(move |res| match res { - Ok(crawler_action) => crawler_action, - Err(e) => { - panic!("panic during handshaking with {:?}: {:?} ", candidate, e); - } - }) - .instrument(Span::current()); + let hs_join = tokio::spawn(dial( + candidate_set.clone(), + outbound_connector.clone(), + peerset_tx.clone(), + )) + .map(move |res| match res { + Ok(crawler_action) => crawler_action, + Err(e) => { + panic!("panic during handshake attempt, error: {:?}", e); + } + }) + .instrument(Span::current()); handshakes.push(Box::pin(hs_join)); } DemandCrawl => { debug!("demand for peers but no available candidates"); - // update has timeouts, and briefly holds the address book - // lock, so it shouldn't hang - // - // TODO: refactor candidates into a buffered service, so we can - // spawn independent tasks to avoid deadlocks - candidates.update().await?; - // Try to connect to a new peer. + needs_crawl = true; + // Try to connect to a new peer after the crawl. let _ = demand_tx.try_send(()); } TimerCrawl { tick } => { - debug!( - ?tick, + let address_metrics = candidate_set.address_metrics().await; + info!( + ?address_metrics, "crawling for more peers in response to the crawl timer" ); - // TODO: spawn independent tasks to avoid deadlocks - candidates.update().await?; - // Try to connect to a new peer. 
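// A rough sketch of the task-per-handshake pattern in the `DemandHandshake`
// arm above: spawn the work with `tokio::spawn`, unwrap the `JoinError` in a
// `map`, and track the handle in a `FuturesUnordered` so the main loop can
// select over completed tasks. The `fetch` function and `usize` output are
// toy stand-ins for `dial` and `CrawlerAction`. (Assumes `futures` and `tokio`.)
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};

async fn fetch(id: usize) -> usize {
    // stand-in for `dial(candidate_set, outbound_connector, peerset_tx)`
    id * 2
}

#[tokio::main]
async fn main() {
    let mut in_flight = FuturesUnordered::new();

    for id in 0..3 {
        let task = tokio::spawn(fetch(id)).map(move |res| match res {
            Ok(output) => output,
            // a panic in the spawned task becomes a panic in this task
            Err(e) => panic!("panic in spawned task {}: {:?}", id, e),
        });
        in_flight.push(Box::pin(task));
    }

    while let Some(output) = in_flight.next().await {
        println!("task finished with output {}", output);
    }
}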
+ debug!(?tick, "crawl timer value"); + needs_crawl = true; let _ = demand_tx.try_send(()); } - HandshakeConnected { peer_set_change } => { - if let Change::Insert(ref addr, _) = peer_set_change { - debug!(candidate.addr = ?addr, "successfully dialed new peer"); - } else { - unreachable!("unexpected handshake result: all changes should be Insert"); - } - // successes are handled by an independent task, so they - // shouldn't hang - success_tx.send(Ok(peer_set_change)).await?; + HandshakeConnected { addr } => { + trace!(?addr, "crawler cleared successful handshake"); + // Assume an update for the peer that just connected is waiting + // in the channel. + // + // TODO: replace with a live peers watch channel + // include inbound live peers using the peer set (#1552) + recently_live_peers = candidate_set.recently_live_peer_count().await + 1; + + // Work around a zcashd rate-limit + needs_crawl = initial_crawls_left > 0; } - HandshakeFailed { failed_addr } => { - debug!(?failed_addr.addr, "marking candidate as failed"); - candidates.report_failed(&failed_addr); + HandshakeFailed { failed_addr, error } => { + trace!(?failed_addr, ?error, "crawler cleared failed handshake"); // The demand signal that was taken out of the queue // to attempt to connect to the failed candidate never // turned into a connection, so add it back: let _ = demand_tx.try_send(()); } + CrawlCompleted { + new_candidates, + fanout_limit, + } => { + if initial_crawls_left > 0 { + info!(?new_candidates, ?fanout_limit, "crawler completed crawl"); + } else { + debug!(?new_candidates, ?fanout_limit, "crawler completed crawl"); + } + + if new_candidates > 0 { + if let Some(initial_crawl_tx) = initial_crawl_tx.take() { + initial_crawls_left = 0; + // We don't care if it has been dropped + let _ = initial_crawl_tx.send(()); + } + } + } + } + + // Only run one crawl at a time, to avoid congestion on testnet. + // TODO: do we want a small number of concurrent crawls on large networks + // like mainnet? + if needs_crawl && crawls.is_empty() { + // avoid occupying our zcashd peers with requests they'll only answer infrequently + let fanout_limit = max( + recently_live_peers / constants::GET_ADDR_FANOUT_LIVE_PEERS_DIVISOR, + 1, + ); + + // spawn each crawl into an independent task, so it can make + // progress in parallel with the handshakes + let crawl_join = tokio::spawn(crawl(candidate_set.clone(), fanout_limit)) + .map(move |res| match res { + Ok(crawler_action) => crawler_action, + Err(e) => { + panic!( + "panic during crawl, recently live peers: {:?}, error: {:?}", + recently_live_peers, e + ); + } + }) + .instrument(Span::current()); + crawls.push(Box::pin(crawl_join)); + + initial_crawls_left = initial_crawls_left.saturating_sub(1); + if initial_crawls_left == 0 { + if let Some(initial_crawl_tx) = initial_crawl_tx.take() { + let _ = initial_crawl_tx.send(()); + } + } } } } -/// Try to connect to `candidate` using `outbound_connector`. +/// Try to update `candidate_set` with a `fanout_limit`. /// -/// Returns a `HandshakeConnected` action on success, and a -/// `HandshakeFailed` action on error. -#[instrument(skip(outbound_connector,))] -async fn dial(candidate: MetaAddr, mut outbound_connector: C) -> CrawlerAction +/// Returns an [`UpdateCompleted`] action containing the number of new candidate +/// peers. +#[instrument(skip(candidate_set))] +async fn crawl
<P>
(mut candidate_set: CandidateSet
<P>
, fanout_limit: usize) -> CrawlerAction +where + P: Service + Clone + Send + 'static, + P::Future: Send + 'static, +{ + // CORRECTNESS + // + // To avoid hangs, the crawler must only await: + // - functions that return immediately, or + // - functions that have a reasonable timeout + + debug!(?fanout_limit, "attempting peer set address crawl"); + + // CandidateSet::update has timeouts, so it shouldn't hang + // + // TODO: refactor CandidateSet into a buffered service, to avoid + // AddressBook threaded mutex deadlocks and contention (#1976) + let new_candidates = candidate_set.update(fanout_limit).await; + + CrawlCompleted { + new_candidates, + fanout_limit, + } +} + +/// Try to connect to a candidate from `candidate_set` using +/// `outbound_connector`, and send successful handshakes to `peerset_tx`. +/// +/// Returns: +/// - [`DemandCrawl`] if there are no candidates for outbound connection, +/// - [`HandshakeConnected`] for successful handshakes, and +/// - [`HandshakeFailed`] for failed handshakes. +#[instrument(skip(candidate_set, outbound_connector, peerset_tx))] +async fn dial( + mut candidate_set: CandidateSet
<P>
, + mut outbound_connector: C, + mut peerset_tx: mpsc::Sender, +) -> CrawlerAction where C: Service, Error = BoxError> + Clone + Send + 'static, C::Future: Send + 'static, + P: Service + Clone + Send + 'static, + P::Future: Send + 'static, { // CORRECTNESS // @@ -469,6 +605,15 @@ where // - functions that return immediately, or // - functions that have a reasonable timeout + debug!("trying to get the next candidate address"); + + // candidate_set.next has a short delay, and briefly holds the address + // book lock, so it shouldn't hang + let candidate = match candidate_set.next().await { + Some(candidate) => candidate, + None => return DemandCrawl, + }; + debug!(?candidate.addr, "attempting outbound connection in response to demand"); // the connector is always ready, so this can't hang @@ -478,23 +623,39 @@ where .expect("outbound connector never errors"); // the handshake has timeouts, so it shouldn't hang - outbound_connector - .call(candidate.addr) - .map_err(|e| (candidate, e)) - .map(Into::into) - .await -} + let handshake_result = outbound_connector.call(candidate.addr).await; + + match handshake_result { + Ok(peer_set_change) => { + if let Change::Insert(ref addr, _) = peer_set_change { + debug!(?addr, "successfully dialed new peer"); + } else { + unreachable!("unexpected handshake result: all changes should be Insert"); + } -impl From, (MetaAddr, BoxError)>> for CrawlerAction { - fn from(dial_result: Result, (MetaAddr, BoxError)>) -> Self { - use CrawlerAction::*; - match dial_result { - Ok(peer_set_change) => HandshakeConnected { peer_set_change }, - Err((candidate, e)) => { - debug!(?candidate.addr, ?e, "failed to connect to candidate"); - HandshakeFailed { - failed_addr: candidate, - } + // the peer set is handled by an independent task, so this send + // shouldn't hang + peerset_tx + .send(Ok(peer_set_change)) + .await + .expect("peer set never errors"); + + HandshakeConnected { + addr: candidate.addr, + } + } + Err(error) => { + debug!(addr = ?candidate.addr, ?error, "marking candidate as failed"); + // The handshaker sends its errors to the timestamp collector. + // But we also need to record errors in the connector. + // (And any other layers in the service stack.) + // + // TODO: replace this with the timestamp collector? + candidate_set.report_failed(candidate.addr).await; + + HandshakeFailed { + failed_addr: candidate, + error, } } } diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 5b5044cf605..a39ed98d9fc 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -18,11 +18,11 @@ use futures::{ }; use indexmap::IndexMap; use tokio::sync::{broadcast, oneshot::error::TryRecvError}; -use tokio::task::JoinHandle; +use tokio::{task::JoinHandle, time::timeout}; use tower::{ discover::{Change, Discover}, load::Load, - Service, + Service, ServiceExt, }; use crate::{ @@ -37,6 +37,7 @@ use super::{ unready_service::{Error as UnreadyError, UnreadyService}, InventoryRegistry, }; +use tracing::{Instrument, Span}; /// A [`tower::Service`] that abstractly represents "the rest of the network". /// @@ -530,3 +531,77 @@ where fut } } + +/// Spawn `fanout_limit` copies of `request` to `peer_service`, each in a +/// separate task. +/// +/// Each task waits `readiness_timeout` for the peer service to be ready, +/// and returns an error if there is a timeout. +/// (There is no default timeout on peer service readiness.) +/// +/// The readiness timeout happens before any request timeout on `peer_service`. 
+/// (Requests have a default [`REQUEST_TIMEOUT`]. A shorter timeout can be +/// applied using a [`tower::timeout::TimeoutLayer`] on `peer_service`.) +/// +/// # Panics +/// +/// If the peer service returns a permanent error. +/// If the spawned task panics. +/// +/// # Load Balancing +/// +/// Because requests are load-balanced across existing peers, we can make +/// multiple requests concurrently. These requests will be randomly assigned to +/// existing peers. Avoid making too many requests, because fanouts may be called +/// while the peer set is already loaded. +/// +/// # Timeouts and Deadlocks +/// +/// Use a small timeout to avoid deadlocks when there are no connected peers. +/// +/// Deadlocks can happen when: +/// - we're waiting on a handshake to complete so there are peers, or +/// - another task that handles or adds peers is waiting on this task to +/// complete. +/// +/// The `readiness_timeout` should be long enough for: +/// - some handshake tasks to complete, or +/// - some requests to complete or timeout, +/// and update the peer set readiness. +/// +/// Use [`REQUEST_TIMEOUT`], unless you're sure you need a lower timeout. +#[instrument(skip(peer_service))] +pub fn spawn_fanout
<P>
( + peer_service: P, + request: Request, + fanout_limit: usize, + readiness_timeout: std::time::Duration, +) -> impl futures::Stream> +where + P: Service + Clone + Send + 'static, + P::Future: Send + 'static, +{ + let responses = FuturesUnordered::new(); + + for _ in 0..fanout_limit { + let mut peer_service = peer_service.clone(); + let request = request.clone(); + let response_fut = tokio::spawn(async move { + let ready_peer_service_fut = timeout(readiness_timeout, peer_service.ready_and()); + if let Ok(peer_service) = ready_peer_service_fut.await { + // CORRECTNESS: peer set requests already have a timeout + peer_service + .expect("unexpected peer service error") + .call(request) + .await + } else { + // timeouts are transient errors + debug!("timeout waiting for peer service readiness: skipping this fanout"); + Err("peer set readiness timeout elapsed".into()) + } + }); + responses.push(response_fut.instrument(Span::current())); + } + + responses.map(|rsp| rsp.expect("unexpected join error in spawned peer service request")) +} diff --git a/zebra-network/src/protocol/external/message.rs b/zebra-network/src/protocol/external/message.rs index e3a75d9473e..e19108d8683 100644 --- a/zebra-network/src/protocol/external/message.rs +++ b/zebra-network/src/protocol/external/message.rs @@ -337,19 +337,23 @@ pub enum RejectReason { impl fmt::Display for Message { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.write_str(match self { + // TODO: add network version, important fields Message::Version { .. } => "version", Message::Verack => "verack", Message::Ping(_) => "ping", Message::Pong(_) => "pong", Message::Reject { .. } => "reject", Message::GetAddr => "getaddr", + // TODO: add length Message::Addr(_) => "addr", Message::GetBlocks { .. } => "getblocks", Message::Inv(_) => "inv", Message::GetHeaders { .. } => "getheaders", Message::Headers(_) => "headers", Message::GetData(_) => "getdata", + // TODO: add transaction count, versions? Message::Block(_) => "block", + // TODO: add transparent, sprout, sapling, orchard counts? Message::Tx(_) => "tx", Message::NotFound(_) => "notfound", Message::Mempool => "mempool", diff --git a/zebra-network/src/protocol/internal/request.rs b/zebra-network/src/protocol/internal/request.rs index a31f7aeaa63..612b60302f4 100644 --- a/zebra-network/src/protocol/internal/request.rs +++ b/zebra-network/src/protocol/internal/request.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, sync::Arc}; +use std::{collections::HashSet, fmt, sync::Arc}; use zebra_chain::{ block, @@ -175,3 +175,24 @@ pub enum Request { /// Returns [`Response::TransactionHashes`](super::Response::TransactionHashes). MempoolTransactions, } + +impl fmt::Display for Request { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str(match self { + Request::Peers => "Peers", + Request::Ping(_) => "Ping", + // TODO: add length + Request::BlocksByHash(_) => "BlocksByHash", + Request::TransactionsByHash(_) => "TransactionsByHash", + // TODO: add length, stop Some/None + Request::FindBlocks { .. } => "FindBlocks", + Request::FindHeaders { .. } => "FindHeaders", + // TODO: add version, count of transparent, sprout, shielded, orchard + Request::PushTransaction(_) => "PushTransaction", + Request::AdvertiseTransactions(_) => "AdvertiseTransactions", + // TODO: add transaction count, versions? 
+ Request::AdvertiseBlock(_) => "AdvertiseBlock", + Request::MempoolTransactions => "MempoolTransactions", + }) + } +} diff --git a/zebra-network/src/protocol/internal/response.rs b/zebra-network/src/protocol/internal/response.rs index 4cb3c17b52c..d1818c48e68 100644 --- a/zebra-network/src/protocol/internal/response.rs +++ b/zebra-network/src/protocol/internal/response.rs @@ -5,7 +5,7 @@ use zebra_chain::{ use crate::meta_addr::MetaAddr; -use std::sync::Arc; +use std::{fmt, sync::Arc}; #[cfg(any(test, feature = "proptest-impl"))] use proptest_derive::Arbitrary; @@ -39,3 +39,18 @@ pub enum Response { /// A list of transaction hashes. TransactionHashes(Vec), } + +impl fmt::Display for Response { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str(match self { + Response::Nil => "Nil", + // TODO: add length + Response::Peers(_) => "Peers", + Response::Blocks(_) => "Blocks", + Response::BlockHashes(_) => "BlockHashes", + Response::BlockHeaders(_) => "BlockHeaders", + Response::Transactions(_) => "Transactions", + Response::TransactionHashes(_) => "TransactionHashes", + }) + } +} diff --git a/zebra-network/src/timestamp_collector.rs b/zebra-network/src/timestamp_collector.rs index 143fe7cd629..4fd07b01d24 100644 --- a/zebra-network/src/timestamp_collector.rs +++ b/zebra-network/src/timestamp_collector.rs @@ -4,20 +4,28 @@ use std::sync::Arc; use futures::{channel::mpsc, prelude::*}; -use crate::{types::MetaAddr, AddressBook, Config}; +use crate::{ + address_book::spawn_blocking, constants, meta_addr::MetaAddrChange, AddressBook, Config, +}; /// The timestamp collector hooks into incoming message streams for each peer and -/// records per-connection last-seen timestamps into an [`AddressBook`]. +/// records per-connection [`MetaAddrChange`]s into an [`AddressBook`]. pub struct TimestampCollector {} impl TimestampCollector { - /// Spawn a new [`TimestampCollector`] task, and return handles for the - /// transmission channel for timestamp events and for the [`AddressBook`] it - /// updates. - pub fn spawn(config: &Config) -> (Arc>, mpsc::Sender) { + /// Spawn a new [`TimestampCollector`] task. + /// + /// Return handles for: + /// * the transmission channel for [`MetaAddrChange`] events, and + /// * the [`AddressBook`] it updates. + pub fn spawn( + config: Config, + ) -> ( + Arc>, + mpsc::Sender, + ) { use tracing::Level; - const TIMESTAMP_WORKER_BUFFER_SIZE: usize = 100; - let (worker_tx, mut worker_rx) = mpsc::channel(TIMESTAMP_WORKER_BUFFER_SIZE); + let (worker_tx, worker_rx) = mpsc::channel(constants::TIMESTAMP_WORKER_BUFFER_SIZE); let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new( config, span!(Level::TRACE, "timestamp collector"), @@ -25,15 +33,20 @@ impl TimestampCollector { let worker_address_book = address_book.clone(); let worker = async move { - while let Some(event) = worker_rx.next().await { + // # Performance + // + // The timestamp collector gets a change for every message, and locks + // the address book for every update. So we merge all ready changes + // into a single address book update. + let mut worker_rx = worker_rx.ready_chunks(constants::TIMESTAMP_WORKER_BUFFER_SIZE); + while let Some(chunk) = worker_rx.next().await { // # Correctness // - // Briefly hold the address book threaded mutex, to update the - // state for a single address. - worker_address_book - .lock() - .expect("mutex should be unpoisoned") - .update(event); + // Briefly hold the address book threaded mutex on a blocking thread. 
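// A simplified, self-contained sketch of the batching pattern above:
// `ready_chunks` drains every change that is already queued into one batch,
// so the worker takes the mutex once per batch instead of once per change.
// `tokio::task::spawn_blocking` and a `Vec<u32>` "address book" stand in for
// the `spawn_blocking` helper and `AddressBook`. (Assumes `futures` and `tokio`.)
use std::sync::{Arc, Mutex};

use futures::{channel::mpsc, SinkExt, StreamExt};

#[tokio::main]
async fn main() {
    const BUFFER_SIZE: usize = 100;
    let (mut tx, rx) = mpsc::channel::<u32>(BUFFER_SIZE);
    let address_book: Arc<Mutex<Vec<u32>>> = Arc::new(Mutex::new(Vec::new()));

    let worker_book = address_book.clone();
    let worker = tokio::spawn(async move {
        // merge all ready changes into a single address book update
        let mut rx = rx.ready_chunks(BUFFER_SIZE);
        while let Some(chunk) = rx.next().await {
            let book = worker_book.clone();
            // briefly hold the threaded mutex, on a blocking thread
            tokio::task::spawn_blocking(move || {
                book.lock().expect("mutex should be unpoisoned").extend(chunk);
            })
            .await
            .expect("blocking task should not panic");
        }
    });

    for change in 0..10u32 {
        tx.send(change).await.expect("receiver is alive");
    }
    drop(tx); // close the channel so the worker exits

    worker.await.expect("worker should not panic");
    assert_eq!(address_book.lock().unwrap().len(), 10);
}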
+ spawn_blocking(&worker_address_book.clone(), move |address_book| { + address_book.extend(chunk) + }) + .await; } }; tokio::spawn(worker.boxed()); diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 3fd2caa68be..bfc840ff263 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -22,6 +22,7 @@ //! * handles requests from peers for network data and chain data //! * performs transaction and block diffusion //! * downloads and verifies gossiped blocks and transactions +use std::cmp::max; use abscissa_core::{config, Command, FrameworkError, Options, Runnable}; use color_eyre::eyre::{eyre, Report}; @@ -31,7 +32,7 @@ use tower::builder::ServiceBuilder; use crate::components::{tokio::RuntimeRun, Inbound}; use crate::config::ZebradConfig; use crate::{ - components::{tokio::TokioComponent, ChainSync}, + components::{tokio::TokioComponent, ChainSync, MAX_INBOUND_DOWNLOAD_CONCURRENCY}, prelude::*, }; @@ -49,10 +50,23 @@ impl StartCmd { info!(?config); info!("initializing node state"); - let state = ServiceBuilder::new().buffer(20).service(zebra_state::init( - config.state.clone(), - config.network.network, - )); + // Add buffer slots based on the largest concurrent caller + // + // Note: Zebra is currently very sensitive to buffer size changes (#2193) + // + // SECURITY + // + // Keep this buffer small, to avoid memory denial of service + let state_limit = max( + config.sync.max_concurrent_block_requests, + MAX_INBOUND_DOWNLOAD_CONCURRENCY, + ); + let state = ServiceBuilder::new() + .buffer(state_limit) + .service(zebra_state::init( + config.state.clone(), + config.network.network, + )); info!("initializing verifiers"); let verifier = zebra_consensus::chain::init( @@ -66,10 +80,27 @@ impl StartCmd { // The service that our node uses to respond to requests by peers. The // load_shed middleware ensures that we reduce the size of the peer set // in response to excess load. + + // Add buffer slots based on the largest concurrent caller. + // But reserve two thirds the peer connections for outbound requests. + // + // + // Note: Zebra is currently very sensitive to buffer size changes (#2193) + // + // # SECURITY + // + // This buffer is a memory denial of service risk, because its + // blocks and transactions have only been structurally validated. 
+ // + // TODO: Make this buffer smaller, to avoid memory denial of service (#1685, #2107) + let inbound_limit = max( + config.network.peerset_initial_target_size, + MAX_INBOUND_DOWNLOAD_CONCURRENCY, + ) / 3; let (setup_tx, setup_rx) = oneshot::channel(); let inbound = ServiceBuilder::new() .load_shed() - .buffer(20) + .buffer(inbound_limit) .service(Inbound::new(setup_rx, state.clone(), verifier.clone())); let (peer_set, address_book) = zebra_network::init(config.network.clone(), inbound).await; diff --git a/zebrad/src/components.rs b/zebrad/src/components.rs index 1b78d397254..b2410a69471 100644 --- a/zebrad/src/components.rs +++ b/zebrad/src/components.rs @@ -11,5 +11,5 @@ mod sync; pub mod tokio; pub mod tracing; -pub use inbound::Inbound; +pub use inbound::{Inbound, MAX_INBOUND_DOWNLOAD_CONCURRENCY}; pub use sync::ChainSync; diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index e1e9656174a..8d421421055 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -25,6 +25,7 @@ use super::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT}; mod downloads; use downloads::Downloads; +pub use downloads::MAX_INBOUND_DOWNLOAD_CONCURRENCY; type Outbound = Buffer, zn::Request>; type State = Buffer, zs::Request>; @@ -225,6 +226,15 @@ impl Service for Inbound { #[instrument(name = "inbound", skip(self, req))] fn call(&mut self, req: zn::Request) -> Self::Future { + // Drop early requests to avoid load shedding during network setup + if !matches!(self.network_setup, Setup::Initialized { .. }) { + debug!(%req, + "ignoring request from remote peer during network setup" + ); + trace!(?req, "full inbound request"); + return async { Ok(zn::Response::Nil) }.boxed(); + } + match req { zn::Request::Peers => { if let Setup::Initialized { address_book, .. } = &self.network_setup { @@ -252,8 +262,7 @@ impl Service for Inbound { peers.truncate(MAX_ADDR); async { Ok(zn::Response::Peers(peers)) }.boxed() } else { - info!("ignoring `Peers` request from remote peer during network setup"); - async { Ok(zn::Response::Nil) }.boxed() + unreachable!("already checked for network setup"); } } zn::Request::BlocksByHash(hashes) => { @@ -326,10 +335,7 @@ impl Service for Inbound { if let Setup::Initialized { downloads, .. } = &mut self.network_setup { downloads.download_and_verify(hash); } else { - info!( - ?hash, - "ignoring `AdvertiseBlock` request from remote peer during network setup" - ); + unreachable!("already checked for network setup"); } async { Ok(zn::Response::Nil) }.boxed() } diff --git a/zebrad/src/components/inbound/downloads.rs b/zebrad/src/components/inbound/downloads.rs index 320993e85b5..11bcfe7772a 100644 --- a/zebrad/src/components/inbound/downloads.rs +++ b/zebrad/src/components/inbound/downloads.rs @@ -19,8 +19,6 @@ use zebra_chain::block::{self, Block}; use zebra_network as zn; use zebra_state as zs; -type BoxError = Box; - /// The maximum number of concurrent inbound download and verify tasks. /// /// We expect the syncer to download and verify checkpoints, so this bound @@ -32,10 +30,9 @@ type BoxError = Box; /// attacks. /// /// The maximum block size is 2 million bytes. A deserialized malicious -/// block with ~225_000 transparent outputs can take up 9MB of RAM. As of -/// February 2021, a growing `Vec` can allocate up to 2x its current length, -/// leading to an overall memory usage of 18MB per malicious block. (See -/// #1880 for more details.) +/// block with ~225_000 transparent outputs can take up 9MB of RAM. 
(Since Zebra +/// uses preallocation, the transparent output `Vec` won't have a large amount of +/// unused allocated memory.) /// /// Malicious blocks will eventually timeout or fail contextual validation. /// Once validation fails, the block is dropped, and its memory is deallocated. @@ -43,7 +40,9 @@ type BoxError = Box; /// Since Zebra keeps an `inv` index, inbound downloads for malicious blocks /// will be directed to the malicious node that originally gossiped the hash. /// Therefore, this attack can be carried out by a single malicious node. -const MAX_INBOUND_CONCURRENCY: usize = 10; +pub const MAX_INBOUND_DOWNLOAD_CONCURRENCY: usize = 10; + +type BoxError = Box; /// The action taken in response to a peer's gossiped block hash. pub enum DownloadAction { @@ -58,7 +57,7 @@ pub enum DownloadAction { /// The queue is at capacity, so this request was ignored. /// /// The sync service should discover this block later, when we are closer - /// to the tip. The queue's capacity is [`MAX_INBOUND_CONCURRENCY`]. + /// to the tip. The queue's capacity is [`MAX_INBOUND_DOWNLOAD_CONCURRENCY`]. FullQueue, } @@ -173,17 +172,17 @@ where tracing::debug!( ?hash, queue_len = self.pending.len(), - ?MAX_INBOUND_CONCURRENCY, + ?MAX_INBOUND_DOWNLOAD_CONCURRENCY, "block hash already queued for inbound download: ignored block" ); return DownloadAction::AlreadyQueued; } - if self.pending.len() >= MAX_INBOUND_CONCURRENCY { + if self.pending.len() >= MAX_INBOUND_DOWNLOAD_CONCURRENCY { tracing::info!( ?hash, queue_len = self.pending.len(), - ?MAX_INBOUND_CONCURRENCY, + ?MAX_INBOUND_DOWNLOAD_CONCURRENCY, "too many blocks queued for inbound download: ignored block" ); return DownloadAction::FullQueue; @@ -254,7 +253,7 @@ where tracing::debug!( ?hash, queue_len = self.pending.len(), - ?MAX_INBOUND_CONCURRENCY, + ?MAX_INBOUND_DOWNLOAD_CONCURRENCY, "queued hash for download" ); metrics::gauge!("gossip.queued.block.count", self.pending.len() as _); diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs index 159e9e8d205..b8ed3ad0019 100644 --- a/zebrad/src/components/sync.rs +++ b/zebrad/src/components/sync.rs @@ -1,10 +1,7 @@ use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration}; use color_eyre::eyre::{eyre, Report}; -use futures::{ - future::FutureExt, - stream::{FuturesUnordered, StreamExt}, -}; +use futures::{future::FutureExt, stream::StreamExt}; use tokio::time::sleep; use tower::{ builder::ServiceBuilder, hedge::Hedge, limit::ConcurrencyLimit, retry::Retry, timeout::Timeout, @@ -23,8 +20,7 @@ use crate::{config::ZebradConfig, BoxError}; mod downloads; use downloads::{AlwaysHedge, Downloads}; -/// Controls the number of peers used for each ObtainTips and ExtendTips request. -const FANOUT: usize = 4; +use zn::constants::SYNC_FANOUT as FANOUT; /// Controls how many times we will retry each block download. /// @@ -369,22 +365,20 @@ where tracing::info!(tip = ?block_locator.first().unwrap(), "trying to obtain new chain tips"); tracing::debug!(?block_locator, "got block locator"); - let mut requests = FuturesUnordered::new(); - for _ in 0..FANOUT { - requests.push( - self.tip_network - .ready_and() - .await - .map_err(|e| eyre!(e))? 
-                        .call(zn::Request::FindBlocks {
-                            known_blocks: block_locator.clone(),
-                            stop: None,
-                        }),
-                );
-        }
+        let obtain_tips_req = zn::Request::FindBlocks {
+            known_blocks: block_locator.clone(),
+            stop: None,
+        };
+        let mut responses = zn::spawn_fanout(
+            self.tip_network.clone(),
+            obtain_tips_req,
+            // TODO: use a lower limit if there aren't many live peers (#1552)
+            FANOUT,
+            zn::constants::REQUEST_TIMEOUT,
+        );

        let mut download_set = HashSet::new();
-        while let Some(res) = requests.next().await {
+        while let Some(res) = responses.next().await {
            match res.map_err::<Report, _>(|e| eyre!(e)) {
                Ok(zn::Response::BlockHashes(hashes)) => {
                    tracing::trace!(?hashes);
@@ -484,19 +478,19 @@ where
        tracing::info!(tips = ?tips.len(), "trying to extend chain tips");
        for tip in tips {
            tracing::debug!(?tip, "asking peers to extend chain tip");
-            let mut responses = FuturesUnordered::new();
-            for _ in 0..FANOUT {
-                responses.push(
-                    self.tip_network
-                        .ready_and()
-                        .await
-                        .map_err(|e| eyre!(e))?
-                        .call(zn::Request::FindBlocks {
-                            known_blocks: vec![tip.tip],
-                            stop: None,
-                        }),
-                );
-            }
+
+            let extend_tips_req = zn::Request::FindBlocks {
+                known_blocks: vec![tip.tip],
+                stop: None,
+            };
+            let mut responses = zn::spawn_fanout(
+                self.tip_network.clone(),
+                extend_tips_req,
+                // TODO: use a lower limit if there aren't many live peers (#1552)
+                FANOUT,
+                zn::constants::REQUEST_TIMEOUT,
+            );
+
            while let Some(res) = responses.next().await {
                match res.map_err::<Report, _>(|e| eyre!(e)) {
                    Ok(zn::Response::BlockHashes(hashes)) => {
diff --git a/zebrad/src/config.rs b/zebrad/src/config.rs
index 963c37d4e7b..58fb04bba66 100644
--- a/zebrad/src/config.rs
+++ b/zebrad/src/config.rs
@@ -151,6 +151,19 @@ pub struct SyncSection {
    /// network contention. Increasing this value may improve
    /// performance on machines with many cores and a fast network
    /// connection.
+    ///
+    /// Note: changing this config can make Zebra slower or less reliable.
+    /// (See ticket #2193.)
+    ///
+    /// # SECURITY
+    ///
+    /// This config controls the syncer download and state buffer sizes.
+    /// Large values are a memory denial of service risk, because the
+    /// download buffer is before semantic verification, which checks
+    /// proof of work.
+    ///
+    /// The syncer fans out `FindBlocks` requests to random peers,
+    /// so the risk is lower than for inbound `AdvertiseBlock` requests.
    pub max_concurrent_block_requests: usize,

    /// Controls how far ahead of the chain tip the syncer tries to