HighLevelConsumer.pause() does not work #337

Closed
sergeyjsg opened this issue Mar 8, 2016 · 6 comments

Comments

@sergeyjsg

"use strict";

const kafka = require('kafka-node');
const conf = require('./providers/Config');

const client = new kafka.Client(conf.kafka.connectionString, conf.kafka.clientName);
// Start paused; consumption begins at the resume() call below.
const consumer = new kafka.HighLevelConsumer(client, [ { topic: conf.kafka.readTopic } ], { groupId: conf.kafka.clientName, paused: true });

// Number of messages received so far.
let threads = 0;

consumer.on('message', function(message) {
    threads++;

    // Expected: delivery stops shortly after the 10th message.
    if (threads > 10) consumer.pause();

    // Safety net: exit if messages keep arriving after pause().
    if (threads > 50) process.exit(1);

    console.log(threads + " >>> " + message.value);
});

consumer.resume();

I see 50 messages in the console, and then the process exits via the termination statement.

@shaharmor
Contributor

Each fetch loads more than 50 messages in one batch, so pause() will not stop messages that have already been fetched; it only prevents future fetches.
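To illustrate the point, here is a minimal sketch using a hypothetical `FakeConsumer` built on Node's `EventEmitter`. It is not kafka-node's implementation; it only models the behaviour described above, where a fetch delivers a whole batch and `pause()` affects later fetches only. The batch size of 60 is an arbitrary assumption.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for a consumer (not kafka-node's implementation):
// each fetch() delivers one whole batch, and pause() only blocks later fetches.
class FakeConsumer extends EventEmitter {
  constructor(batches) {
    super();
    this.batches = batches;
    this.paused = false;
  }

  pause() {
    this.paused = true;
  }

  // Deliver the next batch unless paused; returns whether a fetch happened.
  fetch() {
    if (this.paused || this.batches.length === 0) return false;
    for (const value of this.batches.shift()) {
      this.emit('message', { value });
    }
    return true;
  }
}

// Two batches of 60 messages each (arbitrary size for the demonstration).
const makeBatch = (start) => Array.from({ length: 60 }, (_, i) => start + i);
const fakeConsumer = new FakeConsumer([makeBatch(0), makeBatch(60)]);

let delivered = 0;
fakeConsumer.on('message', () => {
  delivered++;
  if (delivered > 10) fakeConsumer.pause(); // too late for the current batch
});

const firstFetch = fakeConsumer.fetch();  // delivers all 60 messages
const secondFetch = fakeConsumer.fetch(); // skipped: the consumer is paused
console.log(delivered, firstFetch, secondFetch); // prints: 60 true false
```

The handler asks for a pause after the 11th message, yet all 60 messages of the first batch arrive; only the second batch is held back.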

@sergeyjsg
Author

I understand the behaviour, but the code above should work. It has to be fixed.

@BadLambdaJamma

Pause and resume are a dicey way to control concurrency. Make sure you're not pausing and resuming during a rebalance!

@sergeyjsg
Author

And what is the best approach to control concurrency? Obviously my writer is slower than my reader. What should I do in the context of Kafka? And how can I control the rebalance?

@BadLambdaJamma

With a high-level consumer, a rebalance occurs whenever any consumer starts. Set the message size to control how many messages arrive per fetch. You can track a rebalance in progress like this:

consumer.on('rebalancing', function () {
    self.rebalancing = true;
});
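Building on that flag, one possible way to avoid pausing or resuming during a rebalance is to defer the request until the rebalance ends. The sketch below is an assumption-laden illustration, not part of kafka-node: `guardPauseResume` is a hypothetical helper, and it assumes the consumer emits a `'rebalanced'` event when the rebalance completes (only `'rebalancing'` is confirmed in this thread). A stub `EventEmitter` stands in for the real consumer.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: defers pause/resume requests made during a rebalance
// until it finishes. The 'rebalanced' event name is an assumption.
function guardPauseResume(consumer) {
  let rebalancing = false;
  let pending = null; // 'pause', 'resume', or null

  consumer.on('rebalancing', () => {
    rebalancing = true;
  });
  consumer.on('rebalanced', () => {
    rebalancing = false;
    if (pending) {
      consumer[pending](); // apply the deferred request
      pending = null;
    }
  });

  const request = (action) => {
    if (rebalancing) pending = action; // remember only the latest request
    else consumer[action]();
  };

  return {
    pause: () => request('pause'),
    resume: () => request('resume'),
  };
}

// Stub consumer that records calls, used only to exercise the helper.
const stub = new EventEmitter();
const calls = [];
stub.pause = () => calls.push('pause');
stub.resume = () => calls.push('resume');

const guarded = guardPauseResume(stub);
stub.emit('rebalancing');
guarded.pause();                      // deferred: rebalance in progress
const callsDuringRebalance = calls.length;
stub.emit('rebalanced');              // the deferred pause is applied now
guarded.resume();                     // applied immediately
console.log(callsDuringRebalance, calls);
```

Keeping only the latest pending request means a pause followed by a resume during the same rebalance ends up as a resume, which seems like the least surprising outcome.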

@BadLambdaJamma

Pause only takes effect after the first fetch request, so if a bunch of messages are already waiting at the broker when the client first connects, you'll exceed your limit. You can throttle the first request by message size.
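As a rough sketch of throttling by size: the thread does not name the option, so this assumes that "message size" refers to the `fetchMaxBytes` consumer option listed in the kafka-node README. The helper functions, the 512-byte average message size, and the group name are hypothetical, and the estimate ignores per-message protocol overhead.

```javascript
// Rough upper bound on messages per fetch for a given byte limit.
// Ignores per-message protocol overhead, so the real count is lower.
function maxMessagesPerFetch(fetchMaxBytes, approxMessageBytes) {
  return Math.floor(fetchMaxBytes / approxMessageBytes);
}

// Byte limit sized for about `limit` messages of roughly the given size.
function fetchBytesForLimit(limit, approxMessageBytes) {
  return limit * approxMessageBytes;
}

const approxMessageBytes = 512; // hypothetical average message size

// Options object to pass as the third HighLevelConsumer argument.
// fetchMaxBytes is assumed to be the option meant by "message size".
const consumerOptions = {
  groupId: 'example-group', // hypothetical group name
  fetchMaxBytes: fetchBytesForLimit(10, approxMessageBytes),
};

console.log(
  consumerOptions.fetchMaxBytes,
  maxMessagesPerFetch(consumerOptions.fetchMaxBytes, approxMessageBytes)
); // prints: 5120 10
```

This only bounds the batch approximately: smaller messages mean more of them fit in one fetch, and the limit should stay at least as large as the biggest message you expect, since a fetch that cannot hold a whole message may not make progress.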

5 participants
@hyperlink @sergeyjsg @BadLambdaJamma @shaharmor and others