Skip to content

Commit

Permalink
Add support for rate limiting
Browse files Browse the repository at this point in the history
Fix OptimalBits#362
Fix OptimalBits#329

This commit adds support for rate limiting of a queue. The rate limit
applies to each instance of the queue. If you have 3 threads processing
the queue, and you want to limit your jobs to 1 per second, it will be
1 job per second amongst the 3 threads.

queue#process first argument has been changed to an options object, and
concurrency now resides within this object, as well as the new rate
limiting options. (If a number is specified as the first argument to
queue#process, it will be treated as the old version of the API which I
have deprecated in favor of passing in the options object).
  • Loading branch information
Michael Leaney committed Feb 15, 2017
1 parent 1d4d3e9 commit 36e9cb0
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 13 deletions.
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ __Arguments__


<a name="process"/>
#### Queue##process([concurrency,] function(job[, done]))
#### Queue##process([options,] function(job[, done]))

Defines a processing function for the jobs placed into a given Queue.

Expand Down Expand Up @@ -405,7 +405,13 @@ queue.process(function(job) { // No done callback here :)
});
```

You can specify a concurrency. Bull will then call you handler in parallel respecting this max number.
##### Valid options

| Key | Default | Description |
|---------------------|-----------|--------------------------|
| options.concurrency | 1 | You can specify a concurrency. Bull will then call your handler in parallel respecting this max number. |
| options.rateLimit | undefined | An object with the signature `{ max, duration }`. Applies rate limiting so that `max` jobs are processed in `duration` ms.


__Arguments__

Expand Down
81 changes: 70 additions & 11 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ var _ = require('lodash');
var Promise = require('bluebird');
var semver = require('semver');
var debuglog = require('debuglog')('bull');
var Limiter = require('redis-rolling-rate-limiter');


/**
Expand Down Expand Up @@ -114,6 +115,7 @@ var Queue = function Queue(name, redisPort, redisHost, redisOptions){
// Create queue client (used to add jobs, pause queues, etc);
//
this.client = createClient();
this.rateLimitClient = createClient();

getRedisVersion(this.client).then(function(version){
if(semver.lt(version, MINIMUM_REDIS_VERSION)){
Expand Down Expand Up @@ -337,18 +339,29 @@ Queue.prototype.close = function( doNotWaitJobs ){
@method process
*/
Queue.prototype.process = function(concurrency, handler){
Queue.prototype.process = function(options, handler){
var _this = this;
if(typeof concurrency === 'function'){
handler = concurrency;
concurrency = 1;
if(typeof options === 'function'){
handler = options;
options = {};
}

_.defaults(options, {
concurrency: 1,
rateLimit: undefined
});

// support deprecated passing of concurrency as first argument
if(typeof options === 'number') {
console.warn('[DEPRECATED] you should pass an object with { concurrency: Number } to queue#process');
options = { concurrency: options };
}

this.setHandler(handler);

var runQueueWhenReady = function(){
_this.bclient.once('ready', function(){
_this.run(concurrency).catch(function(err){
_this.run(options).catch(function(err){
console.error(err);
});
});
Expand All @@ -359,7 +372,7 @@ Queue.prototype.process = function(concurrency, handler){
this.bclient.on('error', runQueueWhenReady);
this.bclient.on('end', runQueueWhenReady);

return this.run(concurrency).catch(function(err){
return this.run(options).catch(function(err){
console.error(err);
throw err;
});
Expand Down Expand Up @@ -518,14 +531,40 @@ function pauseResumeGlobal(queue, pause){
return queue.client.eval(script, keys.length, keys[0], keys[1], keys[2], keys[3], pause ? 'paused' : 'resumed');
}

Queue.prototype.run = function(concurrency){
Queue.prototype.run = function(options){
var promises = [];
var _this = this;
var concurrency = options.concurrency;
var rateLimiter;

if (!options.rateLimit) {
rateLimiter = undefined;
} else {
var limiter = Limiter({
interval: options.rateLimit.duration,
maxInInterval: options.rateLimit.max,
redis: this.rateLimitClient,
namespace: this.keyPrefix + ':ratelimit:'
});

rateLimiter = function() {
return new Promise(function(resolve, reject) {
limiter(_this.name, function(err, timeLeft, actionsLeft) {
if (err) {
return reject(err);
}
resolve(timeLeft);
});
});
};
}

return this.moveUnlockedJobsToWait().then(function(){

while(concurrency--){
promises.push(new Promise(_this.processJobs));
promises.push(new Promise(function(resolve, reject) {
_this.processJobs(resolve, reject, { rateLimiter: rateLimiter});
}));
}

_this.startMoveUnlockedJobsToWait();
Expand Down Expand Up @@ -603,14 +642,34 @@ Queue.prototype.startMoveUnlockedJobsToWait = function() {
}
};

Queue.prototype.processJobs = function(resolve, reject){
var _this = this;
var processJobs = this.processJobs.bind(this, resolve, reject);

Queue.prototype.processJobs = function(resolve, reject, options){
var _this = this;
var processJobs = this.processJobs.bind(this, resolve, reject, options);
if(!this.closing){
var idleTime = 0;
var obeyRateLimit = function() {};

if (options.rateLimiter) {
obeyRateLimit = function() {
return options.rateLimiter()
.then(function(timeLeft) {
if (!timeLeft) { return; }
idleTime += timeLeft;
return Promise.delay(timeLeft)
.then(obeyRateLimit);
});
};
}

process.nextTick(function(){
(_this.paused || Promise.resolve())
.then(obeyRateLimit)
.then(_this.getNextJob)
.then(function(job) {
job.idleTime = idleTime;
return job;
})
.then(_this.processJob)
.then(processJobs, function(err){
console.error('Error processing job:', err);
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"disturbed": "^1.0.6",
"ioredis": "^2.5.0",
"lodash": "^4.17.4",
"redis-rolling-rate-limiter": "^0.3.0",
"redlock": "^2.1.0",
"semver": "^5.3.0",
"uuid": "^3.0.1"
Expand Down
40 changes: 40 additions & 0 deletions test/test_queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -1937,4 +1937,44 @@ describe('Queue', function () {
});
});
});

describe('Rate limiting', function() {
var queue;

beforeEach(function () {
var client = redis.createClient();
queue = utils.buildQueue();
return client.flushdb();
});

afterEach(function () {
this.timeout(queue.STALLED_JOB_CHECK_INTERVAL * (1 + queue.MAX_STALLED_JOB_COUNT));
return queue.close();
});

it('should obey the rate limit', function(done) {
var startTime = new Date().getTime();
var nbProcessed = 0;

queue.process({ rateLimit: { max: 1, duration: 1000 }}, function() {
return Promise.resolve();
});

queue.add();
queue.add();
queue.add();
queue.add();

queue.on('completed', _.after(4, function() {
try {
expect(new Date().getTime() - startTime).to.be.above(3000);
done();
} catch (e) {
done(e);
}
}));

queue.on('failed', done);
});
});
});

0 comments on commit 36e9cb0

Please sign in to comment.