Skip to content

Commit

Permalink
deps: readable-stream@4.4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
wraithgar committed May 17, 2023
1 parent 858f0ca commit f6a0884
Show file tree
Hide file tree
Showing 13 changed files with 481 additions and 101 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'

const { AbortError, codes } = require('../../ours/errors')
const { isNodeStream, isWebStream, kControllerErrorFunction } = require('./utils')
const eos = require('./end-of-stream')
const { ERR_INVALID_ARG_TYPE } = codes

Expand All @@ -12,27 +13,32 @@ const validateAbortSignal = (signal, name) => {
throw new ERR_INVALID_ARG_TYPE(name, 'AbortSignal', signal)
}
}
function isNodeStream(obj) {
return !!(obj && typeof obj.pipe === 'function')
}
module.exports.addAbortSignal = function addAbortSignal(signal, stream) {
validateAbortSignal(signal, 'signal')
if (!isNodeStream(stream)) {
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream)
if (!isNodeStream(stream) && !isWebStream(stream)) {
throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream)
}
return module.exports.addAbortSignalNoValidate(signal, stream)
}
module.exports.addAbortSignalNoValidate = function (signal, stream) {
if (typeof signal !== 'object' || !('aborted' in signal)) {
return stream
}
const onAbort = () => {
stream.destroy(
new AbortError(undefined, {
cause: signal.reason
})
)
}
const onAbort = isNodeStream(stream)
? () => {
stream.destroy(
new AbortError(undefined, {
cause: signal.reason
})
)
}
: () => {
stream[kControllerErrorFunction](
new AbortError(undefined, {
cause: signal.reason
})
)
}
if (signal.aborted) {
onAbort()
} else {
Expand Down
143 changes: 101 additions & 42 deletions node_modules/readable-stream/lib/internal/streams/compose.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,20 @@
const { pipeline } = require('./pipeline')
const Duplex = require('./duplex')
const { destroyer } = require('./destroy')
const { isNodeStream, isReadable, isWritable } = require('./utils')
const {
isNodeStream,
isReadable,
isWritable,
isWebStream,
isTransformStream,
isWritableStream,
isReadableStream
} = require('./utils')
const {
AbortError,
codes: { ERR_INVALID_ARG_VALUE, ERR_MISSING_ARGS }
} = require('../../ours/errors')
const eos = require('./end-of-stream')
module.exports = function compose(...streams) {
if (streams.length === 0) {
throw new ERR_MISSING_ARGS('streams')
Expand All @@ -24,14 +33,17 @@ module.exports = function compose(...streams) {
streams[idx] = Duplex.from(streams[idx])
}
for (let n = 0; n < streams.length; ++n) {
if (!isNodeStream(streams[n])) {
if (!isNodeStream(streams[n]) && !isWebStream(streams[n])) {
// TODO(ronag): Add checks for non streams.
continue
}
if (n < streams.length - 1 && !isReadable(streams[n])) {
if (
n < streams.length - 1 &&
!(isReadable(streams[n]) || isReadableStream(streams[n]) || isTransformStream(streams[n]))
) {
throw new ERR_INVALID_ARG_VALUE(`streams[${n}]`, orgStreams[n], 'must be readable')
}
if (n > 0 && !isWritable(streams[n])) {
if (n > 0 && !(isWritable(streams[n]) || isWritableStream(streams[n]) || isTransformStream(streams[n]))) {
throw new ERR_INVALID_ARG_VALUE(`streams[${n}]`, orgStreams[n], 'must be writable')
}
}
Expand All @@ -53,8 +65,8 @@ module.exports = function compose(...streams) {
}
const head = streams[0]
const tail = pipeline(streams, onfinished)
const writable = !!isWritable(head)
const readable = !!isReadable(tail)
const writable = !!(isWritable(head) || isWritableStream(head) || isTransformStream(head))
const readable = !!(isReadable(tail) || isReadableStream(tail) || isTransformStream(tail))

// TODO(ronag): Avoid double buffering.
// Implement Writable/Readable/Duplex traits.
Expand All @@ -67,25 +79,49 @@ module.exports = function compose(...streams) {
readable
})
if (writable) {
d._write = function (chunk, encoding, callback) {
if (head.write(chunk, encoding)) {
callback()
} else {
ondrain = callback
if (isNodeStream(head)) {
d._write = function (chunk, encoding, callback) {
if (head.write(chunk, encoding)) {
callback()
} else {
ondrain = callback
}
}
}
d._final = function (callback) {
head.end()
onfinish = callback
}
head.on('drain', function () {
if (ondrain) {
const cb = ondrain
ondrain = null
cb()
d._final = function (callback) {
head.end()
onfinish = callback
}
})
tail.on('finish', function () {
head.on('drain', function () {
if (ondrain) {
const cb = ondrain
ondrain = null
cb()
}
})
} else if (isWebStream(head)) {
const writable = isTransformStream(head) ? head.writable : head
const writer = writable.getWriter()
d._write = async function (chunk, encoding, callback) {
try {
await writer.ready
writer.write(chunk).catch(() => {})
callback()
} catch (err) {
callback(err)
}
}
d._final = async function (callback) {
try {
await writer.ready
writer.close().catch(() => {})
onfinish = callback
} catch (err) {
callback(err)
}
}
}
const toRead = isTransformStream(tail) ? tail.readable : tail
eos(toRead, () => {
if (onfinish) {
const cb = onfinish
onfinish = null
Expand All @@ -94,25 +130,46 @@ module.exports = function compose(...streams) {
})
}
if (readable) {
tail.on('readable', function () {
if (onreadable) {
const cb = onreadable
onreadable = null
cb()
}
})
tail.on('end', function () {
d.push(null)
})
d._read = function () {
while (true) {
const buf = tail.read()
if (buf === null) {
onreadable = d._read
return
if (isNodeStream(tail)) {
tail.on('readable', function () {
if (onreadable) {
const cb = onreadable
onreadable = null
cb()
}
})
tail.on('end', function () {
d.push(null)
})
d._read = function () {
while (true) {
const buf = tail.read()
if (buf === null) {
onreadable = d._read
return
}
if (!d.push(buf)) {
return
}
}
if (!d.push(buf)) {
return
}
} else if (isWebStream(tail)) {
const readable = isTransformStream(tail) ? tail.readable : tail
const reader = readable.getReader()
d._read = async function () {
while (true) {
try {
const { value, done } = await reader.read()
if (!d.push(value)) {
return
}
if (done) {
d.push(null)
return
}
} catch {
return
}
}
}
}
Expand All @@ -128,7 +185,9 @@ module.exports = function compose(...streams) {
callback(err)
} else {
onclose = callback
destroyer(tail, err)
if (isNodeStream(tail)) {
destroyer(tail, err)
}
}
}
return d
Expand Down
19 changes: 11 additions & 8 deletions node_modules/readable-stream/lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ function destroy(err, cb) {
const w = this._writableState
// With duplex streams we use the writable side for state.
const s = w || r
if ((w && w.destroyed) || (r && r.destroyed)) {
if ((w !== null && w !== undefined && w.destroyed) || (r !== null && r !== undefined && r.destroyed)) {
if (typeof cb === 'function') {
cb()
}
Expand Down Expand Up @@ -107,14 +107,14 @@ function emitCloseNT(self) {
if (r) {
r.closeEmitted = true
}
if ((w && w.emitClose) || (r && r.emitClose)) {
if ((w !== null && w !== undefined && w.emitClose) || (r !== null && r !== undefined && r.emitClose)) {
self.emit('close')
}
}
function emitErrorNT(self, err) {
const r = self._readableState
const w = self._writableState
if ((w && w.errorEmitted) || (r && r.errorEmitted)) {
if ((w !== null && w !== undefined && w.errorEmitted) || (r !== null && r !== undefined && r.errorEmitted)) {
return
}
if (w) {
Expand Down Expand Up @@ -162,10 +162,11 @@ function errorOrDestroy(stream, err, sync) {

const r = stream._readableState
const w = stream._writableState
if ((w && w.destroyed) || (r && r.destroyed)) {
if ((w !== null && w !== undefined && w.destroyed) || (r !== null && r !== undefined && r.destroyed)) {
return this
}
if ((r && r.autoDestroy) || (w && w.autoDestroy)) stream.destroy(err)
if ((r !== null && r !== undefined && r.autoDestroy) || (w !== null && w !== undefined && w.autoDestroy))
stream.destroy(err)
else if (err) {
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
err.stack // eslint-disable-line no-unused-expressions
Expand Down Expand Up @@ -228,16 +229,18 @@ function constructNT(stream) {
}
}
try {
stream._construct(onConstruct)
stream._construct((err) => {
process.nextTick(onConstruct, err)
})
} catch (err) {
onConstruct(err)
process.nextTick(onConstruct, err)
}
}
function emitConstructNT(stream) {
stream.emit(kConstruct)
}
function isRequest(stream) {
return stream && stream.setHeader && typeof stream.abort === 'function'
return (stream === null || stream === undefined ? undefined : stream.setHeader) && typeof stream.abort === 'function'
}
function emitCloseLegacy(stream) {
stream.emit('close')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,6 @@ function _duplexify(pair) {
cb(err)
} else if (err) {
d.destroy(err)
} else if (!readable && !writable) {
d.destroy()
}
}

Expand Down
Loading

0 comments on commit f6a0884

Please sign in to comment.