Cleanup internal network request handler, fix unused request logging #3295

Merged Jan 6, 2022 (4 commits)

Changes from 2 commits
238 changes: 92 additions & 146 deletions zebra-network/src/peer/connection.rs
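
The main hunk below rewrites the outbound request handler: instead of each match arm building a full (new_state, Option<Sender>) pair inside a Result, each arm now returns only the Handler for the response it expects, and a failed send is propagated with .map. A simplified, self-contained sketch of the new per-arm shape follows; Handler, PeerError, and send are stand-ins, not Zebra's real types or peer_tx API.

// Stand-ins for Zebra's real types; this is a sketch of the pattern only.
#[derive(Debug)]
enum Handler {
    Peers,
    Ping(u64),
}

#[derive(Debug)]
struct PeerError;

// Stand-in for `self.peer_tx.send(...)`, which can fail.
fn send(_msg: &str) -> Result<(), PeerError> {
    Ok(())
}

// Each arm maps a successful send to the handler that will process the
// peer's reply; the surrounding code takes care of state and the sender.
fn handler_for(request: &str) -> Result<Handler, PeerError> {
    match request {
        "peers" => send("GetAddr").map(|()| Handler::Peers),
        "ping" => send("Ping").map(|()| Handler::Ping(42)),
        _ => Err(PeerError),
    }
}

fn main() {
    println!("{:?}", handler_for("peers"));
}
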
@@ -777,8 +777,7 @@ where
);
self.update_state_metrics(format!("Out::Req::{}", request.command()));

// These matches return a Result with (new_state, Option<Sender>) or an (error, Sender)
let new_state_result = match (&self.state, request) {
let new_handler = match (&self.state, request) {
(Failed, request) => panic!(
"failed connection cannot handle new request: {:?}, client_receiver: {:?}",
request,
@@ -792,192 +791,125 @@ where
),

// Consume the cached addresses from the peer,
// to work-around a `zcashd` response rate-limit
// to work-around a `zcashd` response rate-limit.
(AwaitingRequest, Peers) if !self.cached_addrs.is_empty() => {
let cached_addrs = std::mem::take(&mut self.cached_addrs);
debug!(
addrs = cached_addrs.len(),
"responding to Peers request using cached addresses",
);

Ok((
AwaitingResponse {
handler: Handler::Finished(Ok(Response::Peers(cached_addrs))),
tx,
span,
},
None,
))}
,
(AwaitingRequest, Peers) => match self.peer_tx.send(Message::GetAddr).await {
Ok(()) => Ok((
AwaitingResponse {
handler: Handler::Peers,
tx,
span,
},
None,
)),
Err(e) => Err((e, tx)),
},
Ok(Handler::Finished(Ok(Response::Peers(cached_addrs))))
}
(AwaitingRequest, Peers) => self
.peer_tx
.send(Message::GetAddr)
.await
.map(|()| Handler::Peers),

(AwaitingRequest, Ping(nonce)) => self
.peer_tx
.send(Message::Ping(nonce))
.await
.map(|()| Handler::Ping(nonce)),

(AwaitingRequest, Ping(nonce)) => match self.peer_tx.send(Message::Ping(nonce)).await {
Ok(()) => Ok((
AwaitingResponse {
handler: Handler::Ping(nonce),
tx,
span,
},
None,
)),
Err(e) => Err((e, tx)),
},
(AwaitingRequest, BlocksByHash(hashes)) => {
match self
self
.peer_tx
.send(Message::GetData(
hashes.iter().map(|h| (*h).into()).collect(),
))
.await
{
Ok(()) => Ok((
AwaitingResponse {
handler: Handler::BlocksByHash {
blocks: Vec::with_capacity(hashes.len()),
pending_hashes: hashes,
},
tx,
span,
},
None,
)),
Err(e) => Err((e, tx)),
}
.map(|()|
Handler::BlocksByHash {
blocks: Vec::with_capacity(hashes.len()),
pending_hashes: hashes,
}
)
}
(AwaitingRequest, TransactionsById(ids)) => {
match self
self
.peer_tx
.send(Message::GetData(
ids.iter().map(Into::into).collect(),
))
.await
{
Ok(()) => Ok((
AwaitingResponse {
handler: Handler::TransactionsById {
transactions: Vec::with_capacity(ids.len()),
pending_ids: ids,
},
tx,
span,
},
None,
)),
Err(e) => Err((e, tx)),
}
.map(|()|
Handler::TransactionsById {
transactions: Vec::with_capacity(ids.len()),
pending_ids: ids,
})
}

(AwaitingRequest, FindBlocks { known_blocks, stop }) => {
match self
self
.peer_tx
.send(Message::GetBlocks { known_blocks, stop })
.await
{
Ok(()) => Ok((
AwaitingResponse {
handler: Handler::FindBlocks,
tx,
span,
},
None,
)),
Err(e) => Err((e, tx)),
}
.map(|()|
Handler::FindBlocks
)
}
(AwaitingRequest, FindHeaders { known_blocks, stop }) => {
match self
self
.peer_tx
.send(Message::GetHeaders { known_blocks, stop })
.await
{
Ok(()) => Ok((
AwaitingResponse {
handler: Handler::FindHeaders,
tx,
span,
},
None,
)),
Err(e) => Err((e, tx)),
}
.map(|()|
Handler::FindHeaders
)
}

(AwaitingRequest, MempoolTransactionIds) => {
match self.peer_tx.send(Message::Mempool).await {
Ok(()) => Ok((
AwaitingResponse {
handler: Handler::MempoolTransactionIds,
tx,
span,
},
None,
)),
Err(e) => Err((e, tx)),
}
self
.peer_tx
.send(Message::Mempool)
.await
.map(|()|
Handler::MempoolTransactionIds
)
}

(AwaitingRequest, PushTransaction(transaction)) => {
match self.peer_tx.send(Message::Tx(transaction)).await {
Ok(()) => Ok((AwaitingRequest, Some(tx))),
Err(e) => Err((e, tx)),
}
self
.peer_tx
.send(Message::Tx(transaction))
.await
.map(|()|
Handler::Finished(Ok(Response::Nil))
)
}
(AwaitingRequest, AdvertiseTransactionIds(hashes)) => {
match self
self
.peer_tx
.send(Message::Inv(hashes.iter().map(|h| (*h).into()).collect()))
.await
{
Ok(()) => Ok((AwaitingRequest, Some(tx))),
Err(e) => Err((e, tx)),
}
.map(|()|
Handler::Finished(Ok(Response::Nil))
)
}
(AwaitingRequest, AdvertiseBlock(hash)) => {
match self.peer_tx.send(Message::Inv(vec![hash.into()])).await {
Ok(()) => Ok((AwaitingRequest, Some(tx))),
Err(e) => Err((e, tx)),
}
self
.peer_tx
.send(Message::Inv(vec![hash.into()]))
.await
.map(|()|
Handler::Finished(Ok(Response::Nil))
)
}
};
// Updates state or fails. Sends the error on the Sender if it is Some.
match new_state_result {
Ok((AwaitingRequest, Some(tx))) => {
// Since we're not waiting for further messages, we need to
// send a response before dropping tx.
let _ = tx.send(Ok(Response::Nil));
self.state = AwaitingRequest;
// We only need a timer when we're waiting for a response.
// (And we don't want to accidentally re-use old timers.)
self.request_timer = None;
}
Ok((new_state @ AwaitingResponse { .. }, None)) => {
self.state = new_state;

// Update the connection state with a new handler, or fail with an error.
match new_handler {
Ok(handler) => {
self.state = AwaitingResponse { handler, span, tx };
self.request_timer = Some(Box::pin(sleep(constants::REQUEST_TIMEOUT)));
}
Err((e, tx)) => {
let e = SharedPeerError::from(e);
let _ = tx.send(Err(e.clone()));
self.fail_with(e);
Err(error) => {
let error = SharedPeerError::from(error);
let _ = tx.send(Err(error.clone()));
self.fail_with(error);
}
// unreachable states
Ok((Failed, tx)) => unreachable!(
"failed client requests must use fail_with(error) to reach a Failed state. tx: {:?}",
tx
),
Ok((AwaitingRequest, None)) => unreachable!(
"successful AwaitingRequest states must send a response on tx, but tx is None",
),
Ok((new_state @ AwaitingResponse { .. }, Some(tx))) => unreachable!(
"successful AwaitingResponse states must keep tx, but tx is Some: {:?} for: {:?}",
tx, new_state,
),
};
}
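
After the match, a single block now applies the result: on success the connection moves into AwaitingResponse with the new handler and arms the request timeout; on failure the error is sent to the waiting client and the connection is failed. A self-contained sketch of that shared tail, using stand-in types rather than Zebra's real State, SharedPeerError, or oneshot sender:

use std::sync::mpsc;
use std::sync::Arc;

// Stand-in for Zebra's shared error type.
type SharedPeerError = Arc<String>;

enum State {
    AwaitingResponse { handler: String },
    Failed,
}

struct Connection {
    state: State,
    // Stand-in for the real request timeout future.
    request_timer: Option<()>,
}

impl Connection {
    fn apply_handler(
        &mut self,
        new_handler: Result<String, String>,
        tx: mpsc::Sender<Result<String, SharedPeerError>>,
    ) {
        match new_handler {
            Ok(handler) => {
                // Success: wait for the peer's response and arm a timeout.
                self.state = State::AwaitingResponse { handler };
                self.request_timer = Some(());
            }
            Err(error) => {
                // Failure: tell the waiting client, then fail the connection.
                let error: SharedPeerError = Arc::new(error);
                let _ = tx.send(Err(error.clone()));
                self.state = State::Failed;
            }
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let mut conn = Connection { state: State::Failed, request_timer: None };
    conn.apply_handler(Err("send failed".into()), tx);
    assert!(rx.recv().unwrap().is_err());
}
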

@@ -993,6 +925,8 @@ where

self.update_state_metrics(format!("In::Msg::{}", msg.command()));

let mut unused_msg = None;

let req = match msg {
Message::Ping(nonce) => {
trace!(?nonce, "responding to heartbeat");
@@ -1016,22 +950,27 @@
// that we've already forgotten about.
Message::Reject { .. } => {
debug!(%msg, "got reject message unsolicited or from canceled request");
unused_msg = Some(msg.clone());
None
}
Message::NotFound { .. } => {
debug!(%msg, "got notfound message unsolicited or from canceled request");
unused_msg = Some(msg.clone());
None
}
Message::Pong(_) => {
debug!(%msg, "got pong message unsolicited or from canceled request");
unused_msg = Some(msg.clone());
None
}
Message::Block(_) => {
debug!(%msg, "got block message unsolicited or from canceled request");
unused_msg = Some(msg.clone());
None
}
Message::Headers(_) => {
debug!(%msg, "got headers message unsolicited or from canceled request");
unused_msg = Some(msg.clone());
None
}
// These messages should never be sent by peers.
@@ -1046,6 +985,7 @@
// Since we can't verify their source, Zebra needs to ignore unexpected messages,
// because closing the connection could cause a denial of service or eclipse attack.
debug!(%msg, "got BIP111 message without advertising NODE_BLOOM");
unused_msg = Some(msg.clone());
None
}
// Zebra crawls the network proactively, to prevent
@@ -1056,18 +996,21 @@
// Always refresh the cache with multi-addr messages.
debug!(%msg, "caching unsolicited multi-addr message");
self.cached_addrs = addrs.clone();
None
} else if addrs.len() == 1 && self.cached_addrs.len() <= 1 {
// Only refresh a cached single addr message with another single addr.
// (`zcashd` regularly advertises its own address.)
debug!(%msg, "caching unsolicited single addr message");
self.cached_addrs = addrs.clone();
None
} else {
debug!(
%msg,
"ignoring unsolicited single addr message: already cached a multi-addr message"
);
unused_msg = Some(msg.clone());
None
}
None
}
Message::Tx(ref transaction) => Some(Request::PushTransaction(transaction.clone())),
Message::Inv(ref items) => match &items[..] {
@@ -1087,14 +1030,17 @@
// Log detailed messages for ignored inv advertisement messages.
[] => {
debug!(%msg, "ignoring empty inv");
unused_msg = Some(msg.clone());
None
}
[InventoryHash::Block(_), InventoryHash::Block(_), ..] => {
debug!(%msg, "ignoring inv with multiple blocks");
unused_msg = Some(msg.clone());
None
}
_ => {
debug!(%msg, "ignoring inv with no transactions");
unused_msg = Some(msg.clone());
None
}
},
@@ -1121,10 +1067,12 @@
// Log detailed messages for ignored getdata request messages.
[] => {
debug!(%msg, "ignoring empty getdata");
unused_msg = Some(msg.clone());
None
}
_ => {
debug!(%msg, "ignoring getdata with no blocks or transactions");
unused_msg = Some(msg.clone());
None
}
},
@@ -1148,11 +1096,9 @@

if let Some(req) = req {
self.drive_peer_request(req).await;
None
} else {
// return the unused message
Some(msg)
}

unused_msg
}

/// Given a `req` originating from the peer, drive it to completion and send
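
The remaining hunks cover the "fix unused request logging" part of the title: every branch of the inbound message handler that ignores a message now records it in unused_msg, and the function returns unused_msg at the end, instead of only returning the message from the final else branch. A self-contained sketch of that pattern, with stand-in message and request types rather than Zebra's real ones:

// Stand-ins for Zebra's real message/request types; pattern sketch only.
#[derive(Clone, Debug)]
enum Message {
    Ping(u64),
    Pong(u64),
    Reject,
}

fn handle_message(msg: Message) -> Option<Message> {
    // Any message that no branch turns into an internal request is
    // recorded here, so the caller can log it instead of dropping it.
    let mut unused_msg = None;

    let req = match msg {
        Message::Ping(nonce) => Some(nonce),
        // Unsolicited or left-over messages are marked as unused.
        Message::Pong(_) | Message::Reject => {
            unused_msg = Some(msg.clone());
            None
        }
    };

    if let Some(_nonce) = req {
        // The real code drives the request to completion here.
    }

    unused_msg
}

fn main() {
    assert!(handle_message(Message::Ping(1)).is_none());
    assert!(handle_message(Message::Reject).is_some());
}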