Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch): add flag in SqsFifoProcessor to enable continuous message processing #3954

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions aws_lambda_powertools/utilities/batch/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,19 @@ def __init__(self, msg="", child_exceptions: List[ExceptionInfo] | None = None):
def __str__(self):
parent_exception_str = super(BatchProcessingError, self).__str__()
return self.format_exceptions(parent_exception_str)


class SQSFifoCircuitBreakerError(Exception):
    """Raised for a record left unprocessed because SQS FIFO batch
    processing was short-circuited by an earlier failure in the batch.
    """


class SQSFifoMessageGroupCircuitBreakerError(Exception):
    """Raised for a record left unprocessed because an earlier record in
    the same SQS FIFO message group failed, interrupting that group.
    """
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
from typing import List, Optional, Tuple

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
import logging
from typing import Optional, Set

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, ExceptionInfo, FailureResponse
from aws_lambda_powertools.utilities.batch.exceptions import (
SQSFifoCircuitBreakerError,
SQSFifoMessageGroupCircuitBreakerError,
)
from aws_lambda_powertools.utilities.batch.types import BatchSqsTypeModel


class SQSFifoCircuitBreakerError(Exception):
"""
Signals a record not processed due to the SQS FIFO processing being interrupted
"""

pass
logger = logging.getLogger(__name__)


class SqsFifoPartialProcessor(BatchProcessor):
Expand Down Expand Up @@ -57,36 +56,59 @@ def lambda_handler(event, context: LambdaContext):
None,
)

def __init__(self, model: Optional["BatchSqsTypeModel"] = None):
super().__init__(EventType.SQS, model)
group_circuit_breaker_exc = (
SQSFifoMessageGroupCircuitBreakerError,
SQSFifoMessageGroupCircuitBreakerError("A previous record from this message group failed processing"),
None,
)

def process(self) -> List[Tuple]:
def __init__(self, model: Optional["BatchSqsTypeModel"] = None, skip_group_on_error: bool = False):
"""
Call instance's handler for each record. When the first failed message is detected,
the process is short-circuited, and the remaining messages are reported as failed items.
Initialize the SqsFifoProcessor.

Parameters
----------
model: Optional["BatchSqsTypeModel"]
An optional model for batch processing.
skip_group_on_error: bool
Determines whether to exclusively skip messages from the MessageGroupID that encountered processing failures.
Default is False.

"""
result: List[Tuple] = []
self._skip_group_on_error: bool = skip_group_on_error
self._current_group_id = None
self._failed_group_ids: Set[str] = set()
super().__init__(EventType.SQS, model)

for i, record in enumerate(self.records):
# If we have failed messages, it means that the last message failed.
# We then short circuit the process, failing the remaining messages
if self.fail_messages:
return self._short_circuit_processing(i, result)
def _process_record(self, record):
self._current_group_id = record.get("attributes", {}).get("MessageGroupId")

# Otherwise, process the message normally
result.append(self._process_record(record))
# Short-circuits the process if:
# - There are failed messages, OR
# - The `skip_group_on_error` option is on, and the current message is part of a failed group.
fail_entire_batch = bool(self.fail_messages) and not self._skip_group_on_error
fail_group_id = self._skip_group_on_error and self._current_group_id in self._failed_group_ids
if fail_entire_batch or fail_group_id:
return self.failure_handler(
record=self._to_batch_type(record, event_type=self.event_type, model=self.model),
exception=self.group_circuit_breaker_exc if self._skip_group_on_error else self.circuit_breaker_exc,
)

return result
return super()._process_record(record)

def _short_circuit_processing(self, first_failure_index: int, result: List[Tuple]) -> List[Tuple]:
"""
Starting from the first failure index, fail all the remaining messages, and append them to the result list.
"""
remaining_records = self.records[first_failure_index:]
for remaining_record in remaining_records:
data = self._to_batch_type(record=remaining_record, event_type=self.event_type, model=self.model)
result.append(self.failure_handler(record=data, exception=self.circuit_breaker_exc))
return result
def failure_handler(self, record, exception: ExceptionInfo) -> FailureResponse:
    """Register a failed record and, when group skipping is enabled,
    remember its MessageGroupId.

    Remembering the group ID lets subsequent records from the same group
    be failed automatically, preserving FIFO ordering within that group.
    """
    group_id = self._current_group_id
    if self._skip_group_on_error and group_id:
        self._failed_group_ids.add(group_id)
    return super().failure_handler(record, exception)

