Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Next generation pipeline #4340

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,24 @@
source "https://rubygems.org"
gem "logstash-core", "3.0.0.dev", :path => "./logstash-core"
gem "logstash-core-event", "3.0.0.dev", :path => "./logstash-core-event"
# gem "logstash-core-event-java", "3.0.0.dev", :path => "./logstash-core-event-java"
gem "file-dependencies", "0.1.6"
gem "ci_reporter_rspec", "1.0.0", :group => :development
gem "simplecov", :group => :development
gem "coveralls", :group => :development
# Tins 1.7 requires the ruby 2.0 platform to install,
# this gem is a dependency of term-ansi-color which is a dependency of coveralls.
# 1.6 is the last supported version on jruby.
gem "tins", "1.6", :group => :development
gem "rspec", "~> 3.1.0", :group => :development
gem "logstash-devutils", "~> 0.0.15", :group => :development
gem "logstash-devutils", ">= 0"
gem "benchmark-ips", :group => :development
gem "octokit", "3.8.0", :group => :build
gem "stud", "~> 0.0.21", :group => :build
gem "fpm", "~> 1.3.3", :group => :build
gem "rubyzip", "~> 1.1.7", :group => :build
gem "gems", "~> 0.8.3", :group => :build
gem "flores", "~> 0.0.6", :group => :development
gem "logstash-filter-clone"
gem "logstash-filter-mutate"
gem "logstash-filter-multiline"
gem "logstash-input-generator"
gem "logstash-input-stdin"
gem "logstash-input-tcp"
gem "logstash-output-stdout"
57 changes: 56 additions & 1 deletion Gemfile.jruby-1.9.lock
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ PATH
logstash-core-event (~> 3.0.0.dev)
minitar (~> 0.5.4)
pry (~> 0.10.1)
rubyzip (~> 1.1.7)
stud (~> 0.0.19)
thread_safe (~> 0.3.5)
treetop (< 1.5.0)
Expand Down Expand Up @@ -74,10 +75,21 @@ GEM
domain_name (~> 0.5)
i18n (0.6.9)
insist (1.0.0)
jls-grok (0.11.2)
cabin (>= 0.6.0)
jrjackson (0.3.7)
jruby-openssl (0.9.12-java)
json (1.8.3-java)
kramdown (1.9.0)
logstash-codec-json (2.0.2)
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-codec-json_lines (2.0.2)
logstash-codec-line
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-codec-line (2.0.2)
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-codec-plain (2.0.2)
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-devutils (0.0.18-java)
gem_publisher
insist (= 1.0.0)
Expand All @@ -87,6 +99,42 @@ GEM
rspec (~> 3.1.0)
rspec-wait
stud (>= 0.0.20)
logstash-filter-clone (2.0.3)
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-filter-grok (2.0.2)
jls-grok (~> 0.11.1)
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-patterns-core
logstash-filter-multiline (2.0.2)
jls-grok (~> 0.11.0)
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-filter-mutate
logstash-patterns-core
logstash-filter-mutate (2.0.2)
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-filter-grok
logstash-patterns-core
logstash-input-generator (2.0.2)
logstash-codec-plain
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-input-stdin (2.0.2)
concurrent-ruby
logstash-codec-json
logstash-codec-json_lines
logstash-codec-line
logstash-codec-plain
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-input-tcp (2.0.4)
logstash-codec-json
logstash-codec-json_lines
logstash-codec-line
logstash-codec-plain
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-output-stdout (2.0.2)
logstash-codec-line
logstash-core (>= 2.0.0.beta2, < 3.0.0)
logstash-patterns-core (2.0.2)
logstash-core (>= 2.0.0.beta2, < 3.0.0)
method_source (0.8.2)
mime-types (2.6.2)
minitar (0.5.4)
Expand Down Expand Up @@ -155,7 +203,14 @@ DEPENDENCIES
gems (~> 0.8.3)
logstash-core (= 3.0.0.dev)!
logstash-core-event (= 3.0.0.dev)!
logstash-devutils (~> 0.0.15)
logstash-devutils
logstash-filter-clone
logstash-filter-multiline
logstash-filter-mutate
logstash-input-generator
logstash-input-stdin
logstash-input-tcp
logstash-output-stdout
octokit (= 3.8.0)
rspec (~> 3.1.0)
rubyzip (~> 1.1.7)
Expand Down
4 changes: 2 additions & 2 deletions logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def execute
end # def execute

def add_pipeline(pipeline_id, config_str, settings = {})
@pipelines[pipeline_id] = LogStash::Pipeline.new(config_str, settings)
@pipelines[pipeline_id] = LogStash::Pipeline.new(config_str, settings.merge(:pipeline_id => pipeline_id))
end

private
Expand Down Expand Up @@ -76,4 +76,4 @@ def trap_sigint
end
end
end
end # class LogStash::Agent
end # class LogStash::Agent
8 changes: 5 additions & 3 deletions logstash-core/lib/logstash/config/config_ast.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def compile
# defines @filter_func and @output_func

