Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: support typed arrays #51866

Merged
merged 1 commit into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions benchmark/streams/readable-uint8array.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
'use strict';
const common = require('../common.js');
const { Readable } = require('stream');

const bench = common.createBenchmark(main, {
n: [1e6],
kind: ['read', 'encoding'],
});
const ABC = new Uint8Array([0x41, 0x42, 0x43]);

function main({ n, kind }) {
switch (kind) {
case 'read': {
bench.start();
const readable = new Readable({
read() {},
});
for (let i = 0; i < n; ++i) readable.push(ABC);
bench.end(n);
break;
}

case 'encoding': {
bench.start();
const readable = new Readable({
read() {},
});
readable.setEncoding('utf8');
for (let i = 0; i < n; ++i) readable.push(ABC);
bench.end(n);
break;
}
default:
throw new Error('Invalid kind');
}
}
51 changes: 51 additions & 0 deletions benchmark/streams/writable-uint8array.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
'use strict';
const common = require('../common.js');
const { Writable } = require('stream');

const bench = common.createBenchmark(main, {
n: [50e6],
kind: ['write', 'object-mode', 'writev'],
});
const ABC = new Uint8Array([0x41, 0x42, 0x43]);

function main({ n, kind }) {
switch (kind) {
case 'write': {
bench.start();
const wr = new Writable({
write(chunk, encoding, cb) {
cb();
},
});
for (let i = 0; i < n; ++i) wr.write(ABC);
bench.end(n);
break;
}

case 'object-mode': {
bench.start();
const wr = new Writable({
objectMode: true,
write(chunk, encoding, cb) {
cb();
},
});
for (let i = 0; i < n; ++i) wr.write(ABC);
bench.end(n);
break;
}
case 'writev': {
bench.start();
const wr = new Writable({
writev(chunks, cb) {
cb();
},
});
for (let i = 0; i < n; ++i) wr.write(ABC);
bench.end(n);
break;
}
default:
throw new Error('Invalid kind');
}
}
75 changes: 48 additions & 27 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,19 @@ The `finished` API also provides a [callback version][stream-finished].

### Object mode

All streams created by Node.js APIs operate exclusively on strings and `Buffer`
(or `Uint8Array`) objects. It is possible, however, for stream implementations
to work with other types of JavaScript values (with the exception of `null`,
which serves a special purpose within streams). Such streams are considered to
operate in "object mode".
All streams created by Node.js APIs operate exclusively on strings, {Buffer},
{TypedArray} and {DataView} objects:

* `Strings` and `Buffers` are the most common types used with streams.
* `TypedArray` and `DataView` lets you handle binary data with types like
`Int32Array` or `Uint8Array`. When you write a TypedArray or DataView to a
stream, Node.js processes
the raw bytes.

It is possible, however, for stream
implementations to work with other types of JavaScript values (with the
exception of `null`, which serves a special purpose within streams).
Such streams are considered to operate in "object mode".

Stream instances are switched into object mode using the `objectMode` option
when the stream is created. Attempting to switch an existing stream into
Expand Down Expand Up @@ -712,6 +720,9 @@ console.log(myStream.destroyed); // true
<!-- YAML
added: v0.9.4
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/51866
description: The `chunk` argument can now be a `TypedArray` or `DataView` instance.
- version: v15.0.0
pr-url: https://github.com/nodejs/node/pull/34101
description: The `callback` is invoked before 'finish' or on error.
Expand All @@ -726,10 +737,10 @@ changes:
description: The `chunk` argument can now be a `Uint8Array` instance.
-->

* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams
not operating in object mode, `chunk` must be a string, `Buffer` or
`Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
other than `null`.
* `chunk` {string|Buffer|TypedArray|DataView|any} Optional data to write. For
streams not operating in object mode, `chunk` must be a {string}, {Buffer},
{TypedArray} or {DataView}. For object mode streams, `chunk` may be any
JavaScript value other than `null`.
* `encoding` {string} The encoding if `chunk` is a string
* `callback` {Function} Callback for when the stream is finished.
* Returns: {this}
Expand Down Expand Up @@ -926,6 +937,9 @@ Getter for the property `objectMode` of a given `Writable` stream.
<!-- YAML
added: v0.9.4
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/51866
description: The `chunk` argument can now be a `TypedArray` or `DataView` instance.
- version: v8.0.0
pr-url: https://github.com/nodejs/node/pull/11608
description: The `chunk` argument can now be a `Uint8Array` instance.
Expand All @@ -935,10 +949,10 @@ changes:
considered invalid now, even in object mode.
-->

* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams
not operating in object mode, `chunk` must be a string, `Buffer` or
`Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
other than `null`.
* `chunk` {string|Buffer|TypedArray|DataView|any} Optional data to write. For
streams not operating in object mode, `chunk` must be a {string}, {Buffer},
{TypedArray} or {DataView}. For object mode streams, `chunk` may be any
JavaScript value other than `null`.
* `encoding` {string|null} The encoding, if `chunk` is a string. **Default:** `'utf8'`
* `callback` {Function} Callback for when this chunk of data is flushed.
* Returns: {boolean} `false` if the stream wishes for the calling code to
Expand Down Expand Up @@ -1763,15 +1777,18 @@ setTimeout(() => {
<!-- YAML
added: v0.9.11
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/51866
description: The `chunk` argument can now be a `TypedArray` or `DataView` instance.
- version: v8.0.0
pr-url: https://github.com/nodejs/node/pull/11608
description: The `chunk` argument can now be a `Uint8Array` instance.
-->

* `chunk` {Buffer|Uint8Array|string|null|any} Chunk of data to unshift onto the
read queue. For streams not operating in object mode, `chunk` must be a
string, `Buffer`, `Uint8Array`, or `null`. For object mode streams, `chunk`
may be any JavaScript value.
* `chunk` {Buffer|TypedArray|DataView|string|null|any} Chunk of data to unshift
onto the read queue. For streams not operating in object mode, `chunk` must
be a {string}, {Buffer}, {TypedArray}, {DataView} or `null`.
For object mode streams, `chunk` may be any JavaScript value.
* `encoding` {string} Encoding of string chunks. Must be a valid
`Buffer` encoding, such as `'utf8'` or `'ascii'`.

Expand Down Expand Up @@ -3515,8 +3532,8 @@ changes:
**Default:** `'utf8'`.
* `objectMode` {boolean} Whether or not the
[`stream.write(anyObj)`][stream-write] is a valid operation. When set,
it becomes possible to write JavaScript values other than string,
`Buffer` or `Uint8Array` if supported by the stream implementation.
it becomes possible to write JavaScript values other than string, {Buffer},
{TypedArray} or {DataView} if supported by the stream implementation.
**Default:** `false`.
* `emitClose` {boolean} Whether or not the stream should emit `'close'`
after it has been destroyed. **Default:** `true`.
Expand Down Expand Up @@ -4068,22 +4085,25 @@ It can be overridden by child classes but it **must not** be called directly.

<!-- YAML
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/51866
description: The `chunk` argument can now be a `TypedArray` or `DataView` instance.
- version: v8.0.0
pr-url: https://github.com/nodejs/node/pull/11608
description: The `chunk` argument can now be a `Uint8Array` instance.
-->

* `chunk` {Buffer|Uint8Array|string|null|any} Chunk of data to push into the
read queue. For streams not operating in object mode, `chunk` must be a
string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be
any JavaScript value.
* `chunk` {Buffer|TypedArray|DataView|string|null|any} Chunk of data to push
into the read queue. For streams not operating in object mode, `chunk` must
be a {string}, {Buffer}, {TypedArray} or {DataView}. For object mode streams,
`chunk` may be any JavaScript value.
* `encoding` {string} Encoding of string chunks. Must be a valid
`Buffer` encoding, such as `'utf8'` or `'ascii'`.
* Returns: {boolean} `true` if additional chunks of data may continue to be
pushed; `false` otherwise.

When `chunk` is a `Buffer`, `Uint8Array`, or `string`, the `chunk` of data will
be added to the internal queue for users of the stream to consume.
When `chunk` is a {Buffer}, {TypedArray}, {DataView} or {string}, the `chunk`
of data will be added to the internal queue for users of the stream to consume.
Passing `chunk` as `null` signals the end of the stream (EOF), after which no
more data can be written.

Expand Down Expand Up @@ -4758,8 +4778,9 @@ situations within Node.js where this is done, particularly in the

Use of `readable.push('')` is not recommended.

Pushing a zero-byte string, `Buffer`, or `Uint8Array` to a stream that is not in
object mode has an interesting side effect. Because it _is_ a call to
Pushing a zero-byte {string}, {Buffer}, {TypedArray} or {DataView} to a stream
that is not in object mode has an interesting side effect.
Because it _is_ a call to
[`readable.push()`][stream-push], the call will end the reading process.
However, because the argument is an empty string, no data is added to the
readable buffer so there is nothing for a user to consume.
Expand Down
8 changes: 4 additions & 4 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -420,11 +420,11 @@ function readableAddChunkUnshiftByteMode(stream, state, chunk, encoding) {
chunk = Buffer.from(chunk, encoding);
}
}
} else if (Stream._isUint8Array(chunk)) {
} else if (Stream._isArrayBufferView(chunk)) {
chunk = Stream._uint8ArrayToBuffer(chunk);
} else if (chunk !== undefined && !(chunk instanceof Buffer)) {
errorOrDestroy(stream, new ERR_INVALID_ARG_TYPE(
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk));
'chunk', ['string', 'Buffer', 'TypedArray', 'DataView'], chunk));
return false;
}

