-
Notifications
You must be signed in to change notification settings - Fork 30.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stream: add stream.pipeline and stream.onEnd #19828
Conversation
6cc85d2
to
ca4f736
Compare
Goal is to get this out in 10, backports might be tricky as the core streams in 9, doesn't play well with error handling patterns used by pump. |
ca4f736
to
7b5e84b
Compare
Isn't the second commit (changing the order of emitted stream events) unrelated and would be better as a separate PR? |
lib/stream.js
Outdated
@@ -33,6 +35,9 @@ Stream.Duplex = require('_stream_duplex'); | |||
Stream.Transform = require('_stream_transform'); | |||
Stream.PassThrough = require('_stream_passthrough'); | |||
|
|||
Stream.pipeline = pump; | |||
Stream.onEnd = eos; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
onEnd
is a confusing name in the context of streams, we should use something else
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Open for suggestions :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we used finished
, like in mississippi
?
I think the two new internal modules could use some improvements code style-wise (they may not even pass the linter?) and performance-wise (e.g. replacing functional array methods). |
Also, why does this need to live on |
7b5e84b
to
1593e58
Compare
@mscdex the order change is to make the streams play well with the pump code, and how it, in general should be emitted (mistake on my part when I updated the error handling). Happy to move that out if that's easier to review. I'd very much prefer not to rewrite the source at this point. This is ported with minimal changes from my pump and end-of-stream module, to pass the node core linter, and there is lots of complexity in there. I'm not I understand what you mean by constructors? |
1593e58
to
f38fd9b
Compare
IMO if it's being included directly in node core, we should strive to make the code as readable as possible and performant as possible (especially in this case because the public functions being added could be in hot paths). There isn't even that much code there, so I don't see why this is a problem. Some of the changes would even be pretty straightforward (e.g. replacing the functional array methods).
e.g. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add some test regarding to http.OutgoingMessage (both on a client and on a server)? They are the special writable-like object.
Also for HTTP2 compat API.
lib/stream.js
Outdated
@@ -22,6 +22,8 @@ | |||
'use strict'; | |||
|
|||
const { Buffer } = require('buffer'); | |||
const pump = require('internal/streams/pump'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would name this pipeline
instead.
}); | ||
} | ||
|
||
run(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you need to call common.crashOnUnhandledRejection()
at the top of the file.
|
||
await pipelinePromise(read, write); | ||
|
||
// nexttick to avoid the promise catching it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this nextTick is not clear to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
was to work around the need for common.crashOnUnhandledRejection()
which i didn't know about 👍
read.push('data'); | ||
setImmediate(() => read.destroy(new Error('kaboom'))); | ||
|
||
pipeline(read, write, common.mustCall((err) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add some test for the return value of pipeline?
lib/internal/streams/pump.js
Outdated
if (Array.isArray(streams[0])) streams = streams[0]; | ||
|
||
if (streams.length < 2) { | ||
throw new ERR_ASSERTION('pipeline requires two streams per minimum'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is the right error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's an assertion error no?
req.on('close', common.mustCall(() => { | ||
req.on('error', common.mustCall(() => { | ||
req.on('error', common.mustCall(() => { | ||
req.on('close', common.mustCall(() => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are these changes part of this PR, if so why?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I messed up the emit order in my streams error handling PR (error should be before close). Otherwise these modules will report a premature close error instead of the error passed to stream.destroy. It's quite related to the end-of-stream/pump stuff so just bundled that in here. Want me to open an independent PR with that? (tests won't pass without this fix)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes please! If possible I would like to backport this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 I don't think a backport is needed as the stream error handling stuff we did is going out in 10 only I think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean, I would like to backport this PR to 8 if possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm splitting it out in a sec
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mscdex Still not sure I follow, you want this exposed as |
c128bad
to
bd162fa
Compare
@@ -0,0 +1,95 @@ | |||
// Ported from https://github.com/mafintosh/pump with | |||
// permission from the author, Mathias Buus (@mafintosh). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the original code has a copyright/license, then it should likely either be included here or embedded in the LICENSE file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does, but I did mention here I'm fine waiving copyright, mafintosh/pump#17 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(I'm fine with whatever is easiest btw)
lib/internal/streams/pipeline.js
Outdated
if (Array.isArray(streams[0])) streams = streams[0]; | ||
|
||
if (streams.length < 2) { | ||
throw new ERR_ASSERTION('pipeline requires two streams per minimum'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that is for the assert module.
You should probably be using ERR_MISSING_ARGS
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
lib/internal/streams/pipeline.js
Outdated
return streams.pop(); | ||
} | ||
|
||
function pump(...streams) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you name this pipeline?
bd162fa
to
4f83f94
Compare
// where the client keeps the event loop going | ||
// (replacing the rs.destroy() with req.end() | ||
// exits it so seems to be a destroy bug there | ||
client.unref(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will be investigating this later on this afternoon :-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Testing this with other Readable
instances shows this working. For instance, replace the custom Readable
above with const rs = fs.createReadStream(__filename)
and the test appears to work just fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, just verified, commenting out the client.unref()
and replacing the custom Readable
with fs.createReadStream()
and adjusting the cnt
below a bit to account for the difference in the number of data
events allows this test to pass and the Http2Stream
shuts down normally. So the issue appears to be specific to the custom Readable
?? /cc @mcollina
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jasnell hmm, i don't understand how the Readable should affect this. it's just the trigger of the destruction of the pipeline. could it be that there is an http2 issue if a bunch of writes happen while the stream is destroyed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not that I'm aware of. At this point I'm a bit unsure what could be happening. I've tried a number of variations but can only reproduce it with the custom Readable
. I've asked @mcollina to take a look. He said he will take a look tomorrow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mcollina... Please don't forget to take a look at this one :)
4f83f94
to
3a117e5
Compare
@mcollina PTAL (docs missing, working on that now) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for many petty doc nits)
doc/api/errors.md
Outdated
<a id="ERR_STREAM_PREMATURE_CLOSE"></a> | ||
### ERR_STREAM_PREMATURE_CLOSE | ||
|
||
An error returned by `stream.onEnd` and `stream.pipeline`, when a stream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: `stream.onEnd` and `stream.pipeline`
-> `stream.onEnd()` and `stream.pipeline()`
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be stream.finished()
and not stream.onEnd()
, since the former is what is currently used in this PR.
doc/api/stream.md
Outdated
@@ -46,6 +46,8 @@ There are four fundamental stream types within Node.js: | |||
* [Transform][] - Duplex streams that can modify or transform the data as it | |||
is written and read (for example [`zlib.createDeflate()`][]). | |||
|
|||
Additionally this module includes the utility functions [pipeline][] and [finished][]. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line should be wrapped after 80 characters.
doc/api/stream.md
Outdated
@@ -1263,6 +1265,106 @@ implementors should not override this method, but instead implement | |||
[`readable._destroy`][readable-_destroy]. | |||
The default implementation of `_destroy` for `Transform` also emit `'close'`. | |||
|
|||
### stream.pipeline(...streams[, callback]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it OK to use rest parameters syntax with non-last parameter even just for convenience?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems stream.pipeline()
section should go after stream.finished()
section, alphabetically.
doc/api/stream.md
Outdated
* `callback` {Function} A callback function that takes an optional error | ||
argument. | ||
|
||
A class method to pipe between streams forwarding errors and properly cleaning |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
class method
-> module method
?
doc/api/stream.md
Outdated
const fs = require('fs'); | ||
const zlib = require('zlib'); | ||
|
||
// Use pipeline api to easily pipe a series of streams |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
api
-> API
?
doc/api/stream.md
Outdated
console.log('Pipeline succeded'); | ||
} | ||
|
||
run().catch(console.log); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.catch(console.log)
-> .catch(console.error)
?
doc/api/stream.md
Outdated
|
||
finished(rs, (err) => { | ||
if (err) { | ||
console.log('Stream failed', err); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
console.log()
-> console.error()
?
doc/api/stream.md
Outdated
|
||
Especially useful in error handling scenarios where a stream is destroyed | ||
prematurely (like an aborted HTTP request), and will not emit `end` | ||
or `finish`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure, but maybe `end` or `finish`
-> `'end'` or `'finish'` events
?
doc/api/stream.md
Outdated
prematurely (like an aborted HTTP request), and will not emit `end` | ||
or `finish`. | ||
|
||
The `finished` API is promisify'able as well |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as well
-> as well:
?
doc/api/stream.md
Outdated
console.log('Stream is done reading'); | ||
} | ||
|
||
run().catch(console.log); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.catch(console.log)
-> .catch(console.error)
?
59b3fea
to
7a4afd3
Compare
@vsemozhetbyt fixed your doc nits |
7a4afd3
to
89523b2
Compare
Rebased as #19836 landed, PTAL |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
still LGTM
@mafintosh Sorry for bikeshedding. I'm not a native English speaker, but would like to ask... Can |
@ronkorving Not a native speaker as well, but I use |
@mscdex I plan on landing this Monday, so PTAL before if you have time :) |
PR-URL: #19828 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
Landed in f64bebf |
PR-URL: #19828 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
PR-URL: nodejs#19828 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
Checklist
make -j4 test
(UNIX), orvcbuild test
(Windows) passesAdds
stream.pipeline(...streams, [cb])
andstream.onEnd(stream, cb)
based on my two modules https://github.com/mafintosh/pump and https://github.com/mafintosh/end-of-stream.Needs tests+docs (writing those now), but thought it'd be good to get this up now for feedback.
Supersedes #13506, as there's been changes since so making a new PR was easier