diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ae4a1b849a..c6f6f241a85 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,9 +25,11 @@ # Version 0.32.0 [unreleased] +- Update `libp2p-request-response`. + - Update to `libp2p-mdns-0.26`. -- Update `libp2p-websocket`. +- Update `libp2p-websocket` minimum patch version. # Version 0.31.2 [2020-12-02] diff --git a/Cargo.toml b/Cargo.toml index cc08d4d88af..94d766c53f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,7 +72,7 @@ libp2p-noise = { version = "0.27.0", path = "protocols/noise", optional = true } libp2p-ping = { version = "0.25.0", path = "protocols/ping", optional = true } libp2p-plaintext = { version = "0.25.0", path = "protocols/plaintext", optional = true } libp2p-pnet = { version = "0.19.2", path = "protocols/pnet", optional = true } -libp2p-request-response = { version = "0.6.0", path = "protocols/request-response", optional = true } +libp2p-request-response = { version = "0.7.0", path = "protocols/request-response", optional = true } libp2p-swarm = { version = "0.25.0", path = "swarm" } libp2p-uds = { version = "0.25.0", path = "transports/uds", optional = true } libp2p-wasm-ext = { version = "0.25.0", path = "transports/wasm-ext", optional = true } diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index ed0d5d0e5e6..2facbd85364 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -1,3 +1,11 @@ +# 0.7.0 [unreleased] + +- Refine emitted events for inbound requests, introducing + the `ResponseSent` event and the `ResponseOmission` + inbound failures. This effectively removes previous + support for one-way protocols without responses. + [PR 1867](https://github.com/libp2p/rust-libp2p/pull/1867). + # 0.6.0 [2020-11-25] - Update `libp2p-swarm` and `libp2p-core`. diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index 89d97a6f0ec..c2c99d0f8b7 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-request-response" edition = "2018" description = "Generic Request/Response Protocols" -version = "0.6.0" +version = "0.7.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index fe374f54877..b3f11888131 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -119,22 +119,29 @@ pub enum RequestResponseHandlerEvent where TCodec: RequestResponseCodec { - /// An inbound request. + /// A request has been received. Request { request_id: RequestId, request: TCodec::Request, sender: oneshot::Sender }, - /// An inbound response. + /// A response has been received. Response { request_id: RequestId, response: TCodec::Response }, - /// An outbound upgrade (i.e. request) timed out. + /// A response to an inbound request has been sent. + ResponseSent(RequestId), + /// A response to an inbound request was omitted as a result + /// of dropping the response `sender` of an inbound `Request`. + ResponseOmission(RequestId), + /// An outbound request timed out while sending the request + /// or waiting for the response. OutboundTimeout(RequestId), /// An outbound request failed to negotiate a mutually supported protocol. OutboundUnsupportedProtocols(RequestId), - /// An inbound request timed out. + /// An inbound request timed out while waiting for the request + /// or sending the response. InboundTimeout(RequestId), /// An inbound request failed to negotiate a mutually supported protocol. InboundUnsupportedProtocols(RequestId), @@ -187,9 +194,16 @@ where fn inject_fully_negotiated_inbound( &mut self, - (): (), - _: RequestId + sent: bool, + request_id: RequestId ) { + if sent { + self.pending_events.push_back( + RequestResponseHandlerEvent::ResponseSent(request_id)) + } else { + self.pending_events.push_back( + RequestResponseHandlerEvent::ResponseOmission(request_id)) + } } fn inject_fully_negotiated_outbound( diff --git a/protocols/request-response/src/handler/protocol.rs b/protocols/request-response/src/handler/protocol.rs index 0fc2b99df9f..81e19e4b180 100644 --- a/protocols/request-response/src/handler/protocol.rs +++ b/protocols/request-response/src/handler/protocol.rs @@ -93,7 +93,7 @@ impl InboundUpgrade for ResponseProtocol where TCodec: RequestResponseCodec + Send + 'static, { - type Output = (); + type Output = bool; type Error = io::Error; type Future = BoxFuture<'static, Result>; @@ -105,10 +105,12 @@ where if let Ok(response) = self.response_receiver.await { let write = self.codec.write_response(&protocol, &mut io, response); write.await?; + } else { + return Ok(false) } } io.close().await?; - Ok(()) + Ok(true) }.boxed() } } diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 31ee6af9304..286c3f29a91 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -46,18 +46,6 @@ //! For that purpose, [`RequestResponseCodec::Protocol`] is typically //! instantiated with a sum type. //! -//! ## One-Way Protocols -//! -//! The implementation supports one-way protocols that do not -//! have responses. In these cases the [`RequestResponseCodec::Response`] can -//! be defined as `()` and [`RequestResponseCodec::read_response`] as well as -//! [`RequestResponseCodec::write_response`] given the obvious implementations. -//! Note that `RequestResponseMessage::Response` will still be emitted, -//! immediately after the request has been sent, since `RequestResponseCodec::read_response` -//! will not actually read anything from the given I/O stream. -//! [`RequestResponse::send_response`] need not be called for one-way protocols, -//! i.e. the [`ResponseChannel`] may just be dropped. -//! //! ## Limited Protocol Support //! //! It is possible to only support inbound or outbound requests for @@ -115,9 +103,11 @@ pub enum RequestResponseMessage, }, /// A response message. @@ -151,6 +141,14 @@ pub enum RequestResponseEvent error: OutboundFailure, }, /// An inbound request failed. + /// + /// > **Note**: The case whereby a connection on which a response is sent + /// > closes after [`RequestResponse::send_response`] already succeeded + /// > but before the response could be sent on the connection is reflected + /// > by there being no [`RequestResponseEvent::ResponseSent`] event. + /// > Code interested in ensuring a response has been successfully + /// > handed to the transport layer, e.g. before continuing with the next + /// > step in a multi-step protocol, should listen to these events. InboundFailure { /// The peer from whom the request was received. peer: PeerId, @@ -159,6 +157,16 @@ pub enum RequestResponseEvent /// The error that occurred. error: InboundFailure, }, + /// A response to an inbound request has been sent. + /// + /// When this event is received, the response has been flushed on + /// the underlying transport connection. + ResponseSent { + /// The peer to whom the response was sent. + peer: PeerId, + /// The ID of the inbound request whose response was sent. + request_id: RequestId, + }, } /// Possible failures occurring in the context of sending @@ -186,14 +194,17 @@ pub enum OutboundFailure { #[derive(Debug)] pub enum InboundFailure { /// The inbound request timed out, either while reading the - /// incoming request or before a response is sent, i.e. if + /// incoming request or before a response is sent, e.g. if /// [`RequestResponse::send_response`] is not called in a /// timely manner. Timeout, - /// The local peer supports none of the requested protocols. + /// The local peer supports none of the protocols requested + /// by the remote. UnsupportedProtocols, - /// The connection closed before a response was delivered. - ConnectionClosed, + /// The local peer failed to respond to an inbound request + /// due to the [`ResponseChannel`] being dropped instead of + /// being passed to [`RequestResponse::send_response`]. + ResponseOmission, } /// A channel for sending a response to an inbound request. @@ -379,17 +390,18 @@ where /// Initiates sending a response to an inbound request. /// - /// If the `ResponseChannel` is already closed due to a timeout, - /// the response is discarded and eventually [`RequestResponseEvent::InboundFailure`] - /// is emitted by `RequestResponse::poll`. + /// If the `ResponseChannel` is already closed due to a timeout or + /// the connection being closed, the response is returned as an `Err` + /// for further handling. Once the response has been successfully sent + /// on the corresponding connection, [`RequestResponseEvent::ResponseSent`] + /// is emitted. /// - /// The provided `ResponseChannel` is obtained from a + /// The provided `ResponseChannel` is obtained from an inbound /// [`RequestResponseMessage::Request`]. - pub fn send_response(&mut self, ch: ResponseChannel, rs: TCodec::Response) { - // Fails only if the inbound upgrade timed out waiting for the response, - // in which case the handler emits `RequestResponseHandlerEvent::InboundTimeout` - // which in turn results in `RequestResponseEvent::InboundFailure`. - let _ = ch.sender.send(rs); + pub fn send_response(&mut self, ch: ResponseChannel, rs: TCodec::Response) + -> Result<(), TCodec::Response> + { + ch.sender.send(rs) } /// Adds a known address for a peer that can be used for @@ -577,6 +589,20 @@ where RequestResponseEvent::Message { peer, message } )); } + RequestResponseHandlerEvent::ResponseSent(request_id) => { + self.pending_events.push_back( + NetworkBehaviourAction::GenerateEvent( + RequestResponseEvent::ResponseSent { peer, request_id })); + } + RequestResponseHandlerEvent::ResponseOmission(request_id) => { + self.pending_events.push_back( + NetworkBehaviourAction::GenerateEvent( + RequestResponseEvent::InboundFailure { + peer, + request_id, + error: InboundFailure::ResponseOmission + })); + } RequestResponseHandlerEvent::OutboundTimeout(request_id) => { if let Some((peer, _conn)) = self.pending_responses.remove(&request_id) { self.pending_events.push_back( diff --git a/protocols/request-response/src/throttled.rs b/protocols/request-response/src/throttled.rs index d66b7efb46e..8c12564cb1a 100644 --- a/protocols/request-response/src/throttled.rs +++ b/protocols/request-response/src/throttled.rs @@ -42,7 +42,7 @@ use futures::ready; use libp2p_core::{ConnectedPoint, connection::ConnectionId, Multiaddr, PeerId}; use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use lru::LruCache; -use std::{collections::{HashMap, VecDeque}, task::{Context, Poll}}; +use std::{collections::{HashMap, HashSet, VecDeque}, task::{Context, Poll}}; use std::{cmp::max, num::NonZeroU16}; use super::{ ProtocolSupport, @@ -75,21 +75,20 @@ where limit_overrides: HashMap, /// Pending events to report in `Throttled::poll`. events: VecDeque>>, - /// Current outbound credit grants in flight. - credit_messages: HashMap, /// The current credit ID. - credit_id: u64 + next_grant_id: u64 } -/// Credit information that is sent to remote peers. +/// Information about a credit grant that is sent to remote peers. #[derive(Clone, Copy, Debug)] -struct Credit { - /// A credit ID. Used to deduplicate retransmitted credit messages. - id: u64, +struct Grant { + /// The grant ID. Used to deduplicate retransmitted credit grants. + id: GrantId, /// The ID of the outbound credit grant message. request: RequestId, - /// The number of requests the remote is allowed to send. - amount: u16 + /// The credit given in this grant, i.e. the number of additional + /// requests the remote is allowed to send. + credit: u16 } /// Max. number of inbound requests that can be received. @@ -130,28 +129,81 @@ impl Limit { } } +type GrantId = u64; + +/// Information related to the current send budget with a peer. +#[derive(Clone, Debug)] +struct SendBudget { + /// The last received credit grant. + grant: Option, + /// The remaining credit for requests to send. + remaining: u16, + /// Credit grant requests received and acknowledged where the outcome + /// of the acknowledgement (i.e. response sent) is still undetermined. + /// Used to avoid emitting events for successful (`ResponseSent`) or failed + /// acknowledgements. + received: HashSet, +} + +/// Information related to the current receive budget with a peer. +#[derive(Clone, Debug)] +struct RecvBudget { + /// The grant currently given to the remote but yet to be acknowledged. + /// + /// Set to `Some` when a new grant is sent to the remote, followed + /// by `None` when an acknowledgment or a request is received. The + /// latter is seen as an implicit acknowledgement. + grant: Option, + /// The limit for new credit grants when the `remaining` credit is + /// exhausted. + limit: Limit, + /// The remaining credit for requests to receive. + remaining: u16, + /// Credit grants sent whose outcome is still undetermined. + /// Used to avoid emitting events for failed credit grants. + /// + /// > **Note**: While receiving an inbound request is an implicit + /// > acknowledgement for the last sent `grant`, the outcome of + /// > the outbound request remains undetermined until a success or + /// > failure event is received for that request or the corresponding + /// > connection closes. + sent: HashSet, +} + /// Budget information about a peer. #[derive(Clone, Debug)] struct PeerInfo { - /// Limit that applies to this peer. - limit: Limit, - /// Remaining number of outbound requests that can be sent. - send_budget: u16, - /// Remaining number of inbound requests that can be received. - recv_budget: u16, - /// The ID of the credit message that granted the current `send_budget`. - send_budget_id: Option + send_budget: SendBudget, + recv_budget: RecvBudget, } impl PeerInfo { - fn new(limit: Limit) -> Self { + fn new(recv_limit: Limit) -> Self { PeerInfo { - limit, - send_budget: 1, - recv_budget: 1, - send_budget_id: None + send_budget: SendBudget { + grant: None, + remaining: 1, + received: HashSet::new(), + }, + recv_budget: RecvBudget { + grant: None, + limit: recv_limit, + remaining: 1, + sent: HashSet::new(), + } } } + + fn into_disconnected(mut self) -> Self { + self.send_budget.received = HashSet::new(); + self.send_budget.remaining = 1; + self.recv_budget.sent = HashSet::new(); + self.recv_budget.remaining = max(1, self.recv_budget.remaining); + // Since we potentially reset the remaining receive budget, + // we forget about the potentially still unacknowledged last grant. + self.recv_budget.grant = None; + self + } } impl Throttled @@ -180,8 +232,7 @@ where default_limit: Limit::new(NonZeroU16::new(1).expect("1 > 0")), limit_overrides: HashMap::new(), events: VecDeque::new(), - credit_messages: HashMap::new(), - credit_id: 0 + next_grant_id: 0 } } @@ -195,9 +246,9 @@ where pub fn override_receive_limit(&mut self, p: &PeerId, limit: NonZeroU16) { log::debug!("{:08x}: override limit for {}: {:?}", self.id, p, limit); if let Some(info) = self.peer_info.get_mut(p) { - info.limit.set(limit) + info.recv_budget.limit.set(limit) } else if let Some(info) = self.offline_peer_info.get_mut(p) { - info.limit.set(limit) + info.recv_budget.limit.set(limit) } self.limit_overrides.insert(p.clone(), Limit::new(limit)); } @@ -210,7 +261,7 @@ where /// Has the limit of outbound requests been reached for the given peer? pub fn can_send(&mut self, p: &PeerId) -> bool { - self.peer_info.get(p).map(|i| i.send_budget > 0).unwrap_or(true) + self.peer_info.get(p).map(|i| i.send_budget.remaining > 0).unwrap_or(true) } /// Send a request to a peer. @@ -219,33 +270,32 @@ where /// returned. Sending more outbound requests should only be attempted /// once [`Event::ResumeSending`] has been received from [`NetworkBehaviour::poll`]. pub fn send_request(&mut self, p: &PeerId, req: C::Request) -> Result { - let info = - if let Some(info) = self.peer_info.get_mut(p) { - info - } else if let Some(info) = self.offline_peer_info.pop(p) { - if info.recv_budget > 1 { - self.send_credit(p, info.recv_budget - 1) + let connected = &mut self.peer_info; + let disconnected = &mut self.offline_peer_info; + let remaining = + if let Some(info) = connected.get_mut(p).or_else(|| disconnected.get_mut(p)) { + if info.send_budget.remaining == 0 { + log::trace!("{:08x}: no more budget to send another request to {}", self.id, p); + return Err(req) } - self.peer_info.entry(p.clone()).or_insert(info) + info.send_budget.remaining -= 1; + info.send_budget.remaining } else { let limit = self.limit_overrides.get(p).copied().unwrap_or(self.default_limit); - self.peer_info.entry(p.clone()).or_insert(PeerInfo::new(limit)) + let mut info = PeerInfo::new(limit); + info.send_budget.remaining -= 1; + let remaining = info.send_budget.remaining; + self.offline_peer_info.put(p.clone(), info); + remaining }; - if info.send_budget == 0 { - log::trace!("{:08x}: no more budget to send another request to {}", self.id, p); - return Err(req) - } - - info.send_budget -= 1; - let rid = self.behaviour.send_request(p, Message::request(req)); - log::trace! { "{:08x}: sending request {} to {} (send budget = {})", + log::trace! { "{:08x}: sending request {} to {} (budget remaining = {})", self.id, rid, p, - info.send_budget + 1 + remaining }; Ok(rid) @@ -254,16 +304,21 @@ where /// Answer an inbound request with a response. /// /// See [`RequestResponse::send_response`] for details. - pub fn send_response(&mut self, ch: ResponseChannel>, res: C::Response) { + pub fn send_response(&mut self, ch: ResponseChannel>, res: C::Response) + -> Result<(), C::Response> + { log::trace!("{:08x}: sending response {} to peer {}", self.id, ch.request_id(), &ch.peer); if let Some(info) = self.peer_info.get_mut(&ch.peer) { - if info.recv_budget == 0 { // need to send more credit to the remote peer - let crd = info.limit.switch(); - info.recv_budget = info.limit.max_recv.get(); - self.send_credit(&ch.peer, crd) + if info.recv_budget.remaining == 0 { // need to send more credit to the remote peer + let crd = info.recv_budget.limit.switch(); + info.recv_budget.remaining = info.recv_budget.limit.max_recv.get(); + self.send_credit(&ch.peer, crd); } } - self.behaviour.send_response(ch, Message::response(res)) + match self.behaviour.send_response(ch, Message::response(res)) { + Ok(()) => Ok(()), + Err(m) => Err(m.into_parts().1.expect("Missing response data.")), + } } /// Add a known peer address. @@ -295,19 +350,16 @@ where } /// Send a credit grant to the given peer. - fn send_credit(&mut self, p: &PeerId, amount: u16) { - let cid = self.next_credit_id(); - let rid = self.behaviour.send_request(p, Message::credit(amount, cid)); - log::trace!("{:08x}: sending {} as credit {} to {}", self.id, amount, cid, p); - let credit = Credit { id: cid, request: rid, amount }; - self.credit_messages.insert(p.clone(), credit); - } - - /// Create a new credit message ID. - fn next_credit_id(&mut self) -> u64 { - let n = self.credit_id; - self.credit_id += 1; - n + fn send_credit(&mut self, p: &PeerId, credit: u16) { + if let Some(info) = self.peer_info.get_mut(p) { + let cid = self.next_grant_id; + self.next_grant_id += 1; + let rid = self.behaviour.send_request(p, Message::credit(credit, cid)); + log::trace!("{:08x}: sending {} credit as grant {} to {}", self.id, credit, cid, p); + let grant = Grant { id: cid, request: rid, credit }; + info.recv_budget.grant = Some(grant); + info.recv_budget.sent.insert(rid); + } } } @@ -346,15 +398,15 @@ where fn inject_connection_closed(&mut self, peer: &PeerId, id: &ConnectionId, end: &ConnectedPoint) { self.behaviour.inject_connection_closed(peer, id, end); - if self.is_connected(peer) { - if let Some(credit) = self.credit_messages.get_mut(peer) { + if let Some(info) = self.peer_info.get_mut(peer) { + if let Some(grant) = &mut info.recv_budget.grant { log::debug! { "{:08x}: resending credit grant {} to {} after connection closed", self.id, - credit.id, + grant.id, peer }; - let msg = Message::credit(credit.amount, credit.id); - credit.request = self.behaviour.send_request(peer, msg) + let msg = Message::credit(grant.credit, grant.id); + grant.request = self.behaviour.send_request(peer, msg) } } } @@ -364,28 +416,24 @@ where self.behaviour.inject_connected(p); // The limit may have been added by `Throttled::send_request` already. if !self.peer_info.contains_key(p) { - let info = - if let Some(info) = self.offline_peer_info.pop(p) { - if info.recv_budget > 1 { - self.send_credit(p, info.recv_budget - 1) - } - info - } else { - let limit = self.limit_overrides.get(p).copied().unwrap_or(self.default_limit); - PeerInfo::new(limit) - }; - self.peer_info.insert(p.clone(), info); + if let Some(info) = self.offline_peer_info.pop(p) { + let recv_budget = info.recv_budget.remaining; + self.peer_info.insert(p.clone(), info); + if recv_budget > 1 { + self.send_credit(p, recv_budget - 1); + } + } else { + let limit = self.limit_overrides.get(p).copied().unwrap_or(self.default_limit); + self.peer_info.insert(p.clone(), PeerInfo::new(limit)); + } } } fn inject_disconnected(&mut self, p: &PeerId) { log::trace!("{:08x}: disconnected from {}", self.id, p); - if let Some(mut info) = self.peer_info.remove(p) { - info.send_budget = 1; - info.recv_budget = max(1, info.recv_budget); - self.offline_peer_info.put(p.clone(), info); + if let Some(info) = self.peer_info.remove(p) { + self.offline_peer_info.put(p.clone(), info.into_disconnected()); } - self.credit_messages.remove(p); self.behaviour.inject_disconnected(p) } @@ -413,11 +461,14 @@ where | RequestResponseMessage::Response { request_id, response } => match &response.header().typ { | Some(Type::Ack) => { - if let Some(id) = self.credit_messages.get(&peer).map(|c| c.id) { - if Some(id) == response.header().ident { - log::trace!("{:08x}: received ack {} from {}", self.id, id, peer); - self.credit_messages.remove(&peer); + if let Some(info) = self.peer_info.get_mut(&peer) { + if let Some(id) = info.recv_budget.grant.as_ref().map(|c| c.id) { + if Some(id) == response.header().ident { + log::trace!("{:08x}: received ack {} from {}", self.id, id, peer); + info.recv_budget.grant = None; + } } + info.recv_budget.sent.remove(&request_id); } continue } @@ -464,15 +515,23 @@ where id, peer }; - if info.send_budget_id < Some(id) { - if info.send_budget == 0 && credit > 0 { + if info.send_budget.grant < Some(id) { + if info.send_budget.remaining == 0 && credit > 0 { log::trace!("{:08x}: sending to peer {} can resume", self.id, peer); self.events.push_back(Event::ResumeSending(peer.clone())) } - info.send_budget += credit; - info.send_budget_id = Some(id) + info.send_budget.remaining += credit; + info.send_budget.grant = Some(id); + } + match self.behaviour.send_response(channel, Message::ack(id)) { + Err(_) => log::debug! { + "{:08x}: Failed to send ack for credit grant {}.", + self.id, id + }, + Ok(()) => { + info.send_budget.received.insert(request_id); + } } - self.behaviour.send_response(channel, Message::ack(id)) } continue } @@ -481,18 +540,18 @@ where log::trace! { "{:08x}: received request {} (recv. budget = {})", self.id, request_id, - info.recv_budget + info.recv_budget.remaining }; - if info.recv_budget == 0 { + if info.recv_budget.remaining == 0 { log::debug!("{:08x}: peer {} exceeds its budget", self.id, peer); self.events.push_back(Event::TooManyInboundRequests(peer.clone())); continue } - info.recv_budget -= 1; + info.recv_budget.remaining -= 1; // We consider a request as proof that our credit grant has // reached the peer. Usually, an ACK has already been // received. - self.credit_messages.remove(&peer); + info.recv_budget.grant = None; } if let Some(rq) = request.into_parts().1 { RequestResponseMessage::Request { request_id, request: rq, channel } @@ -524,16 +583,25 @@ where request_id, error }) => { - if let Some(credit) = self.credit_messages.get_mut(&peer) { - if credit.request == request_id { - log::debug! { "{:08x}: failed to send {} as credit {} to {}; retrying...", - self.id, - credit.amount, - credit.id, - peer - }; - let msg = Message::credit(credit.amount, credit.id); - credit.request = self.behaviour.send_request(&peer, msg) + if let Some(info) = self.peer_info.get_mut(&peer) { + if let Some(grant) = info.recv_budget.grant.as_mut() { + if grant.request == request_id { + log::debug! { + "{:08x}: failed to send {} as credit {} to {}; retrying...", + self.id, + grant.credit, + grant.id, + peer + }; + let msg = Message::credit(grant.credit, grant.id); + grant.request = self.behaviour.send_request(&peer, msg); + } + } + + // If the outbound failure was for a credit message, don't report it on + // the public API and retry the sending. + if info.recv_budget.sent.remove(&request_id) { + continue } } let event = RequestResponseEvent::OutboundFailure { peer, request_id, error }; @@ -544,9 +612,39 @@ where request_id, error }) => { + // If the inbound failure occurred in the context of responding to a + // credit grant, don't report it on the public API. + if let Some(info) = self.peer_info.get_mut(&peer) { + if info.send_budget.received.remove(&request_id) { + log::debug! { + "{:08}: failed to acknowledge credit grant from {}: {:?}", + self.id, peer, error + }; + continue + } + } let event = RequestResponseEvent::InboundFailure { peer, request_id, error }; NetworkBehaviourAction::GenerateEvent(Event::Event(event)) } + | NetworkBehaviourAction::GenerateEvent(RequestResponseEvent::ResponseSent { + peer, + request_id + }) => { + // If this event is for an ACK response that was sent for + // the last received credit grant, skip it. + if let Some(info) = self.peer_info.get_mut(&peer) { + if info.send_budget.received.remove(&request_id) { + log::trace! { + "{:08}: successfully sent ACK for credit grant {:?}.", + self.id, + info.send_budget.grant, + } + continue + } + } + NetworkBehaviourAction::GenerateEvent(Event::Event( + RequestResponseEvent::ResponseSent { peer, request_id })) + } | NetworkBehaviourAction::DialAddress { address } => NetworkBehaviourAction::DialAddress { address }, | NetworkBehaviourAction::DialPeer { peer_id, condition } => diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 9aa7f093300..de4d1983689 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -77,8 +77,13 @@ fn ping_protocol() { } => { assert_eq!(&request, &expected_ping); assert_eq!(&peer, &peer2_id); - swarm1.send_response(channel, pong.clone()); + swarm1.send_response(channel, pong.clone()).unwrap(); }, + RequestResponseEvent::ResponseSent { + peer, .. + } => { + assert_eq!(&peer, &peer2_id); + } e => panic!("Peer1: Unexpected event: {:?}", e) } } @@ -159,8 +164,13 @@ fn ping_protocol_throttled() { }) => { assert_eq!(&request, &expected_ping); assert_eq!(&peer, &peer2_id); - swarm1.send_response(channel, pong.clone()); + swarm1.send_response(channel, pong.clone()).unwrap(); }, + throttled::Event::Event(RequestResponseEvent::ResponseSent { + peer, .. + }) => { + assert_eq!(&peer, &peer2_id); + } e => panic!("Peer1: Unexpected event: {:?}", e) } if i % 31 == 0 {