Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a deadlock between the crawler and dialer, and other hangs #1950

Merged
merged 7 commits into from
Apr 7, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions zebra-network/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ pub const LIVE_PEER_DURATION: Duration = Duration::from_secs(60 + 20 + 20 + 20);
/// connected peer.
pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(60);

/// The number of GetAddr requests sent when crawling for new peers.
///
/// ## SECURITY
///
/// The fanout should be greater than 1, to ensure that Zebra's address book is
/// not dominated by a single peer.
pub const GET_ADDR_FANOUT: usize = 2;

/// Truncate timestamps in outbound address messages to this time interval.
///
/// This is intended to prevent a peer from learning exactly when we received
Expand Down
9 changes: 9 additions & 0 deletions zebra-network/src/peer/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,13 @@ pub enum HandshakeError {
/// The remote peer offered a version older than our minimum version.
#[error("Peer offered obsolete version: {0:?}")]
ObsoleteVersion(crate::protocol::external::types::Version),
/// Sending or receiving a message timed out.
#[error("Timeout when sending or receiving a message to peer")]
Timeout,
}

impl From<tokio::time::error::Elapsed> for HandshakeError {
fn from(_source: tokio::time::error::Elapsed) -> Self {
HandshakeError::Timeout
}
}
162 changes: 124 additions & 38 deletions zebra-network/src/peer/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use futures::{
channel::{mpsc, oneshot},
prelude::*,
};
use tokio::{net::TcpStream, sync::broadcast};
use tokio::{net::TcpStream, sync::broadcast, time::timeout};
use tokio_util::codec::Framed;
use tower::Service;
use tracing::{span, Level, Span};
Expand All @@ -34,6 +34,12 @@ use super::{Client, Connection, ErrorSlot, HandshakeError, PeerError};

/// A [`Service`] that handshakes with a remote peer and constructs a
/// client/server pair.
///
/// CORRECTNESS
///
/// To avoid hangs, each handshake (or its connector) should be:
/// - launched in a separate task, and
/// - wrapped in a timeout.
#[derive(Clone)]
pub struct Handshake<S> {
config: Config,
Expand Down Expand Up @@ -211,6 +217,10 @@ where
let fut = async move {
debug!("connecting to remote peer");

// CORRECTNESS
//
// As a defence-in-depth against hangs, every send or next on stream
// should be wrapped in a timeout.
let mut stream = Framed::new(
tcp_stream,
Codec::builder()
Expand Down Expand Up @@ -260,11 +270,10 @@ where
};

debug!(?version, "sending initial version message");
stream.send(version).await?;
timeout(constants::REQUEST_TIMEOUT, stream.send(version)).await??;

let remote_msg = stream
.next()
.await
let remote_msg = timeout(constants::REQUEST_TIMEOUT, stream.next())
.await?
.ok_or(HandshakeError::ConnectionClosed)??;

// Check that we got a Version and destructure its fields into the local scope.
Expand Down Expand Up @@ -293,11 +302,10 @@ where
return Err(HandshakeError::NonceReuse);
}

stream.send(Message::Verack).await?;
timeout(constants::REQUEST_TIMEOUT, stream.send(Message::Verack)).await??;

let remote_msg = stream
.next()
.await
let remote_msg = timeout(constants::REQUEST_TIMEOUT, stream.next())
.await?
.ok_or(HandshakeError::ConnectionClosed)??;
if let Message::Verack = remote_msg {
debug!("got verack from remote peer");
Expand Down Expand Up @@ -376,22 +384,44 @@ where
future::ready(Ok(msg))
});

