Skip to content

Commit

Permalink
add shutdown controller to force exit on stalled shutdown
Browse files Browse the repository at this point in the history
* start logstash with --force-shutdown to force_exit on stall
* stall detection kicks in when SIGTERM/SIGINT is received
* add DeadLetterPostOffice to receive dead letter events
* plugins have a dead_letter(event) api to send to dead_letter
* dead_letter destination is a file
  • Loading branch information
jsvd committed Oct 22, 2015
1 parent f3527dc commit 53956cf
Show file tree
Hide file tree
Showing 9 changed files with 293 additions and 35 deletions.
14 changes: 10 additions & 4 deletions lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ class LogStash::Agent < Clamp::Command
I18n.t("logstash.agent.flag.configtest"),
:attribute_name => :config_test

option "--[no-]force-shutdown", :flag,
I18n.t("logstash.agent.flag.force_shutdown"),
:attribute_name => :force_shutdown,
:default => false

# Emit a warning message.
def warn(message)
# For now, all warnings are fatal.
Expand All @@ -75,6 +80,9 @@ def execute
require "logstash/plugin"
@logger = Cabin::Channel.get(LogStash)

LogStash::ShutdownController.force_shutdown = force_shutdown?
LogStash::DeadLetterPostOffice.logger = @logger

if version?
show_version
return 0
Expand Down Expand Up @@ -114,6 +122,7 @@ def execute

begin
pipeline = LogStash::Pipeline.new(@config_string)
LogStash::DeadLetterPostOffice.destination = LogStash::DeadLetterPostOffice::Destination::File.new
rescue LoadError => e
fail("Configuration problem.")
end
Expand Down Expand Up @@ -175,10 +184,7 @@ def execute
end # def execute

def shutdown(pipeline)
pipeline.shutdown do
InflightEventsReporter.logger = @logger
InflightEventsReporter.start(pipeline.input_to_filter, pipeline.filter_to_output, pipeline.outputs)
end
pipeline.shutdown
end

def show_version
Expand Down
55 changes: 55 additions & 0 deletions lib/logstash/dead_letter_post_office.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# encoding: utf-8

class LogStash::DeadLetterPostOffice

def self.logger=(logger)
@logger = logger
end

def self.logger
@logger ||= Cabin::Channel.get(LogStash)
end

def self.destination=(destination)
logger.info("Dead letter events will be sent to \"#{destination.location}\".")
@destination = destination
end

def self.<<(events)
events = [events] unless events.is_a?(Array)

events.each do |event|
logger.warn("dead letter received!", :event => event.to_hash)
event.tag("_dead_letter")
event.cancel
@destination << event
end
end

module Destination

class Base
def location; end
def <<(event); end
end

class File < Base

START_TIME = Time.now
DUMP_PATH = ::File.join("/tmp", "dump.#{START_TIME.strftime("%Y%m%d%H%M%S")}.log")

def initialize(path=DUMP_PATH)
@path = path
@file = ::File.open(path, "w")
end

def location
@path
end

def <<(event)
@file.puts(event.to_json)
end
end
end
end
43 changes: 40 additions & 3 deletions lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@
require "logstash/filters/base"
require "logstash/inputs/base"
require "logstash/outputs/base"
require "logstash/util/reporter"
require "logstash/config/cpu_core_strategy"
require "logstash/util/defaults_printer"
require "logstash/shutdown_controller"
require "logstash/dead_letter_post_office"

class LogStash::Pipeline
module LogStash; class Pipeline
attr_reader :inputs, :filters, :outputs, :input_to_filter, :filter_to_output

def initialize(configstr)
Expand All @@ -25,6 +26,7 @@ def initialize(configstr)

grammar = LogStashConfigParser.new
@config = grammar.parse(configstr)

if @config.nil?
raise LogStash::ConfigurationError, grammar.failure_reason
end
Expand Down Expand Up @@ -266,13 +268,21 @@ def shutdown(&before_stop)
# shutdown method which can be called from another thread at any time
sleep(0.1) while !ready?

shutdown_controller = ::LogStash::ShutdownController.new(self)
shutdown_controller.logger = @logger
shutdown_controller.start

# TODO: should we also check against calling shutdown multiple times concurently?

before_stop.call if block_given?

@inputs.each(&:do_stop)
end # def shutdown

def force_exit
exit(-1)
end

def plugin(plugin_type, name, *args)
args << {} if args.empty?
klass = LogStash::Plugin.lookup(plugin_type, name)
Expand Down Expand Up @@ -309,4 +319,31 @@ def flush_filters_to_output!(options = {})
end
end # flush_filters_to_output!

