Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ListGroups and DescribeGroups protocol #770

Merged
merged 9 commits into from
Oct 5, 2017
90 changes: 90 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Kafka-node is a Node.js client with Zookeeper integration for Apache Kafka 0.8.1
- [ConsumerGroup](#consumergroup)
- [ConsumerGroupStream](#consumergroupstream)
- [Offset](#offset)
- [Admin](#admin)
- [Troubleshooting / FAQ](#troubleshooting--faq)
- [HighLevelProducer with KeyedPartitioner errors on first send](#highlevelproducer-with-keyedpartitioner-errors-on-first-send)
- [How do I debug an issue?](#how-do-i-debug-an-issue)
Expand Down Expand Up @@ -84,6 +85,7 @@ New KafkaClient connects directly to Kafka brokers instead of connecting to zook
* `autoConnect` : automatically connect when KafkaClient is instantiated otherwise you need to manually call `connect` default: `true`
* `connectRetryOptions` : object hash that applies to the initial connection. see [retry](https://www.npmjs.com/package/retry) module for these options.
* `idleConnection` : allows the broker to disconnect an idle connection from a client (otherwise the clients continues to reconnect after being disconnected). The value is elapsed time in ms without any data written to the TCP socket. default: 5 minutes
* `maxAsyncRequests` : maximum number of concurrent async operations toward the Kafka cluster. default: 10

## Client
### Client(connectionString, clientId, [zkOptions], [noAckBatchOptions], [sslOptions])
Expand Down Expand Up @@ -981,6 +983,94 @@ Example
});
```

## Admin

This class provides administrative APIs that can be used to monitor and administer the Kafka cluster.

### Admin(kafkaClient)
* `kafkaClient`: client which keeps a connection with the Kafka server. (**`KafkaClient` only**, `client` not supported)

### listGroups(cb)

List the consumer groups managed by the Kafka cluster.

* `cb`: **Function**, the callback

Example:

```js
const client = new kafka.KafkaClient();
const admin = new kafka.Admin(client); // client must be KafkaClient
admin.listGroups((err, res) => {
console.log('consumerGroups', res);
});
```

Result:

```js
consumerGroups { 'console-consumer-87148': 'consumer',
'console-consumer-2690': 'consumer',
'console-consumer-7439': 'consumer'
}
```

### describeGroups(consumerGroups, cb)

Fetch consumer group information from the cluster. See result for detailed information.

* `consumerGroups`: **Array**, array of consumer groups (which can be gathered from `listGroups`)
* `cb`: **Function**, the callback

Example:

```js
admin.describeGroups(['console-consumer-2690'], (err, res) => {
console.log(JSON.stringify(res,null,1));
})
```

Result:

```json
{
"console-consumer-2690": {
"members": [
{
"memberId": "consumer-1-20195e12-cb3b-4ba4-9076-e7da8ed0d57a",
"clientId": "consumer-1",
"clientHost": "/192.168.61.1",
"memberMetadata": {
"subscription": [
"twice-tt"
],
"version": 0,
"userData": "JSON parse error",
"id": "consumer-1-20195e12-cb3b-4ba4-9076-e7da8ed0d57a"
},
"memberAssignment": {
"partitions": {
"twice-tt": [
0,
1
]
},
"version": 0,
"userData": "JSON Parse error"
}
}
],
"error": null,
"groupId": "console-consumer-2690",
"state": "Stable",
"protocolType": "consumer",
"protocol": "range",
"brokerId": "4"
}
}
```


# Troubleshooting / FAQ

## HighLevelProducer with KeyedPartitioner errors on first send
Expand Down
1 change: 1 addition & 0 deletions kafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ exports.Producer = require('./lib/producer');
exports.Client = require('./lib/client');
exports.KafkaClient = require('./lib/kafkaClient');
exports.Offset = require('./lib/offset');
exports.Admin = require('./lib/admin');
exports.KeyedMessage = require('./lib/protocol').KeyedMessage;
exports.DefaultPartitioner = require('./lib/partitioner').DefaultPartitioner;
exports.CyclicPartitioner = require('./lib/partitioner').CyclicPartitioner;
Expand Down
44 changes: 44 additions & 0 deletions lib/admin.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
'use strict';

var KafkaClient = require('./kafkaClient');
var util = require('util');
var events = require('events');

/**
 * Administrative client for monitoring and administering a Kafka cluster.
 *
 * Wraps a connected KafkaClient and re-emits its lifecycle events
 * ('ready', 'connect', 'error') so callers can listen on the Admin
 * instance directly.
 *
 * @param {KafkaClient} kafkaClient a KafkaClient instance (the legacy
 *   zookeeper-based Client is not supported)
 * @throws {Error} when anything other than a KafkaClient is supplied
 */
var Admin = function (kafkaClient) {
  if (!(kafkaClient instanceof KafkaClient)) {
    throw new Error("'Admin' only accepts 'KafkaClient' for its kafka client.");
  }

  this.client = kafkaClient;
  // Mirror the client's readiness so requests issued before the initial
  // connection completes can be deferred (see listGroups/describeGroups).
  this.ready = this.client.ready;

  this.client.on('ready', () => {
    this.ready = true;
    this.emit('ready');
  });
  this.client.once('connect', () => this.emit('connect'));
  this.client.on('error', (err) => this.emit('error', err));
};
util.inherits(Admin, events.EventEmitter);

/**
 * List the consumer groups managed by the cluster.
 *
 * If the underlying client is not ready yet, the call is deferred until the
 * 'ready' event fires and then retried with the same callback.
 *
 * @param {Function} cb called with (error, groups)
 */
Admin.prototype.listGroups = function (cb) {
  if (this.ready) {
    this.client.getListGroups(cb);
    return;
  }
  this.once('ready', () => this.listGroups(cb));
};

/**
 * Fetch detailed information for the given consumer groups.
 *
 * If the underlying client is not ready yet, the call is deferred until the
 * 'ready' event fires and then retried with the same arguments.
 *
 * @param {Array} consumerGroups consumer group ids (e.g. from listGroups)
 * @param {Function} cb called with (error, descriptions)
 */
Admin.prototype.describeGroups = function (consumerGroups, cb) {
  if (this.ready) {
    this.client.getDescribeGroups(consumerGroups, cb);
    return;
  }
  this.once('ready', () => this.describeGroups(consumerGroups, cb));
};

module.exports = Admin;
77 changes: 76 additions & 1 deletion lib/kafkaClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ const DEFAULTS = {
minTimeout: 1 * 1000,
maxTimeout: 60 * 1000,
randomize: true
}
},
maxAsyncRequests: 10
};

const KafkaClient = function (options) {
Expand Down Expand Up @@ -413,6 +414,80 @@ KafkaClient.prototype.getApiVersions = function (broker, cb) {
broker.write(request);
};

/**
 * List the consumer groups known to the cluster.
 *
 * A ListGroups response only contains the groups coordinated by the broker
 * that answers it, so one request is sent to every known broker (at most
 * `options.maxAsyncRequests` in flight at a time) and the per-broker
 * results are merged into a single map.
 *
 * @param {Function} callback called with (error, groups) where groups maps
 *   each group id to its protocol type, e.g. { 'my-group': 'consumer' }
 */
KafkaClient.prototype.getListGroups = function (callback) {
  if (!this.ready) {
    return callback(new Error('Client is not ready (getListGroups)'));
  }
  const brokers = this.brokerMetadata;
  async.mapValuesLimit(brokers, this.options.maxAsyncRequests,
    (brokerMetadata, brokerId, cb) => {
      const broker = this.brokerForLeader(brokerId);
      if (!broker || !broker.isConnected()) {
        return cb(new errors.BrokerNotAvailableError('Broker not available (getListGroups)'));
      }

      const correlationId = this.nextId();
      const request = protocol.encodeListGroups(this.clientId, correlationId);
      // The decoded response is handed to cb when the broker answers with a
      // matching correlation id.
      this.queueCallback(broker.socket, correlationId, [protocol.decodeListGroups, cb]);
      broker.write(request);
    }, (err, results) => {
      if (err) {
        callback(err);
        return;
      }
      // Each decoded response looks like { error: null, <groupId>: <protocolType>, ... }.
      // Strip the null error markers, then merge the per-broker maps into one
      // object. `_.map` over the results object already yields an array, so no
      // extra `_.values` wrapper (or `Function.prototype.apply` trick) is needed.
      const groupMaps = _.map(results, result => _.omitBy(result, _.isNull));
      callback(null, _.merge({}, ...groupMaps));
    }
  );
};

/**
 * Describe the given consumer groups.
 *
 * Resolves each group's coordinator broker, batches the group ids by
 * coordinator, sends one DescribeGroups request per coordinator, and merges
 * the per-broker responses into a single object keyed by group id.
 *
 * @param {Array} groups consumer group ids to describe
 * @param {Function} callback called with (error, result); result maps each
 *   group id to its decoded description with a `brokerId` property (the
 *   coordinator's id) attached to each entry
 */
KafkaClient.prototype.getDescribeGroups = function (groups, callback) {
  // Fail fast when the client has not completed its initial connection.
  if (!this.ready) {
    return callback(new Error('Client is not ready (getDescribeGroups)'));
  }

  // Build { coordinatorId: [groupId, ...] } so each coordinator receives a
  // single DescribeGroups request covering all of its groups.
  async.groupByLimit(groups, this.options.maxAsyncRequests,
    (group, cb) => {
      this.sendGroupCoordinatorRequest(group, (err, coordinator) => {
        // groupByLimit expects (err, key); the key is only undefined on error.
        cb(err || null, coordinator ? coordinator.coordinatorId : undefined);
      });
    },
    (err, results) => {
      if (err) {
        callback(err);
        return;
      }

      // One DescribeGroups request per coordinator, bounded by maxAsyncRequests.
      async.mapValuesLimit(results, this.options.maxAsyncRequests,
        (groups, coordinator, cb) => {
          const broker = this.brokerForLeader(coordinator);
          if (!broker || !broker.isConnected()) {
            return cb(new errors.BrokerNotAvailableError('Broker not available (getDescribeGroups)'));
          }

          const correlationId = this.nextId();
          const request = protocol.encodeDescribeGroups(this.clientId, correlationId, groups);
          // Decoded response is delivered to cb when the matching correlation
          // id comes back on this socket.
          this.queueCallback(broker.socket, correlationId, [protocol.decodeDescribeGroups, cb]);
          broker.write(request);
        },
        (err, res) => {
          if (err) {
            return callback(err);
          }

          // Flatten { brokerId: { groupId: description } } into a single
          // { groupId: description } map, tagging each description with the
          // coordinator it came from. Note `broker` is an object key here,
          // so brokerId ends up as a string (e.g. "4").
          callback(null, _.reduce(res, (result, describes, broker) => {
            _.each(describes, (values, consumer) => {
              result[consumer] = values;
              result[consumer].brokerId = broker;
            });
            return result;
          }, {}));
        });
    });
};

KafkaClient.prototype.close = function (callback) {
logger.debug('close client');
this.closing = true;
Expand Down
128 changes: 128 additions & 0 deletions lib/protocol/protocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,130 @@ function decodeGroupData (resp) {
};
}

/**
 * Encode a DescribeGroups request.
 *
 * @param {String} clientId client identifier written into the request header
 * @param {Number} correlationId correlation id used to match the response
 * @param {Array} groups consumer group ids to describe
 * @returns {Buffer} length-prefixed request ready to write to the broker socket
 */
function encodeDescribeGroups (clientId, correlationId, groups) {
  const request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.describeGroups);

  request.Int32BE(groups.length);
  groups.forEach(groupId => {
    // Kafka strings are prefixed with their byte length; use Buffer.byteLength
    // so group ids containing multi-byte UTF-8 characters are framed correctly
    // (groupId.length counts UTF-16 code units, not bytes).
    request.Int16BE(Buffer.byteLength(groupId, 'utf8')).string(groupId);
  });

  return encodeRequestWithLength(request.make());
}

function decodeDescribeGroups (resp) {
let results = {};

Binary.parse(resp)
.word32bs('size')
.word32bs('correlationId')
.word32bs('describeNum')
.loop(decodeDescriptions);

function decodeDescriptions (end, vars) {
if (vars.describeNum-- === 0) return end();

let described = { members: [] };
this.word16bs('errorCode')
.tap(vars => {
described.error = createGroupError(vars.errorCode);
})
.word16bs('groupId')
.tap(vars => {
this.buffer('groupId', vars.groupId);
described.groupId = vars.groupId.toString();
})
.word16bs('state')
.tap(vars => {
this.buffer('state', vars.state);
described.state = vars.state.toString();
})
.word16bs('protocolType')
.tap(vars => {
this.buffer('protocolType', vars.protocolType);
described.protocolType = vars.protocolType.toString();
})
.word16bs('protocol')
.tap(vars => {
this.buffer('protocol', vars.protocol);
described.protocol = vars.protocol.toString();

// keep this for error cases
results[described.groupId] = described;
})
.word32bs('membersNum')
.loop(function decodeGroupMembers (end, vars) {
if (vars.membersNum-- === 0) return end();
let member = {};

this.word16bs('memberId')
.tap(vars => {
this.buffer('memberId', vars.memberId);
member.memberId = vars.memberId.toString();
})
.word16bs('clientId')
.tap(vars => {
this.buffer('clientId', vars.clientId);
member.clientId = vars.clientId.toString();
})
.word16bs('clientHost')
.tap(vars => {
this.buffer('clientHost', vars.clientHost);
member.clientHost = vars.clientHost.toString();
})
.word32bs('memberMetadata')
.tap(vars => {
if (vars.memberMetadata > -1) {
this.buffer('memberMetadata', vars.memberMetadata);
let memberMetadata = decodeGroupData(vars.memberMetadata);
memberMetadata.id = member.memberId;
member.memberMetadata = memberMetadata;
}
})
.word32bs('memberAssignment')
.tap(vars => {
this.buffer('memberAssignment', vars.memberAssignment);
member.memberAssignment = decodeMemberAssignment(vars.memberAssignment);
described.members.push(member);

results[described.groupId] = described;
});
});
}

return results;
}

/**
 * Encode a ListGroups request: header only, no request-specific payload.
 *
 * @param {String} clientId client identifier written into the request header
 * @param {Number} correlationId correlation id used to match the response
 * @returns {Buffer} length-prefixed request ready to write to the broker socket
 */
function encodeListGroups (clientId, correlationId) {
  const header = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.listGroups);
  return encodeRequestWithLength(header.make());
}

/**
 * Decode a ListGroups response buffer.
 *
 * @param {Buffer} resp raw response from the broker
 * @returns {Object} { error, <groupId>: <protocolType>, ... } — error is
 *   null on success, otherwise an Error built from the response error code
 */
function decodeListGroups (resp) {
  let groups = {};
  Binary.parse(resp)
    .word32bs('size')
    .word32bs('correlationId')
    .word16bs('errorCode')
    .tap(vars => {
      groups.error = createGroupError(vars.errorCode);
    })
    .word32bs('groupNum') // number of (groupId, protocolType) pairs that follow
    .loop(function (end, vars) {
      if (vars.groupNum-- === 0) return end();

      // Kafka strings are 16-bit length-prefixed: word16bs reads the length,
      // the following tap reads that many bytes into the same var.
      this.word16bs('groupId')
        .tap(function (vars) {
          this.buffer('groupId', vars.groupId);
          vars.groupId = vars.groupId.toString();
        })
        .word16bs('protocolType')
        .tap(function (vars) {
          this.buffer('protocolType', vars.protocolType);
          groups[vars.groupId] = vars.protocolType.toString();
        });
    });

  return groups;
}

/**
 * Encode an ApiVersions request: header only, no request-specific payload.
 *
 * @param {String} clientId client identifier written into the request header
 * @param {Number} correlationId correlation id used to match the response
 * @returns {Buffer} length-prefixed request ready to write to the broker socket
 */
function encodeVersionsRequest (clientId, correlationId) {
  const header = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.apiVersions);
  return encodeRequestWithLength(header.make());
}
Expand Down Expand Up @@ -1166,5 +1290,9 @@ exports.encodeSyncGroupRequest = encodeSyncGroupRequest;
exports.decodeSyncGroupResponse = decodeSyncGroupResponse;
exports.encodeLeaveGroupRequest = encodeLeaveGroupRequest;
exports.decodeLeaveGroupResponse = decodeLeaveGroupResponse;
exports.encodeDescribeGroups = encodeDescribeGroups;
exports.decodeDescribeGroups = decodeDescribeGroups;
exports.encodeListGroups = encodeListGroups;
exports.decodeListGroups = decodeListGroups;
exports.encodeVersionsRequest = encodeVersionsRequest;
exports.decodeVersionsResponse = decodeVersionsResponse;
Loading