-
Notifications
You must be signed in to change notification settings - Fork 627
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Consumer Group Heartbeat refactor #547
Conversation
@@ -53,6 +53,15 @@ function ConsumerGroup (memberOptions, topics) { | |||
const self = this; | |||
this.options = _.defaults((memberOptions || {}), DEFAULTS); | |||
|
|||
if (!this.options.heartbeatInterval) { | |||
this.options.heartbeatInterval = Math.floor(this.options.sessionTimeout / 3); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you divide by three?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to offical kafka doc for heartbeat.interval.ms
The expected time between heartbeats to the group coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the worker's session stays active and to facilitate rebalancing when new members join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.
|
||
let heartbeat = this.sendHeartbeat(); | ||
|
||
this.hearbeatInterval = setInterval(() => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this interval eventually have to get cleaned up somewhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, I found it in stopHeartbeats()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, in general. If I'm reading everything correctly, only complaint/comment I have is that normally if I see heartbeatTimeoutMs
of 1500, I would think "okay, after 1.5 seconds of not receiving a heartbeat, an error will be generated in some way". Instead, it looks like as long as a heartbeat goes through before the interval ticks, a timed out heartbeat is fine.
e.g. heartbeatInterval=10000, heartbeatTimeoutMs=1500. First heartbeat is sent. 1500ms later, nothing happens. 6000ms later, heartbeat completes and pending is set to false. At the 10000ms mark, the interval ticks and verifyResolved is called. pending is false, so it seems like the heartbeat didn't timeout even though it took 7500ms and the timeout is 1500ms.
Not sure what the alternative would be, just pointing out the confusion.
edit: Actually, why track a timeout/timeTook at all? If the goal is to not send a new heartbeat until the previous is complete, shouldn't tracking the pending status alone solve this problem?
At each heartbeat interval check if last heartbeat responded if there was no response yield timeout error and prevent sending another heartbeat request.