From daeb21dde95ba03a8664c8f2ea3537505ff820fc Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 12 Jul 2023 15:31:49 +0100 Subject: [PATCH] stream: fix deadlock when pipeing to full sink When piping a paused Readable to a full Writable we didn't register a drain listener which cause the src to never resume. Refs: https://github.com/nodejs/node/issues/48666 PR-URL: https://github.com/nodejs/node/pull/48691 Reviewed-By: Benjamin Gruenbaum Reviewed-By: Luigi Pinca Reviewed-By: Matteo Collina --- lib/internal/streams/readable.js | 4 +--- test/parallel/test-stream-pipe-deadlock.js | 27 ++++++++++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) create mode 100644 test/parallel/test-stream-pipe-deadlock.js diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index b282f51b4e885e..1b40192d9458ba 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -833,9 +833,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { // Start the flow if it hasn't been started already. if (dest.writableNeedDrain === true) { - if (state.flowing) { - pause(); - } + pause(); } else if (!state.flowing) { debug('pipe resume'); src.resume(); diff --git a/test/parallel/test-stream-pipe-deadlock.js b/test/parallel/test-stream-pipe-deadlock.js new file mode 100644 index 00000000000000..bf75445877baef --- /dev/null +++ b/test/parallel/test-stream-pipe-deadlock.js @@ -0,0 +1,27 @@ +'use strict'; + +const common = require('../common'); +const { Readable, Writable } = require('stream'); + +// https://github.com/nodejs/node/issues/48666 +(async () => { + // Prepare src that is internally ended, with buffered data pending + const src = new Readable({ read() {} }); + src.push(Buffer.alloc(100)); + src.push(null); + src.pause(); + + // Give it time to settle + await new Promise((resolve) => setImmediate(resolve)); + + const dst = new Writable({ + highWaterMark: 1000, + write(buf, enc, cb) { + process.nextTick(cb); + } + }); + + dst.write(Buffer.alloc(1000)); // Fill write buffer + dst.on('finish', common.mustCall()); + src.pipe(dst); +})().then(common.mustCall());