Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ListGroups and DescribeGroups protocol #770

Merged
merged 9 commits into from
Oct 5, 2017
67 changes: 66 additions & 1 deletion lib/kafkaClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ const DEFAULTS = {
minTimeout: 1 * 1000,
maxTimeout: 60 * 1000,
randomize: true
}
},
maxAsyncRequests: 10
};

const KafkaClient = function (options) {
Expand Down Expand Up @@ -413,6 +414,70 @@ KafkaClient.prototype.getApiVersions = function (broker, cb) {
broker.write(request);
};

KafkaClient.prototype.getListGroups = function (callback) {
if (!this.ready) {
return callback(new Error('Client is not ready (getListGroups)'));
}
const brokers = this.brokerMetadata;
async.mapValuesLimit(brokers, this.options.maxAsyncRequests,
(brokerMetadata, brokerId, cb) => {
const broker = this.brokerForLeader(brokerId);
if (!broker || !broker.isConnected()) {
return callback(new errors.BrokerNotAvailableError('Broker not available (getListGroups)'));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

callback should be cb here.

}

const correlationId = this.nextId();
const request = protocol.encodeListGroups(this.clientId, correlationId);
this.queueCallback(broker.socket, correlationId, [protocol.decodeListGroups, cb]);
broker.write(request);
}, (err, results) => {
if (err) {
callback(err);
return;
}
// removing {error: null} before merging
results = _.values(_.map(results, result => _.omitBy(result, _.isNull)));
results.unshift({});
callback(null, _.merge.apply({}, results));
}
);
};

KafkaClient.prototype.getDescribeGroups = function (groups, callback) {
if (!this.ready) {
return callback(new Error('Client is not ready (getDescribeGroups)'));
}

async.groupBy(groups,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use groupByLimit here instead?

(group, cb) => {
this.sendGroupCoordinatorRequest(group, (err, coordinator) => {
cb(err || null, coordinator ? coordinator.coordinatorId : undefined);
});
},
(err, results) => {
if (err) {
callback(err);
return;
}

async.mapValuesLimit(results, this.options.maxAsyncRequests,
(groups, coordinator, cb) => {
const broker = this.brokerForLeader(coordinator);
if (!broker || !broker.isConnected()) {
return callback(new errors.BrokerNotAvailableError('Broker not available (getDescribeGroups)'));
Copy link
Collaborator

@hyperlink hyperlink Sep 21, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be cb here and not that main callback.

}

const correlationId = this.nextId();
const request = protocol.encodeDescribeGroups(this.clientId, correlationId, groups);
this.queueCallback(broker.socket, correlationId, [protocol.decodeDescribeGroups, cb]);
broker.write(request);
},
(err, res) => {
callback(err || null, res);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we flatten the result so it's keyed by groupId?

});
});
};

KafkaClient.prototype.close = function (callback) {
logger.debug('close client');
this.closing = true;
Expand Down
128 changes: 128 additions & 0 deletions lib/protocol/protocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,130 @@ function decodeGroupData (resp) {
};
}

function encodeDescribeGroups (clientId, correlationId, groups) {
const request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.describeGroups);

request.Int32BE(groups.length);
groups.forEach(groupId => { request.Int16BE(groupId.length).string(groupId); });

return encodeRequestWithLength(request.make());
}

