Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

misc/multistream-select: Differentiate interpretation of EOF #1823

Merged
merged 9 commits into from
Nov 9, 2020
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ libsecp256k1 = { version = "0.3.1", optional = true }
log = "0.4"
multiaddr = { package = "parity-multiaddr", version = "0.9.2", path = "../misc/multiaddr" }
multihash = "0.11.0"
multistream-select = { version = "0.8.4", path = "../misc/multistream-select" }
multistream-select = { version = "0.8.5", path = "../misc/multistream-select" }
parking_lot = "0.11.0"
pin-project = "1.0.0"
prost = "0.6.1"
Expand Down
6 changes: 6 additions & 0 deletions misc/multistream-select/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# 0.8.5 [unreleased]

- Do not interpret EOF error after sending a `NotAvailable` message as an IO
error, but instead as a negotiation error. See
https://github.com/libp2p/rust-libp2p/pull/1823.

# 0.8.4 [2020-10-20]

- Temporarily disable the internal selection of "parallel" protocol
Expand Down
2 changes: 1 addition & 1 deletion misc/multistream-select/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "multistream-select"
description = "Multistream-select negotiation protocol for libp2p"
version = "0.8.4"
version = "0.8.5"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
48 changes: 34 additions & 14 deletions misc/multistream-select/src/listener_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,21 @@ where
{
RecvHeader { io: MessageIO<R> },
SendHeader { io: MessageIO<R>, version: Version },
RecvMessage { io: MessageIO<R> },
RecvMessage {
io: MessageIO<R>,
/// Whether the last sent message was a [`Message::NotAvailable`].
prev_sent_na: bool,
},
SendMessage {
io: MessageIO<R>,
message: Message,
protocol: Option<N>
},
Flush {
io: MessageIO<R>,
protocol: Option<N>
protocol: Option<N>,
/// Whether the last sent message was a [`Message::NotAvailable`].
prev_sent_na: bool
},
Done
}
Expand Down Expand Up @@ -144,20 +150,32 @@ where
}

*this.state = match version {
Version::V1 => State::Flush { io, protocol: None },
Version::V1Lazy => State::RecvMessage { io },
Version::V1 => State::Flush { io, protocol: None, prev_sent_na: false },
Version::V1Lazy => State::RecvMessage { io, prev_sent_na: false },
}
}

State::RecvMessage { mut io } => {
State::RecvMessage { mut io, prev_sent_na } => {
let msg = match Pin::new(&mut io).poll_next(cx) {
Poll::Ready(Some(Ok(msg))) => msg,
Poll::Ready(None) =>
return Poll::Ready(Err(NegotiationError::from(
ProtocolError::IoError(
io::ErrorKind::UnexpectedEof.into())))),
Poll::Ready(None) => {
// When a listener rejects a protocol with [`Message::NotAvailable`] and
// the dialer does not have alternative protocols to propose then the
// dialer will stop the negotiation and drop the corresponding stream.
// As a listener interpret an EOF after sending a
// [`Message::NotAvailable`] as a failed negotiation. Interpret an EOF
// as an io error in all other cases.
let err = if prev_sent_na {
NegotiationError::Failed
} else {
NegotiationError::from(ProtocolError::IoError(
io::ErrorKind::UnexpectedEof.into(),
))
};
return Poll::Ready(Err(err));
}
Poll::Pending => {
*this.state = State::RecvMessage { io };
*this.state = State::RecvMessage { io, prev_sent_na };
return Poll::Pending;
}
Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(From::from(err))),
Expand Down Expand Up @@ -203,17 +221,19 @@ where
Poll::Ready(Err(err)) => return Poll::Ready(Err(From::from(err))),
}

let is_na_msg = matches!(message, Message::NotAvailable);

if let Err(err) = Pin::new(&mut io).start_send(message) {
return Poll::Ready(Err(From::from(err)));
}

*this.state = State::Flush { io, protocol };
*this.state = State::Flush { io, protocol, prev_sent_na: is_na_msg };
}

State::Flush { mut io, protocol } => {
State::Flush { mut io, protocol, prev_sent_na } => {
match Pin::new(&mut io).poll_flush(cx) {
Poll::Pending => {
*this.state = State::Flush { io, protocol };
*this.state = State::Flush { io, protocol, prev_sent_na };
return Poll::Pending
},
Poll::Ready(Ok(())) => {
Expand All @@ -226,7 +246,7 @@ where
let io = Negotiated::completed(io.into_inner());
return Poll::Ready(Ok((protocol, io)))
}
None => *this.state = State::RecvMessage { io }
None => *this.state = State::RecvMessage { io, prev_sent_na }
}
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(From::from(err))),
Expand Down
5 changes: 2 additions & 3 deletions misc/multistream-select/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,8 @@ fn no_protocol_found() {
let protos = vec![b"/proto1", b"/proto2"];
let io = match listener_select_proto(connec, protos).await {
Ok((_, io)) => io,
// We don't explicitly check for `Failed` because the client might close the connection when it
// realizes that we have no protocol in common.
Comment on lines -88 to -89
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👀

Err(_) => return,
Err(NegotiationError::Failed) => return,
Err(NegotiationError::ProtocolError(e)) => panic!("Unexpected protocol error {}", e),
};
match io.complete().await {
Err(NegotiationError::Failed) => {},
Expand Down