diff --git a/.github/workflows/mqttjs-test.yml b/.github/workflows/mqttjs-test.yml index 9511d07c8..eb1c47d6c 100644 --- a/.github/workflows/mqttjs-test.yml +++ b/.github/workflows/mqttjs-test.yml @@ -44,6 +44,8 @@ jobs: DEBUG: "mqttjs" - name: Test Browser + if: matrix.node-version == '20.x' + # only run on latest node version, no reason to run on all timeout-minutes: 2 run: | npm run browser-build diff --git a/bin/pub.js b/bin/pub.js index 2ea11f747..6309bca3e 100755 --- a/bin/pub.js +++ b/bin/pub.js @@ -3,11 +3,10 @@ 'use strict' const mqtt = require('../') -const pump = require('pump') +const { pipeline, Writable } = require('readable-stream') const path = require('path') const fs = require('fs') const concat = require('concat-stream') -const Writable = require('readable-stream').Writable const helpMe = require('help-me')({ dir: path.join(__dirname, '..', 'doc') }) @@ -40,7 +39,7 @@ function multisend (args) { } client.on('connect', function () { - pump(process.stdin, split2(), sender, function (err) { + pipeline(process.stdin, split2(), sender, function (err) { client.end() if (err) { throw err diff --git a/lib/client.js b/lib/client.js index 13306a0c6..b39ab63db 100644 --- a/lib/client.js +++ b/lib/client.js @@ -10,11 +10,9 @@ const TopicAliasSend = require('./topic-alias-send') const mqttPacket = require('mqtt-packet') const DefaultMessageIdProvider = require('./default-message-id-provider') const Writable = require('readable-stream').Writable -const inherits = require('inherits') const reInterval = require('reinterval') const clone = require('rfdc/default') const validations = require('./validations') -const xtend = require('xtend') const debug = require('debug')('mqttjs:client') const nextTick = process ? process.nextTick : function (callback) { setTimeout(callback, 0) } const setImmediate = global.setImmediate || function (...args) { @@ -247,1672 +245,1670 @@ function nop (error) { * @param {Object} [options] - connection options * (see Connection#connect) */ -function MqttClient (streamBuilder, options) { - let k - const that = this +class MqttClient extends EventEmitter { + constructor (streamBuilder, options) { + super() - if (!(this instanceof MqttClient)) { - return new MqttClient(streamBuilder, options) - } + let k + const that = this - this.options = options || {} + this.options = options || {} - // Defaults - for (k in defaultConnectOptions) { - if (typeof this.options[k] === 'undefined') { - this.options[k] = defaultConnectOptions[k] - } else { - this.options[k] = options[k] + // Defaults + for (k in defaultConnectOptions) { + if (typeof this.options[k] === 'undefined') { + this.options[k] = defaultConnectOptions[k] + } else { + this.options[k] = options[k] + } } - } - - debug('MqttClient :: options.protocol', options.protocol) - debug('MqttClient :: options.protocolVersion', options.protocolVersion) - debug('MqttClient :: options.username', options.username) - debug('MqttClient :: options.keepalive', options.keepalive) - debug('MqttClient :: options.reconnectPeriod', options.reconnectPeriod) - debug('MqttClient :: options.rejectUnauthorized', options.rejectUnauthorized) - debug('MqttClient :: options.properties.topicAliasMaximum', options.properties ? options.properties.topicAliasMaximum : undefined) - this.options.clientId = (typeof options.clientId === 'string') ? options.clientId : defaultId() + debug('MqttClient :: options.protocol', options.protocol) + debug('MqttClient :: options.protocolVersion', options.protocolVersion) + debug('MqttClient :: options.username', options.username) + debug('MqttClient :: options.keepalive', options.keepalive) + debug('MqttClient :: options.reconnectPeriod', options.reconnectPeriod) + debug('MqttClient :: options.rejectUnauthorized', options.rejectUnauthorized) + debug('MqttClient :: options.properties.topicAliasMaximum', options.properties ? options.properties.topicAliasMaximum : undefined) - debug('MqttClient :: clientId', this.options.clientId) + this.options.clientId = (typeof options.clientId === 'string') ? options.clientId : defaultId() - this.options.customHandleAcks = (options.protocolVersion === 5 && options.customHandleAcks) ? options.customHandleAcks : function () { arguments[3](0) } + debug('MqttClient :: clientId', this.options.clientId) - // Disable pre-generated write cache if requested. Will allocate buffers on-the-fly instead. WARNING: This can affect write performance - if (!this.options.writeCache) { - mqttPacket.writeToStream.cacheNumbers = false - } + this.options.customHandleAcks = (options.protocolVersion === 5 && options.customHandleAcks) ? options.customHandleAcks : function () { arguments[3](0) } - this.streamBuilder = streamBuilder - - this.messageIdProvider = (typeof this.options.messageIdProvider === 'undefined') ? new DefaultMessageIdProvider() : this.options.messageIdProvider - - // Inflight message storages - this.outgoingStore = options.outgoingStore || new Store() - this.incomingStore = options.incomingStore || new Store() - - // Should QoS zero messages be queued when the connection is broken? - this.queueQoSZero = options.queueQoSZero === undefined ? true : options.queueQoSZero - - // map of subscribed topics to support reconnection - this._resubscribeTopics = {} - - // map of a subscribe messageId and a topic - this.messageIdToTopic = {} - - // Ping timer, setup in _setupPingTimer - this.pingTimer = null - // Is the client connected? - this.connected = false - // Are we disconnecting? - this.disconnecting = false - // Packet queue - this.queue = [] - // connack timer - this.connackTimer = null - // Reconnect timer - this.reconnectTimer = null - // Is processing store? - this._storeProcessing = false - // Packet Ids are put into the store during store processing - this._packetIdsDuringStoreProcessing = {} - // Store processing queue - this._storeProcessingQueue = [] - - // Inflight callbacks - this.outgoing = {} - - // True if connection is first time. - this._firstConnection = true - - if (options.properties && options.properties.topicAliasMaximum > 0) { - if (options.properties.topicAliasMaximum > 0xffff) { - debug('MqttClient :: options.properties.topicAliasMaximum is out of range') - } else { - this.topicAliasRecv = new TopicAliasRecv(options.properties.topicAliasMaximum) + // Disable pre-generated write cache if requested. Will allocate buffers on-the-fly instead. WARNING: This can affect write performance + if (!this.options.writeCache) { + mqttPacket.writeToStream.cacheNumbers = false } - } - // Send queued packets - this.on('connect', function () { - const queue = that.queue + this.streamBuilder = streamBuilder - function deliver () { - const entry = queue.shift() - debug('deliver :: entry %o', entry) - let packet = null + this.messageIdProvider = (typeof this.options.messageIdProvider === 'undefined') ? new DefaultMessageIdProvider() : this.options.messageIdProvider - if (!entry) { - that._resubscribe() - return - } + // Inflight message storages + this.outgoingStore = options.outgoingStore || new Store() + this.incomingStore = options.incomingStore || new Store() - packet = entry.packet - debug('deliver :: call _sendPacket for %o', packet) - let send = true - if (packet.messageId && packet.messageId !== 0) { - if (!that.messageIdProvider.register(packet.messageId)) { - send = false - } - } - if (send) { - that._sendPacket( - packet, - function (err) { - if (entry.cb) { - entry.cb(err) - } - deliver() - } - ) + // Should QoS zero messages be queued when the connection is broken? + this.queueQoSZero = options.queueQoSZero === undefined ? true : options.queueQoSZero + + // map of subscribed topics to support reconnection + this._resubscribeTopics = {} + + // map of a subscribe messageId and a topic + this.messageIdToTopic = {} + + // Ping timer, setup in _setupPingTimer + this.pingTimer = null + // Is the client connected? + this.connected = false + // Are we disconnecting? + this.disconnecting = false + // Packet queue + this.queue = [] + // connack timer + this.connackTimer = null + // Reconnect timer + this.reconnectTimer = null + // Is processing store? + this._storeProcessing = false + // Packet Ids are put into the store during store processing + this._packetIdsDuringStoreProcessing = {} + // Store processing queue + this._storeProcessingQueue = [] + + // Inflight callbacks + this.outgoing = {} + + // True if connection is first time. + this._firstConnection = true + + if (options.properties && options.properties.topicAliasMaximum > 0) { + if (options.properties.topicAliasMaximum > 0xffff) { + debug('MqttClient :: options.properties.topicAliasMaximum is out of range') } else { - debug('messageId: %d has already used. The message is skipped and removed.', packet.messageId) - deliver() + this.topicAliasRecv = new TopicAliasRecv(options.properties.topicAliasMaximum) } } - debug('connect :: sending queued packets') - deliver() - }) - - this.on('close', function () { - debug('close :: connected set to `false`') - that.connected = false + // Send queued packets + this.on('connect', function () { + const queue = that.queue - debug('close :: clearing connackTimer') - clearTimeout(that.connackTimer) + function deliver () { + const entry = queue.shift() + debug('deliver :: entry %o', entry) + let packet = null - debug('close :: clearing ping timer') - if (that.pingTimer !== null) { - that.pingTimer.clear() - that.pingTimer = null - } + if (!entry) { + that._resubscribe() + return + } - if (that.topicAliasRecv) { - that.topicAliasRecv.clear() - } + packet = entry.packet + debug('deliver :: call _sendPacket for %o', packet) + let send = true + if (packet.messageId && packet.messageId !== 0) { + if (!that.messageIdProvider.register(packet.messageId)) { + send = false + } + } + if (send) { + that._sendPacket( + packet, + function (err) { + if (entry.cb) { + entry.cb(err) + } + deliver() + } + ) + } else { + debug('messageId: %d has already used. The message is skipped and removed.', packet.messageId) + deliver() + } + } - debug('close :: calling _setupReconnect') - that._setupReconnect() - }) - EventEmitter.call(this) + debug('connect :: sending queued packets') + deliver() + }) - debug('MqttClient :: setting up stream') - this._setupStream() -} -inherits(MqttClient, EventEmitter) + this.on('close', function () { + debug('close :: connected set to `false`') + that.connected = false -/** - * setup the event handlers in the inner stream. - * - * @api private - */ -MqttClient.prototype._setupStream = function () { - const that = this - const writable = new Writable() - const parser = mqttPacket.parser(this.options) - let completeParse = null - const packets = [] - - debug('_setupStream :: calling method to clear reconnect') - this._clearReconnect() - - debug('_setupStream :: using streamBuilder provided to client to create stream') - this.stream = this.streamBuilder(this) - - parser.on('packet', function (packet) { - debug('parser :: on packet push to packets array.') - packets.push(packet) - }) + debug('close :: clearing connackTimer') + clearTimeout(that.connackTimer) - function nextTickWork () { - if (packets.length) { - nextTick(work) - } else { - const done = completeParse - completeParse = null - done() - } - } + debug('close :: clearing ping timer') + if (that.pingTimer !== null) { + that.pingTimer.clear() + that.pingTimer = null + } - function work () { - debug('work :: getting next packet in queue') - const packet = packets.shift() + if (that.topicAliasRecv) { + that.topicAliasRecv.clear() + } - if (packet) { - debug('work :: packet pulled from queue') - that._handlePacket(packet, nextTickWork) - } else { - debug('work :: no packets in queue') - const done = completeParse - completeParse = null - debug('work :: done flag is %s', !!(done)) - if (done) done() - } - } + debug('close :: calling _setupReconnect') + that._setupReconnect() + }) + EventEmitter.call(this) - writable._write = function (buf, enc, done) { - completeParse = done - debug('writable stream :: parsing buffer') - parser.parse(buf) - work() + debug('MqttClient :: setting up stream') + this._setupStream() } - function streamErrorHandler (error) { - debug('streamErrorHandler :: error', error.message) - if (socketErrors.includes(error.code)) { - // handle error - debug('streamErrorHandler :: emitting error') - that.emit('error', error) - } else { - nop(error) - } - } + /** + * setup the event handlers in the inner stream. + * + * @api private + */ + _setupStream () { + const that = this + const writable = new Writable() + const parser = mqttPacket.parser(this.options) + let completeParse = null + const packets = [] - debug('_setupStream :: pipe stream to writable stream') - this.stream.pipe(writable) + debug('_setupStream :: calling method to clear reconnect') + this._clearReconnect() - // Suppress connection errors - this.stream.on('error', streamErrorHandler) + debug('_setupStream :: using streamBuilder provided to client to create stream') + this.stream = this.streamBuilder(this) - // Echo stream close - this.stream.on('close', function () { - debug('(%s)stream :: on close', that.options.clientId) - flushVolatile(that.outgoing) - debug('stream: emit close to MqttClient') - that.emit('close') - }) + parser.on('packet', function (packet) { + debug('parser :: on packet push to packets array.') + packets.push(packet) + }) - // Send a connect packet - debug('_setupStream: sending packet `connect`') - const connectPacket = Object.create(this.options) - connectPacket.cmd = 'connect' - if (this.topicAliasRecv) { - if (!connectPacket.properties) { - connectPacket.properties = {} - } - if (this.topicAliasRecv) { - connectPacket.properties.topicAliasMaximum = this.topicAliasRecv.max + function nextTickWork () { + if (packets.length) { + nextTick(work) + } else { + const done = completeParse + completeParse = null + done() + } } - } - // avoid message queue - sendPacket(this, connectPacket) - // Echo connection errors - parser.on('error', this.emit.bind(this, 'error')) + function work () { + debug('work :: getting next packet in queue') + const packet = packets.shift() - // auth - if (this.options.properties) { - if (!this.options.properties.authenticationMethod && this.options.properties.authenticationData) { - that.end(() => - this.emit('error', new Error('Packet has no Authentication Method') - )) - return this + if (packet) { + debug('work :: packet pulled from queue') + that._handlePacket(packet, nextTickWork) + } else { + debug('work :: no packets in queue') + const done = completeParse + completeParse = null + debug('work :: done flag is %s', !!(done)) + if (done) done() + } } - if (this.options.properties.authenticationMethod && this.options.authPacket && typeof this.options.authPacket === 'object') { - const authPacket = xtend({ cmd: 'auth', reasonCode: 0 }, this.options.authPacket) - sendPacket(this, authPacket) + + writable._write = function (buf, enc, done) { + completeParse = done + debug('writable stream :: parsing buffer') + parser.parse(buf) + work() } - } - // many drain listeners are needed for qos 1 callbacks if the connection is intermittent - this.stream.setMaxListeners(1000) + function streamErrorHandler (error) { + debug('streamErrorHandler :: error', error.message) + if (socketErrors.includes(error.code)) { + // handle error + debug('streamErrorHandler :: emitting error') + that.emit('error', error) + } else { + nop(error) + } + } - clearTimeout(this.connackTimer) - this.connackTimer = setTimeout(function () { - debug('!!connectTimeout hit!! Calling _cleanUp with force `true`') - that._cleanUp(true) - }, this.options.connectTimeout) -} + debug('_setupStream :: pipe stream to writable stream') + this.stream.pipe(writable) -MqttClient.prototype._handlePacket = function (packet, done) { - const options = this.options + // Suppress connection errors + this.stream.on('error', streamErrorHandler) - if (options.protocolVersion === 5 && options.properties && options.properties.maximumPacketSize && options.properties.maximumPacketSize < packet.length) { - this.emit('error', new Error('exceeding packets size ' + packet.cmd)) - this.end({ reasonCode: 149, properties: { reasonString: 'Maximum packet size was exceeded' } }) - return this - } - debug('_handlePacket :: emitting packetreceive') - this.emit('packetreceive', packet) - - switch (packet.cmd) { - case 'publish': - this._handlePublish(packet, done) - break - case 'puback': - case 'pubrec': - case 'pubcomp': - case 'suback': - case 'unsuback': - this._handleAck(packet) - done() - break - case 'pubrel': - this._handlePubrel(packet, done) - break - case 'connack': - this._handleConnack(packet) - done() - break - case 'auth': - this._handleAuth(packet) - done() - break - case 'pingresp': - this._handlePingresp(packet) - done() - break - case 'disconnect': - this._handleDisconnect(packet) - done() - break - default: - // do nothing - // maybe we should do an error handling - // or just log it - break - } -} + // Echo stream close + this.stream.on('close', function () { + debug('(%s)stream :: on close', that.options.clientId) + flushVolatile(that.outgoing) + debug('stream: emit close to MqttClient') + that.emit('close') + }) -MqttClient.prototype._checkDisconnecting = function (callback) { - if (this.disconnecting) { - if (callback && callback !== nop) { - callback(new Error('client disconnecting')) - } else { - this.emit('error', new Error('client disconnecting')) + // Send a connect packet + debug('_setupStream: sending packet `connect`') + const connectPacket = Object.create(this.options) + connectPacket.cmd = 'connect' + if (this.topicAliasRecv) { + if (!connectPacket.properties) { + connectPacket.properties = {} + } + if (this.topicAliasRecv) { + connectPacket.properties.topicAliasMaximum = this.topicAliasRecv.max + } } - } - return this.disconnecting -} + // avoid message queue + sendPacket(this, connectPacket) -/** - * publish - publish to - * - * @param {String} topic - topic to publish to - * @param {String, Buffer} message - message to publish - * @param {Object} [opts] - publish options, includes: - * {Number} qos - qos level to publish on - * {Boolean} retain - whether or not to retain the message - * {Boolean} dup - whether or not mark a message as duplicate - * {Function} cbStorePut - function(){} called when message is put into `outgoingStore` - * @param {Function} [callback] - function(err){} - * called when publish succeeds or fails - * @returns {MqttClient} this - for chaining - * @api public - * - * @example client.publish('topic', 'message'); - * @example - * client.publish('topic', 'message', {qos: 1, retain: true, dup: true}); - * @example client.publish('topic', 'message', console.log); - */ -MqttClient.prototype.publish = function (topic, message, opts, callback) { - debug('publish :: message `%s` to topic `%s`', message, topic) - const options = this.options - - // .publish(topic, payload, cb); - if (typeof opts === 'function') { - callback = opts - opts = null - } + // Echo connection errors + parser.on('error', this.emit.bind(this, 'error')) + + // auth + if (this.options.properties) { + if (!this.options.properties.authenticationMethod && this.options.properties.authenticationData) { + that.end(() => this.emit('error', new Error('Packet has no Authentication Method') + )) + return this + } + if (this.options.properties.authenticationMethod && this.options.authPacket && typeof this.options.authPacket === 'object') { + const authPacket = { cmd: 'auth', reasonCode: 0, ...this.options.authPacket } + sendPacket(this, authPacket) + } + } - // default opts - const defaultOpts = { qos: 0, retain: false, dup: false } - opts = xtend(defaultOpts, opts) + // many drain listeners are needed for qos 1 callbacks if the connection is intermittent + this.stream.setMaxListeners(1000) - if (this._checkDisconnecting(callback)) { - return this + clearTimeout(this.connackTimer) + this.connackTimer = setTimeout(function () { + debug('!!connectTimeout hit!! Calling _cleanUp with force `true`') + that._cleanUp(true) + }, this.options.connectTimeout) } - const that = this - const publishProc = function () { - let messageId = 0 - if (opts.qos === 1 || opts.qos === 2) { - messageId = that._nextId() - if (messageId === null) { - debug('No messageId left') - return false - } - } - const packet = { - cmd: 'publish', - topic, - payload: message, - qos: opts.qos, - retain: opts.retain, - messageId, - dup: opts.dup - } + _handlePacket (packet, done) { + const options = this.options - if (options.protocolVersion === 5) { - packet.properties = opts.properties + if (options.protocolVersion === 5 && options.properties && options.properties.maximumPacketSize && options.properties.maximumPacketSize < packet.length) { + this.emit('error', new Error('exceeding packets size ' + packet.cmd)) + this.end({ reasonCode: 149, properties: { reasonString: 'Maximum packet size was exceeded' } }) + return this } + debug('_handlePacket :: emitting packetreceive') + this.emit('packetreceive', packet) - debug('publish :: qos', opts.qos) - switch (opts.qos) { - case 1: - case 2: - // Add to callbacks - that.outgoing[packet.messageId] = { - volatile: false, - cb: callback || nop - } - debug('MqttClient:publish: packet cmd: %s', packet.cmd) - that._sendPacket(packet, undefined, opts.cbStorePut) + switch (packet.cmd) { + case 'publish': + this._handlePublish(packet, done) + break + case 'puback': + case 'pubrec': + case 'pubcomp': + case 'suback': + case 'unsuback': + this._handleAck(packet) + done() + break + case 'pubrel': + this._handlePubrel(packet, done) + break + case 'connack': + this._handleConnack(packet) + done() + break + case 'auth': + this._handleAuth(packet) + done() + break + case 'pingresp': + this._handlePingresp(packet) + done() + break + case 'disconnect': + this._handleDisconnect(packet) + done() break default: - debug('MqttClient:publish: packet cmd: %s', packet.cmd) - that._sendPacket(packet, callback, opts.cbStorePut) + // do nothing + // maybe we should do an error handling + // or just log it break } - return true } - if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !publishProc()) { - this._storeProcessingQueue.push( - { - invoke: publishProc, - cbStorePut: opts.cbStorePut, - callback + _checkDisconnecting (callback) { + if (this.disconnecting) { + if (callback && callback !== nop) { + callback(new Error('client disconnecting')) + } else { + this.emit('error', new Error('client disconnecting')) } - ) - } - return this -} + } + return this.disconnecting + } + + /** + * publish - publish to + * + * @param {String} topic - topic to publish to + * @param {String, Buffer} message - message to publish + * @param {Object} [opts] - publish options, includes: + * {Number} qos - qos level to publish on + * {Boolean} retain - whether or not to retain the message + * {Boolean} dup - whether or not mark a message as duplicate + * {Function} cbStorePut - function(){} called when message is put into `outgoingStore` + * @param {Function} [callback] - function(err){} + * called when publish succeeds or fails + * @returns {MqttClient} this - for chaining + * @api public + * + * @example client.publish('topic', 'message'); + * @example + * client.publish('topic', 'message', {qos: 1, retain: true, dup: true}); + * @example client.publish('topic', 'message', console.log); + */ + publish (topic, message, opts, callback) { + debug('publish :: message `%s` to topic `%s`', message, topic) + const options = this.options + + // .publish(topic, payload, cb); + if (typeof opts === 'function') { + callback = opts + opts = null + } -/** - * subscribe - subscribe to - * - * @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos} - * @param {Object} [opts] - optional subscription options, includes: - * {Number} qos - subscribe qos level - * @param {Function} [callback] - function(err, granted){} where: - * {Error} err - subscription error (none at the moment!) - * {Array} granted - array of {topic: 't', qos: 0} - * @returns {MqttClient} this - for chaining - * @api public - * @example client.subscribe('topic'); - * @example client.subscribe('topic', {qos: 1}); - * @example client.subscribe({'topic': {qos: 0}, 'topic2': {qos: 1}}, console.log); - * @example client.subscribe('topic', console.log); - */ -MqttClient.prototype.subscribe = function () { - const that = this - const args = new Array(arguments.length) - for (let i = 0; i < arguments.length; i++) { - args[i] = arguments[i] - } - const subs = [] - let obj = args.shift() - const resubscribe = obj.resubscribe - let callback = args.pop() || nop - let opts = args.pop() - const version = this.options.protocolVersion + // default opts + const defaultOpts = { qos: 0, retain: false, dup: false } + opts = { ...defaultOpts, ...opts } + + if (this._checkDisconnecting(callback)) { + return this + } - delete obj.resubscribe + const that = this + const publishProc = function () { + let messageId = 0 + if (opts.qos === 1 || opts.qos === 2) { + messageId = that._nextId() + if (messageId === null) { + debug('No messageId left') + return false + } + } + const packet = { + cmd: 'publish', + topic, + payload: message, + qos: opts.qos, + retain: opts.retain, + messageId, + dup: opts.dup + } - if (typeof obj === 'string') { - obj = [obj] - } + if (options.protocolVersion === 5) { + packet.properties = opts.properties + } - if (typeof callback !== 'function') { - opts = callback - callback = nop - } + debug('publish :: qos', opts.qos) + switch (opts.qos) { + case 1: + case 2: + // Add to callbacks + that.outgoing[packet.messageId] = { + volatile: false, + cb: callback || nop + } + debug('MqttClient:publish: packet cmd: %s', packet.cmd) + that._sendPacket(packet, undefined, opts.cbStorePut) + break + default: + debug('MqttClient:publish: packet cmd: %s', packet.cmd) + that._sendPacket(packet, callback, opts.cbStorePut) + break + } + return true + } - const invalidTopic = validations.validateTopics(obj) - if (invalidTopic !== null) { - setImmediate(callback, new Error('Invalid topic ' + invalidTopic)) + if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !publishProc()) { + this._storeProcessingQueue.push( + { + invoke: publishProc, + cbStorePut: opts.cbStorePut, + callback + } + ) + } return this } - if (this._checkDisconnecting(callback)) { - debug('subscribe: discconecting true') - return this - } + /** + * subscribe - subscribe to + * + * @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos} + * @param {Object} [opts] - optional subscription options, includes: + * {Number} qos - subscribe qos level + * @param {Function} [callback] - function(err, granted){} where: + * {Error} err - subscription error (none at the moment!) + * {Array} granted - array of {topic: 't', qos: 0} + * @returns {MqttClient} this - for chaining + * @api public + * @example client.subscribe('topic'); + * @example client.subscribe('topic', {qos: 1}); + * @example client.subscribe({'topic': {qos: 0}, 'topic2': {qos: 1}}, console.log); + * @example client.subscribe('topic', console.log); + */ + subscribe () { + const that = this + const args = new Array(arguments.length) + for (let i = 0; i < arguments.length; i++) { + args[i] = arguments[i] + } + const subs = [] + let obj = args.shift() + const resubscribe = obj.resubscribe + let callback = args.pop() || nop + let opts = args.pop() + const version = this.options.protocolVersion - const defaultOpts = { - qos: 0 - } - if (version === 5) { - defaultOpts.nl = false - defaultOpts.rap = false - defaultOpts.rh = 0 - } - opts = xtend(defaultOpts, opts) - - if (Array.isArray(obj)) { - obj.forEach(function (topic) { - debug('subscribe: array topic %s', topic) - if (!Object.prototype.hasOwnProperty.call(that._resubscribeTopics, topic) || - that._resubscribeTopics[topic].qos < opts.qos || - resubscribe) { - const currentOpts = { - topic, - qos: opts.qos - } - if (version === 5) { - currentOpts.nl = opts.nl - currentOpts.rap = opts.rap - currentOpts.rh = opts.rh - currentOpts.properties = opts.properties - } - debug('subscribe: pushing topic `%s` and qos `%s` to subs list', currentOpts.topic, currentOpts.qos) - subs.push(currentOpts) - } - }) - } else { - Object - .keys(obj) - .forEach(function (k) { - debug('subscribe: object topic %s', k) - if (!Object.prototype.hasOwnProperty.call(that._resubscribeTopics, k) || - that._resubscribeTopics[k].qos < obj[k].qos || - resubscribe) { - const currentOpts = { - topic: k, - qos: obj[k].qos - } - if (version === 5) { - currentOpts.nl = obj[k].nl - currentOpts.rap = obj[k].rap - currentOpts.rh = obj[k].rh - currentOpts.properties = opts.properties - } - debug('subscribe: pushing `%s` to subs list', currentOpts) - subs.push(currentOpts) - } - }) - } + delete obj.resubscribe - if (!subs.length) { - callback(null, []) - return this - } + if (typeof obj === 'string') { + obj = [obj] + } + + if (typeof callback !== 'function') { + opts = callback + callback = nop + } - const subscribeProc = function () { - const messageId = that._nextId() - if (messageId === null) { - debug('No messageId left') - return false + const invalidTopic = validations.validateTopics(obj) + if (invalidTopic !== null) { + setImmediate(callback, new Error('Invalid topic ' + invalidTopic)) + return this } - const packet = { - cmd: 'subscribe', - subscriptions: subs, - qos: 1, - retain: false, - dup: false, - messageId + if (this._checkDisconnecting(callback)) { + debug('subscribe: discconecting true') + return this } - if (opts.properties) { - packet.properties = opts.properties + const defaultOpts = { + qos: 0 } + if (version === 5) { + defaultOpts.nl = false + defaultOpts.rap = false + defaultOpts.rh = 0 + } + opts = { ...defaultOpts, ...opts } - // subscriptions to resubscribe to in case of disconnect - if (that.options.resubscribe) { - debug('subscribe :: resubscribe true') - const topics = [] - subs.forEach(function (sub) { - if (that.options.reconnectPeriod > 0) { - const topic = { qos: sub.qos } + if (Array.isArray(obj)) { + obj.forEach(function (topic) { + debug('subscribe: array topic %s', topic) + if (!Object.prototype.hasOwnProperty.call(that._resubscribeTopics, topic) || + that._resubscribeTopics[topic].qos < opts.qos || + resubscribe) { + const currentOpts = { + topic, + qos: opts.qos + } if (version === 5) { - topic.nl = sub.nl || false - topic.rap = sub.rap || false - topic.rh = sub.rh || 0 - topic.properties = sub.properties + currentOpts.nl = opts.nl + currentOpts.rap = opts.rap + currentOpts.rh = opts.rh + currentOpts.properties = opts.properties } - that._resubscribeTopics[sub.topic] = topic - topics.push(sub.topic) + debug('subscribe: pushing topic `%s` and qos `%s` to subs list', currentOpts.topic, currentOpts.qos) + subs.push(currentOpts) } }) - that.messageIdToTopic[packet.messageId] = topics + } else { + Object + .keys(obj) + .forEach(function (k) { + debug('subscribe: object topic %s', k) + if (!Object.prototype.hasOwnProperty.call(that._resubscribeTopics, k) || + that._resubscribeTopics[k].qos < obj[k].qos || + resubscribe) { + const currentOpts = { + topic: k, + qos: obj[k].qos + } + if (version === 5) { + currentOpts.nl = obj[k].nl + currentOpts.rap = obj[k].rap + currentOpts.rh = obj[k].rh + currentOpts.properties = opts.properties + } + debug('subscribe: pushing `%s` to subs list', currentOpts) + subs.push(currentOpts) + } + }) } - that.outgoing[packet.messageId] = { - volatile: true, - cb: function (err, packet) { - if (!err) { - const granted = packet.granted - for (let i = 0; i < granted.length; i += 1) { - subs[i].qos = granted[i] - } - } + if (!subs.length) { + callback(null, []) + return this + } - callback(err, subs) + const subscribeProc = function () { + const messageId = that._nextId() + if (messageId === null) { + debug('No messageId left') + return false } - } - debug('subscribe :: call _sendPacket') - that._sendPacket(packet) - return true - } - if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !subscribeProc()) { - this._storeProcessingQueue.push( - { - invoke: subscribeProc, - callback + const packet = { + cmd: 'subscribe', + subscriptions: subs, + qos: 1, + retain: false, + dup: false, + messageId } - ) - } - return this -} + if (opts.properties) { + packet.properties = opts.properties + } -/** - * unsubscribe - unsubscribe from topic(s) - * - * @param {String, Array} topic - topics to unsubscribe from - * @param {Object} [opts] - optional subscription options, includes: - * {Object} properties - properties of unsubscribe packet - * @param {Function} [callback] - callback fired on unsuback - * @returns {MqttClient} this - for chaining - * @api public - * @example client.unsubscribe('topic'); - * @example client.unsubscribe('topic', console.log); - */ -MqttClient.prototype.unsubscribe = function () { - const that = this - const args = new Array(arguments.length) - for (let i = 0; i < arguments.length; i++) { - args[i] = arguments[i] - } - let topic = args.shift() - let callback = args.pop() || nop - let opts = args.pop() - if (typeof topic === 'string') { - topic = [topic] - } + // subscriptions to resubscribe to in case of disconnect + if (that.options.resubscribe) { + debug('subscribe :: resubscribe true') + const topics = [] + subs.forEach(function (sub) { + if (that.options.reconnectPeriod > 0) { + const topic = { qos: sub.qos } + if (version === 5) { + topic.nl = sub.nl || false + topic.rap = sub.rap || false + topic.rh = sub.rh || 0 + topic.properties = sub.properties + } + that._resubscribeTopics[sub.topic] = topic + topics.push(sub.topic) + } + }) + that.messageIdToTopic[packet.messageId] = topics + } - if (typeof callback !== 'function') { - opts = callback - callback = nop - } + that.outgoing[packet.messageId] = { + volatile: true, + cb: function (err, packet) { + if (!err) { + const granted = packet.granted + for (let i = 0; i < granted.length; i += 1) { + subs[i].qos = granted[i] + } + } - const invalidTopic = validations.validateTopics(topic) - if (invalidTopic !== null) { - setImmediate(callback, new Error('Invalid topic ' + invalidTopic)) - return this - } + callback(err, subs) + } + } + debug('subscribe :: call _sendPacket') + that._sendPacket(packet) + return true + } + + if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !subscribeProc()) { + this._storeProcessingQueue.push( + { + invoke: subscribeProc, + callback + } + ) + } - if (that._checkDisconnecting(callback)) { return this } - const unsubscribeProc = function () { - const messageId = that._nextId() - if (messageId === null) { - debug('No messageId left') - return false - } - const packet = { - cmd: 'unsubscribe', - qos: 1, - messageId + /** + * unsubscribe - unsubscribe from topic(s) + * + * @param {String, Array} topic - topics to unsubscribe from + * @param {Object} [opts] - optional subscription options, includes: + * {Object} properties - properties of unsubscribe packet + * @param {Function} [callback] - callback fired on unsuback + * @returns {MqttClient} this - for chaining + * @api public + * @example client.unsubscribe('topic'); + * @example client.unsubscribe('topic', console.log); + */ + unsubscribe () { + const that = this + const args = new Array(arguments.length) + for (let i = 0; i < arguments.length; i++) { + args[i] = arguments[i] } - + let topic = args.shift() + let callback = args.pop() || nop + let opts = args.pop() if (typeof topic === 'string') { - packet.unsubscriptions = [topic] - } else if (Array.isArray(topic)) { - packet.unsubscriptions = topic + topic = [topic] } - if (that.options.resubscribe) { - packet.unsubscriptions.forEach(function (topic) { - delete that._resubscribeTopics[topic] - }) + if (typeof callback !== 'function') { + opts = callback + callback = nop } - if (typeof opts === 'object' && opts.properties) { - packet.properties = opts.properties + const invalidTopic = validations.validateTopics(topic) + if (invalidTopic !== null) { + setImmediate(callback, new Error('Invalid topic ' + invalidTopic)) + return this } - that.outgoing[packet.messageId] = { - volatile: true, - cb: callback + if (that._checkDisconnecting(callback)) { + return this } - debug('unsubscribe: call _sendPacket') - that._sendPacket(packet) + const unsubscribeProc = function () { + const messageId = that._nextId() + if (messageId === null) { + debug('No messageId left') + return false + } + const packet = { + cmd: 'unsubscribe', + qos: 1, + messageId + } - return true - } + if (typeof topic === 'string') { + packet.unsubscriptions = [topic] + } else if (Array.isArray(topic)) { + packet.unsubscriptions = topic + } - if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !unsubscribeProc()) { - this._storeProcessingQueue.push( - { - invoke: unsubscribeProc, - callback + if (that.options.resubscribe) { + packet.unsubscriptions.forEach(function (topic) { + delete that._resubscribeTopics[topic] + }) } - ) - } - return this -} + if (typeof opts === 'object' && opts.properties) { + packet.properties = opts.properties + } -/** - * end - close connection - * - * @returns {MqttClient} this - for chaining - * @param {Boolean} force - do not wait for all in-flight messages to be acked - * @param {Object} opts - added to the disconnect packet - * @param {Function} cb - called when the client has been closed - * - * @api public - */ -MqttClient.prototype.end = function (force, opts, cb) { - const that = this + that.outgoing[packet.messageId] = { + volatile: true, + cb: callback + } - debug('end :: (%s)', this.options.clientId) + debug('unsubscribe: call _sendPacket') + that._sendPacket(packet) - if (force == null || typeof force !== 'boolean') { - cb = opts || nop - opts = force - force = false - if (typeof opts !== 'object') { - cb = opts - opts = null - if (typeof cb !== 'function') { - cb = nop - } + return true } - } - - if (typeof opts !== 'object') { - cb = opts - opts = null - } - debug('end :: cb? %s', !!cb) - cb = cb || nop - - function closeStores () { - debug('end :: closeStores: closing incoming and outgoing stores') - that.disconnected = true - that.incomingStore.close(function (e1) { - that.outgoingStore.close(function (e2) { - debug('end :: closeStores: emitting end') - that.emit('end') - if (cb) { - const err = e1 || e2 - debug('end :: closeStores: invoking callback with args') - cb(err) + if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !unsubscribeProc()) { + this._storeProcessingQueue.push( + { + invoke: unsubscribeProc, + callback } - }) - }) - if (that._deferredReconnect) { - that._deferredReconnect() + ) } - } - function finish () { - // defer closesStores of an I/O cycle, - // just to make sure things are - // ok for websockets - debug('end :: (%s) :: finish :: calling _cleanUp with force %s', that.options.clientId, force) - that._cleanUp(force, () => { - debug('end :: finish :: calling process.nextTick on closeStores') - // const boundProcess = nextTick.bind(null, closeStores) - nextTick(closeStores.bind(that)) - }, opts) - } - - if (this.disconnecting) { - cb() return this } - this._clearReconnect() - - this.disconnecting = true + /** + * end - close connection + * + * @returns {MqttClient} this - for chaining + * @param {Boolean} force - do not wait for all in-flight messages to be acked + * @param {Object} opts - added to the disconnect packet + * @param {Function} cb - called when the client has been closed + * + * @api public + */ + end (force, opts, cb) { + const that = this + + debug('end :: (%s)', this.options.clientId) + + if (force == null || typeof force !== 'boolean') { + cb = opts || nop + opts = force + force = false + if (typeof opts !== 'object') { + cb = opts + opts = null + if (typeof cb !== 'function') { + cb = nop + } + } + } - if (!force && Object.keys(this.outgoing).length > 0) { - // wait 10ms, just to be sure we received all of it - debug('end :: (%s) :: calling finish in 10ms once outgoing is empty', that.options.clientId) - this.once('outgoingEmpty', setTimeout.bind(null, finish, 10)) - } else { - debug('end :: (%s) :: immediately calling finish', that.options.clientId) - finish() - } + if (typeof opts !== 'object') { + cb = opts + opts = null + } - return this -} + debug('end :: cb? %s', !!cb) + cb = cb || nop + + function closeStores () { + debug('end :: closeStores: closing incoming and outgoing stores') + that.disconnected = true + that.incomingStore.close(function (e1) { + that.outgoingStore.close(function (e2) { + debug('end :: closeStores: emitting end') + that.emit('end') + if (cb) { + const err = e1 || e2 + debug('end :: closeStores: invoking callback with args') + cb(err) + } + }) + }) + if (that._deferredReconnect) { + that._deferredReconnect() + } + } -/** - * removeOutgoingMessage - remove a message in outgoing store - * the outgoing callback will be called withe Error('Message removed') if the message is removed - * - * @param {Number} messageId - messageId to remove message - * @returns {MqttClient} this - for chaining - * @api public - * - * @example client.removeOutgoingMessage(client.getLastAllocated()); - */ -MqttClient.prototype.removeOutgoingMessage = function (messageId) { - const cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null - delete this.outgoing[messageId] - this.outgoingStore.del({ messageId }, function () { - cb(new Error('Message removed')) - }) - return this -} + function finish () { + // defer closesStores of an I/O cycle, + // just to make sure things are + // ok for websockets + debug('end :: (%s) :: finish :: calling _cleanUp with force %s', that.options.clientId, force) + that._cleanUp(force, () => { + debug('end :: finish :: calling process.nextTick on closeStores') + // const boundProcess = nextTick.bind(null, closeStores) + nextTick(closeStores.bind(that)) + }, opts) + } -/** - * reconnect - connect again using the same options as connect() - * - * @param {Object} [opts] - optional reconnect options, includes: - * {Store} incomingStore - a store for the incoming packets - * {Store} outgoingStore - a store for the outgoing packets - * if opts is not given, current stores are used - * @returns {MqttClient} this - for chaining - * - * @api public - */ -MqttClient.prototype.reconnect = function (opts) { - debug('client reconnect') - const that = this - const f = function () { - if (opts) { - that.options.incomingStore = opts.incomingStore - that.options.outgoingStore = opts.outgoingStore + if (this.disconnecting) { + cb() + return this + } + + this._clearReconnect() + + this.disconnecting = true + + if (!force && Object.keys(this.outgoing).length > 0) { + // wait 10ms, just to be sure we received all of it + debug('end :: (%s) :: calling finish in 10ms once outgoing is empty', that.options.clientId) + this.once('outgoingEmpty', setTimeout.bind(null, finish, 10)) } else { - that.options.incomingStore = null - that.options.outgoingStore = null - } - that.incomingStore = that.options.incomingStore || new Store() - that.outgoingStore = that.options.outgoingStore || new Store() - that.disconnecting = false - that.disconnected = false - that._deferredReconnect = null - that._reconnect() - } + debug('end :: (%s) :: immediately calling finish', that.options.clientId) + finish() + } - if (this.disconnecting && !this.disconnected) { - this._deferredReconnect = f - } else { - f() + return this } - return this -} -/** - * _reconnect - implement reconnection - * @api privateish - */ -MqttClient.prototype._reconnect = function () { - debug('_reconnect: emitting reconnect to client') - this.emit('reconnect') - if (this.connected) { - this.end(() => { this._setupStream() }) - debug('client already connected. disconnecting first.') - } else { - debug('_reconnect: calling _setupStream') - this._setupStream() + /** + * removeOutgoingMessage - remove a message in outgoing store + * the outgoing callback will be called withe Error('Message removed') if the message is removed + * + * @param {Number} messageId - messageId to remove message + * @returns {MqttClient} this - for chaining + * @api public + * + * @example client.removeOutgoingMessage(client.getLastAllocated()); + */ + removeOutgoingMessage (messageId) { + const cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null + delete this.outgoing[messageId] + this.outgoingStore.del({ messageId }, function () { + cb(new Error('Message removed')) + }) + return this } -} -/** - * _setupReconnect - setup reconnect timer - */ -MqttClient.prototype._setupReconnect = function () { - const that = this - - if (!that.disconnecting && !that.reconnectTimer && (that.options.reconnectPeriod > 0)) { - if (!this.reconnecting) { - debug('_setupReconnect :: emit `offline` state') - this.emit('offline') - debug('_setupReconnect :: set `reconnecting` to `true`') - this.reconnecting = true - } - debug('_setupReconnect :: setting reconnectTimer for %d ms', that.options.reconnectPeriod) - that.reconnectTimer = setInterval(function () { - debug('reconnectTimer :: reconnect triggered!') + /** + * reconnect - connect again using the same options as connect() + * + * @param {Object} [opts] - optional reconnect options, includes: + * {Store} incomingStore - a store for the incoming packets + * {Store} outgoingStore - a store for the outgoing packets + * if opts is not given, current stores are used + * @returns {MqttClient} this - for chaining + * + * @api public + */ + reconnect (opts) { + debug('client reconnect') + const that = this + const f = function () { + if (opts) { + that.options.incomingStore = opts.incomingStore + that.options.outgoingStore = opts.outgoingStore + } else { + that.options.incomingStore = null + that.options.outgoingStore = null + } + that.incomingStore = that.options.incomingStore || new Store() + that.outgoingStore = that.options.outgoingStore || new Store() + that.disconnecting = false + that.disconnected = false + that._deferredReconnect = null that._reconnect() - }, that.options.reconnectPeriod) - } else { - debug('_setupReconnect :: doing nothing...') - } -} + } -/** - * _clearReconnect - clear the reconnect timer - */ -MqttClient.prototype._clearReconnect = function () { - debug('_clearReconnect : clearing reconnect timer') - if (this.reconnectTimer) { - clearInterval(this.reconnectTimer) - this.reconnectTimer = null + if (this.disconnecting && !this.disconnected) { + this._deferredReconnect = f + } else { + f() + } + return this } -} -/** - * _cleanUp - clean up on connection end - * @api private - */ -MqttClient.prototype._cleanUp = function (forced, done) { - const opts = arguments[2] - if (done) { - debug('_cleanUp :: done callback provided for on stream close') - this.stream.on('close', done) + /** + * _reconnect - implement reconnection + * @api privateish + */ + _reconnect () { + debug('_reconnect: emitting reconnect to client') + this.emit('reconnect') + if (this.connected) { + this.end(() => { this._setupStream() }) + debug('client already connected. disconnecting first.') + } else { + debug('_reconnect: calling _setupStream') + this._setupStream() + } } - debug('_cleanUp :: forced? %s', forced) - if (forced) { - if ((this.options.reconnectPeriod === 0) && this.options.clean) { - flush(this.outgoing) - } - debug('_cleanUp :: (%s) :: destroying stream', this.options.clientId) - this.stream.destroy() - } else { - const packet = xtend({ cmd: 'disconnect' }, opts) - debug('_cleanUp :: (%s) :: call _sendPacket with disconnect packet', this.options.clientId) - this._sendPacket( - packet, - setImmediate.bind( - null, - this.stream.end.bind(this.stream) - ) - ) - } + /** + * _setupReconnect - setup reconnect timer + */ + _setupReconnect () { + const that = this - if (!this.disconnecting) { - debug('_cleanUp :: client not disconnecting. Clearing and resetting reconnect.') - this._clearReconnect() - this._setupReconnect() + if (!that.disconnecting && !that.reconnectTimer && (that.options.reconnectPeriod > 0)) { + if (!this.reconnecting) { + debug('_setupReconnect :: emit `offline` state') + this.emit('offline') + debug('_setupReconnect :: set `reconnecting` to `true`') + this.reconnecting = true + } + debug('_setupReconnect :: setting reconnectTimer for %d ms', that.options.reconnectPeriod) + that.reconnectTimer = setInterval(function () { + debug('reconnectTimer :: reconnect triggered!') + that._reconnect() + }, that.options.reconnectPeriod) + } else { + debug('_setupReconnect :: doing nothing...') + } } - if (this.pingTimer !== null) { - debug('_cleanUp :: clearing pingTimer') - this.pingTimer.clear() - this.pingTimer = null + /** + * _clearReconnect - clear the reconnect timer + */ + _clearReconnect () { + debug('_clearReconnect : clearing reconnect timer') + if (this.reconnectTimer) { + clearInterval(this.reconnectTimer) + this.reconnectTimer = null + } } - if (done && !this.connected) { - debug('_cleanUp :: (%s) :: removing stream `done` callback `close` listener', this.options.clientId) - this.stream.removeListener('close', done) - done() - } -} + /** + * _cleanUp - clean up on connection end + * @api private + */ + _cleanUp (forced, done) { + const opts = arguments[2] + if (done) { + debug('_cleanUp :: done callback provided for on stream close') + this.stream.on('close', done) + } -/** - * _sendPacket - send or queue a packet - * @param {Object} packet - packet options - * @param {Function} cb - callback when the packet is sent - * @param {Function} cbStorePut - called when message is put into outgoingStore - * @param {Boolean} noStore - send without put to the store - * @api private - */ -MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut, noStore) { - debug('_sendPacket :: (%s) :: start', this.options.clientId) - cbStorePut = cbStorePut || nop - cb = cb || nop - - const err = applyTopicAlias(this, packet) - if (err) { - cb(err) - return - } + debug('_cleanUp :: forced? %s', forced) + if (forced) { + if ((this.options.reconnectPeriod === 0) && this.options.clean) { + flush(this.outgoing) + } + debug('_cleanUp :: (%s) :: destroying stream', this.options.clientId) + this.stream.destroy() + } else { + const packet = { cmd: 'disconnect', ...opts } + debug('_cleanUp :: (%s) :: call _sendPacket with disconnect packet', this.options.clientId) + this._sendPacket( + packet, + setImmediate.bind( + null, + this.stream.end.bind(this.stream) + ) + ) + } - if (!this.connected) { - // allow auth packets to be sent while authenticating with the broker (mqtt5 enhanced auth) - if (packet.cmd === 'auth') { - this._shiftPingInterval() - sendPacket(this, packet, cb) - return + if (!this.disconnecting) { + debug('_cleanUp :: client not disconnecting. Clearing and resetting reconnect.') + this._clearReconnect() + this._setupReconnect() } - debug('_sendPacket :: client not connected. Storing packet offline.') - this._storePacket(packet, cb, cbStorePut) - return - } + if (this.pingTimer !== null) { + debug('_cleanUp :: clearing pingTimer') + this.pingTimer.clear() + this.pingTimer = null + } - // When sending a packet, reschedule the ping timer - this._shiftPingInterval() - - // If "noStore" is true, the message is sent without being recorded in the store. - // Messages that have not received puback or pubcomp remain in the store after disconnection - // and are resent from the store upon reconnection. - // For resend upon reconnection, "noStore" is set to true. This is because the message is already stored in the store. - // This is to avoid interrupting other processes while recording to the store. - if (noStore) { - sendPacket(this, packet, cb) - return + if (done && !this.connected) { + debug('_cleanUp :: (%s) :: removing stream `done` callback `close` listener', this.options.clientId) + this.stream.removeListener('close', done) + done() + } } - switch (packet.cmd) { - case 'publish': - break - case 'pubrel': - storeAndSend(this, packet, cb, cbStorePut) + /** + * _sendPacket - send or queue a packet + * @param {Object} packet - packet options + * @param {Function} cb - callback when the packet is sent + * @param {Function} cbStorePut - called when message is put into outgoingStore + * @param {Boolean} noStore - send without put to the store + * @api private + */ + _sendPacket (packet, cb, cbStorePut, noStore) { + debug('_sendPacket :: (%s) :: start', this.options.clientId) + cbStorePut = cbStorePut || nop + cb = cb || nop + + const err = applyTopicAlias(this, packet) + if (err) { + cb(err) return - default: - sendPacket(this, packet, cb) + } + + if (!this.connected) { + // allow auth packets to be sent while authenticating with the broker (mqtt5 enhanced auth) + if (packet.cmd === 'auth') { + this._shiftPingInterval() + sendPacket(this, packet, cb) + return + } + + debug('_sendPacket :: client not connected. Storing packet offline.') + this._storePacket(packet, cb, cbStorePut) return - } + } + + // When sending a packet, reschedule the ping timer + this._shiftPingInterval() - switch (packet.qos) { - case 2: - case 1: - storeAndSend(this, packet, cb, cbStorePut) - break - /** - * no need of case here since it will be caught by default - * and jshint comply that before default it must be a break - * anyway it will result in -1 evaluation - */ - case 0: - /* falls through */ - default: + // If "noStore" is true, the message is sent without being recorded in the store. + // Messages that have not received puback or pubcomp remain in the store after disconnection + // and are resent from the store upon reconnection. + // For resend upon reconnection, "noStore" is set to true. This is because the message is already stored in the store. + // This is to avoid interrupting other processes while recording to the store. + if (noStore) { sendPacket(this, packet, cb) - break - } - debug('_sendPacket :: (%s) :: end', this.options.clientId) -} + return + } -/** - * _storePacket - queue a packet - * @param {Object} packet - packet options - * @param {Function} cb - callback when the packet is sent - * @param {Function} cbStorePut - called when message is put into outgoingStore - * @api private - */ -MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) { - debug('_storePacket :: packet: %o', packet) - debug('_storePacket :: cb? %s', !!cb) - cbStorePut = cbStorePut || nop + switch (packet.cmd) { + case 'publish': + break + case 'pubrel': + storeAndSend(this, packet, cb, cbStorePut) + return + default: + sendPacket(this, packet, cb) + return + } - let storePacket = packet - if (storePacket.cmd === 'publish') { - // The original packet is for sending. - // The cloned storePacket is for storing to resend on reconnect. - // Topic Alias must not be used after disconnected. - storePacket = clone(packet) - const err = removeTopicAliasAndRecoverTopicName(this, storePacket) - if (err) { - return cb && cb(err) + switch (packet.qos) { + case 2: + case 1: + storeAndSend(this, packet, cb, cbStorePut) + break + /** + * no need of case here since it will be caught by default + * and jshint comply that before default it must be a break + * anyway it will result in -1 evaluation + */ + case 0: + /* falls through */ + default: + sendPacket(this, packet, cb) + break } - } - // check that the packet is not a qos of 0, or that the command is not a publish - if (((storePacket.qos || 0) === 0 && this.queueQoSZero) || storePacket.cmd !== 'publish') { - this.queue.push({ packet: storePacket, cb }) - } else if (storePacket.qos > 0) { - cb = this.outgoing[storePacket.messageId] ? this.outgoing[storePacket.messageId].cb : null - this.outgoingStore.put(storePacket, function (err) { + debug('_sendPacket :: (%s) :: end', this.options.clientId) + } + + /** + * _storePacket - queue a packet + * @param {Object} packet - packet options + * @param {Function} cb - callback when the packet is sent + * @param {Function} cbStorePut - called when message is put into outgoingStore + * @api private + */ + _storePacket (packet, cb, cbStorePut) { + debug('_storePacket :: packet: %o', packet) + debug('_storePacket :: cb? %s', !!cb) + cbStorePut = cbStorePut || nop + + let storePacket = packet + if (storePacket.cmd === 'publish') { + // The original packet is for sending. + // The cloned storePacket is for storing to resend on reconnect. + // Topic Alias must not be used after disconnected. + storePacket = clone(packet) + const err = removeTopicAliasAndRecoverTopicName(this, storePacket) if (err) { return cb && cb(err) } - cbStorePut() - }) - } else if (cb) { - cb(new Error('No connection to broker')) + } + // check that the packet is not a qos of 0, or that the command is not a publish + if (((storePacket.qos || 0) === 0 && this.queueQoSZero) || storePacket.cmd !== 'publish') { + this.queue.push({ packet: storePacket, cb }) + } else if (storePacket.qos > 0) { + cb = this.outgoing[storePacket.messageId] ? this.outgoing[storePacket.messageId].cb : null + this.outgoingStore.put(storePacket, function (err) { + if (err) { + return cb && cb(err) + } + cbStorePut() + }) + } else if (cb) { + cb(new Error('No connection to broker')) + } } -} -/** - * _setupPingTimer - setup the ping timer - * - * @api private - */ -MqttClient.prototype._setupPingTimer = function () { - debug('_setupPingTimer :: keepalive %d (seconds)', this.options.keepalive) - const that = this - - if (!this.pingTimer && this.options.keepalive) { - this.pingResp = true - this.pingTimer = reInterval(function () { - that._checkPing() - }, this.options.keepalive * 1000) + /** + * _setupPingTimer - setup the ping timer + * + * @api private + */ + _setupPingTimer () { + debug('_setupPingTimer :: keepalive %d (seconds)', this.options.keepalive) + const that = this + + if (!this.pingTimer && this.options.keepalive) { + this.pingResp = true + this.pingTimer = reInterval(function () { + that._checkPing() + }, this.options.keepalive * 1000) + } } -} -/** - * _shiftPingInterval - reschedule the ping interval - * - * @api private - */ -MqttClient.prototype._shiftPingInterval = function () { - if (this.pingTimer && this.options.keepalive && this.options.reschedulePings) { - this.pingTimer.reschedule(this.options.keepalive * 1000) + /** + * _shiftPingInterval - reschedule the ping interval + * + * @api private + */ + _shiftPingInterval () { + if (this.pingTimer && this.options.keepalive && this.options.reschedulePings) { + this.pingTimer.reschedule(this.options.keepalive * 1000) + } } -} -/** - * _checkPing - check if a pingresp has come back, and ping the server again - * - * @api private - */ -MqttClient.prototype._checkPing = function () { - debug('_checkPing :: checking ping...') - if (this.pingResp) { - debug('_checkPing :: ping response received. Clearing flag and sending `pingreq`') - this.pingResp = false - this._sendPacket({ cmd: 'pingreq' }) - } else { - // do a forced cleanup since socket will be in bad shape - debug('_checkPing :: calling _cleanUp with force true') - this._cleanUp(true) + + /** + * _checkPing - check if a pingresp has come back, and ping the server again + * + * @api private + */ + _checkPing () { + debug('_checkPing :: checking ping...') + if (this.pingResp) { + debug('_checkPing :: ping response received. Clearing flag and sending `pingreq`') + this.pingResp = false + this._sendPacket({ cmd: 'pingreq' }) + } else { + // do a forced cleanup since socket will be in bad shape + debug('_checkPing :: calling _cleanUp with force true') + this._cleanUp(true) + } } -} -/** - * _handlePingresp - handle a pingresp - * - * @api private - */ -MqttClient.prototype._handlePingresp = function () { - this.pingResp = true -} + /** + * _handlePingresp - handle a pingresp + * + * @api private + */ + _handlePingresp () { + this.pingResp = true + } -/** - * _handleConnack - * - * @param {Object} packet - * @api private - */ -MqttClient.prototype._handleConnack = function (packet) { - debug('_handleConnack') - const options = this.options - const version = options.protocolVersion - const rc = version === 5 ? packet.reasonCode : packet.returnCode + /** + * _handleConnack + * + * @param {Object} packet + * @api private + */ + _handleConnack (packet) { + debug('_handleConnack') + const options = this.options + const version = options.protocolVersion + const rc = version === 5 ? packet.reasonCode : packet.returnCode - clearTimeout(this.connackTimer) - delete this.topicAliasSend + clearTimeout(this.connackTimer) + delete this.topicAliasSend - if (packet.properties) { - if (packet.properties.topicAliasMaximum) { - if (packet.properties.topicAliasMaximum > 0xffff) { - this.emit('error', new Error('topicAliasMaximum from broker is out of range')) - return + if (packet.properties) { + if (packet.properties.topicAliasMaximum) { + if (packet.properties.topicAliasMaximum > 0xffff) { + this.emit('error', new Error('topicAliasMaximum from broker is out of range')) + return + } + if (packet.properties.topicAliasMaximum > 0) { + this.topicAliasSend = new TopicAliasSend(packet.properties.topicAliasMaximum) + } } - if (packet.properties.topicAliasMaximum > 0) { - this.topicAliasSend = new TopicAliasSend(packet.properties.topicAliasMaximum) + if (packet.properties.serverKeepAlive && options.keepalive) { + options.keepalive = packet.properties.serverKeepAlive + this._shiftPingInterval() + } + if (packet.properties.maximumPacketSize) { + if (!options.properties) { options.properties = {} } + options.properties.maximumPacketSize = packet.properties.maximumPacketSize } } - if (packet.properties.serverKeepAlive && options.keepalive) { - options.keepalive = packet.properties.serverKeepAlive - this._shiftPingInterval() - } - if (packet.properties.maximumPacketSize) { - if (!options.properties) { options.properties = {} } - options.properties.maximumPacketSize = packet.properties.maximumPacketSize - } - } - - if (rc === 0) { - this.reconnecting = false - this._onConnect(packet) - } else if (rc > 0) { - const err = new Error('Connection refused: ' + errors[rc]) - err.code = rc - this.emit('error', err) - } -} - -MqttClient.prototype._handleAuth = function (packet) { - const options = this.options - const version = options.protocolVersion - const rc = version === 5 ? packet.reasonCode : packet.returnCode - if (version !== 5) { - const err = new Error('Protocol error: Auth packets are only supported in MQTT 5. Your version:' + version) - err.code = rc - this.emit('error', err) - return + if (rc === 0) { + this.reconnecting = false + this._onConnect(packet) + } else if (rc > 0) { + const err = new Error('Connection refused: ' + errors[rc]) + err.code = rc + this.emit('error', err) + } } - const that = this - this.handleAuth(packet, function (err, packet) { - if (err) { - that.emit('error', err) - return - } + _handleAuth (packet) { + const options = this.options + const version = options.protocolVersion + const rc = version === 5 ? packet.reasonCode : packet.returnCode - if (rc === 24) { - that.reconnecting = false - that._sendPacket(packet) - } else { - const error = new Error('Connection refused: ' + errors[rc]) + if (version !== 5) { + const err = new Error('Protocol error: Auth packets are only supported in MQTT 5. Your version:' + version) err.code = rc - that.emit('error', error) + this.emit('error', err) + return } - }) -} -/** - * @param packet the packet received by the broker - * @return the auth packet to be returned to the broker - * @api public - */ -MqttClient.prototype.handleAuth = function (packet, callback) { - callback() -} + const that = this + this.handleAuth(packet, function (err, packet) { + if (err) { + that.emit('error', err) + return + } -/** - * _handlePublish - * - * @param {Object} packet - * @api private - */ -/* -those late 2 case should be rewrite to comply with coding style: - -case 1: -case 0: - // do not wait sending a puback - // no callback passed - if (1 === qos) { - this._sendPacket({ - cmd: 'puback', - messageId: messageId - }); + if (rc === 24) { + that.reconnecting = false + that._sendPacket(packet) + } else { + const error = new Error('Connection refused: ' + errors[rc]) + err.code = rc + that.emit('error', error) + } + }) } - // emit the message event for both qos 1 and 0 - this.emit('message', topic, message, packet); - this.handleMessage(packet, done); - break; -default: - // do nothing but every switch mus have a default - // log or throw an error about unknown qos - break; - -for now i just suppressed the warnings -*/ -MqttClient.prototype._handlePublish = function (packet, done) { - debug('_handlePublish: packet %o', packet) - done = typeof done !== 'undefined' ? done : nop - let topic = packet.topic.toString() - const message = packet.payload - const qos = packet.qos - const messageId = packet.messageId - const that = this - const options = this.options - const validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153] - if (this.options.protocolVersion === 5) { - let alias - if (packet.properties) { - alias = packet.properties.topicAlias - } - if (typeof alias !== 'undefined') { - if (topic.length === 0) { - if (alias > 0 && alias <= 0xffff) { - const gotTopic = this.topicAliasRecv.getTopicByAlias(alias) - if (gotTopic) { - topic = gotTopic - debug('_handlePublish :: topic complemented by alias. topic: %s - alias: %d', topic, alias) + + /** + * @param packet the packet received by the broker + * @return the auth packet to be returned to the broker + * @api public + */ + handleAuth (packet, callback) { + callback() + } + + /** + * _handlePublish + * + * @param {Object} packet + * @api private + */ + /* + those late 2 case should be rewrite to comply with coding style: + + case 1: + case 0: + // do not wait sending a puback + // no callback passed + if (1 === qos) { + this._sendPacket({ + cmd: 'puback', + messageId: messageId + }); + } + // emit the message event for both qos 1 and 0 + this.emit('message', topic, message, packet); + this.handleMessage(packet, done); + break; + default: + // do nothing but every switch mus have a default + // log or throw an error about unknown qos + break; + + for now i just suppressed the warnings + */ + _handlePublish (packet, done) { + debug('_handlePublish: packet %o', packet) + done = typeof done !== 'undefined' ? done : nop + let topic = packet.topic.toString() + const message = packet.payload + const qos = packet.qos + const messageId = packet.messageId + const that = this + const options = this.options + const validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153] + if (this.options.protocolVersion === 5) { + let alias + if (packet.properties) { + alias = packet.properties.topicAlias + } + if (typeof alias !== 'undefined') { + if (topic.length === 0) { + if (alias > 0 && alias <= 0xffff) { + const gotTopic = this.topicAliasRecv.getTopicByAlias(alias) + if (gotTopic) { + topic = gotTopic + debug('_handlePublish :: topic complemented by alias. topic: %s - alias: %d', topic, alias) + } else { + debug('_handlePublish :: unregistered topic alias. alias: %d', alias) + this.emit('error', new Error('Received unregistered Topic Alias')) + return + } } else { - debug('_handlePublish :: unregistered topic alias. alias: %d', alias) - this.emit('error', new Error('Received unregistered Topic Alias')) + debug('_handlePublish :: topic alias out of range. alias: %d', alias) + this.emit('error', new Error('Received Topic Alias is out of range')) return } } else { - debug('_handlePublish :: topic alias out of range. alias: %d', alias) - this.emit('error', new Error('Received Topic Alias is out of range')) - return - } - } else { - if (this.topicAliasRecv.put(topic, alias)) { - debug('_handlePublish :: registered topic: %s - alias: %d', topic, alias) - } else { - debug('_handlePublish :: topic alias out of range. alias: %d', alias) - this.emit('error', new Error('Received Topic Alias is out of range')) - return + if (this.topicAliasRecv.put(topic, alias)) { + debug('_handlePublish :: registered topic: %s - alias: %d', topic, alias) + } else { + debug('_handlePublish :: topic alias out of range. alias: %d', alias) + this.emit('error', new Error('Received Topic Alias is out of range')) + return + } } } } - } - debug('_handlePublish: qos %d', qos) - switch (qos) { - case 2: { - options.customHandleAcks(topic, message, packet, function (error, code) { - if (!(error instanceof Error)) { - code = error - error = null - } - if (error) { return that.emit('error', error) } - if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for pubrec')) } - if (code) { - that._sendPacket({ cmd: 'pubrec', messageId, reasonCode: code }, done) - } else { - that.incomingStore.put(packet, function () { - that._sendPacket({ cmd: 'pubrec', messageId }, done) - }) - } - }) - break - } - case 1: { - // emit the message event - options.customHandleAcks(topic, message, packet, function (error, code) { - if (!(error instanceof Error)) { - code = error - error = null - } - if (error) { return that.emit('error', error) } - if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for puback')) } - if (!code) { that.emit('message', topic, message, packet) } - that.handleMessage(packet, function (err) { - if (err) { - return done && done(err) + debug('_handlePublish: qos %d', qos) + switch (qos) { + case 2: { + options.customHandleAcks(topic, message, packet, function (error, code) { + if (!(error instanceof Error)) { + code = error + error = null + } + if (error) { return that.emit('error', error) } + if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for pubrec')) } + if (code) { + that._sendPacket({ cmd: 'pubrec', messageId, reasonCode: code }, done) + } else { + that.incomingStore.put(packet, function () { + that._sendPacket({ cmd: 'pubrec', messageId }, done) + }) } - that._sendPacket({ cmd: 'puback', messageId, reasonCode: code }, done) }) - }) - break - } - case 0: - // emit the message event - this.emit('message', topic, message, packet) - this.handleMessage(packet, done) - break - default: - // do nothing - debug('_handlePublish: unknown QoS. Doing nothing.') - // log or throw an error about unknown qos - break + break + } + case 1: { + // emit the message event + options.customHandleAcks(topic, message, packet, function (error, code) { + if (!(error instanceof Error)) { + code = error + error = null + } + if (error) { return that.emit('error', error) } + if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for puback')) } + if (!code) { that.emit('message', topic, message, packet) } + that.handleMessage(packet, function (err) { + if (err) { + return done && done(err) + } + that._sendPacket({ cmd: 'puback', messageId, reasonCode: code }, done) + }) + }) + break + } + case 0: + // emit the message event + this.emit('message', topic, message, packet) + this.handleMessage(packet, done) + break + default: + // do nothing + debug('_handlePublish: unknown QoS. Doing nothing.') + // log or throw an error about unknown qos + break + } } -} - -/** - * Handle messages with backpressure support, one at a time. - * Override at will. - * - * @param Packet packet the packet - * @param Function callback call when finished - * @api public - */ -MqttClient.prototype.handleMessage = function (packet, callback) { - callback() -} -/** - * _handleAck - * - * @param {Object} packet - * @api private - */ - -MqttClient.prototype._handleAck = function (packet) { - /* eslint no-fallthrough: "off" */ - const messageId = packet.messageId - const type = packet.cmd - let response = null - const cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null - const that = this - let err - - // Checking `!cb` happens to work, but it's not technically "correct". - // - // Why? This code assumes that "no callback" is the same as that "we're not - // waiting for responses" (puback, pubrec, pubcomp, suback, or unsuback). - // - // It would be better to check `if (!this.outgoing[messageId])` here, but - // there's no reason to change it and risk (another) regression. - // - // The only reason this code works is becaues code in MqttClient.publish, - // MqttClinet.subscribe, and MqttClient.unsubscribe ensures that we will - // have a callback even if the user doesn't pass one in.) - if (!cb) { - debug('_handleAck :: Server sent an ack in error. Ignoring.') - // Server sent an ack in error, ignore it. - return - } + /** + * Handle messages with backpressure support, one at a time. + * Override at will. + * + * @param Packet packet the packet + * @param Function callback call when finished + * @api public + */ + handleMessage (packet, callback) { + callback() + } + + /** + * _handleAck + * + * @param {Object} packet + * @api private + */ + _handleAck (packet) { + /* eslint no-fallthrough: "off" */ + const messageId = packet.messageId + const type = packet.cmd + let response = null + const cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null + const that = this + let err + + // Checking `!cb` happens to work, but it's not technically "correct". + // + // Why? This code assumes that "no callback" is the same as that "we're not + // waiting for responses" (puback, pubrec, pubcomp, suback, or unsuback). + // + // It would be better to check `if (!this.outgoing[messageId])` here, but + // there's no reason to change it and risk (another) regression. + // + // The only reason this code works is becaues code in MqttClient.publish, + // MqttClinet.subscribe, and MqttClient.unsubscribe ensures that we will + // have a callback even if the user doesn't pass one in.) + if (!cb) { + debug('_handleAck :: Server sent an ack in error. Ignoring.') + // Server sent an ack in error, ignore it. + return + } - // Process - debug('_handleAck :: packet type', type) - switch (type) { - case 'pubcomp': - // same thing as puback for QoS 2 - case 'puback': { - const pubackRC = packet.reasonCode - // Callback - we're done - if (pubackRC && pubackRC > 0 && pubackRC !== 16) { - err = new Error('Publish error: ' + errors[pubackRC]) - err.code = pubackRC - cb(err, packet) - } - delete this.outgoing[messageId] - this.outgoingStore.del(packet, cb) - this.messageIdProvider.deallocate(messageId) - this._invokeStoreProcessingQueue() - break - } - case 'pubrec': { - response = { - cmd: 'pubrel', - qos: 2, - messageId + // Process + debug('_handleAck :: packet type', type) + switch (type) { + case 'pubcomp': + // same thing as puback for QoS 2 + case 'puback': { + const pubackRC = packet.reasonCode + // Callback - we're done + if (pubackRC && pubackRC > 0 && pubackRC !== 16) { + err = new Error('Publish error: ' + errors[pubackRC]) + err.code = pubackRC + cb(err, packet) + } + delete this.outgoing[messageId] + this.outgoingStore.del(packet, cb) + this.messageIdProvider.deallocate(messageId) + this._invokeStoreProcessingQueue() + break } - const pubrecRC = packet.reasonCode + case 'pubrec': { + response = { + cmd: 'pubrel', + qos: 2, + messageId + } + const pubrecRC = packet.reasonCode - if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) { - err = new Error('Publish error: ' + errors[pubrecRC]) - err.code = pubrecRC - cb(err, packet) - } else { - this._sendPacket(response) + if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) { + err = new Error('Publish error: ' + errors[pubrecRC]) + err.code = pubrecRC + cb(err, packet) + } else { + this._sendPacket(response) + } + break } - break - } - case 'suback': { - delete this.outgoing[messageId] - this.messageIdProvider.deallocate(messageId) - for (let grantedI = 0; grantedI < packet.granted.length; grantedI++) { - if ((packet.granted[grantedI] & 0x80) !== 0) { - // suback with Failure status - const topics = this.messageIdToTopic[messageId] - if (topics) { - topics.forEach(function (topic) { - delete that._resubscribeTopics[topic] - }) + case 'suback': { + delete this.outgoing[messageId] + this.messageIdProvider.deallocate(messageId) + for (let grantedI = 0; grantedI < packet.granted.length; grantedI++) { + if ((packet.granted[grantedI] & 0x80) !== 0) { + // suback with Failure status + const topics = this.messageIdToTopic[messageId] + if (topics) { + topics.forEach(function (topic) { + delete that._resubscribeTopics[topic] + }) + } } } + delete this.messageIdToTopic[messageId] + this._invokeStoreProcessingQueue() + cb(null, packet) + break } - delete this.messageIdToTopic[messageId] - this._invokeStoreProcessingQueue() - cb(null, packet) - break - } - case 'unsuback': { - delete this.outgoing[messageId] - this.messageIdProvider.deallocate(messageId) - this._invokeStoreProcessingQueue() - cb(null) - break - } - default: - that.emit('error', new Error('unrecognized packet type')) - } + case 'unsuback': { + delete this.outgoing[messageId] + this.messageIdProvider.deallocate(messageId) + this._invokeStoreProcessingQueue() + cb(null) + break + } + default: + that.emit('error', new Error('unrecognized packet type')) + } - if (this.disconnecting && - Object.keys(this.outgoing).length === 0) { - this.emit('outgoingEmpty') + if (this.disconnecting && + Object.keys(this.outgoing).length === 0) { + this.emit('outgoingEmpty') + } } -} -/** - * _handlePubrel - * - * @param {Object} packet - * @api private - */ -MqttClient.prototype._handlePubrel = function (packet, callback) { - debug('handling pubrel packet') - callback = typeof callback !== 'undefined' ? callback : nop - const messageId = packet.messageId - const that = this - - const comp = { cmd: 'pubcomp', messageId } - - that.incomingStore.get(packet, function (err, pub) { - if (!err) { - that.emit('message', pub.topic, pub.payload, pub) - that.handleMessage(pub, function (err) { - if (err) { - return callback(err) - } - that.incomingStore.del(pub, nop) + /** + * _handlePubrel + * + * @param {Object} packet + * @api private + */ + _handlePubrel (packet, callback) { + debug('handling pubrel packet') + callback = typeof callback !== 'undefined' ? callback : nop + const messageId = packet.messageId + const that = this + + const comp = { cmd: 'pubcomp', messageId } + + that.incomingStore.get(packet, function (err, pub) { + if (!err) { + that.emit('message', pub.topic, pub.payload, pub) + that.handleMessage(pub, function (err) { + if (err) { + return callback(err) + } + that.incomingStore.del(pub, nop) + that._sendPacket(comp, callback) + }) + } else { that._sendPacket(comp, callback) - }) - } else { - that._sendPacket(comp, callback) - } - }) -} - -/** - * _handleDisconnect - * - * @param {Object} packet - * @api private - */ -MqttClient.prototype._handleDisconnect = function (packet) { - this.emit('disconnect', packet) -} - -/** - * _nextId - * @return unsigned int - */ -MqttClient.prototype._nextId = function () { - return this.messageIdProvider.allocate() -} - -/** - * getLastMessageId - * @return unsigned int - */ -MqttClient.prototype.getLastMessageId = function () { - return this.messageIdProvider.getLastAllocated() -} + } + }) + } -/** - * _resubscribe - * @api private - */ -MqttClient.prototype._resubscribe = function () { - debug('_resubscribe') - const _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics) - if (!this._firstConnection && - (this.options.clean || (this.options.protocolVersion === 5 && !this.connackPacket.sessionPresent)) && - _resubscribeTopicsKeys.length > 0) { - if (this.options.resubscribe) { - if (this.options.protocolVersion === 5) { - debug('_resubscribe: protocolVersion 5') - for (let topicI = 0; topicI < _resubscribeTopicsKeys.length; topicI++) { - const resubscribeTopic = {} - resubscribeTopic[_resubscribeTopicsKeys[topicI]] = this._resubscribeTopics[_resubscribeTopicsKeys[topicI]] - resubscribeTopic.resubscribe = true - this.subscribe(resubscribeTopic, { properties: resubscribeTopic[_resubscribeTopicsKeys[topicI]].properties }) + /** + * _handleDisconnect + * + * @param {Object} packet + * @api private + */ + _handleDisconnect (packet) { + this.emit('disconnect', packet) + } + + /** + * _nextId + * @return unsigned int + */ + _nextId () { + return this.messageIdProvider.allocate() + } + + /** + * getLastMessageId + * @return unsigned int + */ + getLastMessageId () { + return this.messageIdProvider.getLastAllocated() + } + + /** + * _resubscribe + * @api private + */ + _resubscribe () { + debug('_resubscribe') + const _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics) + if (!this._firstConnection && + (this.options.clean || (this.options.protocolVersion === 5 && !this.connackPacket.sessionPresent)) && + _resubscribeTopicsKeys.length > 0) { + if (this.options.resubscribe) { + if (this.options.protocolVersion === 5) { + debug('_resubscribe: protocolVersion 5') + for (let topicI = 0; topicI < _resubscribeTopicsKeys.length; topicI++) { + const resubscribeTopic = {} + resubscribeTopic[_resubscribeTopicsKeys[topicI]] = this._resubscribeTopics[_resubscribeTopicsKeys[topicI]] + resubscribeTopic.resubscribe = true + this.subscribe(resubscribeTopic, { properties: resubscribeTopic[_resubscribeTopicsKeys[topicI]].properties }) + } + } else { + this._resubscribeTopics.resubscribe = true + this.subscribe(this._resubscribeTopics) } } else { - this._resubscribeTopics.resubscribe = true - this.subscribe(this._resubscribeTopics) + this._resubscribeTopics = {} } - } else { - this._resubscribeTopics = {} } - } - - this._firstConnection = false -} -/** - * _onConnect - * - * @api private - */ -MqttClient.prototype._onConnect = function (packet) { - if (this.disconnected) { - this.emit('connect', packet) - return + this._firstConnection = false } - const that = this + /** + * _onConnect + * + * @api private + */ + _onConnect (packet) { + if (this.disconnected) { + this.emit('connect', packet) + return + } - this.connackPacket = packet - this.messageIdProvider.clear() - this._setupPingTimer() + const that = this - this.connected = true + this.connackPacket = packet + this.messageIdProvider.clear() + this._setupPingTimer() - function startStreamProcess () { - let outStore = that.outgoingStore.createStream() + this.connected = true - function clearStoreProcessing () { - that._storeProcessing = false - that._packetIdsDuringStoreProcessing = {} - } + function startStreamProcess () { + let outStore = that.outgoingStore.createStream() - that.once('close', remove) - outStore.on('error', function (err) { - clearStoreProcessing() - that._flushStoreProcessingQueue() - that.removeListener('close', remove) - that.emit('error', err) - }) + function clearStoreProcessing () { + that._storeProcessing = false + that._packetIdsDuringStoreProcessing = {} + } - function remove () { - outStore.destroy() - outStore = null - that._flushStoreProcessingQueue() - clearStoreProcessing() - } + that.once('close', remove) + outStore.on('error', function (err) { + clearStoreProcessing() + that._flushStoreProcessingQueue() + that.removeListener('close', remove) + that.emit('error', err) + }) - function storeDeliver () { - // edge case, we wrapped this twice - if (!outStore) { - return + function remove () { + outStore.destroy() + outStore = null + that._flushStoreProcessingQueue() + clearStoreProcessing() } - const packet = outStore.read(1) + function storeDeliver () { + // edge case, we wrapped this twice + if (!outStore) { + return + } - let cb + const packet = outStore.read(1) - if (!packet) { - // read when data is available in the future - outStore.once('readable', storeDeliver) - return - } + let cb - that._storeProcessing = true + if (!packet) { + // read when data is available in the future + outStore.once('readable', storeDeliver) + return + } - // Skip already processed store packets - if (that._packetIdsDuringStoreProcessing[packet.messageId]) { - storeDeliver() - return - } + that._storeProcessing = true - // Avoid unnecessary stream read operations when disconnected - if (!that.disconnecting && !that.reconnectTimer) { - cb = that.outgoing[packet.messageId] ? that.outgoing[packet.messageId].cb : null - that.outgoing[packet.messageId] = { - volatile: false, - cb: function (err, status) { - // Ensure that the original callback passed in to publish gets invoked - if (cb) { - cb(err, status) + // Skip already processed store packets + if (that._packetIdsDuringStoreProcessing[packet.messageId]) { + storeDeliver() + return + } + + // Avoid unnecessary stream read operations when disconnected + if (!that.disconnecting && !that.reconnectTimer) { + cb = that.outgoing[packet.messageId] ? that.outgoing[packet.messageId].cb : null + that.outgoing[packet.messageId] = { + volatile: false, + cb: function (err, status) { + // Ensure that the original callback passed in to publish gets invoked + if (cb) { + cb(err, status) + } + + storeDeliver() } + } + that._packetIdsDuringStoreProcessing[packet.messageId] = true + if (that.messageIdProvider.register(packet.messageId)) { + that._sendPacket(packet, undefined, undefined, true) + } else { + debug('messageId: %d has already used.', packet.messageId) + } + } else if (outStore.destroy) { + outStore.destroy() + } + } - storeDeliver() + outStore.on('end', function () { + let allProcessed = true + for (const id in that._packetIdsDuringStoreProcessing) { + if (!that._packetIdsDuringStoreProcessing[id]) { + allProcessed = false + break } } - that._packetIdsDuringStoreProcessing[packet.messageId] = true - if (that.messageIdProvider.register(packet.messageId)) { - that._sendPacket(packet, undefined, undefined, true) + if (allProcessed) { + clearStoreProcessing() + that.removeListener('close', remove) + that._invokeAllStoreProcessingQueue() + that.emit('connect', packet) } else { - debug('messageId: %d has already used.', packet.messageId) + startStreamProcess() } - } else if (outStore.destroy) { - outStore.destroy() - } + }) + storeDeliver() } - - outStore.on('end', function () { - let allProcessed = true - for (const id in that._packetIdsDuringStoreProcessing) { - if (!that._packetIdsDuringStoreProcessing[id]) { - allProcessed = false - break - } - } - if (allProcessed) { - clearStoreProcessing() - that.removeListener('close', remove) - that._invokeAllStoreProcessingQueue() - that.emit('connect', packet) - } else { - startStreamProcess() - } - }) - storeDeliver() + // start flowing + startStreamProcess() } - // start flowing - startStreamProcess() -} -MqttClient.prototype._invokeStoreProcessingQueue = function () { - if (this._storeProcessingQueue.length > 0) { - const f = this._storeProcessingQueue[0] - if (f && f.invoke()) { - this._storeProcessingQueue.shift() - return true + _invokeStoreProcessingQueue () { + if (this._storeProcessingQueue.length > 0) { + const f = this._storeProcessingQueue[0] + if (f && f.invoke()) { + this._storeProcessingQueue.shift() + return true + } } + return false } - return false -} -MqttClient.prototype._invokeAllStoreProcessingQueue = function () { - while (this._invokeStoreProcessingQueue()) { /* empty */ } -} + _invokeAllStoreProcessingQueue () { + while (this._invokeStoreProcessingQueue()) { /* empty */ } + } -MqttClient.prototype._flushStoreProcessingQueue = function () { - for (const f of this._storeProcessingQueue) { - if (f.cbStorePut) f.cbStorePut(new Error('Connection closed')) - if (f.callback) f.callback(new Error('Connection closed')) + _flushStoreProcessingQueue () { + for (const f of this._storeProcessingQueue) { + if (f.cbStorePut) f.cbStorePut(new Error('Connection closed')) + if (f.callback) f.callback(new Error('Connection closed')) + } + this._storeProcessingQueue.splice(0) } - this._storeProcessingQueue.splice(0) } module.exports = MqttClient diff --git a/lib/connect/index.js b/lib/connect/index.js index 707ba0d2d..cff123633 100644 --- a/lib/connect/index.js +++ b/lib/connect/index.js @@ -6,7 +6,6 @@ const DefaultMessageIdProvider = require('../default-message-id-provider') const UniqueMessageIdProvider = require('../unique-message-id-provider') const { IS_BROWSER } = require('../is-browser') const url = require('url') -const xtend = require('xtend') const debug = require('debug')('mqttjs') const protocols = {} @@ -68,7 +67,7 @@ function connect (brokerUrl, opts) { parsed.port = Number(parsed.port) } - opts = xtend(parsed, opts) + opts = { ...parsed, ...opts } if (opts.protocol === null) { throw new Error('Missing protocol') diff --git a/lib/default-message-id-provider.js b/lib/default-message-id-provider.js index 3655983a0..d7a1da87f 100644 --- a/lib/default-message-id-provider.js +++ b/lib/default-message-id-provider.js @@ -4,66 +4,64 @@ * DefaultMessageAllocator constructor * @constructor */ -function DefaultMessageIdProvider () { - if (!(this instanceof DefaultMessageIdProvider)) { - return new DefaultMessageIdProvider() +class DefaultMessageIdProvider { + constructor () { + /** + * MessageIDs starting with 1 + * ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810 + */ + this.nextId = Math.max(1, Math.floor(Math.random() * 65535)) } /** - * MessageIDs starting with 1 - * ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810 + * allocate + * + * Get the next messageId. + * @return unsigned int */ - this.nextId = Math.max(1, Math.floor(Math.random() * 65535)) -} - -/** - * allocate - * - * Get the next messageId. - * @return unsigned int - */ -DefaultMessageIdProvider.prototype.allocate = function () { - // id becomes current state of this.nextId and increments afterwards - const id = this.nextId++ - // Ensure 16 bit unsigned int (max 65535, nextId got one higher) - if (this.nextId === 65536) { - this.nextId = 1 + allocate () { + // id becomes current state of this.nextId and increments afterwards + const id = this.nextId++ + // Ensure 16 bit unsigned int (max 65535, nextId got one higher) + if (this.nextId === 65536) { + this.nextId = 1 + } + return id } - return id -} -/** - * getLastAllocated - * Get the last allocated messageId. - * @return unsigned int - */ -DefaultMessageIdProvider.prototype.getLastAllocated = function () { - return (this.nextId === 1) ? 65535 : (this.nextId - 1) -} + /** + * getLastAllocated + * Get the last allocated messageId. + * @return unsigned int + */ + getLastAllocated () { + return (this.nextId === 1) ? 65535 : (this.nextId - 1) + } -/** - * register - * Register messageId. If success return true, otherwise return false. - * @param { unsigned int } - messageId to register, - * @return boolean - */ -DefaultMessageIdProvider.prototype.register = function (messageId) { - return true -} + /** + * register + * Register messageId. If success return true, otherwise return false. + * @param { unsigned int } - messageId to register, + * @return boolean + */ + register (messageId) { + return true + } -/** - * deallocate - * Deallocate messageId. - * @param { unsigned int } - messageId to deallocate, - */ -DefaultMessageIdProvider.prototype.deallocate = function (messageId) { -} + /** + * deallocate + * Deallocate messageId. + * @param { unsigned int } - messageId to deallocate, + */ + deallocate (messageId) { + } -/** - * clear - * Deallocate all messageIds. - */ -DefaultMessageIdProvider.prototype.clear = function () { + /** + * clear + * Deallocate all messageIds. + */ + clear () { + } } module.exports = DefaultMessageIdProvider diff --git a/lib/store.js b/lib/store.js index 4f6b1a237..1412967eb 100644 --- a/lib/store.js +++ b/lib/store.js @@ -3,8 +3,6 @@ /** * Module dependencies */ -const xtend = require('xtend') - const Readable = require('readable-stream').Readable const streamsOpts = { objectMode: true } const defaultStoreOptions = { @@ -17,111 +15,109 @@ const defaultStoreOptions = { * * @param {Object} [options] - store options */ -function Store (options) { - if (!(this instanceof Store)) { - return new Store(options) - } - - this.options = options || {} - - // Defaults - this.options = xtend(defaultStoreOptions, options) +class Store { + constructor (options) { + this.options = options || {} - this._inflights = new Map() -} + // Defaults + this.options = { ...defaultStoreOptions, ...options } -/** - * Adds a packet to the store, a packet is - * anything that has a messageId property. - * - */ -Store.prototype.put = function (packet, cb) { - this._inflights.set(packet.messageId, packet) - - if (cb) { - cb() + this._inflights = new Map() } - return this -} + /** + * Adds a packet to the store, a packet is + * anything that has a messageId property. + * + */ + put (packet, cb) { + this._inflights.set(packet.messageId, packet) -/** - * Creates a stream with all the packets in the store - * - */ -Store.prototype.createStream = function () { - const stream = new Readable(streamsOpts) - const values = [] - let destroyed = false - let i = 0 - - this._inflights.forEach(function (value, key) { - values.push(value) - }) - - stream._read = function () { - if (!destroyed && i < values.length) { - this.push(values[i++]) - } else { - this.push(null) + if (cb) { + cb() } + + return this } - stream.destroy = function () { - if (destroyed) { - return + /** + * Creates a stream with all the packets in the store + * + */ + createStream () { + const stream = new Readable(streamsOpts) + const values = [] + let destroyed = false + let i = 0 + + this._inflights.forEach(function (value, key) { + values.push(value) + }) + + stream._read = function () { + if (!destroyed && i < values.length) { + this.push(values[i++]) + } else { + this.push(null) + } } - const self = this + stream.destroy = function () { + if (destroyed) { + return + } - destroyed = true + const self = this - setTimeout(function () { - self.emit('close') - }, 0) - } + destroyed = true - return stream -} + setTimeout(function () { + self.emit('close') + }, 0) + } -/** - * deletes a packet from the store. - */ -Store.prototype.del = function (packet, cb) { - packet = this._inflights.get(packet.messageId) - if (packet) { - this._inflights.delete(packet.messageId) - cb(null, packet) - } else if (cb) { - cb(new Error('missing packet')) + return stream } - return this -} + /** + * deletes a packet from the store. + */ + del (packet, cb) { + packet = this._inflights.get(packet.messageId) + if (packet) { + this._inflights.delete(packet.messageId) + cb(null, packet) + } else if (cb) { + cb(new Error('missing packet')) + } -/** - * get a packet from the store. - */ -Store.prototype.get = function (packet, cb) { - packet = this._inflights.get(packet.messageId) - if (packet) { - cb(null, packet) - } else if (cb) { - cb(new Error('missing packet')) + return this } - return this -} + /** + * get a packet from the store. + */ + get (packet, cb) { + packet = this._inflights.get(packet.messageId) + if (packet) { + cb(null, packet) + } else if (cb) { + cb(new Error('missing packet')) + } -/** - * Close the store - */ -Store.prototype.close = function (cb) { - if (this.options.clean) { - this._inflights = null + return this } - if (cb) { - cb() + + /** + * Close the store + */ + close (cb) { + if (this.options.clean) { + this._inflights = null + } + if (cb) { + cb() + } } } diff --git a/lib/topic-alias-recv.js b/lib/topic-alias-recv.js index 553341100..e6412d68d 100644 --- a/lib/topic-alias-recv.js +++ b/lib/topic-alias-recv.js @@ -5,43 +5,42 @@ * This holds alias to topic map * @param {Number} [max] - topic alias maximum entries */ -function TopicAliasRecv (max) { - if (!(this instanceof TopicAliasRecv)) { - return new TopicAliasRecv(max) +class TopicAliasRecv { + constructor (max) { + this.aliasToTopic = {} + this.max = max } - this.aliasToTopic = {} - this.max = max -} -/** - * Insert or update topic - alias entry. - * @param {String} [topic] - topic - * @param {Number} [alias] - topic alias - * @returns {Boolean} - if success return true otherwise false - */ -TopicAliasRecv.prototype.put = function (topic, alias) { - if (alias === 0 || alias > this.max) { - return false + /** + * Insert or update topic - alias entry. + * @param {String} [topic] - topic + * @param {Number} [alias] - topic alias + * @returns {Boolean} - if success return true otherwise false + */ + put (topic, alias) { + if (alias === 0 || alias > this.max) { + return false + } + this.aliasToTopic[alias] = topic + this.length = Object.keys(this.aliasToTopic).length + return true } - this.aliasToTopic[alias] = topic - this.length = Object.keys(this.aliasToTopic).length - return true -} -/** - * Get topic by alias - * @param {String} [topic] - topic - * @returns {Number} - if mapped topic exists return topic alias, otherwise return undefined - */ -TopicAliasRecv.prototype.getTopicByAlias = function (alias) { - return this.aliasToTopic[alias] -} + /** + * Get topic by alias + * @param {String} [topic] - topic + * @returns {Number} - if mapped topic exists return topic alias, otherwise return undefined + */ + getTopicByAlias (alias) { + return this.aliasToTopic[alias] + } -/** - * Clear all entries - */ -TopicAliasRecv.prototype.clear = function () { - this.aliasToTopic = {} + /** + * Clear all entries + */ + clear () { + this.aliasToTopic = {} + } } module.exports = TopicAliasRecv diff --git a/lib/topic-alias-send.js b/lib/topic-alias-send.js index 0c7292dc0..f6efcc822 100644 --- a/lib/topic-alias-send.js +++ b/lib/topic-alias-send.js @@ -11,82 +11,80 @@ const NumberAllocator = require('number-allocator').NumberAllocator * This holds both topic to alias and alias to topic map * @param {Number} [max] - topic alias maximum entries */ -function TopicAliasSend (max) { - if (!(this instanceof TopicAliasSend)) { - return new TopicAliasSend(max) +class TopicAliasSend { + constructor (max) { + if (max > 0) { + this.aliasToTopic = new LRUCache({ max }) + this.topicToAlias = {} + this.numberAllocator = new NumberAllocator(1, max) + this.max = max + this.length = 0 + } } - if (max > 0) { - this.aliasToTopic = new LRUCache({ max }) - this.topicToAlias = {} - this.numberAllocator = new NumberAllocator(1, max) - this.max = max - this.length = 0 + /** + * Insert or update topic - alias entry. + * @param {String} [topic] - topic + * @param {Number} [alias] - topic alias + * @returns {Boolean} - if success return true otherwise false + */ + put (topic, alias) { + if (alias === 0 || alias > this.max) { + return false + } + const entry = this.aliasToTopic.get(alias) + if (entry) { + delete this.topicToAlias[entry] + } + this.aliasToTopic.set(alias, topic) + this.topicToAlias[topic] = alias + this.numberAllocator.use(alias) + this.length = this.aliasToTopic.size + return true } -} -/** - * Insert or update topic - alias entry. - * @param {String} [topic] - topic - * @param {Number} [alias] - topic alias - * @returns {Boolean} - if success return true otherwise false - */ -TopicAliasSend.prototype.put = function (topic, alias) { - if (alias === 0 || alias > this.max) { - return false - } - const entry = this.aliasToTopic.get(alias) - if (entry) { - delete this.topicToAlias[entry] + /** + * Get topic by alias + * @param {Number} [alias] - topic alias + * @returns {String} - if mapped topic exists return topic, otherwise return undefined + */ + getTopicByAlias (alias) { + return this.aliasToTopic.get(alias) } - this.aliasToTopic.set(alias, topic) - this.topicToAlias[topic] = alias - this.numberAllocator.use(alias) - this.length = this.aliasToTopic.size - return true -} - -/** - * Get topic by alias - * @param {Number} [alias] - topic alias - * @returns {String} - if mapped topic exists return topic, otherwise return undefined - */ -TopicAliasSend.prototype.getTopicByAlias = function (alias) { - return this.aliasToTopic.get(alias) -} -/** - * Get topic by alias - * @param {String} [topic] - topic - * @returns {Number} - if mapped topic exists return topic alias, otherwise return undefined - */ -TopicAliasSend.prototype.getAliasByTopic = function (topic) { - const alias = this.topicToAlias[topic] - if (typeof alias !== 'undefined') { - this.aliasToTopic.get(alias) // LRU update + /** + * Get topic by alias + * @param {String} [topic] - topic + * @returns {Number} - if mapped topic exists return topic alias, otherwise return undefined + */ + getAliasByTopic (topic) { + const alias = this.topicToAlias[topic] + if (typeof alias !== 'undefined') { + this.aliasToTopic.get(alias) // LRU update + } + return alias } - return alias -} -/** - * Clear all entries - */ -TopicAliasSend.prototype.clear = function () { - this.aliasToTopic.clear() - this.topicToAlias = {} - this.numberAllocator.clear() - this.length = 0 -} + /** + * Clear all entries + */ + clear () { + this.aliasToTopic.clear() + this.topicToAlias = {} + this.numberAllocator.clear() + this.length = 0 + } -/** - * Get Least Recently Used (LRU) topic alias - * @returns {Number} - if vacant alias exists then return it, otherwise then return LRU alias - */ -TopicAliasSend.prototype.getLruAlias = function () { - const alias = this.numberAllocator.firstVacant() - if (alias) return alias - // get last alias (key) from LRU cache - return [...this.aliasToTopic.keys()][this.aliasToTopic.size - 1] + /** + * Get Least Recently Used (LRU) topic alias + * @returns {Number} - if vacant alias exists then return it, otherwise then return LRU alias + */ + getLruAlias () { + const alias = this.numberAllocator.firstVacant() + if (alias) return alias + // get last alias (key) from LRU cache + return [...this.aliasToTopic.keys()][this.aliasToTopic.size - 1] + } } module.exports = TopicAliasSend diff --git a/lib/unique-message-id-provider.js b/lib/unique-message-id-provider.js index 5026253a9..882bdf7d7 100644 --- a/lib/unique-message-id-provider.js +++ b/lib/unique-message-id-provider.js @@ -6,60 +6,62 @@ const NumberAllocator = require('number-allocator').NumberAllocator * UniqueMessageAllocator constructor * @constructor */ -function UniqueMessageIdProvider () { - if (!(this instanceof UniqueMessageIdProvider)) { - return new UniqueMessageIdProvider() - } +class UniqueMessageIdProvider { + constructor () { + if (!(this instanceof UniqueMessageIdProvider)) { + return new UniqueMessageIdProvider() + } - this.numberAllocator = new NumberAllocator(1, 65535) -} + this.numberAllocator = new NumberAllocator(1, 65535) + } -/** - * allocate - * - * Get the next messageId. - * @return if messageId is fully allocated then return null, - * otherwise return the smallest usable unsigned int messageId. - */ -UniqueMessageIdProvider.prototype.allocate = function () { - this.lastId = this.numberAllocator.alloc() - return this.lastId -} + /** + * allocate + * + * Get the next messageId. + * @return if messageId is fully allocated then return null, + * otherwise return the smallest usable unsigned int messageId. + */ + allocate () { + this.lastId = this.numberAllocator.alloc() + return this.lastId + } -/** - * getLastAllocated - * Get the last allocated messageId. - * @return unsigned int - */ -UniqueMessageIdProvider.prototype.getLastAllocated = function () { - return this.lastId -} + /** + * getLastAllocated + * Get the last allocated messageId. + * @return unsigned int + */ + getLastAllocated () { + return this.lastId + } -/** - * register - * Register messageId. If success return true, otherwise return false. - * @param { unsigned int } - messageId to register, - * @return boolean - */ -UniqueMessageIdProvider.prototype.register = function (messageId) { - return this.numberAllocator.use(messageId) -} + /** + * register + * Register messageId. If success return true, otherwise return false. + * @param { unsigned int } - messageId to register, + * @return boolean + */ + register (messageId) { + return this.numberAllocator.use(messageId) + } -/** - * deallocate - * Deallocate messageId. - * @param { unsigned int } - messageId to deallocate, - */ -UniqueMessageIdProvider.prototype.deallocate = function (messageId) { - this.numberAllocator.free(messageId) -} + /** + * deallocate + * Deallocate messageId. + * @param { unsigned int } - messageId to deallocate, + */ + deallocate (messageId) { + this.numberAllocator.free(messageId) + } -/** - * clear - * Deallocate all messageIds. - */ -UniqueMessageIdProvider.prototype.clear = function () { - this.numberAllocator.clear() + /** + * clear + * Deallocate all messageIds. + */ + clear () { + this.numberAllocator.clear() + } } module.exports = UniqueMessageIdProvider diff --git a/package-lock.json b/package-lock.json index 3fc7a738b..4d37e7172 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14,18 +14,15 @@ "debug": "^4.3.4", "duplexify": "^4.1.2", "help-me": "^4.2.0", - "inherits": "^2.0.4", "lru-cache": "^7.18.3", "minimist": "^1.2.8", "mqtt-packet": "^8.2.0", "number-allocator": "^1.0.14", - "pump": "^3.0.0", "readable-stream": "^4.4.2", "reinterval": "^1.1.0", "rfdc": "^1.3.0", "split2": "^4.2.0", - "ws": "^8.13.0", - "xtend": "^4.0.2" + "ws": "^8.13.0" }, "bin": { "mqtt": "bin/mqtt.js", @@ -37,7 +34,6 @@ "@types/node": "^20.4.0", "@types/tape": "^5.6.0", "@types/ws": "^8.5.5", - "aedes": "^0.49.0", "airtap": "^4.0.4", "airtap-playwright": "^1.0.1", "browserify": "^17.0.0", @@ -1368,74 +1364,6 @@ "integrity": "sha512-qQLMr+8o0WC4FZGQTcJiKBVC59JylcPSrTtk6usvmIDFUOCKegapy1VHQwRbFMOFyb/inzUVqHs+eMYKDM1YeQ==", "dev": true }, - "node_modules/aedes": { - "version": "0.49.0", - "resolved": "https://registry.npmjs.org/aedes/-/aedes-0.49.0.tgz", - "integrity": "sha512-RJK/ZggcCwXE9MW2mJ6+kLDa732DdkIbZcMP4sQkl8T8Gb9HSYKneuRub1kKgKQDvne8EQLOOo36PXLpbr3BOg==", - "dev": true, - "dependencies": { - "aedes-packet": "^3.0.0", - "aedes-persistence": "^9.1.2", - "end-of-stream": "^1.4.4", - "fastfall": "^1.5.1", - "fastparallel": "^2.4.1", - "fastseries": "^2.0.0", - "hyperid": "^3.1.1", - "mqemitter": "^5.0.0", - "mqtt-packet": "^8.1.2", - "retimer": "^3.0.0", - "reusify": "^1.0.4", - "uuid": "^9.0.0" - }, - "engines": { - "node": ">=14" - } - }, - "node_modules/aedes-packet": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/aedes-packet/-/aedes-packet-3.0.0.tgz", - "integrity": "sha512-swASey0BxGs4/npZGWoiVDmnEyPvVFIRY6l2LVKL4rbiW8IhcIGDLfnb20Qo8U20itXlitAKPQ3MVTEbOGG5ZA==", - "dev": true, - "dependencies": { - "mqtt-packet": "^7.0.0" - }, - "engines": { - "node": ">=14" - } - }, - "node_modules/aedes-packet/node_modules/mqtt-packet": { - "version": "7.1.2", - "resolved": "https://registry.npmjs.org/mqtt-packet/-/mqtt-packet-7.1.2.tgz", - "integrity": "sha512-FFZbcZ2omsf4c5TxEQfcX9hI+JzDpDKPT46OmeIBpVA7+t32ey25UNqlqNXTmeZOr5BLsSIERpQQLsFWJS94SQ==", - "dev": true, - "dependencies": { - "bl": "^4.0.2", - "debug": "^4.1.1", - "process-nextick-args": "^2.0.1" - } - }, - "node_modules/aedes-persistence": { - "version": "9.1.2", - "resolved": "https://registry.npmjs.org/aedes-persistence/-/aedes-persistence-9.1.2.tgz", - "integrity": "sha512-2Wlr5pwIK0eQOkiTwb8ZF6C20s8UPUlnsJ4kXYePZ3JlQl0NbBA176mzM8wY294BJ5wybpNc9P5XEQxqadRNcQ==", - "dev": true, - "dependencies": { - "aedes-packet": "^3.0.0", - "qlobber": "^7.0.0" - }, - "engines": { - "node": ">=14" - } - }, - "node_modules/aedes/node_modules/uuid": { - "version": "9.0.0", - "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.0.tgz", - "integrity": "sha512-MXcSTerfPa4uqyzStbRoTgt5XIe3x5+42+q1sDuy3R5MDk66URdLMOZe5aPX/SQd+kuYAh0FdP/pO28IkQyTeg==", - "dev": true, - "bin": { - "uuid": "dist/bin/uuid" - } - }, "node_modules/agent-base": { "version": "6.0.2", "resolved": "https://registry.npmjs.org/agent-base/-/agent-base-6.0.2.tgz", @@ -4693,9 +4621,9 @@ } }, "node_modules/engine.io": { - "version": "6.1.2", - "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.1.2.tgz", - "integrity": "sha512-v/7eGHxPvO2AWsksyx2PUsQvBafuvqs0jJJQ0FdmJG1b9qIvgSbqDRGwNhfk2XHaTTbTXiC4quRE8Q9nRjsrQQ==", + "version": "6.5.1", + "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.5.1.tgz", + "integrity": "sha512-mGqhI+D7YxS9KJMppR6Iuo37Ed3abhU8NdfgSvJSDUafQutrN+sPTncJYTyM9+tkhSmWodKtVYGPPHyXJEwEQA==", "dev": true, "dependencies": { "@types/cookie": "^0.4.1", @@ -4706,8 +4634,8 @@ "cookie": "~0.4.1", "cors": "~2.8.5", "debug": "~4.3.1", - "engine.io-parser": "~5.0.0", - "ws": "~8.2.3" + "engine.io-parser": "~5.1.0", + "ws": "~8.11.0" }, "engines": { "node": ">=10.0.0" @@ -4751,13 +4679,34 @@ "node": ">=10.0.0" } }, + "node_modules/engine.io/node_modules/engine.io-parser": { + "version": "5.1.0", + "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.1.0.tgz", + "integrity": "sha512-enySgNiK5tyZFynt3z7iqBR+Bto9EVVVvDFuTT0ioHCGbzirZVGDGiQjZzEp8hWl6hd5FSVytJGuScX1C1C35w==", + "dev": true, + "engines": { + "node": ">=10.0.0" + } + }, "node_modules/engine.io/node_modules/ws": { - "version": "8.2.3", - "resolved": "https://registry.npmjs.org/ws/-/ws-8.2.3.tgz", - "integrity": "sha512-wBuoj1BDpC6ZQ1B7DWQBYVLphPWkm8i9Y0/3YdHjHKHiohOJ1ws+3OccDWtH+PoC9DZD5WOTrJvNbWvjS6JWaA==", + "version": "8.11.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.11.0.tgz", + "integrity": "sha512-HPG3wQd9sNQoT9xHyNCXoDUa+Xw/VevmY9FoHyQ+g+rrMn4j6FB4np7Z0OhdTgjx6MgQLK7jwSy1YecU1+4Asg==", "dev": true, "engines": { "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": "^5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } } }, "node_modules/errno": { @@ -5966,28 +5915,6 @@ "punycode": "^1.3.2" } }, - "node_modules/fastfall": { - "version": "1.5.1", - "resolved": "https://registry.npmjs.org/fastfall/-/fastfall-1.5.1.tgz", - "integrity": "sha1-P+4DMxpJ0dObPN96XpzWb0dee5Q=", - "dev": true, - "dependencies": { - "reusify": "^1.0.0" - }, - "engines": { - "node": ">=0.10.0" - } - }, - "node_modules/fastparallel": { - "version": "2.4.1", - "resolved": "https://registry.npmjs.org/fastparallel/-/fastparallel-2.4.1.tgz", - "integrity": "sha512-qUmhxPgNHmvRjZKBFUNI0oZuuH9OlSIOXmJ98lhKPxMZZ7zS/Fi0wRHOihDSz0R1YiIOjxzOY4bq65YTcdBi2Q==", - "dev": true, - "dependencies": { - "reusify": "^1.0.4", - "xtend": "^4.0.2" - } - }, "node_modules/fastq": { "version": "1.15.0", "resolved": "https://registry.npmjs.org/fastq/-/fastq-1.15.0.tgz", @@ -5997,12 +5924,6 @@ "reusify": "^1.0.4" } }, - "node_modules/fastseries": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/fastseries/-/fastseries-2.0.0.tgz", - "integrity": "sha512-XBU9RXeoYc2/VnvMhplAxEmZLfIk7cvTBu+xwoBuTI8pL19E03cmca17QQycKIdxgwCeFA/a4u27gv1h3ya5LQ==", - "dev": true - }, "node_modules/fetch-blob": { "version": "3.2.0", "resolved": "https://registry.npmjs.org/fetch-blob/-/fetch-blob-3.2.0.tgz", @@ -7224,16 +7145,6 @@ "integrity": "sha512-jCVkMl+EaM80rrMrAPl96SGG4NRac53UyI1o/yAzebDntEY6K6/Fj2HOjdPg8omTqIe5Y0wPBai2q5xXrIbarA==", "dev": true }, - "node_modules/hyperid": { - "version": "3.1.1", - "resolved": "https://registry.npmjs.org/hyperid/-/hyperid-3.1.1.tgz", - "integrity": "sha512-RveV33kIksycSf7HLkq1sHB5wW0OwuX8ot8MYnY++gaaPXGFfKpBncHrAWxdpuEeRlazUMGWefwP1w6o6GaumA==", - "dev": true, - "dependencies": { - "uuid": "^8.3.2", - "uuid-parse": "^1.1.0" - } - }, "node_modules/iconv-lite": { "version": "0.4.24", "resolved": "https://registry.npmjs.org/iconv-lite/-/iconv-lite-0.4.24.tgz", @@ -9420,15 +9331,6 @@ "validate-npm-package-license": "^3.0.1" } }, - "node_modules/meow/node_modules/read-pkg/node_modules/semver": { - "version": "5.7.1", - "resolved": "https://registry.npmjs.org/semver/-/semver-5.7.1.tgz", - "integrity": "sha512-sauaDf/PZdVgrLTNYHRtpXa1iRiKcaebiKQ1BJdpQlWH2lCvexQdX55snPFyK7QzpudqbCI0qXFfOasHdyNDGQ==", - "dev": true, - "bin": { - "semver": "bin/semver" - } - }, "node_modules/meow/node_modules/read-pkg/node_modules/type-fest": { "version": "0.6.0", "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.6.0.tgz", @@ -9438,6 +9340,15 @@ "node": ">=8" } }, + "node_modules/meow/node_modules/semver": { + "version": "5.7.2", + "resolved": "https://registry.npmjs.org/semver/-/semver-5.7.2.tgz", + "integrity": "sha512-cBznnQ9KjJqU67B52RMC65CMarK2600WFnbkcaiwWq3xy/5haFJlshgnpjovMVJ+Hff49d8GEn0b87C5pDQ10g==", + "dev": true, + "bin": { + "semver": "bin/semver" + } + }, "node_modules/meow/node_modules/type-fest": { "version": "0.18.1", "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.18.1.tgz", @@ -9916,19 +9827,6 @@ "xtend": "~4.0.1" } }, - "node_modules/mqemitter": { - "version": "5.0.0", - "resolved": "https://registry.npmjs.org/mqemitter/-/mqemitter-5.0.0.tgz", - "integrity": "sha512-rqNRQhGgl0W/NV+Zrx0rpAUTZcSlAtivCVUmXBUPcFYt+AeDEpoJgy5eKlFWJP6xnatONL59WIFdV0W6niOMhw==", - "dev": true, - "dependencies": { - "fastparallel": "^2.3.0", - "qlobber": "^7.0.0" - }, - "engines": { - "node": ">=10" - } - }, "node_modules/mqtt-connection": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/mqtt-connection/-/mqtt-connection-4.1.0.tgz", @@ -12024,15 +11922,6 @@ "looper": "^2.0.0" } }, - "node_modules/pump": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/pump/-/pump-3.0.0.tgz", - "integrity": "sha512-LwZy+p3SFs1Pytd/jYct4wpv49HiYCqd9Rlc5ZVdk0V+8Yzv6jR5Blk3TRmPL1ft69TxP0IMZGJ+WPFU2BFhww==", - "dependencies": { - "end-of-stream": "^1.1.0", - "once": "^1.3.1" - } - }, "node_modules/punycode": { "version": "1.4.1", "resolved": "https://registry.npmjs.org/punycode/-/punycode-1.4.1.tgz", @@ -12054,15 +11943,6 @@ "url": "https://github.com/sponsors/sindresorhus" } }, - "node_modules/qlobber": { - "version": "7.0.1", - "resolved": "https://registry.npmjs.org/qlobber/-/qlobber-7.0.1.tgz", - "integrity": "sha512-FsFg9lMuMEFNKmTO9nV7tlyPhx8BmskPPjH2akWycuYVTtWaVwhW5yCHLJQ6Q+3mvw5cFX2vMfW2l9z2SiYAbg==", - "dev": true, - "engines": { - "node": ">= 14" - } - }, "node_modules/qs": { "version": "6.11.0", "resolved": "https://registry.npmjs.org/qs/-/qs-6.11.0.tgz", @@ -12365,9 +12245,9 @@ } }, "node_modules/read-pkg/node_modules/semver": { - "version": "5.7.1", - "resolved": "https://registry.npmjs.org/semver/-/semver-5.7.1.tgz", - "integrity": "sha512-sauaDf/PZdVgrLTNYHRtpXa1iRiKcaebiKQ1BJdpQlWH2lCvexQdX55snPFyK7QzpudqbCI0qXFfOasHdyNDGQ==", + "version": "5.7.2", + "resolved": "https://registry.npmjs.org/semver/-/semver-5.7.2.tgz", + "integrity": "sha512-cBznnQ9KjJqU67B52RMC65CMarK2600WFnbkcaiwWq3xy/5haFJlshgnpjovMVJ+Hff49d8GEn0b87C5pDQ10g==", "dev": true, "bin": { "semver": "bin/semver" @@ -12766,12 +12646,6 @@ "through": "~2.3.4" } }, - "node_modules/retimer": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/retimer/-/retimer-3.0.0.tgz", - "integrity": "sha512-WKE0j11Pa0ZJI5YIk0nflGI7SQsfl2ljihVy7ogh7DeQSeYAUi0ubZ/yEueGtDfUPk6GH5LRw1hBdLq4IwUBWA==", - "dev": true - }, "node_modules/retry": { "version": "0.13.1", "resolved": "https://registry.npmjs.org/retry/-/retry-0.13.1.tgz", @@ -13087,9 +12961,9 @@ "dev": true }, "node_modules/semver": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.0.tgz", - "integrity": "sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw==", + "version": "6.3.1", + "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz", + "integrity": "sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==", "dev": true, "bin": { "semver": "bin/semver.js" @@ -15138,12 +15012,6 @@ "uuid": "dist/bin/uuid" } }, - "node_modules/uuid-parse": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/uuid-parse/-/uuid-parse-1.1.0.tgz", - "integrity": "sha512-OdmXxA8rDsQ7YpNVbKSJkNzTw2I+S5WsbMDnCtIWSQaosNAcWtFuI/YK1TjzUI6nbkgiqEyh8gWngfcv8Asd9A==", - "dev": true - }, "node_modules/validate-npm-package-license": { "version": "3.0.4", "resolved": "https://registry.npmjs.org/validate-npm-package-license/-/validate-npm-package-license-3.0.4.tgz", @@ -15250,21 +15118,6 @@ "node": ">= 6" } }, - "node_modules/watchify/node_modules/safe-buffer": { - "version": "5.1.2", - "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", - "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==", - "dev": true - }, - "node_modules/watchify/node_modules/string_decoder": { - "version": "1.1.1", - "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.1.1.tgz", - "integrity": "sha512-n/ShnvDi6FHbbVfviro+WojiFzv+s8MPMHBczVePfUpDJLwoLT0ht1l4YwBCbi8pJAveEEdnkHyPyTP/mzRfwg==", - "dev": true, - "dependencies": { - "safe-buffer": "~5.1.0" - } - }, "node_modules/watchify/node_modules/through2": { "version": "4.0.2", "resolved": "https://registry.npmjs.org/through2/-/through2-4.0.2.tgz", @@ -15712,6 +15565,7 @@ "version": "4.0.2", "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz", "integrity": "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==", + "dev": true, "engines": { "node": ">=0.4" } @@ -16840,66 +16694,6 @@ "integrity": "sha512-qQLMr+8o0WC4FZGQTcJiKBVC59JylcPSrTtk6usvmIDFUOCKegapy1VHQwRbFMOFyb/inzUVqHs+eMYKDM1YeQ==", "dev": true }, - "aedes": { - "version": "0.49.0", - "resolved": "https://registry.npmjs.org/aedes/-/aedes-0.49.0.tgz", - "integrity": "sha512-RJK/ZggcCwXE9MW2mJ6+kLDa732DdkIbZcMP4sQkl8T8Gb9HSYKneuRub1kKgKQDvne8EQLOOo36PXLpbr3BOg==", - "dev": true, - "requires": { - "aedes-packet": "^3.0.0", - "aedes-persistence": "^9.1.2", - "end-of-stream": "^1.4.4", - "fastfall": "^1.5.1", - "fastparallel": "^2.4.1", - "fastseries": "^2.0.0", - "hyperid": "^3.1.1", - "mqemitter": "^5.0.0", - "mqtt-packet": "^8.1.2", - "retimer": "^3.0.0", - "reusify": "^1.0.4", - "uuid": "^9.0.0" - }, - "dependencies": { - "uuid": { - "version": "9.0.0", - "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.0.tgz", - "integrity": "sha512-MXcSTerfPa4uqyzStbRoTgt5XIe3x5+42+q1sDuy3R5MDk66URdLMOZe5aPX/SQd+kuYAh0FdP/pO28IkQyTeg==", - "dev": true - } - } - }, - "aedes-packet": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/aedes-packet/-/aedes-packet-3.0.0.tgz", - "integrity": "sha512-swASey0BxGs4/npZGWoiVDmnEyPvVFIRY6l2LVKL4rbiW8IhcIGDLfnb20Qo8U20itXlitAKPQ3MVTEbOGG5ZA==", - "dev": true, - "requires": { - "mqtt-packet": "^7.0.0" - }, - "dependencies": { - "mqtt-packet": { - "version": "7.1.2", - "resolved": "https://registry.npmjs.org/mqtt-packet/-/mqtt-packet-7.1.2.tgz", - "integrity": "sha512-FFZbcZ2omsf4c5TxEQfcX9hI+JzDpDKPT46OmeIBpVA7+t32ey25UNqlqNXTmeZOr5BLsSIERpQQLsFWJS94SQ==", - "dev": true, - "requires": { - "bl": "^4.0.2", - "debug": "^4.1.1", - "process-nextick-args": "^2.0.1" - } - } - } - }, - "aedes-persistence": { - "version": "9.1.2", - "resolved": "https://registry.npmjs.org/aedes-persistence/-/aedes-persistence-9.1.2.tgz", - "integrity": "sha512-2Wlr5pwIK0eQOkiTwb8ZF6C20s8UPUlnsJ4kXYePZ3JlQl0NbBA176mzM8wY294BJ5wybpNc9P5XEQxqadRNcQ==", - "dev": true, - "requires": { - "aedes-packet": "^3.0.0", - "qlobber": "^7.0.0" - } - }, "agent-base": { "version": "6.0.2", "resolved": "https://registry.npmjs.org/agent-base/-/agent-base-6.0.2.tgz", @@ -19564,9 +19358,9 @@ } }, "engine.io": { - "version": "6.1.2", - "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.1.2.tgz", - "integrity": "sha512-v/7eGHxPvO2AWsksyx2PUsQvBafuvqs0jJJQ0FdmJG1b9qIvgSbqDRGwNhfk2XHaTTbTXiC4quRE8Q9nRjsrQQ==", + "version": "6.5.1", + "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.5.1.tgz", + "integrity": "sha512-mGqhI+D7YxS9KJMppR6Iuo37Ed3abhU8NdfgSvJSDUafQutrN+sPTncJYTyM9+tkhSmWodKtVYGPPHyXJEwEQA==", "dev": true, "requires": { "@types/cookie": "^0.4.1", @@ -19577,15 +19371,22 @@ "cookie": "~0.4.1", "cors": "~2.8.5", "debug": "~4.3.1", - "engine.io-parser": "~5.0.0", - "ws": "~8.2.3" + "engine.io-parser": "~5.1.0", + "ws": "~8.11.0" }, "dependencies": { - "ws": { - "version": "8.2.3", - "resolved": "https://registry.npmjs.org/ws/-/ws-8.2.3.tgz", - "integrity": "sha512-wBuoj1BDpC6ZQ1B7DWQBYVLphPWkm8i9Y0/3YdHjHKHiohOJ1ws+3OccDWtH+PoC9DZD5WOTrJvNbWvjS6JWaA==", + "engine.io-parser": { + "version": "5.1.0", + "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.1.0.tgz", + "integrity": "sha512-enySgNiK5tyZFynt3z7iqBR+Bto9EVVVvDFuTT0ioHCGbzirZVGDGiQjZzEp8hWl6hd5FSVytJGuScX1C1C35w==", "dev": true + }, + "ws": { + "version": "8.11.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.11.0.tgz", + "integrity": "sha512-HPG3wQd9sNQoT9xHyNCXoDUa+Xw/VevmY9FoHyQ+g+rrMn4j6FB4np7Z0OhdTgjx6MgQLK7jwSy1YecU1+4Asg==", + "dev": true, + "requires": {} } } }, @@ -20515,25 +20316,6 @@ "punycode": "^1.3.2" } }, - "fastfall": { - "version": "1.5.1", - "resolved": "https://registry.npmjs.org/fastfall/-/fastfall-1.5.1.tgz", - "integrity": "sha1-P+4DMxpJ0dObPN96XpzWb0dee5Q=", - "dev": true, - "requires": { - "reusify": "^1.0.0" - } - }, - "fastparallel": { - "version": "2.4.1", - "resolved": "https://registry.npmjs.org/fastparallel/-/fastparallel-2.4.1.tgz", - "integrity": "sha512-qUmhxPgNHmvRjZKBFUNI0oZuuH9OlSIOXmJ98lhKPxMZZ7zS/Fi0wRHOihDSz0R1YiIOjxzOY4bq65YTcdBi2Q==", - "dev": true, - "requires": { - "reusify": "^1.0.4", - "xtend": "^4.0.2" - } - }, "fastq": { "version": "1.15.0", "resolved": "https://registry.npmjs.org/fastq/-/fastq-1.15.0.tgz", @@ -20543,12 +20325,6 @@ "reusify": "^1.0.4" } }, - "fastseries": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/fastseries/-/fastseries-2.0.0.tgz", - "integrity": "sha512-XBU9RXeoYc2/VnvMhplAxEmZLfIk7cvTBu+xwoBuTI8pL19E03cmca17QQycKIdxgwCeFA/a4u27gv1h3ya5LQ==", - "dev": true - }, "fetch-blob": { "version": "3.2.0", "resolved": "https://registry.npmjs.org/fetch-blob/-/fetch-blob-3.2.0.tgz", @@ -21497,16 +21273,6 @@ "integrity": "sha512-jCVkMl+EaM80rrMrAPl96SGG4NRac53UyI1o/yAzebDntEY6K6/Fj2HOjdPg8omTqIe5Y0wPBai2q5xXrIbarA==", "dev": true }, - "hyperid": { - "version": "3.1.1", - "resolved": "https://registry.npmjs.org/hyperid/-/hyperid-3.1.1.tgz", - "integrity": "sha512-RveV33kIksycSf7HLkq1sHB5wW0OwuX8ot8MYnY++gaaPXGFfKpBncHrAWxdpuEeRlazUMGWefwP1w6o6GaumA==", - "dev": true, - "requires": { - "uuid": "^8.3.2", - "uuid-parse": "^1.1.0" - } - }, "iconv-lite": { "version": "0.4.24", "resolved": "https://registry.npmjs.org/iconv-lite/-/iconv-lite-0.4.24.tgz", @@ -23198,12 +22964,6 @@ "validate-npm-package-license": "^3.0.1" } }, - "semver": { - "version": "5.7.1", - "resolved": "https://registry.npmjs.org/semver/-/semver-5.7.1.tgz", - "integrity": "sha512-sauaDf/PZdVgrLTNYHRtpXa1iRiKcaebiKQ1BJdpQlWH2lCvexQdX55snPFyK7QzpudqbCI0qXFfOasHdyNDGQ==", - "dev": true - }, "type-fest": { "version": "0.6.0", "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.6.0.tgz", @@ -23231,6 +22991,12 @@ } } }, + "semver": { + "version": "5.7.2", + "resolved": "https://registry.npmjs.org/semver/-/semver-5.7.2.tgz", + "integrity": "sha512-cBznnQ9KjJqU67B52RMC65CMarK2600WFnbkcaiwWq3xy/5haFJlshgnpjovMVJ+Hff49d8GEn0b87C5pDQ10g==", + "dev": true + }, "type-fest": { "version": "0.18.1", "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.18.1.tgz", @@ -23605,16 +23371,6 @@ } } }, - "mqemitter": { - "version": "5.0.0", - "resolved": "https://registry.npmjs.org/mqemitter/-/mqemitter-5.0.0.tgz", - "integrity": "sha512-rqNRQhGgl0W/NV+Zrx0rpAUTZcSlAtivCVUmXBUPcFYt+AeDEpoJgy5eKlFWJP6xnatONL59WIFdV0W6niOMhw==", - "dev": true, - "requires": { - "fastparallel": "^2.3.0", - "qlobber": "^7.0.0" - } - }, "mqtt-connection": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/mqtt-connection/-/mqtt-connection-4.1.0.tgz", @@ -25275,15 +25031,6 @@ "looper": "^2.0.0" } }, - "pump": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/pump/-/pump-3.0.0.tgz", - "integrity": "sha512-LwZy+p3SFs1Pytd/jYct4wpv49HiYCqd9Rlc5ZVdk0V+8Yzv6jR5Blk3TRmPL1ft69TxP0IMZGJ+WPFU2BFhww==", - "requires": { - "end-of-stream": "^1.1.0", - "once": "^1.3.1" - } - }, "punycode": { "version": "1.4.1", "resolved": "https://registry.npmjs.org/punycode/-/punycode-1.4.1.tgz", @@ -25299,12 +25046,6 @@ "escape-goat": "^4.0.0" } }, - "qlobber": { - "version": "7.0.1", - "resolved": "https://registry.npmjs.org/qlobber/-/qlobber-7.0.1.tgz", - "integrity": "sha512-FsFg9lMuMEFNKmTO9nV7tlyPhx8BmskPPjH2akWycuYVTtWaVwhW5yCHLJQ6Q+3mvw5cFX2vMfW2l9z2SiYAbg==", - "dev": true - }, "qs": { "version": "6.11.0", "resolved": "https://registry.npmjs.org/qs/-/qs-6.11.0.tgz", @@ -25486,9 +25227,9 @@ } }, "semver": { - "version": "5.7.1", - "resolved": "https://registry.npmjs.org/semver/-/semver-5.7.1.tgz", - "integrity": "sha512-sauaDf/PZdVgrLTNYHRtpXa1iRiKcaebiKQ1BJdpQlWH2lCvexQdX55snPFyK7QzpudqbCI0qXFfOasHdyNDGQ==", + "version": "5.7.2", + "resolved": "https://registry.npmjs.org/semver/-/semver-5.7.2.tgz", + "integrity": "sha512-cBznnQ9KjJqU67B52RMC65CMarK2600WFnbkcaiwWq3xy/5haFJlshgnpjovMVJ+Hff49d8GEn0b87C5pDQ10g==", "dev": true } } @@ -25836,12 +25577,6 @@ "through": "~2.3.4" } }, - "retimer": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/retimer/-/retimer-3.0.0.tgz", - "integrity": "sha512-WKE0j11Pa0ZJI5YIk0nflGI7SQsfl2ljihVy7ogh7DeQSeYAUi0ubZ/yEueGtDfUPk6GH5LRw1hBdLq4IwUBWA==", - "dev": true - }, "retry": { "version": "0.13.1", "resolved": "https://registry.npmjs.org/retry/-/retry-0.13.1.tgz", @@ -26059,9 +25794,9 @@ "dev": true }, "semver": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.0.tgz", - "integrity": "sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw==", + "version": "6.3.1", + "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz", + "integrity": "sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==", "dev": true }, "semver-diff": { @@ -27721,12 +27456,6 @@ "integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==", "dev": true }, - "uuid-parse": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/uuid-parse/-/uuid-parse-1.1.0.tgz", - "integrity": "sha512-OdmXxA8rDsQ7YpNVbKSJkNzTw2I+S5WsbMDnCtIWSQaosNAcWtFuI/YK1TjzUI6nbkgiqEyh8gWngfcv8Asd9A==", - "dev": true - }, "validate-npm-package-license": { "version": "3.0.4", "resolved": "https://registry.npmjs.org/validate-npm-package-license/-/validate-npm-package-license-3.0.4.tgz", @@ -27805,21 +27534,6 @@ "util-deprecate": "^1.0.1" } }, - "safe-buffer": { - "version": "5.1.2", - "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", - "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==", - "dev": true - }, - "string_decoder": { - "version": "1.1.1", - "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.1.1.tgz", - "integrity": "sha512-n/ShnvDi6FHbbVfviro+WojiFzv+s8MPMHBczVePfUpDJLwoLT0ht1l4YwBCbi8pJAveEEdnkHyPyTP/mzRfwg==", - "dev": true, - "requires": { - "safe-buffer": "~5.1.0" - } - }, "through2": { "version": "4.0.2", "resolved": "https://registry.npmjs.org/through2/-/through2-4.0.2.tgz", @@ -28159,7 +27873,8 @@ "xtend": { "version": "4.0.2", "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz", - "integrity": "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==" + "integrity": "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==", + "dev": true }, "y18n": { "version": "5.0.8", diff --git a/package.json b/package.json index ba71a5800..8947b686e 100644 --- a/package.json +++ b/package.json @@ -93,25 +93,21 @@ "debug": "^4.3.4", "duplexify": "^4.1.2", "help-me": "^4.2.0", - "inherits": "^2.0.4", "lru-cache": "^7.18.3", "minimist": "^1.2.8", "mqtt-packet": "^8.2.0", "number-allocator": "^1.0.14", - "pump": "^3.0.0", "readable-stream": "^4.4.2", "reinterval": "^1.1.0", "rfdc": "^1.3.0", "split2": "^4.2.0", - "ws": "^8.13.0", - "xtend": "^4.0.2" + "ws": "^8.13.0" }, "devDependencies": { "@release-it/conventional-changelog": "^6.0.0", "@types/node": "^20.4.0", "@types/tape": "^5.6.0", "@types/ws": "^8.5.5", - "aedes": "^0.49.0", "airtap": "^4.0.4", "airtap-playwright": "^1.0.1", "browserify": "^17.0.0", diff --git a/test/abstract_client.js b/test/abstract_client.js index d37a1bde7..985651192 100644 --- a/test/abstract_client.js +++ b/test/abstract_client.js @@ -6,7 +6,6 @@ const should = require('chai').should const sinon = require('sinon') const mqtt = require('../') -const xtend = require('xtend') const Store = require('./../lib/store') const assert = require('chai').assert const ports = require('./helpers/port_list') @@ -39,7 +38,7 @@ module.exports = function (server, config) { const version = config.protocolVersion || 4 function connect (opts) { - opts = xtend(config, opts) + opts = { ...config, ...opts } return mqtt.connect(opts) } @@ -439,7 +438,7 @@ module.exports = function (server, config) { // fake a port const client = connect({ reconnectPeriod: 20, port: 4557 }) - client.on('error', function () {}) + client.on('error', function () { }) client.on('offline', function () { client.end(true, done) @@ -1403,7 +1402,7 @@ module.exports = function (server, config) { const server2 = serverBuilder(config.protocol, function (serverClient) { // errors are not interesting for this test // but they might happen on some platforms - serverClient.on('error', function () {}) + serverClient.on('error', function () { }) serverClient.on('connect', function (packet) { const connack = version === 5 ? { reasonCode: 0 } : { returnCode: 0 } @@ -1727,7 +1726,7 @@ module.exports = function (server, config) { const client = connect({ keepalive: 1, reconnectPeriod: 100 }) // Fake no pingresp being send by stubbing the _handlePingresp function - client._handlePingresp = function () {} + client._handlePingresp = function () { } client.once('connect', function () { client.once('connect', function () { @@ -2634,7 +2633,7 @@ module.exports = function (server, config) { server.once('client', function (serverClient) { // ignore errors - serverClient.on('error', function () {}) + serverClient.on('error', function () { }) serverClient.on('publish', function () { setImmediate(function () { serverClient.stream.destroy() @@ -2996,7 +2995,7 @@ module.exports = function (server, config) { client.publish('topic', 'payload', { qos: 1 }) } }) - client.on('error', function () {}) + client.on('error', function () { }) }) }) @@ -3043,7 +3042,7 @@ module.exports = function (server, config) { client.publish('topic', 'payload', { qos: 2 }) } }) - client.on('error', function () {}) + client.on('error', function () { }) }) }) @@ -3099,7 +3098,7 @@ module.exports = function (server, config) { }) } }) - client.on('error', function () {}) + client.on('error', function () { }) }) }) @@ -3113,7 +3112,7 @@ module.exports = function (server, config) { const server2 = serverBuilder(config.protocol, function (serverClient) { // errors are not interesting for this test // but they might happen on some platforms - serverClient.on('error', function () {}) + serverClient.on('error', function () { }) serverClient.on('connect', function (packet) { const connack = version === 5 ? { reasonCode: 0 } : { returnCode: 0 } @@ -3171,7 +3170,7 @@ module.exports = function (server, config) { client.publish('topic', 'payload3', { qos: 1 }) } }) - client.on('error', function () {}) + client.on('error', function () { }) }) }) diff --git a/test/client.js b/test/client.js index deba79f80..2d78a81d4 100644 --- a/test/client.js +++ b/test/client.js @@ -32,9 +32,9 @@ describe('MqttClient', function () { abstractClientTests(server, config) describe('creating', function () { - it('should allow instantiation of MqttClient without the \'new\' operator', function (done) { + it('should allow instantiation of MqttClient', function (done) { try { - client = mqtt.MqttClient(function () { + client = new mqtt.MqttClient(function () { throw Error('break') }, {}) client.end() @@ -47,7 +47,7 @@ describe('MqttClient', function () { it('should disable number cache if specified in options', function (done) { try { assert.isTrue(mqttPacket.writeToStream.cacheNumbers) - client = mqtt.MqttClient(function () { + client = new mqtt.MqttClient(function () { throw Error('break') }, { writeCache: false }) client.end() diff --git a/test/websocket_client.js b/test/websocket_client.js index 84da2a9f4..0ff253fb4 100644 --- a/test/websocket_client.js +++ b/test/websocket_client.js @@ -7,7 +7,6 @@ const abstractClientTests = require('./abstract_client') const ports = require('./helpers/port_list') const MqttServerNoWait = require('./server').MqttServerNoWait const mqtt = require('../') -const xtend = require('xtend') const assert = require('assert') const port = 9999 const httpServer = http.createServer() @@ -20,8 +19,8 @@ function attachWebsocketServer (httpServer) { const connection = new MQTTConnection(stream) connection.protocol = ws.protocol httpServer.emit('client', connection) - stream.on('error', function () {}) - connection.on('error', function () {}) + stream.on('error', function () { }) + connection.on('error', function () { }) }) return httpServer @@ -90,8 +89,7 @@ describe('Websocket Client', function () { const baseConfig = { protocol: 'ws', port } function makeOptions (custom) { - // xtend returns a new object. Does not mutate arguments - return xtend(baseConfig, custom || {}) + return { ...baseConfig, ...(custom || {}) } } it('should use mqtt as the protocol by default', function (done) {