
Next Gen Pipeline #4254

Closed · wants to merge 20 commits
4 changes: 2 additions & 2 deletions logstash-core/lib/logstash/agent.rb
@@ -33,7 +33,7 @@ def execute
end # def execute

def add_pipeline(pipeline_id, config_str, settings = {})
@pipelines[pipeline_id] = LogStash::Pipeline.new(config_str, settings)
@pipelines[pipeline_id] = LogStash::Pipeline.new(config_str, settings.merge(:pipeline_id => pipeline_id))
Contributor:
Does this mean this change allows us to have multiple pipelines?

Contributor:
This change is already in the current master, but yes, this is the first step toward enabling multiple pipelines.

Contributor:
nice, very interesting.

Contributor Author:
One thing to note is I made the pipeline ID part of the thread names so it looks nice in VisualVM!
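As a sketch of that idea (hypothetical helper names, not code from this PR; `Thread#name=` requires Ruby 2.3+):

```ruby
# Hypothetical sketch: embed the pipeline id in each worker thread's name
# so profilers like VisualVM can tell the pipelines apart.
def worker_thread_name(pipeline_id, index)
  "[#{pipeline_id}]>worker#{index}"
end

def start_worker(pipeline_id, index)
  Thread.new do
    Thread.current.name = worker_thread_name(pipeline_id, index)
    # ... the worker loop would run here ...
    Thread.current.name # returned so callers can observe the name
  end
end
```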

Member:
@andrewvc this pipeline_id handling is quirky. I see that in pipeline.rb, if the id is nil then object_id is used; maybe we could do it here instead? Because otherwise calling add_pipeline(nil, "") will create a nil key in @pipelines, but pipeline.id will return `object_id`.

Contributor:
I agree with @jsvd's comment.

This scenario is also problematic if you try to add a new pipeline with an existing key, but I think that can be solved in another PR.

Contributor Author:
@jsvd @ph yep, great observation. I'll move that logic here.
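A minimal sketch of that move, assuming a hypothetical Agent class (the real add_pipeline constructs a LogStash::Pipeline; a plain tuple stands in for it here):

```ruby
require "securerandom"

# Hypothetical sketch: default the id in the agent so the @pipelines key
# always matches the id the pipeline reports, instead of letting
# Pipeline#initialize silently fall back to object_id.
class Agent
  def initialize
    @pipelines = {}
  end

  def add_pipeline(pipeline_id, config_str, settings = {})
    pipeline_id ||= SecureRandom.uuid # no more nil keys in @pipelines
    @pipelines[pipeline_id] =
      [config_str, settings.merge(:pipeline_id => pipeline_id)] # stand-in for LogStash::Pipeline.new
    pipeline_id
  end
end
```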

Contributor Author:
@jsvd @ph looking through the code, the reason I made the UUID generation automatic inside the pipeline is that passing an id explicitly is pretty inconvenient for testing.

If we're thinking of making pipeline_id required, we should change the constructor signature to Pipeline.new(pipeline_id, config_str, settings). Are you guys OK with this change? It will make the PR a good bit larger, as all tests instantiating the pipeline will need to change.

Contributor:
@andrewvc Let's make that into another PR :(

end

private
@@ -76,4 +76,4 @@ def trap_sigint
end
end
end
end # class LogStash::Agent
end # class LogStash::Agent
48 changes: 25 additions & 23 deletions logstash-core/lib/logstash/config/config_ast.rb
@@ -108,6 +108,7 @@ def compile
# defines @filter_func and @output_func

definitions << "def #{type}_func(event)"
Contributor:
There is an error here when you run multiple pipelines, because of the way we generate the code: the def needs to be a define_singleton_method.

I have a PR to fix it in #4298
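A hypothetical reduction of the bug (not the actual generated code): when the generated source is eval'd inside an instance method, a plain `def` lands on the shared class, so the second pipeline's compiled method clobbers the first's, while `define_singleton_method` scopes the generated method to one instance.

```ruby
class Pipeline
  def initialize(generated_body)
    # Plain eval + def defines filter_func on the Pipeline CLASS,
    # so every instance shares, and clobbers, the same method.
    eval("def filter_func; #{generated_body}; end")
  end
end

class FixedPipeline
  def initialize(generated_body)
    # define_singleton_method scopes the generated method to this instance.
    define_singleton_method(:filter_func) { eval(generated_body) }
  end
end
```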

definitions << " targeted_outputs = []" if type == "output"
definitions << " events = [event]" if type == "filter"
definitions << " @logger.debug? && @logger.debug(\"#{type} received\", :event => event.to_hash)"

@@ -116,6 +117,7 @@ def compile
end

definitions << " events" if type == "filter"
definitions << " targeted_outputs" if type == "output"
definitions << "end"
end

@@ -142,15 +144,15 @@ def compile_initializer


code << <<-CODE
#{name} = #{plugin.compile_initializer}
#{name} = #{plugin.compile_initializer}
@#{plugin.plugin_type}s << #{name}
CODE

# The flush method for this filter.
if plugin.plugin_type == "filter"

code << <<-CODE
#{name}_flush = lambda do |options, &block|
#{name}_flush = lambda do |options, &block|
@logger.debug? && @logger.debug(\"Flushing\", :plugin => #{name})
events = #{name}.flush(options)
@@ -230,18 +232,18 @@ def compile_initializer

def compile
case plugin_type
when "input"
return "start_input(#{variable_name})"
when "filter"
return <<-CODE
when "input"
Member:
Are these cosmetic indentation changes part of our style guidelines? Or are these general Ruby style guidelines? I've seen the unindented style much more frequently, so I'm curious.

Contributor Author:
That would be my editor (RubyMine) sneaking in with its opinion. Good call, I'll re-indent this stuff.

return "start_input(#{variable_name})"
when "filter"
return <<-CODE
events = #{variable_name}.multi_filter(events)
CODE
when "output"
return "#{variable_name}.handle(event)\n"
when "codec"
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code})"
CODE
when "output"
return "targeted_outputs << #{variable_name}\n"
when "codec"
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code})"
end
end

@@ -345,7 +347,7 @@ def validate!
:column => input.column_of(interval.first),
:byte => interval.first + 1,
:after => input[0..interval.first]
)
)
)
end
end
@@ -402,9 +404,9 @@ def cond_func_#{i}(input_events)
<<-CODE
events = cond_func_#{i}(events)
CODE
else
else # Output
<<-CODE
#{super}
#{super}
end
CODE
end
@@ -525,21 +527,21 @@ def _inspect(indent="")
tv = "...#{tv[-20..-1]}" if tv.size > 20

indent +
self.class.to_s.sub(/.*:/,'') +
self.class.to_s.sub(/.*:/,'') +
em.map{|m| "+"+m.to_s.sub(/.*:/,'')}*"" +
" offset=#{interval.first}" +
", #{tv.inspect}" +
im +
(elements && elements.size > 0 ?
":" +
(elements.select { |e| !e.is_a?(LogStash::Config::AST::Whitespace) && e.elements && e.elements.size > 0 }||[]).map{|e|
begin
"\n"+e.inspect(indent+" ")
rescue # Defend against inspect not taking a parameter
"\n"+indent+" "+e.inspect
end
begin
"\n"+e.inspect(indent+" ")
rescue # Defend against inspect not taking a parameter
"\n"+indent+" "+e.inspect
end
}.join("") :
""
)
end
end
end
70 changes: 50 additions & 20 deletions logstash-core/lib/logstash/outputs/base.rb
@@ -4,6 +4,8 @@
require "logstash/plugin"
require "logstash/namespace"
require "logstash/config/mixin"
require "logstash/util/wrapped_synchronous_queue"
require "concurrent/atomic/atomic_fixnum"

class LogStash::Outputs::Base < LogStash::Plugin
include LogStash::Config::Mixin
@@ -23,7 +25,7 @@ class LogStash::Outputs::Base < LogStash::Plugin
# Note that this setting may not be useful for all outputs.
config :workers, :validate => :number, :default => 1

attr_reader :worker_plugins, :worker_queue, :worker_threads
attr_reader :worker_plugins, :available_workers, :workers, :single_worker_mutex, :is_multi_worker, :worker_plugins

public
def workers_not_supported(message=nil)
@@ -40,6 +42,11 @@ def workers_not_supported(message=nil)
def initialize(params={})
super
config_init(params)

# If we're running with a single thread we must enforce single-threaded concurrency by default
# Maybe in a future version we'll assume output plugins are threadsafe
@single_worker_mutex = Mutex.new
worker_setup
end

public
@@ -54,41 +61,64 @@ def receive(event)

public
def worker_setup
# TODO: Remove this branch, delete this function
Contributor:
TODO?

Contributor Author:
That TODO is inaccurate and out of date, will remove!

if @workers == 1
@is_multi_worker = false
Member:
I'm not sure it's worth defining this variable; it's not used anywhere.

In https://github.com/elastic/logstash/pull/4254/files#diff-1646a27fa8dcf9f75d0b65aeaa7c9047R100 it is defined again

Contributor Author:
Good point! Will remove this ivar.

@worker_plugins = [self]
@worker_threads = []
else
define_singleton_method(:handle, method(:handle_worker))
@worker_queue = SizedQueue.new(20)
@is_multi_worker = true
define_singleton_method(:multi_handle, method(:handle_worker))
Contributor:
Redefining the method seems a bit extreme to me. Can we just put the proc in an instance variable and have multi_handle proxy the call?

Contributor Author:
I assume it was built this way for speed? I'm just following the pattern the base output used before. I'm totally open to just using a Proc, but I'm curious if anyone else has any insight here.

Contributor:
I don't know.. :D

Contributor:
I am not sure I understand why this singleton method is needed?

Contributor Author:
Because otherwise it would affect the workers. This is just redefining it in the parent.

Contributor Author:
I'll just add that it'd be nice not to do this sort of hackery and to instead just use ivars for changing this stuff up; however, that runs the risk of a performance regression and is IMHO out of scope for this PR.

Contributor:
but multi_handle would never be invoked in workers?

Contributor:
Oh, I just got it: there is already a multi_handle defined in the class, which is the one used by the actual workers, and the define_singleton_method(:multi_handle, method(:handle_worker)) redefinition is only for the top worker, which passes events down to the downstream output workers.

I wonder how we could name, organize, or document the code in a way that makes this mechanic more obvious to someone reading it. Even knowing how it works, it is really hard to understand. I believe we should take the opportunity of the current refactor to try to make that better.

Contributor Author:
I think the real solution is to move multiplexing code out into its own module as discussed elsewhere. That way no more weird method redefinition.
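A sketch of that direction (a hypothetical OutputDelegator, not code from this PR): the checkout/checkin multiplexing lives in its own object, so the output base class never has to redefine its own methods.

```ruby
# Hypothetical sketch: the worker pool lives in a delegator instead of
# being wired into the output base class via define_singleton_method.
class OutputDelegator
  def initialize(workers)
    @available = SizedQueue.new(workers.length)
    workers.each { |w| @available << w }
  end

  # Check a worker out, hand it the batch, and always check it back in.
  def multi_receive(events)
    worker = @available.pop
    begin
      worker.multi_receive(events)
    ensure
      @available.push(worker)
    end
  end
end

# Minimal stand-in for an output worker, just for demonstration.
class StubOutput
  attr_reader :seen

  def initialize
    @seen = []
  end

  def multi_receive(events)
    @seen.concat(events)
  end
end
```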


@worker_plugins = @workers.times.map { self.class.new(@original_params.merge("workers" => 1)) }
@worker_threads = @worker_plugins.map.with_index do |plugin, i|
Thread.new(original_params, @worker_queue) do |params, queue|
LogStash::Util.set_thread_name(">#{self.class.config_name}.#{i}")
LogStash::Util.set_thread_plugin(self)
plugin.register
while true
event = queue.pop
plugin.handle(event)
end
end

@available_workers = SizedQueue.new(@worker_plugins.length)

@worker_plugins.each do |wp|
wp.register
@available_workers << wp
end
end
end

public
# Not to be overridden by plugin authors!
Member:
Maybe I'm missing something, but this method is no longer used. I suggest removing it and renaming multi_handle to handle.

Contributor Author:
I'm fine with removing this, but multi_handle is so named to be consistent with Filter#multi_filter. I'd rather not change multi_handle given that.

def handle(event)
LogStash::Util.set_thread_plugin(self)
receive(event)
@single_worker_mutex.synchronize { receive(event) }
end # def handle

def handle_worker(event)
LogStash::Util.set_thread_plugin(self)
@worker_queue.push(event)
# To be overridden in implementations
def multi_receive(events)
events.each {|event|
receive(event)
}
Contributor:
one line maybe?

Contributor Author:
yep! Should be :)

end

# Not to be overridden by plugin authors!
def multi_handle(events)
@single_worker_mutex.synchronize { multi_receive(events) }
end

def handle_worker(events)
Contributor:
I would try to use a more descriptive name for this method; it is in reality the top-level multi_handle when multiple workers are configured. Maybe something like multi_handle_proxy or multi_handle_multiplexer, or something better.

worker = @available_workers.pop
begin
worker.multi_receive(events)
ensure
@available_workers.push(worker)
end
end

def do_close
if @worker_plugins
@worker_plugins.each do |wp|
wp.do_close unless wp === self
end
end
Contributor:
From the code, @worker_plugins will always be an array since we set it in the initialize method, so we don't need the if @worker_plugins check.

Contributor Author:
good call

super
end

private
def output?(event)
# TODO: noop for now, remove this once we delete this call from all plugins
true
end # def output?
end # class LogStash::Outputs::Base
end # class LogStash::Outputs::Base