
Next Gen Pipeline #4254

Closed · wants to merge 20 commits into from

Conversation

andrewvc
Contributor

This takes over from #4155, which was pointed toward the next_gen_pipeline branch. This new PR is pointed at master.

NOTE: I have not included configurable values for batch size or batch delay as I'm waiting for #3872 to merge first which will certainly cause a merge conflict!

ATTN: This is no longer a POC but production-ready code!

This is a PR that aims to accomplish the following objectives.

  1. Provide an in-memory implementation (no persistence), of the pipeline I described in Next Generation pipeline architecture for persistence and performance #3693
  2. Be completely backward compatible with current logstash plugins, nothing should break
  3. Allow for complete pipeline snapshotting (even with the Elasticsearch plugin) to address concerns raised by @jsvd in add shutdown controller to force exit on stalled shutdown #4051. This would allow forced shutdowns without losing data. It would require the elasticsearch output to implement receive_multi and no longer use its own internal queue.
  4. Expose a new 'batch' interface for filters and output plugins, allowing them to process events in micro-batches for efficiency (see the sketch after this list). This could yield very significant speed improvements for filters like the useragent filter, which would only have to look up the number of unique keys per batch vs. each key individually. It would also obviate the need for a separate buffer in the elasticsearch output.
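
For a concrete picture, here is a minimal sketch of what a batch-aware output could look like, assuming the batch entry point is named receive_multi as in point 3 above (the real plugin API may differ):

# Minimal sketch of objective 4; names are assumptions, not the final API.
class ExampleBatchOutput
  # Called once per micro-batch instead of once per event, so lookups
  # and bulk requests can be amortized across the whole batch.
  def receive_multi(events)
    payload = events.map { |event| event.to_hash }
    bulk_send(payload) # one bulk call instead of events.size calls
  end

  def bulk_send(payload)
    puts "sending batch of #{payload.size} events" # stub for a real bulk request
  end
end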

I would say that, along with these changes, we should contemplate adding a new plugin config option that would by default set the number of workers for the plugin to be == the number of worker threads. That would give the best user experience here.

Notes on implementation

This is 90% the system described in #3693; the one exception is that output plugins are assumed to be non-threadsafe. This was necessary to keep it backward compatible with 2.x series plugins. When the number of output workers is equal to the number of worker threads specified with -w, the behavior should be very similar to the original behavior. I think the default-worker fix described above might actually be a long-term solution here.
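
To illustrate the pooling idea for non-threadsafe outputs (names are assumed; this is not the PR's exact code), each output worker is a separate plugin instance checked out of a queue, so no two pipeline threads ever share an instance:

require "thread"

# Hand out exclusive access to output instances via a SizedQueue.
class OutputWorkerPool
  def initialize(worker_count, &factory)
    @queue = SizedQueue.new(worker_count)
    worker_count.times { @queue << factory.call }
  end

  # Borrow an instance for the duration of the block, then return it.
  def with_worker
    worker = @queue.pop
    yield worker
  ensure
    @queue.push(worker) if worker
  end
end

# pool = OutputWorkerPool.new(4) { SomeOutput.new }
# pool.with_worker { |out| out.receive(event) }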

Persistence Examples
Since this began as a POC, pipeline shutdown persistence is demonstrated by writing two files on shutdown:

Update: This functionality remains in the dump_inflight function (sketched after the list below), but is currently never called. We may enable it in a future release.

  1. /tmp/ls_current_batches: The contents of in-memory buffers BEFORE the shutdown sequence is initiated. A 'fast' shutdown (as described in add shutdown controller to force exit on stalled shutdown #4051) could simply write this file and then exit safely.
  2. /tmp/ls_current_batches_post_close: This is just there for debugging in the POC; it is written after a clean shutdown. It will always be empty (unless there's a bug in the pipeline!).
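
A hypothetical sketch of the dump_inflight idea (names and event shapes are assumptions): serialize whatever the in-memory batches still hold, so a forced shutdown persists events instead of dropping them.

require "json"

# Write each in-flight event as one JSON line; shapes assumed.
def dump_inflight(path, inflight_batches)
  File.open(path, "w") do |file|
    inflight_batches.each do |batch|
      batch.each { |event| file.puts(JSON.dump(event)) }
    end
  end
end

# dump_inflight("/tmp/ls_current_batches", [[{ "message" => "hi" }]])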

**Notes for Reviewers**

  1. This PR involves a small number of changes to the AST. To enable batch-oriented processing, for each batch we:
    1. Execute the filters one by one, accumulating a new batch of filtered events.
    2. Execute output_func once per event; it no longer invokes the outputs directly but instead returns an array of the outputs the event should be sent to.
    3. We use the info from the step above to build separate batches per output, as seen in output_batch in the pipeline (see the sketch after this list).
  2. This PR adds 2 new CLI options, batch_size and batch_poll_wait, to control the new features of the pipeline.
  3. PLEASE verify my performance results for yourself. I'm currently using this config and running it with time bin/logstash -f that_config.conf, then comparing it with Logstash 2.0.
  4. I set the default batch size to be 125 to match the Elasticsearch output defaults assuming four workers (4*125 = 500). This seemed like a safe default.
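
The per-output batching in reviewer note 1 can be pictured with this sketch (shapes are assumptions, not the PR's exact code): output_func returns the outputs an event routes to, and that mapping is inverted into one batch per output.

# Invert "event -> outputs" into "output -> events", then make one
# batched call per output.
def output_batch(batch)
  outputs_events = Hash.new { |hash, output| hash[output] = [] }
  batch.each do |event|
    output_func(event).each { |output| outputs_events[output] << event }
  end
  outputs_events.each do |output, events|
    output.receive_multi(events) # one call per output per batch
  end
end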

Performance

I used the following config to test this thing out:
bin/logstash -w 4 -e "input { generator {} } filter { mutate { add_field => {'foo' => 'bar'} } } output { http { http_method => 'post' url => 'http://localhost:4567/hi' workers => 2 } }"

Note that this requires a little web service to hit. I used the following

require 'sinatra'

# Tiny endpoint for the http output to POST events to.
post '/hi' do
  puts "Got #{request.body.read}"
  [200, "Thanks!"]
end


Also worth noting, this new pipeline seems, in at least one micro-benchmark, to be as much as 63% faster!

~/p/logstash (pipeline_poc) $ bin/logstash -w 4 -e "input { generator {} } filter {  mutate { add_field => {'foo' => 'bar'} } } output { stdout {} }" | pv -l > /dev/null
^C61M 0:01:00 [67.1k/s] [ <=> ]
~/p/logstash (pipeline_poc) $ cd ~/Downloads/logstash-2.0.0
~/D/logstash-2.0.0 $ bin/logstash -w 4 -e "input { generator {} } filter {  mutate { add_field => {'foo' => 'bar'} } } output { stdout {} }" | pv -l > /dev/null
^C24M 0:01:00 [  41k/s]

Doing further benchmarking, I was able to get throughput up to 74k/s using a batch size of 40 (the default is 20). Upping the batch size to 200 made it go down to 71k/s, and upping it to 2000 brought it down a lot, to 50k/s. So batch size has a big impact, probably due to not being able to use memory/the cache as efficiently. We'll have to do further tests with different message sizes as well.

return "start_input(#{variable_name})"
when "filter"
return <<-CODE
when "input"
Member

Are these cosmetic indentation changes part of our style guidelines? Or are these general ruby style guidelines? I've seen the unindented style much more frequently, so I'm curious.

Contributor Author

That would be my editor (rubymine) sneaking in with its opinion. Good call, I'll re-indent this stuff.

@andrewvc changed the title from Ng pipeline to Next Gen Pipeline on Nov 25, 2015
@andrewvc
Contributor Author

@untergeek my latest code changes clean up the whitespace mangling in config_ast. Enjoy!

@jordansissel
Contributor

Big change! <3

I'm going to review this in several rounds - starting with tests and user interface, then ending with code review.

Doing testing now.

@jordansissel
Contributor

I get this when doing rake test:install-default

Error Bundler::InstallError, retrying 1/10
An error occurred while installing atomic (1.1.99), and Bundler cannot continue.
Make sure that `gem install atomic -v '1.1.99'` succeeds before bundling.
WARNING: SSLSocket#session= is not supported

I tried to reproduce this on master, but was unable to. I think it may have to do with JRuby 1.7.23?

@jordansissel
Contributor

Ignore my prior comment - Upon trying again, I wasn't able to reproduce it.

@jordansissel
Contributor

Running the following throughput mini benchmark, I get the same results on my laptop (battery power, 2 cores)

% bin/logstash -e 'input { generator { } } filter { mutate { } } output { stdout { codec => dots } }' | pv -War > /dev/null

This PR: 62keps.
Logstash 2.1.0.snapshot3: 62keps

@@ -33,7 +33,7 @@ def execute
   end # def execute

   def add_pipeline(pipeline_id, config_str, settings = {})
-    @pipelines[pipeline_id] = LogStash::Pipeline.new(config_str, settings)
+    @pipelines[pipeline_id] = LogStash::Pipeline.new(config_str, settings.merge(:pipeline_id => pipeline_id))
Contributor

does this mean this change allows us to have multiple pipelines?

Contributor

This change is already in the current master, but yes, this is the first step toward enabling multiple pipelines.

Contributor

nice, very interesting.

Contributor Author

One thing to note is I made the pipeline ID part of the thread names so it looks nice in VisualVM!

Member

@andrewvc this pipeline_id handling is quirky. I see that in pipeline.rb, if the id is nil then object_id is used. Maybe we could do it here instead? Otherwise calling add_pipeline(nil, "") will create a nil key in @pipelines but pipeline.id will return `object_id`.

Contributor

I agree with @jsvd's comment.

But this scenario is also problematic if you are trying to add a new pipeline with an existing key; I think that can be solved in another PR.

Contributor Author

@jsvd @ph yep, great observation. I'll move that logic here.
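
For illustration, moving that logic into add_pipeline might look like the following sketch (SecureRandom.uuid stands in for whatever default is actually chosen):

require "securerandom"

def add_pipeline(pipeline_id, config_str, settings = {})
  # Default the id here so the registry key and pipeline.id always agree.
  pipeline_id ||= SecureRandom.uuid
  @pipelines[pipeline_id] = LogStash::Pipeline.new(config_str, settings.merge(:pipeline_id => pipeline_id))
end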

Contributor Author

@jsvd @ph looking through the code, the reason I made the UUID generation automatic inside of Pipeline was that requiring an id everywhere is pretty inconvenient for testing.

If we're thinking of making pipeline_id required, it should change the signature of the constructor to Pipeline.new(pipeline_id, config_str, settings). Are you guys OK with this change? It will make the PR a good bit larger, as all tests instantiating the pipeline will need to change.

Contributor

@andrewvc Let's make that into another PR :(

@andrewvc
Contributor Author

@jordansissel your benchmark results are correct, but they don't really measure the salient thing here, since that pipeline is basically a no-op. The new pipeline will run your 'null' pipeline at about the same throughput, but with less user/system time. I suspect the bottleneck is stdout. Once real filters/codecs are added, the story changes significantly.

First, let's inspect the execution profile of the 'null' example using the time command. Note that the wall-clock time is roughly 15% faster for the ng pipeline (40.95s vs 47.31s real).

 ~/D/logstash-2.1.0> time bin/logstash -w 5 -e 'input { generator { count => 4000000 } } filter { mutate { } } output { stdout { codec => dots } }' | wc -c
       47.31 real       135.16 user        40.84 sys
 4000000
 ~/p/logstash /ng_pipeline> time bin/logstash -w 5 -e 'input { generator { count => 4000000 } } filter { mutate { } } output { stdout { codec => dots } }' | wc -c
       40.95 real        96.60 user        17.16 sys
 4000000

Now, let's take a look at a super-light 'real world' example. The new pipeline has much higher throughput; the gap has widened to roughly a 28% difference (68.04s vs 87.34s real).

The gap is quite variable based on workload, but it is quite pronounced.

 ~/D/logstash-2.1.0> time bin/logstash -w 5 -e 'input { generator { count => 4000000 } } filter { mutate { add_field => {"x" => "y"}} mutate { add_field => {"blah" => "bot"}} } output { stdout { codec => json_lines } }' | wc -l
       87.34 real       294.83 user        72.91 sys
 4000000
 ~/p/logstash /ng_pipeline> time bin/logstash -w 5 -e 'input { generator { count => 4000000 } } filter { mutate { add_field => {"x" => "y"}} mutate { add_field => {"blah" => "bot"}} } output { stdout { codec => json_lines } }' | wc -l
       68.04 real       210.77 user        26.39 sys
 4000000

@jordansissel
Contributor

Hmm, good points on the CPU time differences. The new architecture (one less message-passing point) really shows up in the sys (kernel/context-switching) numbers.

@jordansissel
Contributor

I'll test this on a machine with more cpus than my tiny laptop also ;P

@andrewvc
Contributor Author

andrewvc commented Dec 1, 2015

@jordansissel yep! Thanks for taking a look. Curious to see your benchmarks. It's interesting that on my box the results were more skewed in favor of the new pipeline (maybe my CPU is different?). To be clear I think the performance is a combination of the following three factors.

  1. Less context switching (as you say).
  2. SynchronousQueue may just be a faster impl. (which I'd expect given that it's a simpler construct)
  3. Coarser context switch granularity with the batching approach (maybe more CPU cache friendly).

@andrewvc
Contributor Author

andrewvc commented Dec 8, 2015

I have updated this PR to include @ph's fantastic new test for broken filter flushes, as well as pipeline reporter updates for the new OutputDelegator.

@ph
Contributor

ph commented Dec 9, 2015

@andrewvc we will get somewhere, but I think other people outside of us should test it with real configurations.

@andrewvc
Contributor Author

andrewvc commented Dec 9, 2015

@ph yep! I don't think merging this to master should wait on that however.

ph added a commit that referenced this pull request Dec 10, 2015
When you run multiple pipelines and the code gets evaluated, the
class cache is cleared and the last evaluated code is called every
time. The `filter_func` and the `output_func` need to be unique
for every instance of the pipeline.

This PR is based on #4254
ph added a commit that referenced this pull request Dec 10, 2015
module LogStash; class OutputDelegator
  attr_reader :workers, :config, :worker_count

  def initialize(logger, klass, *args)
Contributor

For readability and maintainability, especially when non-obvious (but ideally always), we should document method parameters. In particular, *args here requires reading the code below to understand its purpose.
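
For example, the requested documentation could look like this (parameter meanings are inferred from the surrounding code, so treat them as assumptions):

# @param logger [Logger] logger used for worker lifecycle messages
# @param klass  [Class]  the output plugin class this delegator wraps
# @param args   [Array]  remaining arguments, forwarded to klass.new
#                        when constructing each worker instance
def initialize(logger, klass, *args)
  # ...
end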

@andrewvc
Contributor Author

@colinsurprenant I've incorporated your feedback with my latest commit; how's it look now? :)


signal = false
batch_size.times do |t|
  # block for the first event, then poll with a timeout for the rest
  event = t==0 ? @input_queue.take : @input_queue.poll(batch_delay)
Contributor

cosmetic comment on t == 0 ?: sometimes when using the ternary form, grouping in parens makes it easier to read:

event = (t == 0) ? ... : ...

Contributor Author

I typically only use parens for more complex operations than a single boolean operation on a ternary operator. So, I politely disagree here.

Contributor

my comment was first about adding spaces t == 0 instead of t==0

Then my second comment is really just about readability: when using an expression (like x == u, or something similar, instead of a bool check like x.nil?) in a ternary form, having all the = == ? very close together makes it confusing to look at. Using parens eases the parsing/comprehension process.

Contributor Author

You're right about the spaces, I'll go add them in. I'll just add the parens anyway since they aren't a huge deal to me.

@colinsurprenant
Contributor

Generally speaking this is awesome work, and the modifications for adding the OutputDelegator are really nice. I also really like the fact that we have more pipeline specs.

I would like to know what the strategy is in terms of moving forward with merging this into master. Personally I have not done any real-world tests, and I wonder if any stress tests/real-world tests have been done so far?

I think we basically have 2 choices:

  • we merge soon, hope for the best, and fix bugs as we encounter them? One of the problems I see with this is that since this change is so central, it might be harder to find the cause of future bugs: is it because of a modification I introduced, or is it because of the new pipeline?
  • we take the time to do stress tests and integration tests, and merge when we reach a point of comfort?

Thoughts?

@andrewvc
Contributor Author

@colinsurprenant great points, and sadly we don't have great answers. I've spent quite a while running various benchmarks with the pipeline and so far have not hit any stability problems. I don't think stress-style tests are useful here, but throwing a variety of configs at it is.

Ideally we'd have a test suite that did macro-style testing for LS, but since we don't, I think only the first option is viable.

I'd say that our goal should be to merge into master only and beware of future bugs that may arise.

I'll do a run with the cloud benchmarker this afternoon which is a bit more stressy, but I don't anticipate finding anything there given that it's somewhat similar to previous benchmarks I've run.

@suyograo
Contributor

I am in favor of merging this to master only, as-is. We can then backport to 2.x -- which is a release version -- when we feel we are ready. This will give it the bake time it needs, CI time, and also lets LS developers start using this code in master as they work on stuff.

@andrewvc
Contributor Author

@suyograo +1 for merging this now.

@ph
Contributor

ph commented Dec 11, 2015

@suyograo +1 on your points.

@purbon
Contributor

purbon commented Dec 11, 2015

@suyograo fair points. +1

@andrewvc
Contributor Author

@ph @purbon @suyograo are those LGTMS? :shipit: 😎 🎉

@colinsurprenant
Contributor

On my side, I will take the required time to actually test it in real life (tm) locally with different configs/plugins, etc., either this weekend or on Monday, and present my final thoughts on it once I'm through that.

@andrewvc
Contributor Author

Moved to #4340

@andrewvc andrewvc closed this Dec 11, 2015
ph added a commit that referenced this pull request Jan 6, 2016
When you run multiple pipelines and the config code gets evaluated,
the class cache is cleared on every evaluation, so every time you
called a `func` method you got the latest evaluated code. The
`filter_func` and the `output_func` need to be unique for every
instance of the pipeline, so this PR replaces the `def` with a
`define_singleton_method` call, ensuring the uniqueness of the code
for every instance.

This PR is based on #4254

Fixes #4298
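
A toy illustration of why the instance-level definition fixes the clobbering (this is not the actual generated code):

class Pipeline
  def initialize(name)
    # A plain `def filter_func` in class-level generated code is shared
    # class-wide, so the last pipeline evaluated wins. Defining it on the
    # instance keeps each pipeline's generated code separate.
    define_singleton_method(:filter_func) do |events|
      events.map { |e| "#{name}: #{e}" } # stand-in for generated filter code
    end
  end
end

a = Pipeline.new("a")
b = Pipeline.new("b")
p a.filter_func(["x"]) # => ["a: x"], unaffected by b's definition
p b.filter_func(["x"]) # => ["b: x"]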
@colinsurprenant mentioned this pull request Jan 20, 2016