Skip to content

Commit

Permalink
Merge pull request #6 from trinitum/consumer-api
Browse files Browse the repository at this point in the history
Complete High-Level Consumer API
  • Loading branch information
trinitum authored Mar 3, 2017
2 parents 17b30c2 + ac33f1b commit d02ddb9
Show file tree
Hide file tree
Showing 5 changed files with 281 additions and 4 deletions.
140 changes: 139 additions & 1 deletion Rdkafka.xs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ krd__new(type, params)
conf = krd_parse_config(aTHX_ RETVAL, params);
rk = rd_kafka_new(type, conf, errstr, 1024);
if (rk == NULL) {
croak(errstr);
croak("%s", errstr);
}
RETVAL->rk = rk;
RETVAL->thx = (IV)PERL_GET_THX;
Expand Down Expand Up @@ -88,6 +88,144 @@ krd_unsubscribe(rdk)
OUTPUT:
RETVAL

SV*
krd_subscription(rdk)
        rdkafka_t* rdk
    PREINIT:
        /* filled in by rd_kafka_subscription(); we own it and must destroy it */
        rd_kafka_topic_partition_list_t* tpar;
        rd_kafka_resp_err_t err;
        AV* tp;
    CODE:
        /* Return the consumer's current subscription set as a reference to an
         * array of topic/partition hashes (see krd_expand_topic_partition_list). */
        err = rd_kafka_subscription(rdk->rk, &tpar);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
            croak("Error retrieving subscriptions: %s", rd_kafka_err2str(err));
        }
        tp = krd_expand_topic_partition_list(aTHX_ tpar);
        /* the Perl structure holds copies, so the librdkafka list can go */
        rd_kafka_topic_partition_list_destroy(tpar);
        RETVAL = newRV_noinc((SV*)tp);
    OUTPUT:
        RETVAL

int
krd_assign(rdk, tplistsv = NULL)
        rdkafka_t* rdk
        SV* tplistsv
    PREINIT:
        AV* tplist;
        rd_kafka_topic_partition_list_t* tpar = NULL;
    CODE:
        /* Assign the given set of partitions to consume.  The argument is an
         * array reference of {topic, partition, ...} hashes; passing no
         * argument (or undef) clears the current assignment (tpar == NULL).
         * Returns the librdkafka error code (0 on success). */
        if (tplistsv != NULL && SvOK(tplistsv)) {
            if (!SvROK(tplistsv) || strncmp(sv_reftype(SvRV(tplistsv), 0), "ARRAY", 6)) {
                croak("first argument must be an array reference");
            }
            tplist = (AV*)SvRV(tplistsv);
            tpar = krd_parse_topic_partition_list(aTHX_ tplist);
        }
        RETVAL = rd_kafka_assign(rdk->rk, tpar);
        /* rd_kafka_assign() copies the list, so ownership stays with us;
         * destroy it here — previously it leaked on every call. */
        if (tpar != NULL)
            rd_kafka_topic_partition_list_destroy(tpar);
    OUTPUT:
        RETVAL

SV*
krd_assignment(rdk)
        rdkafka_t *rdk
    PREINIT:
        /* filled in by rd_kafka_assignment(); we own it and must destroy it */
        rd_kafka_topic_partition_list_t* tpar;
        rd_kafka_resp_err_t err;
        AV* tp;
    CODE:
        /* Return the current partition assignment as a reference to an array
         * of topic/partition hashes, mirroring krd_subscription above. */
        err = rd_kafka_assignment(rdk->rk, &tpar);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
            croak("Error retrieving assignments: %s", rd_kafka_err2str(err));
        }
        tp = krd_expand_topic_partition_list(aTHX_ tpar);
        /* the Perl structure holds copies, so the librdkafka list can go */
        rd_kafka_topic_partition_list_destroy(tpar);
        RETVAL = newRV_noinc((SV*)tp);
    OUTPUT:
        RETVAL

int
krd_commit(rdk, tplistsv = NULL, async = 0)
        rdkafka_t* rdk
        SV* tplistsv
        int async
    PREINIT:
        AV* tplist;
        rd_kafka_topic_partition_list_t* tpar = NULL;
    CODE:
        /* Commit offsets for the given topic/partition list, or for the
         * current assignment when the list is missing or undef (tpar == NULL).
         * A non-zero async makes rd_kafka_commit() return without waiting for
         * the broker.  Returns the librdkafka error code (0 on success). */
        if (tplistsv != NULL && SvOK(tplistsv)) {
            if(!SvROK(tplistsv) || strncmp(sv_reftype(SvRV(tplistsv), 0), "ARRAY", 6)) {
                croak("first argument must be an array reference");
            }
            tplist = (AV*)SvRV(tplistsv);
            tpar = krd_parse_topic_partition_list(aTHX_ tplist);
        }
        RETVAL = rd_kafka_commit(rdk->rk, tpar, async);
        /* rd_kafka_commit() copies the offsets list, so ownership stays with
         * us; destroy it here — previously it leaked on every call. */
        if (tpar != NULL)
            rd_kafka_topic_partition_list_destroy(tpar);
    OUTPUT:
        RETVAL

