Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a deadlock between the crawler and dialer, and other hangs #1950

Merged
merged 7 commits into from
Apr 7, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions zebra-network/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ pub const LIVE_PEER_DURATION: Duration = Duration::from_secs(60 + 20 + 20 + 20);
/// connected peer.
pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(60);

/// The number of GetAddr requests sent when crawling for new peers.
///
/// ## SECURITY
///
/// The fanout should be greater than 1, to ensure that Zebra's address book is
/// not dominated by a single peer.
pub const GET_ADDR_FANOUT: usize = 2;

/// Truncate timestamps in outbound address messages to this time interval.
///
/// This is intended to prevent a peer from learning exactly when we received
Expand Down
9 changes: 9 additions & 0 deletions zebra-network/src/peer/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,13 @@ pub enum HandshakeError {
/// The remote peer offered a version older than our minimum version.
#[error("Peer offered obsolete version: {0:?}")]
ObsoleteVersion(crate::protocol::external::types::Version),
/// Sending or receiving a message timed out.
#[error("Timeout when sending or receiving a message to peer")]
Timeout,
}

impl From<tokio::time::error::Elapsed> for HandshakeError {
fn from(_source: tokio::time::error::Elapsed) -> Self {
HandshakeError::Timeout
}
}
159 changes: 121 additions & 38 deletions zebra-network/src/peer/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use futures::{
channel::{mpsc, oneshot},
prelude::*,
};
use tokio::{net::TcpStream, sync::broadcast};
use tokio::{net::TcpStream, sync::broadcast, time::timeout};
use tokio_util::codec::Framed;
use tower::Service;
use tracing::{span, Level, Span};
Expand All @@ -34,6 +34,12 @@ use super::{Client, Connection, ErrorSlot, HandshakeError, PeerError};

/// A [`Service`] that handshakes with a remote peer and constructs a
/// client/server pair.
///
/// CORRECTNESS
///
/// To avoid hangs, each handshake (or its connector) should be:
/// - launched in a separate task, and
/// - wrapped in a timeout.
#[derive(Clone)]
pub struct Handshake<S> {
config: Config,
Expand Down Expand Up @@ -211,6 +217,10 @@ where
let fut = async move {
debug!("connecting to remote peer");

// CORRECTNESS
//
// As a defence-in-depth against hangs, every send or next on stream
// should be wrapped in a timeout.
let mut stream = Framed::new(
tcp_stream,
Codec::builder()
Expand Down Expand Up @@ -260,11 +270,10 @@ where
};

debug!(?version, "sending initial version message");
stream.send(version).await?;
timeout(constants::REQUEST_TIMEOUT, stream.send(version)).await??;

let remote_msg = stream
.next()
.await
let remote_msg = timeout(constants::REQUEST_TIMEOUT, stream.next())
.await?
.ok_or(HandshakeError::ConnectionClosed)??;

// Check that we got a Version and destructure its fields into the local scope.
Expand Down Expand Up @@ -293,11 +302,10 @@ where
return Err(HandshakeError::NonceReuse);
}

stream.send(Message::Verack).await?;
timeout(constants::REQUEST_TIMEOUT, stream.send(Message::Verack)).await??;

let remote_msg = stream
.next()
.await
let remote_msg = timeout(constants::REQUEST_TIMEOUT, stream.next())
.await?
.ok_or(HandshakeError::ConnectionClosed)??;
if let Message::Verack = remote_msg {
debug!("got verack from remote peer");
Expand Down Expand Up @@ -376,22 +384,42 @@ where
future::ready(Ok(msg))
});

