Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add shutdown controller to force exit on stalled shutdown #4051

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ class LogStash::Agent < Clamp::Command
I18n.t("logstash.agent.flag.configtest"),
:attribute_name => :config_test

option "--[no-]allow-unsafe-shutdown", :flag,
I18n.t("logstash.agent.flag.unsafe_shutdown"),
:attribute_name => :unsafe_shutdown,
:default => false

# Emit a warning message.
def warn(message)
# For now, all warnings are fatal.
Expand All @@ -75,6 +80,9 @@ def execute
require "logstash/plugin"
@logger = Cabin::Channel.get(LogStash)

LogStash::ShutdownController.unsafe_shutdown = unsafe_shutdown?
LogStash::ShutdownController.logger = @logger

if version?
show_version
return 0
Expand Down Expand Up @@ -176,8 +184,7 @@ def execute

def shutdown(pipeline)
pipeline.shutdown do
InflightEventsReporter.logger = @logger
InflightEventsReporter.start(pipeline.input_to_filter, pipeline.filter_to_output, pipeline.outputs)
::LogStash::ShutdownController.start(pipeline)
end
end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are removing most the content of this method, is there any reason not to call pipeline.shutdown directly and remove the inderection?


Expand Down
1 change: 1 addition & 0 deletions lib/logstash/filters/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ def filter(event)
# @return [Array<LogStash::Event] filtered events and any new events generated by the filter
public
def multi_filter(events)
LogStash::Util.set_thread_plugin(self)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

result = []
events.each do |event|
unless event.cancelled?
Expand Down
10 changes: 7 additions & 3 deletions lib/logstash/outputs/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class LogStash::Outputs::Base < LogStash::Plugin
# Note that this setting may not be useful for all outputs.
config :workers, :validate => :number, :default => 1

attr_reader :worker_plugins, :worker_queue
attr_reader :worker_plugins, :worker_queue, :worker_threads

public
def workers_not_supported(message=nil)
Expand Down Expand Up @@ -56,13 +56,15 @@ def receive(event)
def worker_setup
if @workers == 1
@worker_plugins = [self]
@worker_threads = []
else
define_singleton_method(:handle, method(:handle_worker))
@worker_queue = SizedQueue.new(20)
@worker_plugins = @workers.times.map { self.class.new(@original_params.merge("workers" => 1)) }
@worker_plugins.map.with_index do |plugin, i|
@worker_threads = @worker_plugins.map.with_index do |plugin, i|
Thread.new(original_params, @worker_queue) do |params, queue|
LogStash::Util::set_thread_name(">#{self.class.config_name}.#{i}")
LogStash::Util.set_thread_name(">#{self.class.config_name}.#{i}")
LogStash::Util.set_thread_plugin(self)
plugin.register
while true
event = queue.pop
Expand All @@ -75,10 +77,12 @@ def worker_setup

public
def handle(event)
LogStash::Util.set_thread_plugin(self)
receive(event)
end # def handle

def handle_worker(event)
LogStash::Util.set_thread_plugin(self)
@worker_queue.push(event)
end

Expand Down
64 changes: 57 additions & 7 deletions lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
require "logstash/filters/base"
require "logstash/inputs/base"
require "logstash/outputs/base"
require "logstash/util/reporter"
require "logstash/config/cpu_core_strategy"
require "logstash/util/defaults_printer"
require "logstash/shutdown_controller"

class LogStash::Pipeline
module LogStash; class Pipeline
attr_reader :inputs, :filters, :outputs, :input_to_filter, :filter_to_output

def initialize(configstr)
Expand All @@ -25,6 +25,7 @@ def initialize(configstr)

grammar = LogStashConfigParser.new
@config = grammar.parse(configstr)

