From 0ef22f8dea47616b9e120a0d0c851eafbd74bf79 Mon Sep 17 00:00:00 2001 From: Cameron Price-Austin Date: Fri, 19 Aug 2016 10:51:46 +0930 Subject: [PATCH 01/10] Allow custom job ids to be specified in the job options --- lib/job.js | 2 +- lib/scripts.js | 9 +++++---- test/test_job.js | 7 +++++++ 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/lib/job.js b/lib/job.js index 7e72358e6..6dac9afbe 100644 --- a/lib/job.js +++ b/lib/job.js @@ -40,7 +40,7 @@ var Job = function(queue, data, opts){ function addJob(queue, job){ var jobData = job.toData(); var toKey = _.bind(queue.toKey, queue); - return scripts.addJob(queue.client, toKey, job.opts.lifo, jobData); + return scripts.addJob(queue.client, toKey, job.opts.lifo, job.opts.jobId, jobData); } Job.create = function(queue, data, opts){ diff --git a/lib/scripts.js b/lib/scripts.js index 245cba75e..1756b6dae 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -65,7 +65,7 @@ var scripts = { return result === 1; }); }, - addJob: function(client, toKey, lifo, job){ + addJob: function(client, toKey, lifo, customJobId, job){ var jobArgs = _.flatten(_.toPairs(job)); var keys = _.map(['wait', 'paused', 'meta-paused', 'jobs', 'id', 'delayed'], function(name){ @@ -74,11 +74,11 @@ var scripts = { var baseKey = toKey(''); var argvs = _.map(jobArgs, function(arg, index){ - return ', ARGV['+(index+2)+']'; + return ', ARGV['+(index+3)+']'; }) var script = [ - 'local jobId = redis.call("INCR", KEYS[5])', + 'local jobId = ARGV[2] == "" and redis.call("INCR", KEYS[5]) or ARGV[2]', 'redis.call("HMSET", ARGV[1] .. jobId' + argvs.join('') + ')', ]; @@ -87,7 +87,7 @@ var scripts = { var delayTimestamp = job.timestamp + job.delay; if(job.delay && delayTimestamp > Date.now()){ script.push.apply(script, [ - ' local timestamp = tonumber(ARGV[' + (argvs.length + 2) + ']) * 0x1000 + bit.band(jobId, 0xfff)', + ' local timestamp = tonumber(ARGV[' + (argvs.length + 3) + ']) * 0x1000 + bit.band(jobId, 0xfff)', ' redis.call("ZADD", KEYS[6], timestamp, jobId)', ' redis.call("PUBLISH", KEYS[6], (timestamp / 0x1000))', ' return jobId', @@ -121,6 +121,7 @@ var scripts = { args.push.apply(args, keys); args.push(baseKey); + args.push(customJobId || ''); args.push.apply(args, jobArgs); args.push(delayTimestamp); diff --git a/test/test_job.js b/test/test_job.js index a75698b2b..4156a3a4d 100644 --- a/test/test_job.js +++ b/test/test_job.js @@ -56,6 +56,13 @@ describe('Job', function(){ expect(storedJob.opts.testOpt).to.be('enabled'); }); }); + + it('should use the custom jobId if one is provided', function() { + var customJobId = 'customjob'; + return Job.create(queue, data, { jobId: customJobId }).then(function(createdJob){ + expect(createdJob.jobId).to.be.equal(customJobId); + }); + }); }); describe('.remove', function () { From 7c5940c2f52534a3172a8e67db184131b51a5bcf Mon Sep 17 00:00:00 2001 From: Cameron Price-Austin Date: Fri, 19 Aug 2016 11:40:55 +0930 Subject: [PATCH 02/10] Treat jobIds as strings --- lib/job.js | 2 +- lib/scripts.js | 2 +- test/test_job.js | 15 +++++++++++++++ 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/lib/job.js b/lib/job.js index 6dac9afbe..dd897ad4c 100644 --- a/lib/job.js +++ b/lib/job.js @@ -65,7 +65,7 @@ Job.fromId = function(queue, jobId){ } return queue.client.hgetallAsync(queue.toKey(jobId)).then(function(jobData){ if(jobData){ - return Job.fromData(queue, +jobId, jobData); + return Job.fromData(queue, jobId, jobData); }else{ return jobData; } diff --git a/lib/scripts.js b/lib/scripts.js index 1756b6dae..ab2d39324 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -106,7 +106,7 @@ var scripts = { ' redis.call("' + push + '", KEYS[2], jobId)', 'end', 'redis.call("PUBLISH", KEYS[4], jobId)', - 'return jobId' + 'return jobId .. ""' ]); scriptName = 'addJob'+push; diff --git a/test/test_job.js b/test/test_job.js index 4156a3a4d..2cea81e59 100644 --- a/test/test_job.js +++ b/test/test_job.js @@ -63,6 +63,21 @@ describe('Job', function(){ expect(createdJob.jobId).to.be.equal(customJobId); }); }); + + it('should process jobs with custom jobIds', function(done) { + var customJobId = 'customjob'; + queue.process(function () { + return Promise.resolve(); + }); + + queue.add({ foo: 'bar' }, { jobId: customJobId }); + + queue.on('completed', function(job) { + if (job.opts.jobId == customJobId) { + done(); + } + }); + }); }); describe('.remove', function () { From 2e9940baf7e514e6b5a578d98131ab88bb9b5f2a Mon Sep 17 00:00:00 2001 From: Cameron Price-Austin Date: Fri, 19 Aug 2016 12:25:08 +0930 Subject: [PATCH 03/10] Delayed jobs should still use a numeric id to offset the timestamp --- lib/scripts.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/scripts.js b/lib/scripts.js index ab2d39324..f6a084584 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -78,7 +78,8 @@ var scripts = { }) var script = [ - 'local jobId = ARGV[2] == "" and redis.call("INCR", KEYS[5]) or ARGV[2]', + 'local numericJobId = redis.call("INCR", KEYS[5])', + 'local jobId = ARGV[2] == "" and numericJobId or ARGV[2]', 'redis.call("HMSET", ARGV[1] .. jobId' + argvs.join('') + ')', ]; @@ -87,7 +88,7 @@ var scripts = { var delayTimestamp = job.timestamp + job.delay; if(job.delay && delayTimestamp > Date.now()){ script.push.apply(script, [ - ' local timestamp = tonumber(ARGV[' + (argvs.length + 3) + ']) * 0x1000 + bit.band(jobId, 0xfff)', + ' local timestamp = tonumber(ARGV[' + (argvs.length + 3) + ']) * 0x1000 + bit.band(numericJobId, 0xfff)', ' redis.call("ZADD", KEYS[6], timestamp, jobId)', ' redis.call("PUBLISH", KEYS[6], (timestamp / 0x1000))', ' return jobId', From 02f40096e68c7f72f937d76f8ca61e58dc371388 Mon Sep 17 00:00:00 2001 From: Cameron Price-Austin Date: Fri, 19 Aug 2016 12:43:13 +0930 Subject: [PATCH 04/10] Don't overwrite if the key already exists --- lib/scripts.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/scripts.js b/lib/scripts.js index f6a084584..bc6788bce 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -80,7 +80,9 @@ var scripts = { var script = [ 'local numericJobId = redis.call("INCR", KEYS[5])', 'local jobId = ARGV[2] == "" and numericJobId or ARGV[2]', - 'redis.call("HMSET", ARGV[1] .. jobId' + argvs.join('') + ')', + 'local fullJobId = ARGV[1] .. jobId', + 'if redis.call("EXISTS", fullJobId) == 1 then return jobId end', + 'redis.call("HMSET", fullJobId' + argvs.join('') + ')', ]; var scriptName; From e744956d1321621fe6426790e6954d787128f729 Mon Sep 17 00:00:00 2001 From: Cameron Price-Austin Date: Fri, 19 Aug 2016 14:09:59 +0930 Subject: [PATCH 05/10] Updating readme --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 87a7fb169..e930fa928 100644 --- a/README.md +++ b/README.md @@ -414,6 +414,10 @@ __Arguments__ of the queue instead of the left (default false) opts.timeout {Number} The number of milliseconds after which the job should be fail with a timeout error [optional] + opts.jobId {Number|String} Override the job ID - by default, the job ID is a + unique integer, but you can use this setting to change it to something else, + for example some kind of unique key that will prevent two jobs of the same + value from existing at the same time returns {Promise} A promise that resolves when the job has been succesfully added to the queue (or rejects if some error occured). On success, the promise resolves to the new Job. From 1b765e55161a7ed8fa871e8ada428ff5ce12aef5 Mon Sep 17 00:00:00 2001 From: Cameron Price-Austin Date: Mon, 22 Aug 2016 09:40:28 +0930 Subject: [PATCH 06/10] Use an 'if/else' for readability in the 'addJob' script --- lib/scripts.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/scripts.js b/lib/scripts.js index bc6788bce..c327043bb 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -79,7 +79,8 @@ var scripts = { var script = [ 'local numericJobId = redis.call("INCR", KEYS[5])', - 'local jobId = ARGV[2] == "" and numericJobId or ARGV[2]', + 'local jobId', + 'if ARGV[2] == "" then jobId = numericJobId else jobId = ARGV[2] end', 'local fullJobId = ARGV[1] .. jobId', 'if redis.call("EXISTS", fullJobId) == 1 then return jobId end', 'redis.call("HMSET", fullJobId' + argvs.join('') + ')', From 77f6b775cfc1b55f7f2f5bfb7e626babcbba58d2 Mon Sep 17 00:00:00 2001 From: Cameron Price-Austin Date: Mon, 22 Aug 2016 09:40:47 +0930 Subject: [PATCH 07/10] Rename 'numericJobId' to 'jobCounter' --- lib/scripts.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/scripts.js b/lib/scripts.js index c327043bb..77b4a42f5 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -78,9 +78,9 @@ var scripts = { }) var script = [ - 'local numericJobId = redis.call("INCR", KEYS[5])', + 'local jobCounter = redis.call("INCR", KEYS[5])', 'local jobId', - 'if ARGV[2] == "" then jobId = numericJobId else jobId = ARGV[2] end', + 'if ARGV[2] == "" then jobId = jobCounter else jobId = ARGV[2] end', 'local fullJobId = ARGV[1] .. jobId', 'if redis.call("EXISTS", fullJobId) == 1 then return jobId end', 'redis.call("HMSET", fullJobId' + argvs.join('') + ')', @@ -91,7 +91,7 @@ var scripts = { var delayTimestamp = job.timestamp + job.delay; if(job.delay && delayTimestamp > Date.now()){ script.push.apply(script, [ - ' local timestamp = tonumber(ARGV[' + (argvs.length + 3) + ']) * 0x1000 + bit.band(numericJobId, 0xfff)', + ' local timestamp = tonumber(ARGV[' + (argvs.length + 3) + ']) * 0x1000 + bit.band(jobCounter, 0xfff)', ' redis.call("ZADD", KEYS[6], timestamp, jobId)', ' redis.call("PUBLISH", KEYS[6], (timestamp / 0x1000))', ' return jobId', From 6c8e29dca36f1c63ae57180900a857b9ab5bbaa2 Mon Sep 17 00:00:00 2001 From: Cameron Price-Austin Date: Mon, 22 Aug 2016 09:42:35 +0930 Subject: [PATCH 08/10] Renaming 'fullJobId' to 'jobIdKey' in the 'addJob' script --- lib/scripts.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/scripts.js b/lib/scripts.js index 77b4a42f5..9823a5002 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -81,9 +81,9 @@ var scripts = { 'local jobCounter = redis.call("INCR", KEYS[5])', 'local jobId', 'if ARGV[2] == "" then jobId = jobCounter else jobId = ARGV[2] end', - 'local fullJobId = ARGV[1] .. jobId', - 'if redis.call("EXISTS", fullJobId) == 1 then return jobId end', - 'redis.call("HMSET", fullJobId' + argvs.join('') + ')', + 'local jobIdKey = ARGV[1] .. jobId', + 'if redis.call("EXISTS", jobIdKey) == 1 then return jobId end', + 'redis.call("HMSET", jobIdKey' + argvs.join('') + ')', ]; var scriptName; From ceb827f3335b378f70a292158472cb05c74697ce Mon Sep 17 00:00:00 2001 From: Cameron Price-Austin Date: Mon, 22 Aug 2016 09:50:09 +0930 Subject: [PATCH 09/10] Changing the order of params to the 'addJob' script --- lib/job.js | 2 +- lib/scripts.js | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/lib/job.js b/lib/job.js index dd897ad4c..66bca7404 100644 --- a/lib/job.js +++ b/lib/job.js @@ -40,7 +40,7 @@ var Job = function(queue, data, opts){ function addJob(queue, job){ var jobData = job.toData(); var toKey = _.bind(queue.toKey, queue); - return scripts.addJob(queue.client, toKey, job.opts.lifo, job.opts.jobId, jobData); + return scripts.addJob(queue.client, toKey, jobData, { lifo: job.opts.lifo, customJobId: job.opts.jobId }); } Job.create = function(queue, data, opts){ diff --git a/lib/scripts.js b/lib/scripts.js index 9823a5002..c452c309b 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -65,7 +65,10 @@ var scripts = { return result === 1; }); }, - addJob: function(client, toKey, lifo, customJobId, job){ + addJob: function(client, toKey, job, opts){ + opts = opts || {}; + opts.lifo = !!(opts.lifo); + var jobArgs = _.flatten(_.toPairs(job)); var keys = _.map(['wait', 'paused', 'meta-paused', 'jobs', 'id', 'delayed'], function(name){ @@ -99,7 +102,7 @@ var scripts = { scriptName = 'addJob:delayed'; }else{ - var push = (lifo ? 'R' : 'L') + 'PUSH'; + var push = (opts.lifo ? 'R' : 'L') + 'PUSH'; // // Whe check for the meta-paused key to decide if we are paused or not // (since an empty list and !EXISTS are not really the same) @@ -125,7 +128,7 @@ var scripts = { args.push.apply(args, keys); args.push(baseKey); - args.push(customJobId || ''); + args.push(opts.customJobId || ''); args.push.apply(args, jobArgs); args.push(delayTimestamp); From 73695155d8be9d68dbaa644db8c104925c7fc7d3 Mon Sep 17 00:00:00 2001 From: Cameron Price-Austin Date: Mon, 22 Aug 2016 09:52:36 +0930 Subject: [PATCH 10/10] Making the readme a bit clearer on the 'jobId' option --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index e930fa928..4939c955c 100644 --- a/README.md +++ b/README.md @@ -415,9 +415,9 @@ __Arguments__ opts.timeout {Number} The number of milliseconds after which the job should be fail with a timeout error [optional] opts.jobId {Number|String} Override the job ID - by default, the job ID is a - unique integer, but you can use this setting to change it to something else, - for example some kind of unique key that will prevent two jobs of the same - value from existing at the same time + unique integer, but you can use this setting to override it. If you use this option, + it is up to you to ensure the jobId is unique. If you attempt to add a job with an + id that already exists, it will not be added. returns {Promise} A promise that resolves when the job has been succesfully added to the queue (or rejects if some error occured). On success, the promise resolves to the new Job.