Skip to content

Commit

Permalink
fixup: remove node specific stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Jul 30, 2021
1 parent 119e794 commit 463f103
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 506 deletions.
25 changes: 2 additions & 23 deletions lib/api/api-request.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'

const Readable = require('../node/readable')
const Readable = require('./readable')
const {
InvalidArgumentError,
RequestAbortedError
Expand All @@ -9,27 +9,6 @@ const util = require('../core/util')
const { AsyncResource } = require('async_hooks')
const { addSignal, removeSignal } = require('./abort-signal')

const kAbort = Symbol('abort')

class RequestResponse extends Readable {
constructor (resume, abort) {
super({ autoDestroy: true, read: resume })
this[kAbort] = abort
}

_destroy (err, callback) {
if (!err && !this._readableState.endEmitted) {
err = new RequestAbortedError()
}

if (err) {
this[kAbort]()
}

callback(err)
}
}

class RequestHandler extends AsyncResource {
constructor (opts, callback) {
if (!opts || typeof opts !== 'object') {
Expand Down Expand Up @@ -92,7 +71,7 @@ class RequestHandler extends AsyncResource {
return
}

const body = new RequestResponse(resume, abort)
const body = new Readable(resume, abort)

this.callback = null
this.res = body
Expand Down
110 changes: 21 additions & 89 deletions lib/node/readable.js → lib/api/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,27 @@

const assert = require('assert')
const { Readable } = require('stream')
const { InvalidArgumentError, RequestAbortedError } = require('../core/errors')
const { RequestAbortedError } = require('../core/errors')

let StringDecoder
let Blob

const kConsume = Symbol('kConsume')
const kReading = Symbol('kReading')
const kBody = Symbol('kBody')
const kAbort = Symbol('abort')

const kTextType = 1
const kBlobType = 2
const kArrayBufferType = 3
const kJSONType = 4

module.exports = class BodyReadable extends Readable {
constructor (opts) {
super(opts)
constructor (resume, abort) {
super({ autoDestroy: true, read: resume })

this._readableState.dataEmitted = false

this[kAbort] = abort
this[kConsume] = null
this[kBody] = null

Expand All @@ -41,6 +42,15 @@ module.exports = class BodyReadable extends Readable {
// Node < 16
return this
}

if (!err && !this._readableState.endEmitted) {
err = new RequestAbortedError()
}

if (err) {
this[kAbort]()
}

return super.destroy(err)
}

Expand Down Expand Up @@ -81,20 +91,13 @@ module.exports = class BodyReadable extends Readable {
return this.off(ev, ...args)
}

push (chunk, encoding) {
push (chunk) {
if (this[kConsume] && chunk !== null && !this[kReading]) {
// Fast path.
return consumePush(
this[kConsume],
chunk,
encoding || this._readableState.defaultEncoding
)
consumePush(this[kConsume], chunk)
return true
} else {
return super.push(chunk)
}

const pushed = super.push(chunk, encoding)
const consumed = consumePush(this[kConsume], chunk, encoding)

return pushed && consumed
}

// https://fetch.spec.whatwg.org/#dom-body-text
Expand Down Expand Up @@ -138,7 +141,7 @@ module.exports = class BodyReadable extends Readable {

// https://streams.spec.whatwg.org/#readablestream-locked
function isLocked (self) {
// consume is implicit lock
// Consume is an implicit lock.
return (self[kBody] && self[kBody].locked === true) || self[kConsume]
}

Expand Down Expand Up @@ -173,11 +176,7 @@ async function consume (stream, type) {
resolve,
reject,
length: 0,
decoder: undefined,
body: undefined,
reading: false,
pushed: false,
ended: false
body: undefined
}

stream
Expand Down Expand Up @@ -251,61 +250,17 @@ function consumeEnd (consume) {
}

function consumePush (consume, chunk, encoding) {
if (!consume) {
return true
}

if (consume.ended) {
return false
}

if (chunk === null) {
consume.ended = true
consume.stream.read()
return false
}

consume.pushed = true

if (consume.type === kTextType || consume.type === kJSONType) {
consumePushString(consume, chunk, encoding)
} else {
consumePushBuffer(consume, chunk, encoding)
}

if (!consume.stream[kReading] && !consume.reading) {
consume.reading = true
process.nextTick(consumeReadMore, consume)
}

return true
}

function consumePushString (consume, chunk, encoding) {
if (!consume.body) {
consume.body = ''
}

if (typeof chunk === 'string') {
if (consume.decoder) {
chunk = consume.decoder.write(Buffer.from(chunk, encoding))
} else if (encoding !== 'utf8') {
chunk = Buffer.from(chunk, encoding).toString()
}
} else if (ArrayBuffer.isView(chunk)) {
if (!consume.decoder) {
if (!StringDecoder) {
StringDecoder = require('string_decoder').StringDecoder
}
consume.decoder = new StringDecoder('utf8')
}
chunk = consume.decoder.write(chunk)
} else {
// TODO: This does not support objectMode.
// TODO: Use Node error?
throw new InvalidArgumentError('chunk')
}

consume.length += chunk.length
consume.body += chunk
}
Expand All @@ -314,33 +269,10 @@ function consumePushBuffer (consume, chunk, encoding) {
if (!consume.body) {
consume.body = []
}

if (typeof chunk === 'string') {
chunk = Buffer.from(chunk, encoding)
} else if (!ArrayBuffer.isView(chunk)) {
// TODO: This does not support objectMode.
// TODO: Use Node error?
throw new InvalidArgumentError('chunk')
}

consume.length += chunk.byteLength
consume.body.push(chunk)
}

function consumeReadMore (consume) {
consume.pushed = true

while (consume.pushed && !consume.stream[kReading]) {
consume.pushed = false
// TODO: This does not support thenable return.
// TODO: This does not support throwing in _read().
consume.stream._read()
}

consume.pushed = false
consume.reading = false
}

function consumeFinish (consume, err) {
if (consume.body === null) {
return
Expand Down
Loading

0 comments on commit 463f103

Please sign in to comment.