From a61e374dc1ca157c3a323ad093e28572932c571e Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 18 Mar 2024 15:05:26 +0200 Subject: [PATCH 01/11] multistream/dialer: Await header line only once Signed-off-by: Alexandru Vasile --- src/multistream_select/dialer_select.rs | 108 +++++++++++++++++++----- 1 file changed, 86 insertions(+), 22 deletions(-) diff --git a/src/multistream_select/dialer_select.rs b/src/multistream_select/dialer_select.rs index 2a8a025e..9ae1edc5 100644 --- a/src/multistream_select/dialer_select.rs +++ b/src/multistream_select/dialer_select.rs @@ -86,10 +86,24 @@ pub struct DialerSelectFuture { } enum State { - SendHeader { io: MessageIO }, - SendProtocol { io: MessageIO, protocol: N }, - FlushProtocol { io: MessageIO, protocol: N }, - AwaitProtocol { io: MessageIO, protocol: N }, + SendHeader { + io: MessageIO, + }, + SendProtocol { + io: MessageIO, + protocol: N, + header_received: bool, + }, + FlushProtocol { + io: MessageIO, + protocol: N, + header_received: bool, + }, + AwaitProtocol { + io: MessageIO, + protocol: N, + header_received: bool, + }, Done, } @@ -127,14 +141,26 @@ where // The dialer always sends the header and the first protocol // proposal in one go for efficiency. - *this.state = State::SendProtocol { io, protocol }; + *this.state = State::SendProtocol { + io, + protocol, + header_received: false, + }; } - State::SendProtocol { mut io, protocol } => { + State::SendProtocol { + mut io, + protocol, + header_received, + } => { match Pin::new(&mut io).poll_ready(cx)? { Poll::Ready(()) => {} Poll::Pending => { - *this.state = State::SendProtocol { io, protocol }; + *this.state = State::SendProtocol { + io, + protocol, + header_received, + }; return Poll::Pending; } } @@ -146,10 +172,19 @@ where tracing::debug!(target: LOG_TARGET, "Dialer: Proposed protocol: {}", p); if this.protocols.peek().is_some() { - *this.state = State::FlushProtocol { io, protocol } + *this.state = State::FlushProtocol { + io, + protocol, + header_received, + } } else { match this.version { - Version::V1 => *this.state = State::FlushProtocol { io, protocol }, + Version::V1 => + *this.state = State::FlushProtocol { + io, + protocol, + header_received, + }, // This is the only effect that `V1Lazy` has compared to `V1`: // Optimistically settling on the only protocol that // the dialer supports for this negotiation. Notably, @@ -168,21 +203,40 @@ where } } - State::FlushProtocol { mut io, protocol } => { - match Pin::new(&mut io).poll_flush(cx)? { - Poll::Ready(()) => *this.state = State::AwaitProtocol { io, protocol }, - Poll::Pending => { - *this.state = State::FlushProtocol { io, protocol }; - return Poll::Pending; - } + State::FlushProtocol { + mut io, + protocol, + header_received, + } => match Pin::new(&mut io).poll_flush(cx)? { + Poll::Ready(()) => + *this.state = State::AwaitProtocol { + io, + protocol, + header_received, + }, + Poll::Pending => { + *this.state = State::FlushProtocol { + io, + protocol, + header_received, + }; + return Poll::Pending; } - } + }, - State::AwaitProtocol { mut io, protocol } => { + State::AwaitProtocol { + mut io, + protocol, + header_received, + } => { let msg = match Pin::new(&mut io).poll_next(cx)? { Poll::Ready(Some(msg)) => msg, Poll::Pending => { - *this.state = State::AwaitProtocol { io, protocol }; + *this.state = State::AwaitProtocol { + io, + protocol, + header_received, + }; return Poll::Pending; } // Treat EOF error as [`NegotiationError::Failed`], not as @@ -192,8 +246,14 @@ where }; match msg { - Message::Header(v) if v == HeaderLine::from(*this.version) => { - *this.state = State::AwaitProtocol { io, protocol }; + Message::Header(v) + if v == HeaderLine::from(*this.version) && !header_received => + { + *this.state = State::AwaitProtocol { + io, + protocol, + header_received: true, + }; } Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => { tracing::debug!( @@ -211,7 +271,11 @@ where String::from_utf8_lossy(protocol.as_ref()) ); let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?; - *this.state = State::SendProtocol { io, protocol } + *this.state = State::SendProtocol { + io, + protocol, + header_received, + } } _ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())), } From a4f50d4c176eac902c40eed6cff815134f5906a5 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 18 Mar 2024 15:25:15 +0200 Subject: [PATCH 02/11] multistream/tests: Check protocol negotiation Signed-off-by: Alexandru Vasile --- Cargo.lock | 22 +++++++++++ Cargo.toml | 1 + src/multistream_select/dialer_select.rs | 49 +++++++++++++++++++++++++ 3 files changed, 72 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 8112055e..d9f83a1b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1784,6 +1784,18 @@ dependencies = [ "slab", ] +[[package]] +name = "futures_ringbuf" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6628abb6eb1fc74beaeb20cd0670c43d158b0150f7689b38c3eaf663f99bdec7" +dependencies = [ + "futures", + "log", + "ringbuf", + "rustc_version", +] + [[package]] name = "fxhash" version = "0.2.1" @@ -2880,6 +2892,7 @@ dependencies = [ "futures", "futures-rustls", "futures-timer", + "futures_ringbuf", "hex-literal", "indexmap 2.1.0", "libc", @@ -4370,6 +4383,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "ringbuf" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79abed428d1fd2a128201cec72c5f6938e2da607c6f3745f769fabea399d950a" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "rtcp" version = "0.7.2" diff --git a/Cargo.toml b/Cargo.toml index fc0b875f..68fc0eb5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,6 +83,7 @@ sc-network = "0.28.0" sc-utils = "8.0.0" serde_json = "1.0.108" tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } +futures_ringbuf = "0.4.0" [features] custom_sc_network = [] diff --git a/src/multistream_select/dialer_select.rs b/src/multistream_select/dialer_select.rs index 9ae1edc5..e779dba4 100644 --- a/src/multistream_select/dialer_select.rs +++ b/src/multistream_select/dialer_select.rs @@ -410,3 +410,52 @@ impl DialerState { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::multistream_select::listener_select_proto; + + #[tokio::test] + async fn select_proto_basic() { + async fn run(version: Version) { + let (client_connection, server_connection) = futures_ringbuf::Endpoint::pair(100, 100); + + let server = tokio::spawn(async move { + let protos = vec!["/proto1", "/proto2"]; + let (proto, mut io) = + listener_select_proto(server_connection, protos).await.unwrap(); + assert_eq!(proto, "/proto2"); + + let mut out = vec![0; 32]; + let n = io.read(&mut out).await.unwrap(); + out.truncate(n); + assert_eq!(out, b"ping"); + + io.write_all(b"pong").await.unwrap(); + io.flush().await.unwrap(); + }); + + let client = tokio::spawn(async move { + let protos = vec!["/proto3", "/proto2"]; + let (proto, mut io) = + dialer_select_proto(client_connection, protos, version).await.unwrap(); + assert_eq!(proto, "/proto2"); + + io.write_all(b"ping").await.unwrap(); + io.flush().await.unwrap(); + + let mut out = vec![0; 32]; + let n = io.read(&mut out).await.unwrap(); + out.truncate(n); + assert_eq!(out, b"pong"); + }); + + server.await; + client.await; + } + + run(Version::V1).await; + run(Version::V1Lazy).await; + } +} From 32b7284aa27897eb813e80b72d2a2b4e5a3761f4 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 18 Mar 2024 15:35:59 +0200 Subject: [PATCH 03/11] multistream/tests: Check negotiation fails Signed-off-by: Alexandru Vasile --- src/multistream_select/dialer_select.rs | 65 +++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/src/multistream_select/dialer_select.rs b/src/multistream_select/dialer_select.rs index e779dba4..4e34ebe0 100644 --- a/src/multistream_select/dialer_select.rs +++ b/src/multistream_select/dialer_select.rs @@ -415,6 +415,8 @@ impl DialerState { mod tests { use super::*; use crate::multistream_select::listener_select_proto; + use std::time::Duration; + use tokio::net::{TcpListener, TcpStream}; #[tokio::test] async fn select_proto_basic() { @@ -458,4 +460,67 @@ mod tests { run(Version::V1).await; run(Version::V1Lazy).await; } + + /// Tests the expected behaviour of failed negotiations. + #[tokio::test] + async fn negotiation_failed() { + async fn run( + version: Version, + dial_protos: Vec<&'static str>, + dial_payload: Vec, + listen_protos: Vec<&'static str>, + ) { + let (client_connection, server_connection) = futures_ringbuf::Endpoint::pair(100, 100); + + let server = tokio::spawn(async move { + let io = match tokio::time::timeout( + Duration::from_secs(2), + listener_select_proto(server_connection, listen_protos), + ) + .await + .unwrap() + { + Ok((_, io)) => io, + Err(NegotiationError::Failed) => return, + Err(NegotiationError::ProtocolError(e)) => { + panic!("Unexpected protocol error {e}") + } + }; + match io.complete().await { + Err(NegotiationError::Failed) => {} + _ => panic!(), + } + }); + + let client = tokio::spawn(async move { + let mut io = match tokio::time::timeout( + Duration::from_secs(2), + dialer_select_proto(client_connection, dial_protos, version), + ) + .await + .unwrap() + { + Err(NegotiationError::Failed) => return, + Ok((_, io)) => io, + Err(_) => panic!(), + }; + + // The dialer may write a payload that is even sent before it + // got confirmation of the last proposed protocol, when `V1Lazy` + // is used. + io.write_all(&dial_payload).await.unwrap(); + match io.complete().await { + Err(NegotiationError::Failed) => {} + _ => panic!(), + } + }); + + server.await; + client.await; + } + + // Incompatible protocols. + run(Version::V1, vec!["/proto1"], vec![1], vec!["/proto2"]).await; + run(Version::V1Lazy, vec!["/proto1"], vec![1], vec!["/proto2"]).await; + } } From 2d5a381f3f48a34388eb2d2611b853685480ce32 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 18 Mar 2024 16:02:50 +0200 Subject: [PATCH 04/11] multistream/tests: Check V1 lazy negotiation awaits completion Signed-off-by: Alexandru Vasile --- src/multistream_select/dialer_select.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/multistream_select/dialer_select.rs b/src/multistream_select/dialer_select.rs index 4e34ebe0..d008e9ec 100644 --- a/src/multistream_select/dialer_select.rs +++ b/src/multistream_select/dialer_select.rs @@ -523,4 +523,28 @@ mod tests { run(Version::V1, vec!["/proto1"], vec![1], vec!["/proto2"]).await; run(Version::V1Lazy, vec!["/proto1"], vec![1], vec!["/proto2"]).await; } + + #[tokio::test] + async fn v1_lazy_do_not_wait_for_negotiation_on_poll_close() { + let (client_connection, _server_connection) = + futures_ringbuf::Endpoint::pair(1024 * 1024, 1); + + let client = tokio::spawn(async move { + // Single protocol to allow for lazy (or optimistic) protocol negotiation. + let protos = vec!["/proto1"]; + let (proto, mut io) = + dialer_select_proto(client_connection, protos, Version::V1Lazy).await.unwrap(); + assert_eq!(proto, "/proto1"); + + // In Libp2p the lazy negotation of protocols can be closed at any time, + // even if the negotiation is not yet done. + + // However, for the Litep2p the negotation must conclude before closing the + // lazy negotation of protocol. We'll wait for the close until the + // server has produced a message, in this test that means forever. + io.close().await.unwrap(); + }); + + assert!(tokio::time::timeout(Duration::from_secs(10), client).await.is_err()); + } } From a1083f172cfdace23b2c3650ce96507035fe005f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 18 Mar 2024 16:23:08 +0200 Subject: [PATCH 05/11] multistream/tests: Low level protocol negotiation Signed-off-by: Alexandru Vasile --- src/multistream_select/dialer_select.rs | 83 +++++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/src/multistream_select/dialer_select.rs b/src/multistream_select/dialer_select.rs index d008e9ec..b3f45e7e 100644 --- a/src/multistream_select/dialer_select.rs +++ b/src/multistream_select/dialer_select.rs @@ -547,4 +547,87 @@ mod tests { assert!(tokio::time::timeout(Duration::from_secs(10), client).await.is_err()); } + + #[tokio::test] + async fn low_level_negotiate() { + async fn run(version: Version) { + let (client_connection, mut server_connection) = + futures_ringbuf::Endpoint::pair(100, 100); + + let server = tokio::spawn(async move { + let protos = vec!["/proto2"]; + + let multistream = b"/multistream/1.0.0\n"; + let len = multistream.len(); + let proto = b"/proto2\n"; + let proto_len = proto.len(); + + // Check that our implementation writes optimally + // the multistream ++ protocol in a single message. + let mut expected_message = Vec::new(); + expected_message.push(len as u8); + expected_message.extend_from_slice(multistream); + expected_message.push(proto_len as u8); + expected_message.extend_from_slice(proto); + + if version == Version::V1Lazy { + expected_message.extend_from_slice(b"ping"); + } + + let mut out = vec![0; 64]; + let n = server_connection.read(&mut out).await.unwrap(); + out.truncate(n); + assert_eq!(out, expected_message); + + // We must send the back the multistream packet. + let mut send_message = Vec::new(); + send_message.push(len as u8); + send_message.extend_from_slice(multistream); + + server_connection.write(&mut send_message).await.unwrap(); + + let mut send_message = Vec::new(); + send_message.push(proto_len as u8); + send_message.extend_from_slice(proto); + server_connection.write(&mut send_message).await.unwrap(); + + // Handle handshake. + match version { + Version::V1 => { + let mut out = vec![0; 64]; + let n = server_connection.read(&mut out).await.unwrap(); + out.truncate(n); + assert_eq!(out, b"ping"); + + server_connection.write(b"pong").await.unwrap(); + } + Version::V1Lazy => { + // Ping (handshake) payload expected in the initial message. + server_connection.write(b"pong").await.unwrap(); + } + } + }); + + let client = tokio::spawn(async move { + let protos = vec!["/proto2"]; + let (proto, mut io) = + dialer_select_proto(client_connection, protos, version).await.unwrap(); + assert_eq!(proto, "/proto2"); + + io.write_all(b"ping").await.unwrap(); + io.flush().await.unwrap(); + + let mut out = vec![0; 32]; + let n = io.read(&mut out).await.unwrap(); + out.truncate(n); + assert_eq!(out, b"pong"); + }); + + server.await; + client.await; + } + + run(Version::V1).await; + run(Version::V1Lazy).await; + } } From 4436a69506500a4b27569b586547e490c337e5a7 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 18 Mar 2024 17:19:07 +0200 Subject: [PATCH 06/11] multistream/negotiated: Ensure lazy stream checks for duplicate headers Signed-off-by: Alexandru Vasile --- src/multistream_select/negotiated.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/multistream_select/negotiated.rs b/src/multistream_select/negotiated.rs index 450bdae3..8eee8459 100644 --- a/src/multistream_select/negotiated.rs +++ b/src/multistream_select/negotiated.rs @@ -176,6 +176,10 @@ impl Negotiated { header: None, }; continue; + } else { + // If we received a header message but it doesn't match the expected + // one, or we have already received the message return an error. + return Poll::Ready(Err(ProtocolError::InvalidMessage.into())); } } From aed31d8cd874e00638f694060446b953c5e82cd6 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 18 Mar 2024 17:21:01 +0200 Subject: [PATCH 07/11] multistream/tests: Fail on multiple headers Signed-off-by: Alexandru Vasile --- src/multistream_select/dialer_select.rs | 134 +++++++++++++++++++++++- 1 file changed, 130 insertions(+), 4 deletions(-) diff --git a/src/multistream_select/dialer_select.rs b/src/multistream_select/dialer_select.rs index b3f45e7e..c8615664 100644 --- a/src/multistream_select/dialer_select.rs +++ b/src/multistream_select/dialer_select.rs @@ -584,12 +584,12 @@ mod tests { send_message.push(len as u8); send_message.extend_from_slice(multistream); - server_connection.write(&mut send_message).await.unwrap(); + server_connection.write_all(&mut send_message).await.unwrap(); let mut send_message = Vec::new(); send_message.push(proto_len as u8); send_message.extend_from_slice(proto); - server_connection.write(&mut send_message).await.unwrap(); + server_connection.write_all(&mut send_message).await.unwrap(); // Handle handshake. match version { @@ -599,11 +599,11 @@ mod tests { out.truncate(n); assert_eq!(out, b"ping"); - server_connection.write(b"pong").await.unwrap(); + server_connection.write_all(b"pong").await.unwrap(); } Version::V1Lazy => { // Ping (handshake) payload expected in the initial message. - server_connection.write(b"pong").await.unwrap(); + server_connection.write_all(b"pong").await.unwrap(); } } }); @@ -630,4 +630,130 @@ mod tests { run(Version::V1).await; run(Version::V1Lazy).await; } + + #[tokio::test] + async fn v1_low_level_negotiate_multiple_headers() { + let (client_connection, mut server_connection) = futures_ringbuf::Endpoint::pair(100, 100); + + let server: tokio::task::JoinHandle> = tokio::spawn(async move { + let protos = vec!["/proto2"]; + + let multistream = b"/multistream/1.0.0\n"; + let len = multistream.len(); + let proto = b"/proto2\n"; + let proto_len = proto.len(); + + // Check that our implementation writes optimally + // the multistream ++ protocol in a single message. + let mut expected_message = Vec::new(); + expected_message.push(len as u8); + expected_message.extend_from_slice(multistream); + expected_message.push(proto_len as u8); + expected_message.extend_from_slice(proto); + + let mut out = vec![0; 64]; + let n = server_connection.read(&mut out).await.unwrap(); + out.truncate(n); + assert_eq!(out, expected_message); + + // We must send the back the multistream packet. + let mut send_message = Vec::new(); + send_message.push(len as u8); + send_message.extend_from_slice(multistream); + + server_connection.write_all(&mut send_message).await.unwrap(); + + // We must send the back the multistream packet again. + let mut send_message = Vec::new(); + send_message.push(len as u8); + send_message.extend_from_slice(multistream); + + server_connection.write_all(&mut send_message).await.unwrap(); + + Ok(()) + }); + + let client: tokio::task::JoinHandle> = tokio::spawn(async move { + let protos = vec!["/proto2"]; + + // Negotiation fails because the protocol receives the `/multistream/1.0.0` header + // multiple times. + let result = + dialer_select_proto(client_connection, protos, Version::V1).await.unwrap_err(); + match result { + NegotiationError::ProtocolError(ProtocolError::InvalidMessage) => {} + _ => panic!("unexpected error: {:?}", result), + }; + + Ok(()) + }); + + server.await.unwrap(); + client.await.unwrap(); + } + + #[tokio::test] + async fn v1_lazy_low_level_negotiate_multiple_headers() { + let (client_connection, mut server_connection) = futures_ringbuf::Endpoint::pair(100, 100); + + let server: tokio::task::JoinHandle> = tokio::spawn(async move { + let protos = vec!["/proto2"]; + + let multistream = b"/multistream/1.0.0\n"; + let len = multistream.len(); + let proto = b"/proto2\n"; + let proto_len = proto.len(); + + // Check that our implementation writes optimally + // the multistream ++ protocol in a single message. + let mut expected_message = Vec::new(); + expected_message.push(len as u8); + expected_message.extend_from_slice(multistream); + expected_message.push(proto_len as u8); + expected_message.extend_from_slice(proto); + + let mut out = vec![0; 64]; + let n = server_connection.read(&mut out).await.unwrap(); + out.truncate(n); + assert_eq!(out, expected_message); + + // We must send the back the multistream packet. + let mut send_message = Vec::new(); + send_message.push(len as u8); + send_message.extend_from_slice(multistream); + + server_connection.write_all(&mut send_message).await.unwrap(); + + // We must send the back the multistream packet again. + let mut send_message = Vec::new(); + send_message.push(len as u8); + send_message.extend_from_slice(multistream); + + server_connection.write_all(&mut send_message).await.unwrap(); + + Ok(()) + }); + + let client: tokio::task::JoinHandle> = tokio::spawn(async move { + let protos = vec!["/proto2"]; + + // Negotiation fails because the protocol receives the `/multistream/1.0.0` header + // multiple times. + let (proto, to_negociate) = + dialer_select_proto(client_connection, protos, Version::V1Lazy).await.unwrap(); + assert_eq!(proto, "/proto2"); + + let result = to_negociate.complete().await.unwrap_err(); + + match result { + NegotiationError::ProtocolError(ProtocolError::InvalidMessage) => {} + _ => panic!("unexpected error: {:?}", result), + }; + + Ok(()) + }); + + server.await.unwrap(); + client.await.unwrap(); + } } From c422f5824aaf3311f6c6c5fd8e41fe85b041d4ae Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 18 Mar 2024 17:24:59 +0200 Subject: [PATCH 08/11] mutlistream/tests: Ensure panics are propagated for spawned tasks Signed-off-by: Alexandru Vasile --- src/multistream_select/dialer_select.rs | 28 ++++++++++++++++--------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/src/multistream_select/dialer_select.rs b/src/multistream_select/dialer_select.rs index c8615664..ebd402ff 100644 --- a/src/multistream_select/dialer_select.rs +++ b/src/multistream_select/dialer_select.rs @@ -423,7 +423,7 @@ mod tests { async fn run(version: Version) { let (client_connection, server_connection) = futures_ringbuf::Endpoint::pair(100, 100); - let server = tokio::spawn(async move { + let server: tokio::task::JoinHandle> = tokio::spawn(async move { let protos = vec!["/proto1", "/proto2"]; let (proto, mut io) = listener_select_proto(server_connection, protos).await.unwrap(); @@ -436,9 +436,11 @@ mod tests { io.write_all(b"pong").await.unwrap(); io.flush().await.unwrap(); + + Ok(()) }); - let client = tokio::spawn(async move { + let client: tokio::task::JoinHandle> = tokio::spawn(async move { let protos = vec!["/proto3", "/proto2"]; let (proto, mut io) = dialer_select_proto(client_connection, protos, version).await.unwrap(); @@ -451,10 +453,12 @@ mod tests { let n = io.read(&mut out).await.unwrap(); out.truncate(n); assert_eq!(out, b"pong"); + + Ok(()) }); - server.await; - client.await; + server.await.unwrap(); + client.await.unwrap(); } run(Version::V1).await; @@ -472,7 +476,7 @@ mod tests { ) { let (client_connection, server_connection) = futures_ringbuf::Endpoint::pair(100, 100); - let server = tokio::spawn(async move { + let server: tokio::task::JoinHandle> = tokio::spawn(async move { let io = match tokio::time::timeout( Duration::from_secs(2), listener_select_proto(server_connection, listen_protos), @@ -481,7 +485,7 @@ mod tests { .unwrap() { Ok((_, io)) => io, - Err(NegotiationError::Failed) => return, + Err(NegotiationError::Failed) => return Ok(()), Err(NegotiationError::ProtocolError(e)) => { panic!("Unexpected protocol error {e}") } @@ -490,9 +494,11 @@ mod tests { Err(NegotiationError::Failed) => {} _ => panic!(), } + + Ok(()) }); - let client = tokio::spawn(async move { + let client: tokio::task::JoinHandle> = tokio::spawn(async move { let mut io = match tokio::time::timeout( Duration::from_secs(2), dialer_select_proto(client_connection, dial_protos, version), @@ -500,7 +506,7 @@ mod tests { .await .unwrap() { - Err(NegotiationError::Failed) => return, + Err(NegotiationError::Failed) => return Ok(()), Ok((_, io)) => io, Err(_) => panic!(), }; @@ -513,10 +519,12 @@ mod tests { Err(NegotiationError::Failed) => {} _ => panic!(), } + + Ok(()) }); - server.await; - client.await; + server.await.unwrap(); + client.await.unwrap(); } // Incompatible protocols. From 92bd321ba7c46d159689807685a47fcd77800f1b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 18 Mar 2024 17:37:05 +0200 Subject: [PATCH 09/11] multistream/negotiated: Propagate `poll_close` on unreceived msg Signed-off-by: Alexandru Vasile --- src/multistream_select/negotiated.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/multistream_select/negotiated.rs b/src/multistream_select/negotiated.rs index 8eee8459..01f7bec3 100644 --- a/src/multistream_select/negotiated.rs +++ b/src/multistream_select/negotiated.rs @@ -325,7 +325,6 @@ where fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // Ensure all data has been flushed and expected negotiation messages // have been received. - ready!(self.as_mut().poll(cx).map_err(Into::::into)?); ready!(self.as_mut().poll_flush(cx).map_err(Into::::into)?); // Continue with the shutdown of the underlying I/O stream. From ca0ac181c126889e71cd72f04781cd6adaf4cfba Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 18 Mar 2024 17:37:30 +0200 Subject: [PATCH 10/11] multistream/tests: Adjust testing to reflect close before proto msg Signed-off-by: Alexandru Vasile --- src/multistream_select/dialer_select.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/multistream_select/dialer_select.rs b/src/multistream_select/dialer_select.rs index ebd402ff..6b345cb6 100644 --- a/src/multistream_select/dialer_select.rs +++ b/src/multistream_select/dialer_select.rs @@ -544,16 +544,12 @@ mod tests { dialer_select_proto(client_connection, protos, Version::V1Lazy).await.unwrap(); assert_eq!(proto, "/proto1"); - // In Libp2p the lazy negotation of protocols can be closed at any time, + // The lazy negotation of protocols can be closed at any time, // even if the negotiation is not yet done. - - // However, for the Litep2p the negotation must conclude before closing the - // lazy negotation of protocol. We'll wait for the close until the - // server has produced a message, in this test that means forever. io.close().await.unwrap(); }); - assert!(tokio::time::timeout(Duration::from_secs(10), client).await.is_err()); + tokio::time::timeout(Duration::from_secs(10), client).await.unwrap(); } #[tokio::test] From 7428c797442def8ab7f387d00d9c6fb75257b66f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 18 Mar 2024 17:39:54 +0200 Subject: [PATCH 11/11] multistream/negotiated: Adjust comment wrt received message Signed-off-by: Alexandru Vasile --- src/multistream_select/negotiated.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/multistream_select/negotiated.rs b/src/multistream_select/negotiated.rs index 01f7bec3..1aae2285 100644 --- a/src/multistream_select/negotiated.rs +++ b/src/multistream_select/negotiated.rs @@ -323,7 +323,7 @@ where } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // Ensure all data has been flushed and expected negotiation messages + // Ensure all data has been flushed and potentially negotiation messages // have been received. ready!(self.as_mut().poll_flush(cx).map_err(Into::::into)?);