-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Next Gen Pipeline #4254
Next Gen Pipeline #4254
Changes from 4 commits
6322c5f
0d09556
5aff6ac
d11d1cc
56ada56
9a0a68c
808c8d8
5c69235
9ac1f2f
16fc8e7
de66bb2
3439464
05e9650
4f0d100
167a180
616afe1
76ddd14
5d6e9a9
5be12df
ec1c03b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -108,6 +108,7 @@ def compile | |
# defines @filter_func and @output_func | ||
|
||
definitions << "def #{type}_func(event)" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is an error here when you run multiples pipeline and the way we are generating the code. I have a PR to fix it in #4298 |
||
definitions << " targeted_outputs = []" if type == "output" | ||
definitions << " events = [event]" if type == "filter" | ||
definitions << " @logger.debug? && @logger.debug(\"#{type} received\", :event => event.to_hash)" | ||
|
||
|
@@ -116,6 +117,7 @@ def compile | |
end | ||
|
||
definitions << " events" if type == "filter" | ||
definitions << " targeted_outputs" if type == "output" | ||
definitions << "end" | ||
end | ||
|
||
|
@@ -237,7 +239,7 @@ def compile | |
events = #{variable_name}.multi_filter(events) | ||
CODE | ||
when "output" | ||
return "#{variable_name}.handle(event)\n" | ||
return "targeted_outputs << #{variable_name}\n" | ||
when "codec" | ||
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?) | ||
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})" | ||
|
@@ -402,7 +404,7 @@ def cond_func_#{i}(input_events) | |
<<-CODE | ||
events = cond_func_#{i}(events) | ||
CODE | ||
else | ||
else # Output | ||
<<-CODE | ||
#{super} | ||
end | ||
|
@@ -542,4 +544,4 @@ def _inspect(indent="") | |
"" | ||
) | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,8 @@ | |
require "logstash/plugin" | ||
require "logstash/namespace" | ||
require "logstash/config/mixin" | ||
require "logstash/util/wrapped_synchronous_queue" | ||
require "concurrent/atomic/atomic_fixnum" | ||
|
||
class LogStash::Outputs::Base < LogStash::Plugin | ||
include LogStash::Config::Mixin | ||
|
@@ -23,7 +25,7 @@ class LogStash::Outputs::Base < LogStash::Plugin | |
# Note that this setting may not be useful for all outputs. | ||
config :workers, :validate => :number, :default => 1 | ||
|
||
attr_reader :worker_plugins, :worker_queue, :worker_threads | ||
attr_reader :worker_plugins, :available_workers, :workers, :single_worker_mutex, :is_multi_worker, :worker_plugins | ||
|
||
public | ||
def workers_not_supported(message=nil) | ||
|
@@ -40,6 +42,11 @@ def workers_not_supported(message=nil) | |
def initialize(params={}) | ||
super | ||
config_init(params) | ||
|
||
# If we're running with a single thread we must enforce single-threaded concurrency by default | ||
# Maybe in a future version we'll assume output plugins are threadsafe | ||
@single_worker_mutex = Mutex.new | ||
worker_setup | ||
end | ||
|
||
public | ||
|
@@ -54,41 +61,64 @@ def receive(event) | |
|
||
public | ||
def worker_setup | ||
# TODO: Remove this branch, delete this function | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TODO? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That TODO is inaccurate and out of date, will remove! |
||
if @workers == 1 | ||
@is_multi_worker = false | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure if it's worth defining this variable it's not used anywhere. In https://github.com/elastic/logstash/pull/4254/files#diff-1646a27fa8dcf9f75d0b65aeaa7c9047R100 it is defined again There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point! Will remove this ivar. |
||
@worker_plugins = [self] | ||
@worker_threads = [] | ||
else | ||
define_singleton_method(:handle, method(:handle_worker)) | ||
@worker_queue = SizedQueue.new(20) | ||
@is_multi_worker = true | ||
define_singleton_method(:multi_handle, method(:handle_worker)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Redefining the method seems a bit extreme to me? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I assume it was built this way for speed? I'm just following the pattern the base output used before. I'm totally open to just using a Proc, but I'm curious if anyone else has any insight here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know.. :D There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure I understand why this singleton_method is needed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because otherwise it would affect the workers. This is just redifining it in the parent. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll just add that it'd be nice to not do this sort of hackery but rather just use ivars for changing this stuff up, however that runs the risk of being a performance regression and is IMHO out of scope for this PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh, I just got it. there is already a I wonder how we could either name or in some way organize the code or document or whatever that would make it more obvious for someone in the code to understand this mechanic. even knowing about how it works it is really hard to understand. I believe we should take the opportunity of the current refactor to try and make that better? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the real solution is to move multiplexing code out into its own module as discussed elsewhere. That way no more weird method redefinition. |
||
|
||
@worker_plugins = @workers.times.map { self.class.new(@original_params.merge("workers" => 1)) } | ||
@worker_threads = @worker_plugins.map.with_index do |plugin, i| | ||
Thread.new(original_params, @worker_queue) do |params, queue| | ||
LogStash::Util.set_thread_name(">#{self.class.config_name}.#{i}") | ||
LogStash::Util.set_thread_plugin(self) | ||
plugin.register | ||
while true | ||
event = queue.pop | ||
plugin.handle(event) | ||
end | ||
end | ||
|
||
@available_workers = SizedQueue.new(@worker_plugins.length) | ||
|
||
@worker_plugins.each do |wp| | ||
wp.register | ||
@available_workers << wp | ||
end | ||
end | ||
end | ||
|
||
public | ||
# Not to be overriden by plugin authors! | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe I'm missing something but this method is no longer used. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm fine with removing this, but multi_handle is so named to be consistent with |
||
def handle(event) | ||
LogStash::Util.set_thread_plugin(self) | ||
receive(event) | ||
@single_worker_mutex.synchronize { receive(event) } | ||
end # def handle | ||
|
||
def handle_worker(event) | ||
LogStash::Util.set_thread_plugin(self) | ||
@worker_queue.push(event) | ||
# To be overriden in implementations | ||
def multi_receive(events) | ||
events.each {|event| | ||
receive(event) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. one line maybe? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yep! Should be :) |
||
end | ||
|
||
# Not to be overriden by plugin authors! | ||
def multi_handle(events) | ||
@single_worker_mutex.synchronize { multi_receive(events) } | ||
end | ||
|
||
def handle_worker(events) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would try use a more descriptive name for this method. this is in reality the top level |
||
worker = @available_workers.pop | ||
begin | ||
worker.multi_receive(events) | ||
ensure | ||
@available_workers.push(worker) | ||
end | ||
end | ||
|
||
def do_close | ||
if @worker_plugins | ||
@worker_plugins.each do |wp| | ||
wp.do_close unless wp === self | ||
end | ||
end | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From the code There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good call |
||
super | ||
end | ||
|
||
private | ||
def output?(event) | ||
# TODO: noop for now, remove this once we delete this call from all plugins | ||
true | ||
end # def output? | ||
end # class LogStash::Outputs::Base | ||
end # class LogStash::Outputs::Base |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this means this changes allows us to have multiple pipelines?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is already in the current master, but yes this is the first step to enable multiple pipelines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice, very interesting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One thing to note is I made the pipeline ID part of the thread names so it looks nice in VisualVM!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@andrewvc this pipeline_id handling is quirky,I see that in pipeline.rb if the id is nil then
object_id
is used. maybe we could do it here instead? because otherwise callingadd_pipeline(nil, "") will create a nil key in @pipelines but the pipeline.id will return
object_id`There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with @jsvd comment.
But this scenario is also problematic if you are trying to add a new pipeline with an existing key, but I think this can be solve in another PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jsvd @ph yep, great observation. I'll move that logic here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jsvd @ph looking through the code the reason I made the UUID generation automatic inside of pipeline was that for testing this is pretty inconvenient.
If we're thinking of making pipeline_id required it should change the signature of the constructor to
Pipeline.new(pipeline_id, config_str, settings)
. Are you guys OK with this change? It will make the PR a good bit larger as all tests instantiating the pipeline will need to change.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@andrewvc Lets make that into another PR :(