function decodeDescribeGroups (resp) {
let results = {};

Binary.parse(resp)
.word32bs('size')
.word32bs('correlationId')
.word32bs('describeNum')
.loop(decodeDescriptions);

function decodeDescriptions (end, vars) {
if (vars.describeNum-- === 0) return end();

let described = { members: [] };
this.word16bs('errorCode')
.tap(vars => {
described.error = createGroupError(vars.errorCode);
})
.word16bs('groupId')
.tap(vars => {
this.buffer('groupId', vars.groupId);
described.groupId = vars.groupId.toString();
})
.word16bs('state')
.tap(vars => {
this.buffer('state', vars.state);
described.state = vars.state.toString();
})
.word16bs('protocolType')
.tap(vars => {
this.buffer('protocolType', vars.protocolType);
described.protocolType = vars.protocolType.toString();
})
.word16bs('protocol')
.tap(vars => {
this.buffer('protocol', vars.protocol);
described.protocol = vars.protocol.toString();

// keep this for error cases
results[described.groupId] = described;
})
.word32bs('membersNum')
.loop(function decodeGroupMembers (end, vars) {
if (vars.membersNum-- === 0) return end();
let member = {};

this.word16bs('memberId')
.tap(vars => {
this.buffer('memberId', vars.memberId);
member.memberId = vars.memberId.toString();
})
.word16bs('clientId')
.tap(vars => {
this.buffer('clientId', vars.clientId);
member.clientId = vars.clientId.toString();
})
.word16bs('clientHost')
.tap(vars => {
this.buffer('clientHost', vars.clientHost);
member.clientHost = vars.clientHost.toString();
})
.word32bs('memberMetadata')
.tap(vars => {
if (vars.memberMetadata > -1) {
this.buffer('memberMetadata', vars.memberMetadata);
let memberMetadata = decodeGroupData(vars.memberMetadata);
memberMetadata.id = member.memberId;
member.memberMetadata = memberMetadata;
}
})
.word32bs('memberAssignment')
.tap(vars => {
this.buffer('memberAssignment', vars.memberAssignment);
member.memberAssignment = decodeMemberAssignment(vars.memberAssignment);
described.members.push(member);

results[described.groupId] = described;
});
});
}

return results;
}

function encodeListGroups (clientId, correlationId) {
return encodeRequestWithLength(encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.listGroups).make());
}

function decodeListGroups (resp) {
let groups = {};
Binary.parse(resp)
.word32bs('size')
.word32bs('correlationId')
.word16bs('errorCode')
.tap(vars => {
groups.error = createGroupError(vars.errorCode);
})
.word32bs('groupNum')
.loop(function (end, vars) {
if (vars.groupNum-- === 0) return end();

this.word16bs('groupId')
.tap(function (vars) {
this.buffer('groupId', vars.groupId);
vars.groupId = vars.groupId.toString();
})
.word16bs('protocolType')
.tap(function (vars) {
this.buffer('protocolType', vars.protocolType);
groups[vars.groupId] = vars.protocolType.toString();
});
});

return groups;
}

function encodeVersionsRequest (clientId, correlationId) {
return encodeRequestWithLength(encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.apiVersions).make());
}
Expand Down Expand Up @@ -1166,5 +1290,9 @@ exports.encodeSyncGroupRequest = encodeSyncGroupRequest;
exports.decodeSyncGroupResponse = decodeSyncGroupResponse;
exports.encodeLeaveGroupRequest = encodeLeaveGroupRequest;
exports.decodeLeaveGroupResponse = decodeLeaveGroupResponse;
exports.encodeDescribeGroups = encodeDescribeGroups;
exports.decodeDescribeGroups = decodeDescribeGroups;
exports.encodeListGroups = encodeListGroups;
exports.decodeListGroups = decodeListGroups;
exports.encodeVersionsRequest = encodeVersionsRequest;
exports.decodeVersionsResponse = decodeVersionsResponse;
4 changes: 2 additions & 2 deletions lib/protocol/protocolVersions.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ const API_MAP = {
heartbeat: [[p.encodeGroupHeartbeatRequest, p.decodeGroupHeartbeatResponse]],
leaveGroup: [[p.encodeLeaveGroupRequest, p.decodeLeaveGroupResponse]],
syncGroup: [[p.encodeJoinGroupRequest, p.decodeJoinGroupResponse]],
describeGroups: null,
listGroups: null,
describeGroups: [[p.encodeDescribeGroups, p.decodeDescribeGroups]],
listGroups: [[p.encodeListGroups, p.decodeListGroups]],
saslHandshake: null,
apiVersions: [[p.encodeVersionsRequest, p.decodeVersionsResponse]],
createTopics: null,
Expand Down