From f4609bdf3fabdf441da6af17c2022565f4e18f9d Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 2 May 2021 18:17:18 +0200 Subject: [PATCH] stream: bypass legacy destroy for pipeline and async iteration PR-URL: https://github.com/nodejs/node/pull/38505 Reviewed-By: Benjamin Gruenbaum Reviewed-By: Matteo Collina Reviewed-By: James M Snell --- lib/_http_client.js | 2 + lib/_http_incoming.js | 10 ++- lib/_http_server.js | 12 +-- lib/internal/streams/destroy.js | 59 +++++++++++++- lib/stream.js | 2 + test/parallel/test-stream-destroy.js | 115 +++++++++++++++++++++++++++ 6 files changed, 188 insertions(+), 12 deletions(-) create mode 100644 test/parallel/test-stream-destroy.js diff --git a/lib/_http_client.js b/lib/_http_client.js index f3b88b62730175..fde7fde86bbf25 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -53,6 +53,7 @@ const { prepareError, } = require('_http_common'); const { OutgoingMessage } = require('_http_outgoing'); +const { kDestroy } = require('internal/streams/destroy'); const Agent = require('_http_agent'); const { Buffer } = require('buffer'); const { defaultTriggerAsyncIdScope } = require('internal/async_hooks'); @@ -609,6 +610,7 @@ function parserOnIncomingClient(res, shouldKeepAlive) { DTRACE_HTTP_CLIENT_RESPONSE(socket, req); req.res = res; res.req = req; + res[kDestroy] = null; // Add our listener first, so that we guarantee socket cleanup res.on('end', responseOnEnd); diff --git a/lib/_http_incoming.js b/lib/_http_incoming.js index e0f1354e6c969c..3961b583de9ddc 100644 --- a/lib/_http_incoming.js +++ b/lib/_http_incoming.js @@ -31,6 +31,7 @@ const { } = primordials; const { Readable, finished } = require('stream'); +const { kDestroy } = require('internal/streams/destroy'); const kHeaders = Symbol('kHeaders'); const kHeadersCount = Symbol('kHeadersCount'); @@ -188,13 +189,18 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) { this.socket.destroy(err); const cleanup = finished(this.socket, (e) => { cleanup(); - onError(this, e || err, cb); + process.nextTick(onError, this, e || err, cb); }); } else { - onError(this, err, cb); + process.nextTick(onError, this, err, cb); } }; +IncomingMessage.prototype[kDestroy] = function(err) { + this.socket = null; + this.destroy(err); +}; + IncomingMessage.prototype._addHeaderLines = _addHeaderLines; function _addHeaderLines(headers, n) { if (headers && headers.length) { diff --git a/lib/_http_server.js b/lib/_http_server.js index 97df58a007daba..946394f9add754 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -231,9 +231,7 @@ function onServerResponseClose() { // where the ServerResponse object has already been deconstructed. // Fortunately, that requires only a single if check. :-) if (this._httpMessage) { - this._httpMessage.destroyed = true; - this._httpMessage._closed = true; - this._httpMessage.emit('close'); + emitCloseNT(this._httpMessage); } } @@ -837,9 +835,11 @@ function resOnFinish(req, res, socket, state, server) { } function emitCloseNT(self) { - self.destroyed = true; - self._closed = true; - self.emit('close'); + if (!self.destroyed) { + self.destroyed = true; + self._closed = true; + self.emit('close'); + } } // The following callback is issued after the headers have been read on a diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index a2892c67a0fcfa..4238e12074d769 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -5,6 +5,7 @@ const { codes: { ERR_MULTIPLE_CALLBACK, }, + AbortError, } = require('internal/errors'); const { Symbol, @@ -363,15 +364,65 @@ function isRequest(stream) { return stream && stream.setHeader && typeof stream.abort === 'function'; } +const kDestroyed = Symbol('kDestroyed'); + +function emitCloseLegacy(stream) { + stream.emit('close'); +} + +function emitErrorCloseLegacy(stream, err) { + stream.emit('error', err); + process.nextTick(emitCloseLegacy, stream); +} + +function isDestroyed(stream) { + return stream.destroyed || stream[kDestroyed]; +} + +function isReadable(stream) { + return stream.readable && !stream.readableEnded && !isDestroyed(stream); +} + +function isWritable(stream) { + return stream.writable && !stream.writableEnded && !isDestroyed(stream); +} + // Normalize destroy for legacy. function destroyer(stream, err) { - if (isRequest(stream)) return stream.abort(); - if (isRequest(stream.req)) return stream.req.abort(); - if (typeof stream.destroy === 'function') return stream.destroy(err); - if (typeof stream.close === 'function') return stream.close(); + if (isDestroyed(stream)) { + return; + } + + if (!err && (isReadable(stream) || isWritable(stream))) { + err = new AbortError(); + } + + // TODO: Remove isRequest branches. + if (typeof stream[kDestroy] === 'function') { + stream[kDestroy](err); + } else if (isRequest(stream)) { + stream.abort(); + } else if (isRequest(stream.req)) { + stream.req.abort(); + } else if (typeof stream.destroy === 'function') { + stream.destroy(err); + } else if (typeof stream.close === 'function') { + // TODO: Don't lose err? + stream.close(); + } else if (err) { + process.nextTick(emitErrorCloseLegacy, stream); + } else { + process.nextTick(emitCloseLegacy, stream); + } + + if (!stream.destroyed) { + stream[kDestroyed] = true; + } } module.exports = { + kDestroy, + isDestroyed, construct, destroyer, destroy, diff --git a/lib/stream.js b/lib/stream.js index 85adda81b32f29..16a2370232e360 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -30,6 +30,7 @@ const { } = require('internal/util'); const pipeline = require('internal/streams/pipeline'); +const { destroyer } = require('internal/streams/destroy'); const eos = require('internal/streams/end-of-stream'); const internalBuffer = require('internal/buffer'); @@ -45,6 +46,7 @@ Stream.pipeline = pipeline; const { addAbortSignal } = require('internal/streams/add-abort-signal'); Stream.addAbortSignal = addAbortSignal; Stream.finished = eos; +Stream.destroy = destroyer; ObjectDefineProperty(Stream, 'promises', { configurable: true, diff --git a/test/parallel/test-stream-destroy.js b/test/parallel/test-stream-destroy.js new file mode 100644 index 00000000000000..f99c29c811932d --- /dev/null +++ b/test/parallel/test-stream-destroy.js @@ -0,0 +1,115 @@ +'use strict'; + +const common = require('../common'); +const { + Writable, + Readable, + destroy +} = require('stream'); +const assert = require('assert'); +const http = require('http'); + +{ + const r = new Readable({ read() {} }); + destroy(r); + assert.strictEqual(r.destroyed, true); + r.on('error', common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + })); + r.on('close', common.mustCall()); +} + +{ + const r = new Readable({ read() {} }); + destroy(r, new Error('asd')); + assert.strictEqual(r.destroyed, true); + r.on('error', common.mustCall((err) => { + assert.strictEqual(err.message, 'asd'); + })); + r.on('close', common.mustCall()); +} + +{ + const w = new Writable({ write() {} }); + destroy(w); + assert.strictEqual(w.destroyed, true); + w.on('error', common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + })); + w.on('close', common.mustCall()); +} + +{ + const w = new Writable({ write() {} }); + destroy(w, new Error('asd')); + assert.strictEqual(w.destroyed, true); + w.on('error', common.mustCall((err) => { + assert.strictEqual(err.message, 'asd'); + })); + w.on('close', common.mustCall()); +} + +{ + const server = http.createServer((req, res) => { + destroy(req); + req.on('error', common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + })); + req.on('close', common.mustCall(() => { + res.end('hello'); + })); + }); + + server.listen(0, () => { + const req = http.request({ + port: server.address().port + }); + + req.write('asd'); + req.on('response', (res) => { + const buf = []; + res.on('data', (data) => buf.push(data)); + res.on('end', common.mustCall(() => { + assert.deepStrictEqual( + Buffer.concat(buf), + Buffer.from('hello') + ); + server.close(); + })); + }); + }); +} + +{ + const server = http.createServer((req, res) => { + req + .resume() + .on('end', () => { + destroy(req); + }) + .on('error', common.mustNotCall()); + + req.on('close', common.mustCall(() => { + res.end('hello'); + })); + }); + + server.listen(0, () => { + const req = http.request({ + port: server.address().port + }); + + req.write('asd'); + req.on('response', (res) => { + const buf = []; + res.on('data', (data) => buf.push(data)); + res.on('end', common.mustCall(() => { + assert.deepStrictEqual( + Buffer.concat(buf), + Buffer.from('hello') + ); + server.close(); + })); + }); + }); +}