Skip to content

Commit

Permalink
stream: add drop and take
Browse files Browse the repository at this point in the history
  • Loading branch information
benjamingr committed Jan 23, 2022
1 parent ca48949 commit b59200d
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 0 deletions.
35 changes: 35 additions & 0 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,45 @@ async function* flatMap(fn, options) {
}
}

async function* drop(number, options) {
validateInteger(number, 'number', 0);
if (options?.signal?.aborted) {
throw new AbortError();
}
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError();
}
if (number-- <= 0) {
yield await val;
}
}
}


async function* take(number, options) {
validateInteger(number, 'number', 0);
if (options?.signal?.aborted) {
throw new AbortError();
}
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError();
}
if (number-- > 0) {
yield val;
} else {
return;
}
}
}

module.exports.streamReturningOperators = {
drop,
filter,
flatMap,
map,
take,
};

module.exports.promiseReturningOperators = {
Expand Down
94 changes: 94 additions & 0 deletions test/parallel/test-stream-drop-take.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
'use strict';

const common = require('../common');
const {
Readable,
} = require('stream');
const { deepStrictEqual, rejects } = require('assert');

const { from } = Readable;
{
// Synchronous streams
(async () => {
deepStrictEqual(await from([1, 2, 3]).drop(2).toArray(), [3]);
deepStrictEqual(await from([1, 2, 3]).take(1).toArray(), [1]);
deepStrictEqual(await from([]).drop(2).toArray(), []);
deepStrictEqual(await from([]).take(1).toArray(), []);
deepStrictEqual(await from([1, 2, 3]).drop(1).take(1).toArray(), [2]);
deepStrictEqual(await from([1, 2]).drop(0).toArray(), [1, 2]);
deepStrictEqual(await from([1, 2]).take(0).toArray(), []);
})().then(common.mustCall());
// Asynchronous streams
(async () => {
const fromAsync = (...args) => from(...args).map(async (x) => x);
deepStrictEqual(await fromAsync([1, 2, 3]).drop(2).toArray(), [3]);
deepStrictEqual(await fromAsync([1, 2, 3]).take(1).toArray(), [1]);
deepStrictEqual(await fromAsync([]).drop(2).toArray(), []);
deepStrictEqual(await fromAsync([]).take(1).toArray(), []);
deepStrictEqual(await fromAsync([1, 2, 3]).drop(1).take(1).toArray(), [2]);
deepStrictEqual(await fromAsync([1, 2]).drop(0).toArray(), [1, 2]);
deepStrictEqual(await fromAsync([1, 2]).take(0).toArray(), []);
})().then(common.mustCall());
// Infinite streams
// Asynchronous streams
(async () => {
const naturals = () => from(async function*() {
let i = 1;
while (true) {
yield i++;
}
}());
deepStrictEqual(await naturals().take(1).toArray(), [1]);
deepStrictEqual(await naturals().drop(1).take(1).toArray(), [2]);
const next10 = [11, 12, 13, 14, 15, 16, 17, 18, 19, 20];
deepStrictEqual(await naturals().drop(10).take(10).toArray(), next10);
deepStrictEqual(await naturals().take(5).take(1).toArray(), [1]);
})().then(common.mustCall());
}

{
// Support for AbortSignal
const ac = new AbortController();
rejects(
Readable.from([1, 2, 3]).take(1, { signal: ac.signal }).toArray(), {
name: 'AbortError',
}).then(common.mustCall());
rejects(
Readable.from([1, 2, 3]).drop(1, { signal: ac.signal }).toArray(), {
name: 'AbortError',
}).then(common.mustCall());
ac.abort();
}

{
// Support for AbortSignal, already aborted
const signal = AbortSignal.abort();
rejects(
Readable.from([1, 2, 3]).take(1, { signal }).toArray(), {
name: 'AbortError',
}).then(common.mustCall());
}

{
// Error cases
// TODO(benjamingr) these do not align with the spec and need to all
// unfortunately coerce to `0`. Negative values should be checked to throw
// a RangeError, issue opened upstream at
// https://github.com/tc39/proposal-iterator-helpers/issues/169
const invalidArgs = [
'5',
undefined,
null,
{},
[],
from([1, 2, 3]),
Promise.resolve(5),
];

for (const example of invalidArgs) {
rejects(
from([]).take(example).toArray(),
/ERR_INVALID_ARG_TYPE/
).then(common.mustCall());
}
}

0 comments on commit b59200d

Please sign in to comment.