Skip to content

Commit

Permalink
Fix(publish): call callback when messageId available (#1393)
Browse files Browse the repository at this point in the history
Process is enqueued if messageId allocation is failure.
  • Loading branch information
redboltz authored Jan 5, 2022
1 parent ee75c32 commit 58fb8df
Showing 1 changed file with 3 additions and 9 deletions.
12 changes: 3 additions & 9 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -660,16 +660,14 @@ MqttClient.prototype.publish = function (topic, message, opts, callback) {
return true
}

if (this._storeProcessing || this._storeProcessingQueue.length > 0) {
if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !publishProc()) {
this._storeProcessingQueue.push(
{
invoke: publishProc,
cbStorePut: opts.cbStorePut,
callback: callback
}
)
} else {
publishProc()
}
return this
}
Expand Down Expand Up @@ -842,15 +840,13 @@ MqttClient.prototype.subscribe = function () {
return true
}

if (this._storeProcessing || this._storeProcessingQueue.length > 0) {
if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !subscribeProc()) {
this._storeProcessingQueue.push(
{
invoke: subscribeProc,
callback: callback
}
)
} else {
subscribeProc()
}

return this
Expand Down Expand Up @@ -935,15 +931,13 @@ MqttClient.prototype.unsubscribe = function () {
return true
}

if (this._storeProcessing || this._storeProcessingQueue.length > 0) {
if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !unsubscribeProc()) {
this._storeProcessingQueue.push(
{
invoke: unsubscribeProc,
callback: callback
}
)
} else {
unsubscribeProc()
}

return this
Expand Down

0 comments on commit 58fb8df

Please sign in to comment.