From 9d09969f4c29b7f2bacc9cb44e210c4e269945a4 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 28 Sep 2019 10:24:56 +0200 Subject: [PATCH] stream: always invoke end callback Ensure that the callback passed into end() is always invoke in order to avoid bug such as deadlock the user. PR-URL: https://github.com/nodejs/node/pull/29747 Reviewed-By: Matteo Collina Reviewed-By: James M Snell Reviewed-By: Trivikram Kamat --- lib/_stream_writable.js | 34 ++++++++++-- test/parallel/test-stream-writable-destroy.js | 52 +++++++++++++++++++ .../test-stream-writable-end-cb-error.js | 48 +++++++++++++++++ .../test-stream-writable-end-cb-uncaugth.js | 23 ++++++++ 4 files changed, 153 insertions(+), 4 deletions(-) create mode 100644 test/parallel/test-stream-writable-end-cb-error.js create mode 100644 test/parallel/test-stream-writable-end-cb-uncaugth.js diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 9b4036e4764418..c7a3047dc72268 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -611,11 +611,11 @@ Writable.prototype.end = function(chunk, encoding, cb) { } // Ignore unnecessary end() calls. - if (!state.ending) + if (!state.ending) { endWritable(this, state, cb); - else if (typeof cb === 'function') { + } else if (typeof cb === 'function') { if (!state.finished) { - this.once('finish', cb); + onFinished(this, state, cb); } else { cb(new ERR_STREAM_ALREADY_FINISHED('end')); } @@ -695,7 +695,7 @@ function endWritable(stream, state, cb) { if (state.finished) process.nextTick(cb); else - stream.once('finish', cb); + onFinished(stream, state, cb); } state.ended = true; stream.writable = false; @@ -715,6 +715,32 @@ function onCorkedFinish(corkReq, state, err) { state.corkedRequestsFree.next = corkReq; } +function onFinished(stream, state, cb) { + if (state.destroyed && state.errorEmitted) { + // TODO(ronag): Backwards compat. Should be moved to end() without + // errorEmitted check and with errorOrDestroy. + const err = new ERR_STREAM_DESTROYED('end'); + process.nextTick(cb, err); + return; + } + + function onerror(err) { + stream.removeListener('finish', onfinish); + stream.removeListener('error', onerror); + cb(err); + if (stream.listenerCount('error') === 0) { + stream.emit('error', err); + } + } + function onfinish() { + stream.removeListener('finish', onfinish); + stream.removeListener('error', onerror); + cb(); + } + stream.on('finish', onfinish); + stream.prependListener('error', onerror); +} + Object.defineProperty(Writable.prototype, 'destroyed', { // Making it explicit this property is not enumerable // because otherwise some prototype manipulation in diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index c4a96788ab24dd..30e4503c05773a 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -292,3 +292,55 @@ const assert = require('assert'); })); write.uncork(); } + +{ + // Call end(cb) after error & destroy + + const write = new Writable({ + write(chunk, enc, cb) { cb(new Error('asd')); } + }); + write.on('error', common.mustCall(() => { + write.destroy(); + let ticked = false; + write.end(common.mustCall((err) => { + assert.strictEqual(ticked, true); + assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); + })); + ticked = true; + })); + write.write('asd'); +} + +{ + // Call end(cb) after finish & destroy + + const write = new Writable({ + write(chunk, enc, cb) { cb(); } + }); + write.on('finish', common.mustCall(() => { + write.destroy(); + let ticked = false; + write.end(common.mustCall((err) => { + assert.strictEqual(ticked, false); + assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED'); + })); + ticked = true; + })); + write.end(); +} + +{ + // Call end(cb) after error & destroy and don't trigger + // unhandled exception. + + const write = new Writable({ + write(chunk, enc, cb) { process.nextTick(cb); } + }); + write.once('error', common.mustCall((err) => { + assert.strictEqual(err.message, 'asd'); + })); + write.end('asd', common.mustCall((err) => { + assert.strictEqual(err.message, 'asd'); + })); + write.destroy(new Error('asd')); +} diff --git a/test/parallel/test-stream-writable-end-cb-error.js b/test/parallel/test-stream-writable-end-cb-error.js new file mode 100644 index 00000000000000..24989a6d06a111 --- /dev/null +++ b/test/parallel/test-stream-writable-end-cb-error.js @@ -0,0 +1,48 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const stream = require('stream'); + +{ + // Invoke end callback on failure. + const writable = new stream.Writable(); + + writable._write = (chunk, encoding, cb) => { + process.nextTick(cb, new Error('kaboom')); + }; + + writable.on('error', common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + })); + writable.write('asd'); + writable.end(common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + })); + writable.end(common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + })); +} + +{ + // Don't invoke end callback twice + const writable = new stream.Writable(); + + writable._write = (chunk, encoding, cb) => { + process.nextTick(cb); + }; + + let called = false; + writable.end('asd', common.mustCall((err) => { + called = true; + assert.strictEqual(err, undefined); + })); + + writable.on('error', common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + })); + writable.on('finish', common.mustCall(() => { + assert.strictEqual(called, true); + writable.emit('error', new Error('kaboom')); + })); +} diff --git a/test/parallel/test-stream-writable-end-cb-uncaugth.js b/test/parallel/test-stream-writable-end-cb-uncaugth.js new file mode 100644 index 00000000000000..ab25cac81b0bee --- /dev/null +++ b/test/parallel/test-stream-writable-end-cb-uncaugth.js @@ -0,0 +1,23 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const stream = require('stream'); + +process.on('uncaughtException', common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); +})); + +const writable = new stream.Writable(); + +writable._write = (chunk, encoding, cb) => { + cb(); +}; +writable._final = (cb) => { + cb(new Error('kaboom')); +}; + +writable.write('asd'); +writable.end(common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); +}));