-
Notifications
You must be signed in to change notification settings - Fork 627
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ListGroups and DescribeGroups protocol #770
Conversation
Oh, I think correlationId and request object should go inside mapValuesSeries before this.queueCallback. I'll fix it. |
lib/kafkaClient.js
Outdated
// removing {error: null} before merging | ||
results = _.map(results, result => _.omitBy(result, _.isNull)); | ||
|
||
callback(null, _.merge({}, ...(_.values(results)))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are still supporting node 4 so spread can't be used here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll change it to use .apply
instead
lib/kafkaClient.js
Outdated
this.queueCallback(broker.socket, correlationId, [ | ||
protocol.decodeListGroups, | ||
cb, | ||
this.options.versions.requestTimeout |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can be omitted if using the default requestTimeout
value.
lib/kafkaClient.js
Outdated
return callback(new Error('Client is not ready (getListGroups)')); | ||
} | ||
const brokers = this.brokerMetadata; | ||
async.mapValuesSeries(brokers, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Series is not need here since there's no ordering requirement on the result correct? Lets we change this to mapValuesLimit
instead with a default (configurable) limit of 10.
lib/kafkaClient.js
Outdated
if (!this.ready) { | ||
return callback(new Error('Client is not ready (getDescribeGroups)')); | ||
} | ||
const self = this; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is setting self
necessary since we're using fat arrows throughout?
lib/kafkaClient.js
Outdated
return; | ||
} | ||
|
||
async.mapValuesSeries(results, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mapValuesLimit
?
lib/kafkaClient.js
Outdated
this.queueCallback(broker.socket, correlationId, [ | ||
protocol.decodeDescribeGroups, | ||
cb, | ||
this.options.versions.requestTimeout |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment as above this is optional.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Any plans to write a test for this?
I've changed all mentioned problems, and introduced a new option For test, I think I can't write tests right now because I have at least minimum working code and have to make service first. I may try in the 1st week of October if it's not done. |
lib/kafkaClient.js
Outdated
(brokerMetadata, brokerId, cb) => { | ||
const broker = this.brokerForLeader(brokerId); | ||
if (!broker || !broker.isConnected()) { | ||
return callback(new errors.BrokerNotAvailableError('Broker not available (getListGroups)')); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
callback
should be cb
here.
lib/kafkaClient.js
Outdated
return callback(new Error('Client is not ready (getDescribeGroups)')); | ||
} | ||
|
||
async.groupBy(groups, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we use groupByLimit
here instead?
lib/kafkaClient.js
Outdated
(groups, coordinator, cb) => { | ||
const broker = this.brokerForLeader(coordinator); | ||
if (!broker || !broker.isConnected()) { | ||
return callback(new errors.BrokerNotAvailableError('Broker not available (getDescribeGroups)')); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be cb
here and not that main callback
.
lib/kafkaClient.js
Outdated
broker.write(request); | ||
}, | ||
(err, res) => { | ||
callback(err || null, res); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we flatten the result so it's keyed by groupId
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks much better thanks for the update!
Can you update the README
with these new APIs ?
This is optional but you can also create a new Admin
wrapper class if so inclined.
Changed as you requested. I also wrote new admin wrapper class and README. For |
Thanks for sticking with this @wooyeong ! Great feature 👍 |
#769