From 11a4017e89ea930a44fbe4104c2a92afffb092f6 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 24 Dec 2021 09:00:32 +1000 Subject: [PATCH] Simplify connection internal request handling code --- zebra-network/src/peer/connection.rs | 214 +++++++++------------------ 1 file changed, 73 insertions(+), 141 deletions(-) diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index 3c14a59bd02..8e5cd14baaf 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -777,8 +777,7 @@ where ); self.update_state_metrics(format!("Out::Req::{}", request.command())); - // These matches return a Result with (new_state, Option) or an (error, Sender) - let new_state_result = match (&self.state, request) { + let new_handler = match (&self.state, request) { (Failed, request) => panic!( "failed connection cannot handle new request: {:?}, client_receiver: {:?}", request, @@ -792,7 +791,7 @@ where ), // Consume the cached addresses from the peer, - // to work-around a `zcashd` response rate-limit + // to work-around a `zcashd` response rate-limit. (AwaitingRequest, Peers) if !self.cached_addrs.is_empty() => { let cached_addrs = std::mem::take(&mut self.cached_addrs); debug!( @@ -800,184 +799,117 @@ where "responding to Peers request using cached addresses", ); - Ok(( - AwaitingResponse { - handler: Handler::Finished(Ok(Response::Peers(cached_addrs))), - tx, - span, - }, - None, - ))} - , - (AwaitingRequest, Peers) => match self.peer_tx.send(Message::GetAddr).await { - Ok(()) => Ok(( - AwaitingResponse { - handler: Handler::Peers, - tx, - span, - }, - None, - )), - Err(e) => Err((e, tx)), - }, + Ok(Handler::Finished(Ok(Response::Peers(cached_addrs)))) + } + (AwaitingRequest, Peers) => self + .peer_tx + .send(Message::GetAddr) + .await + .map(|()| Handler::Peers), + + (AwaitingRequest, Ping(nonce)) => self + .peer_tx + .send(Message::Ping(nonce)) + .await + .map(|()| Handler::Ping(nonce)), - (AwaitingRequest, Ping(nonce)) => match self.peer_tx.send(Message::Ping(nonce)).await { - Ok(()) => Ok(( - AwaitingResponse { - handler: Handler::Ping(nonce), - tx, - span, - }, - None, - )), - Err(e) => Err((e, tx)), - }, (AwaitingRequest, BlocksByHash(hashes)) => { - match self + self .peer_tx .send(Message::GetData( hashes.iter().map(|h| (*h).into()).collect(), )) .await - { - Ok(()) => Ok(( - AwaitingResponse { - handler: Handler::BlocksByHash { - blocks: Vec::with_capacity(hashes.len()), - pending_hashes: hashes, - }, - tx, - span, - }, - None, - )), - Err(e) => Err((e, tx)), - } + .map(|()| + Handler::BlocksByHash { + blocks: Vec::with_capacity(hashes.len()), + pending_hashes: hashes, + } + ) } (AwaitingRequest, TransactionsById(ids)) => { - match self + self .peer_tx .send(Message::GetData( ids.iter().map(Into::into).collect(), )) .await - { - Ok(()) => Ok(( - AwaitingResponse { - handler: Handler::TransactionsById { - transactions: Vec::with_capacity(ids.len()), - pending_ids: ids, - }, - tx, - span, - }, - None, - )), - Err(e) => Err((e, tx)), - } + .map(|()| + Handler::TransactionsById { + transactions: Vec::with_capacity(ids.len()), + pending_ids: ids, + }) } + (AwaitingRequest, FindBlocks { known_blocks, stop }) => { - match self + self .peer_tx .send(Message::GetBlocks { known_blocks, stop }) .await - { - Ok(()) => Ok(( - AwaitingResponse { - handler: Handler::FindBlocks, - tx, - span, - }, - None, - )), - Err(e) => Err((e, tx)), - } + .map(|()| + Handler::FindBlocks + ) } (AwaitingRequest, FindHeaders { known_blocks, stop }) => { - match self + self .peer_tx .send(Message::GetHeaders { known_blocks, stop }) .await - { - Ok(()) => Ok(( - AwaitingResponse { - handler: Handler::FindHeaders, - tx, - span, - }, - None, - )), - Err(e) => Err((e, tx)), - } + .map(|()| + Handler::FindHeaders + ) } + (AwaitingRequest, MempoolTransactionIds) => { - match self.peer_tx.send(Message::Mempool).await { - Ok(()) => Ok(( - AwaitingResponse { - handler: Handler::MempoolTransactionIds, - tx, - span, - }, - None, - )), - Err(e) => Err((e, tx)), - } + self + .peer_tx + .send(Message::Mempool) + .await + .map(|()| + Handler::MempoolTransactionIds + ) } + (AwaitingRequest, PushTransaction(transaction)) => { - match self.peer_tx.send(Message::Tx(transaction)).await { - Ok(()) => Ok((AwaitingRequest, Some(tx))), - Err(e) => Err((e, tx)), - } + self + .peer_tx + .send(Message::Tx(transaction)) + .await + .map(|()| + Handler::Finished(Ok(Response::Nil)) + ) } (AwaitingRequest, AdvertiseTransactionIds(hashes)) => { - match self + self .peer_tx .send(Message::Inv(hashes.iter().map(|h| (*h).into()).collect())) .await - { - Ok(()) => Ok((AwaitingRequest, Some(tx))), - Err(e) => Err((e, tx)), - } + .map(|()| + Handler::Finished(Ok(Response::Nil)) + ) } (AwaitingRequest, AdvertiseBlock(hash)) => { - match self.peer_tx.send(Message::Inv(vec![hash.into()])).await { - Ok(()) => Ok((AwaitingRequest, Some(tx))), - Err(e) => Err((e, tx)), - } + self + .peer_tx + .send(Message::Inv(vec![hash.into()])) + .await + .map(|()| + Handler::Finished(Ok(Response::Nil)) + ) } }; - // Updates state or fails. Sends the error on the Sender if it is Some. - match new_state_result { - Ok((AwaitingRequest, Some(tx))) => { - // Since we're not waiting for further messages, we need to - // send a response before dropping tx. - let _ = tx.send(Ok(Response::Nil)); - self.state = AwaitingRequest; - // We only need a timer when we're waiting for a response. - // (And we don't want to accidentally re-use old timers.) - self.request_timer = None; - } - Ok((new_state @ AwaitingResponse { .. }, None)) => { - self.state = new_state; + + // Update the connection state with a new handler, or fail with an error. + match new_handler { + Ok(handler) => { + self.state = AwaitingResponse { handler, span, tx }; self.request_timer = Some(Box::pin(sleep(constants::REQUEST_TIMEOUT))); } - Err((e, tx)) => { - let e = SharedPeerError::from(e); - let _ = tx.send(Err(e.clone())); - self.fail_with(e); + Err(error) => { + let error = SharedPeerError::from(error); + let _ = tx.send(Err(error.clone())); + self.fail_with(error); } - // unreachable states - Ok((Failed, tx)) => unreachable!( - "failed client requests must use fail_with(error) to reach a Failed state. tx: {:?}", - tx - ), - Ok((AwaitingRequest, None)) => unreachable!( - "successful AwaitingRequest states must send a response on tx, but tx is None", - ), - Ok((new_state @ AwaitingResponse { .. }, Some(tx))) => unreachable!( - "successful AwaitingResponse states must keep tx, but tx is Some: {:?} for: {:?}", - tx, new_state, - ), }; }