Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collect and send more accurate peer addresses #2123

Merged
merged 3 commits into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions zebra-network/src/address_book.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{
use chrono::{DateTime, Utc};
use tracing::Span;

use crate::{constants, types::MetaAddr, PeerAddrState};
use crate::{constants, types::MetaAddr, Config, PeerAddrState};

/// A database of peer listener addresses, their advertised services, and
/// information on when they were last seen.
Expand Down Expand Up @@ -48,13 +48,16 @@ use crate::{constants, types::MetaAddr, PeerAddrState};
/// - the canonical address of any connection.
#[derive(Clone, Debug)]
pub struct AddressBook {
/// Each known peer address has a matching `MetaAddr`
/// Each known peer address has a matching `MetaAddr`.
by_addr: HashMap<SocketAddr, MetaAddr>,

/// The local listener address.
local_listener: SocketAddr,

/// The span for operations on this address book.
span: Span,

/// The last time we logged a message about the address metrics
/// The last time we logged a message about the address metrics.
last_address_log: Option<Instant>,
}

Expand Down Expand Up @@ -85,13 +88,14 @@ pub struct AddressMetrics {

#[allow(clippy::len_without_is_empty)]
impl AddressBook {
/// Construct an `AddressBook` with the given [`tracing::Span`].
pub fn new(span: Span) -> AddressBook {
/// Construct an `AddressBook` with the given `config` and [`tracing::Span`].
pub fn new(config: &Config, span: Span) -> AddressBook {
let constructor_span = span.clone();
let _guard = constructor_span.enter();

let mut new_book = AddressBook {
by_addr: HashMap::default(),
local_listener: config.listen_addr,
span,
last_address_log: None,
};
Expand All @@ -100,6 +104,11 @@ impl AddressBook {
new_book
}

/// Get the local listener address.
pub fn get_local_listener(&self) -> MetaAddr {
MetaAddr::new_local_listener(&self.local_listener)
}

/// Get the contents of `self` in random order with sanitized timestamps.
pub fn sanitized(&self) -> Vec<MetaAddr> {
use rand::seq::SliceRandom;
Expand Down
15 changes: 13 additions & 2 deletions zebra-network/src/meta_addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,17 @@ impl MetaAddr {
}
}

/// Create a new `MetaAddr` for our own listener address.
pub fn new_local_listener(addr: &SocketAddr) -> MetaAddr {
MetaAddr {
addr: *addr,
// TODO: create a "local services" constant
services: PeerServices::NODE_NETWORK,
last_seen: Utc::now(),
last_connection_state: Responded,
}
}

/// Create a new `MetaAddr` for a peer that has just had an error.
pub fn new_errored(addr: &SocketAddr, services: &PeerServices) -> MetaAddr {
MetaAddr {
Expand Down Expand Up @@ -251,8 +262,8 @@ impl MetaAddr {
let last_seen = Utc.timestamp(ts - ts.rem_euclid(interval), 0);
MetaAddr {
addr: self.addr,
// services are sanitized during parsing, so we don't need to make
// any changes here
// services are sanitized during parsing, or set to a fixed valued by
// new_local_listener, so we don't need to sanitize here
services: self.services,
last_seen,
// the state isn't sent to the remote peer, but sanitize it anyway
Expand Down
83 changes: 81 additions & 2 deletions zebra-network/src/peer/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,61 @@ impl ConnectedAddr {
Isolated => "Isol",
}
}

/// Returns a list of alternate remote peer addresses, which can be used for
/// reconnection attempts.
///
/// Uses the connected address, and the remote canonical address.
///
/// Skips duplicates. If this is an outbound connection, also skips the
/// remote address that we're currently connected to.
pub fn get_alternate_addrs(
&self,
mut canonical_remote: SocketAddr,
) -> impl Iterator<Item = SocketAddr> {
let addrs = match self {
OutboundDirect { addr } => {
// Fixup unspecified addresses and ports using known good data
if canonical_remote.ip().is_unspecified() {
canonical_remote.set_ip(addr.ip());
}
if canonical_remote.port() == 0 {
canonical_remote.set_port(addr.port());
}

// Try the canonical remote address, if it is different from the
// outbound address (which we already have in our address book)
if &canonical_remote != addr {
vec![canonical_remote]
} else {
// we didn't learn a new address from the handshake:
// it's the same as the outbound address, which is already in our address book
Vec::new()
}
}

InboundDirect { maybe_ip, .. } => {
// Use the IP from the TCP connection, and the port the peer told us
let maybe_addr = SocketAddr::new(*maybe_ip, canonical_remote.port());

// Try both addresses, but remove one duplicate if they match
if canonical_remote != maybe_addr {
vec![canonical_remote, maybe_addr]
} else {
vec![canonical_remote]
}
}

// Proxy addresses can't be used for reconnection attempts, but we
// can try the canonical remote address
OutboundProxy { .. } | InboundProxy { .. } => vec![canonical_remote],

// Hide all metadata for isolated connections
Isolated => Vec::new(),
};

addrs.into_iter()
}
}

impl fmt::Debug for ConnectedAddr {
Expand Down Expand Up @@ -564,7 +619,7 @@ where
// Clone these upfront, so they can be moved into the future.
let nonces = self.nonces.clone();
let inbound_service = self.inbound_service.clone();
let timestamp_collector = self.timestamp_collector.clone();
let mut timestamp_collector = self.timestamp_collector.clone();
let inv_collector = self.inv_collector.clone();
let config = self.config.clone();
let user_agent = self.user_agent.clone();
Expand All @@ -590,7 +645,7 @@ where
);

// Wrap the entire initial connection setup in a timeout.
let (remote_version, remote_services, _remote_canonical_addr) = timeout(
let (remote_version, remote_services, remote_canonical_addr) = timeout(
constants::HANDSHAKE_TIMEOUT,
negotiate_version(
&mut peer_conn,
Expand All @@ -604,6 +659,30 @@ where
)
.await??;

// If we've learned potential peer addresses from an inbound
// connection or handshake, add those addresses to our address book.
//
// # Security
//
// We must handle alternate addresses separately from connected
// addresses. Otherwise, malicious peers could interfere with the
// address book state of other peers by providing their addresses in
// `Version` messages.
let alternate_addrs = connected_addr.get_alternate_addrs(remote_canonical_addr);
for alt_addr in alternate_addrs {
let alt_addr = MetaAddr::new_alternate(&alt_addr, &remote_services);
if alt_addr.is_valid_for_outbound() {
tracing::info!(
?alt_addr,
"sending valid alternate peer address to the address book"
);
// awaiting a local task won't hang
let _ = timestamp_collector.send(alt_addr).await;
} else {
tracing::trace!(?alt_addr, "dropping invalid alternate peer address");
}
}

// Set the connection's version to the minimum of the received version or our own.
let negotiated_version = std::cmp::min(remote_version, constants::CURRENT_VERSION);

Expand Down
2 changes: 1 addition & 1 deletion zebra-network/src/peer_set/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send + 'static,
{
let (address_book, timestamp_collector) = TimestampCollector::spawn();
let (address_book, timestamp_collector) = TimestampCollector::spawn(&config);
let (inv_sender, inv_receiver) = broadcast::channel(100);

// Construct services that handle inbound handshakes and perform outbound
Expand Down
12 changes: 6 additions & 6 deletions zebra-network/src/timestamp_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;

use futures::{channel::mpsc, prelude::*};

use crate::{types::MetaAddr, AddressBook};
use crate::{types::MetaAddr, AddressBook, Config};

/// The timestamp collector hooks into incoming message streams for each peer and
/// records per-connection last-seen timestamps into an [`AddressBook`].
Expand All @@ -14,14 +14,14 @@ impl TimestampCollector {
/// Spawn a new [`TimestampCollector`] task, and return handles for the
/// transmission channel for timestamp events and for the [`AddressBook`] it
/// updates.
pub fn spawn() -> (Arc<std::sync::Mutex<AddressBook>>, mpsc::Sender<MetaAddr>) {
pub fn spawn(config: &Config) -> (Arc<std::sync::Mutex<AddressBook>>, mpsc::Sender<MetaAddr>) {
use tracing::Level;
const TIMESTAMP_WORKER_BUFFER_SIZE: usize = 100;
let (worker_tx, mut worker_rx) = mpsc::channel(TIMESTAMP_WORKER_BUFFER_SIZE);
let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new(span!(
Level::TRACE,
"timestamp collector"
))));
let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new(
config,
span!(Level::TRACE, "timestamp collector"),
)));
let worker_address_book = address_book.clone();

let worker = async move {
Expand Down
8 changes: 7 additions & 1 deletion zebrad/src/components/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,13 @@ impl Service<zn::Request> for Inbound {
// Briefly hold the address book threaded mutex while
// cloning the address book. Then sanitize after releasing
// the lock.
let peers = address_book.lock().unwrap().clone();
let mut peers = address_book.lock().unwrap().clone();

// Add our local listener address to the advertised peers
let local_listener = address_book.lock().unwrap().get_local_listener();
peers.update(local_listener);

// Send a sanitized response
let mut peers = peers.sanitized();
const MAX_ADDR: usize = 1000; // bitcoin protocol constant
peers.truncate(MAX_ADDR);
Expand Down