diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 88bdcb643dd201..9f1e0ba0dc93f2 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -34,6 +34,13 @@ function isWritableFinished(stream) { function nop() {} +function isReadableEnded(stream) { + if (stream.readableEnded) return true; + const rState = stream._readableState; + if (!rState || rState.errored) return false; + return rState.endEmitted || (rState.ended && rState.length === 0); +} + function eos(stream, opts, callback) { if (arguments.length === 2) { callback = opts; @@ -84,7 +91,7 @@ function eos(stream, opts, callback) { const onclose = () => { let err; if (readable && !readableEnded) { - if (!rState || !rState.ended) + if (!isReadableEnded(stream)) err = new ERR_STREAM_PREMATURE_CLOSE(); return callback.call(stream, err); } diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 8c6cbf7524ea53..46d499709ee45e 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -26,23 +26,37 @@ let PassThrough; let createReadableStreamAsyncIterator; function destroyer(stream, reading, writing, callback) { - callback = once(callback); - let destroyed = false; + const _destroy = once((err) => { + destroyImpl.destroyer(stream, err); + callback(err); + }); if (eos === undefined) eos = require('internal/streams/end-of-stream'); eos(stream, { readable: reading, writable: writing }, (err) => { - if (destroyed) return; - destroyed = true; - destroyImpl.destroyer(stream, err); - callback(err); + const rState = stream._readableState; + if ( + err && + err.code === 'ERR_STREAM_PREMATURE_CLOSE' && + reading && + (rState && rState.ended && !rState.errored && !rState.errorEmitted) + ) { + // Some readable streams will emit 'close' before 'end'. However, since + // this is on the readable side 'end' should still be emitted if the + // stream has been ended and no error emitted. This should be allowed in + // favor of backwards compatibility. Since the stream is piped to a + // destination this should not result in any observable difference. + // We don't need to check if this is a writable premature close since + // eos will only fail with premature close on the reading side for + // duplex streams. + stream + .once('end', _destroy) + .once('error', _destroy); + } else { + _destroy(err); + } }); - return (err) => { - if (destroyed) return; - destroyed = true; - destroyImpl.destroyer(stream, err); - callback(err || new ERR_STREAM_DESTROYED('pipe')); - }; + return (err) => _destroy(err || new ERR_STREAM_DESTROYED('pipe')); } function popCallback(streams) { diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index c9c6bafd642e97..46a5f93913418b 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -342,3 +342,13 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); d._writableState.errored = true; d.emit('close'); } + +{ + const r = new Readable(); + finished(r, common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); + })); + r.push('asd'); + r.push(null); + r.destroy(); +} diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 87e11f8116204e..74473b0db40600 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -920,3 +920,21 @@ const { promisify } = require('util'); })); src.end(); } + +{ + // Make sure 'close' before 'end' finishes without error + // if readable has received eof. + // Ref: https://github.com/nodejs/node/issues/29699 + const r = new Readable(); + const w = new Writable({ + write(chunk, encoding, cb) { + cb(); + } + }); + pipeline(r, w, (err) => { + assert.strictEqual(err, undefined); + }); + r.push('asd'); + r.push(null); + r.emit('close'); +}