Skip to content

Commit

Permalink
stream: fix writable.end callback behavior
Browse files Browse the repository at this point in the history
Changes so that the end() callback behaves the same way in relation
to _final as write() does to _write/_writev.

PR-URL: #34101
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
  • Loading branch information
ronag committed Jul 1, 2020
1 parent e2b468e commit c7e55c6
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 42 deletions.
11 changes: 6 additions & 5 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,9 @@ Is `true` after [`writable.destroy()`][writable-destroy] has been called.
<!-- YAML
added: v0.9.4
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/34101
description: The `callback` is invoked before 'finish' or on error.
- version: v14.0.0
pr-url: https://github.com/nodejs/node/pull/29747
description: The `callback` is invoked if 'finish' or 'error' is emitted.
Expand All @@ -428,15 +431,13 @@ changes:
`Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
other than `null`.
* `encoding` {string} The encoding if `chunk` is a string
* `callback` {Function} Optional callback for when the stream finishes
or errors
* `callback` {Function} Callback for when the stream is finished.
* Returns: {this}

Calling the `writable.end()` method signals that no more data will be written
to the [`Writable`][]. The optional `chunk` and `encoding` arguments allow one
final additional chunk of data to be written immediately before closing the
stream. If provided, the optional `callback` function is attached as a listener
for the [`'finish'`][] and the `'error'` event.
stream.

Calling the [`stream.write()`][stream-write] method after calling
[`stream.end()`][stream-end] will raise an error.
Expand Down Expand Up @@ -592,7 +593,7 @@ changes:
`Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
other than `null`.
* `encoding` {string} The encoding, if `chunk` is a string. **Default:** `'utf8'`
* `callback` {Function} Callback for when this chunk of data is flushed
* `callback` {Function} Callback for when this chunk of data is flushed.
* Returns: {boolean} `false` if the stream wishes for the calling code to
wait for the `'drain'` event to be emitted before continuing to write
additional data; otherwise `true`.
Expand Down
53 changes: 25 additions & 28 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ ObjectSetPrototypeOf(Writable, Stream);

function nop() {}

const kOnFinished = Symbol('kOnFinished');

