Simplify connection internal request handling code
teor2345 committed Dec 30, 2021
1 parent c28fd01 commit 11a4017
Showing 1 changed file with 73 additions and 141 deletions.
214 changes: 73 additions & 141 deletions zebra-network/src/peer/connection.rs
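The diff below replaces per-arm state plumbing with a single `Result<Handler, _>` value: each request arm now just sends its network message and maps a successful send onto the matching response handler, and one `match new_handler` at the end installs the `AwaitingResponse` state (or sends the error to the client and fails the connection) for every arm. A minimal sketch of the before/after pattern, using stand-in `Handler`, `SendError`, and `send` definitions rather than the real zebra-network types:

#[derive(Debug)]
enum Handler {
    Ping(u64),
}

#[derive(Debug)]
struct SendError;

// Stand-in for `self.peer_tx.send(message).await` in the real code.
fn send(_message: &str) -> Result<(), SendError> {
    Ok(())
}

// Before: every arm matched on the send result and rebuilt the whole
// connection state, returning a (new_state, Option<Sender>) tuple or an
// (error, sender) pair.
fn old_style(nonce: u64) -> Result<Handler, SendError> {
    match send("ping") {
        Ok(()) => Ok(Handler::Ping(nonce)),
        Err(e) => Err(e),
    }
}

// After: the arm only chooses the handler; `map` forwards any send error,
// and the caller updates the connection state once for all arms.
fn new_style(nonce: u64) -> Result<Handler, SendError> {
    send("ping").map(|()| Handler::Ping(nonce))
}

fn main() {
    // Both forms produce the same result; the new form drops the
    // per-arm state bookkeeping.
    println!("{:?}", old_style(1));
    println!("{:?}", new_style(1));
}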
@@ -777,8 +777,7 @@ where
        );
        self.update_state_metrics(format!("Out::Req::{}", request.command()));

-        // These matches return a Result with (new_state, Option<Sender>) or an (error, Sender)
-        let new_state_result = match (&self.state, request) {
+        let new_handler = match (&self.state, request) {
            (Failed, request) => panic!(
                "failed connection cannot handle new request: {:?}, client_receiver: {:?}",
                request,
@@ -792,192 +791,125 @@ where
            ),

            // Consume the cached addresses from the peer,
-            // to work-around a `zcashd` response rate-limit
+            // to work-around a `zcashd` response rate-limit.
            (AwaitingRequest, Peers) if !self.cached_addrs.is_empty() => {
                let cached_addrs = std::mem::take(&mut self.cached_addrs);
                debug!(
                    addrs = cached_addrs.len(),
                    "responding to Peers request using cached addresses",
                );

-                Ok((
-                    AwaitingResponse {
-                        handler: Handler::Finished(Ok(Response::Peers(cached_addrs))),
-                        tx,
-                        span,
-                    },
-                    None,
-                ))}
-            ,
-            (AwaitingRequest, Peers) => match self.peer_tx.send(Message::GetAddr).await {
-                Ok(()) => Ok((
-                    AwaitingResponse {
-                        handler: Handler::Peers,
-                        tx,
-                        span,
-                    },
-                    None,
-                )),
-                Err(e) => Err((e, tx)),
-            },
+                Ok(Handler::Finished(Ok(Response::Peers(cached_addrs))))
+            }
+            (AwaitingRequest, Peers) => self
+                .peer_tx
+                .send(Message::GetAddr)
+                .await
+                .map(|()| Handler::Peers),
+
+            (AwaitingRequest, Ping(nonce)) => self
+                .peer_tx
+                .send(Message::Ping(nonce))
+                .await
+                .map(|()| Handler::Ping(nonce)),

-            (AwaitingRequest, Ping(nonce)) => match self.peer_tx.send(Message::Ping(nonce)).await {
-                Ok(()) => Ok((
-                    AwaitingResponse {
-                        handler: Handler::Ping(nonce),
-                        tx,
-                        span,
-                    },
-                    None,
-                )),
-                Err(e) => Err((e, tx)),
-            },
            (AwaitingRequest, BlocksByHash(hashes)) => {
-                match self
+                self
                    .peer_tx
                    .send(Message::GetData(
                        hashes.iter().map(|h| (*h).into()).collect(),
                    ))
                    .await
-                {
-                    Ok(()) => Ok((
-                        AwaitingResponse {
-                            handler: Handler::BlocksByHash {
-                                blocks: Vec::with_capacity(hashes.len()),
-                                pending_hashes: hashes,
-                            },
-                            tx,
-                            span,
-                        },
-                        None,
-                    )),
-                    Err(e) => Err((e, tx)),
-                }
+                    .map(|()|
+                        Handler::BlocksByHash {
+                            blocks: Vec::with_capacity(hashes.len()),
+                            pending_hashes: hashes,
+                        }
+                    )
            }
            (AwaitingRequest, TransactionsById(ids)) => {
-                match self
+                self
                    .peer_tx
                    .send(Message::GetData(
                        ids.iter().map(Into::into).collect(),
                    ))
                    .await
-                {
-                    Ok(()) => Ok((
-                        AwaitingResponse {
-                            handler: Handler::TransactionsById {
-                                transactions: Vec::with_capacity(ids.len()),
-                                pending_ids: ids,
-                            },
-                            tx,
-                            span,
-                        },
-                        None,
-                    )),
-                    Err(e) => Err((e, tx)),
-                }
+                    .map(|()|
+                        Handler::TransactionsById {
+                            transactions: Vec::with_capacity(ids.len()),
+                            pending_ids: ids,
+                        })
            }

            (AwaitingRequest, FindBlocks { known_blocks, stop }) => {
-                match self
+                self
                    .peer_tx
                    .send(Message::GetBlocks { known_blocks, stop })
                    .await
-                {
-                    Ok(()) => Ok((
-                        AwaitingResponse {
-                            handler: Handler::FindBlocks,
-                            tx,
-                            span,
-                        },
-                        None,
-                    )),
-                    Err(e) => Err((e, tx)),
-                }
+                    .map(|()|
+                        Handler::FindBlocks
+                    )
            }
            (AwaitingRequest, FindHeaders { known_blocks, stop }) => {
-                match self
+                self
                    .peer_tx
                    .send(Message::GetHeaders { known_blocks, stop })
                    .await
-                {
-                    Ok(()) => Ok((
-                        AwaitingResponse {
-                            handler: Handler::FindHeaders,
-                            tx,
-                            span,
-                        },
-                        None,
-                    )),
-                    Err(e) => Err((e, tx)),
-                }
+                    .map(|()|
+                        Handler::FindHeaders
+                    )
            }

            (AwaitingRequest, MempoolTransactionIds) => {
-                match self.peer_tx.send(Message::Mempool).await {
-                    Ok(()) => Ok((
-                        AwaitingResponse {
-                            handler: Handler::MempoolTransactionIds,
-                            tx,
-                            span,
-                        },
-                        None,
-                    )),
-                    Err(e) => Err((e, tx)),
-                }
+                self
+                    .peer_tx
+                    .send(Message::Mempool)
+                    .await
+                    .map(|()|
+                        Handler::MempoolTransactionIds
+                    )
            }

            (AwaitingRequest, PushTransaction(transaction)) => {
-                match self.peer_tx.send(Message::Tx(transaction)).await {
-                    Ok(()) => Ok((AwaitingRequest, Some(tx))),
-                    Err(e) => Err((e, tx)),
-                }
+                self
+                    .peer_tx
+                    .send(Message::Tx(transaction))
+                    .await
+                    .map(|()|
+                        Handler::Finished(Ok(Response::Nil))
+                    )
            }
            (AwaitingRequest, AdvertiseTransactionIds(hashes)) => {
-                match self
+                self
                    .peer_tx
                    .send(Message::Inv(hashes.iter().map(|h| (*h).into()).collect()))
                    .await
-                {
-                    Ok(()) => Ok((AwaitingRequest, Some(tx))),
-                    Err(e) => Err((e, tx)),
-                }
+                    .map(|()|
+                        Handler::Finished(Ok(Response::Nil))
+                    )
            }
            (AwaitingRequest, AdvertiseBlock(hash)) => {
-                match self.peer_tx.send(Message::Inv(vec![hash.into()])).await {
-                    Ok(()) => Ok((AwaitingRequest, Some(tx))),
-                    Err(e) => Err((e, tx)),
-                }
+                self
+                    .peer_tx
+                    .send(Message::Inv(vec![hash.into()]))
+                    .await
+                    .map(|()|
+                        Handler::Finished(Ok(Response::Nil))
+                    )
            }
        };
-        // Updates state or fails. Sends the error on the Sender if it is Some.
-        match new_state_result {
-            Ok((AwaitingRequest, Some(tx))) => {
-                // Since we're not waiting for further messages, we need to
-                // send a response before dropping tx.
-                let _ = tx.send(Ok(Response::Nil));
-                self.state = AwaitingRequest;
-                // We only need a timer when we're waiting for a response.
-                // (And we don't want to accidentally re-use old timers.)
-                self.request_timer = None;
-            }
-            Ok((new_state @ AwaitingResponse { .. }, None)) => {
-                self.state = new_state;
+
+        // Update the connection state with a new handler, or fail with an error.
+        match new_handler {
+            Ok(handler) => {
+                self.state = AwaitingResponse { handler, span, tx };
                self.request_timer = Some(Box::pin(sleep(constants::REQUEST_TIMEOUT)));
            }
-            Err((e, tx)) => {
-                let e = SharedPeerError::from(e);
-                let _ = tx.send(Err(e.clone()));
-                self.fail_with(e);
+            Err(error) => {
+                let error = SharedPeerError::from(error);
+                let _ = tx.send(Err(error.clone()));
+                self.fail_with(error);
            }
-            // unreachable states
-            Ok((Failed, tx)) => unreachable!(
-                "failed client requests must use fail_with(error) to reach a Failed state. tx: {:?}",
-                tx
-            ),
-            Ok((AwaitingRequest, None)) => unreachable!(
-                "successful AwaitingRequest states must send a response on tx, but tx is None",
-            ),
-            Ok((new_state @ AwaitingResponse { .. }, Some(tx))) => unreachable!(
-                "successful AwaitingResponse states must keep tx, but tx is Some: {:?} for: {:?}",
-                tx, new_state,
-            ),
        };
    }
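A side effect visible in the diff: fire-and-forget requests such as `PushTransaction`, `AdvertiseTransactionIds`, and `AdvertiseBlock` no longer answer the client inline and return to `AwaitingRequest`; they now produce `Handler::Finished(Ok(Response::Nil))`, the same pre-finished handler the cached-addresses arm already used, so every successful request flows through `AwaitingResponse`. A rough sketch of how a pre-finished handler can answer immediately, again with stand-in types rather than zebra-network's actual `Handler` API:

enum Handler {
    // Still waiting for the peer's reply, e.g. a Pong with this nonce.
    Ping(u64),
    // Already holds the client's answer; no peer message is needed.
    Finished(Result<&'static str, &'static str>),
}

impl Handler {
    // A pre-finished handler yields its response without waiting.
    fn ready_response(&self) -> Option<Result<&'static str, &'static str>> {
        match self {
            Handler::Finished(result) => Some(*result),
            Handler::Ping(_) => None,
        }
    }
}

fn main() {
    // Fire-and-forget requests map a successful send to a Nil response.
    let push_tx = Handler::Finished(Ok("Nil"));
    assert_eq!(push_tx.ready_response(), Some(Ok("Nil")));

    // A Ping handler still has to wait for the matching Pong.
    assert_eq!(Handler::Ping(7).ready_response(), None);
}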
