Skip to content

Commit

Permalink
fixup: tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Jul 30, 2021
1 parent ba51fe5 commit 8d744a2
Show file tree
Hide file tree
Showing 5 changed files with 454 additions and 62 deletions.
2 changes: 1 addition & 1 deletion docs/api/Dispatcher.md
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ The `RequestOptions.method` property should not be value `'CONNECT'`.

* **statusCode** `number`
* **headers** `http.IncomingHttpHeaders`
* **body** `stream.Readable`
* **body** `stream.Readable` which also implements [the body mixin from the Fetch Standard](https://fetch.spec.whatwg.org/#body-mixin).
* **trailers** `Record<string, string>` - This object starts out
as empty and will be mutated to contain trailers after `body` has emitted `'end'`.
* **opaque** `unknown`
Expand Down
2 changes: 1 addition & 1 deletion lib/api/api-request.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'

const Readable = require('./readable')
const Readable = require('../node/readable')
const {
InvalidArgumentError,
RequestAbortedError
Expand Down
123 changes: 64 additions & 59 deletions lib/api/readable.js → lib/node/readable.js
Original file line number Diff line number Diff line change
@@ -1,29 +1,24 @@
/* istanbul ignore file: TODO add more coverage */

// Ported from https://github.com/nodejs/undici/pull/907

'use strict'

const assert = require('assert')
const { Readable } = require('stream')
const { InvalidArgumentError } = require('../core/errors')
const { InvalidArgumentError, RequestAbortedError } = require('../core/errors')

let StringDecoder
let Blob

const kConsume = Symbol('kConsume')
const kReading = Symbol('kReading')
const kBody = Symbol('kBody')

const kWebStreamType = 1
const kTextType = 2
const kBlobType = 3
const kArrayBufferType = 4
const kJSONType = 5

class AbortError extends Error {
constructor (message) {
super(message)
Error.captureStackTrace(this, AbortError)
this.name = 'AbortError'
this.message = 'aborted'
this.code = 'UND_ERR_ABORTED'
}
}
const kTextType = 1
const kBlobType = 2
const kArrayBufferType = 3
const kJSONType = 4

module.exports = class BodyReadable extends Readable {
constructor (opts) {
Expand All @@ -32,12 +27,24 @@ module.exports = class BodyReadable extends Readable {
this._readableState.dataEmitted = false

this[kConsume] = null
this[kBody] = null
this[kReading] = false // Is stream being consumed through Readable API?
}

destroy (err) {
if (this.destroyed) {
// Node < 16
return this
}
return super.destroy(err)
}

emit (ev, ...args) {
if (ev === 'data') {
this._readableState.dataEmitted = true
} else if (ev === 'error') {
// Node < 16
this._readableState.errorEmitted = true
}
return super.emit(ev, ...args)
}
Expand All @@ -49,6 +56,13 @@ module.exports = class BodyReadable extends Readable {
return super.on(ev, ...args)
}

addListener (ev, ...args) {
if (ev === 'data' || ev === 'readable') {
this[kReading] = true
}
return super.addListener(ev, ...args)
}

push (chunk, encoding) {
if (this[kConsume] && chunk !== null && !this[kReading]) {
// Fast path.
Expand Down Expand Up @@ -90,12 +104,16 @@ module.exports = class BodyReadable extends Readable {
return isDisturbed(this)
}

// https://fetch.spec.whatwg.org/#dom-body-body
get body () {
if (this[kConsume] && this[kConsume].type === kWebStreamType) {
return this[kConsume].stream
if (!this[kBody]) {
if (isUnusable(this)) {
throw new TypeError('unusable')
}
this[kBody] = Readable.toWeb(this)
}

return consume(this, kWebStreamType)
return this[kBody]
}
}

Expand Down Expand Up @@ -123,32 +141,17 @@ function isUnusable (self) {
return isDisturbed(self) || isLocked(self)
}

async function consume (parent, type) {
if (isUnusable(parent)) {
// eslint-disable-next-line no-restricted-syntax
async function consume (stream, type) {
if (isUnusable(stream)) {
throw new TypeError('unusable')
}

if (parent[kConsume]) {
// TODO: Should multiple consume in same tick be possible?
// eslint-disable-next-line no-restricted-syntax
throw new TypeError('unusable')
}

if (type === kWebStreamType) {
const consume = parent[kConsume] = {
type,
// TODO: Optimized implementation for web streams.
stream: Readable.toWeb(parent)
}

return consume.stream
}
assert(!stream[kConsume])

return new Promise((resolve, reject) => {
parent[kConsume] = {
stream[kConsume] = {
type,
parent,
stream,
resolve,
reject,
length: 0,
Expand All @@ -159,17 +162,18 @@ async function consume (parent, type) {
ended: false
}

parent
stream
.once('error', function (err) {
consumeFinish(this[kConsume], err)
})
.once('close', function () {
if (this[kConsume].body !== null) {
consumeFinish(this[kConsume], new AbortError())
// TODO: Use Node error?
consumeFinish(this[kConsume], new RequestAbortedError())
}
})

process.nextTick(consumeStart, parent[kConsume])
process.nextTick(consumeStart, stream[kConsume])
})
}

Expand All @@ -178,7 +182,7 @@ function consumeStart (consume) {
return
}

const { _readableState: state } = consume.parent
const { _readableState: state } = consume.stream

for (const chunk of state.buffer) {
consumePush(consume, chunk)
Expand All @@ -187,20 +191,18 @@ function consumeStart (consume) {
if (state.endEmitted) {
consumeEnd(this[kConsume])
} else {
consume.parent.once('end', function () {
consume.stream.once('end', function () {
consumeEnd(this[kConsume])
})
}

if (consume.parent.isPaused()) {
consume.parent.resume()
}
consume.stream.resume()

while (consume.parent.read() != null);
while (consume.stream.read() != null);
}

function consumeEnd (consume) {
const { type, body, resolve, decoder, parent, length } = consume
const { type, body, resolve, decoder, stream, length } = consume

try {
if (type === kTextType) {
Expand All @@ -226,7 +228,7 @@ function consumeEnd (consume) {

consumeFinish(consume)
} catch (err) {
parent.destroy(err)
stream.destroy(err)
}
}

Expand All @@ -241,7 +243,7 @@ function consumePush (consume, chunk, encoding) {

if (chunk === null) {
consume.ended = true
consume.parent.read()
consume.stream.read()
return false
}

Expand All @@ -253,7 +255,7 @@ function consumePush (consume, chunk, encoding) {
consumePushBuffer(consume, chunk, encoding)
}

if (!consume.parent[kReading] && !consume.reading) {
if (!consume.stream[kReading] && !consume.reading) {
consume.reading = true
process.nextTick(consumeReadMore, consume)
}
Expand Down Expand Up @@ -283,6 +285,7 @@ function consumePushString (consume, chunk, encoding) {
} else {
// TODO: What if objectMode? Should we just fail consume
// or throw?
// TODO: Use Node error?
throw new InvalidArgumentError('chunk')
}

Expand All @@ -300,6 +303,7 @@ function consumePushBuffer (consume, chunk, encoding) {
} else if (!ArrayBuffer.isView(chunk)) {
// TODO: What if objectMode? Should we just fail consume
// or throw?
// TODO: Use Node error?
throw new InvalidArgumentError('chunk')
}

Expand All @@ -308,21 +312,22 @@ function consumePushBuffer (consume, chunk, encoding) {
}

function consumeReadMore (consume) {
if (consume.parent[kReading]) {
consume.reading = false
return
}

consume.pushed = true
while (consume.pushed) {

while (consume.pushed && !consume.stream[kReading]) {
consume.pushed = false
consume.parent._read(consume.parent)
consume.stream._read(consume.stream)
}

consume.pushed = false
consume.reading = false
}

function consumeFinish (consume, err) {
if (consume.body === null) {
return
}

if (err) {
consume.reject(err)
} else {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "undici",
"version": "4.3.0",
"version": "4.2.2",
"description": "An HTTP/1.1 client, written from scratch for Node.js",
"homepage": "https://undici.nodejs.org",
"bugs": {
Expand Down
Loading

0 comments on commit 8d744a2

Please sign in to comment.