From df83ce4eec6a3fca3c7282c7d4b7e6346b6aeb29 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 21 May 2021 13:44:33 +1000 Subject: [PATCH] Refactor MetaAddr to enable security fixes Track multiple last used times for each peer: - Add separate untrusted_last_seen, attempt, success, and failed time fields (#1868, #1876, #1848) - Add the new fields to the peer states, so they only appear in states where they are valid - Insert initial DNS seeder peers in the AddressBook in the correct states Create a new MetaAddrChange type for AddressBook changes: - Ignore invalid state changes - Ignore updates to the untrusted last seen time (but update the services field) - If we get a gossiped or alternate change for a seed peer, use the last seen and services info - Once a peer has responded, don't go back to the NeverResponded... states - Update the address book metrics - Optimise getting the next connection address from the address book --- zebra-network/src/address_book.rs | 366 +++--- zebra-network/src/meta_addr.rs | 1089 ++++++++++++++--- zebra-network/src/meta_addr/arbitrary.rs | 31 - zebra-network/src/meta_addr/tests/check.rs | 62 +- .../src/meta_addr/tests/preallocate.rs | 8 +- zebra-network/src/meta_addr/tests/vectors.rs | 53 +- zebra-network/src/peer/handshake.rs | 40 +- zebra-network/src/peer_set/candidate_set.rs | 164 ++- zebra-network/src/peer_set/initialize.rs | 56 +- zebra-network/src/timestamp_collector.rs | 19 +- 10 files changed, 1435 insertions(+), 453 deletions(-) delete mode 100644 zebra-network/src/meta_addr/arbitrary.rs diff --git a/zebra-network/src/address_book.rs b/zebra-network/src/address_book.rs index 8fa4864a304..dbdf4373bc1 100644 --- a/zebra-network/src/address_book.rs +++ b/zebra-network/src/address_book.rs @@ -1,17 +1,15 @@ -//! The `AddressBook` manages information about what peers exist, when they were +//! The [`AddressBook`] manages information about what peers exist, when they were //! seen, and what services they provide. 
-use std::{ - collections::{BTreeSet, HashMap}, - iter::Extend, - net::SocketAddr, - time::Instant, -}; +use std::{collections::HashMap, iter::Extend, net::SocketAddr, time::Instant}; use chrono::{DateTime, Utc}; use tracing::Span; -use crate::{constants, types::MetaAddr, Config, PeerAddrState}; +use crate::{constants, meta_addr::MetaAddrChange, types::MetaAddr, Config, PeerAddrState}; + +#[cfg(any(test, feature = "proptest-impl"))] +use proptest_derive::Arbitrary; /// A database of peer listener addresses, their advertised services, and /// information on when they were last seen. @@ -21,7 +19,7 @@ use crate::{constants, types::MetaAddr, Config, PeerAddrState}; /// Address book state must be based on outbound connections to peers. /// /// If the address book is updated incorrectly: -/// - malicious peers can interfere with other peers' `AddressBook` state, +/// - malicious peers can interfere with other peers' [`AddressBook`] state, /// or /// - Zebra can advertise unreachable addresses to its own peers. /// @@ -29,9 +27,9 @@ use crate::{constants, types::MetaAddr, Config, PeerAddrState}; /// /// The address book should only contain Zcash listener port addresses from peers /// on the configured network. These addresses can come from: -/// - DNS seeders +/// - the initial seed peers config /// - addresses gossiped by other peers -/// - the canonical address (`Version.address_from`) provided by each peer, +/// - the canonical address ([`Version.address_from`]) provided by each peer, /// particularly peers on inbound connections. /// /// The remote addresses of inbound connections must not be added to the address @@ -46,15 +44,22 @@ use crate::{constants, types::MetaAddr, Config, PeerAddrState}; /// Updates must not be based on: /// - the remote addresses of inbound connections, or /// - the canonical address of any connection. +/// +/// See the [`CandidateSet`] for a detailed peer state diagram. 
#[derive(Clone, Debug)] +#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] pub struct AddressBook { - /// Each known peer address has a matching `MetaAddr`. + /// Each known peer address has a matching [`MetaAddr`]. by_addr: HashMap, /// The local listener address. local_listener: SocketAddr, /// The span for operations on this address book. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(value = "Span::current()") + )] span: Span, /// The last time we logged a message about the address metrics. @@ -62,34 +67,42 @@ pub struct AddressBook { } /// Metrics about the states of the addresses in an [`AddressBook`]. -#[derive(Debug)] +#[derive(Copy, Clone, Debug)] pub struct AddressMetrics { - /// The number of addresses in the `Responded` state. + /// The number of addresses in the [`Responded`] state. responded: usize, - /// The number of addresses in the `NeverAttemptedGossiped` state. + /// The number of addresses in the [`NeverAttemptedSeed`] state. + never_attempted_seed: usize, + + /// The number of addresses in the [`NeverAttemptedGossiped`] state. never_attempted_gossiped: usize, - /// The number of addresses in the `NeverAttemptedAlternate` state. + /// The number of addresses in the [`NeverAttemptedAlternate`] state. never_attempted_alternate: usize, - /// The number of addresses in the `Failed` state. + /// The number of addresses in the [`Failed`] state. failed: usize, - /// The number of addresses in the `AttemptPending` state. + /// The number of addresses in the [`AttemptPending`] state. attempt_pending: usize, - /// The number of `Responded` addresses within the liveness limit. + /// The number of peers that we've tried to connect to recently. + recently_attempted: usize, + + /// The number of peers that have recently sent us messages. recently_live: usize, - /// The number of `Responded` addresses outside the liveness limit. - recently_stopped_responding: usize, + /// The number of peers that have failed recently. 
+ recently_failed: usize, + + /// The number of peers that are connection candidates. + connection_candidates: usize, } -#[allow(clippy::len_without_is_empty)] impl AddressBook { - /// Construct an `AddressBook` with the given `config` and [`tracing::Span`]. - pub fn new(config: &Config, span: Span) -> AddressBook { + /// Construct an [`AddressBook`] with the given `config` and [`tracing::Span`]. + pub fn new(config: Config, span: Span) -> AddressBook { let constructor_span = span.clone(); let _guard = constructor_span.enter(); @@ -104,82 +117,98 @@ impl AddressBook { new_book } - /// Get the local listener address. - pub fn get_local_listener(&self) -> MetaAddr { - MetaAddr::new_local_listener(&self.local_listener) + /// Returns a Change that adds or updates the local listener address in an + /// [`AddressBook`]. + /// + /// Our inbound listener port can be advertised to peers by applying this + /// change to the inbound request address book. + /// + /// # Correctness + /// + /// Avoid inserting this address into the local [`AddressBook`]. + /// (If peers gossip our address back to us, the handshake nonce will + /// protect us from self-connections.) + pub fn get_local_listener(&self) -> MetaAddrChange { + MetaAddr::new_local_listener(self.local_listener) } /// Get the contents of `self` in random order with sanitized timestamps. + /// + /// Skips peers with missing fields. pub fn sanitized(&self) -> Vec { use rand::seq::SliceRandom; let _guard = self.span.enter(); let mut peers = self - .peers() - .map(|a| MetaAddr::sanitize(&a)) + .peers_unordered() + .filter_map(|a| MetaAddr::sanitize(&a)) .collect::>(); peers.shuffle(&mut rand::thread_rng()); peers } /// Returns true if the address book has an entry for `addr`. 
- pub fn contains_addr(&self, addr: &SocketAddr) -> bool { + pub fn contains_addr(&self, addr: SocketAddr) -> bool { let _guard = self.span.enter(); - self.by_addr.contains_key(addr) + self.by_addr.contains_key(&addr) } - /// Returns the entry corresponding to `addr`, or `None` if it does not exist. + /// Returns the entry corresponding to `addr`, or [`None`] if it does not exist. pub fn get_by_addr(&self, addr: SocketAddr) -> Option { let _guard = self.span.enter(); self.by_addr.get(&addr).cloned() } - /// Add `new` to the address book, updating the previous entry if `new` is - /// more recent or discarding `new` if it is stale. + /// Apply `change` to the address book. + /// + /// If an entry was added or updated, return it. /// /// # Correctness /// - /// All new addresses should go through `update`, so that the address book - /// only contains valid outbound addresses. - pub fn update(&mut self, new: MetaAddr) { + /// All address book changes should go through `update`, so that the + /// address book only contains valid outbound addresses. + pub fn update(&mut self, change: MetaAddrChange) -> Option { let _guard = self.span.enter(); trace!( - ?new, + ?change, total_peers = self.by_addr.len(), recent_peers = self.recently_live_peers().count(), ); + let addr = change.get_addr(); + let book_entry = self.by_addr.get(&addr).cloned(); // If a node that we are directly connected to has changed to a client, // remove it from the address book. - if new.is_direct_client() && self.contains_addr(&new.addr) { + if change.is_direct_client(book_entry) && self.contains_addr(addr) { std::mem::drop(_guard); - self.take(new.addr); - return; + self.take(addr); + self.update_metrics(); + return None; } // Never add unspecified addresses or client services. // // Communication with these addresses can be monitored via Zebra's // metrics. (The address book is for valid peer addresses.) 
- if !new.is_valid_for_outbound() { - return; + if !change.is_valid_for_outbound(book_entry) { + return None; } - if let Some(prev) = self.get_by_addr(new.addr) { - if prev.get_last_seen() > new.get_last_seen() { - return; - } + let new_entry = change.into_meta_addr(book_entry); + if let Some(new_entry) = new_entry { + self.by_addr.insert(addr, new_entry); + std::mem::drop(_guard); + self.update_metrics(); + Some(new_entry) + } else { + None } - - self.by_addr.insert(new.addr, new); - std::mem::drop(_guard); - self.update_metrics(); } /// Removes the entry with `addr`, returning it if it exists /// /// # Note /// - /// All address removals should go through `take`, so that the address + /// All address removals should go through [`take`], so that the address /// book metrics are accurate. fn take(&mut self, removed_addr: SocketAddr) -> Option { let _guard = self.span.enter(); @@ -214,97 +243,122 @@ impl AddressBook { } /// Returns true if the given [`SocketAddr`] has recently sent us a message. - pub fn recently_live_addr(&self, addr: &SocketAddr) -> bool { + pub fn recently_live_addr(&self, addr: SocketAddr) -> bool { let _guard = self.span.enter(); - match self.by_addr.get(addr) { + match self.by_addr.get(&addr) { None => false, - // NeverAttempted, Failed, and AttemptPending peers should never be live + // Responded peers are the only peers that can be live Some(peer) => { - peer.last_connection_state == PeerAddrState::Responded - && peer.get_last_seen() > AddressBook::liveness_cutoff_time() + peer.get_last_success().unwrap_or(chrono::MIN_DATETIME) + > AddressBook::liveness_cutoff_time() } } } - /// Returns true if the given [`SocketAddr`] is pending a reconnection - /// attempt. - pub fn pending_reconnection_addr(&self, addr: &SocketAddr) -> bool { + /// Returns true if the given [`SocketAddr`] had a recent connection attempt. 
+ pub fn recently_attempted_addr(&self, addr: SocketAddr) -> bool { let _guard = self.span.enter(); - match self.by_addr.get(addr) { + match self.by_addr.get(&addr) { None => false, - Some(peer) => peer.last_connection_state == PeerAddrState::AttemptPending, + Some(peer) => { + peer.get_last_attempt().unwrap_or(chrono::MIN_DATETIME) + > AddressBook::liveness_cutoff_time() + } } } - /// Returns true if the given [`SocketAddr`] might be connected to a node - /// feeding timestamps into this address book. - pub fn maybe_connected_addr(&self, addr: &SocketAddr) -> bool { - self.recently_live_addr(addr) || self.pending_reconnection_addr(addr) + /// Returns true if the given [`SocketAddr`] recently failed. + pub fn recently_failed_addr(&self, addr: SocketAddr) -> bool { + let _guard = self.span.enter(); + match self.by_addr.get(&addr) { + None => false, + Some(peer) => { + peer.get_last_failed().unwrap_or(chrono::MIN_DATETIME) + > AddressBook::liveness_cutoff_time() + } + } } - /// Return an iterator over all peers. - /// - /// Returns peers in reconnection attempt order, then recently live peers in - /// an arbitrary order. - pub fn peers(&'_ self) -> impl Iterator + '_ { + /// Returns true if the given [`SocketAddr`] had recent attempts, successes, + /// or failures. + pub fn recently_used_addr(&self, addr: SocketAddr) -> bool { + self.recently_live_addr(addr) + || self.recently_attempted_addr(addr) + || self.recently_failed_addr(addr) + } + + /// Return an unordered iterator over all peers. + fn peers_unordered(&'_ self) -> impl Iterator + '_ { let _guard = self.span.enter(); - self.reconnection_peers() - .chain(self.maybe_connected_peers()) + self.by_addr.values() } - /// Return an iterator over peers that are due for a reconnection attempt, - /// in reconnection attempt order. - pub fn reconnection_peers(&'_ self) -> impl Iterator + '_ { + /// Return an iterator over peers that we've recently tried to connect to, + /// in arbitrary order. 
+ fn recently_attempted_peers(&'_ self) -> impl Iterator + '_ { let _guard = self.span.enter(); - // TODO: optimise, if needed, or get rid of older peers + self.peers_unordered() + .filter(move |peer| self.recently_attempted_addr(peer.addr)) + .cloned() + } + + /// Return an iterator over peers that have recently sent us messages, + /// in arbitrary order. + fn recently_live_peers(&'_ self) -> impl Iterator + '_ { + let _guard = self.span.enter(); - // Skip live peers, and peers pending a reconnect attempt, then sort using BTreeSet - self.by_addr - .values() - .filter(move |peer| !self.maybe_connected_addr(&peer.addr)) - .collect::>() - .into_iter() + self.peers_unordered() + .filter(move |peer| self.recently_live_addr(peer.addr)) .cloned() } - /// Return an iterator over all the peers in `state`, in arbitrary order. - pub fn state_peers(&'_ self, state: PeerAddrState) -> impl Iterator + '_ { + /// Return an iterator over peers that have recently failed, + /// in arbitrary order. + fn recently_failed_peers(&'_ self) -> impl Iterator + '_ { let _guard = self.span.enter(); - self.by_addr - .values() - .filter(move |peer| peer.last_connection_state == state) + self.peers_unordered() + .filter(move |peer| self.recently_failed_addr(peer.addr)) .cloned() } - /// Return an iterator over peers that might be connected, in arbitrary - /// order. - pub fn maybe_connected_peers(&'_ self) -> impl Iterator + '_ { + /// Return an iterator over peers that had recent attempts, successes, or failures, + /// in arbitrary order. + pub fn recently_used_peers(&'_ self) -> impl Iterator + '_ { let _guard = self.span.enter(); - self.by_addr - .values() - .filter(move |peer| self.maybe_connected_addr(&peer.addr)) + self.peers_unordered() + .filter(move |peer| self.recently_used_addr(peer.addr)) .cloned() } - /// Return an iterator over peers we've seen recently, in arbitrary order. 
- pub fn recently_live_peers(&'_ self) -> impl Iterator + '_ { + /// Return an iterator over candidate peers, in arbitrary order. + /// + /// Candidate peers have not had recent attempts, successes, or failures. + fn candidate_peers(&'_ self) -> impl Iterator + '_ { let _guard = self.span.enter(); - self.by_addr - .values() - .filter(move |peer| self.recently_live_addr(&peer.addr)) + // Skip recently used peers (including live peers) + self.peers_unordered() + .filter(move |peer| !self.recently_used_addr(peer.addr)) .cloned() } - /// Returns an iterator that drains entries from the address book. + /// Return the next peer that is due for a connection attempt. + pub fn next_candidate_peer(&self) -> Option { + let _guard = self.span.enter(); + + self.candidate_peers().min() + } + + /// Return the number of candidate peers. /// - /// Removes entries in reconnection attempt then arbitrary order, - /// see [`peers`] for details. - pub fn drain(&'_ mut self) -> impl Iterator + '_ { - Drain { book: self } + /// This number can change over time as recently used peers expire. + pub fn candidate_peer_count(&self) -> usize { + let _guard = self.span.enter(); + + self.candidate_peers().count() } /// Returns the number of entries in this address book. @@ -312,31 +366,74 @@ impl AddressBook { self.by_addr.len() } + /// Is this address book empty? + pub fn is_empty(&self) -> bool { + self.by_addr.is_empty() + } + /// Returns metrics for the addresses in this address book. pub fn address_metrics(&self) -> AddressMetrics { - let responded = self.state_peers(PeerAddrState::Responded).count(); + let responded = self + .peers_unordered() + .filter(|peer| matches!(peer.last_connection_state, PeerAddrState::Responded { .. 
})) + .count(); + let never_attempted_seed = self + .peers_unordered() + .filter(|peer| { + matches!( + peer.last_connection_state, + PeerAddrState::NeverAttemptedSeed + ) + }) + .count(); let never_attempted_gossiped = self - .state_peers(PeerAddrState::NeverAttemptedGossiped) + .peers_unordered() + .filter(|peer| { + matches!( + peer.last_connection_state, + PeerAddrState::NeverAttemptedGossiped { .. } + ) + }) .count(); let never_attempted_alternate = self - .state_peers(PeerAddrState::NeverAttemptedAlternate) + .peers_unordered() + .filter(|peer| { + matches!( + peer.last_connection_state, + PeerAddrState::NeverAttemptedAlternate { .. } + ) + }) + .count(); + let failed = self + .peers_unordered() + .filter(|peer| matches!(peer.last_connection_state, PeerAddrState::Failed { .. })) + .count(); + let attempt_pending = self + .peers_unordered() + .filter(|peer| { + matches!( + peer.last_connection_state, + PeerAddrState::AttemptPending { .. } + ) + }) .count(); - let failed = self.state_peers(PeerAddrState::Failed).count(); - let attempt_pending = self.state_peers(PeerAddrState::AttemptPending).count(); + let recently_attempted = self.recently_attempted_peers().count(); let recently_live = self.recently_live_peers().count(); - let recently_stopped_responding = responded - .checked_sub(recently_live) - .expect("all recently live peers must have responded"); + let recently_failed = self.recently_failed_peers().count(); + let connection_candidates = self.len() - self.recently_used_peers().count(); AddressMetrics { responded, + never_attempted_seed, never_attempted_gossiped, never_attempted_alternate, failed, attempt_pending, + recently_attempted, recently_live, - recently_stopped_responding, + recently_failed, + connection_candidates, } } @@ -346,30 +443,38 @@ impl AddressBook { let m = self.address_metrics(); + // States // TODO: rename to address_book.[state_name] - metrics::gauge!("candidate_set.responded", m.responded as f64); + 
metrics::gauge!("candidate_set.seed", m.never_attempted_seed as f64); metrics::gauge!("candidate_set.gossiped", m.never_attempted_gossiped as f64); metrics::gauge!( "candidate_set.alternate", m.never_attempted_alternate as f64 ); + metrics::gauge!("candidate_set.responded", m.responded as f64); metrics::gauge!("candidate_set.failed", m.failed as f64); metrics::gauge!("candidate_set.pending", m.attempt_pending as f64); - // TODO: rename to address_book.responded.recently_live + // Times + metrics::gauge!( + "candidate_set.recently_attempted", + m.recently_attempted as f64 + ); metrics::gauge!("candidate_set.recently_live", m.recently_live as f64); - // TODO: rename to address_book.responded.stopped_responding + metrics::gauge!("candidate_set.recently_failed", m.recently_failed as f64); + + // Candidates (state and time based) metrics::gauge!( - "candidate_set.disconnected", - m.recently_stopped_responding as f64 + "candidate_set.connection_candidates", + m.connection_candidates as f64 ); std::mem::drop(_guard); - self.log_metrics(&m); + self.log_metrics(m); } /// Log metrics for this address book - fn log_metrics(&mut self, m: &AddressMetrics) { + fn log_metrics(&mut self, m: AddressMetrics) { let _guard = self.span.enter(); trace!( @@ -417,26 +522,13 @@ impl AddressBook { } } -impl Extend for AddressBook { +impl Extend for AddressBook { fn extend(&mut self, iter: T) where - T: IntoIterator, + T: IntoIterator, { - for meta in iter.into_iter() { - self.update(meta); + for change in iter.into_iter() { + self.update(change); } } } - -struct Drain<'a> { - book: &'a mut AddressBook, -} - -impl<'a> Iterator for Drain<'a> { - type Item = MetaAddr; - - fn next(&mut self) -> Option { - let next_item_addr = self.book.peers().next()?.addr; - self.book.take(next_item_addr) - } -} diff --git a/zebra-network/src/meta_addr.rs b/zebra-network/src/meta_addr.rs index 0bf4a047ef5..c1cd494d782 100644 --- a/zebra-network/src/meta_addr.rs +++ b/zebra-network/src/meta_addr.rs @@ -1,4 +1,6 
@@ //! An address-with-metadata type used in Bitcoin networking. +//! +//! In Zebra, [`MetaAddr`]s also track Zebra-specific peer state. use std::{ cmp::{Ord, Ordering}, @@ -17,83 +19,388 @@ use zebra_chain::serialization::{ use crate::protocol::{external::MAX_PROTOCOL_MESSAGE_LEN, types::PeerServices}; +use MetaAddrChange::*; use PeerAddrState::*; +#[cfg(any(test, feature = "proptest-impl"))] +use proptest::option; #[cfg(any(test, feature = "proptest-impl"))] use proptest_derive::Arbitrary; + #[cfg(any(test, feature = "proptest-impl"))] -mod arbitrary; +use zebra_chain::primitives::arbitrary::{datetime_full, datetime_u32}; #[cfg(test)] mod tests; /// Peer connection state, based on our interactions with the peer. /// -/// Zebra also tracks how recently a peer has sent us messages, and derives peer -/// liveness based on the current time. This derived state is tracked using -/// [`AddressBook::maybe_connected_peers`] and -/// [`AddressBook::reconnection_peers`]. -#[derive(Copy, Clone, Debug, Eq, PartialEq)] +/// Zebra also tracks how recently we've had different kinds of interactions with +/// each peer, and derives peer usage based on the current time. This derived state +/// is tracked using [`AddressBook::recently_used_peers`] and +/// [`AddressBook::candidate_peers`]. +/// +/// To avoid depending on untrusted or default data, Zebra tracks the required +/// and optional data in each state. State updates are applied using +/// [`MetaAddrChange`]s. +/// +/// See the [`CandidateSet`] for a detailed peer state diagram. +#[derive(Copy, Clone, Debug)] #[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] pub enum PeerAddrState { - /// The peer has sent us a valid message. + /// The peer's address is a seed address. /// - /// Peers remain in this state, even if they stop responding to requests. - /// (Peer liveness is derived from the `last_seen` timestamp, and the current - /// time.) 
- Responded, + /// Seed addresses can come from: + /// - hard-coded IP addresses in the config, + /// - DNS names that resolve to a single peer, or + /// - DNS seeders, which resolve to a dynamic list of peers. + /// + /// Zebra attempts to connect to all seed peers on startup. As part of these + /// connection attempts, the initial peer connector updates all seed peers to + /// [`AttemptPending`]. + /// + /// If any seed addresses remain in the address book, they are attempted after + /// disconnected [`Responded`] peers. + NeverAttemptedSeed, - /// The peer's address has just been fetched from a DNS seeder, or via peer - /// gossip, but we haven't attempted to connect to it yet. - NeverAttemptedGossiped, + /// The peer's address has just been fetched via peer gossip, but we haven't + /// attempted to connect to it yet. + /// + /// Gossiped addresses are attempted after seed addresses. + NeverAttemptedGossiped { + /// The time that another node claims to have connected to this peer. + /// See [`get_untrusted_last_seen`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "datetime_u32()") + )] + untrusted_last_seen: DateTime, + /// The services another node claims this peer advertises. + /// See [`get_services`] for details. + untrusted_services: PeerServices, + }, - /// The peer's address has just been received as part of a `Version` message, + /// The peer's address has just been received as part of a [`Version`] message, /// so we might already be connected to this peer. /// /// Alternate addresses are attempted after gossiped addresses. - NeverAttemptedAlternate, + NeverAttemptedAlternate { + /// The last time another node gave us this address as their canonical + /// address. + /// See [`get_untrusted_last_seen`] for details. 
+ #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "datetime_full()") + )] + untrusted_last_seen: DateTime, + /// The services another node gave us along with their canonical address. + /// See [`get_services`] for details. + untrusted_services: PeerServices, + }, + + /// We are about to start a connection attempt to this peer. + /// + /// Pending addresses are retried last. + AttemptPending { + /// The last time we made an outbound attempt to this peer. + /// See [`get_last_attempt`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "datetime_full()") + )] + last_attempt: DateTime, + /// The last time we made a successful outbound connection to this peer. + /// See [`get_last_success`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "option::of(datetime_full())") + )] + last_success: Option>, + /// The last time an outbound connection to this peer failed. + /// See [`get_last_failed`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "option::of(datetime_full())") + )] + last_failed: Option>, + /// The last time another node claimed this peer was valid. + /// See [`get_untrusted_last_seen`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "option::of(datetime_full())") + )] + untrusted_last_seen: Option>, + /// The services another node claims this peer advertises. + /// See [`get_services`] for details. + untrusted_services: Option, + }, + + /// The peer has sent us a valid message. + /// + /// Peers remain in this state, even if they stop responding to requests. + /// (Peer liveness is derived from the [`last_success`] time, and the current + /// time.) + /// + /// Disconnected responded peers are retried first. + Responded { + /// The last time we made an outbound attempt to this peer. + /// See [`get_last_attempt`] for details. 
+ #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "datetime_full()") + )] + last_attempt: DateTime, + /// The last time we made a successful outbound connection to this peer. + /// See [`get_last_success`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "datetime_full()") + )] + last_success: DateTime, + /// The last time an outbound connection to this peer failed. + /// See [`get_last_failed`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "option::of(datetime_full())") + )] + last_failed: Option>, + /// The last time another node claimed this peer was valid. + /// See [`get_untrusted_last_seen`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "option::of(datetime_full())") + )] + untrusted_last_seen: Option>, + /// The services advertised by this directly connected peer. + /// See [`get_services`] for details. + services: PeerServices, + }, /// The peer's TCP connection failed, or the peer sent us an unexpected /// Zcash protocol message, so we failed the connection. - Failed, + /// + /// Failed peers are retried after disconnected [`Responded`] and never + /// attempted peers. + Failed { + /// The last time we made an outbound attempt to this peer. + /// See [`get_last_attempt`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "datetime_full()") + )] + last_attempt: DateTime, + /// The last time we made a successful outbound connection to this peer. + /// See [`get_last_success`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "option::of(datetime_full())") + )] + last_success: Option>, + /// The last time an outbound connection to this peer failed. + /// See [`get_last_failed`] for details. 
+ #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "datetime_full()") + )] + last_failed: DateTime, + /// The last time another node claimed this peer was valid. + /// See [`get_untrusted_last_seen`] for details. + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "option::of(datetime_full())") + )] + untrusted_last_seen: Option>, + /// The services claimed by another peer or advertised by this peer. + /// See [`get_services`] for details. + // + // TODO: do we need to distinguish direct and untrusted services? + untrusted_services: Option, + }, +} + +impl PeerAddrState { + /// The last time we attempted to make a direct outbound connection to the + /// address of this peer. + /// + /// Only updated by the [`AttemptPending`] state. + /// Also present in [`Responded`] and [`Failed`]. + pub fn get_last_attempt(&self) -> Option> { + match self { + NeverAttemptedSeed => None, + NeverAttemptedGossiped { .. } => None, + NeverAttemptedAlternate { .. } => None, + AttemptPending { last_attempt, .. } => Some(*last_attempt), + Responded { last_attempt, .. } => Some(*last_attempt), + Failed { last_attempt, .. } => Some(*last_attempt), + } + } + + /// The last time we successfully made a direct outbound connection to the + /// address of this peer. + /// + /// Only updated by the [`Responded`] state. + /// Also optionally present in [`Failed`] and [`AttemptPending`]. + pub fn get_last_success(&self) -> Option> { + match self { + NeverAttemptedSeed => None, + NeverAttemptedGossiped { .. } => None, + NeverAttemptedAlternate { .. } => None, + AttemptPending { last_success, .. } => *last_success, + Responded { last_success, .. } => Some(*last_success), + Failed { last_success, .. } => *last_success, + } + } + + /// The last time a direct outbound connection to the address of this peer + /// failed. + /// + /// Only updated by the [`Failed`] state. + /// Also optionally present in [`AttemptPending`] and [`Responded`]. 
+ pub fn get_last_failed(&self) -> Option> { + match self { + NeverAttemptedSeed => None, + NeverAttemptedGossiped { .. } => None, + NeverAttemptedAlternate { .. } => None, + AttemptPending { last_failed, .. } => *last_failed, + Responded { last_failed, .. } => *last_failed, + Failed { last_failed, .. } => Some(*last_failed), + } + } + + /// The last time another peer successfully connected to this peer. + /// + /// Only updated by the [`NeverAttemptedGossiped`] and + /// [`NeverAttemptedAlternate`] states. + /// + /// Optionally present in the [`AttemptPending`], [`Responded`], and [`Failed`] + /// states. + pub fn get_untrusted_last_seen(&self) -> Option> { + match self { + NeverAttemptedSeed => None, + NeverAttemptedGossiped { + untrusted_last_seen, + .. + } => Some(*untrusted_last_seen), + NeverAttemptedAlternate { + untrusted_last_seen, + .. + } => Some(*untrusted_last_seen), + AttemptPending { + untrusted_last_seen, + .. + } => *untrusted_last_seen, + Responded { + untrusted_last_seen, + .. + } => *untrusted_last_seen, + Failed { + untrusted_last_seen, + .. + } => *untrusted_last_seen, + } + } - /// We just started a connection attempt to this peer. - AttemptPending, + /// The services claimed for this peer. Only updated by the [`NeverAttemptedGossiped`] and + /// [`NeverAttemptedAlternate`] states. + /// + /// Always present in the [`Responded`] state, and optionally present in the [`AttemptPending`] and [`Failed`] + /// states. + pub fn get_untrusted_services(&self) -> Option { + match self { + NeverAttemptedSeed => None, + NeverAttemptedGossiped { + untrusted_services, .. + } => Some(*untrusted_services), + NeverAttemptedAlternate { + untrusted_services, .. + } => Some(*untrusted_services), + AttemptPending { + untrusted_services, .. + } => *untrusted_services, + Responded { services, .. } => Some(*services), + Failed { + untrusted_services, .. 
+ } => *untrusted_services, + } + } } // non-test code should explicitly specify the peer address state #[cfg(test)] impl Default for PeerAddrState { fn default() -> Self { - NeverAttemptedGossiped + NeverAttemptedGossiped { + untrusted_last_seen: Utc::now(), + untrusted_services: PeerServices::NODE_NETWORK, + } } } impl Ord for PeerAddrState { - /// `PeerAddrState`s are sorted in approximate reconnection attempt + /// [`PeerAddrState`]s are sorted in approximate reconnection attempt /// order, ignoring liveness. /// /// See [`CandidateSet`] and [`MetaAddr::cmp`] for more details. fn cmp(&self, other: &Self) -> Ordering { use Ordering::*; - match (self, other) { - (Responded, Responded) - | (Failed, Failed) - | (NeverAttemptedGossiped, NeverAttemptedGossiped) - | (NeverAttemptedAlternate, NeverAttemptedAlternate) - | (AttemptPending, AttemptPending) => Equal, + let responded_never_failed_pending = match (self, other) { + // If the states are the same, use the times to choose an order + (Responded { .. }, Responded { .. }) + | (Failed { .. }, Failed { .. }) + | (NeverAttemptedGossiped { .. }, NeverAttemptedGossiped { .. }) + | (NeverAttemptedAlternate { .. }, NeverAttemptedAlternate { .. }) + | (NeverAttemptedSeed, NeverAttemptedSeed) + | (AttemptPending { .. }, AttemptPending { .. }) => Equal, + // We reconnect to `Responded` peers that have stopped sending messages, - // then `NeverAttempted` peers, then `Failed` peers - (Responded, _) => Less, - (_, Responded) => Greater, - (NeverAttemptedGossiped, _) => Less, - (_, NeverAttemptedGossiped) => Greater, - (NeverAttemptedAlternate, _) => Less, - (_, NeverAttemptedAlternate) => Greater, - (Failed, _) => Less, - (_, Failed) => Greater, + // then `NeverAttempted...` peers, then `Failed` peers + (Responded { .. }, _) => Less, + (_, Responded { .. }) => Greater, + (NeverAttemptedSeed, _) => Less, + (_, NeverAttemptedSeed) => Greater, + (NeverAttemptedGossiped { .. }, _) => Less, + (_, NeverAttemptedGossiped { .. 
}) => Greater, + (NeverAttemptedAlternate { .. }, _) => Less, + (_, NeverAttemptedAlternate { .. }) => Greater, + (Failed { .. }, _) => Less, + (_, Failed { .. }) => Greater, // AttemptPending is covered by the other cases - } + }; + + // Prioritise successful peers: + // - try the most recent successful peers first + // - try the oldest failed peers before re-trying the same peer + // - re-try the oldest attempted peers first + // - try the most recent last seen times first (untrusted - from remote peers) + // - use the services as a tie-breaker + // + // `None` is earlier than any `Some(time)`, which means: + // - peers that have never succeeded or with no last seen times sort last + // - peers that have never failed or attempted sort first + // + // # Security + // + // Ignore untrusted times if we have any local times. + let recent_successful = self + .get_last_success() + .cmp(&other.get_last_success()) + .reverse(); + let older_failed = self.get_last_failed().cmp(&other.get_last_failed()); + let older_attempted = self.get_last_attempt().cmp(&other.get_last_attempt()); + let recent_untrusted_last_seen = self + .get_untrusted_last_seen() + .cmp(&other.get_untrusted_last_seen()) + .reverse(); + let services_tie_breaker = self + .get_untrusted_services() + .cmp(&other.get_untrusted_services()); + + responded_never_failed_pending + .then(recent_successful) + .then(older_failed) + .then(older_attempted) + .then(recent_untrusted_last_seen) + .then(services_tie_breaker) } } @@ -103,51 +410,407 @@ impl PartialOrd for PeerAddrState { } } +impl PartialEq for PeerAddrState { + fn eq(&self, other: &Self) -> bool { + use Ordering::*; + self.cmp(other) == Equal + } +} + +impl Eq for PeerAddrState {} + +/// A change to a [`MetaAddr`] in an [`AddressBook`]. +/// +/// Most [`PeerAddrState`]s have a corresponding [`Change`]: +/// - `New...` changes create a new address book entry, or add fields to an +/// existing `NeverAttempted...` address book entry. 
+/// - `Update...` changes update the state, and add or update fields in an +/// existing address book entry. +/// The [`UpdateShutdown`] preserves the [`Responded`] state, but changes all +/// other states to [`Failed`]. +/// +/// See the [`CandidateSet`] for a detailed peer state diagram. +#[derive(Copy, Clone, Debug)] +#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] +pub enum MetaAddrChange { + /// A new seed peer [`MetaAddr`]. + /// + /// The initial peers config provides the address. + /// Gossiped or alternate changes can provide missing services. + /// (But they can't update an existing service value.) + /// Existing services can be updated by future handshakes with this peer. + /// + /// Sets the untrusted last seen time and services to [`None`]. + NewSeed { addr: SocketAddr }, + + /// A new gossiped peer [`MetaAddr`]. + /// + /// [`Addr`] messages provide an address, services, and a last seen time. + /// The services can be updated by future handshakes with this peer. + /// The untrusted last seen time is overridden by any last success time. + NewGossiped { + addr: SocketAddr, + untrusted_services: PeerServices, + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "datetime_u32()") + )] + untrusted_last_seen: DateTime, + }, + + /// A new alternate peer [`MetaAddr`]. + /// + /// [`Version`] messages provide the canonical address and its services. + /// The services can be updated by future handshakes with this peer. + /// + /// Sets the untrusted last seen time to the current time. + /// (Because a connected peer just gave us this address.) + NewAlternate { + addr: SocketAddr, + untrusted_services: PeerServices, + }, + + /// We have started a connection attempt to this peer. + /// + /// Sets the last attempt time to the current time. + UpdateAttempt { addr: SocketAddr }, + + /// A peer has sent us a message, after a successful handshake on an outbound + /// connection.
+ /// + /// The services are updated based on the handshake with this peer. + /// + /// Sets the last success time to the current time. + UpdateResponded { + addr: SocketAddr, + services: PeerServices, + }, + + /// A connection to this peer has failed. + /// + /// If the handshake with this peer succeeded, update its services. + /// + /// Sets the last failed time to the current time. + UpdateFailed { + addr: SocketAddr, + services: Option, + }, + + /// A connection to this peer has shut down. + /// + /// If the handshake with this peer succeeded, update its services. + /// + /// If the peer is in the [`Responded`] state, do nothing. + /// Otherwise, mark the peer as [`Failed`], and set the last failed time to the + /// current time. + UpdateShutdown { + addr: SocketAddr, + services: Option, + }, +} + +impl MetaAddrChange { + /// Return the address for the change. + pub fn get_addr(&self) -> SocketAddr { + match self { + NewSeed { addr, .. } => *addr, + NewGossiped { addr, .. } => *addr, + NewAlternate { addr, .. } => *addr, + UpdateAttempt { addr } => *addr, + UpdateResponded { addr, .. } => *addr, + UpdateFailed { addr, .. } => *addr, + UpdateShutdown { addr, .. } => *addr, + } + } + + /// Return the services for the change, if available. + pub fn get_untrusted_services(&self) -> Option { + match self { + NewSeed { .. } => None, + NewGossiped { + untrusted_services, .. + } => Some(*untrusted_services), + NewAlternate { + untrusted_services, .. + } => Some(*untrusted_services), + UpdateAttempt { .. } => None, + UpdateResponded { services, .. } => Some(*services), + UpdateFailed { services, .. } => *services, + UpdateShutdown { services, .. } => *services, + } + } + + /// Return the untrusted last seen time for the change, if available. + pub fn get_untrusted_last_seen(&self) -> Option> { + match self { + NewGossiped { + untrusted_last_seen, + .. + } => Some(*untrusted_last_seen), + NewSeed { .. } + | NewAlternate { .. } + | UpdateAttempt { .. 
} + | UpdateResponded { .. } + | UpdateFailed { .. } + | UpdateShutdown { .. } => None, + } + } + + /// Is this address valid for outbound connections? + /// + /// `book_entry` is the entry for [`self.get_addr`] in the address book. + /// + /// Assumes that missing fields or entries are valid. + pub fn is_valid_for_outbound(&self, book_entry: Option) -> bool { + // Use the latest valid info we have + self.into_meta_addr(book_entry) + .or(book_entry) + .map(|meta_addr| meta_addr.is_valid_for_outbound()) + .unwrap_or(true) + } + + /// Is this address a directly connected client? + /// + /// `book_entry` is the entry for [`self.get_addr`] in the address book. + /// + /// Assumes that missing fields or entries are not clients. + pub fn is_direct_client(&self, book_entry: Option) -> bool { + // Use the latest valid info we have + self.into_meta_addr(book_entry) + .or(book_entry) + .map(|meta_addr| meta_addr.is_direct_client()) + .unwrap_or(false) + } + + /// Apply this change to `old_entry`, returning an updated entry. + /// + /// [`None`] means "no update". + /// + /// Ignores out-of-order updates: + /// - the same address can arrive from multiple sources, + /// - messages are sent by multiple tasks, and + /// - multiple tasks read the address book, + /// so message ordering is not guaranteed. + /// (But the address book should eventually be consistent.)
+ pub fn into_meta_addr(self, old_entry: Option) -> Option { + let has_been_attempted = old_entry + .map(|old| old.has_been_attempted()) + .unwrap_or(false); + + let old_last_attempt = old_entry.map(|old| old.get_last_attempt()).flatten(); + let old_last_success = old_entry.map(|old| old.get_last_success()).flatten(); + let old_last_failed = old_entry.map(|old| old.get_last_failed()).flatten(); + let old_untrusted_last_seen = old_entry.map(|old| old.get_untrusted_last_seen()).flatten(); + + let old_untrusted_services = old_entry.map(|old| old.get_untrusted_services()).flatten(); + + match self { + // New seed, not in address book: + NewSeed { addr } if old_entry.is_none() => { + Some(MetaAddr::new(addr, NeverAttemptedSeed)) + } + + // New gossiped or alternate, not attempted: + // Update state and services, but ignore updates to untrusted last seen + NewGossiped { + addr, + untrusted_services, + untrusted_last_seen, + } if !has_been_attempted => Some(MetaAddr::new( + addr, + NeverAttemptedGossiped { + untrusted_services, + // Keep the first time we got + untrusted_last_seen: old_untrusted_last_seen.unwrap_or(untrusted_last_seen), + }, + )), + NewAlternate { + addr, + untrusted_services, + } if !has_been_attempted => Some(MetaAddr::new( + addr, + NeverAttemptedAlternate { + untrusted_services, + // Keep the first time we got + untrusted_last_seen: old_untrusted_last_seen.unwrap_or_else(Utc::now), + }, + )), + + // New entry, but already existing (seed) or attempted (others): + // Skip the update entirely + NewSeed { .. } | NewGossiped { .. } | NewAlternate { .. 
} => None, + + // Attempt: + // Update last_attempt + UpdateAttempt { addr } => Some(MetaAddr::new( + addr, + AttemptPending { + last_attempt: Utc::now(), + last_success: old_last_success, + last_failed: old_last_failed, + untrusted_last_seen: old_untrusted_last_seen, + untrusted_services: old_untrusted_services, + }, + )), + + // Responded: + // Update last_success and services + UpdateResponded { addr, services } => Some(MetaAddr::new( + addr, + Responded { + // When the attempt message arrives, it will replace this default. + // (But it's a reasonable default anyway.) + last_attempt: old_last_attempt.unwrap_or_else(Utc::now), + last_success: Utc::now(), + last_failed: old_last_failed, + untrusted_last_seen: old_untrusted_last_seen, + services, + }, + )), + + // Failed: + // Update last_failed and services if present + UpdateFailed { addr, services } => Some(MetaAddr::new( + addr, + Failed { + // When the attempt message arrives, it will replace this default. + // (But it's a reasonable default anyway.) 
+ last_attempt: old_last_attempt.unwrap_or_else(Utc::now), + last_success: old_last_success, + last_failed: Utc::now(), + untrusted_last_seen: old_untrusted_last_seen, + // replace old services with new services if present + untrusted_services: services.or(old_untrusted_services), + }, + )), + + // Shutdown: + UpdateShutdown { + addr, + services: new_services, + } => { + match old_entry.map(|old| old.last_connection_state) { + // Responded: + // Keep as responded, update services if present + Some(Responded { + last_attempt, + last_success, + last_failed, + untrusted_last_seen, + services: old_services, + }) => Some(MetaAddr::new( + addr, + Responded { + last_attempt, + last_success, + last_failed, + untrusted_last_seen, + // this could be redundant, but it handles multiple + // connections and update reordering better + services: new_services.unwrap_or(old_services), + }, + )), + + // Attempted or Failed: + // Change to Failed + // Update last_failed and services if present + Some(AttemptPending { .. }) + | Some(Failed { .. }) + // Unexpected states: change to Failed anyway + | Some(NeverAttemptedSeed) + | Some(NeverAttemptedGossiped { .. }) + | Some(NeverAttemptedAlternate { .. }) + | None => Some(MetaAddr::new( + addr, + Failed { + // When the attempt message arrives, it will replace this default. + // (But it's a reasonable default anyway.) + last_attempt: old_last_attempt.unwrap_or_else(Utc::now), + last_success: old_last_success, + last_failed: Utc::now(), + untrusted_last_seen: old_untrusted_last_seen, + untrusted_services: new_services.or(old_untrusted_services), + }, + )), + } + } + } + } +} + /// An address with metadata on its advertised services and last-seen time. /// /// [Bitcoin reference](https://en.bitcoin.it/wiki/Protocol_documentation#Network_address) -#[derive(Copy, Clone, Debug, Eq, PartialEq)] +#[derive(Copy, Clone, Debug)] +#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] pub struct MetaAddr { /// The peer's address. 
- pub addr: SocketAddr, - - /// The services advertised by the peer. /// - /// The exact meaning depends on `last_connection_state`: - /// - `Responded`: the services advertised by this peer, the last time we - /// performed a handshake with it - /// - `NeverAttempted`: the unverified services provided by the remote peer - /// that sent us this address - /// - `Failed` or `AttemptPending`: unverified services via another peer, - /// or services advertised in a previous handshake + /// The exact meaning depends on [`last_connection_state`]: + /// - [`Responded`]: the address we used to make a direct outbound connection + /// to this peer + /// - [`NeverAttemptedSeed`]: an unverified address provided by the seed config + /// - [`NeverAttemptedGossiped`]: an unverified address and services provided + /// by a remote peer + /// - [`NeverAttemptedAlternate`]: a directly connected peer claimed that + /// this address was its canonical address in its [`Version`] message, + /// (and provided services). But either: + /// - the peer made an inbound connection to us, or + /// - the address we used to make a direct outbound connection was + /// different from the canonical address + /// - [`Failed`] or [`AttemptPending`]: an unverified seeder, gossiped or + /// alternate address, or an address from a previous direct outbound + /// connection /// /// ## Security /// - /// `services` from `NeverAttempted` peers may be invalid due to outdated - /// records, older peer versions, or buggy or malicious peers. - pub services: PeerServices, + /// `addr`s from non-`Responded` peers may be invalid due to outdated + /// records, or buggy or malicious peers. + // + // TODO: make the addr private to MetaAddr and AddressBook + pub(super) addr: SocketAddr, - /// The last time we interacted with this peer. + /// The outcome of our most recent direct outbound connection to this peer. /// - /// See `get_last_seen` for details.
- last_seen: DateTime, - - /// The outcome of our most recent communication attempt with this peer. - pub last_connection_state: PeerAddrState, + /// If we haven't made a direct connection to this peer, contains untrusted + /// information provided by other peers (or seeds). + // + // TODO: make the state private to MetaAddr and AddressBook + pub(super) last_connection_state: PeerAddrState, } impl MetaAddr { - /// Create a new seed [`MetaAddr`, based on the configured seed addresses. + /// Create a new [`MetaAddr`] from its parts. + /// + /// This function should only be used by the [`meta_addr`] and [`address_book`] + /// modules. Other callers should use a more specific [`MetaAddr`] or + /// [`MetaAddrChange`] constructor. + fn new(addr: SocketAddr, last_connection_state: PeerAddrState) -> MetaAddr { + MetaAddr { + addr, + last_connection_state, + } + } + + /// Create a new seed [`MetaAddr`], based on the configured seed addresses. pub fn new_seed_meta_addr(addr: SocketAddr) -> MetaAddr { MetaAddr { addr, - // TODO: stop guessing this field - services: PeerServices::NODE_NETWORK, - // TODO: remove this time, it's far too optimistic. Using `now` can - // keep invalid peers in the consensus peer set for a long time. - last_seen: Utc::now(), - // TODO: replace with NeverAttemptedSeed - last_connection_state: NeverAttemptedGossiped, + last_connection_state: NeverAttemptedSeed, + } + } + + /// Add or update an [`AddressBook`] entry, based on the address provided by + /// the seeder. + /// + /// Panics unless `meta_addr` is in the [`NeverAttemptedSeed`] state. 
+ pub fn new_seed_change(meta_addr: MetaAddr) -> MetaAddrChange { + if meta_addr.last_connection_state == NeverAttemptedSeed { + NewSeed { + addr: meta_addr.addr, + } + } else { + panic!("unexpected non-NeverAttemptedSeed state: {:?}", meta_addr) } } @@ -160,14 +823,52 @@ impl MetaAddr { ) -> MetaAddr { MetaAddr { addr, - services: untrusted_services, - last_seen: untrusted_last_seen, - // the state is Zebra-specific, it isn't part of the Zcash network protocol - last_connection_state: NeverAttemptedGossiped, + last_connection_state: NeverAttemptedGossiped { + untrusted_last_seen, + untrusted_services, + }, + } + } + + /// Add or update an [`AddressBook`] entry, based on a gossiped peer [`Addr`] + /// message. + /// + /// Panics unless `meta_addr` is in the [`NeverAttemptedGossiped`] state. + pub fn new_gossiped_change(meta_addr: MetaAddr) -> MetaAddrChange { + if let NeverAttemptedGossiped { + untrusted_last_seen, + untrusted_services, + } = meta_addr.last_connection_state + { + NewGossiped { + addr: meta_addr.addr, + untrusted_last_seen, + untrusted_services, + } + } else { + panic!( + "unexpected non-NeverAttemptedGossiped state: {:?}", + meta_addr + ) + } + } + + /// Add or update an [`AddressBook`] entry, based on the canonical address in a + /// peer's [`Version`] message. + pub fn new_alternate(addr: SocketAddr, untrusted_services: PeerServices) -> MetaAddrChange { + NewAlternate { + addr, + untrusted_services, } } - /// Create a new `MetaAddr` for a peer that has just `Responded`. + /// Update an [`AddressBook`] entry when we start connecting to a peer. + pub fn update_attempt(addr: SocketAddr) -> MetaAddrChange { + UpdateAttempt { addr } + } + + /// Update an [`AddressBook`] entry when a peer sends a message after a + /// successful handshake. /// /// # Security /// @@ -175,120 +876,191 @@ impl MetaAddr { /// and the services must be the services from that peer's handshake. 
/// /// Otherwise: - /// - malicious peers could interfere with other peers' `AddressBook` state, + /// - malicious peers could interfere with other peers' [`AddressBook`] state, /// or /// - Zebra could advertise unreachable addresses to its own peers. - pub fn new_responded(addr: &SocketAddr, services: &PeerServices) -> MetaAddr { - MetaAddr { - addr: *addr, - services: *services, - last_seen: Utc::now(), - last_connection_state: Responded, - } + pub fn update_responded(addr: SocketAddr, services: PeerServices) -> MetaAddrChange { + UpdateResponded { addr, services } } - /// Create a new `MetaAddr` for a peer that we want to reconnect to. - pub fn new_reconnect(addr: &SocketAddr, services: &PeerServices) -> MetaAddr { - MetaAddr { - addr: *addr, - services: *services, - last_seen: Utc::now(), - last_connection_state: AttemptPending, - } + /// Update an [`AddressBook`] entry when a peer connection fails. + pub fn update_failed(addr: SocketAddr, services: Option) -> MetaAddrChange { + UpdateFailed { addr, services } } - /// Create a new `MetaAddr` for a peer's alternate address, received via a - /// `Version` message. - pub fn new_alternate(addr: &SocketAddr, services: &PeerServices) -> MetaAddr { - MetaAddr { - addr: *addr, - services: *services, - last_seen: Utc::now(), - last_connection_state: NeverAttemptedAlternate, - } + /// Update an [`AddressBook`] entry when a peer connection shuts down. + pub fn update_shutdown(addr: SocketAddr, services: Option) -> MetaAddrChange { + UpdateShutdown { addr, services } } - /// Create a new `MetaAddr` for our own listener address. - pub fn new_local_listener(addr: &SocketAddr) -> MetaAddr { - MetaAddr { - addr: *addr, + /// Add or update our local listener address in an [`AddressBook`]. + /// + /// See [`AddressBook::get_local_listener`] for details. 
+ pub fn new_local_listener(addr: SocketAddr) -> MetaAddrChange { + NewAlternate { + addr, + // Note: in this unique case, the services are actually trusted + // // TODO: create a "local services" constant - services: PeerServices::NODE_NETWORK, - last_seen: Utc::now(), - last_connection_state: Responded, + untrusted_services: PeerServices::NODE_NETWORK, } } - /// Create a new `MetaAddr` for a peer that has just had an error. - pub fn new_errored(addr: &SocketAddr, services: &PeerServices) -> MetaAddr { - MetaAddr { - addr: *addr, - services: *services, - last_seen: Utc::now(), - last_connection_state: Failed, - } + /// The services advertised by the peer. + /// + /// The exact meaning depends on [`last_connection_state`]: + /// - [`Responded`]: the services advertised by this peer, the last time we + /// performed a handshake with it + /// - [`NeverAttemptedGossiped`]: the unverified services provided by the + /// remote peer that sent us this address + /// - [`NeverAttemptedAlternate`]: the services provided by the directly + /// connected peer that claimed that this address was its canonical + /// address + /// - [`NeverAttemptedSeed`]: the seed config doesn't have any service info, + /// so this field is [`None`] + /// - [`Failed`] or [`AttemptPending`]: unverified services via another peer, + /// or services advertised in a previous handshake + /// + /// ## Security + /// + /// `services` from non-`Responded` peers may be invalid due to outdated + /// records, older peer versions, or buggy or malicious peers. + /// The last time another peer successfully connected to this peer. + pub fn get_untrusted_services(&self) -> Option { + self.last_connection_state.get_untrusted_services() } - /// Create a new `MetaAddr` for a peer that has just shut down. - pub fn new_shutdown(addr: &SocketAddr, services: &PeerServices) -> MetaAddr { - // TODO: if the peer shut down in the Responded state, preserve that - // state. 
All other states should be treated as (timeout) errors. - MetaAddr::new_errored(addr, services) + /// The last time we attempted to make a direct outbound connection to the + /// address of this peer. + /// + /// See [`PeerAddrState::get_last_attempt`] for details. + pub fn get_last_attempt(&self) -> Option> { + self.last_connection_state.get_last_attempt() } - /// The last time we interacted with this peer. + /// The last time we successfully made a direct outbound connection to the + /// address of this peer. /// - /// The exact meaning depends on `last_connection_state`: - /// - `Responded`: the last time we processed a message from this peer - /// - `NeverAttempted`: the unverified time provided by the remote peer - /// that sent us this address - /// - `Failed`: the last time we marked the peer as failed - /// - `AttemptPending`: the last time we queued the peer for a reconnection - /// attempt + /// See [`PeerAddrState::get_last_success`] for details. + pub fn get_last_success(&self) -> Option> { + self.last_connection_state.get_last_success() + } + + /// The last time a direct outbound connection to the address of this peer + /// failed. + /// + /// See [`PeerAddrState::get_last_failed`] for details. + pub fn get_last_failed(&self) -> Option> { + self.last_connection_state.get_last_failed() + } + + /// The last time another node claimed this peer was valid. 
+ /// + /// The exact meaning depends on [`last_connection_state`]: + /// - [`NeverAttemptedSeed`]: the seed config doesn't have any last seen info, + /// so this field is [`None`] + /// - [`NeverAttemptedGossiped`]: the unverified time provided by the remote + /// peer that sent us this address + /// - [`NeverAttemptedAlternate`]: the local time we received the [`Version`] + /// message containing this address from a peer + /// - [`Failed`] and [`AttemptPending`]: these states do not update this field /// /// ## Security /// - /// `last_seen` times from `NeverAttempted` peers may be invalid due to + /// last seen times from non-`Responded` peers may be invalid due to /// clock skew, or buggy or malicious peers. - pub fn get_last_seen(&self) -> DateTime { - self.last_seen + /// + /// Typically, this field should be ignored, unless the peer is in a + /// never attempted state. + pub fn get_untrusted_last_seen(&self) -> Option> { + self.last_connection_state.get_untrusted_last_seen() + } + + /// The last time we successfully made a direct outbound connection to this + /// peer, or another node claimed this peer was valid. + /// + /// Clamped to a [`u32`] number of seconds. + /// + /// [`None`] if the address was supplied as a seed. + /// + /// ## Security + /// + /// last seen times from non-`Responded` peers may be invalid due to + /// clock skew, or buggy or malicious peers. + /// + /// Use [`get_last_success`] if you need a trusted, unclamped value. + pub fn get_last_success_or_untrusted(&self) -> Option> { + // Use the best time we have, if any + let seconds = self + .get_last_success() + .or_else(|| self.get_untrusted_last_seen())? + .timestamp(); + + // Convert to a DateTime + let seconds = seconds.clamp(u32::MIN.into(), u32::MAX.into()); + let time = Utc + .timestamp_opt(seconds, 0) + .single() + .expect("unexpected invalid time: all u32 values should be valid"); + + Some(time) + } + + /// Has this peer ever been attempted? 
+ pub fn has_been_attempted(&self) -> bool { + self.get_last_attempt().is_some() } /// Is this address a directly connected client? + /// + /// If we don't have enough information, assume that it is not a client. pub fn is_direct_client(&self) -> bool { match self.last_connection_state { - Responded => !self.services.contains(PeerServices::NODE_NETWORK), - NeverAttemptedGossiped | NeverAttemptedAlternate | Failed | AttemptPending => false, + Responded { services, .. } => !services.contains(PeerServices::NODE_NETWORK), + NeverAttemptedSeed + | NeverAttemptedGossiped { .. } + | NeverAttemptedAlternate { .. } + | Failed { .. } + | AttemptPending { .. } => false, } } /// Is this address valid for outbound connections? + /// + /// If we don't have enough information, assume that it is valid. pub fn is_valid_for_outbound(&self) -> bool { - self.services.contains(PeerServices::NODE_NETWORK) + self.get_untrusted_services() + .map(|services| services.contains(PeerServices::NODE_NETWORK)) + .unwrap_or(true) && !self.addr.ip().is_unspecified() && self.addr.port() != 0 } - /// Return a sanitized version of this `MetaAddr`, for sending to a remote peer. - pub fn sanitize(&self) -> MetaAddr { + /// Return a sanitized version of this [`MetaAddr`], for sending to a remote peer. + /// + /// If [`None`], this address should not be sent to remote peers. 
+ pub fn sanitize(&self) -> Option { + // Sanitize the time let interval = crate::constants::TIMESTAMP_TRUNCATION_SECONDS; - let ts = self.get_last_seen().timestamp(); - let last_seen = Utc.timestamp(ts - ts.rem_euclid(interval), 0); - MetaAddr { + let ts = self.get_last_success_or_untrusted()?.timestamp(); + let last_seen_maybe_untrusted = Utc.timestamp(ts - ts.rem_euclid(interval), 0); + + Some(MetaAddr { addr: self.addr, - // services are sanitized during parsing, or set to a fixed valued by - // new_local_listener, so we don't need to sanitize here - services: self.services, - last_seen, - // the state isn't sent to the remote peer, but sanitize it anyway - last_connection_state: NeverAttemptedGossiped, - } + // the exact state variant isn't sent to the remote peer, but sanitize it anyway + last_connection_state: NeverAttemptedGossiped { + untrusted_last_seen: last_seen_maybe_untrusted, + // services are sanitized during parsing, or set to a fixed value by + // new_local_listener, so we don't need to sanitize here + untrusted_services: self.get_untrusted_services()?, + }, + }) } } impl Ord for MetaAddr { - /// `MetaAddr`s are sorted in approximate reconnection attempt order, but - /// with `Responded` peers sorted first as a group. + /// [`MetaAddr`]s are sorted in approximate reconnection attempt order, but + /// with [`Responded`] peers sorted first as a group. /// /// This order should not be used for reconnection attempts: use /// [`AddressBook::reconnection_peers`] instead. 
@@ -298,17 +1070,8 @@ impl Ord for MetaAddr { use std::net::IpAddr::{V4, V6}; use Ordering::*; - let oldest_first = self.get_last_seen().cmp(&other.get_last_seen()); - let newest_first = oldest_first.reverse(); - let connection_state = self.last_connection_state.cmp(&other.last_connection_state); - let reconnection_time = match self.last_connection_state { - Responded => oldest_first, - NeverAttemptedGossiped => newest_first, - NeverAttemptedAlternate => newest_first, - Failed => oldest_first, - AttemptPending => oldest_first, - }; + let ip_numeric = match (self.addr.ip(), other.addr.ip()) { (V4(a), V4(b)) => a.octets().cmp(&b.octets()), (V6(a), V6(b)) => a.octets().cmp(&b.octets()), @@ -317,13 +1080,11 @@ impl Ord for MetaAddr { }; connection_state - .then(reconnection_time) // The remainder is meaningless as an ordering, but required so that we // have a total order on `MetaAddr` values: self and other must compare // as Equal iff they are equal. .then(ip_numeric) .then(self.addr.port().cmp(&other.addr.port())) - .then(self.services.bits().cmp(&other.services.bits())) } } @@ -333,15 +1094,29 @@ impl PartialOrd for MetaAddr { } } +impl PartialEq for MetaAddr { + fn eq(&self, other: &Self) -> bool { + use Ordering::*; + self.cmp(other) == Equal + } +} + +impl Eq for MetaAddr {} + impl ZcashSerialize for MetaAddr { fn zcash_serialize(&self, mut writer: W) -> Result<(), std::io::Error> { writer.write_u32::( - self.get_last_seen() + self.get_last_success_or_untrusted() + .expect("unexpected serialization of MetaAddr with missing fields") .timestamp() .try_into() .expect("time is in range"), )?; - writer.write_u64::(self.services.bits())?; + writer.write_u64::( + self.get_untrusted_services() + .expect("unexpected serialization of MetaAddr with missing fields") + .bits(), + )?; writer.write_socket_addr(self.addr)?; Ok(()) } diff --git a/zebra-network/src/meta_addr/arbitrary.rs b/zebra-network/src/meta_addr/arbitrary.rs deleted file mode 100644 index 
3c2231afb5a..00000000000 --- a/zebra-network/src/meta_addr/arbitrary.rs +++ /dev/null @@ -1,31 +0,0 @@ -use proptest::{arbitrary::any, arbitrary::Arbitrary, prelude::*}; - -use super::{MetaAddr, PeerAddrState, PeerServices}; - -use chrono::{TimeZone, Utc}; -use std::net::SocketAddr; - -impl Arbitrary for MetaAddr { - type Parameters = (); - - fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy { - ( - any::(), - any::(), - any::(), - any::(), - ) - .prop_map( - |(addr, services, last_seen, last_connection_state)| MetaAddr { - addr, - services, - // This can't panic, because all u32 values are valid `Utc.timestamp`s - last_seen: Utc.timestamp(last_seen.into(), 0), - last_connection_state, - }, - ) - .boxed() - } - - type Strategy = BoxedStrategy; -} diff --git a/zebra-network/src/meta_addr/tests/check.rs b/zebra-network/src/meta_addr/tests/check.rs index 061854d5fea..6de8861d7e3 100644 --- a/zebra-network/src/meta_addr/tests/check.rs +++ b/zebra-network/src/meta_addr/tests/check.rs @@ -1,45 +1,57 @@ //! Shared test checks for MetaAddr -use super::super::MetaAddr; +use super::super::{MetaAddr, PeerAddrState::NeverAttemptedGossiped}; use crate::constants::TIMESTAMP_TRUNCATION_SECONDS; /// Make sure that the sanitize function reduces time and state metadata /// leaks. pub(crate) fn sanitize_avoids_leaks(entry: &MetaAddr) { - let sanitized = entry.sanitize(); + let sanitized = match entry.sanitize() { + Some(sanitized) => sanitized, + // Skip addresses that will never be sent to peers + None => { + return; + } + }; // We want the sanitized timestamp to: // - be a multiple of the truncation interval, // - have a zero nanoseconds component, and // - be within the truncation interval of the original timestamp. 
- assert_eq!( - sanitized.get_last_seen().timestamp() % TIMESTAMP_TRUNCATION_SECONDS, - 0 - ); - assert_eq!(sanitized.get_last_seen().timestamp_subsec_nanos(), 0); - // handle underflow and overflow by skipping the check - // the other check will ensure correctness - let lowest_time = entry - .get_last_seen() - .timestamp() - .checked_sub(TIMESTAMP_TRUNCATION_SECONDS); - let highest_time = entry - .get_last_seen() - .timestamp() - .checked_add(TIMESTAMP_TRUNCATION_SECONDS); - if let Some(lowest_time) = lowest_time { - assert!(sanitized.get_last_seen().timestamp() > lowest_time); - } - if let Some(highest_time) = highest_time { - assert!(sanitized.get_last_seen().timestamp() < highest_time); + if let Some(sanitized_last) = sanitized.get_last_success_or_untrusted() { + assert_eq!(sanitized_last.timestamp() % TIMESTAMP_TRUNCATION_SECONDS, 0); + assert_eq!(sanitized_last.timestamp_subsec_nanos(), 0); + // handle underflow and overflow by skipping the check + // the other check will ensure correctness + let lowest_time = entry + .get_last_success_or_untrusted() + .map(|t| t.timestamp().checked_sub(TIMESTAMP_TRUNCATION_SECONDS)) + .flatten(); + let highest_time = entry + .get_last_success_or_untrusted() + .map(|t| t.timestamp().checked_add(TIMESTAMP_TRUNCATION_SECONDS)) + .flatten(); + if let Some(lowest_time) = lowest_time { + assert!(sanitized_last.timestamp() > lowest_time); + } + if let Some(highest_time) = highest_time { + assert!(sanitized_last.timestamp() < highest_time); + } } - // Sanitize to the the default state, even though it's not serialized - assert_eq!(sanitized.last_connection_state, Default::default()); + // Sanitize to the gossiped state, even though it's not serialized + assert!(matches!( + sanitized.last_connection_state, + NeverAttemptedGossiped { ..
} + )); + // We want the other fields to be unmodified assert_eq!(sanitized.addr, entry.addr); // Services are sanitized during parsing, so we don't need to make // any changes in sanitize() - assert_eq!(sanitized.services, entry.services); + assert_eq!( + sanitized.get_untrusted_services(), + entry.get_untrusted_services() + ); } diff --git a/zebra-network/src/meta_addr/tests/preallocate.rs b/zebra-network/src/meta_addr/tests/preallocate.rs index c9fb9dbb440..3c7ea002200 100644 --- a/zebra-network/src/meta_addr/tests/preallocate.rs +++ b/zebra-network/src/meta_addr/tests/preallocate.rs @@ -9,9 +9,12 @@ use std::convert::TryInto; proptest! { /// Confirm that each MetaAddr takes exactly META_ADDR_SIZE bytes when serialized. - /// This verifies that our calculated `TrustedPreallocate::max_allocation()` is indeed an upper bound. + /// This verifies that our calculated [`TrustedPreallocate::max_allocation`] is indeed an upper bound. #[test] fn meta_addr_size_is_correct(addr in MetaAddr::arbitrary()) { + // TODO: make a strategy that has no None fields + prop_assume!(addr.sanitize().is_some()); + let serialized = addr .zcash_serialize_to_vec() .expect("Serialization to vec must succeed"); @@ -23,6 +26,9 @@ proptest! { /// 2. The largest allowed vector is small enough to fit in a legal Zcash message #[test] fn meta_addr_max_allocation_is_correct(addr in MetaAddr::arbitrary()) { + // TODO: make a strategy that has no None fields + prop_assume!(addr.sanitize().is_some()); + let max_allocation: usize = MetaAddr::max_allocation().try_into().unwrap(); let mut smallest_disallowed_vec = Vec::with_capacity(max_allocation + 1); for _ in 0..(MetaAddr::max_allocation() + 1) { diff --git a/zebra-network/src/meta_addr/tests/vectors.rs b/zebra-network/src/meta_addr/tests/vectors.rs index 538ada7b8c7..2f93c95c5b1 100644 --- a/zebra-network/src/meta_addr/tests/vectors.rs +++ b/zebra-network/src/meta_addr/tests/vectors.rs @@ -1,28 +1,59 @@ //! Test vectors for MetaAddr. 
-use super::{super::MetaAddr, check}; +use super::{ + super::{MetaAddr, PeerAddrState}, + check, +}; use chrono::{MAX_DATETIME, MIN_DATETIME}; /// Make sure that the sanitize function handles minimum and maximum times. #[test] fn sanitize_extremes() { + use PeerAddrState::*; + zebra_test::init(); - let min_time_entry = MetaAddr { + let min_time_untrusted = MetaAddr { + addr: "127.0.0.1:8233".parse().unwrap(), + last_connection_state: NeverAttemptedGossiped { + untrusted_last_seen: MIN_DATETIME, + untrusted_services: Default::default(), + }, + }; + + let min_time_local = MetaAddr { + addr: "127.0.0.1:8233".parse().unwrap(), + last_connection_state: Responded { + last_attempt: MIN_DATETIME, + last_success: MIN_DATETIME, + last_failed: Some(MIN_DATETIME), + untrusted_last_seen: Some(MIN_DATETIME), + services: Default::default(), + }, + }; + + let max_time_untrusted = MetaAddr { addr: "127.0.0.1:8233".parse().unwrap(), - services: Default::default(), - last_seen: MIN_DATETIME, - last_connection_state: Default::default(), + last_connection_state: NeverAttemptedGossiped { + untrusted_last_seen: MAX_DATETIME, + untrusted_services: Default::default(), + }, }; - let max_time_entry = MetaAddr { + let max_time_local = MetaAddr { addr: "127.0.0.1:8233".parse().unwrap(), - services: Default::default(), - last_seen: MAX_DATETIME, - last_connection_state: Default::default(), + last_connection_state: Responded { + last_attempt: MAX_DATETIME, + last_success: MAX_DATETIME, + last_failed: Some(MAX_DATETIME), + untrusted_last_seen: Some(MAX_DATETIME), + services: Default::default(), + }, }; - check::sanitize_avoids_leaks(&min_time_entry); - check::sanitize_avoids_leaks(&max_time_entry); + check::sanitize_avoids_leaks(&min_time_untrusted); + check::sanitize_avoids_leaks(&min_time_local); + check::sanitize_avoids_leaks(&max_time_untrusted); + check::sanitize_avoids_leaks(&max_time_local); } diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 
99ce1f7b08e..d36b5971939 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -23,11 +23,11 @@ use zebra_chain::{block, parameters::Network}; use crate::{ constants, + meta_addr::{MetaAddr, MetaAddrChange}, protocol::{ external::{types::*, Codec, InventoryHash, Message}, internal::{Request, Response}, }, - types::MetaAddr, BoxError, Config, }; @@ -45,7 +45,7 @@ use super::{Client, ClientRequest, Connection, ErrorSlot, HandshakeError, PeerEr pub struct Handshake { config: Config, inbound_service: S, - timestamp_collector: mpsc::Sender, + timestamp_collector: mpsc::Sender, inv_collector: broadcast::Sender<(InventoryHash, SocketAddr)>, nonces: Arc>>, user_agent: String, @@ -296,7 +296,7 @@ impl fmt::Debug for ConnectedAddr { pub struct Builder { config: Option, inbound_service: Option, - timestamp_collector: Option>, + timestamp_collector: Option>, our_services: Option, user_agent: Option, relay: Option, @@ -334,9 +334,12 @@ where /// Provide a hook for timestamp collection. Optional. /// - /// This channel takes `MetaAddr`s, permanent addresses which can be used to - /// make outbound connections to peers. - pub fn with_timestamp_collector(mut self, timestamp_collector: mpsc::Sender) -> Self { + /// This channel takes [`MetaAddrChange`]s, which contain permanent addresses + /// that can be used to make outbound connections to peers. + pub fn with_timestamp_collector( + mut self, + timestamp_collector: mpsc::Sender, + ) -> Self { self.timestamp_collector = Some(timestamp_collector); self } @@ -670,8 +673,8 @@ where // `Version` messages.
let alternate_addrs = connected_addr.get_alternate_addrs(remote_canonical_addr); for alt_addr in alternate_addrs { - let alt_addr = MetaAddr::new_alternate(&alt_addr, &remote_services); - if alt_addr.is_valid_for_outbound() { + let alt_addr = MetaAddr::new_alternate(alt_addr, remote_services); + if alt_addr.is_valid_for_outbound(None) { tracing::info!( ?alt_addr, "sending valid alternate peer address to the address book" @@ -758,7 +761,10 @@ where // the collector doesn't depend on network activity, // so this await should not hang let _ = inbound_ts_collector - .send(MetaAddr::new_responded(&book_addr, &remote_services)) + .send(MetaAddr::update_responded( + book_addr, + remote_services, + )) .await; } } @@ -772,7 +778,10 @@ where if let Some(book_addr) = connected_addr.get_address_book_addr() { let _ = inbound_ts_collector - .send(MetaAddr::new_errored(&book_addr, &remote_services)) + .send(MetaAddr::update_failed( + book_addr, + Some(remote_services), + )) .await; } } @@ -882,7 +891,10 @@ where if let Some(book_addr) = connected_addr.get_address_book_addr() { // awaiting a local task won't hang let _ = timestamp_collector - .send(MetaAddr::new_shutdown(&book_addr, &remote_services)) + .send(MetaAddr::update_shutdown( + book_addr, + Some(remote_services), + )) .await; } return; @@ -970,7 +982,7 @@ async fn send_one_heartbeat(server_tx: &mut mpsc::Sender) -> Resu /// `handle_heartbeat_error`. async fn heartbeat_timeout( fut: F, - timestamp_collector: &mut mpsc::Sender, + timestamp_collector: &mut mpsc::Sender, connected_addr: &ConnectedAddr, remote_services: &PeerServices, ) -> Result @@ -1004,7 +1016,7 @@ where /// If `result.is_err()`, mark `connected_addr` as failed using `timestamp_collector`. 
async fn handle_heartbeat_error( result: Result, - timestamp_collector: &mut mpsc::Sender, + timestamp_collector: &mut mpsc::Sender, connected_addr: &ConnectedAddr, remote_services: &PeerServices, ) -> Result @@ -1018,7 +1030,7 @@ where if let Some(book_addr) = connected_addr.get_address_book_addr() { let _ = timestamp_collector - .send(MetaAddr::new_errored(&book_addr, &remote_services)) + .send(MetaAddr::update_failed(book_addr, Some(*remote_services))) .await; } Err(err) diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs index be51be482da..bb3520a06c7 100644 --- a/zebra-network/src/peer_set/candidate_set.rs +++ b/zebra-network/src/peer_set/candidate_set.rs @@ -1,4 +1,4 @@ -use std::{cmp::min, mem, sync::Arc, time::Duration}; +use std::{cmp::min, mem, net::SocketAddr, sync::Arc, time::Duration}; use chrono::{DateTime, Utc}; use futures::stream::{FuturesUnordered, StreamExt}; @@ -7,28 +7,47 @@ use tower::{Service, ServiceExt}; use crate::{constants, types::MetaAddr, AddressBook, BoxError, Request, Response}; -/// The `CandidateSet` manages the `PeerSet`'s peer reconnection attempts. +/// The [`CandidateSet`] manages outbound peer connection attempts. +/// Successful connections become peers in the [`PeerSet`]. /// -/// It divides the set of all possible candidate peers into disjoint subsets, -/// using the `PeerAddrState`: +/// The candidate set divides the set of all possible outbound peers into +/// disjoint subsets, using the [`PeerAddrState`]: +/// +/// 1. [`Responded`] peers, which we previously had outbound connections to. +/// 2. [`NeverAttemptedGossiped`] peers, which we learned about from other peers +/// but have never connected to; +/// 3. [`NeverAttemptedAlternate`] peers, which we learned from the [`Version`] +/// messages of directly connected peers, but have never connected to; +/// 4. [`NeverAttemptedSeed`] peers, which we learned about from our seed config, +/// but have never connected to; +/// 5.
[`Failed`] peers, to whom we attempted to connect but were unable to; +/// 6. [`AttemptPending`] peers, which we've recently queued for a connection. +/// +/// Never attempted peers are always available for connection. +/// +/// If a peer's attempted, success, or failed time is recent +/// (within the liveness limit), we avoid reconnecting to it. +/// Otherwise, we assume that it has disconnected or hung, +/// and attempt reconnection. /// -/// 1. `Responded` peers, which we previously had inbound or outbound connections -/// to. If we have not received any messages from a `Responded` peer within a -/// cutoff time, we assume that it has disconnected or hung, and attempt -/// reconnection; -/// 2. `NeverAttempted` peers, which we learned about from other peers or a DNS -/// seeder, but have never connected to; -/// 3. `Failed` peers, to whom we attempted to connect but were unable to; -/// 4. `AttemptPending` peers, which we've recently queued for reconnection. /// /// ```ascii,no_run /// ┌──────────────────┐ /// │ PeerSet │ -/// │GetPeers Responses│ +/// │ Gossiped │ +/// │ Addresses │ /// └──────────────────┘ +/// │ provides +/// │ untrusted_last_seen /// │ /// │ -/// │ +/// ┌──────────────────┐ │ ┌──────────────────┐ +/// │ Handshake │ │ │ Seed │ +/// │ Canonical │──────────┼──────────│ Addresses │ +/// │ Addresses │ │ │ │ +/// └──────────────────┘ │ └──────────────────┘ +/// untrusted_last_seen │ untrusted_last_seen +/// set to now │ is unknown /// │ /// ▼ /// filter by Λ @@ -37,21 +56,14 @@ use crate::{constants, types::MetaAddr, AddressBook, BoxError, Request, Response /// │ ╲ ╱ /// │ V /// │ │ -/// │ │ -/// │ │ -/// │ ┌──────────────────┐ │ -/// │ │ Inbound │ │ -/// │ │ Peer Connections │ │ -/// │ └──────────────────┘ │ -/// │ │ │ -/// ├──────────┼────────────────────┼───────────────────────────────┐ -/// │ PeerSet ▼ AddressBook ▼ │ -/// │ ┌─────────────┐ ┌────────────────┐ ┌─────────────┐ │ -/// │ │ Possibly │ │`NeverAttempted`│ │ `Failed` │ │ -/// │ 
│Disconnected │ │ Peers │ │ Peers │◀┼┐ -/// │ │ `Responded` │ │ │ │ │ ││ -/// │ │ Peers │ │ │ │ │ ││ -/// │ └─────────────┘ └────────────────┘ └─────────────┘ ││ +/// ├───────────────────────────────┼───────────────────────────────┐ +/// │ AddressBook ▼ │ +/// │ ┌─────────────┐ ┌─────────────────────────┐ ┌─────────────┐ │ +/// │ │ `Responded` │ │`NeverAttemptedGossiped` │ │ `Failed` │ │ +/// │ │ Peers │ │`NeverAttemptedAlternate`│ │ Peers │◀┼┐ +/// │ │ │ │ `NeverAttemptedSeed` │ │ │ ││ +/// │ │ │ │ Peers │ │ │ ││ +/// │ └─────────────┘ └─────────────────────────┘ └─────────────┘ ││ /// │ │ │ │ ││ /// │ #1 oldest_first #2 newest_first #3 oldest_first ││ /// │ │ │ │ ││ @@ -60,28 +72,26 @@ use crate::{constants, types::MetaAddr, AddressBook, BoxError, Request, Response /// ├────────┼──────────────────────────────────────────────────────┘│ /// │ ▼ │ /// │ Λ │ -/// │ ╱ ╲ filter by │ -/// └─────▶▕ ▏!is_potentially_connected │ -/// ╲ ╱ to remove live │ -/// V `Responded` peers │ +/// │ ╱ ╲ filter by │ +/// └─────▶▕ ▏ !recently_used_addr │ +/// ╲ ╱ to remove recent `Responded`, │ +/// V `AttemptPending`, and `Failed` peers │ /// │ │ -/// │ Try outbound connection │ -/// ▼ │ +/// │ try outbound connection, │ +/// ▼ update last_attempted to now() │ /// ┌────────────────┐ │ /// │`AttemptPending`│ │ /// │ Peers │ │ /// │ │ │ /// └────────────────┘ │ /// │ │ -/// │ │ /// ▼ │ /// Λ │ /// ╱ ╲ │ /// ▕ ▏─────────────────────────────────────────────────────┘ -/// ╲ ╱ connection failed, update last_seen to now() +/// ╲ ╱ connection failed, update last_failed to now() /// V -/// │ -/// │ +/// │ connection succeeded /// ▼ /// ┌────────────┐ /// │ send │ @@ -89,18 +99,19 @@ use crate::{constants, types::MetaAddr, AddressBook, BoxError, Request, Response /// │to Discover │ /// └────────────┘ /// │ -/// │ /// ▼ /// ┌───────────────────────────────────────┐ /// │ every time we receive a peer message: │ /// │ * update state to `Responded` │ -/// │ * update last_seen to now() │ +/// │ * 
update last_success to now() │ /// └───────────────────────────────────────┘ -/// /// ``` // TODO: // * draw arrow from the "peer message" box into the `Responded` state box // * make the "disjoint states" box include `AttemptPending` +// * show the Seed -> Gossip / Alternate transition +// * show all possible transitions between Attempt/Responded/Failed, +// except Failed -> Responded is invalid, must go through Attempt pub(super) struct CandidateSet { pub(super) address_book: Arc>, pub(super) peer_service: S, @@ -112,12 +123,12 @@ where S: Service, S::Future: Send + 'static, { - /// The minimum time between successive calls to `CandidateSet::next()`. + /// The minimum time between successive calls to [`CandidateSet::next`]. /// /// ## Security /// /// Zebra resists distributed denial of service attacks by making sure that new peer connections - /// are initiated at least `MIN_PEER_CONNECTION_INTERVAL` apart. + /// are initiated at least [`MIN_PEER_CONNECTION_INTERVAL`] apart. const MIN_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(100); /// Uses `address_book` and `peer_service` to manage a [`CandidateSet`] of peers. @@ -246,6 +257,9 @@ where /// Add new `addrs` to the address book. fn send_addrs(&self, addrs: impl IntoIterator) { + // Turn the addresses into "new gossiped" changes + let addrs = addrs.into_iter().map(MetaAddr::new_gossiped_change); + // # Correctness // // Briefly hold the address book threaded mutex, to extend @@ -257,26 +271,29 @@ where /// Returns the next candidate for a connection attempt, if any are available. 
/// - /// Returns peers in this order: - /// - oldest `Responded` that are not live - /// - newest `NeverAttempted` - /// - oldest `Failed` + /// Returns peers in [`MetaAddr::cmp`] order, lowest first: + /// - oldest [`Responded`] that are not live + /// - [`NeverAttemptedSeed`], if any + /// - newest [`NeverAttemptedGossiped`] + /// - newest [`NeverAttemptedAlternate`] + /// - oldest [`Failed`] that are not recent + /// - oldest [`AttemptPending`] that are not recent /// - /// Skips `AttemptPending` peers and live `Responded` peers. + /// Skips peers that have recently been attempted, connected, or failed. /// /// ## Correctness /// - /// `AttemptPending` peers will become `Responded` if they respond, or - /// become `Failed` if they time out or provide a bad response. + /// [`AttemptPending`] peers will become [`Responded`] if they respond, or + /// become [`Failed`] if they time out or provide a bad response. /// - /// Live `Responded` peers will stay live if they keep responding, or - /// become a reconnection candidate if they stop responding. + /// Live [`Responded`] peers will stay live if they keep responding, or + /// become a connection candidate if they stop responding. /// /// ## Security /// /// Zebra resists distributed denial of service attacks by making sure that /// new peer connections are initiated at least - /// `MIN_PEER_CONNECTION_INTERVAL` apart. + /// [`MIN_PEER_CONNECTION_INTERVAL`] apart. pub async fn next(&mut self) -> Option { let current_deadline = self.next_peer_min_wait.deadline(); let mut sleep = sleep_until(current_deadline + Self::MIN_PEER_CONNECTION_INTERVAL); @@ -293,32 +310,55 @@ where // // To avoid hangs, any computation in the critical section should // be kept to a minimum. - let reconnect = { + let connect = { let mut guard = self.address_book.lock().unwrap(); // It's okay to return without sleeping here, because we're returning // `None`. We only need to sleep before yielding an address. 
- let reconnect = guard.reconnection_peers().next()?; + let connect = guard.next_candidate_peer()?; - let reconnect = MetaAddr::new_reconnect(&reconnect.addr, &reconnect.services); - guard.update(reconnect); - reconnect + let connect = MetaAddr::update_attempt(connect.addr); + guard.update(connect)? }; // SECURITY: rate-limit new candidate connections sleep.await; - Some(reconnect) + Some(connect) } /// Mark `addr` as a failed peer. - pub fn report_failed(&mut self, addr: &MetaAddr) { - let addr = MetaAddr::new_errored(&addr.addr, &addr.services); + pub fn report_failed(&mut self, addr: SocketAddr) { + let addr = MetaAddr::update_failed(addr, None); // # Correctness // // Briefly hold the address book threaded mutex, to update the state for // a single address. self.address_book.lock().unwrap().update(addr); } + + /// Return the number of candidate peers. + /// + /// This number can change over time as recently used peers expire. + pub fn candidate_peer_count(&self) -> usize { + // # Correctness + // + // Briefly hold the address book threaded mutex. + self.address_book.lock().unwrap().candidate_peer_count() + } + + /// Return the number of recently used peers. + /// + /// This number can change over time as recently used peers expire. + pub fn recent_peer_count(&self) -> usize { + // # Correctness + // + // Briefly hold the address book threaded mutex. + self.address_book + .lock() + .unwrap() + .recently_used_peers() + .count() + } } /// Check new `addrs` before adding them to the address book. 
diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index ff8c39cba49..b43e98da12e 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -25,8 +25,12 @@ use tracing::Span; use tracing_futures::Instrument; use crate::{ - constants, meta_addr::MetaAddr, peer, timestamp_collector::TimestampCollector, - types::PeerServices, AddressBook, BoxError, Config, Request, Response, + constants, + meta_addr::{MetaAddr, MetaAddrChange}, + peer, + timestamp_collector::TimestampCollector, + types::PeerServices, + AddressBook, BoxError, Config, Request, Response, }; use zebra_chain::parameters::Network; @@ -67,7 +71,7 @@ where S: Service + Clone + Send + 'static, S::Future: Send + 'static, { - let (address_book, timestamp_collector) = TimestampCollector::spawn(&config); + let (address_book, timestamp_collector) = TimestampCollector::spawn(config.clone()); let (inv_sender, inv_receiver) = broadcast::channel(100); // Construct services that handle inbound handshakes and perform outbound @@ -81,7 +85,7 @@ where .with_config(config.clone()) .with_inbound_service(inbound_service) .with_inventory_collector(inv_sender) - .with_timestamp_collector(timestamp_collector) + .with_timestamp_collector(timestamp_collector.clone()) .with_advertised_services(PeerServices::NODE_NETWORK) .with_user_agent(crate::constants::USER_AGENT.to_string()) .want_transactions(true) @@ -147,12 +151,14 @@ where let initial_peers_fut = { let config = config.clone(); let outbound_connector = outbound_connector.clone(); + let timestamp_collector = timestamp_collector.clone(); let peerset_tx = peerset_tx.clone(); async move { let initial_peers = config.initial_peers().await; add_initial_peers( initial_peers, outbound_connector, + timestamp_collector, peerset_tx, initial_success_count_tx, ) @@ -174,6 +180,8 @@ where // it doesn't matter if the sender has been dropped let _ = initial_success_count_rx.changed().await; 
info!(initial_successes = ?*initial_success_count_rx.borrow(), + candidates = ?candidates.candidate_peer_count(), + excluded_recent = ?candidates.recent_peer_count(), "asking initial peers for new peers"); let _ = candidates .update_initial(*initial_success_count_rx.borrow()) @@ -205,11 +213,19 @@ where /// Use the provided `outbound_connector` to connect to `initial_peers`, then /// send the results over `peerset_tx`. /// -/// Also updates `success_count_tx` with the number of successful peers. -#[instrument(skip(initial_peers, outbound_connector, peerset_tx, success_count_tx))] +/// Also adds those peers to the [`AddressBook`] using `timestamp_collector`, +/// and updates `success_count_tx` with the number of successful peers. +#[instrument(skip( + initial_peers, + outbound_connector, + timestamp_collector, + peerset_tx, + success_count_tx +))] async fn add_initial_peers( initial_peers: std::collections::HashSet, outbound_connector: C, + mut timestamp_collector: mpsc::Sender, mut peerset_tx: mpsc::Sender, success_count_tx: watch::Sender, ) -> Result<(), BoxError> @@ -227,7 +243,24 @@ where "connecting to initial peer set" ); - let initial_meta_addr = initial_peers.into_iter().map(MetaAddr::new_seed_meta_addr); + // Add the seed peers to the address book, in the `AttemptPending` state. + // + // Note: these address book updates are sent to a channel, so they might be + // applied after updates from concurrent tasks. 
+ let mut initial_meta_addr = Vec::new(); + for addr in initial_peers { + let seeder_meta_addr = MetaAddr::new_seed_meta_addr(addr); + let _ = timestamp_collector + .send(MetaAddr::new_seed_change(seeder_meta_addr)) + .await; + let update_change = MetaAddr::update_attempt(addr); + let _ = timestamp_collector.send(update_change).await; + // Apply the change, just like the AddressBook would + let update_meta_addr = update_change + .into_meta_addr(Some(seeder_meta_addr)) + .expect("unexpected invalid seeder to attempt transition"); + initial_meta_addr.push(update_meta_addr); + } // # Correctness // @@ -294,6 +327,9 @@ where ?success_count, "an initial peer connection failed"); } + let _ = timestamp_collector + .send(MetaAddr::update_failed(failed_addr.addr, None)) + .await; continue; } DemandCrawl | DemandDrop | DemandHandshake { .. } | TimerCrawl { .. } => { @@ -440,7 +476,9 @@ where // - use the `select!` macro for all actions, because the `select` function // is biased towards the first ready future - info!("starting the peer crawler"); + info!(candidates = ?candidates.candidate_peer_count(), + recent = ?candidates.recent_peer_count(), + "starting the peer crawler"); let mut handshakes = FuturesUnordered::new(); // returns None when empty. 
@@ -536,7 +574,7 @@ where } HandshakeFailed { failed_addr, error } => { debug!(addr = ?failed_addr.addr, ?error, "marking candidate as failed"); - candidates.report_failed(&failed_addr); + candidates.report_failed(failed_addr.addr); // The demand signal that was taken out of the queue // to attempt to connect to the failed candidate never // turned into a connection, so add it back: diff --git a/zebra-network/src/timestamp_collector.rs b/zebra-network/src/timestamp_collector.rs index 143fe7cd629..2c31919e172 100644 --- a/zebra-network/src/timestamp_collector.rs +++ b/zebra-network/src/timestamp_collector.rs @@ -4,17 +4,24 @@ use std::sync::Arc; use futures::{channel::mpsc, prelude::*}; -use crate::{types::MetaAddr, AddressBook, Config}; +use crate::{meta_addr::MetaAddrChange, AddressBook, Config}; /// The timestamp collector hooks into incoming message streams for each peer and -/// records per-connection last-seen timestamps into an [`AddressBook`]. +/// records per-connection [`MetaAddrChange`]s into an [`AddressBook`]. pub struct TimestampCollector {} impl TimestampCollector { - /// Spawn a new [`TimestampCollector`] task, and return handles for the - /// transmission channel for timestamp events and for the [`AddressBook`] it - /// updates. - pub fn spawn(config: &Config) -> (Arc>, mpsc::Sender) { + /// Spawn a new [`TimestampCollector`] task. + /// + /// Return handles for: + /// * the transmission channel for [`MetaAddrChange`] events, and + /// * the [`AddressBook`] it updates. + pub fn spawn( + config: Config, + ) -> ( + Arc>, + mpsc::Sender, + ) { use tracing::Level; const TIMESTAMP_WORKER_BUFFER_SIZE: usize = 100; let (worker_tx, mut worker_rx) = mpsc::channel(TIMESTAMP_WORKER_BUFFER_SIZE);