Skip to content

Commit

Permalink
Next generation pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewvc committed Nov 23, 2015
1 parent 50a5a4b commit ffaa783
Show file tree
Hide file tree
Showing 10 changed files with 310 additions and 265 deletions.
8 changes: 4 additions & 4 deletions logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ class LogStash::Agent < Clamp::Command
:default_input => DEFAULT_INPUT, :default_output => DEFAULT_OUTPUT),
:default => "", :attribute_name => :config_string

option ["-w", "--filterworkers"], "COUNT",
I18n.t("logstash.agent.flag.filterworkers"),
:attribute_name => :filter_workers,
option ["-w", "--pipelineworkers"], "COUNT",
I18n.t("logstash.agent.flag.pipelineworkers"),
:attribute_name => :pipeline_workers,
:default => 0, &:to_i

option ["-l", "--log"], "FILE",
Expand Down Expand Up @@ -151,7 +151,7 @@ def execute
configure_logging(log_file)
end

pipeline.configure("filter-workers", filter_workers) if filter_workers > 0
pipeline.configure("pipeline-workers", pipeline_workers) if pipeline_workers > 0

# Stop now if we are only asking for a config test.
if config_test?
Expand Down
48 changes: 25 additions & 23 deletions logstash-core/lib/logstash/config/config_ast.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def compile
# defines @filter_func and @output_func

definitions << "def #{type}_func(event)"
definitions << " targeted_outputs = []" if type == "output"
definitions << " events = [event]" if type == "filter"
definitions << " @logger.debug? && @logger.debug(\"#{type} received\", :event => event.to_hash)"

Expand All @@ -116,6 +117,7 @@ def compile
end

definitions << " events" if type == "filter"
definitions << " targeted_outputs" if type == "output"
definitions << "end"
end

Expand All @@ -142,15 +144,15 @@ def compile_initializer


code << <<-CODE
#{name} = #{plugin.compile_initializer}
#{name} = #{plugin.compile_initializer}
@#{plugin.plugin_type}s << #{name}
CODE

# The flush method for this filter.
if plugin.plugin_type == "filter"

code << <<-CODE
#{name}_flush = lambda do |options, &block|
#{name}_flush = lambda do |options, &block|
@logger.debug? && @logger.debug(\"Flushing\", :plugin => #{name})
events = #{name}.flush(options)
Expand Down Expand Up @@ -230,18 +232,18 @@ def compile_initializer

def compile
case plugin_type
when "input"
return "start_input(#{variable_name})"
when "filter"
return <<-CODE
when "input"
return "start_input(#{variable_name})"
when "filter"
return <<-CODE
events = #{variable_name}.multi_filter(events)
CODE
when "output"
return "#{variable_name}.handle(event)\n"
when "codec"
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code})"
CODE
when "output"
return "targeted_outputs << #{variable_name}\n"
when "codec"
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code})"
end
end

Expand Down Expand Up @@ -345,7 +347,7 @@ def validate!
:column => input.column_of(interval.first),
:byte => interval.first + 1,
:after => input[0..interval.first]
)
)
)
end
end
Expand Down Expand Up @@ -402,9 +404,9 @@ def cond_func_#{i}(input_events)
<<-CODE
events = cond_func_#{i}(events)
CODE
else
else # Output
<<-CODE
#{super}
#{super}
end
CODE
end
Expand Down Expand Up @@ -525,21 +527,21 @@ def _inspect(indent="")
tv = "...#{tv[-20..-1]}" if tv.size > 20

indent +
self.class.to_s.sub(/.*:/,'') +
self.class.to_s.sub(/.*:/,'') +
em.map{|m| "+"+m.to_s.sub(/.*:/,'')}*"" +
" offset=#{interval.first}" +
", #{tv.inspect}" +
im +
(elements && elements.size > 0 ?
":" +
(elements.select { |e| !e.is_a?(LogStash::Config::AST::Whitespace) && e.elements && e.elements.size > 0 }||[]).map{|e|
begin
"\n"+e.inspect(indent+" ")
rescue # Defend against inspect not taking a parameter
"\n"+indent+" "+e.inspect
end
begin
"\n"+e.inspect(indent+" ")
rescue # Defend against inspect not taking a parameter
"\n"+indent+" "+e.inspect
end
}.join("") :
""
)
end
end
end
70 changes: 50 additions & 20 deletions logstash-core/lib/logstash/outputs/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
require "logstash/plugin"
require "logstash/namespace"
require "logstash/config/mixin"
require "logstash/util/wrapped_synchronous_queue"
require "concurrent/atomic/atomic_fixnum"

