diff --git a/src/lib/client.ts b/src/lib/client.ts index b9fa98480..de0420137 100644 --- a/src/lib/client.ts +++ b/src/lib/client.ts @@ -385,7 +385,10 @@ export type OnMessageCallback = ( export type OnPacketCallback = (packet: Packet) => void export type OnCloseCallback = () => void export type OnErrorCallback = (error: Error | ErrorWithReasonCode) => void -export type PacketCallback = (error?: Error, packet?: Packet) => any +export type PacketCallback = ( + error?: Error | ErrorWithReasonCode, + packet?: Packet, +) => any export type CloseCallback = (error?: Error) => void export interface MqttClientEventCallbacks { diff --git a/src/lib/handlers/ack.ts b/src/lib/handlers/ack.ts index 066d30c6e..83f902966 100644 --- a/src/lib/handlers/ack.ts +++ b/src/lib/handlers/ack.ts @@ -1,6 +1,6 @@ // Other Socket Errors: EADDRINUSE, ECONNRESET, ENOTFOUND, ETIMEDOUT. -import { PacketHandler } from '../shared' +import { PacketHandler, ErrorWithReasonCode } from '../shared' export const ReasonCodes = { 0: '', @@ -82,8 +82,10 @@ const handleAck: PacketHandler = (client, packet) => { const pubackRC = packet.reasonCode // Callback - we're done if (pubackRC && pubackRC > 0 && pubackRC !== 16) { - err = new Error(`Publish error: ${ReasonCodes[pubackRC]}`) - err.code = pubackRC + err = new ErrorWithReasonCode( + `Publish error: ${ReasonCodes[pubackRC]}`, + pubackRC, + ) client['_removeOutgoingAndStoreMessage'](messageId, () => { cb(err, packet) }) @@ -102,8 +104,10 @@ const handleAck: PacketHandler = (client, packet) => { const pubrecRC = packet.reasonCode if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) { - err = new Error(`Publish error: ${ReasonCodes[pubrecRC]}`) - err.code = pubrecRC + err = new ErrorWithReasonCode( + `Publish error: ${ReasonCodes[pubrecRC]}`, + pubrecRC, + ) client['_removeOutgoingAndStoreMessage'](messageId, () => { cb(err, packet) }) diff --git a/test/abstract_client.ts b/test/abstract_client.ts index 57e049923..d4441318a 100644 --- a/test/abstract_client.ts +++ b/test/abstract_client.ts @@ -8,6 +8,7 @@ import levelStore from 'mqtt-level-store' import Store from '../src/lib/store' import serverBuilder from './server_helpers_for_client_tests' import handlePubrel from '../src/lib/handlers/pubrel' +import TeardownHelper from './helpers/TeardownHelper' import handle from '../src/lib/handlers/index' import handlePublish from '../src/lib/handlers/publish' import mqtt, { @@ -20,7 +21,7 @@ import mqtt, { import { IPublishPacket, IPubrelPacket, ISubackPacket, QoS } from 'mqtt-packet' import { DoneCallback, ErrorWithReasonCode } from 'src/lib/shared' import { fail } from 'assert' -import { describe, it, beforeEach, afterEach } from 'node:test' +import { describe, it, beforeEach, afterEach, after } from 'node:test' /** * These tests try to be consistent with names for servers (brokers) and clients, @@ -58,6 +59,20 @@ export default function abstractTest(server, config, ports) { return mqtt.connect(opts) } + const teardownHelper = new TeardownHelper() + + async function beforeEachExec() { + await teardownHelper.runAll() + teardownHelper.reset({ removeOnce: true }) + } + + async function afterExec() { + await teardownHelper.runAll() + teardownHelper.reset() + } + + after(afterExec) + describe('closing', () => { it('should emit close if stream closes', function _test(t, done) { const client = connect() @@ -601,6 +616,9 @@ export default function abstractTest(server, config, ports) { }) describe('offline messages', () => { + beforeEach(beforeEachExec) + after(afterExec) + it('should queue message until connected', function _test(t, done) { const client = connect() @@ -645,7 +663,7 @@ export default function abstractTest(server, config, ports) { }) it('should not interrupt messages', function _test(t, done) { - let client = null + let client: mqtt.MqttClient | null = null let publishCount = 0 const incomingStore = new mqtt.Store({ clean: false }) const outgoingStore = new mqtt.Store({ clean: false }) @@ -683,13 +701,13 @@ export default function abstractTest(server, config, ports) { packet.payload.toString(), 'payload4', ) - client.end((err1) => { - server2.close((err2) => done(err1 || err2)) - }) + client.end(false, done) } }) }) + teardownHelper.addServer(server2) + server2.listen(ports.PORTAND50, () => { client = connect({ port: ports.PORTAND50, @@ -701,6 +719,7 @@ export default function abstractTest(server, config, ports) { outgoingStore, queueQoSZero: true, }) + teardownHelper.addClient(client) client.on('packetreceive', (packet) => { if (packet.cmd === 'connack') { setImmediate(() => { @@ -715,13 +734,20 @@ export default function abstractTest(server, config, ports) { }) it('should not overtake the messages stored in the level-db-store', function _test(t, done) { + teardownHelper.add({ executeOnce: true }, async () => { + await new Promise((resolve) => { + fs.rm(storePath, { recursive: true }, () => { + resolve() + }) + }) + }) + const storePath = fs.mkdtempSync('test-store_') const store = levelStore(storePath) - let client = null + let client: mqtt.MqttClient | null = null const incomingStore = store.incoming const outgoingStore = store.outgoing let publishCount = 0 - const server2 = serverBuilder(config.protocol, (serverClient) => { serverClient.on('connect', () => { const connack = @@ -751,16 +777,14 @@ export default function abstractTest(server, config, ports) { packet.payload.toString(), 'payload3', ) - - server2.close((err) => { - fs.rmSync(storePath, { recursive: true }) - done(err) - }) + client.end(false, done) break } }) }) + teardownHelper.addServer(server2) + const clientOptions = { port: ports.PORTAND72, host: 'localhost', @@ -775,11 +799,12 @@ export default function abstractTest(server, config, ports) { server2.listen(ports.PORTAND72, () => { client = connect(clientOptions) + teardownHelper.addClient(client) + client.once('close', () => { client.once('connect', () => { - client.publish('test', 'payload2', { qos: 1 }) - client.publish('test', 'payload3', { qos: 1 }, () => { - client.end(false) + client.publish('test', 'payload2', { qos: 1 }, () => { + client.publish('test', 'payload3', { qos: 1 }) }) }) // reconecting @@ -883,6 +908,9 @@ export default function abstractTest(server, config, ports) { }) describe('publishing', () => { + beforeEach(beforeEachExec) + after(afterExec) + it('should publish a message (offline)', function _test(t, done) { const client = connect() const payload = 'test' @@ -1107,8 +1135,8 @@ export default function abstractTest(server, config, ports) { it('should fire a callback (qos 1) on error', function _test(t, done) { // 145 = Packet Identifier in use const pubackReasonCode = 145 - const pubOpts = { qos: 1 } - let client = null + const pubOpts: IClientPublishOptions = { qos: 1 } + let client: mqtt.MqttClient | null = null const server2 = serverBuilder(config.protocol, (serverClient) => { serverClient.on('connect', () => { @@ -1130,6 +1158,8 @@ export default function abstractTest(server, config, ports) { }) }) + teardownHelper.addServer(server2) + server2.listen(ports.PORTAND72, () => { client = connect({ port: ports.PORTAND72, @@ -1139,6 +1169,8 @@ export default function abstractTest(server, config, ports) { reconnectPeriod: 0, }) + teardownHelper.addClient(client) + client.once('connect', () => { client.publish( 'a', @@ -1147,15 +1179,18 @@ export default function abstractTest(server, config, ports) { (err, packet?: mqtt.Packet) => { assert.exists(packet) if (version === 5) { - assert.strictEqual(err.code, pubackReasonCode) + if (err instanceof ErrorWithReasonCode) { + assert.strictEqual( + err.code, + pubackReasonCode, + ) + } else { + assert.instanceOf(err, ErrorWithReasonCode) + } } else { assert.ifError(err) } - setImmediate(() => { - client.end(() => { - server2.close(done) - }) - }) + done() }, ) }) @@ -1177,9 +1212,8 @@ export default function abstractTest(server, config, ports) { it('should fire a callback (qos 2) on error', function _test(t, done) { // 145 = Packet Identifier in use const pubrecReasonCode = 145 - const pubOpts = { qos: 2 } - let client = null - + const pubOpts: IClientPublishOptions = { qos: 2 } + let client: mqtt.MqttClient | null = null const server2 = serverBuilder(config.protocol, (serverClient) => { serverClient.on('connect', () => { const connack = @@ -1204,6 +1238,8 @@ export default function abstractTest(server, config, ports) { }) }) + teardownHelper.addServer(server2) + server2.listen(ports.PORTAND103, () => { client = connect({ port: ports.PORTAND103, @@ -1213,6 +1249,8 @@ export default function abstractTest(server, config, ports) { reconnectPeriod: 0, }) + teardownHelper.addClient(client) + client.once('connect', () => { client.publish( 'a', @@ -1221,15 +1259,18 @@ export default function abstractTest(server, config, ports) { (err, packet?: mqtt.Packet) => { assert.exists(packet) if (version === 5) { - assert.strictEqual(err.code, pubrecReasonCode) + if (err instanceof ErrorWithReasonCode) { + assert.strictEqual( + err.code, + pubrecReasonCode, + ) + } else { + assert.instanceOf(err, ErrorWithReasonCode) + } } else { assert.ifError(err) } - setImmediate(() => { - client.end(true, () => { - server2.close(done) - }) - }) + done() }, ) }) @@ -1725,15 +1766,15 @@ export default function abstractTest(server, config, ports) { packet.payload.toString(), 'payload3', ) - client.end((err1) => { - server2.close((err2) => done(err1 || err2)) - }) + done() break } } }) }) + teardownHelper.addServer(server2) + server2.listen(ports.PORTAND50, () => { client = connect({ port: ports.PORTAND50, @@ -1745,6 +1786,8 @@ export default function abstractTest(server, config, ports) { outgoingStore, }) + teardownHelper.addClient(client) + client.on('connect', () => { if (!reconnect) { client.publish('topic', 'payload1', { qos: 1 }) @@ -3009,6 +3052,9 @@ export default function abstractTest(server, config, ports) { }) describe('auto reconnect', () => { + beforeEach(beforeEachExec) + after(afterExec) + it('should mark the client disconnecting if #end called', function _test(t, done) { const client = connect() @@ -3534,6 +3580,7 @@ export default function abstractTest(server, config, ports) { it('should not resubscribe when reconnecting if suback is error', function _test(t, done) { let tryReconnect = true let reconnectEvent = false + let client: mqtt.MqttClient | null = null const server2 = serverBuilder(config.protocol, (serverClient) => { serverClient.on('connect', (packet) => { const connack = @@ -3551,13 +3598,17 @@ export default function abstractTest(server, config, ports) { }) }) + teardownHelper.addServer(server2) + server2.listen(ports.PORTAND49, () => { - const client = connect({ + client = connect({ port: ports.PORTAND49, host: 'localhost', reconnectPeriod: 100, }) + teardownHelper.addClient(client) + client.on('reconnect', () => { reconnectEvent = true }) @@ -3580,9 +3631,7 @@ export default function abstractTest(server, config, ports) { Object.keys(client['_resubscribeTopics']).length, 0, ) - client.end(true, (err1) => { - server2.close((err2) => done(err1 || err2)) - }) + done() } }) }) @@ -3590,7 +3639,7 @@ export default function abstractTest(server, config, ports) { it('should preserved incomingStore after disconnecting if clean is false', function _test(t, done) { let reconnect = false - let client: mqtt.MqttClient + let client: mqtt.MqttClient | null = null const incomingStore = new mqtt.Store({ clean: false }) const outgoingStore = new mqtt.Store({ clean: false }) const server2 = serverBuilder(config.protocol, (serverClient) => { @@ -3624,12 +3673,12 @@ export default function abstractTest(server, config, ports) { }) }) serverClient.on('pubcomp', (packet) => { - client.end(true, (err1) => { - server2.close((err2) => done(err1 || err2)) - }) + done() }) }) + teardownHelper.addServer(server2) + server2.listen(ports.PORTAND50, () => { client = connect({ port: ports.PORTAND50, @@ -3641,6 +3690,8 @@ export default function abstractTest(server, config, ports) { outgoingStore, }) + teardownHelper.addClient(client) + client.on('connect', () => { if (!reconnect) { client.subscribe('test', { qos: 2 }, () => {}) @@ -3656,7 +3707,7 @@ export default function abstractTest(server, config, ports) { it('should clear outgoing if close from server', function _test(t, done) { let reconnect = false - let client: mqtt.MqttClient + let client: mqtt.MqttClient | null = null const server2 = serverBuilder(config.protocol, (serverClient) => { serverClient.on('connect', (packet) => { const connack = @@ -3675,6 +3726,8 @@ export default function abstractTest(server, config, ports) { }) }) + teardownHelper.addServer(server2) + server2.listen(ports.PORTAND50, () => { client = connect({ port: ports.PORTAND50, @@ -3685,6 +3738,8 @@ export default function abstractTest(server, config, ports) { reconnectPeriod: 0, }) + teardownHelper.addClient(client) + client.on('connect', () => { client.subscribe('test', { qos: 2 }, (e) => { if (!e) { @@ -3695,7 +3750,7 @@ export default function abstractTest(server, config, ports) { client.on('close', () => { if (reconnect) { - server2.close((err) => done(err)) + done() } else { assert.strictEqual( Object.keys(client.outgoing).length, @@ -3710,7 +3765,7 @@ export default function abstractTest(server, config, ports) { it('should resend in-flight QoS 1 publish messages from the client if clean is false', function _test(t, done) { let reconnect = false - let client: mqtt.MqttClient + let client: mqtt.MqttClient | null = null const incomingStore = new mqtt.Store({ clean: false }) const outgoingStore = new mqtt.Store({ clean: false }) const server2 = serverBuilder(config.protocol, (serverClient) => { @@ -3721,9 +3776,7 @@ export default function abstractTest(server, config, ports) { }) serverClient.on('publish', (packet) => { if (reconnect) { - client.end(true, (err1) => { - server2.close((err2) => done(err1 || err2)) - }) + done() } else { client.end(true, () => { client.reconnect({ @@ -3736,6 +3789,8 @@ export default function abstractTest(server, config, ports) { }) }) + teardownHelper.addServer(server2) + server2.listen(ports.PORTAND50, () => { client = connect({ port: ports.PORTAND50, @@ -3747,6 +3802,8 @@ export default function abstractTest(server, config, ports) { outgoingStore, }) + teardownHelper.addClient(client) + client.on('connect', () => { if (!reconnect) { client.publish('topic', 'payload', { qos: 1 }) @@ -3758,7 +3815,7 @@ export default function abstractTest(server, config, ports) { it('should resend in-flight QoS 2 publish messages from the client if clean is false', function _test(t, done) { let reconnect = false - let client: mqtt.MqttClient + let client: mqtt.MqttClient | null = null const incomingStore = new mqtt.Store({ clean: false }) const outgoingStore = new mqtt.Store({ clean: false }) const server2 = serverBuilder(config.protocol, (serverClient) => { @@ -3769,9 +3826,7 @@ export default function abstractTest(server, config, ports) { }) serverClient.on('publish', (packet) => { if (reconnect) { - client.end(true, (err1) => { - server2.close((err2) => done(err1 || err2)) - }) + done() } else { client.end(true, () => { client.reconnect({ @@ -3784,6 +3839,8 @@ export default function abstractTest(server, config, ports) { }) }) + teardownHelper.addServer(server2) + server2.listen(ports.PORTAND50, () => { client = connect({ port: ports.PORTAND50, @@ -3795,6 +3852,8 @@ export default function abstractTest(server, config, ports) { outgoingStore, }) + teardownHelper.addClient(client) + client.on('connect', () => { if (!reconnect) { client.publish('topic', 'payload', { qos: 2 }) @@ -3806,7 +3865,7 @@ export default function abstractTest(server, config, ports) { it('should resend in-flight QoS 2 pubrel messages from the client if clean is false', function _test(t, done) { let reconnect = false - let client: mqtt.MqttClient + let client: mqtt.MqttClient | null = null const incomingStore = new mqtt.Store({ clean: false }) const outgoingStore = new mqtt.Store({ clean: false }) const server2 = serverBuilder(config.protocol, (serverClient) => { @@ -3835,6 +3894,8 @@ export default function abstractTest(server, config, ports) { }) }) + teardownHelper.addServer(server2) + server2.listen(ports.PORTAND50, () => { client = connect({ port: ports.PORTAND50, @@ -3846,6 +3907,8 @@ export default function abstractTest(server, config, ports) { outgoingStore, }) + teardownHelper.addClient(client) + client.on('connect', () => { if (!reconnect) { client.publish( @@ -3855,9 +3918,7 @@ export default function abstractTest(server, config, ports) { (err) => { assert(reconnect) assert.ifError(err) - client.end(true, (err1) => { - server2.close((err2) => done(err1 || err2)) - }) + done() }, ) } @@ -3870,7 +3931,7 @@ export default function abstractTest(server, config, ports) { let publishCount = 0 let reconnect = false let disconnectOnce = true - let client: mqtt.MqttClient + let client: mqtt.MqttClient | null = null const incomingStore = new mqtt.Store({ clean: false }) const outgoingStore = new mqtt.Store({ clean: false }) const server2 = serverBuilder(config.protocol, (serverClient) => { @@ -3904,9 +3965,7 @@ export default function abstractTest(server, config, ports) { packet.payload.toString(), 'payload3', ) - client.end(true, (err1) => { - server2.close((err2) => done(err1 || err2)) - }) + done() break } } else if (disconnectOnce) { @@ -3922,6 +3981,8 @@ export default function abstractTest(server, config, ports) { }) }) + teardownHelper.addServer(server2) + server2.listen(ports.PORTAND50, () => { client = connect({ port: ports.PORTAND50, @@ -3933,6 +3994,8 @@ export default function abstractTest(server, config, ports) { outgoingStore, }) + teardownHelper.addClient(client) + client['nextId'] = 65535 client.on('connect', () => { diff --git a/test/helpers/TeardownHelper.ts b/test/helpers/TeardownHelper.ts new file mode 100644 index 000000000..774582009 --- /dev/null +++ b/test/helpers/TeardownHelper.ts @@ -0,0 +1,269 @@ +import type { MqttClient } from 'src' +import { randomUUID } from 'node:crypto' +import serverBuilder from '../server_helpers_for_client_tests' + +type ServerBuilderInstance = ReturnType + +type AddOptions = { + /** + * @description + * If `true`, the method will be executed only one time and then removed from the store. + * + * @default true + */ + executeOnce?: boolean +} + +type ResetOptions = { + /** + * @description + * If `true`, only the methods that have the option `executeOnce` set to `true` will be removed. + * + * @default false + */ + removeOnce?: boolean +} + +/** + * @description + * Class to help clean the environment or close opened connections after tests finish. + * Also, you can add custom methods to be executed after the tests finish, like + * deleting temporary files or closing connections. + * + * @example + * ``` + * import { describe, it } from 'node:test' + * import mqtt from './src' + * import serverBuilder from './test/server_helpers_for_client_tests' + * import TeardownHelper from './test/helpers/TeardownHelper' + * + * + * describe('Test', () => { + * const teardownHelper = new TeardownHelper() + * + * it('should clean the client and server', (t, done) => { + * t.after(async () => { + * await teardownHelper.runAll() + * }) + * + * const server = serverBuilder('8883') + * const client = mqtt.connect('mqtt://localhost') + * + * teardownHelper.addServer(server) + * teardownHelper.addClient(client) + * }) + * }) + * ``` + * + * @example + * ``` + * import { describe, it, after } from 'node:test' + * import mqtt from './src' + * import serverBuilder from './test/server_helpers_for_client_tests' + * import TeardownHelper from './test/helpers/TeardownHelper' + * + * + * describe('Test', () => { + * + * const teardownHelper = new TeardownHelper() + * let server = serverBuilder('8883') + * + * teardownHelper.add({}, async () => { + * if (server?.listening) { + * await new Promise((resolve, reject) => { + * server.close((err) => { + * if (err) reject(err) + * else resolve() + * }) + * }) + * } + * }) + * + * after(async () => { + * await teardownHelper.runAll() + * }) + * + * it('should clean the client and server', (t, done) => { + * server = serverBuilder('8883') + * const client = mqtt.connect('mqtt://localhost') + * + * teardownHelper.addClient(client) + * done() + * }) + * + * }) + * ``` + */ +class TeardownHelper { + #methods: Map< + string, + { + options: AddOptions + method: Promise | ((...args: any[]) => Promise) + args: any[] + } + > + + constructor() { + this.#methods = new Map() + } + + /** + * @description + * Add a client to close. + */ + addClient(client: MqttClient) { + this.add({}, this.closeClient, client) + } + + /** + * @description + * Add a server to close. + */ + addServer(server: ServerBuilderInstance) { + this.add({}, this.closeServer, server) + } + + /** + * @param options Options to be passed to the method. + * @param method It can be a promise or a function that returns a promise. + * @param args Arguments to be passed to the method. + * + * @description + * Add a method to be executed + */ + add Promise>( + options: AddOptions | undefined, + method: Promise | T, + ...args: Parameters + ): string { + const id = randomUUID() + + this.#methods.set(id, { + method, + args, + options: { executeOnce: true, ...options }, + }) + + return id + } + + /** + * @description + * Remove all methods stored. + */ + reset(options?: ResetOptions) { + if (options?.removeOnce) { + for (const [id, { options: methodOptions }] of this.#methods) { + if (methodOptions.executeOnce) { + this.#methods.delete(id) + } + } + } else { + this.#methods.clear() + } + } + + /** + * @description + * Close the `client` connection. + * + * @default + * Use the `client` set in the class. + */ + async closeClient(client: MqttClient) { + if (client) { + await new Promise((resolve, reject) => { + client.end(true, (err) => { + if (err) reject(err) + else resolve() + }) + }) + } + } + + /** + * @description + * Close the `server` connection. + * + * @default + * Use the `server` set in the class. + */ + async closeServer(server: ServerBuilderInstance) { + if (server?.listening) { + await new Promise((resolve, reject) => { + server.close((err) => { + if (err) reject(err) + else resolve() + }) + }) + } + } + + /** + * @param id Method id to be executed. + * + * @description + * Execute a method stored by its id. + * If the method has the option `executeOnce` set to `true`, it will be removed after execution. + */ + async run(id: string) { + const method = this.#methods.get(id) + + if (!method) { + return + } + + if (method.options.executeOnce) { + this.#methods.delete(id) + } + + if (method.method instanceof Promise) { + await method.method + } else { + await method.method(...method.args) + } + } + + /** + * @description + * Execute all methods stored. + * If a method has the option `executeOnce` set to `true`, it will be removed after execution. + */ + async runAll() { + if (this.#methods.size === 0) { + return + } + + const methods: Array> = [] + + for (const [id, { method, options, args }] of this.#methods) { + if (method instanceof Promise) { + methods.push(method) + } else { + const promise = new Promise((resolve, reject) => { + method(...args) + .then(resolve) + .catch(reject) + }) + + methods.push(promise) + } + + if (options.executeOnce) { + this.#methods.delete(id) + } + } + + const results = await Promise.allSettled(methods) + + for (const result of results) { + if (result.status === 'rejected') { + if (result.reason instanceof Error) throw result.reason + else throw new Error(result.reason) + } + } + } +} + +export default TeardownHelper