Skip to content

Commit

Permalink
Fix bug where Senders were not unparked upon calling Receiver::try_next
Browse files Browse the repository at this point in the history
Fixed by moving calls to unpark_one and dec_num_messages into next_message
so that calling them cannot be missed (as it was with try_next).

Closes #861
  • Loading branch information
nwoeanhinnogaehr authored and cramertj committed Mar 13, 2018
1 parent efb37f8 commit 97af31e
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 9 deletions.
19 changes: 10 additions & 9 deletions futures-channel/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,9 @@ impl<T> Receiver<T> {
/// no longer empty.
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
match self.next_message() {
Async::Ready(msg) => Ok(msg),
Async::Ready(msg) => {
Ok(msg)
},
Async::Pending => Err(TryRecvError { _inner: () }),
}
}
Expand All @@ -842,6 +844,13 @@ impl<T> Receiver<T> {
loop {
match unsafe { self.inner.message_queue.pop() } {
PopResult::Data(msg) => {
// If there are any parked task handles in the parked queue, pop
// one and unpark it.
self.unpark_one();

// Decrement number of messages
self.dec_num_messages();

return Async::Ready(msg);
}
PopResult::Empty => {
Expand Down Expand Up @@ -959,14 +968,6 @@ impl<T> Stream for Receiver<T> {
}
}
};

// If there are any parked task handles in the parked queue, pop
// one and unpark it.
self.unpark_one();

// Decrement number of messages
self.dec_num_messages();

// Return the message
return Ok(Async::Ready(msg));
}
Expand Down
14 changes: 14 additions & 0 deletions futures-channel/tests/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,3 +542,17 @@ fn try_send_fail() {
let item = block_on(rx.into_future()).ok().unwrap().0;
assert_eq!(item, None);
}

#[test]
fn try_send_recv() {
let (mut tx, mut rx) = mpsc::channel(1);
tx.try_send("hello").unwrap();
tx.try_send("hello").unwrap();
tx.try_send("hello").unwrap_err(); // should be full
rx.try_next().unwrap();
rx.try_next().unwrap();
rx.try_next().unwrap_err(); // should be empty
tx.try_send("hello").unwrap();
rx.try_next().unwrap();
rx.try_next().unwrap_err(); // should be empty
}

0 comments on commit 97af31e

Please sign in to comment.