
NextGen Pipeline (CODENAME: Big Gulp) #4155

Closed
wants to merge 40 commits

Conversation


andrewvc commented Nov 6, 2015

(moved from #4153) (moved to #4254)

ATTN: This is no longer a POC but production-ready code!

This is a PR that aims to accomplish the following objectives:

  1. Provide an in-memory implementation (no persistence) of the pipeline I described in Next Generation pipeline architecture for persistence and performance #3693
  2. Be completely backward compatible with current Logstash plugins; nothing should break
  3. Allow for complete pipeline snapshotting (even with the Elasticsearch plugin) to address concerns raised by @jsvd in add shutdown controller to force exit on stalled shutdown #4051. This would allow forced shutdowns without losing data. It would require the Elasticsearch output to implement receive_multi and no longer use its own internal queue
  4. Expose a new 'batch' interface for filters and output plugins, allowing them to process events in micro-batches for efficiency. This could yield very significant speed improvements for filters like the useragent filter, which would only have to do one lookup per unique key in each batch rather than one per event. It would also obviate the need for a separate buffer in the Elasticsearch output (see the sketch below).
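As a rough, illustrative sketch of what such a batch interface could look like on the output side (multi_receive matches the default shim added later in this PR; the example_batch plugin and its body are hypothetical, not part of this PR):

require "logstash/outputs/base"

# Hypothetical batch-aware output: the pipeline hands it a whole micro-batch
# (an array of events) in one call, so per-batch work -- a single bulk request,
# or one lookup per unique key instead of one per event -- happens only once.
class LogStash::Outputs::ExampleBatch < LogStash::Outputs::Base
  config_name "example_batch"

  def multi_receive(events)
    payload = events.map(&:to_json).join("\n")
    @logger.info("would ship one bulk payload for this batch", :count => events.size)
    payload # returned only for illustration; a real output would send it somewhere
  end
end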

Along with these changes, I would say we should contemplate adding a new plugin config option that by default sets the number of workers for the plugin equal to the number of worker threads. That would give the best user experience here.

Notes on implementation

This is 90% the system described in #3693. The one exception is that output plugins are assumed to not be threadsafe; this was necessary to make it backward compatible with the 2.x series plugins. When the number of output workers is equal to the number of worker threads specified with -w, the behavior should be very similar to the original behavior. I think the default-worker fix described above might actually be a long-term solution here.
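Until such a default exists, the equivalent today is to set the output's workers option explicitly to match -w, e.g. (illustrative, reusing the http output from the benchmark config below):

bin/logstash -w 4 -e "input { generator {} } output { http { http_method => 'post' url => 'http://localhost:4567/hi' workers => 4 } }"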

Persistence Examples
Since this is just a POC, the pipeline shutdown persistence is demonstrated by the writing of two files on shutdown:

Update: This functionality remains in the dump_inflight function, but is currently never called. We may enable this in a future release.

  1. /tmp/ls_current_batches: The contents of the in-memory buffers BEFORE the shutdown sequence is initiated. A 'fast' shutdown (as described in add shutdown controller to force exit on stalled shutdown #4051) could simply write this file and then exit safely.
  2. /tmp/ls_current_batches_post_close: This is just there for debugging in the POC; it is written after a clean shutdown. It will always be empty (unless there's a bug in the pipeline!).

Notes For Reviewers

  1. This PR involves a small number of changes to the AST. To enable batch-oriented processing, for each batch we:
    1. Execute the filters one by one, accumulating a new batch of filtered events.
    2. Execute output_func once per event; it no longer invokes the outputs directly but instead returns an array of the outputs that the event should be sent to.
    3. We use the info from the step above to build separate batches per output, as seen in output_batch (in the pipeline); see the sketch after this list.
  2. This PR adds 2 new CLI options, batch_size and batch_poll_wait, to control the new features of the pipeline.
  3. PLEASE verify my performance results for yourself. I'm currently using this config and running it with time bin/logstash -f that_config.conf, then comparing it with Logstash 2.0.
  4. I set the default batch size to 125 to match the Elasticsearch output defaults assuming four workers (4*125 = 500). This seemed like a safe default.
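For intuition, the per-output grouping described in point 1.3 could be sketched roughly like this (output_func and output_batch are the names used above; the body is illustrative rather than the exact pipeline code):

# Build one micro-batch per output from a filtered micro-batch.
# output_func(event) is assumed to return the array of outputs the event
# should be routed to; each output then gets its own batch in a single call.
def output_batch(filtered_events)
  per_output = Hash.new { |hash, output| hash[output] = [] }
  filtered_events.each do |event|
    output_func(event).each { |output| per_output[output] << event }
  end
  per_output.each { |output, events| output.multi_receive(events) }
end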

Performance

I used the following config to test this thing out:
bin/logstash -w 4 -e "input { generator {} } filter { mutate { add_field => {'foo' => 'bar'} } } output { http { http_method => 'post' url => 'http://localhost:4567/hi' workers => 2 } }"

Note that this requires a little web service to hit. I used the following:

require 'sinatra'

post '/hi' do
  puts "Got #{request.body.read}"
  [200, "Thanks!"]
end

Performance

Also worth noting, this new pipeline seems, in at least one micro-benchmark, to be as much as 63% faster!

~/p/logstash (pipeline_poc) $ bin/logstash -w 4 -e "input { generator {} } filter {  mutate { add_field => {'foo' => 'bar'} } } output { stdout {} }" | pv -l > /dev/null
^C61M 0:01:00 [67.1k/s]
~/p/logstash (pipeline_poc) $ cd ~/Downloads/logstash-2.0.0
~/D/logstash-2.0.0 $ bin/logstash -w 4 -e "input { generator {} } filter {  mutate { add_field => {'foo' => 'bar'} } } output { stdout {} }" | pv -l > /dev/null
^C24M 0:01:00 [  41k/s]

Doing further benchmarking, I was able to get the throughput up to 74k/s using a batch size of 40 (the default is 20). Upping the batch size to 200 made it go down to 71k/s, and upping it to 2000 brought it down a lot, to 50k/s. So batch size has a big impact, probably due to not being able to use memory/the cache as efficiently. We'll have to do further tests with different message sizes as well.

definitions << "def #{type}_func(event)"
definitions << " events = [event]" if type == "filter"
definitions << " @logger.debug? && @logger.debug(\"#{type} received\", :event => event.to_hash)"
sections.select { |s| s.plugin_type.text_value == type }

Member
this is a no-op unless I'm missing something

Contributor Author
Yep! Good find.

jsvd commented Nov 9, 2015

@andrewvc I love this refactor, makes the pipeline a lot simpler 👍
We need to test this heavily now (pretty sure a lot of rake test:core breaks with these changes), check the actual impact on performance, and check that it doesn't break on edge cases.

Regarding the dump of batches, I'm not sure how useful it is until there's a complementary implementation of output buffer dumps. And once that's implemented, we need to figure out a way to make the dumps readable, with proper configuration fingerprint checking, etc.

I'd suggest leaving dumping out of this PR, the same way I'm going to remove it from #4051.

andrewvc commented Nov 9, 2015

@jsvd thanks!

To be clear, the buffer does cover outputs so long as the output doesn't implement its own internal buffer. I think I'd like to keep them in as an option for the upcoming refactor of the ES outputs

suyograo commented Nov 9, 2015

@andrewvc +1 on moving the dump option to another PR as @jsvd mentioned. This refactor really does not have anything to do with dumping the queue contents; it's about simplifying the pipeline to have one queue, which benefits the upcoming persistence queue work. The other benefit is being able to enable fast shutdown.

andrewvc commented Nov 9, 2015

@suyograo sure thing, I'll just leave in place the locking guarantees and internal tracking of the buffers. It'll be useful for future metrics work, and I'll be able to add those stats to the reporter module. It'll be good to get a metric like 4/5 workers active, 65 items in flight.

def handle_worker(events)
  worker = @available_workers.pop
  begin
    worker.handle_multi(events)

Contributor Author
This should probably invoke receive_multi to avoid the unnecessary mutex synchronize in handle_multi
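Roughly, the suggested change would look like the following (the ensure/push back onto the pool is assumed from the pop above, since the quoted snippet is truncated):

def handle_worker(events)
  worker = @available_workers.pop
  begin
    # Call receive_multi directly; the mutex taken inside handle_multi is
    # redundant here because the worker was checked out exclusively.
    worker.receive_multi(events)
  ensure
    @available_workers.push(worker) # assumed: return the worker to the pool
  end
end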

}
CONFIG

puts "DECL AGENT"

Contributor
wew puts?

This only works when the `--debug` flag is also enabled.

fixes elastic#3243

Changes to log formatting per request

Change :file to :config_file

:file gets overridden by `logstash/agent.rb`, so a different symbol
is necessary.  Changing to `:config_file`

Fixes elastic#4182
andrewvc changed the title from Pipeline POC to NextGen Pipeline (CODENAME: Big Gulp) on Nov 13, 2015
def multi_receive(events)
  events.each {|event|
    receive(event)
  }

Contributor
Could be a one liner?

Contributor Author
Good call :)
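For reference, the one-liner being suggested is presumably just:

def multi_receive(events)
  events.each { |event| receive(event) }
end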

ph commented Nov 16, 2015

@andrewvc I am not done on the review yet, I'll continue later.

colinsurprenant and others added 6 commits November 16, 2015 17:38
…idation properly when a private gem server (not rubygems) has been configured. This basically loads the sources so the system is able to perform the validation with the right destination and not with the default source (rubygems.org)

Former commits:

add all defined sources to rubygems so verification can talk to all the repositories, even the private ones

added a very simple plugin manager install command spec, to check for properly loading sources when validating

broke long lines into smaller ones in the install command

make sure the update command takes into account all declared sources, plus there is the option to avoid gem validation in case of only having a private gem repo and no internet connection

fix wrong conditional in the validation of plugins

move the Gem.sources loading to a common place, so sources are loaded all at once and we don't need to worry on each call to be sure we do the right thing for sources other than rubygems

update test to the last changes in the way gemfile is loaded

move Gem::SourceList.from to bundler::invoke! so we use the default gemset.sources array

fix loading path for Gem::SourceList in bundler, so it's not colliding with other classes

Revert "move the Gem.sources loading to a common place, to sources are loaded all at once and we don't need to worry on each call to be sure we do the right thing for other sources different that rubygems"

This reverts commit 6e1c809.

Revert "update test to the last changes in the way gemfile is loaded"

This reverts commit dc5f65c.

make the Gem.sources load happen inside the logstash_plugin? call

add the idea of settings to the plugin manager module; like this we can pass through different stuff useful to set up dependent components

add the install spec back

remove the PluginManager::Settings concept

change sources to rubygems_sources in the plugin manager options

change rubygems_sources to be rubygems_source

update comments

amend plugin manager options description in the comments

spaces and new lines cleanup

merged duplicated plugin manager util_spec introduced during rebase

add a check for when Gems.versions fails

Add the validation exception for the plugin manager

add better error handling for situations where the validation is not possible due to a connection issue with the remote server

Fixes elastic#3583
updates requested by code review

changes requested by colin: make workers override from -w arg

do not set workers unless user actually specified it via cmdline

fix defaults printing

add describe block to improve test output readability

Closes elastic#4130
…ager as it is only relevant for master, not for the version branches

Fixes elastic#4212
@@ -54,32 +61,59 @@ def receive(event)

public
def worker_setup
  # TODO: Remove this branch, delete this function
  if @workers == 1

Contributor
Consider setting @is_multi_worker once and testing on that...
either:

if (@is_multi_worker = @workers > 1)
  define_singleton...
  ...
else
  @worker_plugins = [self]
end

or

@is_multi_worker = @workers > 1
if @is_multi_worker
  ...
else
  ...
end

suyograo and others added 15 commits November 17, 2015 15:53
including dependencies, and reuse them in an offline installation by providing a package for it. It adds two important commands to the plugin manager, pack and unpack, to handle package creation and installation, and adds the --local flag to install and update to pull plugins from the installed local packages.

Former commits:

add a task to create a bundle under vendor/cache of the installed gems + their dependencies; this can be used later on to be installed offline

add an option to pass the --local flag to the bin/plugin update task, so it fetches plugins from a local cache, by default under vendor/cache

rename package:build to package:build-cache as it is more meaningful

add a --local flag to bin/plugin install so users can install plugins from the local cache, under the default directory of vendor/cache

add a plugin manager command to build the local cache of installed plugins using a bundler package-like command

It adds code for compressing and extracting in zip and tar formats to the utils module of logstash. The zip module is only JRuby dependent as it uses functions provided by Java.
There is also code added in the plugin manager package command to handle compression of plugin dumps.

Clean up the custom Java code to compress and extract zip files, as it is known that using rubyzip works well and it has more Ruby-like features, so the code is cleaner and more portable.

set of smallish improvements requested during review

added several options to handle the situation when the package of plugins we want to generate is already there

clean up old code

applied feedback from review, mostly changes in documenting behaviour plus better wording for messages

relocate the Environment.windows? check to the bootstrap side as it's used also in the plugin manager

add an unpack bin/plugin command so users can install packages of plugins through the plugin manager

document override behaviour in the compress helpers (for zip and tar) plus add a fix for the tar extract when reading entries

made the unpack filename parameter a required one

add a force option to the bin/plugin unpack command

add a notice that if using a local cache the --local flag should be passed

Code cleanup and refactor introduced during review

add two wording suggestions coming from review

amend more wording

skip the major version validation in situations where there is no internet connection, like when using the local vendor/cache to do the update

move compress to the bootstrap environment as being used in the plugin manager means not being loaded with logstash-core

Bring pack cached gems in the vendor directory so they can be used for bundler when packaging dependencies

Revert "Bring pack cached gems in the vendor directory so they can be used for bundler when packaging dependencies"

This reverts commit a9d7f46.

patch the Bundler::Source::Rubygems to fetch only gems from a remote  source

small changes in several parts of the plugin manager and the creation of a common pack command with shared code

change compress to read in chunks

fix wrong var name in the bootstrap compress utils module

fix namespacing conflicts

add basic test for the compress utility module

amend more namespace issues

add a comment to the rubygems monkey patch to rebuild the gem cache

apply cosmetic changes

make the compress module raise CompressError

remove vendor_path and pattern_path from environment as they were mixed up during rebase

change the bin/pack force option to be force-delete-cache

rename force_delete_cache to overwrite

change force for overwrite in the pack command

remove the override option for bin/plugin unpack

revert Gemfile and Gemfile.lock wrongly committed
When the event was serialized to a string using a `to_s` call, mostly happening when doing logging, each call would generate a new template; each template was unique because it contained the date.

The accumulation of templates was making Logstash go OOM.

We aren't cleaning the templates because they should stabilize to a finite number.
- Offline Plugins for managing offline packs
- Private Rubygem repos
* start logstash with --allow-unsafe-shutdown to force_exit on stalled shutdown
* by default --allow-unsafe-shutdown is disabled
* stall detection kicks in when SIGTERM/SIGINT is received
* check if inflight event count isn't going down and if there are blocked/blocking plugin threads
Adds new material to Managing Plugins
@andrewvc

Moved to #4254