Expand Down Expand Up @@ -473,12 +473,12 @@ function readableAddChunkPushByteMode(stream, state, chunk, encoding) {
}
} else if (chunk instanceof Buffer) {
encoding = '';
} else if (Stream._isUint8Array(chunk)) {
} else if (Stream._isArrayBufferView(chunk)) {
chunk = Stream._uint8ArrayToBuffer(chunk);
encoding = '';
} else if (chunk !== undefined) {
errorOrDestroy(stream, new ERR_INVALID_ARG_TYPE(
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk));
'chunk', ['string', 'Buffer', 'TypedArray', 'DataView'], chunk));
return false;
}

Expand Down
4 changes: 2 additions & 2 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -467,12 +467,12 @@ function _write(stream, chunk, encoding, cb) {
}
} else if (chunk instanceof Buffer) {
encoding = 'buffer';
} else if (Stream._isUint8Array(chunk)) {
} else if (Stream._isArrayBufferView(chunk)) {
chunk = Stream._uint8ArrayToBuffer(chunk);
encoding = 'buffer';
} else {
throw new ERR_INVALID_ARG_TYPE(
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
'chunk', ['string', 'Buffer', 'TypedArray', 'DataView'], chunk);
}
}

Expand Down
4 changes: 3 additions & 1 deletion lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const internalBuffer = require('internal/buffer');

const promises = require('stream/promises');
const utils = require('internal/streams/utils');
const { isArrayBufferView, isUint8Array } = require('internal/util/types');

const Stream = module.exports = require('internal/streams/legacy').Stream;

Expand Down Expand Up @@ -137,7 +138,8 @@ ObjectDefineProperty(eos, customPromisify, {
// Backwards-compat with node 0.4.x
Stream.Stream = Stream;

Stream._isUint8Array = require('internal/util/types').isUint8Array;
Stream._isArrayBufferView = isArrayBufferView;
Stream._isUint8Array = isUint8Array;
Stream._uint8ArrayToBuffer = function _uint8ArrayToBuffer(chunk) {
return new internalBuffer.FastBuffer(chunk.buffer,
chunk.byteOffset,
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-net-write-arguments.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ assert.throws(() => {
code: 'ERR_INVALID_ARG_TYPE',
name: 'TypeError',
message: 'The "chunk" argument must be of type string or an instance of ' +
`Buffer or Uint8Array.${common.invalidArgTypeHelper(value)}`
`Buffer, TypedArray, or DataView.${common.invalidArgTypeHelper(value)}`
});
});
Loading
Loading