end # class Pipeline
def inflight_count
data = {
"input_to_filter" => @input_to_filter.size,
"filter_to_output" => @filter_to_output.size,
"outputs" => []
}
@outputs.each do |output|
next unless output.worker_queue && output.worker_queue.size > 0
data["outputs"] << [output.inspect, output.worker_queue.size]
end

data["total"] = data["input_to_filter"] + data["filter_to_output"] +
data["outputs"].map(&:last).inject(0, :+)
data
end

def dump
dump = []
[@input_to_filter].each do |queue|
until queue.empty? do
event = queue.pop(true) rescue ThreadError # non-block pop
next unless event.is_a?(LogStash::Event)
dump << event
end
end
dump
end
end; end
5 changes: 5 additions & 0 deletions lib/logstash/plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ def inspect
end
end

def dead_letter(event)
return unless event.is_a?(LogStash::Event)
LogStash::DeadLetterPostOffice << event
end

# Look up a plugin by type and name.
public
def self.lookup(type, name)
Expand Down
62 changes: 62 additions & 0 deletions lib/logstash/shutdown_controller.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# encoding: utf-8
module LogStash
class ShutdownController

REPORT_CYCLE = 5 # seconds
REPORTS = []
NUM_REPORTS = 3

def self.force_shutdown=(boolean)
@force_shutdown = boolean
end

def self.force_shutdown?
@force_shutdown
end

def logger=(logger)
@logger = logger
end

def logger
@logger ||= Cabin::Channel.get(LogStash)
end

def initialize(pipeline)
@pipeline = pipeline
end

def start(cycle=REPORT_CYCLE)
@thread ||= Thread.new do
Stud.interval(cycle) do
REPORTS << @pipeline.inflight_count
REPORTS.delete_at(0) if REPORTS.size > NUM_REPORTS # expire old report
report(REPORTS.last)
if self.class.force_shutdown? && stalled?
logger.fatal("Stalled pipeline detected. Forcefully quitting logstash..")
DeadLetterPostOffice << @pipeline.dump
@pipeline.force_exit()
break
end
end
end
end

def stop!
@thread.terminate if @thread.is_a?(Thread)
@thread = nil
end

def report(report)
logger.warn ["INFLIGHT_EVENTS_REPORT", Time.now.iso8601, report]
end

def stalled?
return false unless REPORTS.size == NUM_REPORTS
# is stalled if inflight count is either constant or increasing
REPORTS.each_cons(2).all? do |prev_report, next_report|
prev_report["total"] <= next_report["total"]
end
end
end
end
28 changes: 0 additions & 28 deletions lib/logstash/util/reporter.rb

This file was deleted.

5 changes: 5 additions & 0 deletions locales/en.yml
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,8 @@ en:
debug: |+
Most verbose logging. This causes 'debug'
level logs to be emitted.
force_shutdown: |+
Force logstash to exit during shutdown even
if there are still inflight events in memory.
By default, logstash will refuse to quit until all
received events have been pushed to the outputs.
46 changes: 46 additions & 0 deletions spec/core/dead_letter_post_office_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# encoding: utf-8
require "spec_helper"

describe LogStash::DeadLetterPostOffice do

describe ".<<" do
subject { LogStash::DeadLetterPostOffice }
let(:filter) { LogStash::Filter::Base.new }
let(:event) { LogStash::Event.new("message" => "test") }
let(:destination) { LogStash::DeadLetterPostOffice::Destination::Base.new }

before :each do
subject.destination = destination
allow(destination).to receive(:<<) {|event| }
end

it "should send event to destination" do
expect(destination).to receive(:<<).with(event)
subject << event
end

it "should tag the event with \"_dead_letter\"" do
subject << event
expect(event["tags"]).to include("_dead_letter")
end

it "should cancel the event" do
subject << event
expect(event).to be_cancelled
end

context "array of events" do
let(:event1) { LogStash::Event.new("message" => "test1") }
let(:event2) { LogStash::Event.new("message" => "test2") }
let(:event3) { LogStash::Event.new("message" => "test3") }
let(:events) { [event1, event2, event3] }

it "should push each event to the destination" do
expect(destination).to receive(:<<).with(event1)
expect(destination).to receive(:<<).with(event2)
expect(destination).to receive(:<<).with(event3)
subject << events
end
end
end
end
Loading

0 comments on commit 53956cf

Please sign in to comment.