diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index f9ff2f1..cb27260 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -1,6 +1,7 @@ require 'logstash/namespace' require 'logstash/inputs/base' require 'jruby-kafka' +require 'stud/interval' # This input will read events from a Kafka topic. It uses the high level consumer API provided # by Kafka to read messages from the broker. It also maintains the state of what has been @@ -133,31 +134,32 @@ def run(logstash_queue) @logger.info('Running kafka', :group_id => @group_id, :topic_id => @topic_id, :zk_connect => @zk_connect) begin @consumer_group.run(@consumer_threads,@kafka_client_queue) - begin - while true + + while !stop? + if !@kafka_client_queue.empty? event = @kafka_client_queue.pop queue_event(event, logstash_queue) end - rescue LogStash::ShutdownSignal - @logger.info('Kafka got shutdown signal') - @consumer_group.shutdown end + until @kafka_client_queue.empty? queue_event(@kafka_client_queue.pop,logstash_queue) end + @logger.info('Done running kafka input') rescue => e @logger.warn('kafka client threw exception, restarting', :exception => e) - if @consumer_group.running? - @consumer_group.shutdown - end - sleep(Float(@consumer_restart_sleep_ms) * 1 / 1000) - retry + Stud.stoppable_sleep(Float(@consumer_restart_sleep_ms) * 1 / 1000) { stop? } + retry if !stop? end - finished end # def run + public + def stop + @consumer_group.shutdown if @consumer_group.running? + end + private def create_consumer_group(options) Kafka::Group.new(options) @@ -182,5 +184,4 @@ def queue_event(message_and_metadata, output_queue) :backtrace => e.backtrace) end # begin end # def queue_event - end #class LogStash::Inputs::Kafka diff --git a/logstash-input-kafka.gemspec b/logstash-input-kafka.gemspec index 7714332..8189d10 100644 --- a/logstash-input-kafka.gemspec +++ b/logstash-input-kafka.gemspec @@ -23,6 +23,7 @@ Gem::Specification.new do |s| s.add_runtime_dependency "logstash-core", '>= 1.4.0', '< 2.0.0' s.add_runtime_dependency 'logstash-codec-json' s.add_runtime_dependency 'logstash-codec-plain' + s.add_runtime_dependency 'stud', '>= 0.0.22', '< 0.1.0' s.add_runtime_dependency 'jruby-kafka', ['>= 1.2.0', '< 2.0.0'] diff --git a/spec/inputs/kafka_spec.rb b/spec/inputs/kafka_spec.rb index f861f9f..d2504e9 100644 --- a/spec/inputs/kafka_spec.rb +++ b/spec/inputs/kafka_spec.rb @@ -7,8 +7,7 @@ class LogStash::Inputs::TestKafka < LogStash::Inputs::Kafka private def queue_event(msg, output_queue) super(msg, output_queue) - # need to raise exception here to stop the infinite loop - raise LogStash::ShutdownSignal + do_stop end end @@ -30,7 +29,26 @@ def run(a_num_threads, a_queue) end end -describe 'inputs/kafka' do +class LogStash::Inputs::TestInfiniteKafka < LogStash::Inputs::Kafka + private + def queue_event(msg, output_queue) + super(msg, output_queue) + end +end + +class TestInfiniteKafkaGroup < Kafka::Group + def run(a_num_threads, a_queue) + blah = TestMessageAndMetadata.new(@topic, 0, nil, 'Kafka message') + Thread.new do + while true + a_queue << blah + sleep 0.2 + end + end + end +end + +describe LogStash::Inputs::Kafka do let (:kafka_config) {{'topic_id' => 'test'}} let (:empty_config) {{}} let (:bad_kafka_config) {{'topic_id' => 'test', 'white_list' => 'other_topic'}} @@ -57,6 +75,18 @@ def run(a_num_threads, a_queue) expect {input.register}.to raise_error end + it_behaves_like "an interruptible input plugin" do + let(:config) { kafka_config } + let(:mock_kafka_plugin) { LogStash::Inputs::TestInfiniteKafka.new(config) } + + before :each do + allow(LogStash::Inputs::Kafka).to receive(:new).and_return(mock_kafka_plugin) + expect(subject).to receive(:create_consumer_group) do |options| + TestInfiniteKafkaGroup.new(options) + end + end + end + it 'should populate kafka config with default values' do kafka = LogStash::Inputs::TestKafka.new(kafka_config) insist {kafka.zk_connect} == 'localhost:2181' @@ -98,5 +128,4 @@ def run(a_num_threads, a_queue) insist { e['kafka']['partition'] } == 0 insist { e['kafka']['key'] } == nil end - end