// CORRECTNESS
//
// Every message and error must update the peer address state via
// the inbound_ts_collector.
let inbound_ts_collector = timestamp_collector.clone();
let peer_rx = peer_rx
.then(move |msg| {
// Add a metric for inbound messages and fire a timestamp event.
let mut timestamp_collector = timestamp_collector.clone();
// Add a metric for inbound messages and errors.
// Fire a timestamp or failure event.
let mut inbound_ts_collector = inbound_ts_collector.clone();
async move {
if let Ok(msg) = &msg {
metrics::counter!(
"zcash.net.in.messages",
1,
"command" => msg.to_string(),
"addr" => addr.to_string(),
);
use futures::sink::SinkExt;
let _ = timestamp_collector
.send(MetaAddr::new_responded(&addr, &remote_services))
.await;
match &msg {
Ok(msg) => {
metrics::counter!(
"zcash.net.in.messages",
1,
"command" => msg.to_string(),
"addr" => addr.to_string(),
);
// the collector doesn't depend on network activity,
// so this await should not hang
let _ = inbound_ts_collector
.send(MetaAddr::new_responded(&addr, &remote_services))
.await;
}
Err(err) => {
metrics::counter!(
"zebra.net.in.errors",
1,
"error" => err.to_string(),
"addr" => addr.to_string(),
);
let _ = inbound_ts_collector
.send(MetaAddr::new_errored(&addr, &remote_services))
.await;
}
}
msg
}
Expand Down Expand Up @@ -452,6 +480,16 @@ where
.boxed(),
);

// CORRECTNESS
//
// To prevent hangs:
// - every await that depends on the network must have a timeout (or interval)
// - every error/shutdown must update the address book state and return
//
// The address book state can be updated via `ClientRequest.tx`, or the
// timestamp_collector.
//
// Returning from the spawned closure terminates the connection's heartbeat task.
let heartbeat_span = tracing::debug_span!(parent: connection_span, "heartbeat");
tokio::spawn(
async move {
Expand All @@ -460,9 +498,16 @@ where

let mut shutdown_rx = shutdown_rx;
let mut server_tx = server_tx;
let mut timestamp_collector = timestamp_collector.clone();
let mut interval_stream = tokio::time::interval(constants::HEARTBEAT_INTERVAL);
loop {
let shutdown_rx_ref = Pin::new(&mut shutdown_rx);
let mut send_addr_err = false;

// Currently, select prefers the first future.
// There is no starvation risk here, because
// interval has a limited rate, and shutdown
// is a oneshot.
match future::select(interval_stream.next(), shutdown_rx_ref).await {
Either::Left(_) => {
let (tx, rx) = oneshot::channel();
Expand All @@ -474,19 +519,28 @@ where
span: tracing::Span::current(),
}) {
Ok(()) => {
match server_tx.flush().await {
Ok(()) => {}
// TODO: also wait on the shutdown_rx here
match timeout(
constants::HEARTBEAT_INTERVAL,
server_tx.flush(),
)
.await
{
Ok(Ok(())) => {
}
Ok(Err(e)) => {
tracing::warn!(
?e,
"flushing client request failed, shutting down"
);
send_addr_err = true;
}
Err(e) => {
// We can't get the client request for this failure,
// so we can't send an error back here. But that's ok,
// because:
// - this error never happens (or it's very rare)
// - if the flush() fails, the server hasn't
// received the request
tracing::warn!(
"flushing client request failed: {:?}",
e
?e,
"flushing client request timed out, shutting down"
);
send_addr_err = true;
}
}
}
Expand Down Expand Up @@ -514,17 +568,46 @@ where
// Heartbeats are checked internally to the
// connection logic, but we need to wait on the
// response to avoid canceling the request.
match rx.await {
Ok(_) => tracing::trace!("got heartbeat response"),
Err(_) => {
tracing::trace!(
//
// TODO: also wait on the shutdown_rx here
match timeout(constants::HEARTBEAT_INTERVAL, rx).await {
Ok(Ok(_)) => tracing::trace!("got heartbeat response"),
Ok(Err(e)) => {
tracing::warn!(
?e,
"error awaiting heartbeat response, shutting down"
);
return;
send_addr_err = true;
}
Err(e) => {
tracing::warn!(
?e,
"heartbeat response timed out, shutting down"
);
send_addr_err = true;
}
}
}
Either::Right(_) => return, // got shutdown signal
Either::Right(_) => {
tracing::trace!("shutting down due to Client shut down");
// awaiting a local task won't hang
let _ = timestamp_collector
.send(MetaAddr::new_shutdown(&addr, &remote_services))
.await;
return;
}
}
if send_addr_err {
// We can't get the client request for this failure,
// so we can't send an error back on `tx`. So
// we just update the address book with a failure.
let _ = timestamp_collector
.send(MetaAddr::new_errored(
&addr,
&remote_services,
))
.await;
return;
}
}
}
Expand Down
88 changes: 63 additions & 25 deletions zebra-network/src/peer_set/candidate_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ use std::{
};

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::time::{sleep, sleep_until, Sleep};
use tokio::time::{sleep, sleep_until, timeout, Sleep};
use tower::{Service, ServiceExt};

use crate::{types::MetaAddr, AddressBook, BoxError, Request, Response};
use crate::{constants, types::MetaAddr, AddressBook, BoxError, Request, Response};

/// The `CandidateSet` manages the `PeerSet`'s peer reconnection attempts.
///
Expand Down Expand Up @@ -140,6 +140,9 @@ where
///
/// ## Correctness
///
/// The crawler exits when update returns an error, so it must only return
/// errors on permanent failures.
///
/// The handshaker sets up the peer message receiver so it also sends a
/// `Responded` peer address update.
///
Expand All @@ -150,37 +153,62 @@ where
// Opportunistically crawl the network on every update call to ensure
// we're actively fetching peers. Continue independently of whether we
// actually receive any peers, but always ask the network for more.
//
// Because requests are load-balanced across existing peers, we can make
// multiple requests concurrently, which will be randomly assigned to
// existing peers, but we don't make too many because update may be
// called while the peer set is already loaded.
let mut responses = FuturesUnordered::new();
trace!("sending GetPeers requests");
// Yes this loops only once (for now), until we add fanout back.
for _ in 0..1usize {
self.peer_service.ready_and().await?;
responses.push(self.peer_service.call(Request::Peers));
for _ in 0..constants::GET_ADDR_FANOUT {
// CORRECTNESS
//
// avoid deadlocks when there are no connected peers, and:
// - we're waiting on a handshake to complete so there are peers, or
// - another task that handles or adds peers is waiting on this task to complete.
let peer_service =
match timeout(constants::REQUEST_TIMEOUT, self.peer_service.ready_and()).await {
// update must only return an error for permanent failures
Err(temporary_error) => {
info!(
?temporary_error,
"timeout waiting for the peer service to become ready"
);
return Ok(());
}
Ok(Err(permanent_error)) => Err(permanent_error)?,
Ok(Ok(peer_service)) => peer_service,
};
responses.push(peer_service.call(Request::Peers));
}
while let Some(rsp) = responses.next().await {
if let Ok(Response::Peers(rsp_addrs)) = rsp {
// Filter new addresses to ensure that gossiped addresses are actually new
let peer_set = &self.peer_set;
let new_addrs = rsp_addrs
.iter()
.filter(|meta| !peer_set.lock().unwrap().contains_addr(&meta.addr))
.collect::<Vec<_>>();
trace!(
?rsp_addrs,
new_addr_count = ?new_addrs.len(),
"got response to GetPeers"
);
// New addresses are deserialized in the `NeverAttempted` state
peer_set
.lock()
.unwrap()
.extend(new_addrs.into_iter().cloned());
} else {
trace!("got error in GetPeers request");
match rsp {
Ok(Response::Peers(rsp_addrs)) => {
// Filter new addresses to ensure that gossiped addresses are actually new
let peer_set = &self.peer_set;
// TODO: reduce mutex contention by moving the filtering into
// the address book itself
let new_addrs = rsp_addrs
.iter()
.filter(|meta| !peer_set.lock().unwrap().contains_addr(&meta.addr))
.collect::<Vec<_>>();
trace!(
?rsp_addrs,
new_addr_count = ?new_addrs.len(),
"got response to GetPeers"
);
// New addresses are deserialized in the `NeverAttempted` state
peer_set
.lock()
.unwrap()
.extend(new_addrs.into_iter().cloned());
}
Err(e) => {
// since we do a fanout, and new updates are triggered by
// each demand, we can ignore errors in individual responses
trace!(?e, "got error in GetPeers request");
}
Ok(_) => unreachable!("Peers requests always return Peers responses"),
}
}

Expand Down Expand Up @@ -214,6 +242,16 @@ where
let mut sleep = sleep_until(current_deadline + Self::MIN_PEER_CONNECTION_INTERVAL);
mem::swap(&mut self.next_peer_min_wait, &mut sleep);

// CORRECTNESS
//
// In this critical section, we hold the address mutex.
//
// To avoid deadlocks, the critical section:
// - must not acquire any other locks
// - must not await any futures
//
// To avoid hangs, any computation in the critical section should
// be kept to a minimum.
let reconnect = {
let mut peer_set_guard = self.peer_set.lock().unwrap();
// It's okay to early return here because we're returning None
Expand Down
Loading