Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[request-response] Refine success & error reporting for inbound requests. #1867

Merged
merged 10 commits into from
Dec 7, 2020
26 changes: 20 additions & 6 deletions protocols/request-response/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,22 +119,29 @@ pub enum RequestResponseHandlerEvent<TCodec>
where
TCodec: RequestResponseCodec
{
/// An inbound request.
/// A request has been received.
Request {
request_id: RequestId,
request: TCodec::Request,
sender: oneshot::Sender<TCodec::Response>
},
/// An inbound response.
/// A response has been received.
Response {
request_id: RequestId,
response: TCodec::Response
},
/// An outbound upgrade (i.e. request) timed out.
/// A response to an inbound request has been sent.
ResponseSent(RequestId),
/// A response to an inbound request was omitted as a result
/// of dropping the response `sender` of an inbound `Request`.
ResponseOmission(RequestId),
/// An outbound request timed out while sending the request
/// or waiting for the response.
OutboundTimeout(RequestId),
/// An outbound request failed to negotiate a mutually supported protocol.
OutboundUnsupportedProtocols(RequestId),
/// An inbound request timed out.
/// An inbound request timed out while waiting for the request
/// or sending the response.
InboundTimeout(RequestId),
/// An inbound request failed to negotiate a mutually supported protocol.
InboundUnsupportedProtocols(RequestId),
Expand Down Expand Up @@ -187,9 +194,16 @@ where

fn inject_fully_negotiated_inbound(
&mut self,
(): (),
_: RequestId
sent: bool,
request_id: RequestId
) {
if sent {
self.pending_events.push_back(
RequestResponseHandlerEvent::ResponseSent(request_id))
} else {
self.pending_events.push_back(
RequestResponseHandlerEvent::ResponseOmission(request_id))
}
}

fn inject_fully_negotiated_outbound(
Expand Down
6 changes: 4 additions & 2 deletions protocols/request-response/src/handler/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl<TCodec> InboundUpgrade<NegotiatedSubstream> for ResponseProtocol<TCodec>
where
TCodec: RequestResponseCodec + Send + 'static,
{
type Output = ();
type Output = bool;
type Error = io::Error;
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;

Expand All @@ -105,10 +105,12 @@ where
if let Ok(response) = self.response_receiver.await {
let write = self.codec.write_response(&protocol, &mut io, response);
write.await?;
} else {
return Ok(false)
}
}
io.close().await?;
Ok(())
Ok(true)
}.boxed()
}
}
Expand Down
80 changes: 53 additions & 27 deletions protocols/request-response/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,6 @@
//! For that purpose, [`RequestResponseCodec::Protocol`] is typically
//! instantiated with a sum type.
//!
//! ## One-Way Protocols
//!
//! The implementation supports one-way protocols that do not
//! have responses. In these cases the [`RequestResponseCodec::Response`] can
//! be defined as `()` and [`RequestResponseCodec::read_response`] as well as
//! [`RequestResponseCodec::write_response`] given the obvious implementations.
//! Note that `RequestResponseMessage::Response` will still be emitted,
//! immediately after the request has been sent, since `RequestResponseCodec::read_response`
//! will not actually read anything from the given I/O stream.
//! [`RequestResponse::send_response`] need not be called for one-way protocols,
//! i.e. the [`ResponseChannel`] may just be dropped.
//!
//! ## Limited Protocol Support
//!
//! It is possible to only support inbound or outbound requests for
Expand Down Expand Up @@ -115,9 +103,11 @@ pub enum RequestResponseMessage<TRequest, TResponse, TChannelResponse = TRespons
request_id: RequestId,
/// The request message.
request: TRequest,
/// The sender of the request who is awaiting a response.
/// The channel waiting for the response.
///
/// See [`RequestResponse::send_response`].
/// If this channel is dropped instead of being used to send a response
/// via [`RequestResponse::send_response`], a [`RequestResponseEvent::InboundFailure`]
/// with [`InboundFailure::ResponseOmission`] is emitted.
channel: ResponseChannel<TChannelResponse>,
},
/// A response message.
Expand Down Expand Up @@ -151,6 +141,14 @@ pub enum RequestResponseEvent<TRequest, TResponse, TChannelResponse = TResponse>
error: OutboundFailure,
},
/// An inbound request failed.
///
/// > **Note**: The case whereby a connection on which a response is sent
/// > closes after [`RequestResponse::send_response`] already succeeded
/// > but before the response could be sent on the connection is reflected
/// > by there being no [`RequestResponseEvent::ResponseSent`] event.
/// > Code interested in ensuring a response has been successfully
/// > handed to the transport layer, e.g. before continuing with the next
/// > step in a multi-step protocol, should listen to these events.
InboundFailure {
/// The peer from whom the request was received.
peer: PeerId,
Expand All @@ -159,6 +157,16 @@ pub enum RequestResponseEvent<TRequest, TResponse, TChannelResponse = TResponse>
/// The error that occurred.
error: InboundFailure,
},
/// A response to an inbound request has been sent.
///
/// When this event is received, the response has been flushed on
/// the underlying transport connection.
ResponseSent {
/// The peer to whom the response was sent.
peer: PeerId,
/// The ID of the inbound request whose response was sent.
request_id: RequestId,
},
}

