No events being emitted from client or producer #685

Closed
RobertHerhold opened this issue Jun 10, 2017 · 8 comments

RobertHerhold commented Jun 10, 2017

Bug Report

Environment

  • Node version: 8.1.0
  • Kafka-node version: 1.6.2
  • Kafka version: 0.10.2.1

Include Sample Code to reproduce behavior

const kafka = require('kafka-node');

const kafkaClient = new kafka.Client('<my-remote-url>:2181');
const kafkaProducer = new kafka.Producer(kafkaClient);

kafkaClient.on('error', function (err) {
    logger.error('Kafka Client error: ', err);
});

kafkaClient.on('ready', () => console.log('ready'));

kafkaClient.on('connect', function () {
    console.log('connected');
    
    kafkaClient.loadMetadataForTopics([], function (error, results) {
        if (error) {
            return console.error(error);
        }
        console.log(results);
    });
});

kafkaProducer.on('error', function (err) {
    logger.error('Kafka Producer error: ', err);
});

// And then later on...

kafkaProducer.send([{
    topic: 'test',
    messages: ['test message']
}], function (err) {
    console.log(err);
});

Output

None of my event listeners result in anything being logged, so I'm not sure where to begin. Using env DEBUG="kafka-node:*" when running my server doesn't seem to provide any additional output.

{ BrokerNotAvailableError
    at new BrokerNotAvailableError (/Users/robert/projects/my-project/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9)
    at Client.loadMetadataForTopics (/Users/robert/projects/my-project/node_modules/kafka-node/lib/client.js:348:15)
    at Client.send (/Users/robert/projects/my-project/node_modules/kafka-node/lib/client.js:515:10)
    at /Users/robert/projects/my-project/node_modules/kafka-node/lib/client.js:220:10
    at /Users/robert/projects/my-project/node_modules/kafka-node/node_modules/async/lib/async.js:52:16
    at Object.async.forEachOf.async.eachOf (/Users/robert/projects/my-project/node_modules/kafka-node/node_modules/async/lib/async.js:236:30)
    at Object.async.forEach.async.each (/Users/robert/projects/my-project/node_modules/kafka-node/node_modules/async/lib/async.js:209:22)
    at Client.sendProduceRequest (/Users/robert/projects/my-project/node_modules/kafka-node/lib/client.js:218:9)
    at Producer.BaseProducer.send (/Users/robert/projects/my-project/node_modules/kafka-node/lib/baseProducer.js:123:10)
    at Object.<anonymous> (/Users/robert/projects/my-project/ws-handler.js:37:15)
    at Module._compile (module.js:569:30)
    at Object.Module._extensions..js (module.js:580:10)
    at Module.load (module.js:503:32)
    at tryModuleLoad (module.js:466:12)
    at Function.Module._load (module.js:458:3)
    at Module.require (module.js:513:17)
    at require (internal/module.js:11:18)
    at Object.<anonymous> (/Users/robert/projects/my-project/app.js:139:5)
    at Module._compile (module.js:569:30)
    at Object.Module._extensions..js (module.js:580:10)
    at Module.load (module.js:503:32)
    at tryModuleLoad (module.js:466:12)
    at Function.Module._load (module.js:458:3)
    at Function.Module.runMain (module.js:605:10)
    at startup (bootstrap_node.js:158:16)
    at bootstrap_node.js:575:3 message: 'Broker not available' }

And when I use kafkacat to confirm that everything is set up right:

kafkacat -b <my-remote-url>:9092 -L

Metadata for all topics (from broker 0: <my-remote-url>:9092/0):
 1 brokers:
  broker 0 at <my-remote-url>:9092
 1 topics:
  topic "test" with 1 partitions:
    partition 0, leader 0, replicas: 0, isrs: 0

It seems that everything is set up right, with 1 broker that the kafka-node client should be able to connect to. Are there any issues that you can see from what I've posted?

@RobertHerhold
Author

I should also mention that I gave it plenty of time before executing the kafkaProducer.send function; it wasn't being called immediately, as my code sample might suggest.

@krystianity

You are most likely not even connected to your broker, which is why you see the BrokerNotAvailableError when calling .send().
As of now, kafka-node requires ZooKeeper to connect to the broker. Since your ready and connect events are not firing, the client is not able to connect to zk in the first place.

The zk module won't fire or log any events when it is unable to connect; it will just try to reconnect silently.
With kafkacat you connect directly to the broker and skip the procedure that kafka-node has to run through.
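
Because a failed ZooKeeper connection stays silent, one way to surface it is to put a deadline on the ready event. The following is only a sketch: waitForReady and its 10-second default are hypothetical names and values, not part of kafka-node, and it assumes nothing more than an object that emits 'ready' and 'error' the way the client in the report is expected to.

```javascript
// Hypothetical helper (not part of kafka-node): wait for a 'ready' event,
// but give up after timeoutMs so a silent reconnect loop becomes visible.
function waitForReady(emitter, timeoutMs = 10000) {
  return new Promise((resolve, reject) => {
    const cleanup = () => {
      clearTimeout(timer);
      emitter.removeListener('ready', onReady);
      emitter.removeListener('error', onError);
    };
    const onReady = () => {
      cleanup();
      resolve();
    };
    const onError = (err) => {
      cleanup();
      reject(err);
    };
    // Reject if neither 'ready' nor 'error' arrives before the deadline.
    const timer = setTimeout(() => {
      cleanup();
      reject(new Error(`no 'ready' event within ${timeoutMs} ms`));
    }, timeoutMs);

    emitter.once('ready', onReady);
    emitter.once('error', onError);
  });
}
```

With the client from the report, this could be called as `waitForReady(kafkaClient, 10000).catch(console.error)`, so that a connection which never becomes ready ends up in the log instead of producing no output at all.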

@RobertHerhold
Author

@krystianity I also tried the branch discussed in #666 to bypass ZK, but I have the same issue. Any ideas?

@krystianity

Can't you use zk for now? Until #666 moves into master/production?

@RobertHerhold
Author

I can, I just have to figure out why it can't connect to zk then :/
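
A first step in narrowing this down could be to check whether the ZooKeeper port accepts TCP connections at all from the machine running the Node process. This is a rough sketch using only Node's built-in net module; canConnect is a hypothetical helper name, and an open port only shows that something is listening there, not that the ZooKeeper configuration is correct.

```javascript
const net = require('net');

// Hypothetical helper: resolves true if a TCP connection to host:port opens
// within timeoutMs, and false on a connection error or on timeout.
function canConnect(host, port, timeoutMs = 3000) {
  return new Promise((resolve) => {
    const socket = net.connect({ host, port });
    let settled = false;

    const finish = (reachable) => {
      if (settled) return;
      settled = true;
      clearTimeout(timer);
      socket.destroy();
      resolve(reachable);
    };

    const timer = setTimeout(() => finish(false), timeoutMs);
    socket.once('connect', () => finish(true));
    socket.once('error', () => finish(false));
  });
}
```

For the setup in the report, this might be used as `canConnect('<my-remote-url>', 2181).then(console.log)`, with 9092 as a comparison, since kafkacat already showed that the broker port is reachable.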

@RobertHerhold
Author

@krystianity I got it figured out; it was just a ZooKeeper config change. Thanks for the help.

@toml7

toml7 commented Nov 9, 2017

@RobertHerhold I am having the same issue. Can you please tell me what config change you made so that I can try it out?

@jaiswarvipin

jaiswarvipin commented Jun 22, 2020

The title and the content do not match.
The title says that no events are being emitted by the kafka-node lib,
while the body says that the broker is not available.

Let's solve these one at a time.
1) Broker is not available.
Complete details are in #160.

2) Producer and Consumer not emitting the ready event
Please try the code below.

const kafka = require('kafka-node');

// echo() is not built into Node.js, so define it as a thin wrapper around console.log.
const echo = (msg) => console.log(msg);

var intCounter = 104;

// Every 5 seconds: produce one message to a new topic, then consume it back.
setInterval(() => {
	setMessageInTopic('topic' + intCounter, function (response) {
		echo("Producer Response" + response);
		if (response) {
			echo("Consumer init");
			getMessageFromKafak(response, function (pStrMessage) {
				echo("------------------------Consumer message --------------------");
				echo(pStrMessage);
			});
		} else {
			echo("Error");
		}
	});
	intCounter++;
}, 5000);

// Sends one message to pStrTopicName.
// Calls back with the topic name on success, or false on failure.
function setMessageInTopic(pStrTopicName, callback) {
	try {
		const Producer = kafka.Producer;
		const client = new kafka.KafkaClient({ kafkaHost: "localhost:9094" });
		const producer = new Producer(client);
		const kafka_topic = pStrTopicName;
		console.log(kafka_topic);
		const payloads = [
			{
				topic: kafka_topic,
				// Serialize the object to a string before sending it as the message.
				messages: JSON.stringify({ name: 'vipin' })
			}
		];

		// Only send once the producer reports that it is ready.
		producer.on('ready', function () {
			producer.send(payloads, (err, data) => {
				if (err) {
					console.log('[kafka-producer -> ' + kafka_topic + ']: broker update failed');
					callback(false);
				} else {
					console.log('[kafka-producer -> ' + kafka_topic + ']: broker update success');
					callback(pStrTopicName);
				}
			});
		});

		producer.on('error', function (err) {
			console.log(err);
			console.log('[kafka-producer -> ' + kafka_topic + ']: connection errored');
			callback(false);
		});
	} catch (e) {
		console.log(e);
		callback(false);
	}
}

// Reads the first message (or error) from pStrTopicName, closes the
// consumer and client, and passes the result to callback.
function getMessageFromKafak(pStrTopicName, callback) {
	try {
		const Consumer = kafka.Consumer;
		const client = new kafka.KafkaClient({ kafkaHost: "localhost:9094" });

		const consumer = new Consumer(
			client,
			[{ topic: pStrTopicName }],
			{
				autoCommit: true,
				fetchMaxWaitMs: 1000,
				fetchMaxBytes: 1024 * 1024,
				encoding: 'utf8',
				fromOffset: false
			}
		);

		echo("------------------RESPONSE MESSAGE PROCESS " + pStrTopicName + " ----------------------");
		consumer.on('message', function (message) {
			echo("------------------RESPONSE MESSAGE  ----------------------");
			console.log(message);
			consumer.close();
			client.close();
			return callback(message);
		}).on('error', function (err) {
			echo("------------------RESPONSE MESSAGE ERROR ----------------------");
			console.log(err);
			consumer.close();
			client.close();
			return callback(err);
		});
	} catch (e) {
		console.log(e);
	}
}

Hope that helps.

Thanks & Regards
Jaiswar Vipin Kumar R.
