Skip to content

Commit

Permalink
graceful shutdown: distinguish between stopped and conn closed (#1220)
Browse files Browse the repository at this point in the history
* graceful shutdown: distinguish between stopped and conn closed

* grumbles: ws_stream fuse
  • Loading branch information
niklasad1 committed Oct 24, 2023
1 parent 398d8c1 commit 8152b2e
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions server/src/transport/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,13 +287,15 @@ pub(crate) async fn background_task<L: Logger>(sender: Sender, receiver: Receive
// Return the `Closed` error to avoid logging unnecessary warnings on clean shutdown.
Err(e) => Some((Err(e), receiver)),
}
});
})
.fuse();

tokio::pin!(ws_stream);

let result = loop {
let data = match try_recv(&mut ws_stream, stopped).await {
Receive::Shutdown => break Ok(Shutdown::Stopped),
Receive::ConnectionClosed => break Ok(Shutdown::ConnectionClosed),
Receive::Stopped => break Ok(Shutdown::Stopped),
Receive::Ok(data, stop) => {
stopped = stop;
data
Expand Down Expand Up @@ -409,7 +411,8 @@ async fn send_task(
}

enum Receive<S> {
Shutdown,
ConnectionClosed,
Stopped,
Err(SokettoError, S),
Ok(Vec<u8>, S),
}
Expand All @@ -423,7 +426,7 @@ where
loop {
match futures_util::future::select(ws_stream.next(), stopped).await {
// The connection is closed.
Either::Left((None, _)) => break Receive::Shutdown,
Either::Left((None, _)) => break Receive::ConnectionClosed,
// The message has been received, we are done
Either::Left((Some(Ok(Incoming::Data(d))), s)) => break Receive::Ok(d, s),
// Got a pong response, update our "last seen" timestamp.
Expand All @@ -433,7 +436,7 @@ where
// Received an error, terminate the connection.
Either::Left((Some(Err(e)), s)) => break Receive::Err(e, s),
// Server has been stopped.
Either::Right(_) => break Receive::Shutdown,
Either::Right(_) => break Receive::Stopped,
}
}
}
Expand Down

0 comments on commit 8152b2e

Please sign in to comment.