Skip to content

Commit

Permalink
Merge pull request #3 from wheresrhys/infinite
Browse files Browse the repository at this point in the history
Infinite
  • Loading branch information
wheresrhys committed Sep 25, 2015
2 parents 6225480 + 66a4147 commit 6ad283a
Show file tree
Hide file tree
Showing 5 changed files with 296 additions and 56 deletions.
5 changes: 4 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ sudo: false
language: node_js
script: npm test
node_js:
- '0.12'
- '4'
deploy:
api_key:
secure: opyL9JEWNrV9WSs5DU49r+X2ffhuVMRSv7qbOHQvlGCIGjeWPWXD9CwBZiqWYOUM55FSVxTe5zslGMATrCd1QJW9CDOmOTvujq5mvfLuEPnJzzSR0SNdXZ6FstUQ06G7yhqKzJUvVWwRY1VOQ+XW0edDJ0+OYF87UYW+WhIFdDvSEREY+Nolihe7zPdSzudd43GMcJ/Fyd3iKx1v2ZrNX2X5WA38evbujws7bBRBH1+J3ispuR0T2ysiCME98QDvDbITFc7ku5E3J2lVLeF7n4AwvEouOLCt31hQVjeEzzlHBrn1Z9L5I+PgO9dCxjTBiYEwNBn2DPlKpMM7EGmLoY6IRUba/c7y0Fpa44ChdepVIW9nip+HdWO6XRABMpO5QY6i+G0SnO5JHAjjQV8YywKB7zJSuA4jTHrW9+/QDBApcoy2M2EE/L851ZMlc+D8neDT2Y2ZK5kmxSjC4aDKwiL6fzKnCVhtjfl8YRTX+zemDA+bZ00qv/A6NzfWs6b/qtZn+dXlQZ0SvKxVlJUr0a0lBMLb9EPAajyY+lEJrN7GuNjSpxCxAbJmIPlukHoxIn+RAbnBs5BNCqTsqh75Nf91tiERWIPgkQ/eo0GArnTtw6PqDvWXAwFg0Mp82IgoWs2nUo5FKOhTT0Z5fiDGib3e5fD09BCgMqs/NL0g928=
45 changes: 35 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,67 @@

## Like Promise.all, only less so

This module could more descriptively be named Promise.allButNotAllAtOnce. It takes an array of functions, each of which return a promise, and returns a promise which resolves once all those promises have resolved, or otherwise rejects... very similar to `Promise.all`. The difference is that a maximum of `n` promises are created at any one time. This is useful for rate-limiting asynchronous calls (e.g. `fetch`, `mongoose`...);
This module could more descriptively be named Promise.allButNotAllAtOnce. It takes an array of functions, each of which return a promise, and returns a promise which resolves once all those promises have resolved, or otherwise rejects... very similar to `Promise.all`. The difference is that a maximum of `n` promises are created at any one time. This is useful for rate-limiting asynchronous calls (e.g. `fetch`, `mongoose`...)

### *** New feature ***
Now supports throttling of potentially infinite queues of Promises (see notes on the `Queue` constructor below)

## About the name
In Devon people will often promise to do things 'directly' `[drekt-lee] `, meaning they'll do it when they're good and ready, possibly never. Example usage:
In the West Country people will often promise to do things 'directly' `[drekt-lee]`, meaning they'll do it when they're good and ready, possibly never. Example usage:

> I'll wash the dishes directly, my lover
## Usage

```
var Directly = require('directly');
var urls = []; // a big array of urls
var fetchers = urls.map(function (url) {
const directly = require('directly');
const urls = []; // a big array of urls
const fetchers = urls.map(function (url) {
return function () {
return fetch(url);
}
});
var throttledRequests = new Directly(10, fetchers)
directly(10, fetchers)
.then(function (results) {
// handle exactly as if it was a Promise.all()
});
```

Can also be called as a constructor (in which case the `.run()` method should be used)

```
const Directly = require('Directly');
const throttledRequests = new Directly(10, fetchers)
throttledRequests
.run()
.then(function (results) {
// handle exactly as if it was a Promise.all()
})
// can be used to stop the directly instance prematurely
throttledRequests.terminate()
```

Can also be called as a function (which calls the `.run()` method internally)
To handle an infinite queue of promises use the `Queue` class to wrap your arrya of functions

