
Producer.send() doesn't always invoke callback if broker goes offline (v2.3.0) #824

Closed
thomaslee opened this issue Dec 18, 2017 · 1 comment
thomaslee commented Dec 18, 2017

I'm still investigating this one, but wanted to raise it here in case the fix is obvious to you. This bug doesn't always occur and seems somewhat timing-sensitive, but I can usually reproduce it within 1-5 attempts:

  1. Start ZK
  2. Start Kafka broker
  3. Run `DEBUG=kafka-node:* node repro.js` (source below)
  4. Stop Kafka broker
  5. The DEBUG output shows the callback queues being cleared without error, and `kafka-node:KafkaClient kafka-node-client reconnecting to ...` messages appear at 1s intervals.
  6. Start Kafka broker
  7. The producer appears to recover, but the callback passed to `producer.send(...)` is never invoked and the program is "stuck".

Source code for repro.js:

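```js
const kafka = require('kafka-node')

// Build a one-message payload for topic 'foo'
function makePayload() {
  return [{
    topic: 'foo',
    messages: ['message']
  }]
}

// KafkaClient connects to localhost:9092 by default
const client = new kafka.KafkaClient()
// requireAcks: -1 waits for acknowledgement from all in-sync replicas
const producer = new kafka.Producer(client, {requireAcks: -1})

// Send in a loop: each callback triggers the next send(). On error, refresh
// metadata and retry. If this callback is never invoked, the loop stops and
// the program appears "stuck".
function producerSendCb(err, data) {
  if (err) {
    console.error(err)
    // work-around for #798
    producer.client.refreshMetadata([], err => {
      if (err) {
        console.error(err)
      }
      send()
    })
    return
  }
  send()
}
function send() {
  producer.send(makePayload(), producerSendCb)
}
producer.on('ready', send)
producer.on('error', err => {
  console.error(err)
})

// Close the producer cleanly on Ctrl-C or termination
function cleanup() {
  producer.close(err => {
    if (err) {
      console.error(err)
    }
    process.exit()
  })
}
process.on('SIGINT', cleanup)
process.on('SIGTERM', cleanup)
```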
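Until the underlying bug is fixed, user code can at least detect the hang. The sketch below wraps `producer.send(payloads, cb)` (the call used in repro.js) with a watchdog timer so the caller's callback runs exactly once: either with the normal result, or with a timeout error if the producer never calls back. `sendWithTimeout` and the timeout value are hypothetical and not part of kafka-node; the helper accepts any object with a `send(payloads, cb)` method.

```javascript
// Hypothetical helper, not part of kafka-node. Guarantees that `cb` is
// invoked exactly once: with the result of producer.send(), or with a
// timeout error if the producer never invokes its callback.
function sendWithTimeout(producer, payloads, timeoutMs, cb) {
  let done = false

  const timer = setTimeout(() => {
    if (done) return
    done = true
    cb(new Error(`producer.send() callback not invoked within ${timeoutMs}ms`))
  }, timeoutMs)

  producer.send(payloads, (err, data) => {
    if (done) return // already timed out; ignore the late callback
    done = true
    clearTimeout(timer)
    cb(err, data)
  })
}
```

In repro.js, `send()` could call `sendWithTimeout(producer, makePayload(), 30000, producerSendCb)` instead. The wrapper does not cancel the underlying request, so the message may still be written after a timeout, and retrying at that point can produce duplicates.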

Partial output with `DEBUG=kafka-node:*` (my notes are prefixed with ###):

 kafka-node:KafkaClient sending request +0ms
  kafka-node:KafkaClient grouped requests by 1 brokers ["0"] +0ms
  kafka-node:KafkaClient has apiSupport broker is ready +0ms
  kafka-node:KafkaClient Using V2 of produce +0ms
  kafka-node:KafkaClient compressing messages if needed +1ms
  kafka-node:KafkaClient checking payload topic/partitions has leaders +0ms
  kafka-node:KafkaClient found leaders for all +0ms
  kafka-node:KafkaClient sending request +0ms
  kafka-node:KafkaClient grouped requests by 1 brokers ["0"] +0ms
  kafka-node:KafkaClient has apiSupport broker is ready +0ms
  kafka-node:KafkaClient Using V2 of produce +0ms
  kafka-node:KafkaClient compressing messages if needed +1ms
  kafka-node:KafkaClient checking payload topic/partitions has leaders +0ms
  kafka-node:KafkaClient found leaders for all +0ms
  kafka-node:KafkaClient sending request +0ms
  kafka-node:KafkaClient grouped requests by 1 brokers ["0"] +1ms
  kafka-node:KafkaClient has apiSupport broker is ready +0ms
  kafka-node:KafkaClient Using V2 of produce +0ms
  kafka-node:KafkaClient compressing messages if needed +1ms
  kafka-node:KafkaClient checking payload topic/partitions has leaders +0ms
  kafka-node:KafkaClient found leaders for all +0ms
  kafka-node:KafkaClient sending request +0ms
  kafka-node:KafkaClient grouped requests by 1 brokers ["0"] +1ms
  kafka-node:KafkaClient has apiSupport broker is ready +0ms
  kafka-node:KafkaClient Using V2 of produce +0ms
  kafka-node:KafkaClient compressing messages if needed +12ms
  kafka-node:KafkaClient checking payload topic/partitions has leaders +0ms
  kafka-node:KafkaClient found leaders for all +0ms
  kafka-node:KafkaClient sending request +0ms
  kafka-node:KafkaClient grouped requests by 1 brokers ["0"] +0ms
  kafka-node:KafkaClient has apiSupport broker is ready +1ms
  kafka-node:KafkaClient Using V2 of produce +3ms
  kafka-node:KafkaClient compressing messages if needed +5ms
  kafka-node:KafkaClient checking payload topic/partitions has leaders +0ms
  kafka-node:KafkaClient found leaders for all +0ms
  kafka-node:KafkaClient sending request +0ms
  kafka-node:KafkaClient grouped requests by 1 brokers ["0"] +0ms
  kafka-node:KafkaClient has apiSupport broker is ready +0ms
  kafka-node:KafkaClient Using V2 of produce +4ms
  kafka-node:KafkaClient compressing messages if needed +11ms
  kafka-node:KafkaClient checking payload topic/partitions has leaders +0ms
  kafka-node:KafkaClient found leaders for all +0ms
  kafka-node:KafkaClient sending request +0ms
  kafka-node:KafkaClient grouped requests by 1 brokers ["0"] +0ms
  kafka-node:KafkaClient has apiSupport broker is ready +0ms
  kafka-node:KafkaClient Using V2 of produce +0ms
  kafka-node:KafkaClient compressing messages if needed +2ms
  kafka-node:KafkaClient checking payload topic/partitions has leaders +0ms
  kafka-node:KafkaClient found leaders for all +0ms
  kafka-node:KafkaClient sending request +0ms
  kafka-node:KafkaClient grouped requests by 1 brokers ["0"] +0ms
  kafka-node:KafkaClient has apiSupport broker is ready +0ms
  kafka-node:KafkaClient Using V2 of produce +0ms
  kafka-node:KafkaClient compressing messages if needed +2ms
  kafka-node:KafkaClient checking payload topic/partitions has leaders +0ms
  kafka-node:KafkaClient found leaders for all +0ms
  kafka-node:KafkaClient sending request +0ms
  kafka-node:KafkaClient grouped requests by 1 brokers ["0"] +0ms
  kafka-node:KafkaClient has apiSupport broker is ready +0ms
  kafka-node:KafkaClient Using V2 of produce +0ms
### broker stopped around here ###
  kafka-node:KafkaClient clearing localhost:9092 callback queue without error +12ms
  kafka-node:KafkaClient clearing 192.168.1.9:9092 callback queue without error +0ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +4ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +997ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +7ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +999ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +4ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +997ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +6ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +996ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +7ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +994ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +11ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +994ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +11ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +991ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +14ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +991ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +11ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +993ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +10ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +995ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +10ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +995ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +7ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +994ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +11ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +992ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +13ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +988ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +17ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +987ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +18ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +984ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +22ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +984ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +20ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +989ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +16ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +991ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +12ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +991ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +17ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +988ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +22ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 192.168.1.9:9092 +979ms
  kafka-node:KafkaClient Sending versions request to 192.168.1.9:9092 +1ms
  kafka-node:KafkaClient Received versions response from 192.168.1.9:9092 +15ms
  kafka-node:KafkaClient setting api support to {"21":{"min":0,"max":0,"usable":false},"22":{"min":0,"max":0,"usable":false},"23":{"min":0,"max":0,"usable":false},"24":{"min":0,"max":0,"usable":false},"25":{"min":0,"max":0,"usable":false},"26":{"min":0,"max":0,"usable":false},"27":{"min":0,"max":0,"usable":false},"28":{"min":0,"max":0,"usable":false},"29":{"min":0,"max":0,"usable":false},"30":{"min":0,"max":0,"usable":false},"31":{"min":0,"max":0,"usable":false},"32":{"min":0,"max":0,"usable":false},"33":{"min":0,"max":0,"usable":false},"34":{"min":0,"max":0,"usable":false},"35":{"min":0,"max":0,"usable":false},"36":{"min":0,"max":0,"usable":false},"37":{"min":0,"max":0,"usable":false},"produce":{"min":0,"max":5,"usable":2},"fetch":{"min":0,"max":6,"usable":0},"offset":{"min":0,"max":2,"usable":0},"metadata":{"min":0,"max":5,"usable":0},"leader":{"min":0,"max":1,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":4,"usable":false},"controlledShutdown":{"min":0,"max":1,"usable":false},"offsetCommit":{"min":0,"max":3,"usable":2},"offsetFetch":{"min":0,"max":3,"usable":1},"groupCoordinator":{"min":0,"max":1,"usable":0},"joinGroup":{"min":0,"max":2,"usable":0},"heartbeat":{"min":0,"max":1,"usable":0},"leaveGroup":{"min":0,"max":1,"usable":0},"syncGroup":{"min":0,"max":1,"usable":0},"describeGroups":{"min":0,"max":1,"usable":0},"listGroups":{"min":0,"max":1,"usable":0},"saslHandshake":{"min":0,"max":1,"usable":false},"apiVersions":{"min":0,"max":1,"usable":0},"createTopics":{"min":0,"max":2,"usable":false},"deleteTopics":{"min":0,"max":1,"usable":false}} +0ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +11ms
  kafka-node:KafkaClient Sending versions request to localhost:9092 +8ms
  kafka-node:KafkaClient Received versions response from localhost:9092 +7ms
  kafka-node:KafkaClient setting api support to {"21":{"min":0,"max":0,"usable":false},"22":{"min":0,"max":0,"usable":false},"23":{"min":0,"max":0,"usable":false},"24":{"min":0,"max":0,"usable":false},"25":{"min":0,"max":0,"usable":false},"26":{"min":0,"max":0,"usable":false},"27":{"min":0,"max":0,"usable":false},"28":{"min":0,"max":0,"usable":false},"29":{"min":0,"max":0,"usable":false},"30":{"min":0,"max":0,"usable":false},"31":{"min":0,"max":0,"usable":false},"32":{"min":0,"max":0,"usable":false},"33":{"min":0,"max":0,"usable":false},"34":{"min":0,"max":0,"usable":false},"35":{"min":0,"max":0,"usable":false},"36":{"min":0,"max":0,"usable":false},"37":{"min":0,"max":0,"usable":false},"produce":{"min":0,"max":5,"usable":2},"fetch":{"min":0,"max":6,"usable":0},"offset":{"min":0,"max":2,"usable":0},"metadata":{"min":0,"max":5,"usable":0},"leader":{"min":0,"max":1,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":4,"usable":false},"controlledShutdown":{"min":0,"max":1,"usable":false},"offsetCommit":{"min":0,"max":3,"usable":2},"offsetFetch":{"min":0,"max":3,"usable":1},"groupCoordinator":{"min":0,"max":1,"usable":0},"joinGroup":{"min":0,"max":2,"usable":0},"heartbeat":{"min":0,"max":1,"usable":0},"leaveGroup":{"min":0,"max":1,"usable":0},"syncGroup":{"min":0,"max":1,"usable":0},"describeGroups":{"min":0,"max":1,"usable":0},"listGroups":{"min":0,"max":1,"usable":0},"saslHandshake":{"min":0,"max":1,"usable":false},"apiVersions":{"min":0,"max":1,"usable":0},"createTopics":{"min":0,"max":2,"usable":false},"deleteTopics":{"min":0,"max":1,"usable":false}} +0ms
### program is "stuck", no further output ###

EDIT: made my example code a little more correct.

thomaslee changed the title from "Producer.send() doesn't always invoke callback if broker goes offline" to "Producer.send() doesn't always invoke callback if broker goes offline (v2.3.0)" on Dec 18, 2017
@thomaslee

The issue seems to go away if we always clear the callback queue with an error, regardless of the value of `hadError`. I'll open a PR.
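To illustrate why clearing the queue "without error" strands callers, here is a simplified, hypothetical model of a per-broker callback queue. The class and method names (`CallbackQueue`, `add`, `clear`) are invented for this sketch and are not kafka-node's internals; `hadError` is modelled on the boolean that Node's `net.Socket` passes to its `'close'` event, which is assumed to be the flag referred to above. On close, every pending callback is invoked with an error regardless of `hadError`, so each `producer.send()` caller hears back and can retry.

```javascript
// Hypothetical model of a per-broker callback queue; not kafka-node's code.
class CallbackQueue {
  constructor() {
    this.pending = new Map() // correlationId -> callback
  }

  add(correlationId, cb) {
    this.pending.set(correlationId, cb)
  }

  // Called when the broker socket closes. `hadError` (assumed to mirror the
  // net.Socket 'close' flag) only changes the message: every pending request
  // is failed, even on a "clean" close, so no caller waits forever.
  clear(hadError) {
    const err = new Error(
      hadError ? 'broker socket closed with error' : 'broker socket closed'
    )
    // Snapshot and empty the queue first, in case a callback re-enqueues work
    const callbacks = [...this.pending.values()]
    this.pending.clear()
    for (const cb of callbacks) {
      cb(err)
    }
    return callbacks.length
  }
}
```

Emptying the map before invoking callbacks matters for code like repro.js, where the error callback immediately issues a new `send()`.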

thomaslee added a commit to thomaslee/kafka-node that referenced this issue Dec 18, 2017
Failure to do this can cause confusion in user code because callbacks
passed to Producer.send(...) never get invoked.
@hyperlink hyperlink self-assigned this Jan 5, 2018