diff --git a/src/client/bridge/gateway/mod.rs b/src/client/bridge/gateway/mod.rs index 896cd8efbf6..c26571fc403 100644 --- a/src/client/bridge/gateway/mod.rs +++ b/src/client/bridge/gateway/mod.rs @@ -123,6 +123,11 @@ pub enum ShardManagerMessage { /// component that receives this to also shutdown with no further action /// taken. ShutdownInitiated, + /// Indicator that a [`ShardRunner`] has finished the shutdown of a shard, allowing it to + /// move toward the next one. + /// + /// [`ShardRunner`]: struct.ShardRunner.html + ShutdownFinished(ShardId) } /// A message to be sent to the [`ShardQueuer`]. diff --git a/src/client/bridge/gateway/shard_manager.rs b/src/client/bridge/gateway/shard_manager.rs index 90bbf8283f8..cc207d35ade 100644 --- a/src/client/bridge/gateway/shard_manager.rs +++ b/src/client/bridge/gateway/shard_manager.rs @@ -6,10 +6,11 @@ use parking_lot::RwLock; use std::{ collections::{HashMap, VecDeque}, sync::{ - mpsc::{self, Sender}, - Arc + mpsc::{self, channel, Sender, Receiver}, + Arc, }, - thread + thread, + time::Duration }; use super::super::super::{EventHandler, RawEventHandler}; use super::{ @@ -127,6 +128,7 @@ pub struct ShardManager { /// The total shards in use, 1-indexed. shard_total: u64, shard_queuer: Sender, + shard_shutdown: Receiver, } impl ShardManager { @@ -164,18 +166,21 @@ impl ShardManager { shard_queuer.run(); }); + let (shutdown_send, shutdown_recv) = channel(); let manager = Arc::new(Mutex::new(Self { monitor_tx: thread_tx, shard_index: opt.shard_index, shard_init: opt.shard_init, shard_queuer: shard_queue_tx, shard_total: opt.shard_total, + shard_shutdown: shutdown_recv, runners, })); (Arc::clone(&manager), ShardManagerMonitor { rx: thread_rx, manager, + shutdown: shutdown_send, }) } @@ -298,6 +303,20 @@ impl ShardManager { why, ); } + match self.shard_shutdown.recv_timeout(Duration::from_secs(5)) { + Ok(shutdown_shard_id) => + if shutdown_shard_id != shard_id { + warn!( + "Failed to cleanly shutdown shard {}: Shutdown channel sent incorrect ID", + shard_id, + ); + }, + Err(why) => warn!( + "Failed to cleanly shutdown shard {}: {:?}", + shard_id, + why, + ) + } } self.runners.lock().remove(&shard_id).is_some() diff --git a/src/client/bridge/gateway/shard_manager_monitor.rs b/src/client/bridge/gateway/shard_manager_monitor.rs index 350cc1f6a24..bb50f448642 100644 --- a/src/client/bridge/gateway/shard_manager_monitor.rs +++ b/src/client/bridge/gateway/shard_manager_monitor.rs @@ -1,10 +1,16 @@ use parking_lot::Mutex; -use std::sync::{ - mpsc::Receiver, - Arc +use std::{ + sync::{ + mpsc::{ + Receiver, + Sender + }, + Arc, + } }; use super::{ShardManager, ShardManagerMessage}; -use log::debug; +use super::super::gateway::ShardId; +use log::{debug, warn}; /// The shard manager monitor does what it says on the tin -- it monitors the /// shard manager and performs actions on it as received. @@ -20,6 +26,8 @@ pub struct ShardManagerMonitor { pub manager: Arc>, /// The mpsc Receiver channel to receive shard manager messages over. pub rx: Receiver, + /// The mpsc Sender channel to inform the manager that a shard has just properly shut down + pub shutdown: Sender, } impl ShardManagerMonitor { @@ -61,6 +69,15 @@ impl ShardManagerMonitor { break; }, ShardManagerMessage::ShutdownInitiated => break, + ShardManagerMessage::ShutdownFinished(shard_id) => { + if let Err(why) = self.shutdown.send(shard_id) { + warn!( + "[ShardMonitor] Could not forward Shutdown signal to ShardManager for shard {}: {:#?}", + shard_id, + why + ); + } + } } } } diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs index c1c83e8a749..5b8cba181b5 100644 --- a/src/client/bridge/gateway/shard_runner.rs +++ b/src/client/bridge/gateway/shard_runner.rs @@ -205,11 +205,36 @@ impl break, + Err(_) => { + warn!( + "[ShardRunner {:?}] Received an error awaiting close frame", + self.shard.shard_info(), + ); + break; + } + _ => continue, + } + } + + // Inform the manager that shutdown for this shard has finished. + if let Err(why) = self.manager_tx.send(ShardManagerMessage::ShutdownFinished(id)) { + warn!( + "[ShardRunner {:?}] Could not send ShutdownFinished: {:#?}", + self.shard.shard_info(), + why, + ); + } false } @@ -259,6 +284,11 @@ impl { // nb: not sent here + true + }, + ShardClientMessage::Manager(ShardManagerMessage::ShutdownFinished(_)) => { + // nb: not sent here + true }, ShardClientMessage::Runner(ShardRunnerMessage::ChunkGuilds { guild_ids, limit, query }) => {