Skip to content

Commit

Permalink
Update to Node.js 18.19.0, add Node 21.x to CI
Browse files Browse the repository at this point in the history
Signed-off-by: Matteo Collina <hello@matteocollina.com>
  • Loading branch information
mcollina committed Dec 15, 2023
1 parent 268229d commit d5b816d
Show file tree
Hide file tree
Showing 36 changed files with 979 additions and 609 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/node.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
node-version: [12.x, 14.x, 16.x, 18.x, 20.x]
node-version: [12.x, 14.x, 16.x, 18.x, 20.x, 21.x]
exclude:
- os: windows-latest
node-version: 12.x
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
npm install readable-stream
```

This package is a mirror of the streams implementations in Node.js 18.16.0.
This package is a mirror of the streams implementations in Node.js 18.19.0.

Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v18.16.0/docs/api/stream.html).
Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v18.19.0/docs/api/stream.html).

If you want to guarantee a stable streams base, regardless of what version of
Node you, or the users of your libraries are using, use **readable-stream** _only_ and avoid the _"stream"_ module in Node-core, for background see [this blogpost](http://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html).
Expand Down
1 change: 1 addition & 0 deletions build/build.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ async function extract(nodeVersion, tarFile) {
})

await finished(tarFile.pipe(parser))
info('extraction done')
return contents
}

Expand Down
51 changes: 48 additions & 3 deletions build/replacements.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ const internalStreamsNoRequireAbortController = [
'const AbortController = globalThis.AbortController || require(\'abort-controller\').AbortController;'
]

const internalStreamsNoRequireAbortController2 = [
'const \\{ AbortController, AbortSignal \\} = .+',
'const AbortController = globalThis.AbortController || require(\'abort-controller\').AbortController;'
]

const internalStreamsRequireInternal = ["require\\('internal/([^']+)'\\)", "require('../$1')"]

const internalStreamsRequireErrors = ["require\\('internal/errors'\\)", "require('../../ours/errors')"]
Expand Down Expand Up @@ -74,7 +79,12 @@ const internalStreamsRequireWebStream = ["require\\('internal/webstreams/adapter

const internalStreamsWeakHandler = [
"const \\{ kWeakHandler \\} = require\\('../event_target'\\);",
"const kWeakHandler = require('../../ours/primordials').Symbol('kWeak');"
"require\\('../event_target'\\);const kWeakHandler = require('../../ours/primordials').Symbol('kWeak');"
]

const internalStreamsWeakHandler2 = [
"const \\{ kWeakHandler, kResistStopPropagation \\} = .*;",
"const kWeakHandler = require('../../ours/primordials').Symbol('kWeak');\nconst kResistStopPropagation = require('../../ours/primordials').Symbol('kResistStopPropagation');"
]

const internalValidatorsNoCoalesceAssignment = [
Expand Down Expand Up @@ -142,6 +152,7 @@ const testCommonKnownGlobals = [
typeof AbortController !== 'undefined' ? AbortController : require('abort-controller').AbortController,
typeof AbortSignal !== 'undefined' ? AbortSignal : require('abort-controller').AbortSignal,
typeof EventTarget !== 'undefined' ? EventTarget : require('event-target-shim').EventTarget,
typeof navigator !== 'undefined' ? navigator : {},
`
]

