From 79ce013af695f96ff57106b213982647e0783d3f Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Wed, 13 Oct 2021 13:35:47 +0800 Subject: [PATCH] fix(name-processors): wait for all processors when closing fixes #1618 --- lib/queue.js | 14 ++++++++------ test/test_queue.js | 28 ++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 6 deletions(-) diff --git a/lib/queue.js b/lib/queue.js index 8ec438109..a34b5c5d6 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -616,12 +616,12 @@ Queue.prototype.process = function(name, concurrency, handler) { this.setHandler(name, handler); return this._initProcess().then(() => { - return this.start(concurrency); + return this.start(concurrency, name); }); }; -Queue.prototype.start = function(concurrency) { - return this.run(concurrency).catch(err => { +Queue.prototype.start = function(concurrency, name) { + return this.run(concurrency, name).catch(err => { this.emit('error', err, 'error running queue'); throw err; }); @@ -860,7 +860,7 @@ Queue.prototype.isPaused = async function(isLocal) { } }; -Queue.prototype.run = function(concurrency) { +Queue.prototype.run = function(concurrency, handlerName) { const promises = []; return this.isReady() @@ -874,7 +874,7 @@ Queue.prototype.run = function(concurrency) { while (concurrency--) { promises.push( new Promise(resolve => { - this.processJobs(concurrency, resolve); + this.processJobs(`${handlerName}:${concurrency}`, resolve); }) ); } @@ -1279,7 +1279,9 @@ Queue.prototype.whenCurrentJobsFinished = function() { return this.bclient.connect(); }); - return Promise.all(this.processing).then(() => forcedReconnection); + return Promise.all(Object.values(this.processing)).then( + () => forcedReconnection + ); }; // diff --git a/test/test_queue.js b/test/test_queue.js index 77513ac18..1e49a6af7 100644 --- a/test/test_queue.js +++ b/test/test_queue.js @@ -1287,6 +1287,34 @@ describe('Queue', () => { }); }); + it('should wait for all jobs when closing queue with named processors', async () => { + let processedA = false; + + const startProcessing = new Promise(resolve => { + queue.process('jobA', async () => { + resolve(); + return new Promise(resolve => { + setTimeout(() => { + processedA = true; + resolve(); + }, 500); + }); + }); + }); + + queue.process('jobB', async () => {}); + + queue.add('jobA', {}); + + await startProcessing; + + expect(processedA).to.be.eq(false); + + await queue.close(); + + expect(processedA).to.be.eq(true); + }); + it('processes several stalled jobs when starting several queues', function(done) { this.timeout(50000);