diff --git a/lib/api/api-request.js b/lib/api/api-request.js index 9dac354a602..ca98fce8ba7 100644 --- a/lib/api/api-request.js +++ b/lib/api/api-request.js @@ -1,6 +1,6 @@ 'use strict' -const Readable = require('../node/readable') +const Readable = require('./readable') const { InvalidArgumentError, RequestAbortedError @@ -9,27 +9,6 @@ const util = require('../core/util') const { AsyncResource } = require('async_hooks') const { addSignal, removeSignal } = require('./abort-signal') -const kAbort = Symbol('abort') - -class RequestResponse extends Readable { - constructor (resume, abort) { - super({ autoDestroy: true, read: resume }) - this[kAbort] = abort - } - - _destroy (err, callback) { - if (!err && !this._readableState.endEmitted) { - err = new RequestAbortedError() - } - - if (err) { - this[kAbort]() - } - - callback(err) - } -} - class RequestHandler extends AsyncResource { constructor (opts, callback) { if (!opts || typeof opts !== 'object') { @@ -92,7 +71,7 @@ class RequestHandler extends AsyncResource { return } - const body = new RequestResponse(resume, abort) + const body = new Readable(resume, abort) this.callback = null this.res = body diff --git a/lib/node/readable.js b/lib/api/readable.js similarity index 73% rename from lib/node/readable.js rename to lib/api/readable.js index ef4ba3debad..2c187b1e0d0 100644 --- a/lib/node/readable.js +++ b/lib/api/readable.js @@ -6,14 +6,14 @@ const assert = require('assert') const { Readable } = require('stream') -const { InvalidArgumentError, RequestAbortedError } = require('../core/errors') +const { RequestAbortedError } = require('../core/errors') -let StringDecoder let Blob const kConsume = Symbol('kConsume') const kReading = Symbol('kReading') const kBody = Symbol('kBody') +const kAbort = Symbol('abort') const kTextType = 1 const kBlobType = 2 @@ -21,11 +21,12 @@ const kArrayBufferType = 3 const kJSONType = 4 module.exports = class BodyReadable extends Readable { - constructor (opts) { - super(opts) + constructor (resume, abort) { + super({ autoDestroy: true, read: resume }) this._readableState.dataEmitted = false + this[kAbort] = abort this[kConsume] = null this[kBody] = null @@ -41,6 +42,15 @@ module.exports = class BodyReadable extends Readable { // Node < 16 return this } + + if (!err && !this._readableState.endEmitted) { + err = new RequestAbortedError() + } + + if (err) { + this[kAbort]() + } + return super.destroy(err) } @@ -81,20 +91,13 @@ module.exports = class BodyReadable extends Readable { return this.off(ev, ...args) } - push (chunk, encoding) { + push (chunk) { if (this[kConsume] && chunk !== null && !this[kReading]) { - // Fast path. - return consumePush( - this[kConsume], - chunk, - encoding || this._readableState.defaultEncoding - ) + consumePush(this[kConsume], chunk) + return true + } else { + return super.push(chunk) } - - const pushed = super.push(chunk, encoding) - const consumed = consumePush(this[kConsume], chunk, encoding) - - return pushed && consumed } // https://fetch.spec.whatwg.org/#dom-body-text @@ -138,7 +141,7 @@ module.exports = class BodyReadable extends Readable { // https://streams.spec.whatwg.org/#readablestream-locked function isLocked (self) { - // consume is implicit lock + // Consume is an implicit lock. return (self[kBody] && self[kBody].locked === true) || self[kConsume] } @@ -173,11 +176,7 @@ async function consume (stream, type) { resolve, reject, length: 0, - decoder: undefined, - body: undefined, - reading: false, - pushed: false, - ended: false + body: undefined } stream @@ -251,61 +250,17 @@ function consumeEnd (consume) { } function consumePush (consume, chunk, encoding) { - if (!consume) { - return true - } - - if (consume.ended) { - return false - } - - if (chunk === null) { - consume.ended = true - consume.stream.read() - return false - } - - consume.pushed = true - if (consume.type === kTextType || consume.type === kJSONType) { consumePushString(consume, chunk, encoding) } else { consumePushBuffer(consume, chunk, encoding) } - - if (!consume.stream[kReading] && !consume.reading) { - consume.reading = true - process.nextTick(consumeReadMore, consume) - } - - return true } function consumePushString (consume, chunk, encoding) { if (!consume.body) { consume.body = '' } - - if (typeof chunk === 'string') { - if (consume.decoder) { - chunk = consume.decoder.write(Buffer.from(chunk, encoding)) - } else if (encoding !== 'utf8') { - chunk = Buffer.from(chunk, encoding).toString() - } - } else if (ArrayBuffer.isView(chunk)) { - if (!consume.decoder) { - if (!StringDecoder) { - StringDecoder = require('string_decoder').StringDecoder - } - consume.decoder = new StringDecoder('utf8') - } - chunk = consume.decoder.write(chunk) - } else { - // TODO: This does not support objectMode. - // TODO: Use Node error? - throw new InvalidArgumentError('chunk') - } - consume.length += chunk.length consume.body += chunk } @@ -314,33 +269,10 @@ function consumePushBuffer (consume, chunk, encoding) { if (!consume.body) { consume.body = [] } - - if (typeof chunk === 'string') { - chunk = Buffer.from(chunk, encoding) - } else if (!ArrayBuffer.isView(chunk)) { - // TODO: This does not support objectMode. - // TODO: Use Node error? - throw new InvalidArgumentError('chunk') - } - consume.length += chunk.byteLength consume.body.push(chunk) } -function consumeReadMore (consume) { - consume.pushed = true - - while (consume.pushed && !consume.stream[kReading]) { - consume.pushed = false - // TODO: This does not support thenable return. - // TODO: This does not support throwing in _read(). - consume.stream._read() - } - - consume.pushed = false - consume.reading = false -} - function consumeFinish (consume, err) { if (consume.body === null) { return diff --git a/test/body-readable.js b/test/body-readable.js deleted file mode 100644 index cf162e614d9..00000000000 --- a/test/body-readable.js +++ /dev/null @@ -1,394 +0,0 @@ -'use strict' -const Readable = require('../lib/node/readable') -const { test } = require('tap') - -test('body readable', t => { - // Ported from https://github.com/nodejs/node/pull/39520. - - let counter = 0 - const assert = Object.assign(t.ok.bind(t), { - strictEqual: t.equal, - deepStrictEqual: t.strictSame - }) - const common = { - mustCall (fn, count = 1) { - if (Number.isFinite(fn)) { - count = fn - fn = null - } - counter++ - return (...args) => { - count -= 1 - if (count < 0) { - t.fail() - } else if (count === 0) { - counter -= 1 - queueMicrotask(() => { - if (counter === 0) { - t.end() - counter = null - } - }) - } - return fn && fn(...args) - } - }, - mustNotCall () { - return () => ( - t.fail() - ) - }, - mustCallAtLeast (...args) { - return common.mustCall(...args) - } - } - - { - const r = new Readable({ - read () { - this.push('asd') - this.push(null) - } - }) - assert.strictEqual(r.bodyUsed, false) - r.on('end', common.mustCall()) - // r.on('close', common.mustCall()) Node < 14 - r.text().then(common.mustCall((val) => { - assert.strictEqual(r.bodyUsed, true) - assert.strictEqual(val, 'asd') - })) - process.nextTick(() => { - assert.strictEqual(r.isPaused(), false) - }) - } - - { - const n = ['a', 's', 'd', null] - const r = new Readable({ - read () { - this.push(n.shift()) - } - }) - assert.strictEqual(r.bodyUsed, false) - r.on('end', common.mustCall()) - // r.on('close', common.mustCall()) Node < 14 - r.text().then(common.mustCall((val) => { - assert.strictEqual(r.bodyUsed, true) - assert.strictEqual(val, 'asd') - })) - process.nextTick(() => { - assert.strictEqual(r.isPaused(), false) - }) - } - - { - const r = new Readable({ - read () { - this.push('asd') - this.push(null) - } - }) - assert.strictEqual(r.bodyUsed, false) - r.on('error', common.mustCall(() => { - assert.strictEqual(r.bodyUsed, true) - })) - // r.on('close', common.mustCall()) Node < 14 - const _err = new Error() - r.text().catch(common.mustCall((err) => assert.strictEqual(err, _err))) - r.destroy(_err) - process.nextTick(() => { - assert.strictEqual(r.isPaused(), false) - }) - } - - { - const obj = { asd: '123' } - const r = new Readable({ - read () { - this.push(JSON.stringify(obj)) - this.push(null) - } - }) - assert.strictEqual(r.bodyUsed, false) - r.on('end', common.mustCall()) - // r.on('close', common.mustCall()) Node < 14 - r.json().then(common.mustCall((val) => { - assert.strictEqual(r.bodyUsed, true) - assert.deepStrictEqual(val, obj) - })) - process.nextTick(() => { - assert.strictEqual(r.isPaused(), false) - }) - } - - { - const r = new Readable({ - read () { - this.push('asd') - this.push(null) - } - }) - assert.strictEqual(r.bodyUsed, false) - r.on('error', common.mustCall(() => { - assert.strictEqual(r.bodyUsed, true) - })) - // r.on('close', common.mustCall()) Node < 14 - r.json() - .catch(common.mustCall((err) => { - assert.strictEqual(err.message, 'Unexpected token a in JSON at position 0') - assert.strictEqual(r.bodyUsed, true) - })) - process.nextTick(() => { - assert.strictEqual(r.isPaused(), false) - }) - } - - { - const r = new Readable({ - read () { - this.push('asd') - this.push(null) - } - }) - assert.strictEqual(r.bodyUsed, false) - r.on('error', common.mustCall(() => { - assert.strictEqual(r.bodyUsed, true) - })) - // r.on('close', common.mustCall()) Node < 14 - r.json().catch(common.mustCall((err) => { - assert.strictEqual(r.bodyUsed, true) - assert(err) - })) - process.nextTick(() => { - assert.strictEqual(r.isPaused(), false) - }) - } - - { - const buf = Uint8Array.from('asd') - const r = new Readable({ - read () { - this.push(buf) - this.push(null) - } - }) - assert.strictEqual(r.bodyUsed, false) - r.on('end', common.mustCall()) - // r.on('close', common.mustCall()) Node < 14 - r.arrayBuffer() - .then(common.mustCall((val) => assert.deepStrictEqual(val, buf))) - process.nextTick(() => { - assert.strictEqual(r.isPaused(), false) - }) - } - - { - const r = new Readable({ - read () { - this.push('asd') - this.push(null) - } - }) - assert.strictEqual(r.bodyUsed, false) - r.pause() - assert.strictEqual(r.bodyUsed, false) - r.on('data', common.mustCall(() => { - assert.strictEqual(r.bodyUsed, true) - })) - r.on('end', common.mustCall()) - // r.on('close', common.mustCall()) Node < 14 - r.text().then(common.mustCall((val) => assert.strictEqual(val, 'asd'))) - process.nextTick(() => { - assert.strictEqual(r.bodyUsed, true) - assert.strictEqual(r.isPaused(), false) - }) - } - - { - const n = ['a', 's', 'd', null] - const r = new Readable({ - read () { - this.push(n.shift()) - } - }) - assert.strictEqual(r.bodyUsed, false) - r.pause() - assert.strictEqual(r.bodyUsed, false) - r.on('data', common.mustCallAtLeast(3)) - r.on('end', common.mustCall()) - // r.on('close', common.mustCall()) Node < 14 - r.text().then(common.mustCall((val) => assert.strictEqual(val, 'asd'))) - process.nextTick(() => { - assert.strictEqual(r.bodyUsed, true) - assert.strictEqual(r.isPaused(), false) - }) - } - - { - const r = new Readable({ - read () { - this.push('asd') - this.push(null) - } - }) - assert.strictEqual(r.bodyUsed, false) - r.pause() - assert.strictEqual(r.bodyUsed, false) - r.on('data', common.mustNotCall()) - r.on('error', common.mustCall()) - // r.on('close', common.mustCall()) Node < 14 - const _err = new Error() - r.text().catch(common.mustCall((err) => assert.strictEqual(err, _err))) - r.destroy(_err) - process.nextTick(() => { - assert.strictEqual(r.bodyUsed, true) - assert.strictEqual(r.isPaused(), false) - }) - } - - { - const obj = { asd: '123' } - const r = new Readable({ - read () { - this.push(JSON.stringify(obj)) - this.push(null) - } - }) - assert.strictEqual(r.bodyUsed, false) - r.pause() - assert.strictEqual(r.bodyUsed, false) - r.on('data', common.mustCall(() => { - assert.strictEqual(r.bodyUsed, true) - })) - r.on('end', common.mustCall()) - // r.on('close', common.mustCall()) Node < 14 - r.json().then(common.mustCall((val) => assert.deepStrictEqual(val, obj))) - process.nextTick(() => { - assert.strictEqual(r.bodyUsed, true) - assert.strictEqual(r.isPaused(), false) - }) - } - - { - const r = new Readable({ - read () { - this.push('asd') - this.push(null) - } - }) - assert.strictEqual(r.bodyUsed, false) - r.pause() - assert.strictEqual(r.bodyUsed, false) - r.on('data', common.mustCall(() => { - assert.strictEqual(r.bodyUsed, true) - })) - r.on('error', common.mustCall(() => { - assert.strictEqual(r.bodyUsed, true) - })) - // r.on('close', common.mustCall()) Node < 14 - r.json() - .catch(common.mustCall((err) => assert.strictEqual( - err.message, 'Unexpected token a in JSON at position 0'))) - process.nextTick(() => { - assert.strictEqual(r.bodyUsed, true) - assert.strictEqual(r.isPaused(), false) - }) - } - - { - const buf = Uint8Array.from('asd') - const r = new Readable({ - read () { - this.push(buf) - this.push(null) - } - }) - assert.strictEqual(r.bodyUsed, false) - r.pause() - assert.strictEqual(r.bodyUsed, false) - r.on('data', common.mustCall()) - r.on('end', common.mustCall()) - // r.on('close', common.mustCall()) Node < 14 - r.arrayBuffer() - .then(common.mustCall((val) => assert.deepStrictEqual(val, buf))) - process.nextTick(() => { - assert.strictEqual(r.bodyUsed, true) - assert.strictEqual(r.isPaused(), false) - }) - } - - { - const buf = Uint8Array.from('asd') - const r = new Readable({ - read () { - this.push(buf) - this.push(null) - } - }) - assert.strictEqual(r.bodyUsed, false) - assert.strictEqual(r.bodyUsed, false) - r.on('data', common.mustCall(() => { - assert.strictEqual(r.bodyUsed, true) - r.json() - .catch(common.mustCall((err) => assert( - err instanceof TypeError))) - r.arrayBuffer() - .catch(common.mustCall((err) => assert( - err instanceof TypeError))) - r.blob() - .catch(common.mustCall((err) => assert( - err instanceof TypeError))) - r.text() - .catch(common.mustCall((err) => assert( - err instanceof TypeError))) - })) - r.on('error', common.mustNotCall()) - r.on('end', common.mustCall()) - // r.on('close', common.mustCall()) Node < 14 - } - - { - const r = new Readable({ - read () { - this.push(null) - } - }) - assert.strictEqual(r.bodyUsed, false) - assert.strictEqual(r.bodyUsed, false) - r.on('end', common.mustCall(() => { - assert.strictEqual(r.bodyUsed, true) - r.json() - .catch(common.mustCall((err) => assert( - err instanceof TypeError))) - r.arrayBuffer() - .catch(common.mustCall((err) => assert( - err instanceof TypeError))) - r.blob() - .catch(common.mustCall((err) => assert( - err instanceof TypeError))) - r.text() - .catch(common.mustCall((err) => assert( - err instanceof TypeError))) - })) - r.on('error', common.mustNotCall()) - r.on('data', common.mustNotCall()) - // r.on('close', common.mustCall()) Node < 14 - } - - for (const key of ['text', 'json', 'arrayBuffer', 'blob']) { - const r = new Readable({ - read () { - } - }) - assert.strictEqual(r.bodyUsed, false) - assert.strictEqual(r.bodyUsed, false) - r[key]() - .catch(common.mustCall((err) => assert.strictEqual( - err.name, 'RequestAbortedError'))) - r.destroy() - r.on('error', common.mustNotCall()) - r.on('end', common.mustNotCall()) - r.on('data', common.mustNotCall()) - // r.on('close', common.mustCall()) Node < 14 - } -})