int
krd_commit_message(rdk, msg, async = 0)
        rdkafka_t* rdk
        rd_kafka_message_t* msg
        int async
    CODE:
        /* Commit the offset of a single consumed message; the async flag is
         * passed straight through to rd_kafka_commit_message().  Returns the
         * librdkafka error code (0 on success). */
        RETVAL = rd_kafka_commit_message(rdk->rk, msg, async);
    OUTPUT:
        RETVAL

SV*
krd_committed(rdk, tplistsv, timeout_ms)
        rdkafka_t* rdk
        SV* tplistsv
        int timeout_ms
    PREINIT:
        AV* tplist;
        rd_kafka_topic_partition_list_t* tpar = NULL;
        rd_kafka_resp_err_t err;
        AV* tp;
    CODE:
        /* Fetch committed offsets from the broker (waiting up to timeout_ms)
         * for the topics/partitions in the argument array reference, and
         * return them as a reference to an array of hashes with the offsets
         * filled in. */
        if (!SvROK(tplistsv) || strncmp(sv_reftype(SvRV(tplistsv), 0), "ARRAY", 6)) {
            croak("first argument must be an array reference");
        }
        tplist = (AV*)SvRV(tplistsv);
        tpar = krd_parse_topic_partition_list(aTHX_ tplist);
        err = rd_kafka_committed(rdk->rk, tpar, timeout_ms);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
            /* free the parsed list before croaking to avoid a leak */
            rd_kafka_topic_partition_list_destroy(tpar);
            /* error message typo fixed: "commited" -> "committed" */
            croak("Error retrieving committed offsets: %s", rd_kafka_err2str(err));
        }
        tp = krd_expand_topic_partition_list(aTHX_ tpar);
        rd_kafka_topic_partition_list_destroy(tpar);
        RETVAL = newRV_noinc((SV*)tp);
    OUTPUT:
        RETVAL

SV*
krd_position(rdk, tplistsv)
        rdkafka_t* rdk
        SV* tplistsv
    PREINIT:
        AV* tplist;
        rd_kafka_topic_partition_list_t* tpar = NULL;
        rd_kafka_resp_err_t err;
        AV* tp;
    CODE:
        /* Retrieve the consumer's current positions (next offsets) for the
         * topics/partitions in the argument array reference and return them
         * as a reference to an array of hashes with offsets filled in. */
        if (!SvROK(tplistsv) || strncmp(sv_reftype(SvRV(tplistsv), 0), "ARRAY", 6)) {
            croak("first argument must be an array reference");
        }
        tplist = (AV*)SvRV(tplistsv);
        tpar = krd_parse_topic_partition_list(aTHX_ tplist);
        err = rd_kafka_position(rdk->rk, tpar);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
            /* free the parsed list before croaking to avoid a leak */
            rd_kafka_topic_partition_list_destroy(tpar);
            croak("Error retrieving positions: %s", rd_kafka_err2str(err));
        }
        tp = krd_expand_topic_partition_list(aTHX_ tpar);
        rd_kafka_topic_partition_list_destroy(tpar);
        RETVAL = newRV_noinc((SV*)tp);
    OUTPUT:
        RETVAL

rd_kafka_message_t*
krd_consumer_poll(rdk, timeout_ms)
rdkafka_t* rdk
Expand Down
2 changes: 2 additions & 0 deletions bin/kafka_consumer.pl
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ =head1 OPTIONS
say "Key: ", $msg->key if defined $msg->key;
say "Payload: ", $msg->payload;
}
# commit offsets to broker
$kafka->commit;
last if $stop;
}