Expand Down Expand Up @@ -238,6 +249,32 @@ const readmeLink = ['(\\[Node.js website\\]\\(https://nodejs.org/dist/v)(\\d+.\\

const streamRequire = [ "require\\('stream'\\)", "require('../../lib/stream.js')" ]

const removeWebStreamsFromDuplexFromTest= [
'const { ReadableStream, WritableStream } = .+;',
`function makeATestReadableStream(value) {
return Readable.from([value])
}
function makeATestWritableStream(writeFunc) {
return new Writable({
write(chunk, enc, cb) {
writeFunc(chunk)
cb()
}
})
}
`
]

const duplexFromTestWebStreamNeutralizeReadable = [
'makeATestReadableStream\\(value\\) {',
'makeATestReadableStreamOff(value) {'
]

const duplexFromTestWebStreamNeutralizeWritable = [
'makeATestWritableStream\\(writeFunc\\) {',
'makeATestWritableStreamOff(writeFunc) {'
]

export const replacements = {
'lib/_stream.+': [legacyStreamsRequireStream],
'lib/internal/streams/duplexify.+': [
Expand All @@ -248,7 +285,9 @@ export const replacements = {
],
'lib/internal/streams/(operators|pipeline).+': [
internalStreamsAbortControllerPolyfill,
internalStreamsNoRequireAbortController
internalStreamsNoRequireAbortController,
internalStreamsNoRequireAbortController2,
internalStreamsWeakHandler2
],
'lib/internal/streams/readable.js': [
removefromWebReadableMethod,
Expand Down Expand Up @@ -307,7 +346,13 @@ export const replacements = {
testParallelSilentConsole,
testParallelTimersPromises
],
'test/parallel/test-stream-duplex-from.js': [testParallelDuplexFromBlob, testParallelDuplexSkipWithoutBlob],
'test/parallel/test-stream-duplex-from.js': [
testParallelDuplexFromBlob,
testParallelDuplexSkipWithoutBlob,
duplexFromTestWebStreamNeutralizeReadable,
duplexFromTestWebStreamNeutralizeWritable,
removeWebStreamsFromDuplexFromTest
],
'test/parallel/test-stream-finished.js': [testParallelFinishedEvent],
'test/parallel/test-stream-flatMap.js': [testParallelFlatMapWinLineSeparator],
'test/parallel/test-stream-preprocess.js': [testParallelPreprocessWinLineSeparator],
Expand Down
7 changes: 5 additions & 2 deletions lib/internal/streams/add-abort-signal.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
'use strict'

const { SymbolDispose } = require('../../ours/primordials')
const { AbortError, codes } = require('../../ours/errors')
const { isNodeStream, isWebStream, kControllerErrorFunction } = require('./utils')
const eos = require('./end-of-stream')
const { ERR_INVALID_ARG_TYPE } = codes
let addAbortListener

// This method is inlined here for readable-stream
// It also does not allow for signal to not exist on the stream
Expand Down Expand Up @@ -42,8 +44,9 @@ module.exports.addAbortSignalNoValidate = function (signal, stream) {
if (signal.aborted) {
onAbort()
} else {
signal.addEventListener('abort', onAbort)
eos(stream, () => signal.removeEventListener('abort', onAbort))
addAbortListener ??= require('events').addAbortListener
const disposable = addAbortListener(signal, onAbort)
eos(stream, disposable[SymbolDispose])
}
return stream
}
2 changes: 1 addition & 1 deletion lib/internal/streams/compose.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ module.exports = function compose(...streams) {
d = new Duplex({
// TODO (ronag): highWaterMark?
writableObjectMode: !!(head !== null && head !== undefined && head.writableObjectMode),
readableObjectMode: !!(tail !== null && tail !== undefined && tail.writableObjectMode),
readableObjectMode: !!(tail !== null && tail !== undefined && tail.readableObjectMode),
writable,
readable
})
Expand Down
4 changes: 2 additions & 2 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const {
AbortError
} = require('../../ours/errors')
const { Symbol } = require('../../ours/primordials')
const { kDestroyed, isDestroyed, isFinished, isServerRequest } = require('./utils')
const { kIsDestroyed, isDestroyed, isFinished, isServerRequest } = require('./utils')
const kDestroy = Symbol('kDestroy')
const kConstruct = Symbol('kConstruct')
function checkError(err, w, r) {
Expand Down Expand Up @@ -278,7 +278,7 @@ function destroyer(stream, err) {
process.nextTick(emitCloseLegacy, stream)
}
if (!stream.destroyed) {
stream[kDestroyed] = true
stream[kIsDestroyed] = true
}
}
module.exports = {
Expand Down
41 changes: 20 additions & 21 deletions lib/internal/streams/duplexify.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ const {
isNodeStream,
isReadableNodeStream,
isWritableNodeStream,
isDuplexNodeStream
isDuplexNodeStream,
isReadableStream,
isWritableStream
} = require('./utils')
const eos = require('./end-of-stream')
const {
Expand All @@ -23,6 +25,7 @@ const {
const { destroyer } = require('./destroy')
const Duplex = require('./duplex')
const Readable = require('./readable')
const Writable = require('./writable')
const { createDeferredPromise } = require('../../ours/util')
const from = require('./from')
const Blob = globalThis.Blob || bufferModule.Blob
Expand Down Expand Up @@ -77,17 +80,16 @@ module.exports = function duplexify(body, name) {
readable: false
})
}

// TODO: Webstreams
// if (isReadableStream(body)) {
// return _duplexify({ readable: Readable.fromWeb(body) });
// }

// TODO: Webstreams
// if (isWritableStream(body)) {
// return _duplexify({ writable: Writable.fromWeb(body) });
// }

if (isReadableStream(body)) {
return _duplexify({
readable: Readable.fromWeb(body)
})
}
if (isWritableStream(body)) {
return _duplexify({
writable: Writable.fromWeb(body)
})
}
if (typeof body === 'function') {
const { value, write, final, destroy } = fromAsyncGen(body)
if (isIterable(value)) {
Expand Down Expand Up @@ -144,15 +146,12 @@ module.exports = function duplexify(body, name) {
writable: false
})
}

// TODO: Webstreams.
// if (
// isReadableStream(body?.readable) &&
// isWritableStream(body?.writable)
// ) {
// return Duplexify.fromWeb(body);
// }

if (
isReadableStream(body === null || body === undefined ? undefined : body.readable) &&
isWritableStream(body === null || body === undefined ? undefined : body.writable)
) {
return Duplexify.fromWeb(body)
}
if (
typeof (body === null || body === undefined ? undefined : body.writable) === 'object' ||
typeof (body === null || body === undefined ? undefined : body.readable) === 'object'
Expand Down
13 changes: 8 additions & 5 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const { AbortError, codes } = require('../../ours/errors')
const { ERR_INVALID_ARG_TYPE, ERR_STREAM_PREMATURE_CLOSE } = codes
const { kEmptyObject, once } = require('../../ours/util')
const { validateAbortSignal, validateFunction, validateObject, validateBoolean } = require('../validators')
const { Promise, PromisePrototypeThen } = require('../../ours/primordials')
const { Promise, PromisePrototypeThen, SymbolDispose } = require('../../ours/primordials')
const {
isClosed,
isReadable,
Expand All @@ -28,6 +28,7 @@ const {
willEmitClose: _willEmitClose,
kIsClosedPromise
} = require('./utils')
let addAbortListener
function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function'
}
Expand Down Expand Up @@ -212,12 +213,13 @@ function eos(stream, options, callback) {
if (options.signal.aborted) {
process.nextTick(abort)
} else {
addAbortListener ??= require('events').addAbortListener
const disposable = addAbortListener(options.signal, abort)
const originalCallback = callback
callback = once((...args) => {
options.signal.removeEventListener('abort', abort)
disposable[SymbolDispose]()
originalCallback.apply(stream, args)
})
options.signal.addEventListener('abort', abort)
}
}
return cleanup
Expand All @@ -238,12 +240,13 @@ function eosWeb(stream, options, callback) {
if (options.signal.aborted) {
process.nextTick(abort)
} else {
addAbortListener ??= require('events').addAbortListener
const disposable = addAbortListener(options.signal, abort)
const originalCallback = callback
callback = once((...args) => {
options.signal.removeEventListener('abort', abort)
disposable[SymbolDispose]()
originalCallback.apply(stream, args)
})
options.signal.addEventListener('abort', abort)
}
}
const resolverFn = (...args) => {
Expand Down
Loading

0 comments on commit d5b816d

Please sign in to comment.