/// Possible failures occurring in the context of sending
Expand Down Expand Up @@ -186,14 +194,17 @@ pub enum OutboundFailure {
#[derive(Debug)]
pub enum InboundFailure {
/// The inbound request timed out, either while reading the
/// incoming request or before a response is sent, i.e. if
/// incoming request or before a response is sent, e.g. if
/// [`RequestResponse::send_response`] is not called in a
/// timely manner.
Timeout,
/// The local peer supports none of the requested protocols.
/// The local peer supports none of the protocols requested
/// by the remote.
UnsupportedProtocols,
/// The connection closed before a response was delivered.
ConnectionClosed,
/// The local peer failed to respond to an inbound request
/// due to the [`ResponseChannel`] being dropped instead of
/// being passed to [`RequestResponse::send_response`].
ResponseOmission,
}

/// A channel for sending a response to an inbound request.
Expand Down Expand Up @@ -379,17 +390,18 @@ where

/// Initiates sending a response to an inbound request.
///
/// If the `ResponseChannel` is already closed due to a timeout,
/// the response is discarded and eventually [`RequestResponseEvent::InboundFailure`]
/// is emitted by `RequestResponse::poll`.
/// If the `ResponseChannel` is already closed due to a timeout or
/// the connection being closed, the response is returned as an `Err`
/// for further handling. When the response has been successfully sent
/// on the corresponding connection, [`RequestResponseEvent::ResponseSent`]
/// is emitted.
romanb marked this conversation as resolved.
Show resolved Hide resolved
///
/// The provided `ResponseChannel` is obtained from a
/// The provided `ResponseChannel` is obtained from an inbound
/// [`RequestResponseMessage::Request`].
pub fn send_response(&mut self, ch: ResponseChannel<TCodec::Response>, rs: TCodec::Response) {
// Fails only if the inbound upgrade timed out waiting for the response,
// in which case the handler emits `RequestResponseHandlerEvent::InboundTimeout`
// which in turn results in `RequestResponseEvent::InboundFailure`.
let _ = ch.sender.send(rs);
pub fn send_response(&mut self, ch: ResponseChannel<TCodec::Response>, rs: TCodec::Response)
-> Result<(), TCodec::Response>
{
ch.sender.send(rs)
}

/// Adds a known address for a peer that can be used for
Expand Down Expand Up @@ -577,6 +589,20 @@ where
RequestResponseEvent::Message { peer, message }
));
}
RequestResponseHandlerEvent::ResponseSent(request_id) => {
self.pending_events.push_back(
NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::ResponseSent { peer, request_id }));
}
RequestResponseHandlerEvent::ResponseOmission(request_id) => {
self.pending_events.push_back(
NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::InboundFailure {
peer,
request_id,
error: InboundFailure::ResponseOmission
}));
}
RequestResponseHandlerEvent::OutboundTimeout(request_id) => {
if let Some((peer, _conn)) = self.pending_responses.remove(&request_id) {
self.pending_events.push_back(
Expand Down
64 changes: 56 additions & 8 deletions protocols/request-response/src/throttled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ struct PeerInfo {
/// Remaining number of inbound requests that can be received.
recv_budget: u16,
/// The ID of the credit message that granted the current `send_budget`.
send_budget_id: Option<u64>
send_budget_id: Option<u64>,
/// The inbound request ID of the credit message that granted the current `send_budget`.
send_budget_request_id: Option<RequestId>,
}

impl PeerInfo {
Expand All @@ -149,7 +151,8 @@ impl PeerInfo {
limit,
send_budget: 1,
recv_budget: 1,
send_budget_id: None
send_budget_id: None,
send_budget_request_id: None,
}
}
}
Expand Down Expand Up @@ -254,7 +257,9 @@ where
/// Answer an inbound request with a response.
///
/// See [`RequestResponse::send_response`] for details.
pub fn send_response(&mut self, ch: ResponseChannel<Message<C::Response>>, res: C::Response) {
pub fn send_response(&mut self, ch: ResponseChannel<Message<C::Response>>, res: C::Response)
-> Result<(), C::Response>
{
log::trace!("{:08x}: sending response {} to peer {}", self.id, ch.request_id(), &ch.peer);
if let Some(info) = self.peer_info.get_mut(&ch.peer) {
if info.recv_budget == 0 { // need to send more credit to the remote peer
Expand All @@ -263,7 +268,10 @@ where
self.send_credit(&ch.peer, crd)
}
}
self.behaviour.send_response(ch, Message::response(res))
match self.behaviour.send_response(ch, Message::response(res)) {
Ok(()) => Ok(()),
Err(m) => Err(m.into_parts().1.expect("Missing response data.")),
}
}

/// Add a known peer address.
Expand Down Expand Up @@ -470,9 +478,15 @@ where
self.events.push_back(Event::ResumeSending(peer.clone()))
}
info.send_budget += credit;
info.send_budget_id = Some(id)
info.send_budget_id = Some(id);
info.send_budget_request_id = Some(request_id);
mxinden marked this conversation as resolved.
Show resolved Hide resolved
}
if let Err(_) = self.behaviour.send_response(channel, Message::ack(id)) {
log::debug! {
"{:08x}: Failed to send ack for credit grant {}.",
self.id, id
};
}
self.behaviour.send_response(channel, Message::ack(id))
}
continue
}
Expand Down Expand Up @@ -524,16 +538,20 @@ where
request_id,
error
}) => {
// If the outbound failure was for a credit message, don't report it on
// the public API and retry the sending.
if let Some(credit) = self.credit_messages.get_mut(&peer) {
if credit.request == request_id {
log::debug! { "{:08x}: failed to send {} as credit {} to {}; retrying...",
log::debug! {
"{:08x}: failed to send {} as credit {} to {}; retrying...",
self.id,
credit.amount,
credit.id,
peer
};
let msg = Message::credit(credit.amount, credit.id);
credit.request = self.behaviour.send_request(&peer, msg)
credit.request = self.behaviour.send_request(&peer, msg);
continue
}
}
let event = RequestResponseEvent::OutboundFailure { peer, request_id, error };
Expand All @@ -544,9 +562,39 @@ where
request_id,
error
}) => {
// If the inbound failure occurred in the context of responding to a
// credit grant, don't report it on the public API.
if let Some(info) = self.peer_info.get_mut(&peer) {
if info.send_budget_request_id == Some(request_id) {
log::debug! {
"{:08}: failed to respond to credit grant from {}: {:?}",
self.id, peer, error
};
continue
}
}
let event = RequestResponseEvent::InboundFailure { peer, request_id, error };
NetworkBehaviourAction::GenerateEvent(Event::Event(event))
}
| NetworkBehaviourAction::GenerateEvent(RequestResponseEvent::ResponseSent {
peer,
request_id
}) => {
// If this event is for an ACK response that was sent for
// the last received credit grant, skip it.
if let Some(info) = self.peer_info.get_mut(&peer) {
if info.send_budget_request_id == Some(request_id) {
log::trace! {
"{:08}: successfully sent ACK for credit grant {:?}.",
self.id,
info.send_budget_id,
}
continue
}
}
NetworkBehaviourAction::GenerateEvent(Event::Event(
RequestResponseEvent::ResponseSent { peer, request_id }))
}
| NetworkBehaviourAction::DialAddress { address } =>
NetworkBehaviourAction::DialAddress { address },
| NetworkBehaviourAction::DialPeer { peer_id, condition } =>
Expand Down
14 changes: 12 additions & 2 deletions protocols/request-response/tests/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,13 @@ fn ping_protocol() {
} => {
assert_eq!(&request, &expected_ping);
assert_eq!(&peer, &peer2_id);
swarm1.send_response(channel, pong.clone());
swarm1.send_response(channel, pong.clone()).unwrap();
},
RequestResponseEvent::ResponseSent {
peer, ..
} => {
assert_eq!(&peer, &peer2_id);
}
e => panic!("Peer1: Unexpected event: {:?}", e)
}
}
Expand Down Expand Up @@ -159,8 +164,13 @@ fn ping_protocol_throttled() {
}) => {
assert_eq!(&request, &expected_ping);
assert_eq!(&peer, &peer2_id);
swarm1.send_response(channel, pong.clone());
swarm1.send_response(channel, pong.clone()).unwrap();
},
throttled::Event::Event(RequestResponseEvent::ResponseSent {
peer, ..
}) => {
assert_eq!(&peer, &peer2_id);
}
e => panic!("Peer1: Unexpected event: {:?}", e)
}
if i % 31 == 0 {
Expand Down