From bcc7c4d349e24b8621937b17ae31104d5e2e4027 Mon Sep 17 00:00:00 2001 From: Roman Borschel Date: Wed, 24 Jul 2019 11:32:59 +0200 Subject: [PATCH] Fix missed task notifications. (#1210) Addresses https://github.com/libp2p/rust-libp2p/issues/1206 by always registering the current task before calling poll_*_notify functions. This is in the same spirit as the corresponding fix for yamux in https://github.com/paritytech/yamux/pull/54. Also adds missing registration of the current task in close() and flush_all(), which have been observed to cause stalls when trying to do a graceful connection shutdown / close. --- muxers/mplex/src/lib.rs | 29 ++++++++++------------------- 1 file changed, 10 insertions(+), 19 deletions(-) diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index 550a96099ac..a1fe1636d6d 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -283,13 +283,11 @@ where C: AsyncRead + AsyncWrite, } } + inner.notifier_read.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current()); let elem = match inner.inner.poll_stream_notify(&inner.notifier_read, 0) { Ok(Async::Ready(Some(item))) => item, Ok(Async::Ready(None)) => return Err(IoErrorKind::BrokenPipe.into()), - Ok(Async::NotReady) => { - inner.notifier_read.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current()); - return Ok(Async::NotReady); - }, + Ok(Async::NotReady) => return Ok(Async::NotReady), Err(err) => { let err2 = IoError::new(err.kind(), err.to_string()); inner.error = Err(err); @@ -333,14 +331,10 @@ where C: AsyncRead + AsyncWrite if inner.is_shutdown { return Err(IoError::new(IoErrorKind::Other, "connection is shut down")) } + inner.notifier_write.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current()); match inner.inner.start_send_notify(elem, &inner.notifier_write, 0) { - Ok(AsyncSink::Ready) => { - Ok(Async::Ready(())) - }, - Ok(AsyncSink::NotReady(_)) => { - inner.notifier_write.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current()); - Ok(Async::NotReady) - }, + Ok(AsyncSink::Ready) => Ok(Async::Ready(())), + Ok(AsyncSink::NotReady(_)) => Ok(Async::NotReady), Err(err) => Err(err) } } @@ -410,6 +404,7 @@ where C: AsyncRead + AsyncWrite return Err(IoError::new(IoErrorKind::Other, "connection is shut down")) } let inner = &mut *inner; // Avoids borrow errors + inner.notifier_write.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current()); inner.inner.poll_flush_notify(&inner.notifier_write, 0) }, OutboundSubstreamState::Done => { @@ -420,7 +415,6 @@ where C: AsyncRead + AsyncWrite match polling { Ok(Async::Ready(())) => (), Ok(Async::NotReady) => { - inner.notifier_write.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current()); return Ok(Async::NotReady) }, Err(err) => { @@ -545,13 +539,8 @@ where C: AsyncRead + AsyncWrite } let inner = &mut *inner; // Avoids borrow errors - match inner.inner.poll_flush_notify(&inner.notifier_write, 0)? { - Async::Ready(()) => Ok(Async::Ready(())), - Async::NotReady => { - inner.notifier_write.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current()); - Ok(Async::NotReady) - } - } + inner.notifier_write.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current()); + inner.inner.poll_flush_notify(&inner.notifier_write, 0) } fn shutdown_substream(&self, sub: &mut Self::Substream) -> Poll<(), IoError> { @@ -585,6 +574,7 @@ where C: AsyncRead + AsyncWrite #[inline] fn close(&self) -> Poll<(), IoError> { let inner = &mut *self.inner.lock(); + inner.notifier_write.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current()); try_ready!(inner.inner.close_notify(&inner.notifier_write, 0)); inner.is_shutdown = true; Ok(Async::Ready(())) @@ -596,6 +586,7 @@ where C: AsyncRead + AsyncWrite if inner.is_shutdown { return Ok(Async::Ready(())) } + inner.notifier_write.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current()); inner.inner.poll_flush_notify(&inner.notifier_write, 0) } }