Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid deadlocks in the address book mutex #3244

Merged
merged 15 commits into from
Dec 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 67 additions & 3 deletions zebra-network/src/address_book.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{cmp::Reverse, iter::Extend, net::SocketAddr, time::Instant};

use chrono::Utc;
use ordered_map::OrderedMap;
use tokio::sync::watch;
use tracing::Span;

use crate::{
Expand Down Expand Up @@ -48,7 +49,7 @@ mod tests;
/// Updates must not be based on:
/// - the remote addresses of inbound connections, or
/// - the canonical address of any connection.
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct AddressBook {
/// Peer listener addresses, suitable for outbound connections,
/// in connection attempt order.
Expand All @@ -71,12 +72,15 @@ pub struct AddressBook {
/// The span for operations on this address book.
span: Span,

/// A channel used to send the latest address book metrics.
address_metrics_tx: watch::Sender<AddressMetrics>,

/// The last time we logged a message about the address metrics.
last_address_log: Option<Instant>,
}

/// Metrics about the states of the addresses in an [`AddressBook`].
#[derive(Debug)]
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct AddressMetrics {
/// The number of addresses in the `Responded` state.
responded: usize,
Expand Down Expand Up @@ -111,11 +115,16 @@ impl AddressBook {
let instant_now = Instant::now();
let chrono_now = Utc::now();

// The default value is correct for an empty address book,
// and it gets replaced by `update_metrics` anyway.
let (address_metrics_tx, _address_metrics_rx) = watch::channel(AddressMetrics::default());

let mut new_book = AddressBook {
by_addr: OrderedMap::new(|meta_addr| Reverse(*meta_addr)),
addr_limit: constants::MAX_ADDRS_IN_ADDRESS_BOOK,
local_listener: canonical_socket_addr(local_listener),
span,
address_metrics_tx,
last_address_log: None,
};

Expand Down Expand Up @@ -169,6 +178,17 @@ impl AddressBook {
new_book
}

/// Returns a new watch channel receiver for the address book metrics.
///
/// The receiver is subscribed to this address book's `address_metrics_tx` sender.
///
/// The metrics in the watch channel are only updated when the address book updates,
/// so they can be significantly outdated if Zebra is disconnected or hung.
///
/// The current metrics value is marked as seen,
/// so `Receiver::changed` will only return after the next address book update.
pub fn address_metrics_watcher(&self) -> watch::Receiver<AddressMetrics> {
self.address_metrics_tx.subscribe()
}

/// Get the local listener address.
///
/// This address contains minimal state, but it is not sanitized.
Expand Down Expand Up @@ -422,7 +442,25 @@ impl AddressBook {
}

/// Returns metrics for the addresses in this address book, as of `now`.
/// Only for use in tests.
///
/// This is a test-only wrapper that forwards to `address_metrics_internal`.
///
/// # Correctness
///
/// Use [`AddressBook::address_metrics_watcher`]`().borrow()` in production code,
/// to avoid deadlocks.
#[cfg(test)]
pub fn address_metrics(&self, now: chrono::DateTime<Utc>) -> AddressMetrics {
self.address_metrics_internal(now)
}

/// Returns metrics for the addresses in this address book.
///
/// # Correctness
///
/// External callers should use [`AddressBook::address_metrics_watcher`]`().borrow()`
/// in production code, to avoid deadlocks.
/// (Using the watch channel receiver does not lock the address book mutex.)
fn address_metrics_internal(&self, now: chrono::DateTime<Utc>) -> AddressMetrics {
let responded = self.state_peers(PeerAddrState::Responded).count();
let never_attempted_gossiped = self
.state_peers(PeerAddrState::NeverAttemptedGossiped)
Expand Down Expand Up @@ -453,7 +491,10 @@ impl AddressBook {
fn update_metrics(&mut self, instant_now: Instant, chrono_now: chrono::DateTime<Utc>) {
let _guard = self.span.enter();

let m = self.address_metrics(chrono_now);
let m = self.address_metrics_internal(chrono_now);

// Ignore errors: we don't care if any receivers are listening.
let _ = self.address_metrics_tx.send(m);

// TODO: rename to address_book.[state_name]
metrics::gauge!("candidate_set.responded", m.responded as f64);
Expand Down Expand Up @@ -536,3 +577,26 @@ impl Extend<MetaAddrChange> for AddressBook {
}
}
}

impl Clone for AddressBook {
    /// Creates an independent copy of this address book.
    ///
    /// The copy shares no channel with the original: it gets its own metrics
    /// watch channel, seeded with the metrics value currently stored in this
    /// book's channel. Its last address log time is reset to `None`.
    ///
    /// The addresses, address limit, local listener address, and span are
    /// copied from this address book.
    ///
    /// All address books update the same prometheus metrics.
    fn clone(&self) -> Self {
        // Snapshot the stored metrics rather than calling `update_metrics`:
        // the snapshot might be outdated, but recomputing here would overwrite
        // the prometheus metrics published by the main address book.
        let metrics_snapshot = *self.address_metrics_tx.borrow();

        // The initial receiver is dropped straight away; new receivers are
        // created on demand by subscribing to the sender.
        let (metrics_sender, _unused_receiver) = watch::channel(metrics_snapshot);

        Self {
            by_addr: self.by_addr.clone(),
            addr_limit: self.addr_limit,
            local_listener: self.local_listener,
            span: self.span.clone(),
            address_metrics_tx: metrics_sender,
            last_address_log: None,
        }
    }
}
43 changes: 29 additions & 14 deletions zebra-network/src/address_book_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@

use std::{net::SocketAddr, sync::Arc};

use futures::{channel::mpsc, prelude::*};
use thiserror::Error;
use tokio::task::JoinHandle;
use tokio::{
sync::{mpsc, watch},
task::JoinHandle,
};

use crate::{meta_addr::MetaAddrChange, AddressBook, BoxError, Config};
use crate::{
address_book::AddressMetrics, meta_addr::MetaAddrChange, AddressBook, BoxError, Config,
};

/// The `AddressBookUpdater` hooks into incoming message streams for each peer
/// and lets the owner of the sender handle update the address book. For
Expand All @@ -24,15 +28,19 @@ impl AddressBookUpdater {
/// configured with Zebra's actual `local_listener` address.
///
/// Returns handles for:
/// - the address book,
/// - the transmission channel for address book update events,
/// - the address book, and
/// - the address book updater task.
/// - a watch channel for address book metrics, and
/// - the address book updater task join handle.
///
/// The task exits with an error when the returned [`mpsc::Sender`] is closed.
pub fn spawn(
config: &Config,
local_listener: SocketAddr,
) -> (
Arc<std::sync::Mutex<AddressBook>>,
mpsc::Sender<MetaAddrChange>,
watch::Receiver<AddressMetrics>,
JoinHandle<Result<(), BoxError>>,
) {
use tracing::Level;
Expand All @@ -41,14 +49,14 @@ impl AddressBookUpdater {
// based on the maximum number of inbound and outbound peers.
let (worker_tx, mut worker_rx) = mpsc::channel(config.peerset_total_connection_limit());

let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new(
local_listener,
span!(Level::TRACE, "address book updater"),
)));
let worker_address_book = address_book.clone();
let address_book =
AddressBook::new(local_listener, span!(Level::TRACE, "address book updater"));
let address_metrics = address_book.address_metrics_watcher();
let address_book = Arc::new(std::sync::Mutex::new(address_book));

let worker = async move {
while let Some(event) = worker_rx.next().await {
let worker_address_book = address_book.clone();
let worker = move || {
while let Some(event) = worker_rx.blocking_recv() {
// # Correctness
//
// Briefly hold the address book threaded mutex, to update the
Expand All @@ -62,8 +70,15 @@ impl AddressBookUpdater {
Err(AllAddressBookUpdaterSendersClosed.into())
};

let address_book_updater_task_handle = tokio::spawn(worker.boxed());
// Correctness: spawn address book accesses on a blocking thread,
// to avoid deadlocks (see #1976)
let address_book_updater_task_handle = tokio::task::spawn_blocking(worker);

(address_book, worker_tx, address_book_updater_task_handle)
(
address_book,
worker_tx,
address_metrics,
address_book_updater_task_handle,
)
}
}
27 changes: 13 additions & 14 deletions zebra-network/src/peer/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ use std::{
};

use chrono::{TimeZone, Utc};
use futures::{
channel::{mpsc, oneshot},
future, FutureExt, SinkExt, StreamExt,
};
use futures::{channel::oneshot, future, FutureExt, SinkExt, StreamExt};
use tokio::{net::TcpStream, sync::broadcast, task::JoinError, time::timeout};
use tokio_stream::wrappers::IntervalStream;
use tokio_util::codec::Framed;
Expand Down Expand Up @@ -54,7 +51,7 @@ use crate::{
pub struct Handshake<S, C = NoChainTip> {
config: Config,
inbound_service: S,
address_book_updater: mpsc::Sender<MetaAddrChange>,
address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
inv_collector: broadcast::Sender<(InventoryHash, SocketAddr)>,
nonces: Arc<futures::lock::Mutex<HashSet<Nonce>>>,
user_agent: String,
Expand Down Expand Up @@ -306,7 +303,7 @@ impl fmt::Debug for ConnectedAddr {
pub struct Builder<S, C = NoChainTip> {
config: Option<Config>,
inbound_service: Option<S>,
address_book_updater: Option<mpsc::Sender<MetaAddrChange>>,
address_book_updater: Option<tokio::sync::mpsc::Sender<MetaAddrChange>>,
our_services: Option<PeerServices>,
user_agent: Option<String>,
relay: Option<bool>,
Expand Down Expand Up @@ -350,7 +347,7 @@ where
/// make outbound connections to peers.
pub fn with_address_book_updater(
mut self,
address_book_updater: mpsc::Sender<MetaAddrChange>,
address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
) -> Self {
self.address_book_updater = Some(address_book_updater);
self
Expand Down Expand Up @@ -415,7 +412,7 @@ where
let address_book_updater = self.address_book_updater.unwrap_or_else(|| {
// No `AddressBookUpdater` for timestamp collection was passed, so create a stub
// channel. Dropping the receiver means sends will fail, but we don't care.
let (tx, _rx) = mpsc::channel(1);
let (tx, _rx) = tokio::sync::mpsc::channel(1);
tx
});
let nonces = Arc::new(futures::lock::Mutex::new(HashSet::new()));
Expand Down Expand Up @@ -713,7 +710,7 @@ where
// Clone these upfront, so they can be moved into the future.
let nonces = self.nonces.clone();
let inbound_service = self.inbound_service.clone();
let mut address_book_updater = self.address_book_updater.clone();
let address_book_updater = self.address_book_updater.clone();
let inv_collector = self.inv_collector.clone();
let config = self.config.clone();
let user_agent = self.user_agent.clone();
Expand Down Expand Up @@ -787,7 +784,7 @@ where

// These channels should not be cloned more than they are
// in this block, see constants.rs for more.
let (server_tx, server_rx) = mpsc::channel(0);
let (server_tx, server_rx) = futures::channel::mpsc::channel(0);
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let error_slot = ErrorSlot::default();

Expand Down Expand Up @@ -831,7 +828,7 @@ where
.then(move |msg| {
// Add a metric for inbound messages and errors.
// Fire a timestamp or failure event.
let mut inbound_ts_collector = inbound_ts_collector.clone();
let inbound_ts_collector = inbound_ts_collector.clone();
let span =
debug_span!(parent: ts_inner_conn_span.clone(), "inbound_ts_collector");
async move {
Expand Down Expand Up @@ -1018,7 +1015,9 @@ where
}

/// Send one heartbeat using `server_tx`.
async fn send_one_heartbeat(server_tx: &mut mpsc::Sender<ClientRequest>) -> Result<(), BoxError> {
async fn send_one_heartbeat(
server_tx: &mut futures::channel::mpsc::Sender<ClientRequest>,
) -> Result<(), BoxError> {
// We just reached a heartbeat interval, so start sending
// a heartbeat.
let (tx, rx) = oneshot::channel();
Expand Down Expand Up @@ -1065,7 +1064,7 @@ async fn send_one_heartbeat(server_tx: &mut mpsc::Sender<ClientRequest>) -> Resu
/// `handle_heartbeat_error`.
async fn heartbeat_timeout<F, T>(
fut: F,
address_book_updater: &mut mpsc::Sender<MetaAddrChange>,
address_book_updater: &mut tokio::sync::mpsc::Sender<MetaAddrChange>,
connected_addr: &ConnectedAddr,
remote_services: &PeerServices,
) -> Result<T, BoxError>
Expand Down Expand Up @@ -1099,7 +1098,7 @@ where
/// If `result.is_err()`, mark `connected_addr` as failed using `address_book_updater`.
async fn handle_heartbeat_error<T, E>(
result: Result<T, E>,
address_book_updater: &mut mpsc::Sender<MetaAddrChange>,
address_book_updater: &mut tokio::sync::mpsc::Sender<MetaAddrChange>,
connected_addr: &ConnectedAddr,
remote_services: &PeerServices,
) -> Result<T, E>
Expand Down
Loading