def _clean(self):
    """Reset per-batch FIFO bookkeeping, then run the parent cleanup."""
    # Drop state accumulated during the previous batch so a reused
    # processor instance starts every invocation fresh.
    self._current_group_id = None
    self._failed_group_ids.clear()
    super()._clean()

async def _async_process_record(self, record: dict):
    # Async record processing is intentionally unsupported here —
    # presumably because concurrent handling would break the sequential
    # ordering guarantee of SQS FIFO batches (TODO confirm rationale).
    raise NotImplementedError()
40 changes: 37 additions & 3 deletions docs/utilities/batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,11 @@ Processing batches from SQS works in three stages:

#### FIFO queues

When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank" rel="nofollow"}, we will stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`.
This helps preserve the ordering of messages in your queue.
When working with [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, a batch may include messages from different group IDs.

By default, we will stop processing at the first failure and mark unprocessed messages as failed to preserve ordering. However, this behavior may not be optimal for customers who wish to proceed with processing messages from a different group ID.

Enable the `skip_group_on_error` option to keep processing messages from other group IDs when one group fails. With this setup, messages from a failed group ID are returned to SQS, while processing continues uninterrupted for messages in the remaining group IDs.

=== "Recommended"

Expand All @@ -164,6 +167,12 @@ This helps preserve the ordering of messages in your queue.
--8<-- "examples/batch_processing/src/getting_started_sqs_fifo_decorator.py"
```

=== "Enabling skip_group_on_error flag"

```python hl_lines="2-6 9 23"
--8<-- "examples/batch_processing/src/getting_started_sqs_fifo_skip_on_error.py"
```

### Processing messages from Kinesis

Processing batches from Kinesis works in three stages:
Expand Down Expand Up @@ -311,7 +320,7 @@ sequenceDiagram

> Read more about [Batch Failure Reporting feature in AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"}.

Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues.
Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues when the `skip_group_on_error` flag is disabled.

<center>
```mermaid
Expand All @@ -335,6 +344,31 @@ sequenceDiagram
<i>SQS FIFO mechanism with Batch Item Failures</i>
</center>

Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues when the `skip_group_on_error` flag is enabled.

<center>
```mermaid
sequenceDiagram
autonumber
participant SQS queue
participant Lambda service
participant Lambda function
Lambda service->>SQS queue: Poll
Lambda service->>Lambda function: Invoke (batch event)
activate Lambda function
Lambda function-->Lambda function: Process 2 out of 10 batch items
Lambda function--xLambda function: Fail on 3rd batch item
Lambda function-->Lambda function: Process messages from another MessageGroupID
    Lambda function->>Lambda service: Report 3rd batch item and all messages within the same MessageGroupID as failures
deactivate Lambda function
activate SQS queue
Lambda service->>SQS queue: Delete successful messages processed
SQS queue-->>SQS queue: Failed messages return
deactivate SQS queue
```
<i>SQS FIFO mechanism with Batch Item Failures and `skip_group_on_error` enabled</i>
</center>

#### Kinesis and DynamoDB Streams

> Read more about [Batch Failure Reporting feature](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting){target="_blank"}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import (
SqsFifoPartialProcessor,
process_partial_response,
)
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

# skip_group_on_error=True: a failure only short-circuits the failing
# message's MessageGroupId; records from other groups keep processing.
processor = SqsFifoPartialProcessor(skip_group_on_error=True)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: SQSRecord):
    """Process one SQS record by logging its parsed JSON payload."""
    payload: str = record.json_body  # if json string data, otherwise record.body for str
    logger.info(payload)


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    """Lambda entry point: delegate batch handling to the FIFO processor."""
    return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context)
Loading