Expand Down
54 changes: 54 additions & 0 deletions lib/Kafka/Librd.pm
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,27 @@ subscribe to the list of topics using balanced consumer groups.
unsubscribe from the current subscription set
=head2 subscription
$tplist = $kafka->subscription
return the current subscription. The subscription is returned as a reference to an
array of hashes with the following fields: C<topic>, C<partition>, C<offset>, C<metadata>.
=head2 assign
$err = $kafka->assign(\@tplist)
assign partitions to consume. C<@tplist> is an array of hashes with
C<topic> and C<partition> fields set.
=head2 assignment
$tplist = $kafka->assignment
return current assignment. Result returned in the same way as for
L</subscription>.
=head2 consumer_poll
$msg = $kafka->consumer_poll($timeout_ms)
Expand All @@ -107,6 +128,39 @@ L</Kafka::Librd::Message> object. If C<<$msg->err>> for returned object is zero
(RD_KAFKA_RESP_ERR_NO_ERROR), then it is a proper message, otherwise it is an
event or an error.
=head2 commit
$err = $kafka->commit(\@tplist, $async)
commit offsets to the broker. C<@tplist> is an array of hashes
with the following keys: C<topic>, C<partition>, C<offset>, C<metadata>. If
C<\@tplist> is missing or undef, then the current partition assignment
is used instead. If C<$async> is 1, then the method returns immediately; if it is
0 or missing, then the method blocks until offsets are committed.
=head2 commit_message
$err = $kafka->commit_message($msg, $async)
commit the message's offset for the message's partition. C<$async> is the same as
for L</commit>.
=head2 committed
$tplist = $kafka->committed(\@tplist, $timeout_ms)
retrieve committed offsets for topics and partitions specified in C<@tplist>,
which is an array of hashes with C<topic> and C<partition> fields. Returned
C<$tplist> contains a copy of the input list with added C<offset> fields.
=head2 position
$tplist = $kafka->position(\@tplist)
retrieve current offsets for topics and partitions specified in C<@tplist>,
which is an array of hashes with C<topic> and C<partition> fields. Returned
C<$tplist> contains a copy of the input list with added C<offset> fields.
=head2 consumer_close
$err = $kafka->consumer_close
Expand Down
87 changes: 84 additions & 3 deletions rdkafkaxs.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,87 @@

#define ERRSTR_SIZE 1024

/*
 * Build an rd_kafka_topic_partition_list_t from a Perl array of hash
 * references.  Each element must have "topic" and "partition" keys;
 * "offset" and "metadata" are optional, any other key croaks.  Croaks
 * (after freeing the partial list) on malformed input.  The caller owns
 * the returned list and must release it with
 * rd_kafka_topic_partition_list_destroy().
 */
rd_kafka_topic_partition_list_t*
krd_parse_topic_partition_list(pTHX_ AV* tplist) {
    char errstr[ERRSTR_SIZE];
    rd_kafka_topic_partition_list_t* tpar;

    int tplen = av_len(tplist)+1;
    tpar = rd_kafka_topic_partition_list_new(tplen);
    int i;
    for (i=0; i<tplen; i++) {
        SV** elemr = av_fetch(tplist, i, 0);
        if (elemr == NULL)
            continue;  /* hole in the array: skip it */
        SV* conf = *elemr;
        if (!SvROK(conf) || strncmp(sv_reftype(SvRV(conf), 0), "HASH", 5) != 0) {
            snprintf(errstr, ERRSTR_SIZE, "elements of topic partition list expected to be hashes");
            goto CROAK;
        }
        HV* confhv = (HV*)SvRV(conf);
        SV** topicsv = hv_fetch(confhv, "topic", 5, 0);
        if (topicsv == NULL) {
            snprintf(errstr, ERRSTR_SIZE, "topic is not specified for element %d of the list", i);
            goto CROAK;
        }
        STRLEN len;
        char* topic = SvPV(*topicsv, len);
        SV** partitionsv = hv_fetch(confhv, "partition", 9, 0);
        if (partitionsv == NULL) {
            snprintf(errstr, ERRSTR_SIZE, "partition is not specified for element %d of the list", i);
            goto CROAK;
        }
        int32_t partition = SvIV(*partitionsv);
        rd_kafka_topic_partition_t* tp = rd_kafka_topic_partition_list_add(tpar, topic, partition);
        hv_iterinit(confhv);
        HE* he;
        while ((he = hv_iternext(confhv)) != NULL) {
            char* key = HePV(he, len);
            SV* val = HeVAL(he);
            /* the lengths passed to strncmp include the NUL terminator, so
             * these are exact (strcmp-like) matches, not prefix matches */
            if (strncmp(key, "topic", 6) == 0 || strncmp(key, "partition", 10) == 0) {
                /* already handled above */
                ;
            } else if (strncmp(key, "offset", 7) == 0) {
                tp->offset = SvIV(val);
            } else if (strncmp(key, "metadata", 9) == 0) {
                /* Copy the metadata into a malloc'd buffer: librdkafka takes
                 * ownership of tp->metadata and free()s it when the list is
                 * destroyed, so it must not alias the Perl SV's buffer
                 * (previously it did, corrupting Perl's memory on destroy). */
                char* meta = SvPV(val, len);
                tp->metadata = malloc(len);
                if (len > 0 && tp->metadata == NULL) {
                    snprintf(errstr, ERRSTR_SIZE, "failed to allocate metadata for element %d of the list", i);
                    goto CROAK;
                }
                memcpy(tp->metadata, meta, len);
                tp->metadata_size = len;
            } else {
                snprintf(errstr, ERRSTR_SIZE, "unknown option %s for element %d of the list", key, i);
                goto CROAK;
            }
        }
    }
    return tpar;

CROAK:
    rd_kafka_topic_partition_list_destroy(tpar);
    croak("%s", errstr);
    return NULL;  /* not reached; silences compiler warnings */
}

