How Malka Consumes Messages
Let's take a look at how Malka works internally: which design choices were made, what trade-offs they bring, and how you can change its out-of-the-box behaviour. This section documents the JSON configuration file and how it defines the Malka consumer's internal behaviour.
The JSON file is a list of objects, where each object represents a subscription between a Kafka topic and one or more AWS Lambda functions. Each subscription contains the list of Lambda functions that must be triggered once new events arrive.
Subscriptions behave like Kafka Consumer Groups. Malka derives a consumer group from each target function, so the resulting number of consumer groups (C) is the sum of the target functions (F) available across all subscriptions (S). For instance, if you have two subscriptions with two target functions each, you will end up with 4 active consumer groups, as the sketch below illustrates.
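As a minimal sketch (the topic and function names are hypothetical, not values shipped with Malka), such a configuration file could look like this:

```json
[
  {
    "topic_name": "orders",
    "target_functions": ["process-order", "audit-order"]
  },
  {
    "topic_name": "payments",
    "target_functions": ["charge-payment", "audit-payment"]
  }
]
```

With this file, Malka would compute 2 + 2 = 4 consumer groups, one per target function, each consuming its topic independently.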
Each subscription can have one or more parallel consumers. This is useful for topics with more than one partition, as it significantly increases the ingestion pace of the topic; it also determines how many Lambda functions will be invoked at a given time. Each consumer instance created for a subscription receives a new Consumer Group ID, implementing the Static Membership protocol.
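For example, to consume a topic with four partitions using four parallel consumers (again, the topic and function names are hypothetical), one could set `topic_number_of_consumers` as follows:

```json
{
  "topic_name": "orders",
  "target_functions": ["process-order"],
  "topic_number_of_consumers": 4
}
```

Since Kafka assigns at most one consumer of a group to each partition, configuring more consumers than the topic has partitions leaves the extra consumers idle.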
To improve the ingestion throughput even further, Malka mimics what AWS Kinesis does: it buffers messages and sends them in batches to the target AWS Lambda functions. Bear in mind that the amount of time it waits for messages, as well as the maximum buffer size, may impact how failures will be handled by AWS Lambda.
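The buffering behaviour is controlled by two optional entries, shown below with illustrative (non-default) values:

```json
{
  "topic_name": "orders",
  "target_functions": ["process-order"],
  "topic_max_buffer_size": 250,
  "topic_max_buffer_await_time": 500
}
```

With these values, a consumer would presumably dispatch a batch once 250 messages have been buffered, or after waiting 500 milliseconds, whichever comes first.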
One of the main opinionated decisions behind Malka is how it handles errors. To understand how errors are handled, we first need to understand what the Kafka log offset is. By design, Kafka messages are ingested from the broker in chunks. When polled, alongside the messages comes the current consumption log offset position, allowing the consumer groups of a given topic to continue from where they last left off in case the stream application was shut down or crashed due to an unexpected failure.
When a commit message is sent to the broker, the consumer also sends the offset position that identifies the commit, signalling that the last chunk of messages was correctly consumed. This moves the offset cursor forward, allowing the consumer to poll the next unprocessed chunk of messages. This behaviour implies that you can't acknowledge individual messages, only the last batch of them, and this is what makes error handling so hard. Once messages are delivered to AWS Lambda, we not only can't determine which of them caused a failure, we also can't roll back an individual failed message. Either all of them failed, or all of them succeeded.
With that in mind, Malka only handles (rolls back) failures related to network communication automatically. As a general rule, messages are considered delivered (and therefore have their current log offset committed) whenever AWS Lambda successfully receives the event, whether or not the Lambda function managed to handle it. Luckily, backoff and retries are features AWS Lambda provides out of the box when a message is delivered but fails to be processed. In case of network failures (e.g. a poorly configured Security Group, an AWS Lambda outage, etc.), Malka will keep retrying until everything is back to normal.
Each entry of the JSON configuration file is described below.
Variable | Description | Type | Required |
---|---|---|---|
`topic_name` | The Kafka topic name to be consumed. | String | true |
`target_functions` | A list of strings containing the functions that will receive the consumed events. | `Vec<String>` | true |
`topic_number_of_consumers` | The number of parallel threads within a given consumer group. Defaults to `1`. | Unsigned Integer | false |
`topic_max_buffer_size` | The maximum number of messages to be sent in each batch to the Lambda functions. Defaults to `100`. | Unsigned Integer | false |
`topic_max_buffer_await_time` | The maximum time (in milliseconds) to wait for the buffer to be filled before it is sent to the Lambda functions. Defaults to `1000`. | Unsigned Long | false |
`consumer_configuration` | A map/dictionary containing extra Kafka consumer entries. Both keys and values are expected to be in String format. For further details, please check the options available in the librdkafka documentation. Defaults to `{}` (empty map). | `Map<String, String>` | false |
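Putting it all together, a complete subscription entry might look like the sketch below. The topic name, function names, and librdkafka options are illustrative assumptions, not values shipped with Malka:

```json
[
  {
    "topic_name": "orders",
    "target_functions": ["process-order", "audit-order"],
    "topic_number_of_consumers": 2,
    "topic_max_buffer_size": 100,
    "topic_max_buffer_await_time": 1000,
    "consumer_configuration": {
      "auto.offset.reset": "earliest",
      "session.timeout.ms": "10000"
    }
  }
]
```

Note that the `consumer_configuration` values are passed as strings, even for numeric options, as described in the table above.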