
Add support for streaming stdout/stderr from Child invocations #75

Open · wants to merge 11 commits into master

Conversation

@brianmario (Collaborator) commented Dec 30, 2016:

As the title says, this adds support for passing a block that receives chunks of stdout/stderr output as they're read.

I'm not super happy with how the API turned out, and I could probably add a few more tests, so suggestions are definitely welcome.

@tmm1 @peff @piki @carlosmn @simonsj @vmg @scottjg @mclark @arthurschreiber @tma in case any of you have time to review this 🙏

@@ -337,6 +337,10 @@ class MaximumOutputExceeded < StandardError
class TimeoutExceeded < StandardError
end

# Exception raised when output streaming is aborted early.
class Aborted < StandardError
@brianmario (author):
This could maybe be CallerAborted or something more specific?

Reply:

I'm fine with either.

@err << chunk
end
end
end
@brianmario (author):

This whole block is pretty gross, but the alternative may involve being tricky (and less readable?) with Ruby.

@mhagger:

This might be the "tricky" Ruby you're talking about, but it seems to me that when streaming is not requested, you could set @stdout_block anyway, to

Proc.new do |chunk|
  @out << chunk
  false
end

(like you do for the tests) and do the equivalent for @stderr_block. Then you could avoid the inner conditionals here.

To shrink the code even further, you could set

@blocks = { stdout => @stdout_block, stderr => @stderr_block }

(in which case you wouldn't even need @stdout_block and @stderr_block anymore, but you get the idea) then this whole processing code could become

if @blocks[fd].call(chunk)
  raise Aborted
end

end

if @streaming && abort
raise Aborted
@brianmario (author):

I don't love raising here, but it enforces proper cleanup (and killing the subprocess) up in the `rescue Object` handler.
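
For context, a minimal sketch of the pattern being described: any exception, including `Aborted`, funnels into a single cleanup path that kills and reaps the child. The spawn/kill details here are illustrative, not this PR's actual code.

```ruby
class Aborted < StandardError; end

def run_and_stream
  pid = Process.spawn('yes', :out => File::NULL)
  # ... read loop; suppose the caller's block asked us to stop ...
  raise Aborted
rescue Object
  # One cleanup path for every failure mode: kill and reap the
  # child before re-raising, so the subprocess is never leaked.
  begin
    Process.kill('TERM', pid)
  rescue Errno::ESRCH
  end
  Process.wait(pid)
  raise
end
```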

@mhagger left a comment:

I added some comments. It would be great to have docs, too.


end
end

if @streaming && abort
@mhagger:

I think `@streaming &&` is redundant here, since when `@streaming` is not set, `abort` retains its initial value, `false`.

})
end
end

@mhagger:

There are no tests that involve reading more than one BUFSIZE worth of output, or reading from both stdout and stderr. Those might be worthwhile additions.
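
For illustration, roughly what such a test might look like, assuming the `:streams` proc API from the later revisions of this PR (the chunk sizes and helper script are made up):

```ruby
def test_streams_output_larger_than_bufsize_on_both_fds
  out_chunks, err_chunks = [], []
  # Write ~1MiB to each fd, well past a single BUFSIZE read.
  # dup each chunk: the read buffer is reused between reads.
  POSIX::Spawn::Child.new('ruby', '-e',
    's = "x" * 65536; 16.times { STDOUT.write(s); STDERR.write(s) }',
    :streams => {
      :stdout => proc { |chunk| out_chunks << chunk.dup; true },
      :stderr => proc { |chunk| err_chunks << chunk.dup; true }
    })
  assert_equal 16 * 65536, out_chunks.join.bytesize
  assert_equal 16 * 65536, err_chunks.join.bytesize
end
```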

@mclark commented Jan 3, 2017:

We should also add a test for passing in a minimal custom object, just to ensure the interface contract is maintained. May be a good time to use a spy. (Edit: no longer applicable now that we are using Procs.)

This requires that the stdout and stderr stream objects passed respond to
`#write` and `#string` methods.
@mclark left a comment:

I'm liking the duck-typed object interface MUCH better than the two Procs. I think that was the right way to go in this case.
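
As a concrete illustration of that contract, a minimal object satisfying the `#write`/`#string` interface (the zero-return abort convention follows the check discussed below; everything else here is made up):

```ruby
class CollectingStream
  def initialize
    @buf = String.new
  end

  # Consume one chunk; return the number of bytes accepted.
  # Returning 0 signals "stop sending me output".
  def write(chunk)
    @buf << chunk
    chunk.bytesize
  end

  # Everything accumulated so far.
  def string
    @buf
  end
end
```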

[stdin, stdout, stderr].each do |fd|
fd.set_encoding('BINARY', 'BINARY')
fd.set_encoding(bin_encoding, bin_encoding)
@mclark:

Can we do this on a per-fd basis?

bin_encoding = Encoding::BINARY
[stdin, stdout, stderr].each do |fd|
  fd.set_encoding(bin_encoding, bin_encoding) if fd.respond_to?(:set_encoding)
end

Also, are we intentionally dropping the force_encoding calls on stdout and stderr here?

@out.force_encoding('BINARY')
@err.force_encoding('BINARY')
input = input.dup.force_encoding('BINARY') if input
@stdout_buffer.set_encoding(bin_encoding)
@mclark:

Are these duplicate calls to the above intentional? We really can't assume these objects respond to these methods any more.

abort = false
if chunk
if fd == stdout
abort = (@stdout_buffer.write(chunk) == 0)
@mclark:

I don't feel like this is a safe way to test for aborting the operation. The output object could simply be refusing to write the current chunk without being done consuming the stream.
Why not use an exception for this test instead? If the consumer raises POSIX::Spawn::Aborted, then we clearly know to abort.
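
A sketch of the convention being proposed here: the consumer raises from `#write` rather than returning 0 (the exception class is the one added in this PR; the limit logic is invented):

```ruby
class HeadStream
  def initialize(limit)
    @buf = String.new
    @limit = limit
  end

  def write(chunk)
    @buf << chunk
    # Raising is unambiguous: "I refuse this chunk" and "I'm done"
    # can no longer be confused with each other.
    raise POSIX::Spawn::Aborted if @buf.bytesize >= @limit
    chunk.bytesize
  end

  def string
    @buf
  end
end
```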

@@ -262,12 +288,10 @@ def read_and_write(input, stdin, stdout, stderr, timeout=nil, max=nil)
end

# maybe we've hit our max output
if max && ready[0].any? && (@out.size + @err.size) > max
if max && ready[0].any? && (@stdout_buffer.size + @stderr_buffer.size) > max
@mclark:

Again, we can't assume there is a #size method on these objects.

I think we should probably keep a local count of the bytes we have written instead of calling #size anyway. We can't trust these objects anymore; they could be anything.
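
A standalone sketch of that suggestion: count bytes as we hand them off instead of asking the destination for its `#size` (method and variable names here are assumed):

```ruby
require 'stringio'

def copy_with_max(io, dest, max)
  bytes_seen = 0
  while chunk = io.read(16 * 1024)
    dest.write(chunk)
    bytes_seen += chunk.bytesize
    # The real code would raise MaximumOutputExceeded here.
    raise 'maximum output exceeded' if bytes_seen > max
  end
  bytes_seen
end

copy_with_max(StringIO.new('x' * 100), StringIO.new, 1024)  # => 100
```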



@mclark commented Jan 3, 2017:

Oh right, we also need some thorough docs on this once we've nailed down the exact 🦆 interface we are using.

begin
buf << fd.readpartial(BUFSIZE)
chunk = fd.readpartial(BUFSIZE)
Comment:

If I'm understanding this right, the old code would always append directly into the final buffer, whereas this one reads a chunk and then appends that chunk to the buffer. Not knowing anything about how Ruby operates under the hood, is this a potential performance problem? It should just be an extra memcpy in the worst case, but I recall that we've hit bottlenecks on reading into Ruby before. I suspect those were mostly about arrays and not buffers, though (e.g., reading each line into its own buffer can be slow).

@brianmario (author):

I could be wrong here (cc @tenderlove), but I'm pretty sure the previous code would actually create a new string (down inside readpartial), and that string would then be appended to buf, which would require potentially resizing that buffer first, then the memcpy.

This new code just keeps that first string in a local var, so we can later decide where to write it. In the default case we're using a StringIO, so the result is essentially the same as before (potential buffer resize, then a copy). Though IIRC we saw some pretty significant speedups by using an array to keep track of chunks and calling join at the end when it was all needed. That avoids reallocating the resulting buffer as we read; join can allocate a buffer exactly the size that's needed and copy all the chunks into it.

Basically this (the old way):

buffer = ""
buffer << "one,"
buffer << "two,"
buffer << "three"

return buffer

vs this (the array optimized version I just mentioned):

buffer = []
buffer << "one,"
buffer << "two,"
buffer << "three"

# buffer is an array with 3 elements at this point
# and this join call figures out how big all of the strings inside are, then creates a single buffer to append them to.
return buffer.join

Using that approach efficiently may change the API contract here slightly though...
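
A quick way to see the effect being described, comparing repeated `String#<<` against collecting chunks and joining once (sizes are arbitrary):

```ruby
require 'benchmark'

chunk = 'x' * 65536

Benchmark.bm(10) do |b|
  b.report('String <<') do
    buf = String.new
    2_000.times { buf << chunk }
  end
  b.report('Array#join') do
    chunks = []
    2_000.times { chunks << chunk }
    chunks.join
  end
end
```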

Reply:

@brianmario Ah, right, that sort of return value optimization would be pretty easy to implement, and would mean we end up with the same number of copies. Though if we're just counting memcpys anyway, I suspect it doesn't matter much either way.

> That avoids reallocating the resulting buffer as we read; join can allocate a buffer exactly the size that's needed and copy all the chunks into it.

Interesting. It sounds like appending doesn't grow the buffer aggressively in that case, since otherwise you should be able to get amortized constant time.

Anyway. We're well out of my level for intelligent discussion of Ruby internals. The interesting result is whether reading the output of a spawned cat some-gigabyte-file is measurably any different. Probably not, but it's presumably easy to test.
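
Something along those lines is nearly a one-liner to try (the file path is a placeholder):

```ruby
require 'benchmark'
require 'posix/spawn'

# Time reading a large file's output through Child, before and
# after this change, and compare.
puts Benchmark.realtime {
  POSIX::Spawn::Child.new('cat', '/path/to/gigabyte-file')
}
```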

@brianmario (author) left a comment:

So, I decided to go back to the proc-based API because the requirements on the caller are much simpler. The objects passed as streams need only respond to `call` with an arity of 1 (the current chunk) and return a boolean: `true` on success, `false` to abort (note this is the opposite of how I originally had it, though I think it makes more sense).

I'll keep going on tests and the documentation changes, but wanted to give folks one last chance to review this direction.
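
From the caller's side, the contract just described might look like this (option names as used elsewhere in this PR; the line-splitting logic is only an example):

```ruby
require 'posix/spawn'

lines = []
partial = String.new

POSIX::Spawn::Child.new('seq', '1', '100000',
  :streams => {
    :stdout => proc { |chunk|
      partial << chunk
      # Re-chunk the raw stream into complete lines.
      while line = partial.slice!(/\A.*\n/)
        lines << line.chomp
      end
      true   # keep streaming; returning false would abort the child
    }
  })
```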

streams = {stdout => @stdout_stream, stderr => @stderr_stream}

bytes_seen = 0
chunk_buffer = ""
@brianmario (author) commented Jan 4, 2017:

This buffer is reused by readpartial below. Internally, as far as I can tell, it will be resized to BUFSIZE on the first call to readpartial, and that underlying buffer will be reused from then on.
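
This is standard `IO#readpartial` behavior when an output buffer is passed: the same string object is filled on every call rather than a new one being allocated. A self-contained demonstration with StringIO:

```ruby
require 'stringio'

BUFSIZE = 16 * 1024
chunk_buffer = String.new
io = StringIO.new('x' * (3 * BUFSIZE))

loop do
  begin
    chunk = io.readpartial(BUFSIZE, chunk_buffer)
  rescue EOFError
    break
  end
  # readpartial returns the buffer it was given, so no per-read
  # string allocation happens here.
  raise unless chunk.equal?(chunk_buffer)
end
```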

Reply:

Due to the issue with appending to strings that was mentioned, we might want to consider having #readpartial allocate a new string and give ownership of it to the stream. But since we're now reusing this buffer, it's probably already more efficient than what we had before, so we can probably leave it until we actually find a perf issue we can trace back to this specifically.

Reply:

Another "" that might be better as String.new here.

raise MaximumOutputExceeded
end
end

[@out, @err]
@brianmario (author):

I decided to just drop returning these, for consistency, since one or both of these ivars are useless if we're streaming.


begin
Child.new('yes', :streams => {:stdout => stdout_stream}, :max => limit)
rescue POSIX::Spawn::MaximumOutputExceeded
end
Comment:

This should probably be an assert_raises block so we assert that the exception was raised; as-is, this would not fail the test even if we never raise the error.
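
I.e., something like this (a sketch; `stdout_stream` and `limit` as in the surrounding test):

```ruby
assert_raises POSIX::Spawn::MaximumOutputExceeded do
  Child.new('yes', :streams => {:stdout => stdout_stream}, :max => limit)
end
```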

@brianmario (author):

Good call 👍

@carlosmn commented Jan 4, 2017:

This doesn't seem to be new, but the semantics of :max seem rather confusing. It doesn't mean "send me at most X bytes" but rather "if the process sends over X bytes, give them to me and then raise an error", which means that for a :max that is a multiple of the effective buffer size, you get a whole extra chunk.

While I was playing around with the added test here, I noticed that all chunks are 16kB (which seems to be the default pipe buffer size here), so the BUFSIZE * 2 :max is four times the chunk size (since yes outputs so quickly, we always find the pipe full), which means the output stream gets a whole extra 16kB beyond what was specified as :max. These semantics already exist, but they seem really hard to plan for if I can get an arbitrary number of bytes beyond what I asked for.

@brianmario (author):

@carlosmn I went ahead and added a failing test for that. I'll get things fixed up so we only ever hand the caller max bytes. I don't think that'll break anything, because it seems like that's been the assumption all along.
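
One possible shape for that fix: truncate the final chunk so the stream never sees more than `:max` bytes (a sketch; the PR's actual fix may differ):

```ruby
# Clamp a freshly read chunk against the remaining byte budget.
def clamp_chunk(chunk, bytes_seen, max)
  remaining = max - bytes_seen
  remaining > 0 ? chunk.byteslice(0, remaining) : ''
end

clamp_chunk('x' * 100, 950, 1000).bytesize  # => 50
```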

@@ -95,6 +95,31 @@ def initialize(*args)
@options[:pgroup] = true
end
@options.delete(:chdir) if @options[:chdir].nil?

@out, @err = "", ""

Comment:

Might want to use String.new to avoid breaking when someone passes --enable-frozen-string-literal.
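
For reference, the failure mode being guarded against (runnable as a file with the magic comment; the command-line flag has the same effect process-wide):

```ruby
# frozen_string_literal: true

out = ""
# out << "chunk"   # raises: can't modify frozen String
out = String.new   # mutable even when string literals are frozen
out << "chunk"
```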

@out << chunk

true
end

Comment:

@stdout_stream = @out.method(:<<)

:trollface: 🚲 🏠

@brianmario (author):

No I love it! Being able to use method was one of the main reasons for going back to the proc-based API ;)
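
The reason `method` slots in so neatly: a `Method` object responds to `#call` just like a proc, so it satisfies the streaming contract directly. A tiny demo:

```ruby
out = String.new
stream = out.method(:<<)

stream.call('chunk')  # appends and returns out itself (truthy)
out                   # => "chunk"
```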


@arrbee (Contributor) commented Jun 28, 2017:

Any plans to merge this? I'd like to use it :-D
