Skip to content

Commit

Permalink
stream: use addAbortListener
Browse files Browse the repository at this point in the history
PR-URL: #48550
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
atlowChemi authored and juanarbol committed Jul 13, 2023
1 parent f691dca commit 5a382d0
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 26 deletions.
10 changes: 8 additions & 2 deletions lib/internal/streams/add-abort-signal.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
'use strict';

const {
SymbolDispose,
} = primordials;

const {
AbortError,
codes,
Expand All @@ -13,6 +17,7 @@ const {

const eos = require('internal/streams/end-of-stream');
const { ERR_INVALID_ARG_TYPE } = codes;
let addAbortListener;

// This method is inlined here for readable-stream
// It also does not allow for signal to not exist on the stream
Expand Down Expand Up @@ -46,8 +51,9 @@ module.exports.addAbortSignalNoValidate = function(signal, stream) {
if (signal.aborted) {
onAbort();
} else {
signal.addEventListener('abort', onAbort);
eos(stream, () => signal.removeEventListener('abort', onAbort));
addAbortListener ??= require('events').addAbortListener;
const disposable = addAbortListener(signal, onAbort);
eos(stream, disposable[SymbolDispose]);
}
return stream;
};
17 changes: 12 additions & 5 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ const {
validateBoolean,
} = require('internal/validators');

const { Promise, PromisePrototypeThen } = primordials;
const {
Promise,
PromisePrototypeThen,
SymbolDispose,
} = primordials;

const {
isClosed,
Expand All @@ -40,6 +44,7 @@ const {
willEmitClose: _willEmitClose,
kIsClosedPromise,
} = require('internal/streams/utils');
let addAbortListener;

function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
Expand Down Expand Up @@ -249,12 +254,13 @@ function eos(stream, options, callback) {
if (options.signal.aborted) {
process.nextTick(abort);
} else {
addAbortListener ??= require('events').addAbortListener;
const disposable = addAbortListener(options.signal, abort);
const originalCallback = callback;
callback = once((...args) => {
options.signal.removeEventListener('abort', abort);
disposable[SymbolDispose]();
originalCallback.apply(stream, args);
});
options.signal.addEventListener('abort', abort);
}
}

Expand All @@ -272,12 +278,13 @@ function eosWeb(stream, options, callback) {
if (options.signal.aborted) {
process.nextTick(abort);
} else {
addAbortListener ??= require('events').addAbortListener;
const disposable = addAbortListener(options.signal, abort);
const originalCallback = callback;
callback = once((...args) => {
options.signal.removeEventListener('abort', abort);
disposable[SymbolDispose]();
originalCallback.apply(stream, args);
});
options.signal.addEventListener('abort', abort);
}
}
const resolverFn = (...args) => {
Expand Down
20 changes: 5 additions & 15 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';

const { AbortController } = require('internal/abort_controller');
const { AbortController, AbortSignal } = require('internal/abort_controller');

const {
codes: {
Expand All @@ -16,7 +16,7 @@ const {
validateInteger,
validateObject,
} = require('internal/validators');
const { kWeakHandler } = require('internal/event_target');
const { kWeakHandler, kResistStopPropagation } = require('internal/event_target');
const { finished } = require('internal/streams/end-of-stream');
const staticCompose = require('internal/streams/compose');
const {
Expand All @@ -27,6 +27,7 @@ const { deprecate } = require('internal/util');

const {
ArrayPrototypePush,
Boolean,
MathFloor,
Number,
NumberIsNaN,
Expand Down Expand Up @@ -84,19 +85,11 @@ function map(fn, options) {
validateInteger(concurrency, 'concurrency', 1);

return async function* map() {
const ac = new AbortController();
const signal = AbortSignal.any([options?.signal].filter(Boolean));
const stream = this;
const queue = [];
const signal = ac.signal;
const signalOpt = { signal };

const abort = () => ac.abort();
if (options?.signal?.aborted) {
abort();
}

options?.signal?.addEventListener('abort', abort);

let next;
let resume;
let done = false;
Expand Down Expand Up @@ -153,7 +146,6 @@ function map(fn, options) {
next();
next = null;
}
options?.signal?.removeEventListener('abort', abort);
}
}

Expand Down Expand Up @@ -188,8 +180,6 @@ function map(fn, options) {
});
}
} finally {
ac.abort();

done = true;
if (resume) {
resume();
Expand Down Expand Up @@ -301,7 +291,7 @@ async function reduce(reducer, initialValue, options) {
const ac = new AbortController();
const signal = ac.signal;
if (options?.signal) {
const opts = { once: true, [kWeakHandler]: this };
const opts = { once: true, [kWeakHandler]: this, [kResistStopPropagation]: true };
options.signal.addEventListener('abort', () => ac.abort(), opts);
}
let gotAnyItemFromStream = false;
Expand Down
10 changes: 8 additions & 2 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const {
ArrayIsArray,
Promise,
SymbolAsyncIterator,
SymbolDispose,
} = primordials;

const eos = require('internal/streams/end-of-stream');
Expand Down Expand Up @@ -44,6 +45,7 @@ const { AbortController } = require('internal/abort_controller');

let PassThrough;
let Readable;
let addAbortListener;

function destroyer(stream, reading, writing) {
let finished = false;
Expand Down Expand Up @@ -206,7 +208,11 @@ function pipelineImpl(streams, callback, opts) {
finishImpl(new AbortError());
}

outerSignal?.addEventListener('abort', abort);
addAbortListener ??= require('events').addAbortListener;
let disposable;
if (outerSignal) {
disposable = addAbortListener(outerSignal, abort);
}

let error;
let value;
Expand All @@ -231,7 +237,7 @@ function pipelineImpl(streams, callback, opts) {
destroys.shift()(error);
}

outerSignal?.removeEventListener('abort', abort);
disposable?.[SymbolDispose]();
ac.abort();

if (final) {
Expand Down
8 changes: 6 additions & 2 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const {
SafePromiseAll,
Symbol,
SymbolAsyncIterator,
SymbolDispose,
SymbolToStringTag,
Uint8Array,
} = primordials;
Expand Down Expand Up @@ -140,6 +141,7 @@ const kRelease = Symbol('kRelease');

let releasedError;
let releasingError;
let addAbortListener;

const userModuleRegExp = /^ {4}at (?:[^/\\(]+ \()(?!node:(.+):\d+:\d+\)$).*/gm;

Expand Down Expand Up @@ -1259,6 +1261,7 @@ function readableStreamPipeTo(

let reader;
let writer;
let disposable;
// Both of these can throw synchronously. We want to capture
// the error and return a rejected promise instead.
try {
Expand Down Expand Up @@ -1291,7 +1294,7 @@ function readableStreamPipeTo(
writableStreamDefaultWriterRelease(writer);
readableStreamReaderGenericRelease(reader);
if (signal !== undefined)
signal.removeEventListener('abort', abortAlgorithm);
disposable?.[SymbolDispose]();
if (rejected)
promise.reject(error);
else
Expand Down Expand Up @@ -1418,7 +1421,8 @@ function readableStreamPipeTo(
abortAlgorithm();
return promise.promise;
}
signal.addEventListener('abort', abortAlgorithm, { once: true });
addAbortListener ??= require('events').addAbortListener;
disposable = addAbortListener(signal, abortAlgorithm);
}

setPromiseHandled(run());
Expand Down

0 comments on commit 5a382d0

Please sign in to comment.