-
Notifications
You must be signed in to change notification settings - Fork 30.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stream: propagate errors from src streams in async iterator #30861
Conversation
@nodejs/streams |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your contribution. After some thoughts, I do not think we should have special behavior for async iterators when piping. The pipe()
operator does not forward errors for pipes, and adding it to async iterators would expose some subtle bugs that are very hard to track.
My recommendation is to focus these efforts on pipeline
which forward errors. We should add native support for async iterators there, like so:
const { pipeline } = require('stream')
const { createReadStream, createWriteStream } = require('fs')
pipeline(
createReadStream('./big-file'),
async function * transform(source) {
for await (let chunk of source) {
yield chunk.toString().toUpperCase()
}
},
createWriteStream('./dest'),
(err) => {
if (err) console.log(err)
}
)
Or something even better to read.
I understand your point, but the async iterator API is already bugged, and this change only affect async iterator API, except the pushing to I doubt anyone can agree that the current implementation of async iterators is not bugged or not weird. The following code is not what anyone would expect when working with promises/async iterators, specially because unless you propagate the errors yourself, like in the following snippet, const fs = require('fs');
const request = require('request');
const { PassThrough } = require('stream')
async function print() {
const read = fs.createReadStream('/tmp/some.json');
const req = request('http://example.com');
const req2 = request('http://example.com/two');
const req3 = request('http://example.com/three');
const stream = new PassThrough();
read.on('error', (err) => stream.emit('error', err));
req.on('error', (err) => stream.emit('error', err));
req2.on('error', (err) => stream.emit('error', err));
req3.on('error', (err) => stream.emit('error', err));
const iterator = read
.pipe(req)
.pipe(req2)
.pipe(req3)
.pipe(stream)
for await (const k of iterator) {
console.log(k);
}
} vs async function print() {
const iterator = fs.createReadStream('/tmp/some.json')
.pipe(request('http://example.com'))
.pipe(request('http://example.com/two'))
.pipe(request('http://example.com/three'))
.pipe(new PassThrough())
for await (const k of iterator) {
console.log(k);
}
} Promise support is an important part of the latest Node releases, and I think they should be supported correctly.
|
I think it's consistent... Maybe we can do something for this use case through for (const k of pipeline(
fs.createReadStream('/tmp/some.json'),
request('http://example.com'),
request('http://example.com/two'),
request('http://example.com/three')
)) {
console.log(k);
} i.e. |
Can you please expand on why you states that the async iterator API is bugged? It merely reflects how streams operate. Streams have suffered from this problem for a long time, however we cannot change them due to backward compatibility. The following work as expected right now: async function print() {
const iterator = pipeline(
fs.createReadStream('/tmp/some.json'),
request('http://example.com'),
request('http://example.com/two'),
request('http://example.com/three'),
new PassThrough(), () = {})
for await (const k of iterator) {
console.log(k);
}
} If that does not work, it would be good to open an issue with a way to reproduce and/or a fix for that that does not involve changing Moreover, I know we can improve on this API and I would love to see further developments to simplify things. |
Oh, I missed that! Would be nice if the callback was optional. |
The following does not work as one would expect, if any of those streams fail,
In my opinion the promise API should trigger errors correctly, so the behaviour is not obvious. In any case, if backward compatibility is the issue, we can add an option to async function print() {
const iterator = fs.createReadStream('/tmp/some.json')
.pipe(request('http://example.com'))
.pipe(new PassThrough(), { wrapErrors: true }) // propagateErrors, errors or whatever name
for await (const k of iterator) {
console.log(k);
}
} And instead of this being a "fix", it'll be a new feature, what do you think? |
I agree with this.
Actually it doesn't work like I expected. It just returns the last stream. |
I don't think this PR is the way to go. See this failing test: async function test () {
const iterator = pipeline(
new Readable({
read() {
// pipeline should propagate this to the next stream through it's
// destroy(err) cleanup.
this.destroy(new Error('err'));
}
}),
new PassThrough(),
() => {}
)
for await (const k of iterator) {
console.log(k);
}
}
test().catch(common.mustCall()); I think this should work as it is right now. |
@marcosc90: A potential workaround: function pipeline2(...streams) {
const pt = new PassThrough();
for (const stream of streams) {
stream.on('error', err => {
pt.destroy(err);
});
}
pipeline(...streams, pt, () => {});
return pt;
} async function print() {
const iterator = pipeline2(
fs.createReadStream('/tmp/some.json'),
request('http://example.com'),
request('http://example.com/two'),
request('http://example.com/three'),
new PassThrough())
for await (const k of iterator) {
console.log(k);
}
} |
My PR fixes this, |
Here is a fix to the pipeline issue: #30869. |
Thanks, this solves half the issue 🎉 I'd like to continue the discussion regarding adding an optional argument, or another alternative so a chain pipe works well with promises. |
This changes makes all stream in a pipeline emit 'error' in case of an abnormal termination of the pipeline. If the last stream is currently being async iterated, this change will make the iteration reject accordingly. See: nodejs#30861 Fixes: nodejs#28194
This changes makes all stream in a pipeline emit 'error' in case of an abnormal termination of the pipeline. If the last stream is currently being async iterated, this change will make the iteration reject accordingly. See: #30861 Fixes: #28194 PR-URL: #30869 Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
This changes makes all stream in a pipeline emit 'error' in case of an abnormal termination of the pipeline. If the last stream is currently being async iterated, this change will make the iteration reject accordingly. See: #30861 Fixes: #28194 PR-URL: #30869 Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
@mcollina @ronag @nodejs/streams please take another look at #30861 (comment) |
I'm not following here? What's wrong with |
I think both APIs should play well with Async iterators. Since I don't see any support for this, I'll close it, and use pipeline with a mandatory callback as a work around. |
This changes makes all stream in a pipeline emit 'error' in case of an abnormal termination of the pipeline. If the last stream is currently being async iterated, this change will make the iteration reject accordingly. See: nodejs#30861 Fixes: nodejs#28194 PR-URL: nodejs#30869 Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
This changes makes all stream in a pipeline emit 'error' in case of an abnormal termination of the pipeline. If the last stream is currently being async iterated, this change will make the iteration reject accordingly. See: #30861 Fixes: #28194 PR-URL: #30869 Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
This should fix errors not being propagated when using async iterators on a piped stream if an error occured in one of the sources in the pipe chain/pipeline, which makes it very difficult & weird to handle errors.
Now piped streams keep track of the sources, so only when
Symbol.asyncIterator
is called on the destination stream, anerror
handle can be attached to each source & trigger an error on the stream being iterated.The 3 added tests fail on master.
Fixes: #28194
Checklist
make -j4 test
(UNIX), orvcbuild test
(Windows) passes