Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update to new shutdown semantics #42

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions lib/logstash/inputs/kafka.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require 'logstash/namespace'
require 'logstash/inputs/base'
require 'jruby-kafka'
require 'stud/interval'

# This input will read events from a Kafka topic. It uses the high level consumer API provided
# by Kafka to read messages from the broker. It also maintains the state of what has been
Expand Down Expand Up @@ -133,31 +134,32 @@ def run(logstash_queue)
@logger.info('Running kafka', :group_id => @group_id, :topic_id => @topic_id, :zk_connect => @zk_connect)
begin
@consumer_group.run(@consumer_threads,@kafka_client_queue)
begin
while true

while !stop?
if !@kafka_client_queue.empty?
event = @kafka_client_queue.pop
queue_event(event, logstash_queue)
end
rescue LogStash::ShutdownSignal
@logger.info('Kafka got shutdown signal')
@consumer_group.shutdown
end

until @kafka_client_queue.empty?
queue_event(@kafka_client_queue.pop,logstash_queue)
end

@logger.info('Done running kafka input')
rescue => e
@logger.warn('kafka client threw exception, restarting',
:exception => e)
if @consumer_group.running?
@consumer_group.shutdown
end
sleep(Float(@consumer_restart_sleep_ms) * 1 / 1000)
retry
Stud.stoppable_sleep(Float(@consumer_restart_sleep_ms) * 1 / 1000) { stop? }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Float * 1 / 1000 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iono, I didn't want to edit this. looks like @consumer_restart_sleep_ms is just a number so this forces it to be a float for more precision.

retry if !stop?
end
finished
end # def run

public
def stop
@consumer_group.shutdown if @consumer_group.running?
end

private
def create_consumer_group(options)
Kafka::Group.new(options)
Expand All @@ -182,5 +184,4 @@ def queue_event(message_and_metadata, output_queue)
:backtrace => e.backtrace)
end # begin
end # def queue_event

end #class LogStash::Inputs::Kafka
1 change: 1 addition & 0 deletions logstash-input-kafka.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Gem::Specification.new do |s|
s.add_runtime_dependency "logstash-core", '>= 1.4.0', '< 2.0.0'
s.add_runtime_dependency 'logstash-codec-json'
s.add_runtime_dependency 'logstash-codec-plain'
s.add_runtime_dependency 'stud', '>= 0.0.22', '< 0.1.0'

s.add_runtime_dependency 'jruby-kafka', ['>= 1.2.0', '< 2.0.0']

Expand Down
37 changes: 33 additions & 4 deletions spec/inputs/kafka_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ class LogStash::Inputs::TestKafka < LogStash::Inputs::Kafka
private
def queue_event(msg, output_queue)
super(msg, output_queue)
# need to raise exception here to stop the infinite loop
raise LogStash::ShutdownSignal
do_stop
end
end

Expand All @@ -30,7 +29,26 @@ def run(a_num_threads, a_queue)
end
end

describe 'inputs/kafka' do
class LogStash::Inputs::TestInfiniteKafka < LogStash::Inputs::Kafka
private
def queue_event(msg, output_queue)
super(msg, output_queue)
end
end

class TestInfiniteKafkaGroup < Kafka::Group
def run(a_num_threads, a_queue)
blah = TestMessageAndMetadata.new(@topic, 0, nil, 'Kafka message')
Thread.new do
while true
a_queue << blah
sleep 0.2
end
end
end
end

describe LogStash::Inputs::Kafka do
let (:kafka_config) {{'topic_id' => 'test'}}
let (:empty_config) {{}}
let (:bad_kafka_config) {{'topic_id' => 'test', 'white_list' => 'other_topic'}}
Expand All @@ -57,6 +75,18 @@ def run(a_num_threads, a_queue)
expect {input.register}.to raise_error
end

it_behaves_like "an interruptible input plugin" do
let(:config) { kafka_config }
let(:mock_kafka_plugin) { LogStash::Inputs::TestInfiniteKafka.new(config) }

before :each do
allow(LogStash::Inputs::Kafka).to receive(:new).and_return(mock_kafka_plugin)
expect(subject).to receive(:create_consumer_group) do |options|
TestInfiniteKafkaGroup.new(options)
end
end
end

it 'should populate kafka config with default values' do
kafka = LogStash::Inputs::TestKafka.new(kafka_config)
insist {kafka.zk_connect} == 'localhost:2181'
Expand Down Expand Up @@ -98,5 +128,4 @@ def run(a_num_threads, a_queue)
insist { e['kafka']['partition'] } == 0
insist { e['kafka']['key'] } == nil
end

end