Add create topics support #958
Conversation
I am looking into the build issue with Kafka 0.9 (metadata version 1 was added in 0.10)
Updated PR with a few changes to handle clusters of version < 0.10. Let me know what you think @hyperlink
Thanks for your contribution @jlandersen! 👍
It would be great if we could add an alias to createTopic in Admin. Also we should update the doc to reflect these changes.
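The suggested alias could look like the sketch below; the `Admin`/client shapes here are illustrative stubs, not kafka-node's actual internals:

```javascript
// Sketch of the suggested alias: Admin.createTopics simply delegates to the
// underlying client's createTopics. Stub constructor for illustration only.
function Admin (client) {
  this.client = client;
}

Admin.prototype.createTopics = function (topics, cb) {
  this.client.createTopics(topics, cb);
};
```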
lib/protocol/protocol.js
Outdated
  })
  .word16bs('errorCode')
  .tap((vars) => {
    console.log(vars);
Is this WIP? Could we get rid of this console.log?
Definitely, will remove WIP parts
lib/kafkaClient.js
Outdated
@@ -695,6 +726,29 @@ KafkaClient.prototype.clearCallbackQueue = function (socket, error) {
  delete this.cbqueue[socketId];
};

// Sends a version 1 metadata request which includes controller node in response
KafkaClient.prototype.loadMetadataForTopicsV2 = function (topics, cb) {
We shouldn't continue to create these V{x} versions of API calls. It creates an unnecessary burden of managing the Kafka protocol version on the user:
- Override Client's loadMetadataForTopics
- Ensure the broker is ready and use getSupportedForRequestType to get the appropriate version of encoder/decoder (see the example in sendRequest)
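The selection step could be sketched roughly as below. The shapes here (`requestCodecs`, an `apiSupport` map with a `usable` version) are illustrative stand-ins for kafka-node's internal support map, not its real API:

```javascript
// Hypothetical codec table: request type -> protocol version -> encoder/decoder.
const requestCodecs = {
  metadata: {
    0: { encoder: 'encodeMetadataRequest', decoder: 'decodeMetadataResponse' },
    1: { encoder: 'encodeMetadataV1Request', decoder: 'decodeMetadataV1Response' }
  }
};

// apiSupport is assumed to map request type -> { usable: <negotiated version> }.
function resolveRequest (apiSupport, requestType) {
  const support = apiSupport[requestType];
  if (support == null ||
      requestCodecs[requestType] == null ||
      requestCodecs[requestType][support.usable] == null) {
    throw new Error('Unsupported request type or version: ' + requestType);
  }
  return requestCodecs[requestType][support.usable];
}
```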
Makes sense - my one concern with this is that between API version 0 and >= 1, the semantics of the topics parameter have changed.
In version 0 an empty array returns all topics. In version >= 1, a null value (-1) is used to fetch all topics, and an empty array returns no topic metadata [1]. This is potentially troublesome for clients that rely on the old behaviour. I guess this could be put under the next major version umbrella, thus allowing the breaking change. But there are also plenty of calls from inside the lib relying on this (and I think silently converting [] -> null would be a bit of a lie to clients).
A suggestion could be:
- Override it and make it work with getSupportedForRequestType, including the different behaviour depending on version - a breaking change similar to createTopics. It is up to clients to be aware of this: basically, if they use a version >= 0.10, null is now used for fetching all topics.
- Allow internal use to force version 0 for now, to avoid manipulating the input parameter, so that all existing functionality that works with < 0.10 and relies on a metadata call stays unaffected.
[1] (https://kafka.apache.org/protocol#The_Messages_Metadata).
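The semantic difference described above can be captured in a tiny helper; the function name is hypothetical:

```javascript
// Metadata v0 treats an empty topic list as "all topics"; v1+ uses null
// (encoded as -1) for "all topics" and an empty array means "no topics".
function normalizeTopicsForMetadata (topics, apiVersion) {
  if (apiVersion >= 1 && Array.isArray(topics) && topics.length === 0) {
    return null; // v1+: null fetches all topics
  }
  return topics;
}
```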
Seems like an ugly change from a user's perspective, since it puts the burden on the user again to update the behavior of their app depending on the version they're connecting to, and would make migrations painful. I think this is a case where we should keep kafka-node's behavior consistent and abstract away the ugliness of the Kafka protocol. Calling loadMetadataForTopics with an empty array should always return all topics.
I always thought fetching all topic metadata seemed inefficient when we just want the broker metadata. So perhaps we can have a new method called loadMetadata that will always pass [] for topics?
What do you think?
So the following changes:
- Change the existing loadMetaDataForTopics so that if the broker supports API version >= 1, an empty array is converted to a null value to fetch all topics.
- Add a new loadMetadata that doesn't request topics (by always passing [] for topics - I assume you mean fetch no topics in this case?). This can then be used in other cases where we just need broker metadata.

This sounds good to me - if you agree, I'll try to get all the suggested changes into the PR sometime this week.
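The two methods agreed on above could be sketched as below against a stub transport; `send` stands in for kafka-node's encode/queue/write pipeline, so only the topics-payload logic is shown:

```javascript
// Illustrative client sketch; apiVersion is the negotiated metadata version.
function MetadataClientSketch (apiVersion, send) {
  this.apiVersion = apiVersion;
  this.send = send;
}

// Callers keep the old contract: [] means "all topics". On v1+ brokers the
// empty array is translated to null before encoding.
MetadataClientSketch.prototype.loadMetadataForTopics = function (topics, cb) {
  const payload = this.apiVersion >= 1 && topics.length === 0 ? null : topics;
  this.send(payload, cb);
};

// Broker-only metadata: on v1+ an empty array fetches no topic metadata;
// v0 has no "no topics" form, so this falls back to a full request there.
MetadataClientSketch.prototype.loadMetadata = function (cb) {
  this.send([], cb);
};
```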
sounds great @jlandersen thanks. 👍
lib/kafkaClient.js
Outdated
  return callback(error);
}

var controllerId = result[1].clusterMetadata.controllerId;
How often does the controllerId change? Would it be safe to cache this on the client so we can avoid doing a metadata request?
I have not been able to find documentation that verifies this - my impression is that the election process happens when starting the cluster, and otherwise only if the currently elected broker crashes or becomes unavailable. We can cache it, and update the metadata if the request ends up being sent to a broker that isn't the controller?
@jlandersen caching it sounds good
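The caching agreed on here could be sketched as below; `fetchClusterMetadata` is a hypothetical async metadata source, and invalidation is left to callers (e.g. on a NOT_CONTROLLER error):

```javascript
// Minimal controller-id cache sketch, per the discussion above.
function ControllerCache (fetchClusterMetadata) {
  this.controllerId = null;
  this.fetch = fetchClusterMetadata;
}

ControllerCache.prototype.getController = function (cb) {
  if (this.controllerId != null) {
    return cb(null, this.controllerId); // cache hit: no metadata request
  }
  this.fetch((err, metadata) => {
    if (err) return cb(err);
    this.controllerId = metadata.clusterMetadata.controllerId;
    cb(null, this.controllerId);
  });
};

// Called when a request reveals the cached broker is no longer controller.
ControllerCache.prototype.invalidate = function () {
  this.controllerId = null;
};
```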
lib/kafkaClient.js
Outdated
broker.write(request);
};

KafkaClient.prototype.createTopicsV2 = function (topics, cb) {
I'm inclined to just call this createTopics. We're planning to drop support for Node 4, so we're making a major version change anyway and this can be included in the things that "break".
Sounds good - I'll rename it to createTopics (and let the previous version remain on the old client).
lib/kafkaClient.js
Outdated
KafkaClient.prototype.createTopicsV2 = function (topics, cb) {
  var correlationId = this.nextId();
  var request = protocol.encodeCreateTopicRequest(this.clientId, correlationId, topics, this.options.requestTimeout);
Could we move the request creation and correlationId a line down, inside the handler?
lib/protocol/protocol.js
Outdated
}

const TIMEOUT_ERROR_CODE = 7;
if (vars.errorCode === TIMEOUT_ERROR_CODE) {
We should use the error map in the ERROR_CODE, or just use the createGroupError function.
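A lookup-table approach along those lines might look like this sketch; the table below is only the subset of Kafka's error codes mentioned in this thread (7 is REQUEST_TIMED_OUT, 41 is NOT_CONTROLLER), and `createError` is a hypothetical name:

```javascript
// Subset of Kafka's protocol error codes; 0 means no error. Codes outside
// this sketch's table are treated as no-error here for brevity.
const ERROR_CODE = {
  0: null,
  7: 'RequestTimedOut',
  41: 'NotController'
};

function createError (errorCode) {
  const name = ERROR_CODE[errorCode];
  if (name == null) return null;
  const err = new Error(name);
  err.name = name;
  return err;
}
```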
lib/protocol/protocol.js
Outdated
this.buffer('topic', vars.topic);
vars.topic = vars.topic.toString();
})
.word16bs('errorCode')
WIP? We should handle this error code.
A bunch of changes are in:
(1) For the createTopics decoder, I distinguish between two types of errors: overall request errors and individual topic-creation errors. Overall request errors are returned as an instance of Error (e.g. a timeout, or the request somehow got sent to a non-controller). Topic errors are returned as a normal result with the error message provided by Kafka (e.g. setting a replication factor higher than the number of available brokers).
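The two error classes described above can be sketched as follows; the response shape and the function name are illustrative, not the PR's actual decoder:

```javascript
// Request-level failures become an Error for the callback; per-topic
// failures come back as a normal result carrying Kafka's error message.
function interpretCreateTopicsResponse (response) {
  if (response.errorCode != null && response.errorCode !== 0) {
    // Whole request failed (e.g. timeout, sent to a non-controller)
    return {
      error: new Error('CreateTopics failed with code ' + response.errorCode),
      result: null
    };
  }
  // Per-topic outcomes: only failed topics are reported
  const topicErrors = response.topics
    .filter((t) => t.errorCode !== 0)
    .map((t) => ({ topic: t.topic, error: t.errorMessage }));
  return { error: null, result: topicErrors };
}
```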
lib/kafkaClient.js
Outdated
const correlationId = this.nextId();
const request = protocol.encodeCreateTopicRequest(this.clientId, correlationId, topics, this.options.requestTimeout);

this.queueCallback(broker.socket, correlationId, [protocol.decodeCreateTopicResponse, callback]);
Do we want to clear the cached controller metadata and call createTopics again when we get a NOT_CONTROLLER error?
We can. I am inclined to create an explicit error constructor function for this type of error so it's easier to handle.
I am not sure if createTopics is the best place to do this clearing and possibly retry logic. When more functionality is added that requires talking to the controller (such as deleting topics) those would be in the same situation. Should we add some function that can wrap requests that must go to the controller?
Good idea. In Client we have sendGroupRequest, so possibly add a sendControllerRequest?
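A rough shape for such a helper, analogous to the sendGroupRequest mentioned above, is sketched below. Every method on `client` here is a hypothetical stand-in; the point is the routing: resolve the controller, encode, queue the decoder/callback pair, then write to the controller's socket:

```javascript
// Hypothetical sendControllerRequest: route an encoded request to the
// cluster controller and register its decoder/callback for the response.
function sendControllerRequest (client, encoder, decoder, args, callback) {
  client.getController(function (err, controller) {
    if (err) return callback(err);
    const correlationId = client.nextId();
    const request = encoder.apply(null, [client.clientId, correlationId].concat(args));
    client.queueCallback(controller.socket, correlationId, [decoder, callback]);
    controller.write(request);
  });
}
```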
}
], (error, result) => {
  should.not.exist(error);
  result.should.be.empty;
Can we also fetch the topic metadata and verify the partitions/replication factor for the newly created topics?
Good point - added.
test/test.kafkaClient.js
Outdated
  this.skip();
}

sandbox = sinon.sandbox.create();
We don't need the sinon sandbox here and in the afterEach.
caeafc5 to 4483cd3
Updated. Let me know of any other issues/comments/suggestions; otherwise I consider this final for now :-)
callback = this.wrapControllerCheckIfNeeded(encoder, decoder, originalArgs, originalCallback);

this.queueCallback(controller.socket, correlationId, [decoder, callback]);
Hi @jlandersen, thanks for the update. Could you help me understand why wrapControllerCheckIfNeeded is needed? Would it not be simpler to just inline a function in place of the callback that checks if the error is an instance of NotControllerError, and then set the controller id to null and call sendControllerRequest again if it is?
The reason was to limit the retry to a single time (or potentially x times). If we add a function in place of the callback that always calls sendControllerRequest on a NotControllerError, it could keep making requests if the error is returned repeatedly. I am not saying it can or will happen; it was rather to be safe. Let me know if you prefer otherwise and I'll put the change in!
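The single-retry behaviour described here could be sketched like this; `NotControllerError` and the client methods are stand-ins for the PR's actual types:

```javascript
// On a NotController error the cached controller is cleared and the request
// is re-sent exactly once, so a broker that keeps answering NOT_CONTROLLER
// cannot cause an infinite retry loop.
class NotControllerError extends Error {}

function wrapControllerCheckIfNeeded (client, resend, originalCallback) {
  let retried = false;
  return function onResponse (error, result) {
    if (error instanceof NotControllerError && !retried) {
      retried = true; // only one retry, per the discussion
      client.invalidateController();
      return resend(onResponse);
    }
    originalCallback(error, result);
  };
}
```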
I'll merge once conflicts are resolved. Thanks @jlandersen
Adds support for sending a create topics request as a real protocol request. This supports specifying number of partitions and replication factor. Explicit replica assignments and config entries are left out for now. A create topic request must be sent to the controller. To find the current controller, this also adds a version 1 metadata request, which includes the controller Id in the response.
Error message changes a bit depending on version. This relaxes the assertion a bit.
metadata v1 is not supported until 0.10
Happens when the cluster is running a version less than 0.10
The order resulted in isInternal being used as the number of partitions during decoding.
- Add createTopics alias on admin client
- Remove V2 suffix, replacing old versions of createTopics and loadMetadataForTopics on KafkaClient
- Add loadMetadata function
- Use supported API to resolve metadata request version
Now fetches metadata again to verify topics were created with requested partition count and replication factor.
Wraps the provided callback with retry logic if NotController error was returned. In this case the cached controller is cleared and it is fetched again by getting the latest metadata.
Rebased on master and all checks out! Thanks! Looking forward to adding more things with the controller request in place!
Published as 3.0.0
This is a first attempt at adding support for the create topics request. I have been wanting this for a while since I work on some "management" tools for Kafka. Most other Kafka clients in other languages also seem to rely on the "metadata" solution.
This supports specifying number of partitions and replication factor. Explicit replica assignments and config entries are left out for now.
A create topic request must be sent to the controller. To find the current controller, this also adds a version 1 metadata request, which includes the controller Id in the response.
There are a couple of things that are less-than-optimal and could use some input for improvement:
- createTopics(..) and loadMetadataForTopics(..) … createTopics(..) conflict on the client)?
- The response from loadMetadataForTopics(..) could use a different structure. The first item in the array is an object with all the broker nodes. The second item is an object with a metadata property containing the topics. I've added the clusterMetadata property here, to avoid breaking the existing structure.

Let me know of anything we need to change before this can hopefully get merged in.
Once this is wrapped up, I have delete topics request ready next, and will then add support for config entries on creation of topics.
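Given the PR description above, a createTopics payload carries a topic name, a partition count, and a replication factor (no replica assignments or config entries yet). A small client-side validation pass over that assumed shape might look like this; the function name is hypothetical:

```javascript
// Validate the assumed createTopics payload shape from this PR:
// [{ topic, partitions, replicationFactor }, ...]
function validateCreateTopicsPayload (topics) {
  return Array.isArray(topics) && topics.every((t) =>
    typeof t.topic === 'string' &&
    Number.isInteger(t.partitions) && t.partitions > 0 &&
    Number.isInteger(t.replicationFactor) && t.replicationFactor > 0
  );
}
```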