Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add highWaterMark to the map and filter operator #49249

Merged
merged 10 commits into from
Aug 24, 2023
12 changes: 12 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -2012,6 +2012,10 @@ showBoth();
added:
- v17.4.0
- v16.14.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/49249
description: added `highWaterMark` in options.
-->

> Stability: 1 - Experimental
Expand All @@ -2025,6 +2029,8 @@ added:
* `options` {Object}
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `highWaterMark` {number} how many items to buffer while waiting for user
consumption of the mapped items. **Default:** `concurrency * 2 - 1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Readable} a stream mapped with the function `fn`.
Expand Down Expand Up @@ -2059,6 +2065,10 @@ for await (const result of dnsResults) {
added:
- v17.4.0
- v16.14.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/49249
description: added `highWaterMark` in options.
-->

> Stability: 1 - Experimental
Expand All @@ -2071,6 +2081,8 @@ added:
* `options` {Object}
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `highWaterMark` {number} how many items to buffer while waiting for user
consumption of the filtered items. **Default:** `concurrency * 2 - 1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Readable} a stream filtered with the predicate `fn`.
Expand Down
54 changes: 41 additions & 13 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const {
NumberIsNaN,
Promise,
PromiseReject,
PromiseResolve,
PromisePrototypeThen,
Symbol,
} = primordials;
Expand Down Expand Up @@ -83,6 +84,15 @@ function map(fn, options) {

validateInteger(concurrency, 'concurrency', 1);

let highWaterMark = concurrency - 1;
if (options?.highWaterMark != null) {
highWaterMark = MathFloor(options.highWaterMark);
}

highWaterMark += concurrency;

validateInteger(highWaterMark, 'options.highWaterMark', 1);

return async function* map() {
const signal = AbortSignal.any([options?.signal].filter(Boolean));
const stream = this;
Expand All @@ -92,9 +102,28 @@ function map(fn, options) {
let next;
let resume;
let done = false;
let cnt = 0;

function onDone() {
function onCatch() {
done = true;
afterItemProcessed();
}

function afterItemProcessed() {
cnt -= 1;
maybeResume();
}

function maybeResume() {
if (
resume &&
!done &&
cnt < concurrency &&
queue.length < highWaterMark
) {
resume();
resume = null;
}
}

async function pump() {
Expand All @@ -110,25 +139,27 @@ function map(fn, options) {

try {
val = fn(val, signalOpt);

if (val === kEmpty) {
continue;
}

val = PromiseResolve(val);
} catch (err) {
val = PromiseReject(err);
}

if (val === kEmpty) {
continue;
}
cnt += 1;

if (typeof val?.catch === 'function') {
val.catch(onDone);
}
PromisePrototypeThen(val, afterItemProcessed, onCatch);

queue.push(val);
if (next) {
next();
next = null;
}

if (!done && queue.length && queue.length >= concurrency) {
if (!done && queue.length >= highWaterMark) {
rluvaton marked this conversation as resolved.
Show resolved Hide resolved
await new Promise((resolve) => {
resume = resolve;
});
Expand All @@ -137,7 +168,7 @@ function map(fn, options) {
queue.push(kEof);
} catch (err) {
const val = PromiseReject(err);
PromisePrototypeThen(val, undefined, onDone);
PromisePrototypeThen(val, afterItemProcessed, onCatch);
queue.push(val);
} finally {
done = true;
Expand Down Expand Up @@ -168,10 +199,7 @@ function map(fn, options) {
}

queue.shift();
if (resume) {
resume();
resume = null;
}
maybeResume();
}

await new Promise((resolve) => {
Expand Down
170 changes: 169 additions & 1 deletion test/parallel/test-stream-map.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,25 @@ const assert = require('assert');
const { once } = require('events');
const { setTimeout } = require('timers/promises');

function createDependentPromises(n) {
const promiseAndResolveArray = [];

for (let i = 0; i < n; i++) {
let res;
const promise = new Promise((resolve) => {
if (i === 0) {
res = resolve;
return;
}
res = () => promiseAndResolveArray[i - 1][0].then(resolve);
});

promiseAndResolveArray.push([promise, res]);
}

return promiseAndResolveArray;
}

{
// Map works on synchronous streams with a synchronous mapper
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x);
Expand Down Expand Up @@ -143,7 +162,7 @@ const { setTimeout } = require('timers/promises');
const stream = range.map(common.mustCall(async (_, { signal }) => {
await once(signal, 'abort');
throw signal.reason;
}, 2), { signal: ac.signal, concurrency: 2 });
}, 2), { signal: ac.signal, concurrency: 2, highWaterMark: 0 });
// pump
assert.rejects(async () => {
for await (const item of stream) {
Expand Down Expand Up @@ -173,6 +192,155 @@ const { setTimeout } = require('timers/promises');
})().then(common.mustCall());
}


{
// highWaterMark with small concurrency
const finishOrder = [];

const promises = createDependentPromises(4);

const raw = Readable.from([2, 0, 1, 3]);
const stream = raw.map(async (item) => {
const [promise, resolve] = promises[item];
resolve();

await promise;
finishOrder.push(item);
return item;
}, { concurrency: 2 });

(async () => {
await stream.toArray();

assert.deepStrictEqual(finishOrder, [0, 1, 2, 3]);
})().then(common.mustCall(), common.mustNotCall());
}

{
// highWaterMark with a lot of items and large concurrency
const finishOrder = [];

const promises = createDependentPromises(20);

const input = [11, 1, 0, 3, 4, 2, 5, 7, 8, 9, 6, 10, 12, 13, 18, 15, 16, 17, 14, 19];
const raw = Readable.from(input);
// Should be
// 11, 1, 0, 3, 4, 2 | next: 0
// 11, 1, 3, 4, 2, 5 | next: 1
// 11, 3, 4, 2, 5, 7 | next: 2
// 11, 3, 4, 5, 7, 8 | next: 3
// 11, 4, 5, 7, 8, 9 | next: 4
// 11, 5, 7, 8, 9, 6 | next: 5
// 11, 7, 8, 9, 6, 10 | next: 6
// 11, 7, 8, 9, 10, 12 | next: 7
// 11, 8, 9, 10, 12, 13 | next: 8
// 11, 9, 10, 12, 13, 18 | next: 9
// 11, 10, 12, 13, 18, 15 | next: 10
// 11, 12, 13, 18, 15, 16 | next: 11
// 12, 13, 18, 15, 16, 17 | next: 12
// 13, 18, 15, 16, 17, 14 | next: 13
// 18, 15, 16, 17, 14, 19 | next: 14
// 18, 15, 16, 17, 19 | next: 15
// 18, 16, 17, 19 | next: 16
// 18, 17, 19 | next: 17
// 18, 19 | next: 18
// 19 | next: 19
//

const stream = raw.map(async (item) => {
const [promise, resolve] = promises[item];
resolve();

await promise;
finishOrder.push(item);
return item;
}, { concurrency: 6 });

(async () => {
const outputOrder = await stream.toArray();

assert.deepStrictEqual(outputOrder, input);
assert.deepStrictEqual(finishOrder, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);
})().then(common.mustCall(), common.mustNotCall());
}

{
// Custom highWaterMark with a lot of items and large concurrency
const finishOrder = [];

const promises = createDependentPromises(20);

const input = [11, 1, 0, 3, 4, 2, 5, 7, 8, 9, 6, 10, 12, 13, 18, 15, 16, 17, 14, 19];
const raw = Readable.from(input);
// Should be
// 11, 1, 0, 3, 4 | next: 0, buffer: []
// 11, 1, 3, 4, 2 | next: 1, buffer: [0]
// 11, 3, 4, 2, 5 | next: 2, buffer: [0, 1]
// 11, 3, 4, 5, 7 | next: 3, buffer: [0, 1, 2]
// 11, 4, 5, 7, 8 | next: 4, buffer: [0, 1, 2, 3]
// 11, 5, 7, 8, 9 | next: 5, buffer: [0, 1, 2, 3, 4]
// 11, 7, 8, 9, 6 | next: 6, buffer: [0, 1, 2, 3, 4, 5]
// 11, 7, 8, 9, 10 | next: 7, buffer: [0, 1, 2, 3, 4, 5, 6] -- buffer full
// 11, 8, 9, 10, 12 | next: 8, buffer: [0, 1, 2, 3, 4, 5, 6]
// 11, 9, 10, 12, 13 | next: 9, buffer: [0, 1, 2, 3, 4, 5, 6]
// 11, 10, 12, 13, 18 | next: 10, buffer: [0, 1, 2, 3, 4, 5, 6]
// 11, 12, 13, 18, 15 | next: 11, buffer: [0, 1, 2, 3, 4, 5, 6]
// 12, 13, 18, 15, 16 | next: 12, buffer: [] -- all items flushed as 11 is consumed and all the items wait for it
// 13, 18, 15, 16, 17 | next: 13, buffer: []
// 18, 15, 16, 17, 14 | next: 14, buffer: []
// 18, 15, 16, 17, 19 | next: 15, buffer: [14]
// 18, 16, 17, 19 | next: 16, buffer: [14, 15]
// 18, 17, 19 | next: 17, buffer: [14, 15, 16]
// 18, 19 | next: 18, buffer: [14, 15, 16, 17]
// 19 | next: 19, buffer: [] -- all items flushed
//

const stream = raw.map(async (item) => {
const [promise, resolve] = promises[item];
resolve();

await promise;
finishOrder.push(item);
return item;
}, { concurrency: 5, highWaterMark: 7 });

(async () => {
const outputOrder = await stream.toArray();

assert.deepStrictEqual(outputOrder, input);
assert.deepStrictEqual(finishOrder, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);
})().then(common.mustCall(), common.mustNotCall());
}

{
// Where there is a delay between the first and the next item it should not wait for filled queue
// before yielding to the user
const promises = createDependentPromises(3);

const raw = Readable.from([0, 1, 2]);

const stream = raw
.map(async (item) => {
if (item !== 0) {
await promises[item][0];
}

return item;
}, { concurrency: 2 })
.map((item) => {
// eslint-disable-next-line no-unused-vars
for (const [_, resolve] of promises) {
resolve();
}

return item;
});

(async () => {
await stream.toArray();
})().then(common.mustCall(), common.mustNotCall());
}

{
// Error cases
assert.throws(() => Readable.from([1]).map(1), /ERR_INVALID_ARG_TYPE/);
Expand Down