diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index 61d3a63efc9..3c14a59bd02 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -26,6 +26,7 @@ use zebra_chain::{ use crate::{ constants, + meta_addr::MetaAddr, peer::{ error::AlreadyErrored, ClientRequestReceiver, ErrorSlot, InProgressClientRequest, MustUseOneshotSender, PeerError, SharedPeerError, @@ -420,6 +421,15 @@ pub struct Connection { /// other state handling. pub(super) request_timer: Option>>, + /// A cached copy of the last unsolicited `addr` or `addrv2` message from this peer. + /// + /// When Zebra requests peers, the cache is consumed and returned as a synthetic response. + /// This works around `zcashd`'s address response rate-limit. + /// + /// Multi-peer `addr` or `addrv2` messages replace single-peer messages in the cache. + /// (`zcashd` also gossips its own address at regular intervals.) + pub(super) cached_addrs: Vec, + /// The `inbound` service, used to answer requests from this connection's peer. pub(super) svc: S, @@ -548,6 +558,52 @@ where } } } + + // Check whether the handler is finished before waiting for a response message, + // because the response might be `Nil` or synthetic. + State::AwaitingResponse { + handler: Handler::Finished(_), + ref span, + .. + } => { + // We have to get rid of the span reference so we can tamper with the state. + let span = span.clone(); + trace!( + parent: &span, + "returning completed response to client request" + ); + + // Replace the state with a temporary value, + // so we can take ownership of the response sender. + let tmp_state = std::mem::replace(&mut self.state, State::Failed); + + if let State::AwaitingResponse { + handler: Handler::Finished(response), + tx, + .. + } = tmp_state + { + if let Ok(response) = response.as_ref() { + debug!(%response, "finished receiving peer response to Zebra request"); + // Add a metric for inbound responses to outbound requests. + metrics::counter!( + "zebra.net.in.responses", + 1, + "command" => response.command(), + "addr" => self.metrics_label.clone(), + ); + } else { + debug!(error = ?response, "error in peer response to Zebra request"); + } + + let _ = tx.send(response.map_err(Into::into)); + } else { + unreachable!("already checked for AwaitingResponse"); + } + + self.state = State::AwaitingRequest; + } + // We're awaiting a response to a client request, // so wait on either a peer message, or on a request cancellation. State::AwaitingResponse { @@ -600,45 +656,6 @@ where self.update_state_metrics(None); - // Check whether the handler is finished processing messages, - // and update the state. - // (Some messages can indicate that a response has finished, - // even if the message wasn't consumed as a response or a request.) - // - // Replace the state with a temporary value, - // so we can take ownership of the response sender. - self.state = match std::mem::replace(&mut self.state, State::Failed) { - State::AwaitingResponse { - handler: Handler::Finished(response), - tx, - .. - } => { - if let Ok(response) = response.as_ref() { - debug!(%response, "finished receiving peer response to Zebra request"); - // Add a metric for inbound responses to outbound requests. - metrics::counter!( - "zebra.net.in.responses", - 1, - "command" => response.command(), - "addr" => self.metrics_label.clone(), - ); - } else { - debug!(error = ?response, "error in peer response to Zebra request"); - } - let _ = tx.send(response.map_err(Into::into)); - State::AwaitingRequest - } - pending @ State::AwaitingResponse { .. } => - pending - , - _ => unreachable!( - "unexpected failed connection state while AwaitingResponse: client_receiver: {:?}", - self.client_rx - ), - }; - - self.update_state_metrics(None); - // If the message was not consumed as a response, // check whether it can be handled as a request. let unused_msg = if let Some(request_msg) = request_msg { @@ -695,6 +712,7 @@ where } } } + // This connection has failed: stop the event loop, and complete the future. State::Failed => break, } @@ -723,7 +741,7 @@ where self.shutdown(error); } - /// Handle an incoming client request, possibly generating outgoing messages to the + /// Handle an internal client request, possibly generating outgoing messages to the /// remote peer. /// /// NOTE: the caller should use .instrument(msg.span) to instrument the function. @@ -772,6 +790,25 @@ where pending, self.client_rx ), + + // Consume the cached addresses from the peer, + // to work-around a `zcashd` response rate-limit + (AwaitingRequest, Peers) if !self.cached_addrs.is_empty() => { + let cached_addrs = std::mem::take(&mut self.cached_addrs); + debug!( + addrs = cached_addrs.len(), + "responding to Peers request using cached addresses", + ); + + Ok(( + AwaitingResponse { + handler: Handler::Finished(Ok(Response::Peers(cached_addrs))), + tx, + span, + }, + None, + ))} + , (AwaitingRequest, Peers) => match self.peer_tx.send(Message::GetAddr).await { Ok(()) => Ok(( AwaitingResponse { @@ -783,6 +820,7 @@ where )), Err(e) => Err((e, tx)), }, + (AwaitingRequest, Ping(nonce)) => match self.peer_tx.send(Message::Ping(nonce)).await { Ok(()) => Ok(( AwaitingResponse { @@ -1012,8 +1050,23 @@ where } // Zebra crawls the network proactively, to prevent // peers from inserting data into our address book. - Message::Addr(_) => { - debug!(%msg, "ignoring unsolicited addr message"); + Message::Addr(ref addrs) => { + // Workaround `zcashd`'s `getaddr` response rate-limit + if addrs.len() > 1 { + // Always refresh the cache with multi-addr messages. + debug!(%msg, "caching unsolicited multi-addr message"); + self.cached_addrs = addrs.clone(); + } else if addrs.len() == 1 && self.cached_addrs.len() <= 1 { + // Only refresh a cached single addr message with another single addr. + // (`zcashd` regularly advertises its own address.) + debug!(%msg, "caching unsolicited single addr message"); + self.cached_addrs = addrs.clone(); + } else { + debug!( + %msg, + "ignoring unsolicited single addr message: already cached a multi-addr message" + ); + } None } Message::Tx(ref transaction) => Some(Request::PushTransaction(transaction.clone())), diff --git a/zebra-network/src/peer/connection/tests/vectors.rs b/zebra-network/src/peer/connection/tests/vectors.rs index b98b3e45c43..1964e91200a 100644 --- a/zebra-network/src/peer/connection/tests/vectors.rs +++ b/zebra-network/src/peer/connection/tests/vectors.rs @@ -43,6 +43,7 @@ async fn connection_run_loop_ok() { let connection = Connection { state: State::AwaitingRequest, request_timer: None, + cached_addrs: Vec::new(), svc: unused_inbound_service, client_rx: ClientRequestReceiver::from(client_rx), error_slot: shared_error_slot.clone(), @@ -103,6 +104,7 @@ async fn connection_run_loop_future_drop() { let connection = Connection { state: State::AwaitingRequest, request_timer: None, + cached_addrs: Vec::new(), svc: unused_inbound_service, client_rx: ClientRequestReceiver::from(client_rx), error_slot: shared_error_slot.clone(), @@ -152,6 +154,7 @@ async fn connection_run_loop_client_close() { let connection = Connection { state: State::AwaitingRequest, request_timer: None, + cached_addrs: Vec::new(), svc: unused_inbound_service, client_rx: ClientRequestReceiver::from(client_rx), error_slot: shared_error_slot.clone(), @@ -208,6 +211,7 @@ async fn connection_run_loop_client_drop() { let connection = Connection { state: State::AwaitingRequest, request_timer: None, + cached_addrs: Vec::new(), svc: unused_inbound_service, client_rx: ClientRequestReceiver::from(client_rx), error_slot: shared_error_slot.clone(), @@ -263,6 +267,7 @@ async fn connection_run_loop_inbound_close() { let connection = Connection { state: State::AwaitingRequest, request_timer: None, + cached_addrs: Vec::new(), svc: unused_inbound_service, client_rx: ClientRequestReceiver::from(client_rx), error_slot: shared_error_slot.clone(), @@ -319,6 +324,7 @@ async fn connection_run_loop_inbound_drop() { let connection = Connection { state: State::AwaitingRequest, request_timer: None, + cached_addrs: Vec::new(), svc: unused_inbound_service, client_rx: ClientRequestReceiver::from(client_rx), error_slot: shared_error_slot.clone(), @@ -379,6 +385,7 @@ async fn connection_run_loop_failed() { let connection = Connection { state: State::Failed, request_timer: None, + cached_addrs: Vec::new(), svc: unused_inbound_service, client_rx: ClientRequestReceiver::from(client_rx), error_slot: shared_error_slot.clone(), diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 512e1d8d5a2..0f8e9d20dfa 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -915,6 +915,7 @@ where let server = Connection { state: connection::State::AwaitingRequest, request_timer: None, + cached_addrs: Vec::new(), svc: inbound_service, client_rx: server_rx.into(), error_slot: error_slot.clone(),