From 3fb394f33c4c4dda748eb4911f605f60e8fcba08 Mon Sep 17 00:00:00 2001 From: Ilyas Shabi Date: Sun, 25 Feb 2024 00:45:48 +0100 Subject: [PATCH] stream: support typed arrays --- doc/api/stream.md | 65 ++++++++------ lib/internal/streams/readable.js | 15 ++-- lib/internal/streams/writable.js | 8 +- lib/stream.js | 4 +- test/parallel/test-net-write-arguments.js | 2 +- test/parallel/test-stream-typedarray.js | 105 ++++++++++++++++++++++ 6 files changed, 162 insertions(+), 37 deletions(-) create mode 100644 test/parallel/test-stream-typedarray.js diff --git a/doc/api/stream.md b/doc/api/stream.md index d5b0044d65c2fd..10ce9d9eb5aa0e 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -283,10 +283,10 @@ The `finished` API also provides a [callback version][stream-finished]. ### Object mode All streams created by Node.js APIs operate exclusively on strings and `Buffer` -(or `Uint8Array`) objects. It is possible, however, for stream implementations -to work with other types of JavaScript values (with the exception of `null`, -which serves a special purpose within streams). Such streams are considered to -operate in "object mode". +(or {TypedArray} and {DataView}) objects. It is possible, however, for stream +implementations to work with other types of JavaScript values (with the +exception of `null`, which serves a special purpose within streams). +Such streams are considered to operate in "object mode". Stream instances are switched into object mode using the `objectMode` option when the stream is created. Attempting to switch an existing stream into @@ -712,6 +712,9 @@ console.log(myStream.destroyed); // true -* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams - not operating in object mode, `chunk` must be a string, `Buffer` or - `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value - other than `null`. +* `chunk` {string|Buffer|TypedArray|DataView|any} Optional data to write. For + streams not operating in object mode, `chunk` must be a {string}, {Buffer}, + {TypedArray} or {DataView}. For object mode streams, `chunk` may be any + JavaScript value other than `null`. * `encoding` {string} The encoding if `chunk` is a string * `callback` {Function} Callback for when the stream is finished. * Returns: {this} @@ -926,6 +929,9 @@ Getter for the property `objectMode` of a given `Writable` stream. -* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams - not operating in object mode, `chunk` must be a string, `Buffer` or - `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value - other than `null`. +* `chunk` {string|Buffer|TypedArray|DataView|any} Optional data to write. For + streams not operating in object mode, `chunk` must be a {string}, {Buffer}, + {TypedArray} or {DataView}. For object mode streams, `chunk` may be any + JavaScript value other than `null`. * `encoding` {string|null} The encoding, if `chunk` is a string. **Default:** `'utf8'` * `callback` {Function} Callback for when this chunk of data is flushed. * Returns: {boolean} `false` if the stream wishes for the calling code to @@ -1763,15 +1769,18 @@ setTimeout(() => { -* `chunk` {Buffer|Uint8Array|string|null|any} Chunk of data to unshift onto the - read queue. For streams not operating in object mode, `chunk` must be a - string, `Buffer`, `Uint8Array`, or `null`. For object mode streams, `chunk` - may be any JavaScript value. +* `chunk` {Buffer|TypedArray|DataView|string|null|any} Chunk of data to unshift + onto the read queue. For streams not operating in object mode, `chunk` must + be a {string}, {Buffer}, {TypedArray}, {DataView} or `null`. + For object mode streams, `chunk` may be any JavaScript value. * `encoding` {string} Encoding of string chunks. Must be a valid `Buffer` encoding, such as `'utf8'` or `'ascii'`. @@ -3512,8 +3521,8 @@ changes: **Default:** `'utf8'`. * `objectMode` {boolean} Whether or not the [`stream.write(anyObj)`][stream-write] is a valid operation. When set, - it becomes possible to write JavaScript values other than string, - `Buffer` or `Uint8Array` if supported by the stream implementation. + it becomes possible to write JavaScript values other than string, {Buffer}, + {TypedArray} or {DataView} if supported by the stream implementation. **Default:** `false`. * `emitClose` {boolean} Whether or not the stream should emit `'close'` after it has been destroyed. **Default:** `true`. @@ -4062,22 +4071,25 @@ It can be overridden by child classes but it **must not** be called directly. -* `chunk` {Buffer|Uint8Array|string|null|any} Chunk of data to push into the - read queue. For streams not operating in object mode, `chunk` must be a - string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be - any JavaScript value. +* `chunk` {Buffer|TypedArray|DataView|string|null|any} Chunk of data to push + into the read queue. For streams not operating in object mode, `chunk` must + be a {string}, {Buffer}, {TypedArray} or {DataView}. For object mode streams, + `chunk` may be any JavaScript value. * `encoding` {string} Encoding of string chunks. Must be a valid `Buffer` encoding, such as `'utf8'` or `'ascii'`. * Returns: {boolean} `true` if additional chunks of data may continue to be pushed; `false` otherwise. -When `chunk` is a `Buffer`, `Uint8Array`, or `string`, the `chunk` of data will -be added to the internal queue for users of the stream to consume. +When `chunk` is a {Buffer}, {TypedArray}, {DataView} or {string}, the `chunk` +of data will be added to the internal queue for users of the stream to consume. Passing `chunk` as `null` signals the end of the stream (EOF), after which no more data can be written. @@ -4752,8 +4764,9 @@ situations within Node.js where this is done, particularly in the Use of `readable.push('')` is not recommended. -Pushing a zero-byte string, `Buffer`, or `Uint8Array` to a stream that is not in -object mode has an interesting side effect. Because it _is_ a call to +Pushing a zero-byte {string}, {Buffer}, {TypedArray} or {DataView} to a stream +that is not in object mode has an interesting side effect. +Because it _is_ a call to [`readable.push()`][stream-push], the call will end the reading process. However, because the argument is an empty string, no data is added to the readable buffer so there is nothing for a user to consume. diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 3800399c82ad62..6b41822a381b42 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -36,6 +36,7 @@ const { SymbolAsyncIterator, SymbolSpecies, TypedArrayPrototypeSet, + Uint8Array, } = primordials; module.exports = Readable; @@ -420,11 +421,12 @@ function readableAddChunkUnshiftByteMode(stream, state, chunk, encoding) { chunk = Buffer.from(chunk, encoding); } } - } else if (Stream._isUint8Array(chunk)) { - chunk = Stream._uint8ArrayToBuffer(chunk); + } else if (Stream._isArrayBufferView(chunk)) { + const array = new Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength); + chunk = Stream._uint8ArrayToBuffer(array); } else if (chunk !== undefined && !(chunk instanceof Buffer)) { errorOrDestroy(stream, new ERR_INVALID_ARG_TYPE( - 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk)); + 'chunk', ['string', 'Buffer', 'TypedArray', 'DataView'], chunk)); return false; } @@ -473,12 +475,13 @@ function readableAddChunkPushByteMode(stream, state, chunk, encoding) { } } else if (chunk instanceof Buffer) { encoding = ''; - } else if (Stream._isUint8Array(chunk)) { - chunk = Stream._uint8ArrayToBuffer(chunk); + } else if (Stream._isArrayBufferView(chunk)) { + const array = new Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength); + chunk = Stream._uint8ArrayToBuffer(array); encoding = ''; } else if (chunk !== undefined) { errorOrDestroy(stream, new ERR_INVALID_ARG_TYPE( - 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk)); + 'chunk', ['string', 'Buffer', 'TypedArray', 'DataView'], chunk)); return false; } diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index 0dbf56d7a69ca9..d19e8db0814557 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -35,6 +35,7 @@ const { StringPrototypeToLowerCase, Symbol, SymbolHasInstance, + Uint8Array, } = primordials; module.exports = Writable; @@ -467,12 +468,13 @@ function _write(stream, chunk, encoding, cb) { } } else if (chunk instanceof Buffer) { encoding = 'buffer'; - } else if (Stream._isUint8Array(chunk)) { - chunk = Stream._uint8ArrayToBuffer(chunk); + } else if (Stream._isArrayBufferView(chunk)) { + const array = new Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength); + chunk = Stream._uint8ArrayToBuffer(array); encoding = 'buffer'; } else { throw new ERR_INVALID_ARG_TYPE( - 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk); + 'chunk', ['string', 'Buffer', 'TypedArray', 'DataView'], chunk); } } diff --git a/lib/stream.js b/lib/stream.js index cdbc1fe0380694..a69354138a2248 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -50,6 +50,7 @@ const internalBuffer = require('internal/buffer'); const promises = require('stream/promises'); const utils = require('internal/streams/utils'); +const { isArrayBufferView, isUint8Array } = require('internal/util/types'); const Stream = module.exports = require('internal/streams/legacy').Stream; @@ -137,7 +138,8 @@ ObjectDefineProperty(eos, customPromisify, { // Backwards-compat with node 0.4.x Stream.Stream = Stream; -Stream._isUint8Array = require('internal/util/types').isUint8Array; +Stream._isArrayBufferView = isArrayBufferView; +Stream._isUint8Array = isUint8Array; Stream._uint8ArrayToBuffer = function _uint8ArrayToBuffer(chunk) { return new internalBuffer.FastBuffer(chunk.buffer, chunk.byteOffset, diff --git a/test/parallel/test-net-write-arguments.js b/test/parallel/test-net-write-arguments.js index 289c3c0f36bcf3..2e2aa55432053b 100644 --- a/test/parallel/test-net-write-arguments.js +++ b/test/parallel/test-net-write-arguments.js @@ -34,6 +34,6 @@ assert.throws(() => { code: 'ERR_INVALID_ARG_TYPE', name: 'TypeError', message: 'The "chunk" argument must be of type string or an instance of ' + - `Buffer or Uint8Array.${common.invalidArgTypeHelper(value)}` + `Buffer, TypedArray, or DataView.${common.invalidArgTypeHelper(value)}` }); }); diff --git a/test/parallel/test-stream-typedarray.js b/test/parallel/test-stream-typedarray.js new file mode 100644 index 00000000000000..a374989276cf64 --- /dev/null +++ b/test/parallel/test-stream-typedarray.js @@ -0,0 +1,105 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const { Readable, Writable } = require('stream'); + +const buffer = Buffer.from('ABCD'); +const views = common.getArrayBufferViews(buffer); + +{ + // Simple Writable test. + let n = 0; + const writable = new Writable({ + write: common.mustCall((chunk, encoding, cb) => { + assert(chunk instanceof Buffer); + assert(ArrayBuffer.isView(chunk)); + assert.deepStrictEqual(common.getBufferSources(chunk)[n], views[n]); + n++; + cb(); + }, views.length), + }); + + views.forEach((msg) => writable.write(msg)); + writable.end(); +} + +{ + // Writable test with object mode True. + let n = 0; + const writable = new Writable({ + objectMode: true, + write: common.mustCall((chunk, encoding, cb) => { + assert(!(chunk instanceof Buffer)); + assert(ArrayBuffer.isView(chunk)); + assert.deepStrictEqual(common.getBufferSources(chunk)[n], views[n]); + n++; + cb(); + }, views.length), + }); + + views.forEach((msg) => writable.write(msg)); + writable.end(); +} + + +{ + // Writable test, multiple writes carried out via writev. + let n = 0; + let callback; + const writable = new Writable({ + write: common.mustCall((chunk, encoding, cb) => { + assert(chunk instanceof Buffer); + assert(ArrayBuffer.isView(chunk)); + assert.deepStrictEqual(common.getBufferSources(chunk)[n], views[n]); + n++; + callback = cb; + }), + + writev: common.mustCall((chunks, cb) => { + assert.strictEqual(chunks.length, views.length); + let res = ''; + for (const chunk of chunks) { + assert.strictEqual(chunk.encoding, 'buffer'); + res += chunk.chunk; + } + assert.strictEqual(res, 'ABCD'.repeat(9)); + }), + + }); + views.forEach((msg) => writable.write(msg)); + writable.end(views[0]); + callback(); +} + + +{ + // Simple Readable test. + const readable = new Readable({ + read() {} + }); + + readable.push(views[1]); + readable.push(views[2]); + readable.unshift(views[0]); + + const buf = readable.read(); + assert(buf instanceof Buffer); + assert.deepStrictEqual([...buf], [...views[0], ...views[1], ...views[2]]); +} + +{ + // Readable test, setEncoding. + const readable = new Readable({ + read() {} + }); + + readable.setEncoding('utf8'); + + readable.push(views[1]); + readable.push(views[2]); + readable.unshift(views[0]); + + const out = readable.read(); + assert.strictEqual(out, 'ABCD'.repeat(3)); +}