if @config.nil?
raise LogStash::ConfigurationError, grammar.failure_reason
end
Expand Down Expand Up @@ -150,8 +151,11 @@ def start_inputs
def start_filters
@filters.each(&:register)
to_start = @settings["filter-workers"]
@filter_threads = to_start.times.collect do
Thread.new { filterworker }
@filter_threads = to_start.times.collect do |i|
Thread.new do
LogStash::Util.set_thread_name("|filterworker.#{i}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

filterworker
end
end
actually_started = @filter_threads.select(&:alive?).size
msg = "Worker threads expected: #{to_start}, worker threads started: #{actually_started}"
Expand All @@ -175,7 +179,8 @@ def start_input(plugin)
end

def inputworker(plugin)
LogStash::Util::set_thread_name("<#{plugin.class.config_name}")
LogStash::Util.set_thread_name("<#{plugin.class.config_name}")
LogStash::Util.set_thread_plugin(plugin)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love having this around for troubleshooting!

begin
plugin.run(@input_to_filter)
rescue => e
Expand Down Expand Up @@ -208,7 +213,6 @@ def inputworker(plugin)
end # def inputworker

def filterworker
LogStash::Util.set_thread_name("|worker")
begin
while true
event = @input_to_filter.pop
Expand Down Expand Up @@ -250,6 +254,7 @@ def outputworker
event = @filter_to_output.pop
break if event == LogStash::SHUTDOWN
output_func(event)
LogStash::Util.set_thread_plugin(nil)
end
ensure
@outputs.each do |output|
Expand Down Expand Up @@ -309,4 +314,49 @@ def flush_filters_to_output!(options = {})
end
end # flush_filters_to_output!

end # class Pipeline
def inflight_count
data = {}
total = 0

input_to_filter = @input_to_filter.size
total += input_to_filter
filter_to_output = @filter_to_output.size
total += filter_to_output

data["input_to_filter"] = input_to_filter if input_to_filter > 0
data["filter_to_output"] = filter_to_output if filter_to_output > 0

output_worker_queues = []
@outputs.each do |output|
next unless output.worker_queue && output.worker_queue.size > 0
plugin_info = output.debug_info
size = output.worker_queue.size
total += size
plugin_info << size
output_worker_queues << plugin_info
end
data["output_worker_queues"] = output_worker_queues unless output_worker_queues.empty?
data["total"] = total
data
end

def stalling_threads
plugin_threads
.reject {|t| t["blocked_on"] } # known begnin blocking statuses
.each {|t| t.delete("backtrace") }
.each {|t| t.delete("blocked_on") }
.each {|t| t.delete("status") }
end

def plugin_threads
input_threads = @input_threads.select {|t| t.alive? }.map {|t| thread_info(t) }
filter_threads = @filter_threads.select {|t| t.alive? }.map {|t| thread_info(t) }
output_threads = @output_threads.select {|t| t.alive? }.map {|t| thread_info(t) }
output_worker_threads = @outputs.flat_map {|output| output.worker_threads }.map {|t| thread_info(t) }
input_threads + filter_threads + output_threads + output_worker_threads
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think to refactor {|t| t.alive? }.map {|t| thread_info(t) } into a proc?

end

def thread_info(thread)
LogStash::Util.thread_info(thread)
end
end; end
5 changes: 5 additions & 0 deletions lib/logstash/plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ def inspect
end
end

public
def debug_info
[self.class.to_s, original_params]
end

# Look up a plugin by type and name.
public
def self.lookup(type, name)
Expand Down
127 changes: 127 additions & 0 deletions lib/logstash/shutdown_controller.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# encoding: utf-8

module LogStash
class ShutdownController

CHECK_EVERY = 1 # second
REPORT_EVERY = 5 # checks
ABORT_AFTER = 3 # stalled reports

attr_reader :cycle_period, :report_every, :abort_threshold

def initialize(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER)
@pipeline = pipeline
@cycle_period = cycle_period
@report_every = report_every
@abort_threshold = abort_threshold
@reports = []
end

def self.unsafe_shutdown=(boolean)
@unsafe_shutdown = boolean
end

def self.unsafe_shutdown?
@unsafe_shutdown
end

def self.logger=(logger)
@logger = logger
end

def self.logger
@logger ||= Cabin::Channel.get(LogStash)
end

def self.start(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER)
controller = self.new(pipeline, cycle_period, report_every, abort_threshold)
Thread.new(controller) { |controller| controller.start }
end

def logger
self.class.logger
end

def start
sleep(@cycle_period)
cycle_number = 0
stalled_count = 0
Stud.interval(@cycle_period) do
@reports << Report.from_pipeline(@pipeline)
@reports.delete_at(0) if @reports.size > @report_every # expire old report
if cycle_number == (@report_every - 1) # it's report time!
logger.warn(@reports.last.to_hash)

if shutdown_stalled?
logger.error("The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.") if stalled_count == 0
stalled_count += 1

if self.class.unsafe_shutdown? && @abort_threshold == stalled_count
logger.fatal("Forcefully quitting logstash..")
force_exit()
break
end
else
stalled_count = 0
end
end
cycle_number = (cycle_number + 1) % @report_every
end
end

# A pipeline shutdown is stalled if
# * at least REPORT_EVERY reports have been created
# * the inflight event count is in monotonically increasing
# * there are worker threads running which aren't blocked on SizedQueue pop/push
# * the stalled thread list is constant in the previous REPORT_EVERY reports
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for the comment

def shutdown_stalled?
return false unless @reports.size == @report_every #
# is stalled if inflight count is either constant or increasing
stalled_event_count = @reports.each_cons(2).all? do |prev_report, next_report|
prev_report.inflight_count["total"] <= next_report.inflight_count["total"]
end
if stalled_event_count
@reports.each_cons(2).all? do |prev_report, next_report|
prev_report.stalling_threads == next_report.stalling_threads
end
else
false
end
end

def force_exit
exit(-1)
end
end

class Report

attr_reader :inflight_count, :stalling_threads

def self.from_pipeline(pipeline)
new(pipeline.inflight_count, pipeline.stalling_threads)
end

def initialize(inflight_count, stalling_threads)
@inflight_count = inflight_count
@stalling_threads = format_threads_by_plugin(stalling_threads)
end

def to_hash
{
"INFLIGHT_EVENT_COUNT" => @inflight_count,
"STALLING_THREADS" => @stalling_threads
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use the accessor here for theses values.
@stalling_threads => stalling_threads

}
end

def format_threads_by_plugin(stalling_threads)
stalled_plugins = {}
stalling_threads.each do |thr|
key = (thr.delete("plugin") || "other")
stalled_plugins[key] ||= []
stalled_plugins[key] << thr
end
stalled_plugins
end
end
end
35 changes: 35 additions & 0 deletions lib/logstash/util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,41 @@ def self.set_thread_name(name)
end
end # def set_thread_name

def self.set_thread_plugin(plugin)
Thread.current[:plugin] = plugin
end

def self.get_thread_id(thread)
if RUBY_ENGINE == "jruby"
JRuby.reference(thread).native_thread.id
else
raise Exception.new("Native thread IDs aren't supported outside of JRuby")
end
end

def self.thread_info(thread)
backtrace = thread.backtrace.map do |line|
line.gsub(LogStash::Environment::LOGSTASH_HOME, "[...]")
end

blocked_on = case backtrace.first
when /in `push'/ then "blocked_on_push"
when /(?:pipeline|base).*pop/ then "waiting_for_events"
else nil
end

{
"thread_id" => get_thread_id(thread),
"name" => thread[:name],
"plugin" => (thread[:plugin] ? thread[:plugin].debug_info : nil),
"backtrace" => backtrace,
"blocked_on" => blocked_on,
"status" => thread.status,
"current_call" => backtrace.first
}
end


# Merge hash 'src' into 'dst' nondestructively
#
# Duplicate keys will become array values
Expand Down
Loading