Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2. Cache unsolicited address messages, and use them as responses #3294

Merged
merged 1 commit into from
Jan 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 95 additions & 42 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use zebra_chain::{

use crate::{
constants,
meta_addr::MetaAddr,
peer::{
error::AlreadyErrored, ClientRequestReceiver, ErrorSlot, InProgressClientRequest,
MustUseOneshotSender, PeerError, SharedPeerError,
Expand Down Expand Up @@ -420,6 +421,15 @@ pub struct Connection<S, Tx> {
/// other state handling.
pub(super) request_timer: Option<Pin<Box<Sleep>>>,

/// A cached copy of the last unsolicited `addr` or `addrv2` message from this peer.
///
/// When Zebra requests peers, the cache is consumed and returned as a synthetic response.
/// This works around `zcashd`'s address response rate-limit.
///
/// Multi-peer `addr` or `addrv2` messages replace single-peer messages in the cache.
/// (`zcashd` also gossips its own address at regular intervals.)
pub(super) cached_addrs: Vec<MetaAddr>,

/// The `inbound` service, used to answer requests from this connection's peer.
pub(super) svc: S,

Expand Down Expand Up @@ -548,6 +558,52 @@ where
}
}
}

// Check whether the handler is finished before waiting for a response message,
// because the response might be `Nil` or synthetic.
State::AwaitingResponse {
handler: Handler::Finished(_),
ref span,
..
} => {
// We have to get rid of the span reference so we can tamper with the state.
let span = span.clone();
trace!(
parent: &span,
"returning completed response to client request"
);

// Replace the state with a temporary value,
// so we can take ownership of the response sender.
let tmp_state = std::mem::replace(&mut self.state, State::Failed);

if let State::AwaitingResponse {
handler: Handler::Finished(response),
tx,
..
} = tmp_state
{
if let Ok(response) = response.as_ref() {
debug!(%response, "finished receiving peer response to Zebra request");
// Add a metric for inbound responses to outbound requests.
metrics::counter!(
"zebra.net.in.responses",
1,
"command" => response.command(),
"addr" => self.metrics_label.clone(),
);
} else {
debug!(error = ?response, "error in peer response to Zebra request");
}

let _ = tx.send(response.map_err(Into::into));
} else {
unreachable!("already checked for AwaitingResponse");
}

self.state = State::AwaitingRequest;
}

// We're awaiting a response to a client request,
// so wait on either a peer message, or on a request cancellation.
State::AwaitingResponse {
Expand Down Expand Up @@ -600,45 +656,6 @@ where

self.update_state_metrics(None);

// Check whether the handler is finished processing messages,
// and update the state.
// (Some messages can indicate that a response has finished,
// even if the message wasn't consumed as a response or a request.)
//
// Replace the state with a temporary value,
// so we can take ownership of the response sender.
self.state = match std::mem::replace(&mut self.state, State::Failed) {
State::AwaitingResponse {
handler: Handler::Finished(response),
tx,
..
} => {
if let Ok(response) = response.as_ref() {
debug!(%response, "finished receiving peer response to Zebra request");
// Add a metric for inbound responses to outbound requests.
metrics::counter!(
"zebra.net.in.responses",
1,
"command" => response.command(),
"addr" => self.metrics_label.clone(),
);
} else {
debug!(error = ?response, "error in peer response to Zebra request");
}
let _ = tx.send(response.map_err(Into::into));
State::AwaitingRequest
}
pending @ State::AwaitingResponse { .. } =>
pending
,
_ => unreachable!(
"unexpected failed connection state while AwaitingResponse: client_receiver: {:?}",
self.client_rx
),
};

self.update_state_metrics(None);

// If the message was not consumed as a response,
// check whether it can be handled as a request.
let unused_msg = if let Some(request_msg) = request_msg {
Expand Down Expand Up @@ -695,6 +712,7 @@ where
}
}
}

// This connection has failed: stop the event loop, and complete the future.
State::Failed => break,
}
Expand Down Expand Up @@ -723,7 +741,7 @@ where
self.shutdown(error);
}

/// Handle an incoming client request, possibly generating outgoing messages to the
/// Handle an internal client request, possibly generating outgoing messages to the
/// remote peer.
///
/// NOTE: the caller should use .instrument(msg.span) to instrument the function.
Expand Down Expand Up @@ -772,6 +790,25 @@ where
pending,
self.client_rx
),

// Consume the cached addresses from the peer,
// to work-around a `zcashd` response rate-limit
(AwaitingRequest, Peers) if !self.cached_addrs.is_empty() => {
let cached_addrs = std::mem::take(&mut self.cached_addrs);
debug!(
addrs = cached_addrs.len(),
"responding to Peers request using cached addresses",
);

Ok((
AwaitingResponse {
handler: Handler::Finished(Ok(Response::Peers(cached_addrs))),
tx,
span,
},
None,
))}
,
(AwaitingRequest, Peers) => match self.peer_tx.send(Message::GetAddr).await {
Ok(()) => Ok((
AwaitingResponse {
Expand All @@ -783,6 +820,7 @@ where
)),
Err(e) => Err((e, tx)),
},

(AwaitingRequest, Ping(nonce)) => match self.peer_tx.send(Message::Ping(nonce)).await {
Ok(()) => Ok((
AwaitingResponse {
Expand Down Expand Up @@ -1012,8 +1050,23 @@ where
}
// Zebra crawls the network proactively, to prevent
// peers from inserting data into our address book.
Message::Addr(_) => {
debug!(%msg, "ignoring unsolicited addr message");
Message::Addr(ref addrs) => {
// Workaround `zcashd`'s `getaddr` response rate-limit
if addrs.len() > 1 {
// Always refresh the cache with multi-addr messages.
debug!(%msg, "caching unsolicited multi-addr message");
self.cached_addrs = addrs.clone();
} else if addrs.len() == 1 && self.cached_addrs.len() <= 1 {
// Only refresh a cached single addr message with another single addr.
// (`zcashd` regularly advertises its own address.)
debug!(%msg, "caching unsolicited single addr message");
self.cached_addrs = addrs.clone();
} else {
debug!(
%msg,
"ignoring unsolicited single addr message: already cached a multi-addr message"
);
}
None
}
Message::Tx(ref transaction) => Some(Request::PushTransaction(transaction.clone())),
Expand Down
7 changes: 7 additions & 0 deletions zebra-network/src/peer/connection/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ async fn connection_run_loop_ok() {
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
Expand Down Expand Up @@ -103,6 +104,7 @@ async fn connection_run_loop_future_drop() {
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
Expand Down Expand Up @@ -152,6 +154,7 @@ async fn connection_run_loop_client_close() {
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
Expand Down Expand Up @@ -208,6 +211,7 @@ async fn connection_run_loop_client_drop() {
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
Expand Down Expand Up @@ -263,6 +267,7 @@ async fn connection_run_loop_inbound_close() {
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
Expand Down Expand Up @@ -319,6 +324,7 @@ async fn connection_run_loop_inbound_drop() {
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
Expand Down Expand Up @@ -379,6 +385,7 @@ async fn connection_run_loop_failed() {
let connection = Connection {
state: State::Failed,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
Expand Down
1 change: 1 addition & 0 deletions zebra-network/src/peer/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,7 @@ where
let server = Connection {
state: connection::State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: inbound_service,
client_rx: server_rx.into(),
error_slot: error_slot.clone(),
Expand Down