class LogStash::Outputs::Base < LogStash::Plugin
include LogStash::Config::Mixin
Expand All @@ -23,7 +25,7 @@ class LogStash::Outputs::Base < LogStash::Plugin
# Note that this setting may not be useful for all outputs.
config :workers, :validate => :number, :default => 1

attr_reader :worker_plugins, :worker_queue, :worker_threads
attr_reader :worker_plugins, :available_workers, :workers, :single_worker_mutex, :is_multi_worker, :worker_plugins

public
def workers_not_supported(message=nil)
Expand All @@ -40,6 +42,11 @@ def workers_not_supported(message=nil)
def initialize(params={})
super
config_init(params)

# If we're running with a single thread we must enforce single-threaded concurrency by default
# Maybe in a future version we'll assume output plugins are threadsafe
@single_worker_mutex = Mutex.new
worker_setup
end

public
Expand All @@ -54,41 +61,64 @@ def receive(event)

public
def worker_setup
# TODO: Remove this branch, delete this function
if @workers == 1
@is_multi_worker = false
@worker_plugins = [self]
@worker_threads = []
else
define_singleton_method(:handle, method(:handle_worker))
@worker_queue = SizedQueue.new(20)
@is_multi_worker = true
define_singleton_method(:multi_handle, method(:handle_worker))

@worker_plugins = @workers.times.map { self.class.new(@original_params.merge("workers" => 1)) }
@worker_threads = @worker_plugins.map.with_index do |plugin, i|
Thread.new(original_params, @worker_queue) do |params, queue|
LogStash::Util.set_thread_name(">#{self.class.config_name}.#{i}")
LogStash::Util.set_thread_plugin(self)
plugin.register
while true
event = queue.pop
plugin.handle(event)
end
end

@available_workers = SizedQueue.new(@worker_plugins.length)

@worker_plugins.each do |wp|
wp.register
@available_workers << wp
end
end
end

public
# Not to be overridden by plugin authors!
def handle(event)
LogStash::Util.set_thread_plugin(self)
receive(event)
@single_worker_mutex.synchronize { receive(event) }
end # def handle

def handle_worker(event)
LogStash::Util.set_thread_plugin(self)
@worker_queue.push(event)
# Default bulk-delivery implementation: hand each event to #receive
# one at a time. Plugin authors may override this to process a whole
# batch at once.
def multi_receive(events)
  events.each do |e|
    receive(e)
  end
end

# Not to be overridden by plugin authors!
# Thread-safe entry point for batch delivery: serializes every call to
# #multi_receive through the single-worker mutex.
def multi_handle(events)
  @single_worker_mutex.synchronize do
    multi_receive(events)
  end
end

# Check one worker plugin out of the shared pool, hand it the batch,
# and always return it to the pool afterwards — even when
# multi_receive raises. Blocks until a worker is available.
def handle_worker(events)
  checked_out = @available_workers.pop
  begin
    checked_out.multi_receive(events)
  ensure
    @available_workers << checked_out
  end
end

# Close all cloned worker plugins, then close this (primary) instance
# via super. In single-worker mode @worker_plugins contains only self;
# the guard below skips that entry so we never close self twice.
# NOTE(review): `wp === self` relies on Object#=== falling through to
# identity comparison; `wp.equal?(self)` would state the intent more
# directly — confirm no plugin overrides `==` before changing.
def do_close
  if @worker_plugins
    @worker_plugins.each do |wp|
      wp.do_close unless wp === self
    end
  end
  super
end

private
def output?(event)
# TODO: noop for now, remove this once we delete this call from all plugins
true
end # def output?
end # class LogStash::Outputs::Base
end # class LogStash::Outputs::Base
Loading

0 comments on commit ffaa783

Please sign in to comment.