definitions << "def #{type}_func(event)"
definitions << " targeted_outputs = []" if type == "output"
definitions << " events = [event]" if type == "filter"
definitions << " @logger.debug? && @logger.debug(\"#{type} received\", :event => event.to_hash)"

Expand All @@ -116,6 +117,7 @@ def compile
end

definitions << " events" if type == "filter"
definitions << " targeted_outputs" if type == "output"
definitions << "end"
end

Expand Down Expand Up @@ -237,7 +239,7 @@ def compile
events = #{variable_name}.multi_filter(events)
CODE
when "output"
return "#{variable_name}.handle(event)\n"
return "targeted_outputs << #{variable_name}\n"
when "codec"
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
Expand Down Expand Up @@ -402,7 +404,7 @@ def cond_func_#{i}(input_events)
<<-CODE
events = cond_func_#{i}(events)
CODE
else
else # Output
<<-CODE
#{super}
end
Expand Down Expand Up @@ -542,4 +544,4 @@ def _inspect(indent="")
""
)
end
end
end
68 changes: 68 additions & 0 deletions logstash-core/lib/logstash/output_delegator.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# encoding: utf-8
require "concurrent/atomic/atomic_fixnum"

# This class goes hand in hand with the pipeline to provide a pool of
# free workers to be used by pipeline worker threads. The pool is
# internally represented with a SizedQueue set the the size of the number
# of 'workers' the output plugin is configured with.
#
# This plugin also records some basic statistics
module LogStash; class OutputDelegator
attr_reader :workers, :config, :worker_count

# The *args this takes are the same format that a Outputs::Base takes. A list of hashes with parameters in them
# Internally these just get merged together into a single hash
def initialize(logger, klass, *args)
@logger = logger
@config = args.reduce({}, :merge)
@klass = klass
@worker_count = @config["workers"] || 1

@worker_queue = SizedQueue.new(@worker_count)

@workers = @worker_count.times.map do
w = @klass.new(*args)
w.register
@worker_queue << w
w
end

@events_received = Concurrent::AtomicFixnum.new(0)
end

def config_name
@klass.config_name
end

def register
@workers.each {|w| w.register}
end

def multi_receive(events)
@events_received.increment(events.length)

worker = @worker_queue.pop
begin
worker.multi_receive(events)
ensure
@worker_queue.push(worker)
end
end

def do_close
@logger.debug("closing output delegator", :klass => self)

@worker_count.times do
worker = @worker_queue.pop
worker.do_close
end
end

def events_received
@events_received.value
end

def busy_workers
@worker_queue.size
end
end end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

woa cleaner +1

44 changes: 11 additions & 33 deletions logstash-core/lib/logstash/outputs/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
require "logstash/plugin"
require "logstash/namespace"
require "logstash/config/mixin"
require "logstash/util/wrapped_synchronous_queue"
require "concurrent/atomic/atomic_fixnum"

class LogStash::Outputs::Base < LogStash::Plugin
include LogStash::Config::Mixin
Expand All @@ -23,7 +25,7 @@ class LogStash::Outputs::Base < LogStash::Plugin
# Note that this setting may not be useful for all outputs.
config :workers, :validate => :number, :default => 1

attr_reader :worker_plugins, :worker_queue, :worker_threads
attr_reader :worker_plugins, :available_workers, :workers, :worker_plugins

public
def workers_not_supported(message=nil)
Expand All @@ -40,6 +42,10 @@ def workers_not_supported(message=nil)
def initialize(params={})
super
config_init(params)

# If we're running with a single thread we must enforce single-threaded concurrency by default
# Maybe in a future version we'll assume output plugins are threadsafe
@single_worker_mutex = Mutex.new
end

public
Expand All @@ -53,42 +59,14 @@ def receive(event)
end # def receive

public
def worker_setup
if @workers == 1
@worker_plugins = [self]
@worker_threads = []
else
define_singleton_method(:handle, method(:handle_worker))
@worker_queue = SizedQueue.new(20)
@worker_plugins = @workers.times.map { self.class.new(@original_params.merge("workers" => 1)) }
@worker_threads = @worker_plugins.map.with_index do |plugin, i|
Thread.new(original_params, @worker_queue) do |params, queue|
LogStash::Util.set_thread_name(">#{self.class.config_name}.#{i}")
LogStash::Util.set_thread_plugin(self)
plugin.register
while true
event = queue.pop
plugin.handle(event)
end
end
end
end
end

public
def handle(event)
LogStash::Util.set_thread_plugin(self)
receive(event)
end # def handle

def handle_worker(event)
LogStash::Util.set_thread_plugin(self)
@worker_queue.push(event)
# To be overriden in implementations
def multi_receive(events)
events.each {|event| receive(event) }
end

private
def output?(event)
# TODO: noop for now, remove this once we delete this call from all plugins
true
end # def output?
end # class LogStash::Outputs::Base
end # class LogStash::Outputs::Base
Loading