Skip to content

Commit

Permalink
Reliability: send local listener address to peers
Browse files Browse the repository at this point in the history
When peers ask for peer addresses, add our local listener address to the
set of addresses, sanitize, then truncate. Sanitize shuffles addresses,
so if there are lots of addresses in the address book, our address will
only be sent to some peers.
  • Loading branch information
teor2345 committed May 16, 2021
1 parent 72a25f9 commit dc69fa3
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 15 deletions.
19 changes: 14 additions & 5 deletions zebra-network/src/address_book.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{
use chrono::{DateTime, Utc};
use tracing::Span;

use crate::{constants, types::MetaAddr, PeerAddrState};
use crate::{constants, types::MetaAddr, Config, PeerAddrState};

/// A database of peer listener addresses, their advertised services, and
/// information on when they were last seen.
Expand Down Expand Up @@ -48,13 +48,16 @@ use crate::{constants, types::MetaAddr, PeerAddrState};
/// - the canonical address of any connection.
#[derive(Clone, Debug)]
pub struct AddressBook {
/// Each known peer address has a matching `MetaAddr`
/// Each known peer address has a matching `MetaAddr`.
by_addr: HashMap<SocketAddr, MetaAddr>,

/// The local listener address.
local_listener: SocketAddr,

/// The span for operations on this address book.
span: Span,

/// The last time we logged a message about the address metrics
/// The last time we logged a message about the address metrics.
last_address_log: Option<Instant>,
}

Expand Down Expand Up @@ -85,13 +88,14 @@ pub struct AddressMetrics {

#[allow(clippy::len_without_is_empty)]
impl AddressBook {
/// Construct an `AddressBook` with the given [`tracing::Span`].
pub fn new(span: Span) -> AddressBook {
/// Construct an `AddressBook` with the given `config` and [`tracing::Span`].
pub fn new(config: &Config, span: Span) -> AddressBook {
let constructor_span = span.clone();
let _guard = constructor_span.enter();

let mut new_book = AddressBook {
by_addr: HashMap::default(),
local_listener: config.listen_addr,
span,
last_address_log: None,
};
Expand All @@ -100,6 +104,11 @@ impl AddressBook {
new_book
}

/// Get the local listener address.
pub fn get_local_listener(&self) -> MetaAddr {
MetaAddr::new_local_listener(&self.local_listener)
}

/// Get the contents of `self` in random order with sanitized timestamps.
pub fn sanitized(&self) -> Vec<MetaAddr> {
use rand::seq::SliceRandom;
Expand Down
15 changes: 13 additions & 2 deletions zebra-network/src/meta_addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,17 @@ impl MetaAddr {
}
}

/// Create a new `MetaAddr` for our own listener address.
pub fn new_local_listener(addr: &SocketAddr) -> MetaAddr {
MetaAddr {
addr: *addr,
// TODO: create a "local services" constant
services: PeerServices::NODE_NETWORK,
last_seen: Utc::now(),
last_connection_state: Responded,
}
}

/// Create a new `MetaAddr` for a peer that has just had an error.
pub fn new_errored(addr: &SocketAddr, services: &PeerServices) -> MetaAddr {
MetaAddr {
Expand Down Expand Up @@ -250,8 +261,8 @@ impl MetaAddr {
let last_seen = Utc.timestamp(ts - ts.rem_euclid(interval), 0);
MetaAddr {
addr: self.addr,
// services are sanitized during parsing, so we don't need to make
// any changes here
// services are sanitized during parsing, or set to a fixed valued by
// new_local_listener, so we don't need to sanitize here
services: self.services,
last_seen,
// the state isn't sent to the remote peer, but sanitize it anyway
Expand Down
2 changes: 1 addition & 1 deletion zebra-network/src/peer_set/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send + 'static,
{
let (address_book, timestamp_collector) = TimestampCollector::spawn();
let (address_book, timestamp_collector) = TimestampCollector::spawn(&config);
let (inv_sender, inv_receiver) = broadcast::channel(100);

// Construct services that handle inbound handshakes and perform outbound
Expand Down
12 changes: 6 additions & 6 deletions zebra-network/src/timestamp_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;

use futures::{channel::mpsc, prelude::*};

use crate::{types::MetaAddr, AddressBook};
use crate::{types::MetaAddr, AddressBook, Config};

/// The timestamp collector hooks into incoming message streams for each peer and
/// records per-connection last-seen timestamps into an [`AddressBook`].
Expand All @@ -14,14 +14,14 @@ impl TimestampCollector {
/// Spawn a new [`TimestampCollector`] task, and return handles for the
/// transmission channel for timestamp events and for the [`AddressBook`] it
/// updates.
pub fn spawn() -> (Arc<std::sync::Mutex<AddressBook>>, mpsc::Sender<MetaAddr>) {
pub fn spawn(config: &Config) -> (Arc<std::sync::Mutex<AddressBook>>, mpsc::Sender<MetaAddr>) {
use tracing::Level;
const TIMESTAMP_WORKER_BUFFER_SIZE: usize = 100;
let (worker_tx, mut worker_rx) = mpsc::channel(TIMESTAMP_WORKER_BUFFER_SIZE);
let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new(span!(
Level::TRACE,
"timestamp collector"
))));
let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new(
config,
span!(Level::TRACE, "timestamp collector"),
)));
let worker_address_book = address_book.clone();

let worker = async move {
Expand Down
8 changes: 7 additions & 1 deletion zebrad/src/components/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,13 @@ impl Service<zn::Request> for Inbound {
// Briefly hold the address book threaded mutex while
// cloning the address book. Then sanitize after releasing
// the lock.
let peers = address_book.lock().unwrap().clone();
let mut peers = address_book.lock().unwrap().clone();

// Add our local listener address to the advertised peers
let local_listener = address_book.lock().unwrap().get_local_listener();
peers.update(local_listener);

// Send a sanitized response
let mut peers = peers.sanitized();
const MAX_ADDR: usize = 1000; // bitcoin protocol constant
peers.truncate(MAX_ADDR);
Expand Down

0 comments on commit dc69fa3

Please sign in to comment.