diff --git a/client/network/src/protocol/notifications/handler.rs b/client/network/src/protocol/notifications/handler.rs index 99677cc45e54e..2b350cd7fcfc8 100644 --- a/client/network/src/protocol/notifications/handler.rs +++ b/client/network/src/protocol/notifications/handler.rs @@ -159,6 +159,16 @@ enum State { Closed { /// True if an outgoing substream is still in the process of being opened. pending_opening: bool, + + /// Outbound substream that has been accepted by the remote. Being closed. + out_substream_closing: Option>, + + /// Substream opened by the remote. Being closed. + in_substream_closing: Option>, + + /// Substream re-opened by the remote. Not to be closed after `in_substream_closing` has + /// been closed. + in_substream_reopened: Option>, }, /// Protocol is in the "Closed" state. A [`NotifsHandlerOut::OpenDesiredByRemote`] has been @@ -167,6 +177,9 @@ enum State { /// Substream opened by the remote and that hasn't been accepted/rejected yet. in_substream: NotificationsInSubstream, + /// Outbound substream that has been accepted by the remote. Being closed. + out_substream_closing: Option>, + /// See [`State::Closed::pending_opening`]. pending_opening: bool, }, @@ -177,8 +190,15 @@ enum State { /// A [`NotifsHandlerOut::OpenResultOk`] or a [`NotifsHandlerOut::OpenResultErr`] event must /// be emitted when transitionning to respectively [`State::Open`] or [`State::Closed`]. Opening { - /// Substream opened by the remote. If `Some`, has been accepted. - in_substream: Option>, + /// Outbound substream that has been accepted by the remote. Being closed. An outbound + /// substream request has been emitted towards libp2p if and only if this field is `None`. + out_substream_closing: Option>, + + /// Substream re-opened by the remote. Has been accepted. + in_substream_reopened: Option>, + + /// Substream opened by the remote. Being closed. + in_substream_closing: Option>, }, /// Protocol is in the "Open" state. @@ -227,6 +247,9 @@ impl IntoProtocolsHandler for NotifsHandlerProto { handshake, state: State::Closed { pending_opening: false, + in_substream_closing: None, + in_substream_reopened: None, + out_substream_closing: None, }, max_notification_size: max_size, } @@ -487,7 +510,16 @@ impl ProtocolsHandler for NotifsHandler { ) { let mut protocol_info = &mut self.protocols[protocol_index]; match protocol_info.state { - State::Closed { pending_opening } => { + State::Closed { + ref mut pending_opening, + ref mut out_substream_closing, + ref mut in_substream_closing, + ref mut in_substream_reopened + } + if in_substream_closing.is_none() && in_substream_reopened.is_none() + => { + debug_assert!(!(out_substream_closing.is_some() && *pending_opening)); + self.events_queue.push_back(ProtocolsHandlerEvent::Custom( NotifsHandlerOut::OpenDesiredByRemote { protocol_index, @@ -496,9 +528,31 @@ impl ProtocolsHandler for NotifsHandler { protocol_info.state = State::OpenDesiredByRemote { in_substream: new_substream, - pending_opening, + out_substream_closing: out_substream_closing.take(), + pending_opening: *pending_opening, }; }, + + State::Opening { ref mut in_substream_closing, ref mut in_substream_reopened, .. } => { + *in_substream_closing = None; + + // Create `handshake_message` on a separate line to be sure that the + // lock is released as soon as possible. + let handshake_message = protocol_info.handshake.read().clone(); + new_substream.send_handshake(handshake_message); + *in_substream_reopened = Some(new_substream); + }, + + State::Open { ref mut in_substream, .. } if in_substream.is_none() => { + // Create `handshake_message` on a separate line to be sure that the + // lock is released as soon as possible. + let handshake_message = protocol_info.handshake.read().clone(); + new_substream.send_handshake(handshake_message); + *in_substream = Some(new_substream); + }, + + State::Closed { .. } | + State::Open { .. } | State::OpenDesiredByRemote { .. } => { // If a substream already exists, silently drop the new one. // Note that we drop the substream, which will send an equivalent to a @@ -509,19 +563,6 @@ impl ProtocolsHandler for NotifsHandler { // to do. return; }, - State::Opening { ref mut in_substream, .. } | - State::Open { ref mut in_substream, .. } => { - if in_substream.is_some() { - // Same remark as above. - return; - } - - // Create `handshake_message` on a separate line to be sure that the - // lock is released as soon as possible. - let handshake_message = protocol_info.handshake.read().clone(); - new_substream.send_handshake(handshake_message); - *in_substream = Some(new_substream); - }, } } @@ -531,16 +572,24 @@ impl ProtocolsHandler for NotifsHandler { protocol_index: Self::OutboundOpenInfo ) { match self.protocols[protocol_index].state { - State::Closed { ref mut pending_opening } | - State::OpenDesiredByRemote { ref mut pending_opening, .. } => { + State::Closed { ref mut pending_opening, ref mut out_substream_closing, .. } | + State::OpenDesiredByRemote { ref mut pending_opening, ref mut out_substream_closing, .. } => { + debug_assert!(out_substream_closing.is_none()); debug_assert!(*pending_opening); + *out_substream_closing = Some(substream); *pending_opening = false; } State::Open { .. } => { error!(target: "sub-libp2p", "☎️ State mismatch in notifications handler"); debug_assert!(false); } - State::Opening { ref mut in_substream } => { + State::Opening { + ref mut in_substream_reopened, ref mut in_substream_closing, + ref mut out_substream_closing + } => { + debug_assert!(out_substream_closing.is_none()); + debug_assert!(!(in_substream_reopened.is_some() && in_substream_closing.is_some())); + let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE); let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE); let notifications_sink = NotificationsSink { @@ -554,7 +603,7 @@ impl ProtocolsHandler for NotifsHandler { self.protocols[protocol_index].state = State::Open { notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(), out_substream: Some(substream), - in_substream: in_substream.take(), + in_substream: in_substream_reopened.take().or(in_substream_closing.take()), }; self.events_queue.push_back(ProtocolsHandlerEvent::Custom( @@ -574,8 +623,13 @@ impl ProtocolsHandler for NotifsHandler { NotifsHandlerIn::Open { protocol_index } => { let protocol_info = &mut self.protocols[protocol_index]; match &mut protocol_info.state { - State::Closed { pending_opening } => { - if !*pending_opening { + State::Closed { + ref mut pending_opening, + ref mut in_substream_closing, + ref mut in_substream_reopened, + ref mut out_substream_closing + } => { + if !*pending_opening && out_substream_closing.is_none() { let proto = NotificationsOut::new( protocol_info.name.clone(), protocol_info.handshake.read().clone(), @@ -588,14 +642,31 @@ impl ProtocolsHandler for NotifsHandler { }); } + debug_assert!(!(in_substream_reopened.is_some() && in_substream_closing.is_some())); protocol_info.state = State::Opening { - in_substream: None, + in_substream_closing: in_substream_closing.take(), + in_substream_reopened: in_substream_reopened.take(), + out_substream_closing: out_substream_closing.take(), }; }, - State::OpenDesiredByRemote { pending_opening, in_substream } => { + + State::OpenDesiredByRemote { .. } => { + // The state change is done in two steps because of borrowing issues. + let (pending_opening, out_substream_closing, mut in_substream) = match + mem::replace(&mut protocol_info.state, + State::Opening { + in_substream_closing: None, in_substream_reopened: None, + out_substream_closing: None, + }) + { + State::OpenDesiredByRemote { pending_opening, out_substream_closing, in_substream, .. } => + (pending_opening, out_substream_closing, in_substream), + _ => unreachable!() + }; + let handshake_message = protocol_info.handshake.read().clone(); - if !*pending_opening { + if !pending_opening && out_substream_closing.is_none() { let proto = NotificationsOut::new( protocol_info.name.clone(), handshake_message.clone(), @@ -610,17 +681,13 @@ impl ProtocolsHandler for NotifsHandler { in_substream.send_handshake(handshake_message); - // The state change is done in two steps because of borrowing issues. - let in_substream = match - mem::replace(&mut protocol_info.state, State::Opening { in_substream: None }) - { - State::OpenDesiredByRemote { in_substream, .. } => in_substream, - _ => unreachable!() - }; protocol_info.state = State::Opening { - in_substream: Some(in_substream), + out_substream_closing, + in_substream_closing: None, + in_substream_reopened: Some(in_substream), }; }, + State::Opening { .. } | State::Open { .. } => { // As documented, it is forbidden to send an `Open` while there is already @@ -632,15 +699,30 @@ impl ProtocolsHandler for NotifsHandler { }, NotifsHandlerIn::Close { protocol_index } => { - match self.protocols[protocol_index].state { - State::Open { .. } => { + match &mut self.protocols[protocol_index].state { + State::Open { in_substream, out_substream, .. } => { + if let Some(in_substream) = in_substream.as_mut() { + in_substream.set_close_desired(); + } self.protocols[protocol_index].state = State::Closed { + in_substream_closing: in_substream.take(), + in_substream_reopened: None, + out_substream_closing: out_substream.take(), pending_opening: false, }; }, - State::Opening { .. } => { + State::Opening { in_substream_closing, in_substream_reopened, out_substream_closing } => { + debug_assert!(!(in_substream_reopened.is_some() && in_substream_closing.is_some())); + + let pending_opening = out_substream_closing.is_none(); + if let Some(in_substream_reopened) = in_substream_reopened.as_mut() { + in_substream_reopened.set_close_desired(); + } self.protocols[protocol_index].state = State::Closed { - pending_opening: true, + in_substream_closing: in_substream_reopened.take().or(in_substream_closing.take()), + in_substream_reopened: None, + out_substream_closing: out_substream_closing.take(), + pending_opening, }; self.events_queue.push_back(ProtocolsHandlerEvent::Custom( @@ -649,8 +731,23 @@ impl ProtocolsHandler for NotifsHandler { } )); }, - State::OpenDesiredByRemote { pending_opening, .. } => { + State::OpenDesiredByRemote { .. } => { + let (mut in_substream, pending_opening, out_substream_closing) = match mem::replace( + &mut self.protocols[protocol_index].state, + State::Closed { pending_opening: false, in_substream_closing: None, + in_substream_reopened: None, out_substream_closing: None, + } + ) { + State::OpenDesiredByRemote { in_substream, pending_opening, out_substream_closing } => + (in_substream, pending_opening, out_substream_closing), + _ => unreachable!("Can only enter this branch after OpenDesiredByRemote; qed") + }; + + in_substream.set_close_desired(); self.protocols[protocol_index].state = State::Closed { + in_substream_closing: Some(in_substream), + in_substream_reopened: None, + out_substream_closing, pending_opening, }; } @@ -672,14 +769,30 @@ impl ProtocolsHandler for NotifsHandler { _: ProtocolsHandlerUpgrErr ) { match self.protocols[num].state { - State::Closed { ref mut pending_opening } | - State::OpenDesiredByRemote { ref mut pending_opening, .. } => { + State::Closed { ref mut pending_opening, ref mut out_substream_closing, .. } | + State::OpenDesiredByRemote { ref mut pending_opening, ref mut out_substream_closing, .. } => { + debug_assert!(out_substream_closing.is_none()); debug_assert!(*pending_opening); *pending_opening = false; } - State::Opening { .. } => { + State::Opening { + ref mut out_substream_closing, + ref mut in_substream_closing, + ref mut in_substream_reopened, + .. + } => { + debug_assert!(!(in_substream_reopened.is_some() && in_substream_closing.is_some())); + debug_assert!(out_substream_closing.is_none()); + + if let Some(in_substream_reopened) = in_substream_reopened.as_mut() { + in_substream_reopened.set_close_desired(); + } + self.protocols[num].state = State::Closed { + out_substream_closing: None, + in_substream_closing: in_substream_reopened.take().or(in_substream_closing.take()), + in_substream_reopened: None, pending_opening: false, }; @@ -788,15 +901,67 @@ impl ProtocolsHandler for NotifsHandler { } } + // Try close outbound substreams that are marked for closing. + for protocol_index in 0..self.protocols.len() { + match &mut self.protocols[protocol_index].state { + State::Closed { out_substream_closing: ref mut substream @ Some(_), .. } | + State::OpenDesiredByRemote { out_substream_closing: ref mut substream @ Some(_), .. } | + State::Opening { out_substream_closing: ref mut substream @ Some(_), .. } => { + match Sink::poll_close(Pin::new(substream.as_mut().unwrap()), cx) { + Poll::Pending => {}, + Poll::Ready(_) => { + *substream = None; + + if matches!(self.protocols[protocol_index].state, State::Opening { .. }) { + let protocol_info = &mut self.protocols[protocol_index]; + let proto = NotificationsOut::new( + protocol_info.name.clone(), + protocol_info.handshake.read().clone(), + protocol_info.max_notification_size + ); + + self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(proto, protocol_index) + .with_timeout(OPEN_TIMEOUT), + }); + } + } + } + } + _ => {} + } + + if let State::Closed { + pending_opening, + out_substream_closing: None, + in_substream_closing, + in_substream_reopened: ref mut in_substream_reopened @ Some(_), + .. + } = &mut self.protocols[protocol_index].state { + debug_assert!(!*pending_opening); + debug_assert!(in_substream_closing.is_none()); + + self.events_queue.push_back(ProtocolsHandlerEvent::Custom( + NotifsHandlerOut::OpenDesiredByRemote { + protocol_index, + } + )); + + self.protocols[protocol_index].state = State::OpenDesiredByRemote { + in_substream: in_substream_reopened.take() + .expect("The if let above ensures that this is Some ; qed"), + out_substream_closing: None, + pending_opening: false, + }; + } + } + // Poll inbound substreams. for protocol_index in 0..self.protocols.len() { // Inbound substreams being closed is always tolerated, except for the // `OpenDesiredByRemote` state which might need to be switched back to `Closed`. match &mut self.protocols[protocol_index].state { - State::Closed { .. } | - State::Open { in_substream: None, .. } | - State::Opening { in_substream: None } => {} - + State::Open { in_substream: None, .. } => {} State::Open { in_substream: in_substream @ Some(_), .. } => { match Stream::poll_next(Pin::new(in_substream.as_mut().unwrap()), cx) { Poll::Pending => {}, @@ -812,13 +977,16 @@ impl ProtocolsHandler for NotifsHandler { } } - State::OpenDesiredByRemote { in_substream, pending_opening } => { + State::OpenDesiredByRemote { in_substream, pending_opening, out_substream_closing } => { match NotificationsInSubstream::poll_process(Pin::new(in_substream), cx) { Poll::Pending => {}, Poll::Ready(Ok(void)) => match void {}, Poll::Ready(Err(_)) => { self.protocols[protocol_index].state = State::Closed { pending_opening: *pending_opening, + in_substream_closing: None, + in_substream_reopened: None, + out_substream_closing: out_substream_closing.take(), }; return Poll::Ready(ProtocolsHandlerEvent::Custom( NotifsHandlerOut::CloseDesired { protocol_index } @@ -827,13 +995,40 @@ impl ProtocolsHandler for NotifsHandler { } } - State::Opening { in_substream: in_substream @ Some(_), .. } => { + State::Opening { in_substream_closing: None, in_substream_reopened: None, .. } | + State::Closed { in_substream_closing: None, in_substream_reopened: None, .. } => {} + + State::Opening { + in_substream_closing: ref mut in_substream @ Some(_), + in_substream_reopened: None, + .. + } | + State::Opening { + in_substream_closing: None, + in_substream_reopened: ref mut in_substream @ Some(_), + .. + } | + State::Closed { + in_substream_closing: ref mut in_substream @ Some(_), + in_substream_reopened: None, + .. + } | + State::Closed { + in_substream_closing: None, + in_substream_reopened: ref mut in_substream @ Some(_), + .. + } => { match NotificationsInSubstream::poll_process(Pin::new(in_substream.as_mut().unwrap()), cx) { Poll::Pending => {}, Poll::Ready(Ok(void)) => match void {}, Poll::Ready(Err(_)) => *in_substream = None, } } + + State::Opening { in_substream_closing: Some(_), in_substream_reopened: Some(_), .. } | + State::Closed { in_substream_closing: Some(_), in_substream_reopened: Some(_), .. } => { + debug_assert!(false); + } } } diff --git a/client/network/src/protocol/notifications/upgrade/notifications.rs b/client/network/src/protocol/notifications/upgrade/notifications.rs index eba96441bcfde..f76472a0de2c5 100644 --- a/client/network/src/protocol/notifications/upgrade/notifications.rs +++ b/client/network/src/protocol/notifications/upgrade/notifications.rs @@ -88,10 +88,11 @@ enum NotificationsInSubstreamHandshake { PendingSend(Vec), /// Handshake message was pushed in the socket. Still need to flush. Flush, - /// Handshake message successfully sent and flushed. - Sent, - /// Remote has closed their writing side. We close our own writing side in return. - ClosingInResponseToRemote, + /// Ready to receive notifications. Handshake message successfully sent and flushed, or + /// sending side closed before handshake sent. + Normal { write_side_open: bool }, + /// Closing our writing side. + Closing { remote_write_open: bool }, /// Both our side and the remote have closed their writing side. BothSidesClosed, } @@ -169,8 +170,30 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, impl NotificationsInSubstream where TSubstream: AsyncRead + AsyncWrite + Unpin, { + /// Closes the writing side of the substream, indicating to the remote that we would like this + /// substream to be closed. + pub fn set_close_desired(&mut self) { + match self.handshake { + NotificationsInSubstreamHandshake::PendingSend(_) | + NotificationsInSubstreamHandshake::Flush | + NotificationsInSubstreamHandshake::NotSent | + NotificationsInSubstreamHandshake::Normal { write_side_open: true } => { + self.handshake = NotificationsInSubstreamHandshake::Closing { remote_write_open: true }; + } + NotificationsInSubstreamHandshake::Normal { write_side_open: false } | + NotificationsInSubstreamHandshake::Closing { .. } | + NotificationsInSubstreamHandshake::BothSidesClosed => {} + } + } + /// Sends the handshake in order to inform the remote that we accept the substream. + /// + /// Has no effect if `set_close_desired` has been called. pub fn send_handshake(&mut self, message: impl Into>) { + if matches!(self.handshake, NotificationsInSubstreamHandshake::Normal { write_side_open: false }) { + return; + } + if !matches!(self.handshake, NotificationsInSubstreamHandshake::NotSent) { error!(target: "sub-libp2p", "Tried to send handshake twice"); return; @@ -185,7 +208,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin, let mut this = self.project(); loop { - match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) { + match mem::replace(this.handshake, NotificationsInSubstreamHandshake::NotSent) { NotificationsInSubstreamHandshake::PendingSend(msg) => match Sink::poll_ready(this.socket.as_mut(), cx) { Poll::Ready(_) => { @@ -203,16 +226,28 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin, NotificationsInSubstreamHandshake::Flush => match Sink::poll_flush(this.socket.as_mut(), cx)? { Poll::Ready(()) => - *this.handshake = NotificationsInSubstreamHandshake::Sent, + *this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open: true }, Poll::Pending => { *this.handshake = NotificationsInSubstreamHandshake::Flush; return Poll::Pending } }, + NotificationsInSubstreamHandshake::Closing { remote_write_open } => + match Sink::poll_close(this.socket.as_mut(), cx)? { + Poll::Ready(()) => if remote_write_open { + *this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open: false } + } else { + *this.handshake = NotificationsInSubstreamHandshake::BothSidesClosed; + }, + Poll::Pending => { + *this.handshake = NotificationsInSubstreamHandshake::Closing { remote_write_open }; + return Poll::Pending + } + }, + st @ NotificationsInSubstreamHandshake::NotSent | - st @ NotificationsInSubstreamHandshake::Sent | - st @ NotificationsInSubstreamHandshake::ClosingInResponseToRemote | + st @ NotificationsInSubstreamHandshake::Normal { .. } | st @ NotificationsInSubstreamHandshake::BothSidesClosed => { *this.handshake = st; return Poll::Pending; @@ -232,7 +267,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin, // This `Stream` implementation first tries to send back the handshake if necessary. loop { - match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) { + match mem::replace(this.handshake, NotificationsInSubstreamHandshake::NotSent) { NotificationsInSubstreamHandshake::NotSent => { *this.handshake = NotificationsInSubstreamHandshake::NotSent; return Poll::Pending @@ -254,34 +289,39 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin, NotificationsInSubstreamHandshake::Flush => match Sink::poll_flush(this.socket.as_mut(), cx)? { Poll::Ready(()) => - *this.handshake = NotificationsInSubstreamHandshake::Sent, + *this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open: true }, Poll::Pending => { *this.handshake = NotificationsInSubstreamHandshake::Flush; return Poll::Pending } }, - NotificationsInSubstreamHandshake::Sent => { + NotificationsInSubstreamHandshake::Normal { write_side_open } => { match Stream::poll_next(this.socket.as_mut(), cx) { - Poll::Ready(None) => *this.handshake = - NotificationsInSubstreamHandshake::ClosingInResponseToRemote, + Poll::Ready(None) if write_side_open => + *this.handshake = + NotificationsInSubstreamHandshake::Closing { remote_write_open: false }, + Poll::Ready(None) => + *this.handshake = NotificationsInSubstreamHandshake::BothSidesClosed, Poll::Ready(Some(msg)) => { - *this.handshake = NotificationsInSubstreamHandshake::Sent; + *this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open }; return Poll::Ready(Some(msg)) }, Poll::Pending => { - *this.handshake = NotificationsInSubstreamHandshake::Sent; + *this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open }; return Poll::Pending }, } }, - NotificationsInSubstreamHandshake::ClosingInResponseToRemote => + NotificationsInSubstreamHandshake::Closing { remote_write_open } => match Sink::poll_close(this.socket.as_mut(), cx)? { + Poll::Ready(()) if remote_write_open => + *this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open: false }, Poll::Ready(()) => *this.handshake = NotificationsInSubstreamHandshake::BothSidesClosed, Poll::Pending => { - *this.handshake = NotificationsInSubstreamHandshake::ClosingInResponseToRemote; + *this.handshake = NotificationsInSubstreamHandshake::Closing { remote_write_open }; return Poll::Pending } },