From a087eb73d2dde66d5aeb1ad8547a01a6084372fb Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Tue, 1 Dec 2020 17:36:41 +0100 Subject: [PATCH] Track pending credit request IDs. In order to avoid emitting events relating to credit grants or acks on the public API. The public API should only emit events relating to the actual requests and responses sent by client code. --- protocols/request-response/src/throttled.rs | 273 ++++++++++++-------- 1 file changed, 160 insertions(+), 113 deletions(-) diff --git a/protocols/request-response/src/throttled.rs b/protocols/request-response/src/throttled.rs index 44096c247bbc..95545a6ffff6 100644 --- a/protocols/request-response/src/throttled.rs +++ b/protocols/request-response/src/throttled.rs @@ -42,7 +42,7 @@ use futures::ready; use libp2p_core::{ConnectedPoint, connection::ConnectionId, Multiaddr, PeerId}; use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use lru::LruCache; -use std::{collections::{HashMap, VecDeque}, task::{Context, Poll}}; +use std::{collections::{HashMap, HashSet, VecDeque}, task::{Context, Poll}}; use std::{cmp::max, num::NonZeroU16}; use super::{ ProtocolSupport, @@ -75,21 +75,20 @@ where limit_overrides: HashMap, /// Pending events to report in `Throttled::poll`. events: VecDeque>>, - /// Current outbound credit grants in flight. - credit_messages: HashMap, /// The current credit ID. credit_id: u64 } -/// Credit information that is sent to remote peers. +/// Information about a credit grant that is sent to remote peers. #[derive(Clone, Copy, Debug)] -struct Credit { +struct Grant { /// A credit ID. Used to deduplicate retransmitted credit messages. - id: u64, + id: GrantId, /// The ID of the outbound credit grant message. request: RequestId, - /// The number of requests the remote is allowed to send. - amount: u16 + /// The credit given in this grant, i.e. the number of additional + /// requests the remote is allowed to send. + credit: u16 } /// Max. number of inbound requests that can be received. @@ -130,31 +129,78 @@ impl Limit { } } +type GrantId = u64; + +/// Information related to the current send budget with a peer. +#[derive(Clone, Debug)] +struct SendBudget { + /// The last received credit grant. + grant: Option, + /// The remaining credit for requests to send. + remaining: u16, + /// Credit grant requests received and acknowledged where the outcome + /// of the acknowledgement (i.e. response sent) is still undetermined. + /// Used to avoid emitting events for successful (`ResponseSent`) or failed + /// acknowledgements. + received: HashSet, +} + +/// Information related to the current receive budget with a peer. +#[derive(Clone, Debug)] +struct RecvBudget { + /// The grant currently given to the remote but yet to be acknowledged. + /// + /// Set to `Some` when a new grant is sent to the remote, followed + /// by `None` when an acknowledgment or a request is received. The + /// latter is seen as an implicit acknowledgement. + grant: Option, + /// The limit for new credit grants when the `remaining` credit is + /// exhausted. + limit: Limit, + /// The remaining credit for requests to receive. + remaining: u16, + /// Credit grants sent whose outcome is still undetermined. + /// Used to avoid emitting events for failed credit grants. + /// + /// > **Note**: While receiving an inbound request is an implicit + /// > acknowledgement for the last sent `grant`, the outcome of + /// > the outbound request remains undetermined until a success or + /// > failure event is received for that request or the corresponding + /// > connection closes. + sent: HashSet, +} + /// Budget information about a peer. #[derive(Clone, Debug)] struct PeerInfo { - /// Limit that applies to this peer. - limit: Limit, - /// Remaining number of outbound requests that can be sent. - send_budget: u16, - /// Remaining number of inbound requests that can be received. - recv_budget: u16, - /// The ID of the credit message that granted the current `send_budget`. - send_budget_id: Option, - /// The inbound request ID of the credit message that granted the current `send_budget`. - send_budget_request_id: Option, + send_budget: SendBudget, + recv_budget: RecvBudget, } impl PeerInfo { - fn new(limit: Limit) -> Self { + fn new(recv_limit: Limit) -> Self { PeerInfo { - limit, - send_budget: 1, - recv_budget: 1, - send_budget_id: None, - send_budget_request_id: None, + send_budget: SendBudget { + grant: None, + remaining: 1, + received: HashSet::new(), + }, + recv_budget: RecvBudget { + grant: None, + limit: recv_limit, + remaining: 1, + sent: HashSet::new(), + } } } + + fn into_disconnected(mut self) -> Self { + self.send_budget.received = HashSet::new(); + self.send_budget.remaining = 1; + self.recv_budget.sent = HashSet::new(); + self.recv_budget.remaining = max(1, self.recv_budget.remaining); + self + } } impl Throttled @@ -183,7 +229,7 @@ where default_limit: Limit::new(NonZeroU16::new(1).expect("1 > 0")), limit_overrides: HashMap::new(), events: VecDeque::new(), - credit_messages: HashMap::new(), + // credit_messages: HashMap::new(), credit_id: 0 } } @@ -198,9 +244,9 @@ where pub fn override_receive_limit(&mut self, p: &PeerId, limit: NonZeroU16) { log::debug!("{:08x}: override limit for {}: {:?}", self.id, p, limit); if let Some(info) = self.peer_info.get_mut(p) { - info.limit.set(limit) + info.recv_budget.limit.set(limit) } else if let Some(info) = self.offline_peer_info.get_mut(p) { - info.limit.set(limit) + info.recv_budget.limit.set(limit) } self.limit_overrides.insert(p.clone(), Limit::new(limit)); } @@ -213,7 +259,7 @@ where /// Has the limit of outbound requests been reached for the given peer? pub fn can_send(&mut self, p: &PeerId) -> bool { - self.peer_info.get(p).map(|i| i.send_budget > 0).unwrap_or(true) + self.peer_info.get(p).map(|i| i.send_budget.remaining > 0).unwrap_or(true) } /// Send a request to a peer. @@ -222,33 +268,32 @@ where /// returned. Sending more outbound requests should only be attempted /// once [`Event::ResumeSending`] has been received from [`NetworkBehaviour::poll`]. pub fn send_request(&mut self, p: &PeerId, req: C::Request) -> Result { - let info = - if let Some(info) = self.peer_info.get_mut(p) { - info - } else if let Some(info) = self.offline_peer_info.pop(p) { - if info.recv_budget > 1 { - self.send_credit(p, info.recv_budget - 1) + let connected = &mut self.peer_info; + let disconnected = &mut self.offline_peer_info; + let remaining = + if let Some(info) = connected.get_mut(p).or_else(|| disconnected.get_mut(p)) { + if info.send_budget.remaining == 0 { + log::trace!("{:08x}: no more budget to send another request to {}", self.id, p); + return Err(req) } - self.peer_info.entry(p.clone()).or_insert(info) + info.send_budget.remaining -= 1; + info.send_budget.remaining } else { let limit = self.limit_overrides.get(p).copied().unwrap_or(self.default_limit); - self.peer_info.entry(p.clone()).or_insert(PeerInfo::new(limit)) + let mut info = PeerInfo::new(limit); + info.send_budget.remaining -= 1; + let remaining = info.send_budget.remaining; + self.offline_peer_info.put(p.clone(), info); + remaining }; - if info.send_budget == 0 { - log::trace!("{:08x}: no more budget to send another request to {}", self.id, p); - return Err(req) - } - - info.send_budget -= 1; - let rid = self.behaviour.send_request(p, Message::request(req)); - log::trace! { "{:08x}: sending request {} to {} (send budget = {})", + log::trace! { "{:08x}: sending request {} to {} (budget remaining = {})", self.id, rid, p, - info.send_budget + 1 + remaining }; Ok(rid) @@ -262,10 +307,10 @@ where { log::trace!("{:08x}: sending response {} to peer {}", self.id, ch.request_id(), &ch.peer); if let Some(info) = self.peer_info.get_mut(&ch.peer) { - if info.recv_budget == 0 { // need to send more credit to the remote peer - let crd = info.limit.switch(); - info.recv_budget = info.limit.max_recv.get(); - self.send_credit(&ch.peer, crd) + if info.recv_budget.remaining == 0 { // need to send more credit to the remote peer + let crd = info.recv_budget.limit.switch(); + info.recv_budget.remaining = info.recv_budget.limit.max_recv.get(); + self.send_credit(&ch.peer, crd); } } match self.behaviour.send_response(ch, Message::response(res)) { @@ -303,19 +348,16 @@ where } /// Send a credit grant to the given peer. - fn send_credit(&mut self, p: &PeerId, amount: u16) { - let cid = self.next_credit_id(); - let rid = self.behaviour.send_request(p, Message::credit(amount, cid)); - log::trace!("{:08x}: sending {} as credit {} to {}", self.id, amount, cid, p); - let credit = Credit { id: cid, request: rid, amount }; - self.credit_messages.insert(p.clone(), credit); - } - - /// Create a new credit message ID. - fn next_credit_id(&mut self) -> u64 { - let n = self.credit_id; - self.credit_id += 1; - n + fn send_credit(&mut self, p: &PeerId, credit: u16) { + if let Some(info) = self.peer_info.get_mut(p) { + let cid = self.credit_id; + self.credit_id += 1; + let rid = self.behaviour.send_request(p, Message::credit(credit, cid)); + log::trace!("{:08x}: sending {} credit as grant {} to {}", self.id, credit, cid, p); + let grant = Grant { id: cid, request: rid, credit }; + info.recv_budget.grant = Some(grant); + info.recv_budget.sent.insert(rid); + } } } @@ -354,15 +396,15 @@ where fn inject_connection_closed(&mut self, peer: &PeerId, id: &ConnectionId, end: &ConnectedPoint) { self.behaviour.inject_connection_closed(peer, id, end); - if self.is_connected(peer) { - if let Some(credit) = self.credit_messages.get_mut(peer) { + if let Some(info) = self.peer_info.get_mut(peer) { + if let Some(grant) = &mut info.recv_budget.grant { log::debug! { "{:08x}: resending credit grant {} to {} after connection closed", self.id, - credit.id, + grant.id, peer }; - let msg = Message::credit(credit.amount, credit.id); - credit.request = self.behaviour.send_request(peer, msg) + let msg = Message::credit(grant.credit, grant.id); + grant.request = self.behaviour.send_request(peer, msg) } } } @@ -372,28 +414,24 @@ where self.behaviour.inject_connected(p); // The limit may have been added by `Throttled::send_request` already. if !self.peer_info.contains_key(p) { - let info = - if let Some(info) = self.offline_peer_info.pop(p) { - if info.recv_budget > 1 { - self.send_credit(p, info.recv_budget - 1) - } - info - } else { - let limit = self.limit_overrides.get(p).copied().unwrap_or(self.default_limit); - PeerInfo::new(limit) - }; - self.peer_info.insert(p.clone(), info); + if let Some(info) = self.offline_peer_info.pop(p) { + let recv_budget = info.recv_budget.remaining; + self.peer_info.insert(p.clone(), info); + if recv_budget > 1 { + self.send_credit(p, recv_budget - 1); + } + } else { + let limit = self.limit_overrides.get(p).copied().unwrap_or(self.default_limit); + self.peer_info.insert(p.clone(), PeerInfo::new(limit)); + } } } fn inject_disconnected(&mut self, p: &PeerId) { log::trace!("{:08x}: disconnected from {}", self.id, p); - if let Some(mut info) = self.peer_info.remove(p) { - info.send_budget = 1; - info.recv_budget = max(1, info.recv_budget); - self.offline_peer_info.put(p.clone(), info); + if let Some(info) = self.peer_info.remove(p) { + self.offline_peer_info.put(p.clone(), info.into_disconnected()); } - self.credit_messages.remove(p); self.behaviour.inject_disconnected(p) } @@ -421,11 +459,14 @@ where | RequestResponseMessage::Response { request_id, response } => match &response.header().typ { | Some(Type::Ack) => { - if let Some(id) = self.credit_messages.get(&peer).map(|c| c.id) { - if Some(id) == response.header().ident { - log::trace!("{:08x}: received ack {} from {}", self.id, id, peer); - self.credit_messages.remove(&peer); + if let Some(info) = self.peer_info.get_mut(&peer) { + if let Some(id) = info.recv_budget.grant.as_ref().map(|c| c.id) { + if Some(id) == response.header().ident { + log::trace!("{:08x}: received ack {} from {}", self.id, id, peer); + info.recv_budget.grant = None; + } } + info.recv_budget.sent.remove(&request_id); } continue } @@ -472,20 +513,21 @@ where id, peer }; - if info.send_budget_id < Some(id) { - if info.send_budget == 0 && credit > 0 { + if info.send_budget.grant < Some(id) { + if info.send_budget.remaining == 0 && credit > 0 { log::trace!("{:08x}: sending to peer {} can resume", self.id, peer); self.events.push_back(Event::ResumeSending(peer.clone())) } - info.send_budget += credit; - info.send_budget_id = Some(id); - info.send_budget_request_id = Some(request_id); + info.send_budget.remaining += credit; + info.send_budget.grant = Some(id); } if let Err(_) = self.behaviour.send_response(channel, Message::ack(id)) { log::debug! { "{:08x}: Failed to send ack for credit grant {}.", self.id, id }; + } else { + info.send_budget.received.insert(request_id); } } continue @@ -495,18 +537,18 @@ where log::trace! { "{:08x}: received request {} (recv. budget = {})", self.id, request_id, - info.recv_budget + info.recv_budget.remaining }; - if info.recv_budget == 0 { + if info.recv_budget.remaining == 0 { log::debug!("{:08x}: peer {} exceeds its budget", self.id, peer); self.events.push_back(Event::TooManyInboundRequests(peer.clone())); continue } - info.recv_budget -= 1; + info.recv_budget.remaining -= 1; // We consider a request as proof that our credit grant has // reached the peer. Usually, an ACK has already been // received. - self.credit_messages.remove(&peer); + info.recv_budget.grant = None; } if let Some(rq) = request.into_parts().1 { RequestResponseMessage::Request { request_id, request: rq, channel } @@ -538,19 +580,24 @@ where request_id, error }) => { - // If the outbound failure was for a credit message, don't report it on - // the public API and retry the sending. - if let Some(credit) = self.credit_messages.get_mut(&peer) { - if credit.request == request_id { - log::debug! { - "{:08x}: failed to send {} as credit {} to {}; retrying...", - self.id, - credit.amount, - credit.id, - peer - }; - let msg = Message::credit(credit.amount, credit.id); - credit.request = self.behaviour.send_request(&peer, msg); + if let Some(info) = self.peer_info.get_mut(&peer) { + if let Some(grant) = info.recv_budget.grant.as_mut() { + if grant.request == request_id { + log::debug! { + "{:08x}: failed to send {} as credit {} to {}; retrying...", + self.id, + grant.credit, + grant.id, + peer + }; + let msg = Message::credit(grant.credit, grant.id); + grant.request = self.behaviour.send_request(&peer, msg); + } + } + + // If the outbound failure was for a credit message, don't report it on + // the public API and retry the sending. + if info.recv_budget.sent.remove(&request_id) { continue } } @@ -565,9 +612,9 @@ where // If the inbound failure occurred in the context of responding to a // credit grant, don't report it on the public API. if let Some(info) = self.peer_info.get_mut(&peer) { - if info.send_budget_request_id == Some(request_id) { + if info.send_budget.received.remove(&request_id) { log::debug! { - "{:08}: failed to respond to credit grant from {}: {:?}", + "{:08}: failed to acknowledge credit grant from {}: {:?}", self.id, peer, error }; continue @@ -583,11 +630,11 @@ where // If this event is for an ACK response that was sent for // the last received credit grant, skip it. if let Some(info) = self.peer_info.get_mut(&peer) { - if info.send_budget_request_id == Some(request_id) { + if info.send_budget.received.remove(&request_id) { log::trace! { "{:08}: successfully sent ACK for credit grant {:?}.", self.id, - info.send_budget_id, + info.send_budget.grant, } continue }