Skip to content

Commit

Permalink
add options to kafka_consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
trinitum committed Oct 21, 2016
1 parent 8c5c2ed commit a24fe88
Showing 1 changed file with 16 additions and 6 deletions.
22 changes: 16 additions & 6 deletions bin/kafka_consumer.pl
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,30 @@
use warnings;

use Kafka::Librd;
use Getopt::Long;

GetOptions(
"group-id=s" => \my $group_id,
"topic=s" => \my @topics,
"brokers=s" => \my $brokers,
"debug" => \my $debug,
);

$group_id //= "test-consumer";
$brokers //= "localhost:9092";

my $kafka = Kafka::Librd->new(
Kafka::Librd::RD_KAFKA_CONSUMER,
{
'group.id' => 'test-consumer',

#debug => 'generic,cgrp,topic,fetch',
'group.id' => $group_id,
( $debug ? ( debug => 'cgrp,topic,fetch' ) : () ),
},
);

my $added = $kafka->brokers_add('localhost:9092');
my $added = $kafka->brokers_add($brokers);
say "Added $added brokers";

my $err = $kafka->subscribe( [ 'test1', 'test2', 'abcdef', ] );
my $err = $kafka->subscribe( \@topics );
if ( $err != 0 ) {
die "Couldn't subscribe: ", Kafka::Librd::Error::to_string($err);
}
Expand All @@ -44,6 +54,6 @@

$kafka->consumer_close;

$kafka = undef;
$kafka->destroy;

Kafka::Librd::rd_kafka_wait_destroyed(5000);

0 comments on commit a24fe88

Please sign in to comment.