diff --git a/.travis.yml b/.travis.yml index dcbd926..facf12e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,4 +2,7 @@ sudo: false language: node_js script: npm test node_js: -- '0.12' \ No newline at end of file +- '4' +deploy: + api_key: + secure: opyL9JEWNrV9WSs5DU49r+X2ffhuVMRSv7qbOHQvlGCIGjeWPWXD9CwBZiqWYOUM55FSVxTe5zslGMATrCd1QJW9CDOmOTvujq5mvfLuEPnJzzSR0SNdXZ6FstUQ06G7yhqKzJUvVWwRY1VOQ+XW0edDJ0+OYF87UYW+WhIFdDvSEREY+Nolihe7zPdSzudd43GMcJ/Fyd3iKx1v2ZrNX2X5WA38evbujws7bBRBH1+J3ispuR0T2ysiCME98QDvDbITFc7ku5E3J2lVLeF7n4AwvEouOLCt31hQVjeEzzlHBrn1Z9L5I+PgO9dCxjTBiYEwNBn2DPlKpMM7EGmLoY6IRUba/c7y0Fpa44ChdepVIW9nip+HdWO6XRABMpO5QY6i+G0SnO5JHAjjQV8YywKB7zJSuA4jTHrW9+/QDBApcoy2M2EE/L851ZMlc+D8neDT2Y2ZK5kmxSjC4aDKwiL6fzKnCVhtjfl8YRTX+zemDA+bZ00qv/A6NzfWs6b/qtZn+dXlQZ0SvKxVlJUr0a0lBMLb9EPAajyY+lEJrN7GuNjSpxCxAbJmIPlukHoxIn+RAbnBs5BNCqTsqh75Nf91tiERWIPgkQ/eo0GArnTtw6PqDvWXAwFg0Mp82IgoWs2nUo5FKOhTT0Z5fiDGib3e5fD09BCgMqs/NL0g928= diff --git a/README.md b/README.md index 3d4beaf..4765050 100644 --- a/README.md +++ b/README.md @@ -2,25 +2,40 @@ ## Like Promise.all, only less so -This module could more descriptively be named Promise.allButNotAllAtOnce. It takes an array of functions, each of which return a promise, and returns a promise which resolves once all those promises have resolved, or otherwise rejects... very similar to `Promise.all`. The difference is that a maximum of `n` promises are created at any one time. This is useful for rate-limiting asynchronous calls (e.g. `fetch`, `mongoose`...); +This module could more descriptively be named Promise.allButNotAllAtOnce. It takes an array of functions, each of which return a promise, and returns a promise which resolves once all those promises have resolved, or otherwise rejects... very similar to `Promise.all`. The difference is that a maximum of `n` promises are created at any one time. This is useful for rate-limiting asynchronous calls (e.g. `fetch`, `mongoose`...) + +### *** New feature *** +Now supports throttling of potentially infinite queues of Promises (see notes on the `Queue` constructor below) ## About the name -In Devon people will often promise to do things 'directly' `[drekt-lee] `, meaning they'll do it when they're good and ready, possibly never. Example usage: +In the West Country people will often promise to do things 'directly' `[drekt-lee]`, meaning they'll do it when they're good and ready, possibly never. Example usage: > I'll wash the dishes directly, my lover ## Usage ``` -var Directly = require('directly'); -var urls = []; // a big array of urls -var fetchers = urls.map(function (url) { +const directly = require('directly'); +const urls = []; // a big array of urls +const fetchers = urls.map(function (url) { return function () { return fetch(url); } }); -var throttledRequests = new Directly(10, fetchers) + +directly(10, fetchers) + .then(function (results) { + // handle exactly as if it was a Promise.all() + }); + +``` + +Can also be called as a constructor (in which case the `.run()` method should be used) + +``` +const Directly = require('Directly'); +const throttledRequests = new Directly(10, fetchers) throttledRequests .run() @@ -28,16 +43,26 @@ throttledRequests // handle exactly as if it was a Promise.all() }) + // can be used to stop the directly instance prematurely +throttledRequests.terminate() ``` -Can also be called as a function (which calls the `.run()` method internally) +To handle an infinite queue of promises use the `Queue` class to wrap your arrya of functions ``` -var directly = require('directly'); +fetchers = directly.Queue(fetchers); directly(10, fetchers) - .then(function (results) { - // handle exactly as if it was a Promise.all() + .catch(function (errorObject) { + // You can handle any errors in here + // The error object has 3 properties + // error: The error thrown + // nextError: A promise which will reject the next time an error is encountered + // terminate: A function to call which will terminate the directly instance }); + +// use push to add to the execution queue. Will work even if the queue has fallen idle +fetchers.push(func1, func2, func3) ``` + *Based on an idea originally developed [at FT](https://github.com/Financial-Times/next-user-preferences-api-v2/blob/master/lib/promise-throttle.js)* diff --git a/directly.js b/directly.js index b58bac9..c19d784 100644 --- a/directly.js +++ b/directly.js @@ -1,64 +1,118 @@ 'use strict'; -var Directly = function (concurrence, funcs) { +function getRemover (arr, target) { + return () => { + arr.splice(arr.indexOf(target), 1); + }; +} - if (!Promise) { - throw 'Directly requires es6 Promises'; +class Directly { + constructor (concurrence, funcs) { + if (!(this instanceof Directly)) { + return new Directly(concurrence, funcs).run(); + } + this.results = []; + this.concurrence = concurrence; + this.funcs = funcs; + this.terminates = Array.isArray(this.funcs); + this.cancelled = false; + if (!Array.isArray(this.funcs)) { + this.funcs.attachDirectlyInstance(this); + } + this.competitors = []; } - if (!(this instanceof Directly)) { - return new Directly(concurrence, funcs).run(); - } - this.results = []; - this.concurrence = concurrence; - this.funcs = funcs; - this.competitors = []; -}; + run () { + if (this.terminates) { + if (this.funcs.length <= this.concurrence) { + return Promise.all(this.funcs.map(func => func())); + } -Directly.prototype.run = function () { + while (this.concurrence - this.competitors.length) { + this.executeOne(); + } + this.startRace(); - if (this.funcs.length <= this.concurrence) { - return Promise.all(this.funcs.map(function (func) { - return func(); - })); + + } else if (!this.running) { + // never take the Promise.all shortcut as even if the initial list is short, it + // could easily grow to exceed the concurrence limit. + while (this.funcs.length && this.concurrence - this.competitors.length) { + this.executeOne(); + } + this.startRace(); + } + this.running === true; + if (!this.resolve) { + return this.instancePromise(); + } } - while (this.concurrence--) { - this.executeOne(); + instancePromise () { + return new Promise((resolve, reject) => { + this.resolve = resolve; + this.reject = reject; + }); } - this.startRace(); - return new Promise(function (resolve, reject) { - this.resolve = resolve; - this.reject = reject; - }.bind(this)); -}; + executeOne () { + const promise = this.funcs.shift()(); -Directly.prototype.executeOne = function () { - var promise = this.funcs.shift()(); - var competitors = this.competitors; + this.results.push(promise); + this.competitors.push(promise); + const remove = getRemover(this.competitors, promise) + promise.then(remove, remove); + } - this.results.push(promise); - competitors.push(promise); + startRace () { + const race = this.race = Promise.race(this.competitors); - promise.then(function () { - competitors.splice(competitors.indexOf(promise), 1); - }); -}; + race + .then(() => { + this.rejoinRace(race); + }, err => { + + if (this.terminates) { + this.reject(err); + } else { + // give the ability to handle future errors; + const reject = this.reject; + const nextPromise = this.instancePromise(); + reject({ + error: err, + nextError: nextPromise, + terminate: this.terminate.bind(this) + }); + this.rejoinRace(race); + } + }); + } -Directly.prototype.startRace = function () { - Promise.race(this.competitors) - .then(function (index) { + rejoinRace (race) { + if (this.race === race) { if (!this.funcs.length) { - return this.resolve(Promise.all(this.results)); + if (this.terminates) { + return this.resolve(Promise.all(this.results)); + } else { + this.running = false; + } + } else if (!this.cancelled) { + this.executeOne(); + this.startRace(); } + } + } - this.executeOne(); - this.startRace(); + terminate () { + this.resolve(); + this.cancelled = true; + } + +} - }.bind(this), function (err) { - this.reject(err); - }.bind(this)); +module.exports = function SmartConstructor (concurrence, funcs) { + const directly = new Directly(concurrence, funcs) + return (this instanceof SmartConstructor) ? directly : directly.run(); }; -module.exports = Directly; +module.exports.Queue = require('./lib/queue'); diff --git a/lib/queue.js b/lib/queue.js new file mode 100644 index 0000000..9458b97 --- /dev/null +++ b/lib/queue.js @@ -0,0 +1,27 @@ +'use strict'; + +class Queue { + constructor (items) { + this.items = items || []; + } + + attachDirectlyInstance (directly) { + this.directly = directly; + } + + push (func) { + this.items.push.apply(this.items, [].slice.call(arguments)); + this.directly.run(); + } + + shift () { + return this.items.shift(); + } + + get length () { + return this.items.length; + } + +} + +module.exports = Queue; diff --git a/test/directly.test.js b/test/directly.test.js index ed77cda..a3f2e01 100644 --- a/test/directly.test.js +++ b/test/directly.test.js @@ -1,7 +1,5 @@ 'use strict'; -require('es6-promise').polyfill(); - var expect = require('chai').expect; var Directly = require('../directly'); @@ -222,3 +220,136 @@ describe('functional calling', function () { }); }) + +describe('infinite queueing', function () { + + it('should call all on startup if throttle limit not reached', function (done) { + const result = []; + const funcs = new Directly.Queue([1, 2].map(i => { + return () => { + result.push(i); + return Promise.resolve(i); + } + })) + + new Directly(3, funcs).run() + setTimeout(() => { + expect(result).to.eql([1, 2]); + done(); + }, 50) + }); + + it('should only call up to the throttle limit on startup', function (done) { + const result = []; + const funcs = new Directly.Queue([1, 2, 3].map(i => { + return () => { + result.push(i); + return new Promise(() => null); + } + })) + + new Directly(2, funcs).run() + setTimeout(() => { + expect(result).to.eql([1, 2]); + done(); + }, 50) + }); + + it('should seamlessly call any promise function added after startup', function (done) { + const promises = setupPromises(3); + const funcs = new Directly.Queue(promises.functions); + + new Directly(3, funcs).run(); + let spyCalled = false; + const spy = () => { + spyCalled = true; + return new Promise(() => null) + }; + funcs.push(spy); + + setTimeout(() => { + expect(spyCalled).to.be.false; + promises.promises.forEach(promise => promise.resolve()); + + setTimeout(() => { + expect(spyCalled).to.be.true; + done(); + }, 50) + }, 50); + + }); + + it('should restart if functions are pushed when it is idling', function (done) { + const funcs = new Directly.Queue([1, 2].map(i => { + return () => { + return Promise.resolve(i); + } + })) + + new Directly(3, funcs).run() + let spyCalled = false; + const spy = () => { + spyCalled = true; + return new Promise(() => null) + }; + funcs.push(spy); + + setTimeout(() => { + expect(spyCalled).to.be.true; + done(); + }, 50) + }); + + it('should still apply concurrence limits to functions added after startup', function (done) { + const funcs = new Directly.Queue([1, 2].map(i => { + return () => { + return Promise.resolve(i); + } + })) + + new Directly(3, funcs).run() + let spyCalled = false; + const spy = () => { + spyCalled = true; + return new Promise(() => null) + }; + funcs.push(() => new Promise(() => null)); + funcs.push(() => new Promise(() => null)); + funcs.push(() => new Promise(() => null)); + funcs.push(spy); + + setTimeout(() => { + expect(spyCalled).to.be.false; + done(); + }, 50) + }); + + it('should provide a promisey interface to handle errors', function (done) { + const funcs = new Directly.Queue([() => { + return Promise.reject('test error1'); + }]); + + var d = new Directly(3, funcs); + + var p = d.run() + .catch(err1 => { + expect(err1.error).to.equal('test error1'); + + funcs.push(() => Promise.reject('test error2')); + + err1.nextError.catch(err2 => { + expect(err2.error).to.equal('test error2'); + err2.terminate(); + let spyCalled = false; + const spy = () => { + spyCalled = true; + return new Promise(() => null) + }; + setTimeout(() => { + expect(spyCalled).to.be.false; + done(); + }, 50) + }); + }) + }); +})