// CORRECTNESS
//
// Every message and error must update the peer address state via
// the inbound_ts_collector.
let inbound_ts_collector = timestamp_collector.clone();
let peer_rx = peer_rx
.then(move |msg| {
// Add a metric for inbound messages and fire a timestamp event.
let mut timestamp_collector = timestamp_collector.clone();
// Add a metric for inbound messages and errors.
// Fire a timestamp or failure event.
let mut inbound_ts_collector = inbound_ts_collector.clone();
async move {
if let Ok(msg) = &msg {
metrics::counter!(
"zcash.net.in.messages",
1,
"command" => msg.to_string(),
"addr" => addr.to_string(),
);
use futures::sink::SinkExt;
let _ = timestamp_collector
.send(MetaAddr::new_responded(&addr, &remote_services))
.await;
match &msg {
Ok(msg) => {
metrics::counter!(
"zcash.net.in.messages",
1,
"command" => msg.to_string(),
"addr" => addr.to_string(),
);
use futures::sink::SinkExt;
// the collector doesn't depend on network activity,
// so this await should not hang
let _ = inbound_ts_collector
.send(MetaAddr::new_responded(&addr, &remote_services))
.await;
}
Err(err) => {
metrics::counter!(
"zebra.net.in.errors",
1,
"error" => err.to_string(),
"addr" => addr.to_string(),
);
use futures::sink::SinkExt;
let _ = inbound_ts_collector
.send(MetaAddr::new_errored(&addr, &remote_services))
.await;
}
}
msg
}
Expand Down Expand Up @@ -452,17 +482,35 @@ where
.boxed(),
);

// CORRECTNESS
//
// To prevent hangs:
// - every await that depends on the network must have a timeout (or interval)
// - every error/shutdown must update the address book state and return
//
// The address book state can be updated via `ClientRequest.tx`, or the
// timestamp_collector.
//
// Returning from the spawned closure terminates the connection's heartbeat task.
let heartbeat_span = tracing::debug_span!(parent: connection_span, "heartbeat");
tokio::spawn(
async move {
use super::ClientRequest;
use futures::future::Either;
use futures::sink::SinkExt;

let mut shutdown_rx = shutdown_rx;
let mut server_tx = server_tx;
let mut timestamp_collector = timestamp_collector.clone();
let mut interval_stream = tokio::time::interval(constants::HEARTBEAT_INTERVAL);
loop {
let shutdown_rx_ref = Pin::new(&mut shutdown_rx);
let mut send_addr_err = false;

// Currently, select prefers the first future.
// There is no starvation risk here, because
// interval has a limited rate, and shutdown
// is a oneshot.
match future::select(interval_stream.next(), shutdown_rx_ref).await {
Either::Left(_) => {
let (tx, rx) = oneshot::channel();
Expand All @@ -474,19 +522,28 @@ where
span: tracing::Span::current(),
}) {
Ok(()) => {
match server_tx.flush().await {
Ok(()) => {}
// TODO: also wait on the shutdown_rx here
match timeout(
constants::HEARTBEAT_INTERVAL,
server_tx.flush(),
)
.await
{
Ok(Ok(())) => {
}
Ok(Err(e)) => {
tracing::warn!(
?e,
"flushing client request failed, shutting down"
);
send_addr_err = true;
}
Err(e) => {
// We can't get the client request for this failure,
// so we can't send an error back here. But that's ok,
// because:
// - this error never happens (or it's very rare)
// - if the flush() fails, the server hasn't
// received the request
tracing::warn!(
"flushing client request failed: {:?}",
e
?e,
"flushing client request timed out, shutting down"
);
send_addr_err = true;
}
}
}
Expand Down Expand Up @@ -514,17 +571,46 @@ where
// Heartbeats are checked internally to the
// connection logic, but we need to wait on the
// response to avoid canceling the request.
match rx.await {
Ok(_) => tracing::trace!("got heartbeat response"),
Err(_) => {
tracing::trace!(
//
// TODO: also wait on the shutdown_rx here
match timeout(constants::HEARTBEAT_INTERVAL, rx).await {
Ok(Ok(_)) => tracing::trace!("got heartbeat response"),
Ok(Err(e)) => {
tracing::warn!(
?e,
"error awaiting heartbeat response, shutting down"
);
return;
send_addr_err = true;
}
Err(e) => {
tracing::warn!(
?e,
"heartbeat response timed out, shutting down"
);
send_addr_err = true;
}
}
}
Either::Right(_) => return, // got shutdown signal
Either::Right(_) => {
tracing::trace!("shutting down due to Client shut down");
// awaiting a local task won't hang
let _ = timestamp_collector
.send(MetaAddr::new_shutdown(&addr, &remote_services))
.await;
return;
}
}
if send_addr_err {
// We can't get the client request for this failure,
// so we can't send an error back on `tx`. So
// we just update the address book with a failure.
let _ = timestamp_collector
.send(MetaAddr::new_errored(
&addr,
&remote_services,
))
.await;
return;
}
}
}
Expand Down
Loading