Next Gen Pipeline #4254

Closed
wants to merge 20 commits into from
13 changes: 8 additions & 5 deletions Gemfile
@@ -4,21 +4,24 @@
source "https://rubygems.org"
gem "logstash-core", "3.0.0.dev", :path => "./logstash-core"
gem "logstash-core-event", "3.0.0.dev", :path => "./logstash-core-event"
# gem "logstash-core-event-java", "3.0.0.dev", :path => "./logstash-core-event-java"
gem "file-dependencies", "0.1.6"
gem "ci_reporter_rspec", "1.0.0", :group => :development
gem "simplecov", :group => :development
gem "coveralls", :group => :development
# Tins 1.7 requires the ruby 2.0 platform to install,
# this gem is a dependency of term-ansi-color which is a dependency of coveralls.
# 1.6 is the last supported version on jruby.
gem "tins", "1.6", :group => :development
gem "rspec", "~> 3.1.0", :group => :development
gem "logstash-devutils", "~> 0.0.15", :group => :development
gem "logstash-devutils", ">= 0"
gem "benchmark-ips", :group => :development
gem "octokit", "3.8.0", :group => :build
gem "stud", "~> 0.0.21", :group => :build
gem "fpm", "~> 1.3.3", :group => :build
gem "rubyzip", "~> 1.1.7", :group => :build
gem "gems", "~> 0.8.3", :group => :build
gem "flores", "~> 0.0.6", :group => :development
gem "logstash-filter-clone"
gem "logstash-filter-mutate"
gem "logstash-filter-multiline"
gem "logstash-input-generator"
gem "logstash-input-stdin"
gem "logstash-input-tcp"
gem "logstash-output-stdout"
57 changes: 56 additions & 1 deletion Gemfile.jruby-1.9.lock
@@ -13,6 +13,7 @@ PATH
logstash-core-event (~> 3.0.0.dev)
minitar (~> 0.5.4)
pry (~> 0.10.1)
rubyzip (~> 1.1.7)
stud (~> 0.0.19)
thread_safe (~> 0.3.5)
treetop (< 1.5.0)
@@ -74,10 +75,21 @@ GEM
domain_name (~> 0.5)
i18n (0.6.9)
insist (1.0.0)
jls-grok (0.11.2)
cabin (>= 0.6.0)
jrjackson (0.3.7)
jruby-openssl (0.9.12-java)
json (1.8.3-java)
kramdown (1.9.0)
logstash-codec-json (2.0.2)
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-codec-json_lines (2.0.2)
logstash-codec-line
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-codec-line (2.0.2)
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-codec-plain (2.0.2)
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-devutils (0.0.18-java)
gem_publisher
insist (= 1.0.0)
@@ -87,6 +99,42 @@ GEM
rspec (~> 3.1.0)
rspec-wait
stud (>= 0.0.20)
logstash-filter-clone (2.0.3)
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-filter-grok (2.0.2)
jls-grok (~> 0.11.1)
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-patterns-core
logstash-filter-multiline (2.0.2)
jls-grok (~> 0.11.0)
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-filter-mutate
logstash-patterns-core
logstash-filter-mutate (2.0.2)
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-filter-grok
logstash-patterns-core
logstash-input-generator (2.0.2)
logstash-codec-plain
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-input-stdin (2.0.2)
concurrent-ruby
logstash-codec-json
logstash-codec-json_lines
logstash-codec-line
logstash-codec-plain
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-input-tcp (2.0.4)
logstash-codec-json
logstash-codec-json_lines
logstash-codec-line
logstash-codec-plain
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-output-stdout (2.0.2)
logstash-codec-line
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-patterns-core (2.0.2)
logstash-core (>= 2.0.0.beta2, < 3.0.0)
method_source (0.8.2)
mime-types (2.6.2)
minitar (0.5.4)
@@ -155,7 +203,14 @@ DEPENDENCIES
gems (~> 0.8.3)
logstash-core (= 3.0.0.dev)!
logstash-core-event (= 3.0.0.dev)!
logstash-devutils (~> 0.0.15)
logstash-devutils
logstash-filter-clone
logstash-filter-multiline
logstash-filter-mutate
logstash-input-generator
logstash-input-stdin
logstash-input-tcp
logstash-output-stdout
octokit (= 3.8.0)
rspec (~> 3.1.0)
rubyzip (~> 1.1.7)
4 changes: 2 additions & 2 deletions logstash-core/lib/logstash/agent.rb
@@ -33,7 +33,7 @@ def execute
end # def execute

def add_pipeline(pipeline_id, config_str, settings = {})
@pipelines[pipeline_id] = LogStash::Pipeline.new(config_str, settings)
@pipelines[pipeline_id] = LogStash::Pipeline.new(config_str, settings.merge(:pipeline_id => pipeline_id))
Contributor

Does this mean this change allows us to have multiple pipelines?

Contributor

This change is already in the current master, but yes, this is the first step toward enabling multiple pipelines.

Contributor

nice, very interesting.

Contributor Author

One thing to note is I made the pipeline ID part of the thread names so it looks nice in VisualVM!

Member

@andrewvc this pipeline_id handling is quirky. I see that in pipeline.rb, if the id is nil then `object_id` is used. Maybe we could do it here instead? Otherwise, calling `add_pipeline(nil, "")` will create a nil key in `@pipelines`, but `pipeline.id` will return `object_id`.

Contributor

I agree with @jsvd's comment.

This scenario is also problematic if you try to add a new pipeline with an existing key, but I think that can be solved in another PR.

Contributor Author

@jsvd @ph yep, great observation. I'll move that logic here.

Contributor Author

@jsvd @ph looking through the code, the reason I made the UUID generation automatic inside of the pipeline was that passing an ID explicitly is pretty inconvenient for testing.

If we're thinking of making pipeline_id required, the constructor signature should change to Pipeline.new(pipeline_id, config_str, settings). Are you guys OK with this change? It will make the PR a good bit larger, as all tests instantiating the pipeline will need to change.

Contributor

@andrewvc Let's make that into another PR :(
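
For readers following this thread, here is a minimal sketch of the alternative discussed above: resolve the fallback ID once and key the registry by the ID the pipeline actually ends up with, so a nil argument never becomes a nil key. `SketchPipeline` and `SketchAgent` are hypothetical stand-ins, not the real LogStash classes, and the duplicate-key check is an assumption about how the "existing key" case might be handled.

```ruby
# Hypothetical stand-ins for illustration; not the real LogStash classes.
class SketchPipeline
  attr_reader :id

  def initialize(config_str, settings = {})
    @config_str = config_str
    # Fall back to object_id when no id is given, as described in the thread.
    @id = settings[:pipeline_id] || object_id
  end
end

class SketchAgent
  attr_reader :pipelines

  def initialize
    @pipelines = {}
  end

  def add_pipeline(pipeline_id, config_str, settings = {})
    pipeline = SketchPipeline.new(config_str, settings.merge(:pipeline_id => pipeline_id))
    # Assumption: refuse to silently replace a pipeline registered under the same id.
    raise ArgumentError, "pipeline #{pipeline.id} already exists" if @pipelines.key?(pipeline.id)
    # Key by the resolved id so the registry and pipeline.id always agree.
    @pipelines[pipeline.id] = pipeline
  end
end

agent = SketchAgent.new
anonymous = agent.add_pipeline(nil, "")
agent.add_pipeline("main", "")
```

This keeps the `Pipeline.new(config_str, settings)` signature unchanged, so existing tests would not need to pass an ID.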

end

private
@@ -76,4 +76,4 @@ def trap_sigint
end
end
end
end # class LogStash::Agent
end # class LogStash::Agent
8 changes: 5 additions & 3 deletions logstash-core/lib/logstash/config/config_ast.rb
@@ -108,6 +108,7 @@ def compile
# defines @filter_func and @output_func

definitions << "def #{type}_func(event)"
Contributor

There is an error here, caused by the way we generate the code, when you run multiple pipelines.
The `def` needs to be a `define_singleton_method`.

I have a PR to fix it in #4298
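
A small sketch of the Ruby behavior behind this comment, under the assumption that the generated code is evaluated from inside an instance method: a `def` evaluated there defines a method on the class, so the most recently compiled body is shared by every instance, while `define_singleton_method` attaches the body to one object only. `SketchPipeline`, `shared_func`, and `own_func` are hypothetical names used only for this illustration.

```ruby
# Hypothetical class for illustration; not the real LogStash::Pipeline.
class SketchPipeline
  def initialize(label)
    # `def` inside eval defines an instance method on SketchPipeline itself,
    # so each new instance overwrites the body for all existing instances.
    eval("def shared_func; 'compiled for #{label}'; end")

    # define_singleton_method attaches the body to this object only.
    define_singleton_method(:own_func) { "compiled for #{label}" }
  end
end

a = SketchPipeline.new("a")
b = SketchPipeline.new("b")

shared_from_a = a.shared_func # body last compiled by b
own_from_a    = a.own_func    # body compiled for a
```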

definitions << " targeted_outputs = []" if type == "output"
definitions << " events = [event]" if type == "filter"
definitions << " @logger.debug? && @logger.debug(\"#{type} received\", :event => event.to_hash)"

@@ -116,6 +117,7 @@ def compile
end

definitions << " events" if type == "filter"
definitions << " targeted_outputs" if type == "output"
definitions << "end"
end

@@ -237,7 +239,7 @@ def compile
events = #{variable_name}.multi_filter(events)
CODE
when "output"
return "#{variable_name}.handle(event)\n"
return "targeted_outputs << #{variable_name}\n"
when "codec"
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
@@ -402,7 +404,7 @@ def cond_func_#{i}(input_events)
<<-CODE
events = cond_func_#{i}(events)
CODE
else
else # Output
<<-CODE
#{super}
end
@@ -542,4 +544,4 @@ def _inspect(indent="")
""
)
end
end
end
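
The change above makes the generated `output_func` return the outputs an event targets instead of calling `handle` on each one. As a rough sketch of how a caller might use that return value (the real caller is not shown in this part of the diff, so this is an assumption), events in a batch can be grouped per output and delivered with one `multi_receive` call each. `SketchOutput` and the lambda standing in for `output_func` are hypothetical.

```ruby
# Hypothetical output stand-in that records what it receives.
class SketchOutput
  attr_reader :received

  def initialize
    @received = []
  end

  def multi_receive(events)
    @received.concat(events)
  end
end

all_out = SketchOutput.new
error_out = SketchOutput.new

# Stand-in for the generated output_func: it only collects targeted outputs.
output_func = lambda do |event|
  targeted_outputs = []
  targeted_outputs << all_out
  targeted_outputs << error_out if event[:level] == "error"
  targeted_outputs
end

batch = [{:level => "info"}, {:level => "error"}, {:level => "info"}]

# Group the batch by output, then deliver each group in a single call.
events_by_output = Hash.new { |hash, key| hash[key] = [] }
batch.each do |event|
  output_func.call(event).each { |output| events_by_output[output] << event }
end
events_by_output.each { |output, events| output.multi_receive(events) }
```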
68 changes: 68 additions & 0 deletions logstash-core/lib/logstash/output_delegator.rb
@@ -0,0 +1,68 @@
# encoding: utf-8
require "concurrent/atomic/atomic_fixnum"

# This class goes hand in hand with the pipeline to provide a pool of
# free workers to be used by pipeline worker threads. The pool is
# internally represented with a SizedQueue set to the size of the number
# of 'workers' the output plugin is configured with.
#
# This plugin also records some basic statistics
module LogStash; class OutputDelegator
attr_reader :workers, :config, :worker_count

# The *args this takes are the same format that an Outputs::Base takes: a list of hashes with parameters in them.
# Internally these just get merged together into a single hash
def initialize(logger, klass, *args)
Contributor

For readability and maintainability, we should document method parameters, especially when they are non-obvious, but ideally always. Here in particular, `*args` requires reading the code below to understand its purpose.

@logger = logger
@config = args.reduce({}, :merge)
@klass = klass
@worker_count = @config["workers"] || 1

@worker_queue = SizedQueue.new(@worker_count)

@workers = @worker_count.times.map do
w = @klass.new(*args)
w.register
@worker_queue << w
w
end

@events_received = Concurrent::AtomicFixnum.new(0)
end

def config_name
@klass.config_name
end

def register
@workers.each {|w| w.register}
end

def multi_receive(events)
@events_received.increment(events.length)

worker = @worker_queue.pop
begin
worker.multi_receive(events)
ensure
@worker_queue.push(worker)
end
end

def do_close
@logger.debug("closing output delegator", :klass => self)

@worker_count.times do
worker = @worker_queue.pop
worker.do_close
end
end

def events_received
@events_received.value
end

def busy_workers
@worker_queue.size
end
end end
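
The checkout/checkin pattern used by `multi_receive` above can be tried in isolation. The following is a simplified sketch using only the standard library; `SketchWorker` and `SketchPool` are hypothetical names, and counting busy workers as the total minus the free queue size is an assumption about the intended meaning of `busy_workers`, since the queue itself holds the idle workers.

```ruby
require "thread"

# Hypothetical worker that records the events it handled.
class SketchWorker
  attr_reader :seen

  def initialize
    @seen = []
  end

  def multi_receive(events)
    @seen.concat(events)
  end
end

# Hypothetical pool mirroring the SizedQueue checkout/checkin idea.
class SketchPool
  attr_reader :workers

  def initialize(worker_count)
    @worker_count = worker_count
    @free = SizedQueue.new(worker_count)
    @workers = Array.new(worker_count) { SketchWorker.new }
    @workers.each { |worker| @free << worker }
  end

  def multi_receive(events)
    worker = @free.pop # blocks while every worker is checked out
    begin
      worker.multi_receive(events)
    ensure
      @free.push(worker) # always return the worker, even on error
    end
  end

  def free_workers
    @free.size
  end

  # Assumption: busy means checked out, i.e. not sitting in the free queue.
  def busy_workers
    @worker_count - @free.size
  end
end

pool = SketchPool.new(2)
threads = 4.times.map { |i| Thread.new { pool.multi_receive([i]) } }
threads.each(&:join)
```

Because a worker is only ever held by one thread at a time, the worker itself does not need to be thread safe in this sketch.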
44 changes: 11 additions & 33 deletions logstash-core/lib/logstash/outputs/base.rb
@@ -4,6 +4,8 @@
require "logstash/plugin"
require "logstash/namespace"
require "logstash/config/mixin"
require "logstash/util/wrapped_synchronous_queue"
require "concurrent/atomic/atomic_fixnum"

class LogStash::Outputs::Base < LogStash::Plugin
include LogStash::Config::Mixin
@@ -23,7 +25,7 @@ class LogStash::Outputs::Base < LogStash::Plugin
# Note that this setting may not be useful for all outputs.
config :workers, :validate => :number, :default => 1

attr_reader :worker_plugins, :worker_queue, :worker_threads
attr_reader :worker_plugins, :available_workers, :workers, :worker_plugins

public
def workers_not_supported(message=nil)
@@ -40,6 +42,10 @@ def workers_not_supported(message=nil)
def initialize(params={})
super
config_init(params)

# If we're running with a single thread we must enforce single-threaded concurrency by default
# Maybe in a future version we'll assume output plugins are threadsafe
@single_worker_mutex = Mutex.new
end

public
@@ -53,42 +59,14 @@ def receive(event)
end # def receive

public
def worker_setup
if @workers == 1
@worker_plugins = [self]
@worker_threads = []
else
define_singleton_method(:handle, method(:handle_worker))
@worker_queue = SizedQueue.new(20)
@worker_plugins = @workers.times.map { self.class.new(@original_params.merge("workers" => 1)) }
@worker_threads = @worker_plugins.map.with_index do |plugin, i|
Thread.new(original_params, @worker_queue) do |params, queue|
LogStash::Util.set_thread_name(">#{self.class.config_name}.#{i}")
LogStash::Util.set_thread_plugin(self)
plugin.register
while true
event = queue.pop
plugin.handle(event)
end
end
end
end
end

public
def handle(event)
LogStash::Util.set_thread_plugin(self)
receive(event)
end # def handle

def handle_worker(event)
LogStash::Util.set_thread_plugin(self)
@worker_queue.push(event)
# To be overridden in implementations
def multi_receive(events)
events.each {|event| receive(event) }
end

private
def output?(event)
# TODO: noop for now, remove this once we delete this call from all plugins
true
end # def output?
end # class LogStash::Outputs::Base
end # class LogStash::Outputs::Base
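
To illustrate what the new `multi_receive` hook allows, here is a hedged sketch with hypothetical classes (none of these are real Logstash plugins): an output that only implements `receive` keeps working through the default loop, while an output that overrides `multi_receive` can handle a whole batch in one operation.

```ruby
# Hypothetical base class mirroring the default shown in the diff.
class SketchBaseOutput
  def receive(event)
    raise NotImplementedError
  end

  # Default: fall back to one receive call per event.
  def multi_receive(events)
    events.each { |event| receive(event) }
  end
end

# Implements only receive, so each event becomes its own write.
class SketchPerEventOutput < SketchBaseOutput
  attr_reader :writes

  def initialize
    @writes = []
  end

  def receive(event)
    @writes << [event]
  end
end

# Overrides multi_receive, so a batch becomes a single write.
class SketchBulkOutput < SketchBaseOutput
  attr_reader :writes

  def initialize
    @writes = []
  end

  def multi_receive(events)
    @writes << events.dup
  end
end

batch = ["e1", "e2", "e3"]
per_event = SketchPerEventOutput.new
bulk = SketchBulkOutput.new
per_event.multi_receive(batch)
bulk.multi_receive(batch)
```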