From 00931bd58a1be25055c1dfe7fd48fa81b7fa671e Mon Sep 17 00:00:00 2001 From: Ilyas Shabi Date: Sun, 25 Feb 2024 00:45:48 +0100 Subject: [PATCH] stream: support typed arrays --- lib/internal/streams/readable.js | 15 ++-- lib/internal/streams/writable.js | 8 +- lib/stream.js | 2 +- test/parallel/test-net-write-arguments.js | 2 +- test/parallel/test-stream-typedarray.js | 105 ++++++++++++++++++++++ 5 files changed, 121 insertions(+), 11 deletions(-) create mode 100644 test/parallel/test-stream-typedarray.js diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 3800399c82ad62..01306d7712473e 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -36,6 +36,7 @@ const { SymbolAsyncIterator, SymbolSpecies, TypedArrayPrototypeSet, + Uint8Array, } = primordials; module.exports = Readable; @@ -420,11 +421,12 @@ function readableAddChunkUnshiftByteMode(stream, state, chunk, encoding) { chunk = Buffer.from(chunk, encoding); } } - } else if (Stream._isUint8Array(chunk)) { - chunk = Stream._uint8ArrayToBuffer(chunk); + } else if (Stream._isArrayBufferView(chunk)) { + const array = new Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength); + chunk = Stream._uint8ArrayToBuffer(array); } else if (chunk !== undefined && !(chunk instanceof Buffer)) { errorOrDestroy(stream, new ERR_INVALID_ARG_TYPE( - 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk)); + 'chunk', ['string', 'Buffer', 'TypedArray', 'BufferView'], chunk)); return false; } @@ -473,12 +475,13 @@ function readableAddChunkPushByteMode(stream, state, chunk, encoding) { } } else if (chunk instanceof Buffer) { encoding = ''; - } else if (Stream._isUint8Array(chunk)) { - chunk = Stream._uint8ArrayToBuffer(chunk); + } else if (Stream._isArrayBufferView(chunk)) { + const array = new Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength); + chunk = Stream._uint8ArrayToBuffer(array); encoding = ''; } else if (chunk !== undefined) { errorOrDestroy(stream, new ERR_INVALID_ARG_TYPE( - 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk)); + 'chunk', ['string', 'Buffer', 'TypedArray', 'BufferView'], chunk)); return false; } diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index 0dbf56d7a69ca9..11386c60d0698e 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -35,6 +35,7 @@ const { StringPrototypeToLowerCase, Symbol, SymbolHasInstance, + Uint8Array, } = primordials; module.exports = Writable; @@ -467,12 +468,13 @@ function _write(stream, chunk, encoding, cb) { } } else if (chunk instanceof Buffer) { encoding = 'buffer'; - } else if (Stream._isUint8Array(chunk)) { - chunk = Stream._uint8ArrayToBuffer(chunk); + } else if (Stream._isArrayBufferView(chunk)) { + const array = new Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength); + chunk = Stream._uint8ArrayToBuffer(array); encoding = 'buffer'; } else { throw new ERR_INVALID_ARG_TYPE( - 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk); + 'chunk', ['string', 'Buffer', 'TypedArray', 'BufferView'], chunk); } } diff --git a/lib/stream.js b/lib/stream.js index cdbc1fe0380694..0e36feb50d156a 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -137,7 +137,7 @@ ObjectDefineProperty(eos, customPromisify, { // Backwards-compat with node 0.4.x Stream.Stream = Stream; -Stream._isUint8Array = require('internal/util/types').isUint8Array; +Stream._isArrayBufferView = require('internal/util/types').isArrayBufferView; Stream._uint8ArrayToBuffer = function _uint8ArrayToBuffer(chunk) { return new internalBuffer.FastBuffer(chunk.buffer, chunk.byteOffset, diff --git a/test/parallel/test-net-write-arguments.js b/test/parallel/test-net-write-arguments.js index 289c3c0f36bcf3..436a181261fbb9 100644 --- a/test/parallel/test-net-write-arguments.js +++ b/test/parallel/test-net-write-arguments.js @@ -34,6 +34,6 @@ assert.throws(() => { code: 'ERR_INVALID_ARG_TYPE', name: 'TypeError', message: 'The "chunk" argument must be of type string or an instance of ' + - `Buffer or Uint8Array.${common.invalidArgTypeHelper(value)}` + `Buffer, TypedArray, or BufferView.${common.invalidArgTypeHelper(value)}` }); }); diff --git a/test/parallel/test-stream-typedarray.js b/test/parallel/test-stream-typedarray.js new file mode 100644 index 00000000000000..a374989276cf64 --- /dev/null +++ b/test/parallel/test-stream-typedarray.js @@ -0,0 +1,105 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const { Readable, Writable } = require('stream'); + +const buffer = Buffer.from('ABCD'); +const views = common.getArrayBufferViews(buffer); + +{ + // Simple Writable test. + let n = 0; + const writable = new Writable({ + write: common.mustCall((chunk, encoding, cb) => { + assert(chunk instanceof Buffer); + assert(ArrayBuffer.isView(chunk)); + assert.deepStrictEqual(common.getBufferSources(chunk)[n], views[n]); + n++; + cb(); + }, views.length), + }); + + views.forEach((msg) => writable.write(msg)); + writable.end(); +} + +{ + // Writable test with object mode True. + let n = 0; + const writable = new Writable({ + objectMode: true, + write: common.mustCall((chunk, encoding, cb) => { + assert(!(chunk instanceof Buffer)); + assert(ArrayBuffer.isView(chunk)); + assert.deepStrictEqual(common.getBufferSources(chunk)[n], views[n]); + n++; + cb(); + }, views.length), + }); + + views.forEach((msg) => writable.write(msg)); + writable.end(); +} + + +{ + // Writable test, multiple writes carried out via writev. + let n = 0; + let callback; + const writable = new Writable({ + write: common.mustCall((chunk, encoding, cb) => { + assert(chunk instanceof Buffer); + assert(ArrayBuffer.isView(chunk)); + assert.deepStrictEqual(common.getBufferSources(chunk)[n], views[n]); + n++; + callback = cb; + }), + + writev: common.mustCall((chunks, cb) => { + assert.strictEqual(chunks.length, views.length); + let res = ''; + for (const chunk of chunks) { + assert.strictEqual(chunk.encoding, 'buffer'); + res += chunk.chunk; + } + assert.strictEqual(res, 'ABCD'.repeat(9)); + }), + + }); + views.forEach((msg) => writable.write(msg)); + writable.end(views[0]); + callback(); +} + + +{ + // Simple Readable test. + const readable = new Readable({ + read() {} + }); + + readable.push(views[1]); + readable.push(views[2]); + readable.unshift(views[0]); + + const buf = readable.read(); + assert(buf instanceof Buffer); + assert.deepStrictEqual([...buf], [...views[0], ...views[1], ...views[2]]); +} + +{ + // Readable test, setEncoding. + const readable = new Readable({ + read() {} + }); + + readable.setEncoding('utf8'); + + readable.push(views[1]); + readable.push(views[2]); + readable.unshift(views[0]); + + const out = readable.read(); + assert.strictEqual(out, 'ABCD'.repeat(3)); +}