diff --git a/index.js b/index.js index 05c8165..85d8599 100644 --- a/index.js +++ b/index.js @@ -79,16 +79,37 @@ export default async function pMap( } else { errors.push(error); resolvingCount--; - next(); + + // In that case we can't really continue regardless of stopOnError state + // since an iterable is likely to continue throwing after it throws once. + // If we continue calling next() indefinitely we will likely end up + // in an infinite loop of failed iteration. + try { + next(); + } catch (error) { + isRejected = true; + reject(error); + } } } })(); }; for (let index = 0; index < concurrency; index++) { - next(); + // Catch errors from the iterable.next() call + // In that case we can't really continue regardless of stopOnError state + // since an iterable is likely to continue throwing after it throws once. + // If we continue calling next() indefinitely we will likely end up + // in an infinite loop of failed iteration. + try { + next(); + } catch (error) { + isRejected = true; + reject(error); + break; + } - if (isIterableDone) { + if (isIterableDone || isRejected) { break; } } diff --git a/test.js b/test.js index d38ca39..fdcb7c5 100644 --- a/test.js +++ b/test.js @@ -40,6 +40,39 @@ const mapper = async ([value, ms]) => { return value; }; +class ThrowingIterator { + constructor(max, throwOnIndex) { + this._max = max; + this._throwOnIndex = throwOnIndex; + this.index = 0; + this[Symbol.iterator] = this[Symbol.iterator].bind(this); + } + + [Symbol.iterator]() { + let index = 0; + const max = this._max; + const throwOnIndex = this._throwOnIndex; + return { + next: (() => { + try { + if (index === throwOnIndex) { + throw new Error(`throwing on index ${index}`); + } + + const item = {value: index, done: index === max}; + return item; + } finally { + index++; + this.index = index; + } + // eslint is wrong - bind is needed else the next() call cannot update + // this.index, which we need to track how many times the iterator was called + // eslint-disable-next-line no-extra-bind + }).bind(this) + }; + } +} + test('main', async t => { const end = timeSpan(); t.deepEqual(await pMap(sharedInput, mapper), [10, 20, 30]); @@ -128,8 +161,67 @@ test('do not run mapping after stop-on-error happened', async t => { await delay(100); throw new Error('Oops!'); } - }) + }, + {concurrency: 1}) ); await delay(500); - t.deepEqual(mappedValues, [1, 3]); + t.deepEqual(mappedValues, [1]); +}); + +test('catches exception from source iterator - 1st item', async t => { + const input = new ThrowingIterator(100, 0); + const mappedValues = []; + const error = await t.throwsAsync(pMap( + input, + async value => { + mappedValues.push(value); + await delay(100); + return value; + }, + {concurrency: 1, stopOnError: true} + )); + t.is(error.message, 'throwing on index 0'); + t.is(input.index, 1); + await delay(300); + t.deepEqual(mappedValues, []); +}); + +// The 2nd iterable item throwing is distinct from the 1st when concurrency is 1 because +// it means that the source next() is invoked from next() and not from +// the constructor +test('catches exception from source iterator - 2nd item', async t => { + const input = new ThrowingIterator(100, 1); + const mappedValues = []; + await t.throwsAsync(pMap( + input, + async value => { + mappedValues.push(value); + await delay(100); + return value; + }, + {concurrency: 1, stopOnError: true} + )); + await delay(300); + t.is(input.index, 2); + t.deepEqual(mappedValues, [0]); +}); + +// The 2nd iterable item throwing after a 1st item mapper exception, with stopOnError false, +// is distinct from other cases because our next() is called from a catch block +test('catches exception from source iterator - 2nd item after 1st item mapper throw', async t => { + const input = new ThrowingIterator(100, 1); + const mappedValues = []; + const error = await t.throwsAsync(pMap( + input, + async value => { + mappedValues.push(value); + await delay(100); + throw new Error('mapper threw error'); + }, + {concurrency: 1, stopOnError: false} + )); + await delay(300); + t.is(error.message, 'throwing on index 1'); + t.is(input.index, 2); + t.deepEqual(mappedValues, [0]); });