Skip to content

Commit

Permalink
NG Pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewvc committed Dec 16, 2015
1 parent c95947d commit 9f3a44a
Show file tree
Hide file tree
Showing 17 changed files with 744 additions and 300 deletions.
28 changes: 21 additions & 7 deletions logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require "logstash/config/cpu_core_strategy"
require "uri"
require "net/http"
require "logstash/pipeline"
LogStash::Environment.load_locale!

class LogStash::Agent < Clamp::Command
Expand All @@ -20,10 +21,20 @@ class LogStash::Agent < Clamp::Command
:default_input => DEFAULT_INPUT, :default_output => DEFAULT_OUTPUT),
:default => "", :attribute_name => :config_string

option ["-w", "--filterworkers"], "COUNT",
I18n.t("logstash.agent.flag.filterworkers"),
:attribute_name => :filter_workers,
:default => 0, &:to_i
option ["-w", "--pipeline-workers"], "COUNT",
I18n.t("logstash.runner.flag.pipeline-workers"),
:attribute_name => :pipeline_workers,
:default => LogStash::Pipeline::DEFAULT_SETTINGS[:default_pipeline_workers], &:to_i

option ["-b", "--pipeline-batch-size"], "SIZE",
I18n.t("logstash.runner.flag.pipeline-batch-size"),
:attribute_name => :pipeline_batch_size,
:default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_size], &:to_i

option ["-u", "--pipeline-batch-delay"], "DELAY_IN_MS",
I18n.t("logstash.runner.flag.pipeline-batch-delay"),
:attribute_name => :pipeline_batch_delay,
:default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_delay], &:to_i

option ["-l", "--log"], "FILE",
I18n.t("logstash.agent.flag.log"),
Expand Down Expand Up @@ -121,7 +132,12 @@ def execute
end

begin
pipeline = LogStash::Pipeline.new(@config_string)
pipeline = LogStash::Pipeline.new(@config_string, {
:pipeline_workers => pipeline_workers,
:pipeline_batch_size => pipeline_batch_size,
:pipeline_batch_delay => pipeline_batch_delay,
:pipeline_id => "base"
})
rescue LoadError => e
fail("Configuration problem.")
end
Expand Down Expand Up @@ -151,8 +167,6 @@ def execute
configure_logging(log_file)
end

pipeline.configure("filter-workers", filter_workers) if filter_workers > 0

# Stop now if we are only asking for a config test.
if config_test?
report "Configuration OK"
Expand Down
8 changes: 5 additions & 3 deletions logstash-core/lib/logstash/config/config_ast.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def compile
# defines @filter_func and @output_func

definitions << "def #{type}_func(event)"
definitions << " targeted_outputs = []" if type == "output"
definitions << " events = [event]" if type == "filter"
definitions << " @logger.debug? && @logger.debug(\"#{type} received\", :event => event.to_hash)"

Expand All @@ -116,6 +117,7 @@ def compile
end

definitions << " events" if type == "filter"
definitions << " targeted_outputs" if type == "output"
definitions << "end"
end

Expand Down Expand Up @@ -237,7 +239,7 @@ def compile
events = #{variable_name}.multi_filter(events)
CODE
when "output"
return "#{variable_name}.handle(event)\n"
return "targeted_outputs << #{variable_name}\n"
when "codec"
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
Expand Down Expand Up @@ -402,7 +404,7 @@ def cond_func_#{i}(input_events)
<<-CODE
events = cond_func_#{i}(events)
CODE
else
else # Output
<<-CODE
#{super}
end
Expand Down Expand Up @@ -542,4 +544,4 @@ def _inspect(indent="")
""
)
end
end
end
68 changes: 68 additions & 0 deletions logstash-core/lib/logstash/output_delegator.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# encoding: utf-8
require "concurrent/atomic/atomic_fixnum"

# This class goes hand in hand with the pipeline to provide a pool of
# free workers to be used by pipeline worker threads. The pool is
# internally represented with a SizedQueue set the the size of the number
# of 'workers' the output plugin is configured with.
#
# This plugin also records some basic statistics
module LogStash; class OutputDelegator
attr_reader :workers, :config, :worker_count

# The *args this takes are the same format that a Outputs::Base takes. A list of hashes with parameters in them
# Internally these just get merged together into a single hash
def initialize(logger, klass, *args)
@logger = logger
@config = args.reduce({}, :merge)
@klass = klass
@worker_count = @config["workers"] || 1

@worker_queue = SizedQueue.new(@worker_count)

@workers = @worker_count.times.map do
w = @klass.new(*args)
w.register
@worker_queue << w
w
end

@events_received = Concurrent::AtomicFixnum.new(0)
end

def config_name
@klass.config_name
end

def register
@workers.each {|w| w.register}
end

def multi_receive(events)
@events_received.increment(events.length)

worker = @worker_queue.pop
begin
worker.multi_receive(events)
ensure
@worker_queue.push(worker)
end
end

def do_close
@logger.debug("closing output delegator", :klass => self)

@worker_count.times do
worker = @worker_queue.pop
worker.do_close
end
end

def events_received
@events_received.value
end

def busy_workers
@worker_queue.size
end
end end
44 changes: 11 additions & 33 deletions logstash-core/lib/logstash/outputs/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
require "logstash/plugin"
require "logstash/namespace"
require "logstash/config/mixin"
require "logstash/util/wrapped_synchronous_queue"
require "concurrent/atomic/atomic_fixnum"

class LogStash::Outputs::Base < LogStash::Plugin
include LogStash::Config::Mixin
Expand All @@ -23,7 +25,7 @@ class LogStash::Outputs::Base < LogStash::Plugin
# Note that this setting may not be useful for all outputs.
config :workers, :validate => :number, :default => 1

attr_reader :worker_plugins, :worker_queue, :worker_threads
attr_reader :worker_plugins, :available_workers, :workers, :worker_plugins

public
def workers_not_supported(message=nil)
Expand All @@ -40,6 +42,10 @@ def workers_not_supported(message=nil)
def initialize(params={})
super
config_init(params)

# If we're running with a single thread we must enforce single-threaded concurrency by default
# Maybe in a future version we'll assume output plugins are threadsafe
@single_worker_mutex = Mutex.new
end

public
Expand All @@ -53,42 +59,14 @@ def receive(event)
end # def receive

public
def worker_setup
if @workers == 1
@worker_plugins = [self]
@worker_threads = []
else
define_singleton_method(:handle, method(:handle_worker))
@worker_queue = SizedQueue.new(20)
@worker_plugins = @workers.times.map { self.class.new(@original_params.merge("workers" => 1)) }
@worker_threads = @worker_plugins.map.with_index do |plugin, i|
Thread.new(original_params, @worker_queue) do |params, queue|
LogStash::Util.set_thread_name(">#{self.class.config_name}.#{i}")
LogStash::Util.set_thread_plugin(self)
plugin.register
while true
event = queue.pop
plugin.handle(event)
end
end
end
end
end

public
def handle(event)
LogStash::Util.set_thread_plugin(self)
receive(event)
end # def handle

def handle_worker(event)
LogStash::Util.set_thread_plugin(self)
@worker_queue.push(event)
# To be overriden in implementations
def multi_receive(events)
events.each {|event| receive(event) }
end

private
def output?(event)
# TODO: noop for now, remove this once we delete this call from all plugins
true
end # def output?
end # class LogStash::Outputs::Base
end # class LogStash::Outputs::Base
Loading

0 comments on commit 9f3a44a

Please sign in to comment.