diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index a3f69c2099ace1..e6afb6bbd900b9 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -20,6 +20,7 @@ const { ERR_INVALID_RETURN_VALUE, ERR_MISSING_ARGS, ERR_STREAM_DESTROYED, + ERR_STREAM_PREMATURE_CLOSE, }, AbortError, } = require('internal/errors'); @@ -344,13 +345,24 @@ function pipelineImpl(streams, callback, opts) { } function pipe(src, dst, finish, { end }) { + let ended = false + dst.on('close', () => { + if (!ended) { + // Finish of the destination closes before the source has completed. + finish(new ERR_STREAM_PREMATURE_CLOSE()); + } + }) + src.pipe(dst, { end }); if (end) { // Compat. Before node v10.12.0 stdio used to throw an error so // pipe() did/does not end() stdio destinations. // Now they allow it but "secretly" don't close the underlying fd. - src.once('end', () => dst.end()); + src.once('end', () => { + ended = true + dst.end() + }); } else { finish(); } diff --git a/test/parallel/test-stream-pipeline-duplex.js b/test/parallel/test-stream-pipeline-duplex.js new file mode 100644 index 00000000000000..9ba35f6d87c38b --- /dev/null +++ b/test/parallel/test-stream-pipeline-duplex.js @@ -0,0 +1,21 @@ +'use strict'; + +const common = require('../common'); +const { pipeline, Duplex, PassThrough } = require('stream'); +const assert = require('assert'); + +const remote = new PassThrough(); +const local = new Duplex({ + read () {}, + write (chunk, enc, callback) { + callback() + } +}); + +pipeline(remote, local, remote, common.mustCall((err) => { + assert.equal(err.code, 'ERR_STREAM_PREMATURE_CLOSE') +})); + +setImmediate(() => { + remote.end(); +});