function WritableState(options, stream, isDuplex) {
// Duplex streams are both readable and writable, but share
// the same options object.
Expand Down Expand Up @@ -185,6 +187,8 @@ function WritableState(options, stream, isDuplex) {
// True if close has been emitted or would have been emitted
// depending on emitClose.
this.closeEmitted = false;

this[kOnFinished] = [];
}

function resetBuffer(state) {
Expand Down Expand Up @@ -411,7 +415,7 @@ function onwriteError(stream, state, er, cb) {
// not enabled. Passing `er` here doesn't make sense since
// it's related to one specific write, not to the buffered
// writes.
errorBuffer(state, new ERR_STREAM_DESTROYED('write'));
errorBuffer(state);
// This can emit error, but error must always follow cb.
errorOrDestroy(stream, er);
}
Expand Down Expand Up @@ -487,14 +491,14 @@ function afterWrite(stream, state, count, cb) {
}

if (state.destroyed) {
errorBuffer(state, new ERR_STREAM_DESTROYED('write'));
errorBuffer(state);
}

finishMaybe(stream, state);
}

// If there's something in the buffer waiting, then invoke callbacks.
function errorBuffer(state, err) {
function errorBuffer(state) {
if (state.writing) {
return;
}
Expand All @@ -503,7 +507,11 @@ function errorBuffer(state, err) {
const { chunk, callback } = state.buffered[n];
const len = state.objectMode ? 1 : chunk.length;
state.length -= len;
callback(err);
callback(new ERR_STREAM_DESTROYED('write'));
}

for (const callback of state[kOnFinished].splice(0)) {
callback(new ERR_STREAM_DESTROYED('end'));
}

resetBuffer(state);
Expand Down Expand Up @@ -611,10 +619,11 @@ Writable.prototype.end = function(chunk, encoding, cb) {
}

if (typeof cb === 'function') {
if (err || state.finished)
if (err || state.finished) {
process.nextTick(cb, err);
else
onFinished(this, cb);
} else {
state[kOnFinished].push(cb);
}
}

return this;
Expand All @@ -636,6 +645,9 @@ function callFinal(stream, state) {
stream._final((err) => {
state.pendingcb--;
if (err) {
for (const callback of state[kOnFinished].splice(0)) {
callback(err);
}
errorOrDestroy(stream, err, state.sync);
} else if (needFinish(state)) {
state.prefinished = true;
Expand Down Expand Up @@ -683,6 +695,11 @@ function finish(stream, state) {
return;

state.finished = true;

for (const callback of state[kOnFinished].splice(0)) {
callback();
}

stream.emit('finish');

if (state.autoDestroy) {
Expand All @@ -701,26 +718,6 @@ function finish(stream, state) {
}
}

// TODO(ronag): Avoid using events to implement internal logic.
function onFinished(stream, cb) {
function onerror(err) {
stream.removeListener('finish', onfinish);
stream.removeListener('error', onerror);
cb(err);
if (stream.listenerCount('error') === 0) {
stream.emit('error', err);
}
}

function onfinish() {
stream.removeListener('finish', onfinish);
stream.removeListener('error', onerror);
cb();
}
stream.on('finish', onfinish);
stream.prependListener('error', onerror);
}

ObjectDefineProperties(Writable.prototype, {

destroyed: {
Expand Down Expand Up @@ -800,7 +797,7 @@ const destroy = destroyImpl.destroy;
Writable.prototype.destroy = function(err, cb) {
const state = this._writableState;
if (!state.destroyed) {
process.nextTick(errorBuffer, state, new ERR_STREAM_DESTROYED('write'));
process.nextTick(errorBuffer, state);
}
destroy.call(this, err, cb);
return this;
Expand Down
4 changes: 2 additions & 2 deletions test/parallel/test-stream-transform-final-sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ const t = new stream.Transform({
t.on('finish', common.mustCall(function() {
state++;
// finishListener
assert.strictEqual(state, 14);
assert.strictEqual(state, 15);
}, 1));
t.on('end', common.mustCall(function() {
state++;
Expand All @@ -106,5 +106,5 @@ t.write(4);
t.end(7, common.mustCall(function() {
state++;
// endMethodCallback
assert.strictEqual(state, 15);
assert.strictEqual(state, 14);
}, 1));
4 changes: 2 additions & 2 deletions test/parallel/test-stream-transform-final.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ const t = new stream.Transform({
t.on('finish', common.mustCall(function() {
state++;
// finishListener
assert.strictEqual(state, 14);
assert.strictEqual(state, 15);
}, 1));
t.on('end', common.mustCall(function() {
state++;
Expand All @@ -108,5 +108,5 @@ t.write(4);
t.end(7, common.mustCall(function() {
state++;
// endMethodCallback
assert.strictEqual(state, 15);
assert.strictEqual(state, 14);
}, 1));
2 changes: 1 addition & 1 deletion test/parallel/test-stream-writable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ const assert = require('assert');
assert.strictEqual(err.message, 'asd');
}));
write.end('asd', common.mustCall((err) => {
assert.strictEqual(err.message, 'asd');
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
}));
write.destroy(new Error('asd'));
}
Expand Down
4 changes: 2 additions & 2 deletions test/parallel/test-stream-writable-end-cb-error.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ const stream = require('stream');
}));
writable.write('asd');
writable.end(common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
}));
writable.end(common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
}));
}

Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-stream-writable-end-cb-uncaught.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ writable._final = (cb) => {

writable.write('asd');
writable.end(common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
}));
2 changes: 1 addition & 1 deletion test/parallel/test-stream-write-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ for (const withPendingData of [ false, true ]) {
w.destroy();
assert.strictEqual(chunksWritten, 1);
callbacks.shift()();
assert.strictEqual(chunksWritten, 2);
assert.strictEqual(chunksWritten, useEnd && !withPendingData ? 1 : 2);
assert.strictEqual(callbacks.length, 0);
assert.strictEqual(drains, 1);

Expand Down

0 comments on commit c7e55c6

Please sign in to comment.