Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Next Gen Pipeline #4254

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def execute
end # def execute

def add_pipeline(pipeline_id, config_str, settings = {})
@pipelines[pipeline_id] = LogStash::Pipeline.new(config_str, settings)
@pipelines[pipeline_id] = LogStash::Pipeline.new(config_str, settings.merge(:pipeline_id => pipeline_id))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this means this changes allows us to have multiple pipelines?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is already in the current master, but yes this is the first step to enable multiple pipelines.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice, very interesting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing to note is I made the pipeline ID part of the thread names so it looks nice in VisualVM!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrewvc this pipeline_id handling is quirky,I see that in pipeline.rb if the id is nil then object_id is used. maybe we could do it here instead? because otherwise calling add_pipeline(nil, "") will create a nil key in @pipelines but the pipeline.id will returnobject_id`

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @jsvd comment.

But this scenario is also problematic if you are trying to add a new pipeline with an existing key, but I think this can be solve in another PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsvd @ph yep, great observation. I'll move that logic here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsvd @ph looking through the code the reason I made the UUID generation automatic inside of pipeline was that for testing this is pretty inconvenient.

If we're thinking of making pipeline_id required it should change the signature of the constructor to Pipeline.new(pipeline_id, config_str, settings). Are you guys OK with this change? It will make the PR a good bit larger as all tests instantiating the pipeline will need to change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrewvc Lets make that into another PR :(

end

private
Expand Down Expand Up @@ -76,4 +76,4 @@ def trap_sigint
end
end
end
end # class LogStash::Agent
end # class LogStash::Agent
8 changes: 5 additions & 3 deletions logstash-core/lib/logstash/config/config_ast.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def compile
# defines @filter_func and @output_func

definitions << "def #{type}_func(event)"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an error here when you run multiples pipeline and the way we are generating the code.
the def need to be a define_singleton_method

I have a PR to fix it in #4298

definitions << " targeted_outputs = []" if type == "output"
definitions << " events = [event]" if type == "filter"
definitions << " @logger.debug? && @logger.debug(\"#{type} received\", :event => event.to_hash)"

Expand All @@ -116,6 +117,7 @@ def compile
end

definitions << " events" if type == "filter"
definitions << " targeted_outputs" if type == "output"
definitions << "end"
end

Expand Down Expand Up @@ -237,7 +239,7 @@ def compile
events = #{variable_name}.multi_filter(events)
CODE
when "output"
return "#{variable_name}.handle(event)\n"
return "targeted_outputs << #{variable_name}\n"
when "codec"
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
Expand Down Expand Up @@ -402,7 +404,7 @@ def cond_func_#{i}(input_events)
<<-CODE
events = cond_func_#{i}(events)
CODE
else
else # Output
<<-CODE
#{super}
end
Expand Down Expand Up @@ -542,4 +544,4 @@ def _inspect(indent="")
""
)
end
end
end
66 changes: 66 additions & 0 deletions logstash-core/lib/logstash/output_delegator.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# encoding: utf-8
require "concurrent/atomic/atomic_fixnum"

# This class goes hand in hand with the pipeline to provide a pool of
# free workers to be used by pipeline worker threads. The pool is
# internally represented with a SizedQueue set the the size of the number
# of 'workers' the output plugin is configured with.
#
# This plugin also records some basic statistics
module LogStash; class OutputDelegator
attr_reader :workers, :config, :worker_count

def initialize(logger, klass, *args)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for readability and maintainability and especially when non-obvious, but ideally always, we should document method parameters. In particular here *args requires reading code below to understand the purpose.

@logger = logger
@config = args.reduce({}, :merge)
@klass = klass
@worker_count = @config["workers"] || 1

@worker_queue = SizedQueue.new(@worker_count)

@workers = @worker_count.times.map do
w = @klass.new(*args)
w.register
@worker_queue << w
w
end

@events_received = Concurrent::AtomicFixnum.new(0)
end

def config_name
@klass.config_name
end

def register
@workers.each {|w| w.register}
end

def multi_receive(events)
@events_received.increment(events.length)

worker = @worker_queue.pop
begin
worker.multi_receive(events)
ensure
@worker_queue.push(worker)
end
end

def do_close
@logger.debug("closing output delegator", :klass => self)

@worker_count.times do
worker = @worker_queue.pop
worker.do_close
end
end

def events_received
@events_received.value
end

def busy_workers
@worker_queue.size
end
end end
44 changes: 11 additions & 33 deletions logstash-core/lib/logstash/outputs/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
require "logstash/plugin"
require "logstash/namespace"
require "logstash/config/mixin"
require "logstash/util/wrapped_synchronous_queue"
require "concurrent/atomic/atomic_fixnum"

class LogStash::Outputs::Base < LogStash::Plugin
include LogStash::Config::Mixin
Expand All @@ -23,7 +25,7 @@ class LogStash::Outputs::Base < LogStash::Plugin
# Note that this setting may not be useful for all outputs.
config :workers, :validate => :number, :default => 1

attr_reader :worker_plugins, :worker_queue, :worker_threads
attr_reader :worker_plugins, :available_workers, :workers, :worker_plugins

public
def workers_not_supported(message=nil)
Expand All @@ -40,6 +42,10 @@ def workers_not_supported(message=nil)
def initialize(params={})
super
config_init(params)

# If we're running with a single thread we must enforce single-threaded concurrency by default
# Maybe in a future version we'll assume output plugins are threadsafe
@single_worker_mutex = Mutex.new
end

public
Expand All @@ -53,42 +59,14 @@ def receive(event)
end # def receive

public
def worker_setup
if @workers == 1
@worker_plugins = [self]
@worker_threads = []
else
define_singleton_method(:handle, method(:handle_worker))
@worker_queue = SizedQueue.new(20)
@worker_plugins = @workers.times.map { self.class.new(@original_params.merge("workers" => 1)) }
@worker_threads = @worker_plugins.map.with_index do |plugin, i|
Thread.new(original_params, @worker_queue) do |params, queue|
LogStash::Util.set_thread_name(">#{self.class.config_name}.#{i}")
LogStash::Util.set_thread_plugin(self)
plugin.register
while true
event = queue.pop
plugin.handle(event)
end
end
end
end
end

public
def handle(event)
LogStash::Util.set_thread_plugin(self)
receive(event)
end # def handle

def handle_worker(event)
LogStash::Util.set_thread_plugin(self)
@worker_queue.push(event)
# To be overriden in implementations
def multi_receive(events)
events.each {|event| receive(event) }
end

private
def output?(event)
# TODO: noop for now, remove this once we delete this call from all plugins
true
end # def output?
end # class LogStash::Outputs::Base
end # class LogStash::Outputs::Base
Loading