/*
 * Convert a librdkafka topic+partition list into a Perl array of hash
 * references with "topic", "partition", "offset" and (when metadata is
 * present) "metadata" keys.  tpar is read-only here; the caller keeps
 * ownership of it.  Returns a newly created AV (refcount 1).
 *
 * The unused errstr buffer and the unreachable CROAK label after the
 * return (dead code, compiler warnings) have been removed.
 */
AV* krd_expand_topic_partition_list(pTHX_ rd_kafka_topic_partition_list_t* tpar) {
    AV* tplist = newAV();
    int i;
    for (i = 0; i < tpar->cnt; i++) {
        rd_kafka_topic_partition_t* elem = &(tpar->elems[i]);
        HV* tp = newHV();
        hv_stores(tp, "topic", newSVpv(elem->topic, 0));
        hv_stores(tp, "partition", newSViv(elem->partition));
        hv_stores(tp, "offset", newSViv(elem->offset));
        if(elem->metadata_size > 0) {
            hv_stores(tp, "metadata", newSVpvn(elem->metadata, elem->metadata_size));
        }
        av_push(tplist, newRV_noinc((SV*)tp));
    }
    return tplist;
}

rd_kafka_conf_t* krd_parse_config(pTHX_ rdkafka_t *krd, HV* params) {
char errstr[ERRSTR_SIZE];
rd_kafka_conf_t* krdconf;
Expand All @@ -11,7 +92,7 @@ rd_kafka_conf_t* krd_parse_config(pTHX_ rdkafka_t *krd, HV* params) {
krdconf = rd_kafka_conf_new();
rd_kafka_conf_set_opaque(krdconf, (void *)krd);
hv_iterinit(params);
while (he = hv_iternext(params)) {
while ((he = hv_iternext(params)) != NULL) {
STRLEN len;
char* key = HePV(he, len);
SV* val = HeVAL(he);
Expand Down Expand Up @@ -41,7 +122,7 @@ rd_kafka_conf_t* krd_parse_config(pTHX_ rdkafka_t *krd, HV* params) {

CROAK:
rd_kafka_conf_destroy(krdconf);
croak(errstr);
croak("%s", errstr);
return NULL;
}

Expand All @@ -51,7 +132,7 @@ rd_kafka_topic_conf_t* krd_parse_topic_config(pTHX_ HV *params, char* errstr) {
HE *he;

hv_iterinit(params);
while (he = hv_iternext(params)) {
while ((he = hv_iternext(params)) != NULL) {
STRLEN len;
char* key = HePV(he, len);
SV* val = HeVAL(he);
Expand Down
2 changes: 2 additions & 0 deletions rdkafkaxs.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@ typedef struct rdkafka_s {
IV thx;
} rdkafka_t;

/* Build a librdkafka topic+partition list from a Perl array of
 * {topic, partition, ...} hashes; croaks on malformed input.
 * Caller must destroy the returned list. (rdkafkaxs.c) */
rd_kafka_topic_partition_list_t* krd_parse_topic_partition_list(pTHX_ AV* tplist);
/* Expand a topic+partition list into a new Perl AV of hash references. */
AV* krd_expand_topic_partition_list(pTHX_ rd_kafka_topic_partition_list_t* tpar);
/* Create an rd_kafka_conf_t from the params hash; croaks on bad options. */
rd_kafka_conf_t* krd_parse_config(pTHX_ rdkafka_t* krd, HV* params);
/* Create a topic configuration from the params hash; on failure the error
 * text is placed in errstr — see rdkafkaxs.c for the exact contract. */
rd_kafka_topic_conf_t* krd_parse_topic_config(pTHX_ HV *params, char* errstr);

0 comments on commit d02ddb9

Please sign in to comment.