Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'ry/v0.10'
Browse files Browse the repository at this point in the history
  • Loading branch information
isaacs committed Aug 19, 2013
2 parents f97a126 + 5458079 commit fe0f12b
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 4 deletions.
16 changes: 12 additions & 4 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -540,15 +540,23 @@ Readable.prototype.pipe = function(dest, pipeOpts) {

// if the dest has an error, then stop piping into it.
// however, don't suppress the throwing behavior for this.
// check for listeners before emit removes one-time listeners.
var errListeners = EE.listenerCount(dest, 'error');
function onerror(er) {
debug('onerror', er);
unpipe();
if (errListeners === 0 && EE.listenerCount(dest, 'error') === 0)
dest.removeListener('error', onerror);
if (EE.listenerCount(dest, 'error') === 0)
dest.emit('error', er);
}
dest.once('error', onerror);
// This is a brutally ugly hack to make sure that our error handler
// is attached before any userland ones. NEVER DO THIS.
if (!dest._events.error)
dest.on('error', onerror);
else if (Array.isArray(dest._events.error))
dest._events.error.unshift(onerror);
else
dest._events.error = [onerror, dest._events.error];



// Both close and finish should trigger unpipe, but only once.
function onclose() {
Expand Down
73 changes: 73 additions & 0 deletions test/simple/test-stream-pipe-error-handling.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,76 @@ var Stream = require('stream').Stream;

assert.strictEqual(gotErr, err);
})();

(function testErrorWithRemovedListenerThrows() {
var EE = require('events').EventEmitter;
var R = Stream.Readable;
var W = Stream.Writable;

var r = new R;
var w = new W;
var removed = false;
var didTest = false;

process.on('exit', function() {
assert(didTest);
console.log('ok');
});

r._read = function() {
setTimeout(function() {
assert(removed);
assert.throws(function() {
w.emit('error', new Error('fail'));
});
didTest = true;
});
};

w.on('error', myOnError);
r.pipe(w);
w.removeListener('error', myOnError);
removed = true;

function myOnError(er) {
throw new Error('this should not happen');
}
})();

(function testErrorWithRemovedListenerThrows() {
var EE = require('events').EventEmitter;
var R = Stream.Readable;
var W = Stream.Writable;

var r = new R;
var w = new W;
var removed = false;
var didTest = false;
var caught = false;

process.on('exit', function() {
assert(didTest);
console.log('ok');
});

r._read = function() {
setTimeout(function() {
assert(removed);
w.emit('error', new Error('fail'));
didTest = true;
});
};

w.on('error', myOnError);
w._write = function() {};

r.pipe(w);
// Removing some OTHER random listener should not do anything
w.removeListener('error', function() {});
removed = true;

function myOnError(er) {
assert(!caught);
caught = true;
}
})();

0 comments on commit fe0f12b

Please sign in to comment.