Skip to content

Commit

Permalink
fix(multistream-select): don't wait for negotiation in poll_close
Browse files Browse the repository at this point in the history
With `Version::V1Lazy` and negotiation of a single protocol, a stream initiator optimistically
sends application data right after proposing its protocol. More specifically an application can
write data via `AsyncWrite::poll_write` even though the remote has not yet confirmed the stream protocol.

This saves one round-trip.

``` mermaid
sequenceDiagram
A->>B: "/multistream/1.0.0"
A->>B: "/perf/1.0.0"
A->>B: <some-perf-protocol-data>
B->>A: "/multistream/1.0.0"
B->>A: "/perf/1.0.0"
B->>A: <some-perf-protocol-data>
```

When considering stream closing, i.e. `AsyncWrite::poll_close`, and using stream closing as an
operation in ones protocol, e.g. using stream closing to signal the end of a request, this becomes tricky.

The behavior without this commit was as following:

``` mermaid
sequenceDiagram
A->>B: "/multistream/1.0.0"
A->>B: "/perf/1.0.0"
A->>B: <some-perf-protocol-data>
Note left of A: Call `AsyncWrite::poll_close` which first waits for the<br/>optimistic multistream-select negotiation to finish, before closing the stream,<br/> i.e. setting the FIN bit.
B->>A: "/multistream/1.0.0"
B->>A: "/perf/1.0.0"
Note right of B: Waiting for A to close the stream (i.e. set the `FIN` bit)<br/>before sending the response.
A->>B: FIN
B->>A: <some-perf-protocol-data>
```

The above takes 2 round trips:

1. Send the optimistic multistream-select protocol proposals as well as the initiator protocol
payload and waits for the confirmation of the protocols.
2. Close the stream, i.e. sends the `FIN` bit and waits for the responder protocol payload.

This commit proposes that the stream initiator should not wait for the multistream-select protocol
confirmation when closing the stream, but close the stream within the first round-trip.

``` mermaid
sequenceDiagram
A->>B: "/multistream/1.0.0"
A->>B: "/perf/1.0.0"
A->>B: <some-perf-protocol-data>
A->>B: FIN
B->>A: "/multistream/1.0.0"
B->>A: "/perf/1.0.0"
B->>A: <some-perf-protocol-data>
```

This takes 1 round-trip.

The downside of this commit is, that the stream initiator will no longer notice a negotiation error
when closing the stream. They will only notice it when reading from the stream. E.g. say that B does
not support "/perf/1.0.0", A will only notice on `AsyncRead::poll_read`, not on
`AsyncWrite::poll_close`. This is problematic for protocols where A only sends data, but never
receives data, i.e. never calls `AsyncRead::poll_read`. Though one can argue that such protocol is
flawed in the first place. With a response-less protocol, as even if negotiation succceeds, A
doesn't know whether B received the protocol payload.

Pull-Request: #4019.
  • Loading branch information
mxinden authored Jun 6, 2023
1 parent a728cca commit 76cb76c
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 5 deletions.
5 changes: 5 additions & 0 deletions misc/multistream-select/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
## 0.13.0 - unreleased

- Don't wait for negotiation on `<Negotiated as AsyncWrite>::poll_close`.
This can save one round-trip for protocols that use stream closing as an operation in ones protocol, e.g. using stream closing to signal the end of a request.
See [PR 4019] for details.

- Raise MSRV to 1.65.
See [PR 3715].

[PR 4019]: https://github.com/libp2p/rust-libp2p/pull/4019
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715

## 0.12.1
Expand Down
2 changes: 1 addition & 1 deletion misc/multistream-select/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ smallvec = "1.6.1"
unsigned-varint = "0.7"

[dev-dependencies]
async-std = "1.6.2"
async-std = { version = "1.6.2", features = ["attributes"] }
env_logger = "0.10"
futures_ringbuf = "0.4.0"
libp2p-core = { workspace = true }
Expand Down
12 changes: 8 additions & 4 deletions misc/multistream-select/src/negotiated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,9 +305,7 @@ where
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
// Ensure all data has been flushed and expected negotiation messages
// have been received.
ready!(self.as_mut().poll(cx).map_err(Into::<io::Error>::into)?);
// Ensure all data has been flushed, including optimistic multistream-select messages.
ready!(self
.as_mut()
.poll_flush(cx)
Expand All @@ -316,7 +314,13 @@ where
// Continue with the shutdown of the underlying I/O stream.
match self.project().state.project() {
StateProj::Completed { io, .. } => io.poll_close(cx),
StateProj::Expecting { io, .. } => io.poll_close(cx),
StateProj::Expecting { io, .. } => {
let close_poll = io.poll_close(cx);
if let Poll::Ready(Ok(())) = close_poll {
log::debug!("Stream closed. Confirmation from remote for optimstic protocol negotiation still pending.")
}
close_poll
}
StateProj::Invalid => panic!("Negotiated: Invalid state"),
}
}
Expand Down
24 changes: 24 additions & 0 deletions misc/multistream-select/tests/dialer_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use futures::prelude::*;
use multistream_select::{dialer_select_proto, listener_select_proto, NegotiationError, Version};
use std::time::Duration;

#[test]
fn select_proto_basic() {
Expand Down Expand Up @@ -176,3 +177,26 @@ fn negotiation_failed() {
}
}
}

#[async_std::test]
async fn v1_lazy_do_not_wait_for_negotiation_on_poll_close() {
let (client_connection, _server_connection) = futures_ringbuf::Endpoint::pair(1024 * 1024, 1);

let client = async_std::task::spawn(async move {
// Single protocol to allow for lazy (or optimistic) protocol negotiation.
let protos = vec!["/proto1"];
let (proto, mut io) =
dialer_select_proto(client_connection, protos.into_iter(), Version::V1Lazy)
.await
.unwrap();
assert_eq!(proto, "/proto1");

// client can close the connection even though protocol negotiation is not yet done, i.e.
// `_server_connection` had been untouched.
io.close().await.unwrap();
});

async_std::future::timeout(Duration::from_secs(10), client)
.await
.unwrap();
}

0 comments on commit 76cb76c

Please sign in to comment.