Skip to content

Commit

Permalink
Implement WebSocket shutdown support (#713)
Browse files Browse the repository at this point in the history
  • Loading branch information
ikkerens authored and arqunis committed Sep 9, 2019
1 parent e762ea9 commit 711882b
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 7 deletions.
5 changes: 5 additions & 0 deletions src/client/bridge/gateway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ pub enum ShardManagerMessage {
/// component that receives this to also shutdown with no further action
/// taken.
ShutdownInitiated,
/// Indicator that a [`ShardRunner`] has finished the shutdown of a shard, allowing it to
/// move toward the next one.
///
/// [`ShardRunner`]: struct.ShardRunner.html
ShutdownFinished(ShardId)
}

/// A message to be sent to the [`ShardQueuer`].
Expand Down
25 changes: 22 additions & 3 deletions src/client/bridge/gateway/shard_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use parking_lot::RwLock;
use std::{
collections::{HashMap, VecDeque},
sync::{
mpsc::{self, Sender},
Arc
mpsc::{self, channel, Sender, Receiver},
Arc,
},
thread
thread,
time::Duration
};
use super::super::super::{EventHandler, RawEventHandler};
use super::{
Expand Down Expand Up @@ -127,6 +128,7 @@ pub struct ShardManager {
/// The total shards in use, 1-indexed.
shard_total: u64,
shard_queuer: Sender<ShardQueuerMessage>,
shard_shutdown: Receiver<ShardId>,
}

impl ShardManager {
Expand Down Expand Up @@ -164,18 +166,21 @@ impl ShardManager {
shard_queuer.run();
});

let (shutdown_send, shutdown_recv) = channel();
let manager = Arc::new(Mutex::new(Self {
monitor_tx: thread_tx,
shard_index: opt.shard_index,
shard_init: opt.shard_init,
shard_queuer: shard_queue_tx,
shard_total: opt.shard_total,
shard_shutdown: shutdown_recv,
runners,
}));

(Arc::clone(&manager), ShardManagerMonitor {
rx: thread_rx,
manager,
shutdown: shutdown_send,
})
}

Expand Down Expand Up @@ -298,6 +303,20 @@ impl ShardManager {
why,
);
}
match self.shard_shutdown.recv_timeout(Duration::from_secs(5)) {
Ok(shutdown_shard_id) =>
if shutdown_shard_id != shard_id {
warn!(
"Failed to cleanly shutdown shard {}: Shutdown channel sent incorrect ID",
shard_id,
);
},
Err(why) => warn!(
"Failed to cleanly shutdown shard {}: {:?}",
shard_id,
why,
)
}
}

self.runners.lock().remove(&shard_id).is_some()
Expand Down
25 changes: 21 additions & 4 deletions src/client/bridge/gateway/shard_manager_monitor.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
use parking_lot::Mutex;
use std::sync::{
mpsc::Receiver,
Arc
use std::{
sync::{
mpsc::{
Receiver,
Sender
},
Arc,
}
};
use super::{ShardManager, ShardManagerMessage};
use log::debug;
use super::super::gateway::ShardId;
use log::{debug, warn};

/// The shard manager monitor does what it says on the tin -- it monitors the
/// shard manager and performs actions on it as received.
Expand All @@ -20,6 +26,8 @@ pub struct ShardManagerMonitor {
pub manager: Arc<Mutex<ShardManager>>,
/// The mpsc Receiver channel to receive shard manager messages over.
pub rx: Receiver<ShardManagerMessage>,
/// The mpsc Sender channel to inform the manager that a shard has just properly shut down
pub shutdown: Sender<ShardId>,
}

impl ShardManagerMonitor {
Expand Down Expand Up @@ -61,6 +69,15 @@ impl ShardManagerMonitor {
break;
},
ShardManagerMessage::ShutdownInitiated => break,
ShardManagerMessage::ShutdownFinished(shard_id) => {
if let Err(why) = self.shutdown.send(shard_id) {
warn!(
"[ShardMonitor] Could not forward Shutdown signal to ShardManager for shard {}: {:#?}",
shard_id,
why
);
}
}
}
}
}
Expand Down
30 changes: 30 additions & 0 deletions src/client/bridge/gateway/shard_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,36 @@ impl<H: EventHandler + Send + Sync + 'static,
return true;
}

// Send a Close Frame to Discord, which allows a bot to "log off"
let _ = self.shard.client.close(Some(CloseFrame {
code: 1000.into(),
reason: Cow::from(""),
}));

// In return, we wait for either a Close Frame response, or an error, after which this WS is deemed
// disconnected from Discord.
loop {
match self.shard.client.read_message() {
Ok(tungstenite::Message::Close(_)) => break,
Err(_) => {
warn!(
"[ShardRunner {:?}] Received an error awaiting close frame",
self.shard.shard_info(),
);
break;
}
_ => continue,
}
}

// Inform the manager that shutdown for this shard has finished.
if let Err(why) = self.manager_tx.send(ShardManagerMessage::ShutdownFinished(id)) {
warn!(
"[ShardRunner {:?}] Could not send ShutdownFinished: {:#?}",
self.shard.shard_info(),
why,
);
}
false
}

Expand Down Expand Up @@ -259,6 +284,11 @@ impl<H: EventHandler + Send + Sync + 'static,
ShardClientMessage::Manager(ShardManagerMessage::ShutdownInitiated) => {
// nb: not sent here

true
},
ShardClientMessage::Manager(ShardManagerMessage::ShutdownFinished(_)) => {
// nb: not sent here

true
},
ShardClientMessage::Runner(ShardRunnerMessage::ChunkGuilds { guild_ids, limit, query }) => {
Expand Down

0 comments on commit 711882b

Please sign in to comment.