```
var directly = require('directly');
fetchers = directly.Queue(fetchers);
directly(10, fetchers)
.then(function (results) {
// handle exactly as if it was a Promise.all()
.catch(function (errorObject) {
// You can handle any errors in here
// The error object has 3 properties
// error: The error thrown
// nextError: A promise which will reject the next time an error is encountered
// terminate: A function to call which will terminate the directly instance
});
// use push to add to the execution queue. Will work even if the queue has fallen idle
fetchers.push(func1, func2, func3)
```


*Based on an idea originally developed [at FT](https://github.com/Financial-Times/next-user-preferences-api-v2/blob/master/lib/promise-throttle.js)*
140 changes: 97 additions & 43 deletions directly.js
Original file line number Diff line number Diff line change
@@ -1,64 +1,118 @@
'use strict';

var Directly = function (concurrence, funcs) {
function getRemover (arr, target) {
return () => {
arr.splice(arr.indexOf(target), 1);
};
}

if (!Promise) {
throw 'Directly requires es6 Promises';
class Directly {
constructor (concurrence, funcs) {
if (!(this instanceof Directly)) {
return new Directly(concurrence, funcs).run();
}
this.results = [];
this.concurrence = concurrence;
this.funcs = funcs;
this.terminates = Array.isArray(this.funcs);
this.cancelled = false;
if (!Array.isArray(this.funcs)) {
this.funcs.attachDirectlyInstance(this);
}
this.competitors = [];
}

if (!(this instanceof Directly)) {
return new Directly(concurrence, funcs).run();
}
this.results = [];
this.concurrence = concurrence;
this.funcs = funcs;
this.competitors = [];
};
run () {
if (this.terminates) {
if (this.funcs.length <= this.concurrence) {
return Promise.all(this.funcs.map(func => func()));
}

Directly.prototype.run = function () {
while (this.concurrence - this.competitors.length) {
this.executeOne();
}
this.startRace();

if (this.funcs.length <= this.concurrence) {
return Promise.all(this.funcs.map(function (func) {
return func();
}));

} else if (!this.running) {
// never take the Promise.all shortcut as even if the initial list is short, it
// could easily grow to exceed the concurrence limit.
while (this.funcs.length && this.concurrence - this.competitors.length) {
this.executeOne();
}
this.startRace();
}
this.running === true;
if (!this.resolve) {
return this.instancePromise();
}
}

while (this.concurrence--) {
this.executeOne();
instancePromise () {
return new Promise((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});
}
this.startRace();

return new Promise(function (resolve, reject) {
this.resolve = resolve;
this.reject = reject;
}.bind(this));
};
executeOne () {
const promise = this.funcs.shift()();

Directly.prototype.executeOne = function () {
var promise = this.funcs.shift()();
var competitors = this.competitors;
this.results.push(promise);
this.competitors.push(promise);
const remove = getRemover(this.competitors, promise)
promise.then(remove, remove);
}

this.results.push(promise);
competitors.push(promise);
startRace () {
const race = this.race = Promise.race(this.competitors);

promise.then(function () {
competitors.splice(competitors.indexOf(promise), 1);
});
};
race
.then(() => {
this.rejoinRace(race);
}, err => {

if (this.terminates) {
this.reject(err);
} else {
// give the ability to handle future errors;
const reject = this.reject;
const nextPromise = this.instancePromise();
reject({
error: err,
nextError: nextPromise,
terminate: this.terminate.bind(this)
});
this.rejoinRace(race);
}
});
}

Directly.prototype.startRace = function () {
Promise.race(this.competitors)
.then(function (index) {
rejoinRace (race) {
if (this.race === race) {
if (!this.funcs.length) {
return this.resolve(Promise.all(this.results));
if (this.terminates) {
return this.resolve(Promise.all(this.results));
} else {
this.running = false;
}
} else if (!this.cancelled) {
this.executeOne();
this.startRace();
}
}
}

this.executeOne();
this.startRace();
terminate () {
this.resolve();
this.cancelled = true;
}

}

}.bind(this), function (err) {
this.reject(err);
}.bind(this));
module.exports = function SmartConstructor (concurrence, funcs) {
const directly = new Directly(concurrence, funcs)
return (this instanceof SmartConstructor) ? directly : directly.run();
};

module.exports = Directly;
module.exports.Queue = require('./lib/queue');
27 changes: 27 additions & 0 deletions lib/queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
'use strict';

class Queue {
constructor (items) {
this.items = items || [];
}

attachDirectlyInstance (directly) {
this.directly = directly;
}

push (func) {
this.items.push.apply(this.items, [].slice.call(arguments));
this.directly.run();
}

shift () {
return this.items.shift();
}

get length () {
return this.items.length;
}

}

module.exports = Queue;
Loading

0 comments on commit 6ad283a

Please sign in to comment.