Skip to content

Commit

Permalink
Fix handling rebalances.
Browse files Browse the repository at this point in the history
The gateway was issuing a new Identify whenever Discord
performed a rebalance.
  • Loading branch information
Lakelezz authored and arqunis committed Apr 18, 2020
1 parent 2a5d9cd commit 003c8e1
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 17 deletions.
2 changes: 1 addition & 1 deletion src/client/bridge/gateway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ pub enum ShardManagerMessage {
/// without bringing it back up.
///
/// [`ShardManagerMonitor`]: struct.ShardManagerMonitor.html
Shutdown(ShardId),
Shutdown(ShardId, u16),
/// Indicator that a [`ShardManagerMonitor`] should fully shutdown all shards
/// and end its monitoring process for the [`ShardManager`].
///
Expand Down
8 changes: 4 additions & 4 deletions src/client/bridge/gateway/shard_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ impl ShardManager {
/// [`initialize`]: #method.initialize
pub fn restart(&mut self, shard_id: ShardId) {
info!("Restarting shard {}", shard_id);
self.shutdown(shard_id);
self.shutdown(shard_id, 4000);

let shard_total = self.shard_total;

Expand All @@ -286,11 +286,11 @@ impl ShardManager {
/// by the shard runner - no longer exists, then the shard runner will not
/// know it should shut down. This _should never happen_. It may already be
/// stopped.
pub fn shutdown(&mut self, shard_id: ShardId) -> bool {
pub fn shutdown(&mut self, shard_id: ShardId, code: u16) -> bool {
info!("Shutting down shard {}", shard_id);

if let Some(runner) = self.runners.lock().get(&shard_id) {
let shutdown = ShardManagerMessage::Shutdown(shard_id);
let shutdown = ShardManagerMessage::Shutdown(shard_id, code);
let client_msg = ShardClientMessage::Manager(shutdown);
let msg = InterMessage::Client(Box::new(client_msg));

Expand Down Expand Up @@ -341,7 +341,7 @@ impl ShardManager {
info!("Shutting down all shards");

for shard_id in keys {
self.shutdown(shard_id);
self.shutdown(shard_id, 1000);
}

let _ = self.shard_queuer.send(ShardQueuerMessage::Shutdown);
Expand Down
6 changes: 4 additions & 2 deletions src/client/bridge/gateway/shard_manager_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ impl ShardManagerMonitor {
match value {
ShardManagerMessage::Restart(shard_id) => {
self.manager.lock().restart(shard_id);
let _ = self.shutdown.send(shard_id);
},
ShardManagerMessage::ShardUpdate { id, latency, stage } => {
let manager = self.manager.lock();
Expand All @@ -60,8 +61,9 @@ impl ShardManagerMonitor {
runner.stage = stage;
}
}
ShardManagerMessage::Shutdown(shard_id) => {
self.manager.lock().shutdown(shard_id);
ShardManagerMessage::Shutdown(shard_id, code) => {
self.manager.lock().shutdown(shard_id, code);
let _ = self.shutdown.send(shard_id);
},
ShardManagerMessage::ShutdownAll => {
self.manager.lock().shutdown_all();
Expand Down
3 changes: 2 additions & 1 deletion src/client/bridge/gateway/shard_queuer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use super::{
use threadpool::ThreadPool;
use typemap::ShareMap;
use crate::gateway::ConnectionStage;
use log::{info, warn};
use log::{debug, info, warn};

#[cfg(feature = "voice")]
use crate::client::bridge::voice::ClientVoiceManager;
Expand Down Expand Up @@ -209,6 +209,7 @@ impl ShardQueuer {

thread::spawn(move || {
let _ = runner.run();
debug!("[ShardRunner {:?}] Stopping", runner.shard.shard_info());
});

self.runners.lock().insert(ShardId(shard_id), runner_info);
Expand Down
19 changes: 10 additions & 9 deletions src/client/bridge/gateway/shard_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub struct ShardRunner {
runner_rx: Receiver<InterMessage>,
// channel to send messages to the shard runner from the shard manager
runner_tx: Sender<InterMessage>,
shard: Shard,
pub(crate) shard: Shard,
threadpool: ThreadPool,
#[cfg(feature = "voice")]
voice_manager: Arc<Mutex<ClientVoiceManager>>,
Expand Down Expand Up @@ -138,8 +138,9 @@ impl ShardRunner {
}

match action {
Some(ShardAction::Reconnect(ReconnectType::Reidentify)) => {
return self.request_restart()
Some(ShardAction::Reconnect(ReconnectType::Resume)) => {
let _ = self.request_restart();
continue;
},
Some(other) => {
let _ = self.action(&other);
Expand Down Expand Up @@ -193,7 +194,7 @@ impl ShardRunner {
// Returns whether the WebSocket client is still active.
//
// If true, the WebSocket client was _not_ shutdown. If false, it was.
fn checked_shutdown(&mut self, id: ShardId) -> bool {
fn checked_shutdown(&mut self, id: ShardId, close_code: u16) -> bool {
// First verify the ID so we know for certain this runner is
// to shutdown.
if id.0 != self.shard.shard_info()[0] {
Expand All @@ -204,7 +205,7 @@ impl ShardRunner {

// Send a Close Frame to Discord, which allows a bot to "log off"
let _ = self.shard.client.close(Some(CloseFrame {
code: 1000.into(),
code: close_code.into(),
reason: Cow::from(""),
}));

Expand Down Expand Up @@ -260,10 +261,10 @@ impl ShardRunner {
fn handle_rx_value(&mut self, value: InterMessage) -> bool {
match value {
InterMessage::Client(value) => match *value {
ShardClientMessage::Manager(ShardManagerMessage::Restart(id)) |
ShardClientMessage::Manager(ShardManagerMessage::Shutdown(id)) => {
self.checked_shutdown(id)
},
ShardClientMessage::Manager(ShardManagerMessage::Restart(id)) =>
self.checked_shutdown(id, 4000),
ShardClientMessage::Manager(ShardManagerMessage::Shutdown(id, code)) =>
self.checked_shutdown(id, code),
ShardClientMessage::Manager(ShardManagerMessage::ShutdownAll) => {
// This variant should never be received.
warn!(
Expand Down

0 comments on commit 003c8e1

Please sign in to comment.