From 35dbbc27e878c507069aac9a8f8b53067868b9c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9e?= Date: Sat, 7 Dec 2024 12:28:34 +0000 Subject: [PATCH] Various tweaks to make the socket server shut down cleanly (#683) * Tweak pinging; terminate hung connections * Close all connections, including lost connections on shutdown * Tweak guest counting * Make private things private * Check lost connection timeouts during ping phase --- src/SocketServer.js | 43 +++++++++++----------- src/sockets/AuthedConnection.js | 47 ++++++++++++++++++------ src/sockets/GuestConnection.js | 37 ++++++++++++++----- src/sockets/LostConnection.js | 63 ++++++++++++++------------------- 4 files changed, 113 insertions(+), 77 deletions(-) diff --git a/src/SocketServer.js b/src/SocketServer.js index 980362d8..a87b2456 100644 --- a/src/SocketServer.js +++ b/src/SocketServer.js @@ -3,7 +3,6 @@ import lodash from 'lodash'; import sjson from 'secure-json-parse'; import { WebSocketServer } from 'ws'; import Ajv from 'ajv'; -import ms from 'ms'; import { stdSerializers } from 'pino'; import { socketVote } from './controllers/booth.js'; import { disconnectUser } from './controllers/users.js'; @@ -13,10 +12,13 @@ import AuthedConnection from './sockets/AuthedConnection.js'; import LostConnection from './sockets/LostConnection.js'; import { serializeUser } from './utils/serialize.js'; -const { debounce, isEmpty } = lodash; +const { isEmpty } = lodash; export const REDIS_ACTIVE_SESSIONS = 'users'; +const PING_INTERVAL = 10_000; +const GUEST_COUNT_INTERVAL = 2_000; + /** * @typedef {import('./schema.js').User} User */ @@ -102,10 +104,10 @@ class SocketServer { #pinger; - /** - * Update online guests count and broadcast an update if necessary. - */ - #recountGuests; + /** Update online guests count and broadcast an update if necessary. */ + #guestCountInterval; + + #guestCountDirty = true; /** * Handlers for commands that come in from clients. @@ -187,16 +189,17 @@ class SocketServer { this.#pinger = setInterval(() => { this.ping(); - }, ms('10 seconds')); + }, PING_INTERVAL); - this.#recountGuests = debounce(() => { - if (this.#closing) { + this.#guestCountInterval = setInterval(() => { + if (!this.#guestCountDirty) { return; } - this.#recountGuestsInternal().catch((error) => { + + this.#recountGuests().catch((error) => { this.#logger.error({ err: error }, 'counting guests failed'); }); - }, ms('2 seconds')); + }, GUEST_COUNT_INTERVAL); this.#clientActions = { sendChat: (user, message) => { @@ -629,7 +632,7 @@ class SocketServer { this.#logger.trace({ type: connection.constructor.name, userID, sessionID }, 'add connection'); this.#connections.push(connection); - this.#recountGuests(); + this.#guestCountDirty = true; } /** @@ -647,7 +650,7 @@ class SocketServer { this.#connections.splice(i, 1); connection.removed(); - this.#recountGuests(); + this.#guestCountDirty = true; } /** @@ -703,12 +706,12 @@ class SocketServer { clearInterval(this.#pinger); this.#closing = true; - for (const connection of this.#wss.clients) { + clearInterval(this.#guestCountInterval); + + for (const connection of this.#connections) { connection.close(); } - this.#recountGuests.cancel(); - const closeWsServer = promisify(this.#wss.close.bind(this.#wss)); await closeWsServer(); await this.#redisSubscription.quit(); @@ -717,7 +720,7 @@ class SocketServer { /** * Get the connection instance for a specific user. * - * @param {User|string} user The user. + * @param {User|import('./schema.js').UserID} user The user. * @returns {Connection|undefined} */ connection(user) { @@ -727,9 +730,7 @@ class SocketServer { ping() { this.#connections.forEach((connection) => { - if ('socket' in connection) { - connection.ping(); - } + connection.ping(); }); } @@ -779,7 +780,7 @@ class SocketServer { return parseInt(rawCount, 10); } - async #recountGuestsInternal() { + async #recountGuests() { const { redis } = this.#uw; const guests = this.#connections .filter((connection) => connection instanceof GuestConnection) diff --git a/src/sockets/AuthedConnection.js b/src/sockets/AuthedConnection.js index f4bebff7..a48e1ecd 100644 --- a/src/sockets/AuthedConnection.js +++ b/src/sockets/AuthedConnection.js @@ -3,9 +3,16 @@ import Ultron from 'ultron'; import WebSocket from 'ws'; import sjson from 'secure-json-parse'; +const PING_TIMEOUT = 5_000; +const DEAD_TIMEOUT = 30_000; + class AuthedConnection extends EventEmitter { + #events; + #logger; + #lastMessage = Date.now(); + /** * @param {import('../Uwave.js').default} uw * @param {import('ws').WebSocket} socket @@ -16,19 +23,23 @@ class AuthedConnection extends EventEmitter { super(); this.uw = uw; this.socket = socket; - this.events = new Ultron(this.socket); + this.#events = new Ultron(this.socket); this.user = user; this.sessionID = sessionID; this.#logger = uw.logger.child({ ns: 'uwave:sockets', connectionType: 'AuthedConnection', userId: this.user.id, sessionID, }); - this.events.on('close', () => { + this.#events.on('close', () => { this.emit('close', { banned: this.banned }); }); - this.events.on('message', this.onMessage.bind(this)); + this.#events.on('message', (raw) => { + this.#onMessage(raw); + }); + this.#events.on('pong', () => { + this.#onPong(); + }); - this.lastMessage = Date.now(); this.sendWaiting(); } @@ -66,28 +77,42 @@ class AuthedConnection extends EventEmitter { /** * @param {string|Buffer} raw - * @private */ - onMessage(raw) { + #onMessage(raw) { + this.#lastMessage = Date.now(); const { command, data } = sjson.safeParse(raw) ?? {}; if (command) { this.emit('command', command, data); } } + #onPong() { + this.#lastMessage = Date.now(); + } + /** * @param {string} command * @param {import('type-fest').JsonValue} data */ send(command, data) { this.socket.send(JSON.stringify({ command, data })); - this.lastMessage = Date.now(); + this.#lastMessage = Date.now(); + } + + #timeSinceLastMessage() { + return Date.now() - this.#lastMessage; } ping() { - if (Date.now() - this.lastMessage > 5000 && this.socket.readyState === WebSocket.OPEN) { - this.socket.send('-'); - this.lastMessage = Date.now(); + if (this.socket.readyState !== WebSocket.OPEN) { + return; + } + if (this.#timeSinceLastMessage() > DEAD_TIMEOUT) { + this.socket.terminate(); + return; + } + if (this.#timeSinceLastMessage() > PING_TIMEOUT) { + this.socket.ping(); } } @@ -104,7 +129,7 @@ class AuthedConnection extends EventEmitter { } removed() { - this.events.remove(); + this.#events.remove(); } toString() { diff --git a/src/sockets/GuestConnection.js b/src/sockets/GuestConnection.js index d5e10b3b..4f2f4a6e 100644 --- a/src/sockets/GuestConnection.js +++ b/src/sockets/GuestConnection.js @@ -1,9 +1,17 @@ import EventEmitter from 'node:events'; import Ultron from 'ultron'; +import WebSocket from 'ws'; + +const PING_TIMEOUT = 5_000; +const DEAD_TIMEOUT = 30_000; class GuestConnection extends EventEmitter { + #events; + #logger; + #lastMessage = Date.now(); + /** * @param {import('../Uwave.js').default} uw * @param {import('ws').WebSocket} socket @@ -16,13 +24,13 @@ class GuestConnection extends EventEmitter { this.options = options; this.#logger = uw.logger.child({ ns: 'uwave:sockets', connectionType: 'GuestConnection', userId: null }); - this.events = new Ultron(socket); + this.#events = new Ultron(socket); - this.events.on('close', () => { + this.#events.on('close', () => { this.emit('close'); }); - this.events.on('message', /** @param {string|Buffer} token */ (token) => { + this.#events.on('message', /** @param {string|Buffer} token */ (token) => { this.attemptAuth(token.toString()).then(() => { this.send('authenticated'); }).catch((error) => { @@ -30,7 +38,9 @@ class GuestConnection extends EventEmitter { }); }); - this.lastMessage = Date.now(); + this.#events.on('pong', () => { + this.#lastMessage = Date.now(); + }); } /** @@ -73,13 +83,22 @@ class GuestConnection extends EventEmitter { */ send(command, data) { this.socket.send(JSON.stringify({ command, data })); - this.lastMessage = Date.now(); + } + + #timeSinceLastMessage() { + return Date.now() - this.#lastMessage; } ping() { - if (Date.now() - this.lastMessage > 5000) { - this.socket.send('-'); - this.lastMessage = Date.now(); + if (this.socket.readyState !== WebSocket.OPEN) { + return; + } + if (this.#timeSinceLastMessage() > DEAD_TIMEOUT) { + this.socket.terminate(); + return; + } + if (this.#timeSinceLastMessage() > PING_TIMEOUT) { + this.socket.ping(); } } @@ -89,7 +108,7 @@ class GuestConnection extends EventEmitter { } removed() { - this.events.remove(); + this.#events.remove(); } toString() { diff --git a/src/sockets/LostConnection.js b/src/sockets/LostConnection.js index cf66c3bf..7549c3f4 100644 --- a/src/sockets/LostConnection.js +++ b/src/sockets/LostConnection.js @@ -3,6 +3,10 @@ import EventEmitter from 'node:events'; class LostConnection extends EventEmitter { #logger; + #expiresAt; + + #uw; + /** * @param {import('../Uwave.js').default} uw * @param {import('../schema.js').User} user @@ -10,59 +14,39 @@ class LostConnection extends EventEmitter { */ constructor(uw, user, sessionID, timeout = 30) { super(); - this.uw = uw; + this.#uw = uw; this.user = user; this.sessionID = sessionID; - this.timeout = timeout; + this.#expiresAt = Date.now() + timeout * 1_000; this.#logger = uw.logger.child({ ns: 'uwave:sockets', connectionType: 'LostConnection', userID: this.user.id, sessionID, }); - this.initQueued(); - this.setTimeout(timeout); + this.#initQueued(timeout); } - /** - * @private - */ - get key() { + get #key() { return `http-api:disconnected:${this.sessionID}`; } - /** - * @private - */ - get messagesKey() { + get #messagesKey() { return `http-api:disconnected:${this.sessionID}:messages`; } - /** - * @private - */ - initQueued() { + /** @param {number} seconds */ + #initQueued(seconds) { // We expire the keys after timeout*10, because a server restart near the // end of the timeout might mean that someone fails to reconnect. This way // we can ensure that everyone still gets the full `timeout` duration to // reconnect after a server restart, while also not filling up Redis with // messages to users who left and will never return. - this.uw.redis.multi() - .set(this.key, 'true', 'EX', this.timeout * 10) - .ltrim(this.messagesKey, 0, 0) - .expire(this.messagesKey, this.timeout * 10) + this.#uw.redis.multi() + .set(this.#key, 'true', 'EX', seconds * 10) + .ltrim(this.#messagesKey, 0, 0) + .expire(this.#messagesKey, seconds * 10) .exec(); } - /** - * @param {number} timeout - * @private - */ - setTimeout(timeout) { - this.removeTimer = setTimeout(() => { - this.close(); - this.uw.redis.del(this.key, this.messagesKey); - }, timeout * 1000); - } - /** * @param {string} command * @param {import('type-fest').JsonValue} data @@ -70,21 +54,28 @@ class LostConnection extends EventEmitter { send(command, data) { this.#logger.info({ command, data }, 'queue command'); - this.uw.redis.rpush( - this.messagesKey, + this.#uw.redis.rpush( + this.#messagesKey, JSON.stringify({ command, data }), ); } + ping() { + if (Date.now() > this.#expiresAt) { + this.close(); + this.#uw.redis.del(this.#key, this.#messagesKey).catch(() => { + // No big deal + }); + } + } + close() { this.#logger.info('close'); this.emit('close'); } removed() { - if (this.removeTimer) { - clearTimeout(this.removeTimer); - } + // Nothing to do } toString() {