Skip to content

Commit

Permalink
Log response error codes as warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
acerone85 committed Oct 21, 2024
1 parent a18a2c6 commit 7b2bf3b
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 25 deletions.
8 changes: 4 additions & 4 deletions crates/services/p2p/src/p2p_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ impl FuelP2PService {
ResponseSender::SealedHeaders(c) => match response {
ResponseMessage::SealedHeaders(v) => {
// TODO[AC]: Change type of ResponseSender and remove the .ok() here
c.send((peer, Ok(v.ok()))).is_ok()
c.send((peer, Ok(v))).is_ok()
}
_ => {
warn!(
Expand All @@ -688,7 +688,7 @@ impl FuelP2PService {
},
ResponseSender::Transactions(c) => match response {
ResponseMessage::Transactions(v) => {
c.send((peer, Ok(v.ok()))).is_ok()
c.send((peer, Ok(v))).is_ok()
}
_ => {
warn!(
Expand All @@ -700,7 +700,7 @@ impl FuelP2PService {
},
ResponseSender::TxPoolAllTransactionsIds(c) => match response {
ResponseMessage::TxPoolAllTransactionsIds(v) => {
c.send((peer, Ok(v.ok()))).is_ok()
c.send((peer, Ok(v))).is_ok()
}
_ => {
warn!(
Expand All @@ -712,7 +712,7 @@ impl FuelP2PService {
},
ResponseSender::TxPoolFullTransactions(c) => match response {
ResponseMessage::TxPoolFullTransactions(v) => {
c.send((peer, Ok(v.ok()))).is_ok()
c.send((peer, Ok(v))).is_ok()
}
_ => {
warn!(
Expand Down
12 changes: 8 additions & 4 deletions crates/services/p2p/src/request_response/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,14 @@ pub type OnResponse<T> = oneshot::Sender<(PeerId, Result<T, ResponseError>)>;

#[derive(Debug)]
pub enum ResponseSender {
SealedHeaders(OnResponse<Option<Vec<SealedBlockHeader>>>),
Transactions(OnResponse<Option<Vec<Transactions>>>),
TxPoolAllTransactionsIds(OnResponse<Option<Vec<TxId>>>),
TxPoolFullTransactions(OnResponse<Option<Vec<Option<NetworkableTransactionPool>>>>),
SealedHeaders(OnResponse<Result<Vec<SealedBlockHeader>, ResponseMessageErrorCode>>),
Transactions(OnResponse<Result<Vec<Transactions>, ResponseMessageErrorCode>>),
TxPoolAllTransactionsIds(OnResponse<Result<Vec<TxId>, ResponseMessageErrorCode>>),
TxPoolFullTransactions(
OnResponse<
Result<Vec<Option<NetworkableTransactionPool>>, ResponseMessageErrorCode>,
>,
),
}

#[derive(Debug, Error)]
Expand Down
55 changes: 38 additions & 17 deletions crates/services/p2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,21 +113,23 @@ pub enum TaskRequest {
},
GetSealedHeaders {
block_height_range: Range<u32>,
channel: OnResponse<Option<Vec<SealedBlockHeader>>>,
channel: OnResponse<Result<Vec<SealedBlockHeader>, ResponseMessageErrorCode>>,
},
GetTransactions {
block_height_range: Range<u32>,
from_peer: PeerId,
channel: OnResponse<Option<Vec<Transactions>>>,
channel: OnResponse<Result<Vec<Transactions>, ResponseMessageErrorCode>>,
},
TxPoolGetAllTxIds {
from_peer: PeerId,
channel: OnResponse<Option<Vec<TxId>>>,
channel: OnResponse<Result<Vec<TxId>, ResponseMessageErrorCode>>,
},
TxPoolGetFullTransactions {
tx_ids: Vec<TxId>,
from_peer: PeerId,
channel: OnResponse<Option<Vec<Option<NetworkableTransactionPool>>>>,
channel: OnResponse<
Result<Vec<Option<NetworkableTransactionPool>>, ResponseMessageErrorCode>,
>,
},
// Responds back to the p2p network
RespondWithGossipsubMessageReport((GossipsubMessageInfo, GossipsubMessageAcceptance)),
Expand Down Expand Up @@ -1029,8 +1031,17 @@ impl SharedState {

let (peer_id, response) = receiver.await.map_err(|e| anyhow!("{e}"))?;

let data = response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?;
Ok((peer_id.to_bytes(), data))
let data = match response {
Err(request_response_protocol_error) => Err(anyhow!(
"Invalid response from peer {request_response_protocol_error:?}"
)),
Ok(Err(response_error_code)) => {
warn!("Peer {peer_id:?} failed to respond with sealed headers: {response_error_code:?}");
Ok(None)
}
Ok(Ok(headers)) => Ok(Some(headers)),
};
data.map(|data| (peer_id.to_bytes(), data))
}

pub async fn get_transactions_from_peer(
Expand All @@ -1056,7 +1067,18 @@ impl SharedState {
"Bug: response from non-requested peer"
);

response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))
match response {
Err(request_response_protocol_error) => Err(anyhow!(
"Invalid response from peer {request_response_protocol_error:?}"
)),
Ok(Err(response_error_code)) => {
warn!(
"Peer {peer_id:?} failed to respond with transactions: {response_error_code:?}"
);
return Ok(None);
}
Ok(Ok(txs)) => Ok(Some(txs)),
}
}

pub async fn get_all_transactions_ids_from_peer(
Expand All @@ -1080,11 +1102,11 @@ impl SharedState {
"Bug: response from non-requested peer"
);

let Some(txs) =
response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?
else {
return Ok(vec![]);
};
let response =
response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?;

let txs = response.inspect_err(|e| { warn!("Peer {peer_id:?} could not response to request to get all transactions ids: {e:?}"); } ).unwrap_or_default();

if txs.len() > self.max_txs_per_request {
return Err(anyhow!("Too many transactions requested: {}", txs.len()));
}
Expand Down Expand Up @@ -1113,11 +1135,10 @@ impl SharedState {
"Bug: response from non-requested peer"
);

let Some(txs) =
response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?
else {
return Ok(vec![]);
};
let response =
response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?;
let txs = response.inspect_err(|e| { warn!("Peer {peer_id:?} could not response to request to get full transactions: {e:?}"); } ).unwrap_or_default();

if txs.len() > self.max_txs_per_request {
return Err(anyhow!("Too many transactions requested: {}", txs.len()));
}
Expand Down

0 comments on commit 7b2bf3b

Please sign in to comment.