From 9a3860186c12452c8e4b60d700d4e1599cc7aefa Mon Sep 17 00:00:00 2001 From: Claudio Pennati Date: Wed, 4 Dec 2024 20:02:52 +0100 Subject: [PATCH] feat(PoolCluster): `restoreNodeTimeout` implementation (#3218) --- lib/pool_cluster.js | 180 +++++++++++++----- promise.js | 4 +- .../test-promise-wrapper.test.mjs | 36 ++++ .../mysql/createPoolCluster/remove.test.ts | 6 + .../test-connection-retry.test.cjs | 70 +++++++ .../pool-cluster/test-remove-by-name.test.cjs | 76 ++++++++ .../test-remove-by-pattern.test.cjs | 68 +++++++ .../pool-cluster/test-restore-events.test.cjs | 98 ++++++++++ ...restore.test.cjs => test-restore.test.cjs} | 49 +++-- typings/mysql/lib/PoolCluster.d.ts | 4 + 10 files changed, 520 insertions(+), 71 deletions(-) create mode 100644 test/tsc-build/mysql/createPoolCluster/remove.test.ts create mode 100644 test/unit/pool-cluster/test-connection-retry.test.cjs create mode 100644 test/unit/pool-cluster/test-remove-by-name.test.cjs create mode 100644 test/unit/pool-cluster/test-remove-by-pattern.test.cjs create mode 100644 test/unit/pool-cluster/test-restore-events.test.cjs rename test/unit/pool-cluster/{test-connection-restore.test.cjs => test-restore.test.cjs} (61%) diff --git a/lib/pool_cluster.js b/lib/pool_cluster.js index 92f53de668..9edda501a2 100644 --- a/lib/pool_cluster.js +++ b/lib/pool_cluster.js @@ -13,17 +13,30 @@ const EventEmitter = require('events').EventEmitter; const makeSelector = { RR() { let index = 0; - return clusterIds => clusterIds[index++ % clusterIds.length]; + return (clusterIds) => clusterIds[index++ % clusterIds.length]; }, RANDOM() { - return clusterIds => + return (clusterIds) => clusterIds[Math.floor(Math.random() * clusterIds.length)]; }, ORDER() { - return clusterIds => clusterIds[0]; + return (clusterIds) => clusterIds[0]; } }; +const getMonotonicMilliseconds = function () { + let ms; + + if (typeof process.hrtime === 'function') { + ms = process.hrtime(); + ms = ms[0] * 1e3 + ms[1] * 1e-6; + } else { + ms = process.uptime() * 1000; + } + + return Math.floor(ms); +}; + class PoolNamespace { constructor(cluster, pattern, selector) { this._cluster = cluster; @@ -34,15 +47,28 @@ class PoolNamespace { getConnection(cb) { const clusterNode = this._getClusterNode(); if (clusterNode === null) { - return cb(new Error('Pool does Not exists.')); + let err = new Error('Pool does Not exist.'); + err.code = 'POOL_NOEXIST'; + + if (this._cluster._findNodeIds(this._pattern, true).length !== 0) { + err = new Error('Pool does Not have online node.'); + err.code = 'POOL_NONEONLINE'; + } + + return cb(err); } return this._cluster._getConnection(clusterNode, (err, connection) => { if (err) { + if ( + this._cluster._canRetry && + this._cluster._findNodeIds(this._pattern).length !== 0 + ) { + this._cluster.emit('warn', err); + return this.getConnection(cb); + } + return cb(err); } - if (connection === 'retry') { - return this.getConnection(cb); - } return cb(null, connection); }); } @@ -79,9 +105,9 @@ class PoolNamespace { /** * pool cluster execute - * @param {*} sql - * @param {*} values - * @param {*} cb + * @param {*} sql + * @param {*} values + * @param {*} cb */ execute(sql, values, cb) { if (typeof values === 'function') { @@ -123,6 +149,7 @@ class PoolCluster extends EventEmitter { this._canRetry = typeof config.canRetry === 'undefined' ? true : config.canRetry; this._removeNodeErrorCount = config.removeNodeErrorCount || 5; + this._restoreNodeTimeout = config.restoreNodeTimeout || 0; this._defaultSelector = config.defaultSelector || 'RR'; this._closed = false; this._lastId = 0; @@ -155,13 +182,26 @@ class PoolCluster extends EventEmitter { this._nodes[id] = { id: id, errorCount: 0, - pool: new Pool({ config: new PoolConfig(config) }) + pool: new Pool({ config: new PoolConfig(config) }), + _offlineUntil: 0 }; this._serviceableNodeIds.push(id); this._clearFindCaches(); } } + remove(pattern) { + const foundNodeIds = this._findNodeIds(pattern, true); + + for (let i = 0; i < foundNodeIds.length; i++) { + const node = this._getNode(foundNodeIds[i]); + + if (node) { + this._removeNode(node); + } + } + } + getConnection(pattern, selector, cb) { let namespace; if (typeof pattern === 'function') { @@ -181,7 +221,7 @@ class PoolCluster extends EventEmitter { const cb = callback !== undefined ? callback - : err => { + : (err) => { if (err) { throw err; } @@ -190,11 +230,12 @@ class PoolCluster extends EventEmitter { process.nextTick(cb); return; } + this._closed = true; let calledBack = false; let waitingClose = 0; - const onEnd = err => { + const onEnd = (err) => { if (!calledBack && (err || --waitingClose <= 0)) { calledBack = true; return cb(err); @@ -205,31 +246,51 @@ class PoolCluster extends EventEmitter { waitingClose++; this._nodes[id].pool.end(onEnd); } + if (waitingClose === 0) { process.nextTick(onEnd); } } - _findNodeIds(pattern) { - if (typeof this._findCaches[pattern] !== 'undefined') { - return this._findCaches[pattern]; - } - let foundNodeIds; - if (pattern === '*') { - // all - foundNodeIds = this._serviceableNodeIds; - } else if (this._serviceableNodeIds.indexOf(pattern) !== -1) { - // one - foundNodeIds = [pattern]; - } else { - // wild matching - const keyword = pattern.substring(pattern.length - 1, 0); - foundNodeIds = this._serviceableNodeIds.filter(id => - id.startsWith(keyword) - ); + _findNodeIds(pattern, includeOffline) { + let currentTime = 0; + let foundNodeIds = this._findCaches[pattern]; + + if (typeof this._findCaches[pattern] === 'undefined') { + if (pattern === '*') { + // all + foundNodeIds = this._serviceableNodeIds; + } else if (this._serviceableNodeIds.indexOf(pattern) !== -1) { + // one + foundNodeIds = [pattern]; + } else { + // wild matching + const keyword = pattern.substring(pattern.length - 1, 0); + foundNodeIds = this._serviceableNodeIds.filter((id) => + id.startsWith(keyword) + ); + } } + this._findCaches[pattern] = foundNodeIds; - return foundNodeIds; + + if (includeOffline) { + return foundNodeIds; + } + + return foundNodeIds.filter((nodeId) => { + const node = this._getNode(nodeId); + + if (!node._offlineUntil) { + return true; + } + + if (!currentTime) { + currentTime = getMonotonicMilliseconds(); + } + + return node._offlineUntil <= currentTime; + }); } _getNode(id) { @@ -237,21 +298,39 @@ class PoolCluster extends EventEmitter { } _increaseErrorCount(node) { - if (++node.errorCount >= this._removeNodeErrorCount) { - const index = this._serviceableNodeIds.indexOf(node.id); - if (index !== -1) { - this._serviceableNodeIds.splice(index, 1); - delete this._nodes[node.id]; - this._clearFindCaches(); - node.pool.end(); - this.emit('remove', node.id); - } + const errorCount = ++node.errorCount; + + if (this._removeNodeErrorCount > errorCount) { + return; } + + if (this._restoreNodeTimeout > 0) { + node._offlineUntil = + getMonotonicMilliseconds() + this._restoreNodeTimeout; + this.emit('offline', node.id); + return; + } + + this._removeNode(node); + this.emit('remove', node.id); } _decreaseErrorCount(node) { - if (node.errorCount > 0) { - --node.errorCount; + let errorCount = node.errorCount; + + if (errorCount > this._removeNodeErrorCount) { + errorCount = this._removeNodeErrorCount; + } + + if (errorCount < 1) { + errorCount = 1; + } + + node.errorCount = errorCount - 1; + + if (node._offlineUntil) { + node._offlineUntil = 0; + this.emit('online', node.id); } } @@ -259,13 +338,6 @@ class PoolCluster extends EventEmitter { node.pool.getConnection((err, connection) => { if (err) { this._increaseErrorCount(node); - if (this._canRetry) { - // REVIEW: this seems wrong? - this.emit('warn', err); - // eslint-disable-next-line no-console - console.warn(`[Error] PoolCluster : ${err}`); - return cb(null, 'retry'); - } return cb(err); } this._decreaseErrorCount(node); @@ -275,6 +347,16 @@ class PoolCluster extends EventEmitter { }); } + _removeNode(node) { + const index = this._serviceableNodeIds.indexOf(node.id); + if (index !== -1) { + this._serviceableNodeIds.splice(index, 1); + delete this._nodes[node.id]; + this._clearFindCaches(); + node.pool.end(); + } + } + _clearFindCaches() { this._findCaches = {}; } diff --git a/promise.js b/promise.js index a0216f5660..4010437fe9 100644 --- a/promise.js +++ b/promise.js @@ -60,7 +60,7 @@ class PromisePoolCluster extends EventEmitter { super(); this.poolCluster = poolCluster; this.Promise = thePromise || Promise; - inheritEvents(poolCluster, this, ['warn', 'remove']); + inheritEvents(poolCluster, this, ['warn', 'remove' , 'online', 'offline']); } getConnection(pattern, selector) { @@ -156,7 +156,7 @@ class PromisePoolCluster extends EventEmitter { })(func); } } -})(['add']); +})(['add', 'remove']); function createPromisePoolCluster(opts) { const corePoolCluster = createPoolCluster(opts); diff --git a/test/esm/integration/pool-cluster/test-promise-wrapper.test.mjs b/test/esm/integration/pool-cluster/test-promise-wrapper.test.mjs index f19888369d..00a56edde6 100644 --- a/test/esm/integration/pool-cluster/test-promise-wrapper.test.mjs +++ b/test/esm/integration/pool-cluster/test-promise-wrapper.test.mjs @@ -43,4 +43,40 @@ const { createPoolCluster } = require('../../../../promise.js'); poolCluster.poolCluster.emit('remove'); }); + + await test(async () => { + const poolCluster = createPoolCluster(); + + poolCluster.once('offline', async function () { + await new Promise((resolve) => { + assert.equal( + // eslint-disable-next-line no-invalid-this + this, + poolCluster, + 'should propagate offline event to promise wrapper', + ); + resolve(true); + }); + }); + + poolCluster.poolCluster.emit('offline'); + }); + + await test(async () => { + const poolCluster = createPoolCluster(); + + poolCluster.once('online', async function () { + await new Promise((resolve) => { + assert.equal( + // eslint-disable-next-line no-invalid-this + this, + poolCluster, + 'should propagate online event to promise wrapper', + ); + resolve(true); + }); + }); + + poolCluster.poolCluster.emit('online'); + }); })(); diff --git a/test/tsc-build/mysql/createPoolCluster/remove.test.ts b/test/tsc-build/mysql/createPoolCluster/remove.test.ts new file mode 100644 index 0000000000..a561e4b5be --- /dev/null +++ b/test/tsc-build/mysql/createPoolCluster/remove.test.ts @@ -0,0 +1,6 @@ +import { mysql } from '../../index.test.js'; + +const poolCluster = mysql.createPoolCluster(); + +// Overload: poolCluster.add(group, connectionUri); +poolCluster.remove('cluster1'); diff --git a/test/unit/pool-cluster/test-connection-retry.test.cjs b/test/unit/pool-cluster/test-connection-retry.test.cjs new file mode 100644 index 0000000000..c3def66ad6 --- /dev/null +++ b/test/unit/pool-cluster/test-connection-retry.test.cjs @@ -0,0 +1,70 @@ +'use strict'; + +const { assert } = require('poku'); +const portfinder = require('portfinder'); +const common = require('../../common.test.cjs'); +const mysql = require('../../../index.js'); +const { exit } = require('node:process'); +const process = require('node:process'); + +// The process is not terminated in Deno +if (typeof Deno !== 'undefined') process.exit(0); + +// TODO: config poolCluster to work with MYSQL_CONNECTION_URL run +if (`${process.env.MYSQL_CONNECTION_URL}`.includes('pscale_pw_')) { + console.log('skipping test for planetscale'); + process.exit(0); +} + +if (process.platform === 'win32') { + console.log('This test is known to fail on windows. FIXME: investi=gate why'); + exit(0); +} + +const cluster = common.createPoolCluster({ + canRetry: true, + removeNodeErrorCount: 5, +}); + +let connCount = 0; + +const server = mysql.createServer(); + +console.log('test pool cluster retry'); + +portfinder.getPort((err, port) => { + cluster.add('MASTER', { port }); + + server.listen(port + 0, (err) => { + assert.ifError(err); + + cluster.getConnection('MASTER', (err, connection) => { + assert.ifError(err); + assert.equal(connCount, 2); + assert.equal(connection._clusterId, 'MASTER'); + + connection.release(); + + cluster.end((err) => { + assert.ifError(err); + server.close(); + }); + }); + }); + + server.on('connection', (conn) => { + connCount += 1; + + if (connCount < 2) { + conn.close(); + } else { + conn.serverHandshake({ + serverVersion: 'node.js rocks', + }); + conn.on('error', () => { + // server side of the connection + // ignore disconnects + }); + } + }); +}); diff --git a/test/unit/pool-cluster/test-remove-by-name.test.cjs b/test/unit/pool-cluster/test-remove-by-name.test.cjs new file mode 100644 index 0000000000..d1b084ca15 --- /dev/null +++ b/test/unit/pool-cluster/test-remove-by-name.test.cjs @@ -0,0 +1,76 @@ +'use strict'; + +const { assert } = require('poku'); +const portfinder = require('portfinder'); +const common = require('../../common.test.cjs'); +const mysql = require('../../../index.js'); +const process = require('node:process'); + +// TODO: config poolCluster to work with MYSQL_CONNECTION_URL run +if (`${process.env.MYSQL_CONNECTION_URL}`.includes('pscale_pw_')) { + console.log('skipping test for planetscale'); + process.exit(0); +} + +if (process.platform === 'win32') { + console.log('This test is known to fail on windows. FIXME: investi=gate why'); + process.exit(0); +} + +// The process is not terminated in Deno +if (typeof Deno !== 'undefined') process.exit(0); + +const cluster = common.createPoolCluster(); +const server = mysql.createServer(); + +console.log('test pool cluster remove by name'); + +portfinder.getPort((err, port) => { + cluster.add('SLAVE1', { port }); + cluster.add('SLAVE2', { port }); + + server.listen(port + 0, (err) => { + assert.ifError(err); + + const pool = cluster.of('SLAVE*', 'ORDER'); + + pool.getConnection((err, conn) => { + assert.ifError(err); + assert.strictEqual(conn._clusterId, 'SLAVE1'); + + conn.release(); + cluster.remove('SLAVE1'); + + pool.getConnection((err, conn) => { + assert.ifError(err); + assert.strictEqual(conn._clusterId, 'SLAVE2'); + + conn.release(); + cluster.remove('SLAVE2'); + + pool.getConnection((err) => { + assert.ok(err); + assert.equal(err.code, 'POOL_NOEXIST'); + + cluster.remove('SLAVE1'); + cluster.remove('SLAVE2'); + + cluster.end((err) => { + assert.ifError(err); + server.close(); + }); + }); + }); + }); + }); + + server.on('connection', (conn) => { + conn.serverHandshake({ + serverVersion: 'node.js rocks', + }); + conn.on('error', () => { + // server side of the connection + // ignore disconnects + }); + }); +}); diff --git a/test/unit/pool-cluster/test-remove-by-pattern.test.cjs b/test/unit/pool-cluster/test-remove-by-pattern.test.cjs new file mode 100644 index 0000000000..7de9b70180 --- /dev/null +++ b/test/unit/pool-cluster/test-remove-by-pattern.test.cjs @@ -0,0 +1,68 @@ +'use strict'; + +const { assert } = require('poku'); +const portfinder = require('portfinder'); +const common = require('../../common.test.cjs'); +const mysql = require('../../../index.js'); +const process = require('node:process'); + +// TODO: config poolCluster to work with MYSQL_CONNECTION_URL run +if (`${process.env.MYSQL_CONNECTION_URL}`.includes('pscale_pw_')) { + console.log('skipping test for planetscale'); + process.exit(0); +} + +if (process.platform === 'win32') { + console.log('This test is known to fail on windows. FIXME: investi=gate why'); + process.exit(0); +} + +// The process is not terminated in Deno +if (typeof Deno !== 'undefined') process.exit(0); + +const cluster = common.createPoolCluster(); +const server = mysql.createServer(); + +console.log('test pool cluster remove by pattern'); + +portfinder.getPort((err, port) => { + cluster.add('SLAVE1', { port }); + cluster.add('SLAVE2', { port }); + + server.listen(port + 0, (err) => { + assert.ifError(err); + + const pool = cluster.of('SLAVE*', 'ORDER'); + + pool.getConnection((err, conn) => { + assert.ifError(err); + assert.strictEqual(conn._clusterId, 'SLAVE1'); + + conn.release(); + cluster.remove('SLAVE*'); + + pool.getConnection((err) => { + assert.ok(err); + assert.equal(err.code, 'POOL_NOEXIST'); + + cluster.remove('SLAVE*'); + cluster.remove('SLAVE2'); + + cluster.end((err) => { + assert.ifError(err); + server.close(); + }); + }); + }); + }); + + server.on('connection', (conn) => { + conn.serverHandshake({ + serverVersion: 'node.js rocks', + }); + conn.on('error', () => { + // server side of the connection + // ignore disconnects + }); + }); +}); diff --git a/test/unit/pool-cluster/test-restore-events.test.cjs b/test/unit/pool-cluster/test-restore-events.test.cjs new file mode 100644 index 0000000000..558943d017 --- /dev/null +++ b/test/unit/pool-cluster/test-restore-events.test.cjs @@ -0,0 +1,98 @@ +'use strict'; + +const { assert } = require('poku'); +const portfinder = require('portfinder'); +const common = require('../../common.test.cjs'); +const mysql = require('../../../index.js'); +const process = require('node:process'); + +// TODO: config poolCluster to work with MYSQL_CONNECTION_URL run +if (`${process.env.MYSQL_CONNECTION_URL}`.includes('pscale_pw_')) { + console.log('skipping test for planetscale'); + process.exit(0); +} + +if (process.platform === 'win32') { + console.log('This test is known to fail on windows. FIXME: investi=gate why'); + process.exit(0); +} + +// The process is not terminated in Deno +if (typeof Deno !== 'undefined') process.exit(0); + +const cluster = common.createPoolCluster({ + canRetry: true, + removeNodeErrorCount: 2, + restoreNodeTimeout: 100, +}); + +let connCount = 0; +let offline = true; +let offlineEvents = 0; +let onlineEvents = 0; + +const server = mysql.createServer(); + +console.log('test pool cluster restore events'); + +portfinder.getPort((err, port) => { + cluster.add('MASTER', { port }); + + server.listen(port + 0, (err) => { + assert.ifError(err); + + cluster.on('offline', (id) => { + assert.equal(++offlineEvents, 1); + assert.equal(id, 'MASTER'); + assert.equal(connCount, 2); + + cluster.getConnection('MASTER', (err) => { + assert.ok(err); + assert.equal(err.code, 'POOL_NONEONLINE'); + + offline = false; + }); + + setTimeout(() => { + cluster.getConnection('MASTER', (err, conn) => { + assert.ifError(err); + conn.release(); + }); + }, 200); + }); + + cluster.on('online', (id) => { + assert.equal(++onlineEvents, 1); + assert.equal(id, 'MASTER'); + assert.equal(connCount, 3); + + cluster.end((err) => { + assert.ifError(err); + server.close(); + }); + }); + + cluster.getConnection('MASTER', (err) => { + assert.ok(err); + assert.equal(err.code, 'PROTOCOL_CONNECTION_LOST'); + assert.equal(err.fatal, true); + assert.equal(connCount, 2); + }); + }); + + server.on('connection', (conn) => { + connCount += 1; + + if (offline) { + conn.close(); + } else { + conn.serverHandshake({ + serverVersion: 'node.js rocks', + }); + conn.on('error', () => { + // server side of the connection + // ignore disconnects + }); + } + }); +}); diff --git a/test/unit/pool-cluster/test-connection-restore.test.cjs b/test/unit/pool-cluster/test-restore.test.cjs similarity index 61% rename from test/unit/pool-cluster/test-connection-restore.test.cjs rename to test/unit/pool-cluster/test-restore.test.cjs index 4981bdec72..34e1773df9 100644 --- a/test/unit/pool-cluster/test-connection-restore.test.cjs +++ b/test/unit/pool-cluster/test-restore.test.cjs @@ -22,11 +22,12 @@ if (typeof Deno !== 'undefined') process.exit(0); const cluster = common.createPoolCluster({ canRetry: true, - removeNodeErrorCount: 1, + removeNodeErrorCount: 2, restoreNodeTimeout: 100, }); let connCount = 0; +let offline = true; const server = mysql.createServer(); @@ -38,40 +39,48 @@ portfinder.getPort((err, port) => { server.listen(port + 0, (err) => { assert.ifError(err); - const pool = cluster.of('*', 'ORDER'); - let removedNodeId; + cluster.getConnection('MASTER', (err) => { + assert.ok(err); + assert.equal(err.code, 'PROTOCOL_CONNECTION_LOST'); + assert.equal(err.fatal, true); + assert.equal(connCount, 2); - cluster.on('remove', (nodeId) => { - removedNodeId = nodeId; - }); + cluster.getConnection('MASTER', (err) => { + assert.ok(err); + assert.equal(err.code, 'POOL_NONEONLINE'); - pool.getConnection((err) => { - assert.ok(err); - console.log(connCount, cluster._serviceableNodeIds, removedNodeId); - }); + cluster._nodes.MASTER.errorCount = 3; - setTimeout(() => { - pool.getConnection(() => { - // TODO: restoreNodeTimeout is not supported now - console.log(connCount, cluster._serviceableNodeIds, removedNodeId); + offline = false; + }); - cluster.end((err) => { + setTimeout(() => { + cluster.getConnection('MASTER', (err, conn) => { assert.ifError(err); - server._server.close(); + conn.release(); + + cluster.end((err) => { + assert.ifError(err); + server.close(); + }); }); - }); - }, 200); + }, 200); + }); }); server.on('connection', (conn) => { connCount += 1; - console.log(connCount); - if (connCount < 2) { + + if (offline) { conn.close(); } else { conn.serverHandshake({ serverVersion: 'node.js rocks', }); + conn.on('error', () => { + // server side of the connection + // ignore disconnects + }); } }); }); diff --git a/typings/mysql/lib/PoolCluster.d.ts b/typings/mysql/lib/PoolCluster.d.ts index 2be0be860c..503b666f1a 100644 --- a/typings/mysql/lib/PoolCluster.d.ts +++ b/typings/mysql/lib/PoolCluster.d.ts @@ -52,6 +52,8 @@ declare class PoolCluster extends EventEmitter { add(group: string, connectionUri: string): void; add(group: string, config: PoolOptions): void; + remove(pattern: string): void; + end(): void; getConnection( @@ -79,6 +81,8 @@ declare class PoolCluster extends EventEmitter { of(pattern: string, selector?: string): PoolNamespace; on(event: string, listener: (...args: any[]) => void): this; + on(event: 'online', listener: (nodeId: number) => void): this; + on(event: 'offline', listener: (nodeId: number) => void): this; on(event: 'remove', listener: (nodeId: number) => void): this; on(event: 'warn', listener: (err: Error) => void): this; }