
ConsumerGroup attempting to consume from a non existent topic #1056

Closed
DimitrijeManic opened this issue Aug 30, 2018 · 9 comments · Fixed by #1057

Comments

@DimitrijeManic

DimitrijeManic commented Aug 30, 2018

Bug Report

From my understanding, the ConsumerGroup should fail if there is an attempt to consume from a nonexistent topic when auto.create.topics.enable=false on the broker.

Currently the ConsumerGroup does not throw any error about the nonexistent topic.

Also, if the topic is created after the consumer group is initialized and messages are written to it, the consumer group will not receive those messages.

Environment

  • Node version: v10.7.0
  • Kafka-node version: 2.6.1
  • Kafka version: 1.1.1

For specific cases also provide

  • Number of Brokers: 1
  • Number partitions for topic: 1

Include Sample Code to reproduce behavior

'use strict'
const { ConsumerGroup } = require('kafka-node');

async function readTopic(topic) {
  const options = {
    kafkaHost: 'localhost:9092',
    groupId: 'ExampleTestGroup',
    commitOffsetsOnFirstJoin: true, // I have tried setting this both to true & false
    fromOffset: 'latest'
  };

  const consumerGroup = new ConsumerGroup(options, topic);

  // Resolve once the group has connected, or reject on the first error.
  return new Promise((resolve, reject) => {
    consumerGroup.on('connect', () => resolve(consumerGroup));
    consumerGroup.on('error', (err) => reject(err));
  });
}

async function startTest() {
  const consumer = await readTopic('nonExistentTopic');

  consumer.on('message', (msg) => {
    console.log(msg);
  });

  // Keep the process alive so the consumer keeps running.
  await new Promise(() => {});
}

startTest();

Include output with Debug turned on

/home/dmanic/.nvm/versions/node/v10.7.0/bin/node --inspect-brk=24019 ../../test-environment/sohu-bug/test.js
Debugger listening on ws://127.0.0.1:24019/1f52e0eb-fe06-467b-9b80-831f5ef1fd94
For help, see: https://nodejs.org/en/docs/inspector
Debugger attached.
Thu, 30 Aug 2018 15:06:36 GMT kafka-node:KafkaClient Connect attempt 1
Thu, 30 Aug 2018 15:06:36 GMT kafka-node:KafkaClient Trying to connect to host: localhost port: 9092
Thu, 30 Aug 2018 15:06:36 GMT kafka-node:KafkaClient Sending versions request to localhost:9092
Thu, 30 Aug 2018 15:06:36 GMT kafka-node:KafkaClient broker socket connected {"host":"localhost","port":"9092"}
Thu, 30 Aug 2018 15:06:36 GMT kafka-node:KafkaClient Received versions response from localhost:9092
Thu, 30 Aug 2018 15:06:36 GMT kafka-node:KafkaClient setting api support to {"21":{"min":0,"max":0,"usable":false},"22":{"min":0,"max":0,"usable":false},"23":{"min":0,"max":0,"usable":false},"24":{"min":0,"max":0,"usable":false},"25":{"min":0,"max":0,"usable":false},"26":{"min":0,"max":0,"usable":false},"27":{"min":0,"max":0,"usable":false},"28":{"min":0,"max":0,"usable":false},"29":{"min":0,"max":0,"usable":false},"30":{"min":0,"max":0,"usable":false},"31":{"min":0,"max":0,"usable":false},"32":{"min":0,"max":1,"usable":false},"33":{"min":0,"max":0,"usable":false},"34":{"min":0,"max":0,"usable":false},"35":{"min":0,"max":0,"usable":false},"36":{"min":0,"max":0,"usable":false},"37":{"min":0,"max":0,"usable":false},"38":{"min":0,"max":0,"usable":false},"39":{"min":0,"max":0,"usable":false},"40":{"min":0,"max":0,"usable":false},"41":{"min":0,"max":0,"usable":false},"42":{"min":0,"max":0,"usable":false},"produce":{"min":0,"max":5,"usable":2},"fetch":{"min":0,"max":7,"usable":2},"offset":{"min":0,"max":2,"usable":0},"metadata":{"min":0,"max":5,"usable":0},"leader":{"min":0,"max":1,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":4,"usable":false},"controlledShutdown":{"min":0,"max":1,"usable":false},"offsetCommit":{"min":0,"max":3,"usable":2},"offsetFetch":{"min":0,"max":3,"usable":1},"groupCoordinator":{"min":0,"max":1,"usable":0},"joinGroup":{"min":0,"max":2,"usable":0},"heartbeat":{"min":0,"max":1,"usable":0},"leaveGroup":{"min":0,"max":1,"usable":0},"syncGroup":{"min":0,"max":1,"usable":0},"describeGroups":{"min":0,"max":1,"usable":0},"listGroups":{"min":0,"max":1,"usable":0},"saslHandshake":{"min":0,"max":1,"usable":false},"apiVersions":{"min":0,"max":1,"usable":0},"createTopics":{"min":0,"max":2,"usable":false},"deleteTopics":{"min":0,"max":1,"usable":false}}
Thu, 30 Aug 2018 15:06:36 GMT kafka-node:KafkaClient updating metadatas
Thu, 30 Aug 2018 15:06:36 GMT kafka-node:ConsumerGroup Connecting kafka-node-client
Thu, 30 Aug 2018 15:06:36 GMT kafka-node:ConsumerGroup GroupCoordinator Response: { coordinatorHost: 'localhost',
coordinatorPort: 9092,
coordinatorId: 1 }
Thu, 30 Aug 2018 15:06:39 GMT kafka-node:ConsumerGroup joinGroupResponse {"members":[{"subscription":["nonExistentTopic"],"version":0,"id":"kafka-node-client-493f7d82-ee33-4366-9992-105c99fa17c2"}],"generationId":7,"groupProtocol":"roundrobin","leaderId":"kafka-node-client-493f7d82-ee33-4366-9992-105c99fa17c2","memberId":"kafka-node-client-493f7d82-ee33-4366-9992-105c99fa17c2"} from kafka-node-client
Thu, 30 Aug 2018 15:06:39 GMT kafka-node:ConsumerGroup Assigning Partitions to members [ { subscription: [ 'nonExistentTopic' ],
version: 0,
userData: undefined,
id: 'kafka-node-client-493f7d82-ee33-4366-9992-105c99fa17c2' } ]
Thu, 30 Aug 2018 15:06:39 GMT kafka-node:ConsumerGroup Using group protocol roundrobin
Thu, 30 Aug 2018 15:06:39 GMT kafka-node:ConsumerGroup loadingMetadata for topics: [ 'nonExistentTopic' ]
Thu, 30 Aug 2018 15:06:39 GMT kafka-node:ConsumerGroup mapTopicToPartitions {}
Thu, 30 Aug 2018 15:06:39 GMT kafka-node:Roundrobin topicPartition: {}
Thu, 30 Aug 2018 15:06:39 GMT kafka-node:Roundrobin groupMembers: [{"subscription":["nonExistentTopic"],"version":0,"id":"kafka-node-client-493f7d82-ee33-4366-9992-105c99fa17c2"}]
Thu, 30 Aug 2018 15:06:39 GMT kafka-node:Roundrobin members [ 'kafka-node-client-493f7d82-ee33-4366-9992-105c99fa17c2' ]
Thu, 30 Aug 2018 15:06:39 GMT kafka-node:Roundrobin subscribers { 'kafka-node-client-493f7d82-ee33-4366-9992-105c99fa17c2': [ 'nonExistentTopic' ] }
Thu, 30 Aug 2018 15:06:39 GMT kafka-node:Roundrobin round robin on topic partition pairs: []
Thu, 30 Aug 2018 15:06:39 GMT kafka-node:ConsumerGroup SyncGroup Request from kafka-node-client-493f7d82-ee33-4366-9992-105c99fa17c2
Thu, 30 Aug 2018 15:06:39 GMT kafka-node:ConsumerGroup SyncGroup Response
Thu, 30 Aug 2018 15:06:39 GMT kafka-node:ConsumerGroup generationId 7
Thu, 30 Aug 2018 15:06:39 GMT kafka-node:ConsumerGroup startFetch is false
Thu, 30 Aug 2018 15:06:39 GMT kafka-node:ConsumerGroup kafka-node-client started heartbeats at every 10000 ms

Thanks for your contribution!

@hyperlink
Collaborator

AFAIK there's no way for the client to know the broker's setting for auto.create.topics.enable.

What would the expected behavior be for this scenario? Should it emit an error if any of the topics is missing?

Or should the leader of the consumerGroup periodically watch the metadata for the missing topics and consume/assign the topics that do exist?

@DimitrijeManic
Author

I was wrong about the consumer needing to fail if the topic doesn't exist.

To keep in line with Kafka's API, I feel the second scenario you mentioned should happen: the leader of the consumer group should be aware of topic/partition changes and handle partition assignment.

I think this turns out to be a duplicate of #877

@winniegy

We hit the same issue as the second scenario.
The problem is that the topic is created just after the consumerGroup connects to Kafka (it does not exist before the connection, and listing the topics afterwards shows it is there). However, the consumer still cannot consume any messages on that topic.

@DimitrijeManic
Author

DimitrijeManic commented Aug 31, 2018

Does Kafka provide any sort of event for topic creation that could be used to reconnect/refresh? I'm not sure how the console consumer handles this case; it may be worth looking into.

@hyperlink
Collaborator

hyperlink commented Sep 1, 2018

I'm not seeing any response from the group coordinator when a topic is added or the number of partitions is altered. I think the best thing to do is have the group leader periodically poll the subscribed topics' metadata and, if it changed, leave the group to trigger a rebalance.
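The polling approach described above can be sketched roughly as follows. This is an illustrative outline, not kafka-node's actual implementation; `fetchMetadata`, the snapshot shape `{ topicName: partitionCount }`, and the watcher function names are assumptions, while `leaveGroup` and `connect` are existing ConsumerGroup methods.

```javascript
// Compare two metadata snapshots of the form { topicName: partitionCount }.
// Returns true when a topic appeared, disappeared, or changed partition count.
function topicsChanged(previous, current) {
  const prevTopics = Object.keys(previous).sort();
  const currTopics = Object.keys(current).sort();
  if (prevTopics.length !== currTopics.length) return true;
  return prevTopics.some(
    (topic, i) => topic !== currTopics[i] || previous[topic] !== current[topic]
  );
}

// Illustrative poll loop: the group leader refreshes metadata every
// intervalMs and, on a change, leaves the group so the coordinator
// triggers a rebalance and partitions get reassigned.
function startMetadataWatcher(consumerGroup, fetchMetadata, intervalMs) {
  let snapshot = null;
  return setInterval(() => {
    fetchMetadata((err, current) => {
      if (err) return; // transient error: retry on the next tick
      if (snapshot && topicsChanged(snapshot, current)) {
        consumerGroup.leaveGroup(() => consumerGroup.connect()); // rejoin => rebalance
      }
      snapshot = current;
    });
  }, intervalMs);
}
```

The comparison is deliberately cheap (sorted key lists plus partition counts) since it runs on every poll tick; only the leader needs to run it, because a single member leaving is enough to rebalance the whole group.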

@winniegy

winniegy commented Sep 3, 2018

I have tried the commit and it works for the scenario I mentioned before (no topic exists before subscribing and the topic is created afterwards).
I also observed that if there is a change in the polled metadata (for instance, the subscribed topic is created), connect is invoked and the consumerGroup gets a connect callback.

In addition, I have a question.
I use "fromOffset: 'latest'" in the options for the consumerGroup.
Say the topic does not exist at the time of one metadata poll, and before the next poll the topic is created and messages for that topic are written to Kafka; the consumerGroup then performs the second metadata poll and detects the change.
I guess the consumerGroup will miss the messages written before the second poll. In other words, it cannot consume messages produced before it detects the metadata change.
Is my assumption correct? Thanks.

@hyperlink
Collaborator

@winniegy yes, I believe with latest the consumer will only get messages produced after it starts consuming from that new topic/partition.
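One way to mitigate the gap winniegy describes is to use `fromOffset: 'earliest'` instead of `'latest'`, so that when the group first gets an assignment for the newly created topic it starts from the beginning of the log rather than the current end. A minimal sketch of the two option sets (host and group id are placeholders taken from the repro above; `fromOffset` only applies when the group has no committed offset yet):

```javascript
// With 'latest' the group starts at the log end on first assignment, so
// messages produced between topic creation and the metadata refresh are
// skipped. With 'earliest' the first assignment starts at the beginning
// of the log and replays them.
function makeOptions(fromOffset) {
  return {
    kafkaHost: 'localhost:9092',     // placeholder broker address
    groupId: 'ExampleTestGroup',     // placeholder group id
    fromOffset                       // 'latest' or 'earliest'
  };
}

const catchUpOptions = makeOptions('earliest'); // replays pre-existing messages
const tailOptions = makeOptions('latest');      // only messages from now on
```

The trade-off is that `'earliest'` replays the entire existing log on the very first join, which may be undesirable for long-lived topics; it only affects groups with no committed offsets.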

@winniegy

winniegy commented Sep 4, 2018

@hyperlink I see. Thanks for the help.

@hyperlink
Collaborator

Published as 3.0.0
