diff --git a/README.md b/README.md index e91962d3..36c7510d 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ Kafka-node is a Node.js client with Zookeeper integration for Apache Kafka 0.8.1 - [For a new consumer how do I start consuming from the latest message in a partition?](#for-a-new-consumer-how-do-i-start-consuming-from-the-latest-message-in-a-partition) - [FailedToRebalanceConsumerError: Exception: NODE_EXISTS[-110]](#failedtorebalanceconsumererror-exception-node_exists-110) - [HighLevelConsumer does not consume on all partitions](#highlevelconsumer-does-not-consume-on-all-partitions) + - [How to throttle messages / control the concurrency of processing messages](#how-to-throttle-messages--control-the-concurrency-of-processing-messages) - [Running Tests](#running-tests) - [LICENSE - "MIT"](#license---mit) @@ -361,10 +362,10 @@ consumer.setOffset('topic', 0, 0); ``` ### pause() -Pause the consumer +Pause the consumer. ***Calling `pause` does not automatically stop messages from being emitted.*** This is because pause just stops the kafka consumer fetch loop. Each iteration of the fetch loop can obtain a batch of messages (limited by `fetchMaxBytes`). ### resume() -Resume the consumer +Resume the consumer. Resumes the fetch loop. ### pauseTopics(topics) Pause specify topics @@ -522,10 +523,10 @@ consumer.setOffset('topic', 0, 0); ``` ### pause() -Pause the consumer +Pause the consumer. ***Calling `pause` does not automatically stop messages from being emitted.*** This is because pause just stops the kafka consumer fetch loop. Each iteration of the fetch loop can obtain a batch of messages (limited by `fetchMaxBytes`). ### resume() -Resume the consumer +Resume the consumer. Resumes the fetch loop. ### close(force, cb) * `force`: **Boolean**, if set to true, it forces the consumer to commit the current offset before closing, default `false` @@ -697,6 +698,12 @@ Your partition will be stuck if the `fetchMaxBytes` is smaller than the message Reference to issue [#339](https://github.com/SOHU-Co/kafka-node/issues/339) +## How to throttle messages / control the concurrency of processing messages + +1. Create a `async.queue` with message processor and concurrency of one (the message processor itself is wrapped with `setImmediate` so it will not freeze up the event loop) +2. Set the `queue.drain` to resume the consumer +3. The handler for consumer's `message` event pauses the consumer and pushes the message to the queue. + # Running Tests ### Install Docker