Skip to content

Commit

Permalink
Use a simple timer thread for flushes instead of a task
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewvc committed Dec 14, 2015
1 parent ff5c88f commit c02dd53
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
19 changes: 12 additions & 7 deletions logstash-core/lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
require "logstash/shutdown_controller"
require "logstash/util/wrapped_synchronous_queue"
require "logstash/pipeline_reporter"
require "concurrent/timer_task"
require "logstash/output_delegator"

module LogStash; class Pipeline
Expand Down Expand Up @@ -69,6 +68,7 @@ def initialize(config_str, settings = {})
@settings = DEFAULT_SETTINGS.clone
# @ready requires thread safety since it is typically polled from outside the pipeline thread
@ready = Concurrent::AtomicBoolean.new(false)
@running = Concurrent::AtomicBoolean.new(false)
@flushing = Concurrent::AtomicReference.new(false)
settings.each {|setting, value| configure(setting, value) }

Expand Down Expand Up @@ -128,7 +128,11 @@ def run

# Block until all inputs have stopped
# Generally this happens if SIGINT is sent and `shutdown` is called from an external thread

@running.make_true
wait_inputs
@running.make_false

@logger.info("Input plugins stopped! Will shutdown filter/output workers.")

shutdown_flusher
Expand Down Expand Up @@ -419,15 +423,16 @@ def update(time, result, exception)
end

def start_flusher
@flusher_task = Concurrent::TimerTask.new { flush }
@flusher_task.execution_interval = @settings[:flush_interval]
@flusher_task.timeout_interval = @settings[:flush_timeout_interval]
@flusher_task.add_observer(FlusherObserver.new(@logger, @flushing))
@flusher_task.execute
@flusher_thread = Thread.new do
while Stud.stoppable_sleep(5, 0.1) { @running.false? }
flush
break if @running.false?
end
end
end

def shutdown_flusher
@flusher_task.shutdown
@flusher_thread.join
end

def flush
Expand Down
5 changes: 4 additions & 1 deletion logstash-core/spec/logstash/pipeline_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,10 @@ class TestPipeline < LogStash::Pipeline
Thread.new { pipeline.run }
sleep 0.1 while !pipeline.ready?
# give us a bit of time to flush the events
wait(5).for { output.events.first["message"].split("\n").count }.to eq(number_of_events)
wait(5).for do
next unless output && output.events && output.events.first
output.events.first["message"].split("\n").count
end.to eq(number_of_events)
pipeline.shutdown
end
end
Expand Down

0 comments on commit c02dd53

Please sign in to comment.