Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add catch handler for async _construct #34416

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 65 additions & 2 deletions lib/_stream_transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,10 @@ function Transform(options) {
}

function final(cb) {
let called = false;
if (typeof this._flush === 'function' && !this.destroyed) {
this._flush((er, data) => {
const result = this._flush((er, data) => {
called = true;
if (er) {
if (cb) {
cb(er);
Expand All @@ -126,6 +128,33 @@ function final(cb) {
cb();
}
});
if (result !== undefined && result !== null) {
try {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
(data) => {
if (called)
return;
if (data != null)
this.push(data);
this.push(null);
if (cb)
process.nextTick(cb);
},
(err) => {
if (cb) {
process.nextTick(cb, err);
} else {
process.nextTick(() => this.destroy(err));
}
});
}
} catch (err) {
process.nextTick(() => this.destroy(err));
}
}
} else {
this.push(null);
if (cb) {
Expand All @@ -151,7 +180,9 @@ Transform.prototype._write = function(chunk, encoding, callback) {
const wState = this._writableState;
const length = rState.length;

this._transform(chunk, encoding, (err, val) => {
let called = false;
const result = this._transform(chunk, encoding, (err, val) => {
called = true;
if (err) {
callback(err);
return;
Expand All @@ -172,6 +203,38 @@ Transform.prototype._write = function(chunk, encoding, callback) {
this[kCallback] = callback;
}
});
if (result !== undefined && result != null) {
try {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
(val) => {
if (called)
return;

if (val != null) {
this.push(val);
}

if (
wState.ended ||
length === rState.length ||
rState.length < rState.highWaterMark ||
rState.length === 0) {
process.nextTick(callback);
} else {
this[kCallback] = callback;
}
},
(err) => {
process.nextTick(callback, err);
});
}
} catch (err) {
process.nextTick(callback, err);
}
}
};

Transform.prototype._read = function() {
Expand Down
27 changes: 26 additions & 1 deletion lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ function needFinish(state) {
function callFinal(stream, state) {
state.sync = true;
state.pendingcb++;
stream._final((err) => {
const result = stream._final((err) => {
state.pendingcb--;
if (err) {
for (const callback of state[kOnFinished].splice(0)) {
Expand All @@ -664,6 +664,31 @@ function callFinal(stream, state) {
process.nextTick(finish, stream, state);
}
});
if (result !== undefined && result !== null) {
try {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
function() {
if (state.prefinished)
return;
state.prefinish = true;
process.nextTick(() => stream.emit('prefinish'));
state.pendingcb++;
process.nextTick(finish, stream, state);
},
function(err) {
for (const callback of state[kOnFinished].splice(0)) {
process.nextTick(callback, err);
}
process.nextTick(errorOrDestroy, stream, err, state.sync);
});
}
} catch (err) {
process.nextTick(errorOrDestroy, stream, err, state.sync);
}
}
state.sync = false;
}

Expand Down
106 changes: 104 additions & 2 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,13 @@ function destroy(err, cb) {
}

function _destroy(self, err, cb) {
self._destroy(err || null, (err) => {
let called = false;
const result = self._destroy(err || null, (err) => {
const r = self._readableState;
const w = self._writableState;

called = true;

if (err) {
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
err.stack;
Expand Down Expand Up @@ -92,6 +95,64 @@ function _destroy(self, err, cb) {
process.nextTick(emitCloseNT, self);
}
});
if (result !== undefined && result !== null) {
try {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
function() {
if (called)
return;

const r = self._readableState;
const w = self._writableState;

if (w) {
w.closed = true;
}
if (r) {
r.closed = true;
}

if (typeof cb === 'function') {
process.nextTick(cb);
}

process.nextTick(emitCloseNT, self);
},
function(err) {
const r = self._readableState;
const w = self._writableState;
err.stack;

called = true;

if (w && !w.errored) {
w.errored = err;
}
if (r && !r.errored) {
r.errored = err;
}

if (w) {
w.closed = true;
}
if (r) {
r.closed = true;
}

if (typeof cb === 'function') {
process.nextTick(cb, err);
}

process.nextTick(emitErrorCloseNT, self, err);
});
}
} catch (err) {
process.nextTick(emitErrorNT, self, err);
}
}
}

function emitErrorCloseNT(self, err) {
Expand Down Expand Up @@ -230,7 +291,7 @@ function constructNT(stream) {
const s = w || r;

let called = false;
stream._construct((err) => {
const result = stream._construct((err) => {
if (r) {
r.constructed = true;
}
Expand All @@ -252,6 +313,47 @@ function constructNT(stream) {
process.nextTick(emitConstructNT, stream);
}
});
if (result !== undefined && result !== null) {
try {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
function() {
// If the callback was invoked, do nothing further.
if (called)
return;
if (r) {
r.constructed = true;
}
if (w) {
w.constructed = true;
}
if (s.destroyed) {
process.nextTick(() => stream.emit(kDestroy));
} else {
process.nextTick(emitConstructNT, stream);
}
},
function(err) {
if (r) {
r.constructed = true;
}
if (w) {
w.constructed = true;
}
called = true;
if (s.destroyed) {
process.nextTick(() => stream.emit(kDestroy, err));
} else {
process.nextTick(errorOrDestroy, stream, err);
}
});
}
} catch (err) {
process.nextTick(emitErrorNT, stream, err);
}
}
jasnell marked this conversation as resolved.
Show resolved Hide resolved
}

function emitConstructNT(stream) {
Expand Down
Loading