diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index cd9d0c6941e..30a046df3e5 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -5,6 +5,7 @@ require "logstash/config/cpu_core_strategy" require "uri" require "net/http" +require "logstash/pipeline" LogStash::Environment.load_locale! class LogStash::Agent < Clamp::Command @@ -20,10 +21,20 @@ class LogStash::Agent < Clamp::Command :default_input => DEFAULT_INPUT, :default_output => DEFAULT_OUTPUT), :default => "", :attribute_name => :config_string - option ["-w", "--filterworkers"], "COUNT", - I18n.t("logstash.agent.flag.filterworkers"), - :attribute_name => :filter_workers, - :default => 0, &:to_i + option ["-w", "--pipeline-workers"], "COUNT", + I18n.t("logstash.runner.flag.pipeline-workers"), + :attribute_name => :pipeline_workers, + :default => LogStash::Pipeline::DEFAULT_SETTINGS[:default_pipeline_workers], &:to_i + + option ["-b", "--pipeline-batch-size"], "SIZE", + I18n.t("logstash.runner.flag.pipeline-batch-size"), + :attribute_name => :pipeline_batch_size, + :default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_size], &:to_i + + option ["-u", "--pipeline-batch-delay"], "DELAY_IN_MS", + I18n.t("logstash.runner.flag.pipeline-batch-delay"), + :attribute_name => :pipeline_batch_delay, + :default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_delay], &:to_i option ["-l", "--log"], "FILE", I18n.t("logstash.agent.flag.log"), @@ -121,7 +132,12 @@ def execute end begin - pipeline = LogStash::Pipeline.new(@config_string) + pipeline = LogStash::Pipeline.new(@config_string, { + :pipeline_workers => pipeline_workers, + :pipeline_batch_size => pipeline_batch_size, + :pipeline_batch_delay => pipeline_batch_delay, + :pipeline_id => "base" + }) rescue LoadError => e fail("Configuration problem.") end @@ -151,8 +167,6 @@ def execute configure_logging(log_file) end - pipeline.configure("filter-workers", filter_workers) if filter_workers > 0 - # Stop now if we are only asking for a config test. if config_test? report "Configuration OK" diff --git a/logstash-core/lib/logstash/config/config_ast.rb b/logstash-core/lib/logstash/config/config_ast.rb index ace7322fedb..5ed6c20e4dd 100644 --- a/logstash-core/lib/logstash/config/config_ast.rb +++ b/logstash-core/lib/logstash/config/config_ast.rb @@ -108,6 +108,7 @@ def compile # defines @filter_func and @output_func definitions << "def #{type}_func(event)" + definitions << " targeted_outputs = []" if type == "output" definitions << " events = [event]" if type == "filter" definitions << " @logger.debug? && @logger.debug(\"#{type} received\", :event => event.to_hash)" @@ -116,6 +117,7 @@ def compile end definitions << " events" if type == "filter" + definitions << " targeted_outputs" if type == "output" definitions << "end" end @@ -237,7 +239,7 @@ def compile events = #{variable_name}.multi_filter(events) CODE when "output" - return "#{variable_name}.handle(event)\n" + return "targeted_outputs << #{variable_name}\n" when "codec" settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?) 
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})" @@ -402,7 +404,7 @@ def cond_func_#{i}(input_events) <<-CODE events = cond_func_#{i}(events) CODE - else + else # Output <<-CODE #{super} end @@ -542,4 +544,4 @@ def _inspect(indent="") "" ) end -end +end \ No newline at end of file diff --git a/logstash-core/lib/logstash/output_delegator.rb b/logstash-core/lib/logstash/output_delegator.rb new file mode 100644 index 00000000000..161e774d86a --- /dev/null +++ b/logstash-core/lib/logstash/output_delegator.rb @@ -0,0 +1,68 @@ +# encoding: utf-8 +require "concurrent/atomic/atomic_fixnum" + +# This class goes hand in hand with the pipeline to provide a pool of +# free workers to be used by pipeline worker threads. The pool is +# internally represented with a SizedQueue set the the size of the number +# of 'workers' the output plugin is configured with. +# +# This plugin also records some basic statistics +module LogStash; class OutputDelegator + attr_reader :workers, :config, :worker_count + + # The *args this takes are the same format that a Outputs::Base takes. A list of hashes with parameters in them + # Internally these just get merged together into a single hash + def initialize(logger, klass, *args) + @logger = logger + @config = args.reduce({}, :merge) + @klass = klass + @worker_count = @config["workers"] || 1 + + @worker_queue = SizedQueue.new(@worker_count) + + @workers = @worker_count.times.map do + w = @klass.new(*args) + w.register + @worker_queue << w + w + end + + @events_received = Concurrent::AtomicFixnum.new(0) + end + + def config_name + @klass.config_name + end + + def register + @workers.each {|w| w.register} + end + + def multi_receive(events) + @events_received.increment(events.length) + + worker = @worker_queue.pop + begin + worker.multi_receive(events) + ensure + @worker_queue.push(worker) + end + end + + def do_close + @logger.debug("closing output delegator", :klass => self) + + @worker_count.times do + worker = @worker_queue.pop + worker.do_close + end + end + + def events_received + @events_received.value + end + + def busy_workers + @worker_queue.size + end +end end \ No newline at end of file diff --git a/logstash-core/lib/logstash/outputs/base.rb b/logstash-core/lib/logstash/outputs/base.rb index 29bf7f7e191..c552f202e28 100644 --- a/logstash-core/lib/logstash/outputs/base.rb +++ b/logstash-core/lib/logstash/outputs/base.rb @@ -4,6 +4,8 @@ require "logstash/plugin" require "logstash/namespace" require "logstash/config/mixin" +require "logstash/util/wrapped_synchronous_queue" +require "concurrent/atomic/atomic_fixnum" class LogStash::Outputs::Base < LogStash::Plugin include LogStash::Config::Mixin @@ -23,7 +25,7 @@ class LogStash::Outputs::Base < LogStash::Plugin # Note that this setting may not be useful for all outputs. 
config :workers, :validate => :number, :default => 1 - attr_reader :worker_plugins, :worker_queue, :worker_threads + attr_reader :worker_plugins, :available_workers, :workers public def workers_not_supported(message=nil) @@ -40,6 +42,10 @@ def workers_not_supported(message=nil) def initialize(params={}) super config_init(params) + + # If we're running with a single thread we must enforce single-threaded concurrency by default + # Maybe in a future version we'll assume output plugins are threadsafe + @single_worker_mutex = Mutex.new end public @@ -53,37 +59,9 @@ def receive(event) end # def receive public - def worker_setup - if @workers == 1 - @worker_plugins = [self] - @worker_threads = [] - else - define_singleton_method(:handle, method(:handle_worker)) - @worker_queue = SizedQueue.new(20) - @worker_plugins = @workers.times.map { self.class.new(@original_params.merge("workers" => 1)) } - @worker_threads = @worker_plugins.map.with_index do |plugin, i| - Thread.new(original_params, @worker_queue) do |params, queue| - LogStash::Util.set_thread_name(">#{self.class.config_name}.#{i}") - LogStash::Util.set_thread_plugin(self) - plugin.register - while true - event = queue.pop - plugin.handle(event) - end - end - end - end - end - - public - def handle(event) - LogStash::Util.set_thread_plugin(self) - receive(event) - end # def handle - - def handle_worker(event) - LogStash::Util.set_thread_plugin(self) - @worker_queue.push(event) + # To be overridden in implementations + def multi_receive(events) + events.each {|event| receive(event) } end private @@ -91,4 +69,4 @@ def output?(event) # TODO: noop for now, remove this once we delete this call from all plugins true end # def output? -end # class LogStash::Outputs::Base +end # class LogStash::Outputs::Base \ No newline at end of file diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index 1a74d15456e..635f4d5b853 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -12,20 +12,34 @@ require "logstash/config/cpu_core_strategy" require "logstash/util/defaults_printer" require "logstash/shutdown_controller" +require "logstash/util/wrapped_synchronous_queue" +require "logstash/pipeline_reporter" +require "logstash/output_delegator" module LogStash; class Pipeline - attr_reader :inputs, :filters, :outputs, :input_to_filter, :filter_to_output - - def initialize(configstr) + attr_reader :inputs, :filters, :outputs, :worker_threads, :events_consumed, :events_filtered, :reporter, :pipeline_id + + DEFAULT_SETTINGS = { + :default_pipeline_workers => LogStash::Config::CpuCoreStrategy.fifty_percent, + :pipeline_batch_size => 125, + :pipeline_batch_delay => 5, # in milliseconds + :flush_interval => 5, # in seconds + :flush_timeout_interval => 60 # in seconds + } + + def initialize(config_str, settings = {}) + @pipeline_id = settings[:pipeline_id] || self.object_id @logger = Cabin::Channel.get(LogStash) + @reporter = LogStash::PipelineReporter.new(@logger, self) @inputs = nil @filters = nil @outputs = nil - grammar = LogStashConfigParser.new - @config = grammar.parse(configstr) + @worker_threads = [] + grammar = LogStashConfigParser.new + @config = grammar.parse(config_str) if @config.nil? raise LogStash::ConfigurationError, grammar.failure_reason end @@ -42,18 +56,23 @@ def initialize(configstr) raise end - @input_to_filter = SizedQueue.new(20) - # if no filters, pipe inputs directly to outputs - @filter_to_output = filters? ?
SizedQueue.new(20) : @input_to_filter - - @settings = { - "default-filter-workers" => LogStash::Config::CpuCoreStrategy.fifty_percent - } + @input_queue = LogStash::Util::WrappedSynchronousQueue.new + @events_filtered = Concurrent::AtomicFixnum.new(0) + @events_consumed = Concurrent::AtomicFixnum.new(0) + # We generally only want one thread at a time able to access pop/take/poll operations + # from this queue. We also depend on this to be able to block consumers while we snapshot + # in-flight buffers + @input_queue_pop_mutex = Mutex.new + @input_threads = [] + @settings = DEFAULT_SETTINGS.clone # @ready requires thread safety since it is typically polled from outside the pipeline thread @ready = Concurrent::AtomicBoolean.new(false) - @input_threads = [] - @filter_threads = [] + @running = Concurrent::AtomicBoolean.new(false) + @flushing = Concurrent::AtomicReference.new(false) + settings.each {|setting, value| configure(setting, value) } + + start_flusher end # def initialize def ready? @@ -64,24 +83,29 @@ def configure(setting, value) @settings[setting] = value end - def safe_filter_worker_count - default = @settings["default-filter-workers"] - thread_count = @settings["filter-workers"] #override from args "-w 8" or config + def safe_pipeline_worker_count + default = DEFAULT_SETTINGS[:default_pipeline_workers] + thread_count = @settings[:pipeline_workers] #override from args "-w 8" or config safe_filters, unsafe_filters = @filters.partition(&:threadsafe?) + if unsafe_filters.any? plugins = unsafe_filters.collect { |f| f.class.config_name } case thread_count when nil # user did not specify a worker thread count # warn if the default is multiple - @logger.warn("Defaulting filter worker threads to 1 because there are some filters that might not work with multiple worker threads", - :count_was => default, :filters => plugins) if default > 1 + + if default > 1 + @logger.warn("Defaulting pipeline worker threads to 1 because there are some filters that might not work with multiple worker threads", + :count_was => default, :filters => plugins) + end + 1 # can't allow the default value to propagate if there are unsafe filters when 0, 1 1 else @logger.warn("Warning: Manual override - there are filters that might not work with multiple worker threads", - :worker_threads => thread_count, :filters => plugins) + :worker_threads => thread_count, :filters => plugins) thread_count # allow user to force this even if there are unsafe filters end else @@ -94,31 +118,25 @@ def filters? end def run + LogStash::Util.set_thread_name("[#{pipeline_id}]-pipeline-manager") @logger.terminal(LogStash::Util::DefaultsPrinter.print(@settings)) - begin - start_inputs - start_filters if filters? - start_outputs - ensure - # it is important to garantee @ready to be true after the startup sequence has been completed - # to potentially unblock the shutdown method which may be waiting on @ready to proceed - @ready.make_true - end + start_workers @logger.info("Pipeline started") @logger.terminal("Logstash startup completed") + # Block until all inputs have stopped + # Generally this happens if SIGINT is sent and `shutdown` is called from an external thread + + @running.make_true wait_inputs + @running.make_false - if filters? - shutdown_filters - wait_filters - flush_filters_to_output!(:final => true) - end + @logger.info("Input plugins stopped! 
Will shut down filter/output workers.") - shutdown_outputs - wait_outputs + shutdown_flusher + shutdown_workers @logger.info("Pipeline shutdown complete.") @logger.terminal("Logstash shutdown completed") @@ -127,27 +145,140 @@ def run return 0 end # def run - def wait_inputs - @input_threads.each(&:join) + def start_workers + @inflight_batches = {} + + @worker_threads.clear # In case we're restarting the pipeline + begin + start_inputs + @outputs.each {|o| o.register } + @filters.each {|f| f.register} + + pipeline_workers = safe_pipeline_worker_count + batch_size = @settings[:pipeline_batch_size] + batch_delay = @settings[:pipeline_batch_delay] + @logger.info("Starting pipeline", + :id => self.pipeline_id, + :pipeline_workers => pipeline_workers, + :batch_size => batch_size, + :batch_delay => batch_delay) + + pipeline_workers.times do |t| + @worker_threads << Thread.new do + LogStash::Util.set_thread_name("[#{pipeline_id}]>worker#{t}") + worker_loop(batch_size, batch_delay) + end + end + ensure + # it is important to guarantee that @ready is true after the startup sequence has been completed + # to potentially unblock the shutdown method which may be waiting on @ready to proceed + @ready.make_true + end + end + + # Main body of what a worker thread does + # Repeatedly takes batches off the queue, filters, then outputs them + def worker_loop(batch_size, batch_delay) + running = true + + while running + # To understand the purpose behind this synchronize please read the body of take_batch + input_batch, signal = @input_queue_pop_mutex.synchronize { take_batch(batch_size, batch_delay) } + running = false if signal == LogStash::SHUTDOWN + + @events_consumed.increment(input_batch.size) + + filtered_batch = filter_batch(input_batch) + + if signal # Flush on SHUTDOWN or FLUSH + flush_options = (signal == LogStash::SHUTDOWN) ? {:final => true} : {} + flush_filters_to_batch(filtered_batch, flush_options) + end + + @events_filtered.increment(filtered_batch.size) + + output_batch(filtered_batch) + + inflight_batches_synchronize { set_current_thread_inflight_batch(nil) } + end + end + + def take_batch(batch_size, batch_delay) + batch = [] + # Since this is externally synchronized in `worker_loop` we can guarantee that any in-flight batch + # that becomes visible here is a full batch, never a partial one + set_current_thread_inflight_batch(batch) + + signal = false + batch_size.times do |t| + event = (t == 0) ? @input_queue.take : @input_queue.poll(batch_delay) + + if event.nil? + next + elsif event == LogStash::SHUTDOWN || event == LogStash::FLUSH + # We MUST break here. If a batch consumes two SHUTDOWN events + # then another worker may have its SHUTDOWN 'stolen', thus blocking + # the pipeline. We should stop doing work after flush as well. + signal = event + break + else + batch << event + end + end + + [batch, signal] + end + + def filter_batch(batch) + batch.reduce([]) do |acc,e| + if e.is_a?(LogStash::Event) + filtered = filter_func(e) + filtered.each {|fe| acc << fe unless fe.cancelled?} + end + acc + end + rescue Exception => e + # Plugin authors should manage their own exceptions in the plugin code + # but if an exception is raised up to the worker thread it is considered + # fatal and logstash will not recover from this situation. + # + # Users need to check their configuration or see if there is a bug in the + # plugin.
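The take/poll split in take_batch above is what bounds batch latency: the first read blocks indefinitely so an idle worker burns no CPU, while every subsequent read waits at most batch_delay milliseconds. Here is a standalone sketch of that shape against a raw java.util.concurrent.SynchronousQueue (this assumes JRuby, as does the WrappedSynchronousQueue wrapper later in this patch; names and values are illustrative):

    # Illustrative only: the first-take-then-poll loop from take_batch.
    require "java"
    java_import java.util.concurrent.SynchronousQueue
    java_import java.util.concurrent.TimeUnit

    queue = SynchronousQueue.new
    producer = Thread.new { 5.times { |i| queue.put(i) } }

    batch = []
    batch_size = 5
    batch_delay = 50 # milliseconds

    batch_size.times do |t|
      # Block for the first event, then poll with a timeout so a slow
      # producer yields a partial batch instead of stalling the worker.
      event = (t == 0) ? queue.take : queue.poll(batch_delay, TimeUnit::MILLISECONDS)
      next if event.nil? # poll timed out; try the remaining slots
      batch << event
    end

    producer.join
    puts "batched: #{batch.inspect}"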
+ @logger.error("Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.", + "exception" => e, "backtrace" => e.backtrace) + raise end - def shutdown_filters - @flusher_thread.kill - @input_to_filter.push(LogStash::SHUTDOWN) + # Take an array of events and send them to the correct output + def output_batch(batch) + # Build a mapping of { output_plugin => [events...]} + outputs_events = batch.reduce(Hash.new { |h, k| h[k] = [] }) do |acc, event| + # We ask the AST to tell us which outputs to send each event to + # Then, we stick it in the correct bin + output_func(event).each do |output| + acc[output] << event + end + acc + end + # Now that we have our output to event mapping we can just invoke each output + # once with its list of events + outputs_events.each do |output, events| + output.multi_receive(events) + end end - def wait_filters - @filter_threads.each(&:join) + def set_current_thread_inflight_batch(batch) + @inflight_batches[Thread.current] = batch end - def shutdown_outputs - # nothing, filters will do this - @filter_to_output.push(LogStash::SHUTDOWN) + def inflight_batches_synchronize + @input_queue_pop_mutex.synchronize do + yield(@inflight_batches) + end end - def wait_outputs - # Wait for the outputs to stop - @output_threads.each(&:join) + def wait_inputs + @input_threads.each(&:join) end def start_inputs @@ -167,45 +298,15 @@ def start_inputs end end - def start_filters - @filters.each(&:register) - # dynamically get thread count based on filter threadsafety - # moved this test to here to allow for future config reloading - to_start = safe_filter_worker_count - @filter_threads = to_start.times.collect do |i| - Thread.new do - LogStash::Util.set_thread_name("|filterworker.#{i}") - filterworker - end - end - actually_started = @filter_threads.select(&:alive?).size - msg = "Worker threads expected: #{to_start}, worker threads started: #{actually_started}" - if actually_started < to_start - @logger.warn(msg) - else - @logger.info(msg) - end - @flusher_thread = Thread.new { Stud.interval(5) { @input_to_filter.push(LogStash::FLUSH) } } - end - - def start_outputs - @outputs.each(&:register) - @output_threads = [ - Thread.new { outputworker } - ] - end - def start_input(plugin) @input_threads << Thread.new { inputworker(plugin) } end def inputworker(plugin) - LogStash::Util.set_thread_name("<#{plugin.class.config_name}") - LogStash::Util.set_thread_plugin(plugin) + LogStash::Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}") begin - plugin.run(@input_to_filter) + plugin.run(@input_queue) rescue => e - # if plugin is stopping, ignore uncatched exceptions and exit worker if plugin.stop? @logger.debug("Input plugin raised exception during shutdown, ignoring it.", :plugin => plugin.class.config_name, :exception => e, @@ -233,56 +334,6 @@ def inputworker(plugin) end end # def inputworker - def filterworker - begin - while true - event = @input_to_filter.pop - - case event - when LogStash::Event - # filter_func returns all filtered events, including cancelled ones - filter_func(event).each { |e| @filter_to_output.push(e) unless e.cancelled? } - when LogStash::FlushEvent - # handle filter flushing here so that non threadsafe filters (thus only running one filterworker) - # don't have to deal with thread safety implementing the flush method - flush_filters_to_output! 
- when LogStash::ShutdownEvent - # pass it down to any other filterworker and stop this worker - @input_to_filter.push(event) - break - end - end - rescue Exception => e - # Plugins authors should manage their own exceptions in the plugin code - # but if an exception is raised up to the worker thread they are considered - # fatal and logstash will not recover from this situation. - # - # Users need to check their configuration or see if there is a bug in the - # plugin. - @logger.error("Exception in filterworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.", - "exception" => e, "backtrace" => e.backtrace) - raise - ensure - @filters.each(&:do_close) - end - end # def filterworker - - def outputworker - LogStash::Util.set_thread_name(">output") - @outputs.each(&:worker_setup) - - while true - event = @filter_to_output.pop - break if event == LogStash::SHUTDOWN - output_func(event) - LogStash::Util.set_thread_plugin(nil) - end - ensure - @outputs.each do |output| - output.worker_plugins.each(&:do_close) - end - end # def outputworker - # initiate the pipeline shutdown sequence # this method is intended to be called from outside the pipeline thread # @param before_stop [Proc] code block called before performing stop operation on input plugins @@ -296,13 +347,40 @@ def shutdown(&before_stop) before_stop.call if block_given? + @logger.info "Closing inputs" @inputs.each(&:do_stop) + @logger.info "Closed inputs" end # def shutdown + # After `shutdown` is called from an external thread this is called from the main thread to + # tell the worker threads to stop and then block until they've fully stopped + # This also stops all filter and output plugins + def shutdown_workers + # Each worker thread will receive this exactly once! + @worker_threads.each do |t| + @logger.debug("Pushing shutdown", :thread => t) + @input_queue.push(LogStash::SHUTDOWN) + end + + @worker_threads.each do |t| + @logger.debug("Shutdown waiting for worker thread #{t}") + t.join + end + + @filters.each(&:do_close) + @outputs.each(&:do_close) + end + def plugin(plugin_type, name, *args) args << {} if args.empty? + klass = LogStash::Plugin.lookup(plugin_type, name) - return klass.new(*args) + + if plugin_type == "output" + LogStash::OutputDelegator.new(@logger, klass, *args) + else + klass.new(*args) + end end # for backward compatibility in devutils for the rspec helpers, this method is not used @@ -312,6 +390,7 @@ def filter(event, &block) filter_func(event).each { |e| block.call(e) } end + # perform filters flush and yeild flushed event to the passed block # @param options [Hash] # @option options [Boolean] :final => true to signal a final shutdown flush @@ -323,61 +402,51 @@ def flush_filters(options = {}, &block) end end + def start_flusher + @flusher_thread = Thread.new do + while Stud.stoppable_sleep(5, 0.1) { @running.false? } + flush + break if @running.false? + end + end + end + + def shutdown_flusher + @flusher_thread.join + end + + def flush + if @flushing.compare_and_set(false, true) + @logger.debug? && @logger.debug("Pushing flush onto pipeline") + @input_queue.push(LogStash::FLUSH) + end + end + # perform filters flush into the output queue # @param options [Hash] # @option options [Boolean] :final => true to signal a final shutdown flush - def flush_filters_to_output!(options = {}) + def flush_filters_to_batch(batch, options = {}) flush_filters(options) do |event| unless event.cancelled? @logger.debug? 
and @logger.debug("Pushing flushed events", :event => event) - @filter_to_output.push(event) + batch << event end end - end # flush_filters_to_output! - - def inflight_count - data = {} - total = 0 - - input_to_filter = @input_to_filter.size - total += input_to_filter - filter_to_output = @filter_to_output.size - total += filter_to_output - - data["input_to_filter"] = input_to_filter if input_to_filter > 0 - data["filter_to_output"] = filter_to_output if filter_to_output > 0 - - output_worker_queues = [] - @outputs.each do |output| - next unless output.worker_queue && output.worker_queue.size > 0 - plugin_info = output.debug_info - size = output.worker_queue.size - total += size - plugin_info << size - output_worker_queues << plugin_info - end - data["output_worker_queues"] = output_worker_queues unless output_worker_queues.empty? - data["total"] = total - data - end - def stalling_threads - plugin_threads - .reject {|t| t["blocked_on"] } # known begnin blocking statuses - .each {|t| t.delete("backtrace") } - .each {|t| t.delete("blocked_on") } - .each {|t| t.delete("status") } - end + @flushing.set(false) + end # flush_filters_to_output! - def plugin_threads - input_threads = @input_threads.select {|t| t.alive? }.map {|t| thread_info(t) } - filter_threads = @filter_threads.select {|t| t.alive? }.map {|t| thread_info(t) } - output_threads = @output_threads.select {|t| t.alive? }.map {|t| thread_info(t) } - output_worker_threads = @outputs.flat_map {|output| output.worker_threads }.map {|t| thread_info(t) } - input_threads + filter_threads + output_threads + output_worker_threads + def plugin_threads_info + input_threads = @input_threads.select {|t| t.alive? } + worker_threads = @worker_threads.select {|t| t.alive? } + (input_threads + worker_threads).map {|t| LogStash::Util.thread_info(t) } end - def thread_info(thread) - LogStash::Util.thread_info(thread) + def stalling_threads_info + plugin_threads_info + .reject {|t| t["blocked_on"] } # known benign blocking statuses + .each {|t| t.delete("backtrace") } + .each {|t| t.delete("blocked_on") } + .each {|t| t.delete("status") } end -end; end +end end \ No newline at end of file diff --git a/logstash-core/lib/logstash/pipeline_reporter.rb b/logstash-core/lib/logstash/pipeline_reporter.rb new file mode 100644 index 00000000000..c7ae6ca847c --- /dev/null +++ b/logstash-core/lib/logstash/pipeline_reporter.rb @@ -0,0 +1,114 @@ +# encoding: utf-8 +require 'ostruct' + +module LogStash; class PipelineReporter + attr_reader :logger, :pipeline + + # This is an immutable copy of the pipeline state, + # It is a proxy to a hash to allow us to add methods dynamically to the hash + class Snapshot + def initialize(data) + @data = data + end + + def to_hash + @data + end + + def to_simple_hash + {"inflight_count" => inflight_count, "stalling_thread_info" => format_threads_by_plugin} + end + + def to_str + to_simple_hash.to_s + end + alias_method :to_s, :to_str + + def method_missing(meth) + @data[meth] + end + + def format_threads_by_plugin + stalled_plugins = {} + stalling_threads_info.each do |thr| + key = (thr.delete("plugin") || "other") + stalled_plugins[key] ||= [] + stalled_plugins[key] << thr + end + stalled_plugins + end + end + + def initialize(logger,pipeline) + @logger = logger + @pipeline = pipeline + end + + # The main way of accessing data from the reporter,, + # this provides a (more or less) consistent snapshot of what's going on in the + # pipeline with some extra decoration + def snapshot + Snapshot.new(self.to_hash) + end + + def to_hash 
+ pipeline.inflight_batches_synchronize do |batch_map| + worker_states_snap = worker_states(batch_map) # We only want to run this once + inflight_count = worker_states_snap.map {|s| s[:inflight_count] }.reduce(0, :+) + + { + :events_filtered => events_filtered, + :events_consumed => events_consumed, + :worker_count => pipeline.worker_threads.size, + :inflight_count => inflight_count, + :worker_states => worker_states_snap, + :output_info => output_info, + :thread_info => pipeline.plugin_threads_info, + :stalling_threads_info => pipeline.stalling_threads_info + } + end + end + + private + + def events_filtered + pipeline.events_filtered.value + end + + def events_consumed + pipeline.events_consumed.value + end + + def plugin_threads + pipeline.plugin_threads + end + + # Not threadsafe! must be called within an `inflight_batches_synchronize` block + def worker_states(batch_map) + pipeline.worker_threads.map.with_index do |thread,idx| + status = thread.status || "dead" + inflight_count = batch_map[thread] ? batch_map[thread].size : 0 + { + :status => status, + :alive => thread.alive?, + :index => idx, + :inflight_count => inflight_count + } + end + end + + def output_info + pipeline.outputs.map do |output_delegator| + is_multi_worker = output_delegator.worker_count > 1 + + { + :type => output_delegator.config_name, + :config => output_delegator.config, + :is_multi_worker => is_multi_worker, + :events_received => output_delegator.events_received, + :workers => output_delegator.workers, + :busy_workers => output_delegator.busy_workers + } + end + end +end end \ No newline at end of file diff --git a/logstash-core/lib/logstash/shutdown_controller.rb b/logstash-core/lib/logstash/shutdown_controller.rb index 6941753bbc8..dd12246ef9f 100644 --- a/logstash-core/lib/logstash/shutdown_controller.rb +++ b/logstash-core/lib/logstash/shutdown_controller.rb @@ -47,10 +47,10 @@ def start cycle_number = 0 stalled_count = 0 Stud.interval(@cycle_period) do - @reports << Report.from_pipeline(@pipeline) + @reports << pipeline_report_snapshot @reports.delete_at(0) if @reports.size > @report_every # expire old report if cycle_number == (@report_every - 1) # it's report time! - logger.warn(@reports.last.to_hash) + logger.warn(@reports.last) if shutdown_stalled? logger.error("The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.") if stalled_count == 0 @@ -69,6 +69,10 @@ def start end end + def pipeline_report_snapshot + @pipeline.reporter.snapshot + end + # A pipeline shutdown is stalled if # * at least REPORT_EVERY reports have been created # * the inflight event count is in monotonically increasing @@ -78,7 +82,7 @@ def shutdown_stalled? return false unless @reports.size == @report_every # # is stalled if inflight count is either constant or increasing stalled_event_count = @reports.each_cons(2).all? do |prev_report, next_report| - prev_report.inflight_count["total"] <= next_report.inflight_count["total"] + prev_report.inflight_count <= next_report.inflight_count end if stalled_event_count @reports.each_cons(2).all? 
do |prev_report, next_report| @@ -93,35 +97,4 @@ def force_exit exit(-1) end end - - class Report - - attr_reader :inflight_count, :stalling_threads - - def self.from_pipeline(pipeline) - new(pipeline.inflight_count, pipeline.stalling_threads) - end - - def initialize(inflight_count, stalling_threads) - @inflight_count = inflight_count - @stalling_threads = format_threads_by_plugin(stalling_threads) - end - - def to_hash - { - "INFLIGHT_EVENT_COUNT" => @inflight_count, - "STALLING_THREADS" => @stalling_threads - } - end - - def format_threads_by_plugin(stalling_threads) - stalled_plugins = {} - stalling_threads.each do |thr| - key = (thr.delete("plugin") || "other") - stalled_plugins[key] ||= [] - stalled_plugins[key] << thr - end - stalled_plugins - end - end end diff --git a/logstash-core/lib/logstash/util/worker_threads_default_printer.rb b/logstash-core/lib/logstash/util/worker_threads_default_printer.rb index 82e88196e96..a4628552f7b 100644 --- a/logstash-core/lib/logstash/util/worker_threads_default_printer.rb +++ b/logstash-core/lib/logstash/util/worker_threads_default_printer.rb @@ -6,8 +6,8 @@ module LogStash module Util class WorkerThreadsDefaultPrinter def initialize(settings) - @setting = settings.fetch('filter-workers', 0) - @default = settings.fetch('default-filter-workers', 0) + @setting = settings.fetch('pipeline-workers', 0) + @default = settings.fetch('default-pipeline-workers', 0) end def visit(collector) @@ -17,12 +17,12 @@ def visit(collector) def visit_setting(collector) return if @setting == 0 - collector.push("User set filter workers: #{@setting}") + collector.push("User set pipeline workers: #{@setting}") end def visit_default(collector) return if @default == 0 - collector.push "Default filter workers: #{@default}" + collector.push "Default pipeline workers: #{@default}" end end end end diff --git a/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb b/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb new file mode 100644 index 00000000000..e056c08d213 --- /dev/null +++ b/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb @@ -0,0 +1,27 @@ +# encoding: utf-8 + +module LogStash; module Util + class WrappedSynchronousQueue + java_import java.util.concurrent.SynchronousQueue + java_import java.util.concurrent.TimeUnit + + def initialize() + @queue = java.util.concurrent.SynchronousQueue.new() + end + + def push(obj) + @queue.put(obj) + end + alias_method(:<<, :push) + + # Blocking + def take + @queue.take() + end + + # Block for X millis + def poll(millis) + @queue.poll(millis, TimeUnit::MILLISECONDS) + end + end +end end \ No newline at end of file diff --git a/logstash-core/locales/en.yml b/logstash-core/locales/en.yml index 3626144521a..d35fb240909 100644 --- a/logstash-core/locales/en.yml +++ b/logstash-core/locales/en.yml @@ -155,8 +155,13 @@ en: the empty string for the '-e' flag. configtest: |+ Check configuration for valid syntax and then exit. - filterworkers: |+ - Sets the number of filter workers to run. + pipeline-workers: |+ + Sets the number of pipeline workers to run. + pipeline-batch-size: |+ + Size of batches the pipeline is to work in. + pipeline-batch-delay: |+ + When creating pipeline batches, how long to wait while polling + for the next event. log: |+ Write logstash internal logs to the given file. 
Without this flag, logstash will emit diff --git a/logstash-core/spec/logstash/output_delegator_spec.rb b/logstash-core/spec/logstash/output_delegator_spec.rb new file mode 100644 index 00000000000..913fb527947 --- /dev/null +++ b/logstash-core/spec/logstash/output_delegator_spec.rb @@ -0,0 +1,49 @@ +# encoding: utf-8 +require 'spec_helper' + + + +describe LogStash::OutputDelegator do + let(:logger) { double("logger") } + let(:out_klass) { double("output klass") } + let(:out_inst) { double("output instance") } + + subject { described_class.new(logger, out_klass) } + + before do + allow(out_klass).to receive(:new).with(any_args).and_return(out_inst) + allow(out_inst).to receive(:register) + allow(logger).to receive(:debug).with(any_args) + end + + it "should initialize cleanly" do + expect { subject }.not_to raise_error + end + + context "after having received a batch of events" do + let(:events) { 7.times.map { LogStash::Event.new }} + + before do + allow(out_inst).to receive(:multi_receive) + subject.multi_receive(events) + end + + it "should pass the events through" do + expect(out_inst).to have_received(:multi_receive).with(events) + end + + it "should increment the number of events received" do + expect(subject.events_received).to eql(events.length) + end + end + + it "should register all workers on register" do + expect(out_inst).to receive(:register) + subject.register + end + + it "should close all workers when closing" do + expect(out_inst).to receive(:do_close) + subject.do_close + end +end diff --git a/logstash-core/spec/logstash/outputs/base_spec.rb b/logstash-core/spec/logstash/outputs/base_spec.rb index 841ba424df9..59c5a8919cc 100644 --- a/logstash-core/spec/logstash/outputs/base_spec.rb +++ b/logstash-core/spec/logstash/outputs/base_spec.rb @@ -15,12 +15,13 @@ def receive(event) end end -describe "LogStash::Outputs::Base#worker_setup" do - it "should create workers using original parameters except workers = 1" do +describe "LogStash::Outputs::Base#new" do + it "should instantiate cleanly" do params = { "dummy_option" => "potatoes", "codec" => "json", "workers" => 2 } worker_params = params.dup; worker_params["workers"] = 1 - output = LogStash::Outputs::NOOP.new(params.dup) - expect(LogStash::Outputs::NOOP).to receive(:new).twice.with(worker_params).and_call_original - output.worker_setup + + expect do + LogStash::Outputs::NOOP.new(params.dup) + end.not_to raise_error end end diff --git a/logstash-core/spec/logstash/pipeline_reporter_spec.rb b/logstash-core/spec/logstash/pipeline_reporter_spec.rb new file mode 100644 index 00000000000..bdd83d4ff24 --- /dev/null +++ b/logstash-core/spec/logstash/pipeline_reporter_spec.rb @@ -0,0 +1,85 @@ +# encoding: utf-8 +require "spec_helper" +require "logstash/pipeline" +require "logstash/pipeline_reporter" + +class DummyOutput < LogStash::Outputs::Base + config_name "dummyoutput" + milestone 2 + + attr_reader :num_closes, :events + + def initialize(params={}) + super + @num_closes = 0 + @events = [] + end + + def register + end + + def receive(event) + @events << event + end + + def close + @num_closes += 1 + end +end + +#TODO: Figure out how to add more tests that actually cover inflight events +#This will require some janky multithreading stuff +describe LogStash::PipelineReporter do + let(:generator_count) { 5 } + let(:config) do + "input { generator { count => #{generator_count} } } output { dummyoutput {} } " + end + let(:pipeline) { LogStash::Pipeline.new(config)} + let(:reporter) { pipeline.reporter } + + before do + 
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput) + allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_call_original + allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_call_original + + @pre_snapshot = reporter.snapshot + pipeline.run + @post_snapshot = reporter.snapshot + end + + describe "events filtered" do + it "should start at zero" do + expect(@pre_snapshot.events_filtered).to eql(0) + end + + it "should end at the number of generated events" do + expect(@post_snapshot.events_filtered).to eql(generator_count) + end + end + + describe "events consumed" do + it "should start at zero" do + expect(@pre_snapshot.events_consumed).to eql(0) + end + + it "should end at the number of generated events" do + expect(@post_snapshot.events_consumed).to eql(generator_count) + end + end + + describe "inflight count" do + it "should be zero before running" do + expect(@pre_snapshot.inflight_count).to eql(0) + end + + it "should be zero after running" do + expect(@post_snapshot.inflight_count).to eql(0) + end + end + + describe "output states" do + it "should include the count of received events" do + expect(@post_snapshot.output_info.first[:events_received]).to eql(generator_count) + end + end +end \ No newline at end of file diff --git a/logstash-core/spec/logstash/pipeline_spec.rb b/logstash-core/spec/logstash/pipeline_spec.rb index 35f7acf0592..b410816ef2d 100644 --- a/logstash-core/spec/logstash/pipeline_spec.rb +++ b/logstash-core/spec/logstash/pipeline_spec.rb @@ -1,5 +1,7 @@ # encoding: utf-8 require "spec_helper" +require "logstash/inputs/generator" +require "logstash/filters/multiline" class DummyInput < LogStash::Inputs::Base config_name "dummyinput" @@ -35,17 +37,19 @@ class DummyOutput < LogStash::Outputs::Base config_name "dummyoutput" milestone 2 - attr_reader :num_closes + attr_reader :num_closes, :events def initialize(params={}) super @num_closes = 0 + @events = [] end def register end def receive(event) + @events << event end def close @@ -80,22 +84,21 @@ def close() end end class TestPipeline < LogStash::Pipeline - attr_reader :outputs, :filter_threads, :settings, :logger + attr_reader :outputs, :settings, :logger end describe LogStash::Pipeline do - let(:worker_thread_count) { 8 } + let(:worker_thread_count) { LogStash::Pipeline::DEFAULT_SETTINGS[:default_pipeline_workers] } let(:safe_thread_count) { 1 } let(:override_thread_count) { 42 } - describe "defaulting the filter workers based on thread safety" do + describe "defaulting the pipeline workers based on thread safety" do before(:each) do allow(LogStash::Plugin).to receive(:lookup).with("input", "dummyinput").and_return(DummyInput) allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(DummyCodec) allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput) allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").and_return(DummyFilter) allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummysafefilter").and_return(DummySafeFilter) - allow(LogStash::Config::CpuCoreStrategy).to receive(:fifty_percent).and_return(worker_thread_count) end context "when there are some not threadsafe filters" do @@ -117,13 +120,13 @@ class TestPipeline < LogStash::Pipeline context "when there is no command line -w N set" do it "starts one filter thread" do - msg = "Defaulting filter worker threads to 1 because there are some" + + msg = "Defaulting pipeline worker threads 
to 1 because there are some" + " filters that might not work with multiple worker threads" pipeline = TestPipeline.new(test_config_with_filters) expect(pipeline.logger).to receive(:warn).with(msg, {:count_was=>worker_thread_count, :filters=>["dummyfilter"]}) pipeline.run - expect(pipeline.filter_threads.size).to eq(safe_thread_count) + expect(pipeline.worker_threads.size).to eq(safe_thread_count) end end @@ -134,9 +137,9 @@ class TestPipeline < LogStash::Pipeline pipeline = TestPipeline.new(test_config_with_filters) expect(pipeline.logger).to receive(:warn).with(msg, {:worker_threads=> override_thread_count, :filters=>["dummyfilter"]}) - pipeline.configure("filter-workers", override_thread_count) + pipeline.configure(:pipeline_workers, override_thread_count) pipeline.run - expect(pipeline.filter_threads.size).to eq(override_thread_count) + expect(pipeline.worker_threads.size).to eq(override_thread_count) end end end @@ -161,7 +164,7 @@ class TestPipeline < LogStash::Pipeline it "starts multiple filter threads" do pipeline = TestPipeline.new(test_config_with_filters) pipeline.run - expect(pipeline.filter_threads.size).to eq(worker_thread_count) + expect(pipeline.worker_threads.size).to eq(worker_thread_count) end end end @@ -206,8 +209,8 @@ class TestPipeline < LogStash::Pipeline pipeline.run expect(pipeline.outputs.size ).to eq(1) - expect(pipeline.outputs.first.worker_plugins.size ).to eq(1) - expect(pipeline.outputs.first.worker_plugins.first.num_closes ).to eq(1) + expect(pipeline.outputs.first.workers.size ).to eq(1) + expect(pipeline.outputs.first.workers.first.num_closes ).to eq(1) end it "should call output close correctly with output workers" do @@ -215,8 +218,13 @@ class TestPipeline < LogStash::Pipeline pipeline.run expect(pipeline.outputs.size ).to eq(1) - expect(pipeline.outputs.first.num_closes).to eq(0) - pipeline.outputs.first.worker_plugins.each do |plugin| + # We even close the parent output worker, even though it doesn't receive messages + + output_delegator = pipeline.outputs.first + output = output_delegator.workers.first + + expect(output.num_closes).to eq(1) + output_delegator.workers.each do |plugin| expect(plugin.num_closes ).to eq(1) end end @@ -318,8 +326,53 @@ class TestPipeline < LogStash::Pipeline it "doesn't raise an error" do pipeline = TestPipeline.new(pipeline_with_no_filters) pipeline.run - expect { pipeline.stalling_threads }.to_not raise_error + expect { pipeline.stalling_threads_info }.to_not raise_error end end end + + context "Periodic Flush" do + let(:number_of_events) { 100 } + let(:config) do + <<-EOS + input { + generator { + count => #{number_of_events} + } + } + filter { + multiline { + pattern => "^NeverMatch" + negate => true + what => "previous" + } + } + output { + dummyoutput {} + } + EOS + end + let(:output) { DummyOutput.new } + + before do + allow(DummyOutput).to receive(:new).with(any_args).and_return(output) + allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_return(LogStash::Inputs::Generator) + allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain) + allow(LogStash::Plugin).to receive(:lookup).with("filter", "multiline").and_return(LogStash::Filters::Multiline) + allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput) + end + + it "flushes the buffered contents of the filter" do + Thread.abort_on_exception = true + pipeline = LogStash::Pipeline.new(config, { :flush_interval => 1 }) + Thread.new { pipeline.run } + sleep 
0.1 while !pipeline.ready? + # give us a bit of time to flush the events + wait(5).for do + next unless output && output.events && output.events.first + output.events.first["message"].split("\n").count + end.to eq(number_of_events) + pipeline.shutdown + end + end end diff --git a/logstash-core/spec/logstash/util/defaults_printer_spec.rb b/logstash-core/spec/logstash/util/defaults_printer_spec.rb index ed47cf7ca50..3ec4a8517a8 100644 --- a/logstash-core/spec/logstash/util/defaults_printer_spec.rb +++ b/logstash-core/spec/logstash/util/defaults_printer_spec.rb @@ -10,7 +10,7 @@ end let(:workers) { 1 } - let(:expected) { "Settings: User set filter workers: #{workers}" } + let(:expected) { "Settings: User set pipeline workers: #{workers}" } let(:settings) { {} } describe 'class methods API' do @@ -24,8 +24,8 @@ end context 'when the settings hash has content' do - let(:workers) { 42 } - let(:settings) { {'filter-workers' => workers} } + let(:workers) { 42 } + let(:settings) { {'pipeline-workers' => workers} } it_behaves_like "a defaults printer" end end @@ -42,7 +42,7 @@ context 'when the settings hash has content' do let(:workers) { 13 } - let(:settings) { {'filter-workers' => workers} } + let(:settings) { {'pipeline-workers' => workers} } it_behaves_like "a defaults printer" end diff --git a/logstash-core/spec/logstash/util/worker_threads_default_printer_spec.rb b/logstash-core/spec/logstash/util/worker_threads_default_printer_spec.rb index c2f5391cf38..410d8c9fbf7 100644 --- a/logstash-core/spec/logstash/util/worker_threads_default_printer_spec.rb +++ b/logstash-core/spec/logstash/util/worker_threads_default_printer_spec.rb @@ -19,26 +19,26 @@ end context 'when the settings hash has both user and default content' do - let(:settings) { {'filter-workers' => 42, 'default-filter-workers' => 5} } + let(:settings) { {'pipeline-workers' => 42, 'default-pipeline-workers' => 5} } it 'adds two strings' do - expect(collector).to eq(["User set filter workers: 42", "Default filter workers: 5"]) + expect(collector).to eq(["User set pipeline workers: 42", "Default pipeline workers: 5"]) end end context 'when the settings hash has only user content' do - let(:settings) { {'filter-workers' => 42} } + let(:settings) { {'pipeline-workers' => 42} } - it 'adds a string with user set filter workers' do - expect(collector.first).to eq("User set filter workers: 42") + it 'adds a string with user set pipeline workers' do + expect(collector.first).to eq("User set pipeline workers: 42") end end context 'when the settings hash has only default content' do - let(:settings) { {'default-filter-workers' => 5} } + let(:settings) { {'default-pipeline-workers' => 5} } - it 'adds a string with default filter workers' do - expect(collector.first).to eq("Default filter workers: 5") + it 'adds a string with default pipeline workers' do + expect(collector.first).to eq("Default pipeline workers: 5") end end end diff --git a/spec/core/shutdown_controller_spec.rb b/spec/core/shutdown_controller_spec.rb index 5f755f290a8..3e777aafe9b 100644 --- a/spec/core/shutdown_controller_spec.rb +++ b/spec/core/shutdown_controller_spec.rb @@ -8,10 +8,16 @@ let(:check_threshold) { 100 } subject { LogStash::ShutdownController.new(pipeline, check_every) } let(:pipeline) { double("pipeline") } + let(:reporter) { double("reporter") } + let(:reporter_snapshot) { double("reporter snapshot") } report_count = 0 before :each do - allow(LogStash::Report).to receive(:from_pipeline).and_wrap_original do |m, *args| + allow(pipeline).to
receive(:reporter).and_return(reporter) + allow(reporter).to receive(:snapshot).and_return(reporter_snapshot) + allow(reporter_snapshot).to receive(:to_simple_hash).and_return({}) + + allow(subject).to receive(:pipeline_report_snapshot).and_wrap_original do |m, *args| report_count += 1 m.call(*args) end @@ -22,10 +28,10 @@ end context "when pipeline is stalled" do - let(:increasing_count) { (1..5000).to_a.map {|i| { "total" => i } } } + let(:increasing_count) { (1..5000).to_a } before :each do - allow(pipeline).to receive(:inflight_count).and_return(*increasing_count) - allow(pipeline).to receive(:stalling_threads) { { } } + allow(reporter_snapshot).to receive(:inflight_count).and_return(*increasing_count) + allow(reporter_snapshot).to receive(:stalling_threads) { { } } end describe ".unsafe_shutdown = true" do @@ -49,7 +55,7 @@ it "should do exactly \"abort_threshold\"*\"report_every\" stall checks" do allow(subject).to receive(:force_exit) - expect(LogStash::Report).to receive(:from_pipeline).exactly(abort_threshold*report_every).times.and_call_original + expect(subject).to receive(:pipeline_report_snapshot).exactly(abort_threshold*report_every).times.and_call_original subject.start end end @@ -70,10 +76,10 @@ end context "when pipeline is not stalled" do - let(:decreasing_count) { (1..5000).to_a.reverse.map {|i| { "total" => i } } } + let(:decreasing_count) { (1..5000).to_a.reverse } before :each do - allow(pipeline).to receive(:inflight_count).and_return(*decreasing_count) - allow(pipeline).to receive(:stalling_threads) { { } } + allow(reporter_snapshot).to receive(:inflight_count).and_return(*decreasing_count) + allow(reporter_snapshot).to receive(:stalling_threads) { { } } end describe ".unsafe_shutdown = true" do
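A closing note on the stall detection these specs exercise: shutdown_stalled? now compares scalar inflight_count values across consecutive report snapshots rather than reading a "total" key out of a hash, so a stall is simply a non-decreasing series of counts. A tiny illustration of the predicate with made-up numbers:

    # Hypothetical illustration of the monotonic check in shutdown_stalled?:
    # the pipeline counts as stalled when in-flight counts never decrease.
    def stalled?(counts)
      counts.each_cons(2).all? { |prev, nxt| prev <= nxt }
    end

    puts stalled?([10, 10, 12, 15]) # => true  (constant or increasing)
    puts stalled?([10, 8, 5, 1])    # => false (pipeline is draining)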