From 4c819d65f91f32290188f87031a95410bbd66e64 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 6 Jan 2021 19:45:41 +0100 Subject: [PATCH] stream: fix .end() error propagation PR-URL: https://github.com/nodejs/node/pull/36817 Reviewed-By: Luigi Pinca Reviewed-By: Matteo Collina --- lib/internal/streams/writable.js | 40 +++++++++++++------ .../test-stream-writable-end-cb-error.js | 35 ++++++++++++++++ 2 files changed, 62 insertions(+), 13 deletions(-) diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index 97fe1d28ef1f3a..2510398d999fe1 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -27,6 +27,7 @@ const { FunctionPrototype, + Error, ObjectDefineProperty, ObjectDefineProperties, ObjectSetPrototypeOf, @@ -290,8 +291,8 @@ Writable.prototype.pipe = function() { errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()); }; -Writable.prototype.write = function(chunk, encoding, cb) { - const state = this._writableState; +function _write(stream, chunk, encoding, cb) { + const state = stream._writableState; if (typeof encoding === 'function') { cb = encoding; @@ -333,11 +334,15 @@ Writable.prototype.write = function(chunk, encoding, cb) { if (err) { process.nextTick(cb, err); - errorOrDestroy(this, err, true); - return false; + errorOrDestroy(stream, err, true); + return err; } state.pendingcb++; - return writeOrBuffer(this, state, chunk, encoding, cb); + return writeOrBuffer(stream, state, chunk, encoding, cb); +} + +Writable.prototype.write = function(chunk, encoding, cb) { + return _write(this, chunk, encoding, cb) === true; }; Writable.prototype.cork = function() { @@ -607,8 +612,14 @@ Writable.prototype.end = function(chunk, encoding, cb) { encoding = null; } - if (chunk !== null && chunk !== undefined) - this.write(chunk, encoding); + let err; + + if (chunk !== null && chunk !== undefined) { + const ret = _write(this, chunk, encoding); + if (ret instanceof Error) { + err = ret; + } + } // .end() fully uncorks. if (state.corked) { @@ -616,12 +627,15 @@ Writable.prototype.end = function(chunk, encoding, cb) { this.uncork(); } - // This is forgiving in terms of unnecessary calls to end() and can hide - // logic errors. However, usually such errors are harmless and causing a - // hard error can be disproportionately destructive. It is not always - // trivial for the user to determine whether end() needs to be called or not. - let err; - if (!state.errored && !state.ending) { + if (err) { + // Do nothing... + } else if (!state.errored && !state.ending) { + // This is forgiving in terms of unnecessary calls to end() and can hide + // logic errors. However, usually such errors are harmless and causing a + // hard error can be disproportionately destructive. It is not always + // trivial for the user to determine whether end() needs to be called + // or not. + state.ending = true; finishMaybe(this, state, true); state.ended = true; diff --git a/test/parallel/test-stream-writable-end-cb-error.js b/test/parallel/test-stream-writable-end-cb-error.js index 8f6d209954436f..20428f1777fd17 100644 --- a/test/parallel/test-stream-writable-end-cb-error.js +++ b/test/parallel/test-stream-writable-end-cb-error.js @@ -46,3 +46,38 @@ const stream = require('stream'); writable.emit('error', new Error('kaboom')); })); } + +{ + const w = new stream.Writable({ + write(chunk, encoding, callback) { + setImmediate(callback); + }, + finish(callback) { + setImmediate(callback); + } + }); + w.end('testing ended state', common.mustCall((err) => { + // This errors since .destroy(err), which is invoked by errors + // in same tick below, will error all pending callbacks. + // Does this make sense? Not sure. + assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); + })); + assert.strictEqual(w.destroyed, false); + assert.strictEqual(w.writableEnded, true); + w.end(common.mustCall((err) => { + // This errors since .destroy(err), which is invoked by errors + // in same tick below, will error all pending callbacks. + // Does this make sense? Not sure. + assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); + })); + assert.strictEqual(w.destroyed, false); + assert.strictEqual(w.writableEnded, true); + w.end('end', common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END'); + })); + assert.strictEqual(w.destroyed, true); + w.on('error', common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END'); + })); + w.on('finish', common.mustNotCall()); +}