Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQS ack and failure handlers #2903

Merged
merged 1 commit into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions documentation/src/main/docs/sqs/receiving-aws-sqs-messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,38 @@ SQS message attributes can be accessed from the metadata either by name or by th
{{ insert('sqs/inbound/SqsMetadataExample.java') }}
```

## Acknowledgement
## Acknowledgement Strategies

The default strategy for acknowledging AWS SQS Message is to *delete* the message from the queue.
With `ack.delete` set to `false`, the message is not deleted from the queue.
You can set the `ack-strategy` attribute to `ignore` if you want to ignore the message.

[NOTE] Deprecated
`ack.delete` attribute is deprecated and will be removed in a future release.

You can implement a custom strategy by implementing the {{ javadoc('io.smallrye.reactive.messaging.aws.sqs.SqsAckHandler', False, 'io.smallrye.reactive/smallrye-reactive-messaging-aws-sqs') }},
interface with a `Factory` class and registering it as a CDI bean with an `@Identifier`.

``` java
{{ insert('sqs/inbound/SqsCustomAckStrategy.java') }}
```

## Failure Strategies

The default strategy for handling message processing failures is `ignore`.
It lets the visibility timeout of the message consumer to expire and reconsume the message.

Other possible strategies are:

- `fail`: the failure is logged and the channel fail-stops.
- `delete`: the message is removed from the queue.
- `visibility`: the message visibility timeout is reset to 0.

You can implement a custom strategy by implementing the {{ javadoc('io.smallrye.reactive.messaging.aws.sqs.SqsFailureHandler', False, 'io.smallrye.reactive/smallrye-reactive-messaging-aws-sqs') }},
interface with a `Factory` class and registering it as a CDI bean with an `@Identifier`.

``` java
{{ insert('sqs/inbound/SqsCustomNackStrategy.java') }}
```

## Configuration Reference

Expand Down
36 changes: 36 additions & 0 deletions documentation/src/main/java/sqs/inbound/SqsCustomAckStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package sqs.inbound;

import java.util.function.BiConsumer;

import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.aws.sqs.SqsAckHandler;
import io.smallrye.reactive.messaging.aws.sqs.SqsConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.aws.sqs.SqsMessage;
import io.vertx.mutiny.core.Vertx;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

public class SqsCustomAckStrategy implements SqsAckHandler {

@ApplicationScoped
@Identifier("custom")
public static class Factory implements SqsAckHandler.Factory {

@Override
public SqsAckHandler create(SqsConnectorIncomingConfiguration conf,
Vertx vertx,
SqsAsyncClient client,
Uni<String> queueUrlUni,
BiConsumer<Throwable, Boolean> reportFailure) {
return new SqsCustomAckStrategy();
}
}

@Override
public Uni<Void> handle(SqsMessage<?> message) {
return Uni.createFrom().voidItem()
.emitOn(message::runOnMessageContext);
}
}
33 changes: 33 additions & 0 deletions documentation/src/main/java/sqs/inbound/SqsCustomNackStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package sqs.inbound;

import java.util.function.BiConsumer;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.aws.sqs.SqsFailureHandler;
import io.smallrye.reactive.messaging.aws.sqs.SqsMessage;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

public class SqsCustomNackStrategy implements SqsFailureHandler {

@ApplicationScoped
@Identifier("custom")
public static class Factory implements SqsFailureHandler.Factory {

@Override
public SqsFailureHandler create(String channel, SqsAsyncClient client, Uni<String> queueUrlUni,
BiConsumer<Throwable, Boolean> reportFailure) {
return new SqsCustomNackStrategy();
}
}

@Override
public Uni<Void> handle(SqsMessage<?> message, Metadata metadata, Throwable throwable) {
return Uni.createFrom().voidItem()
.emitOn(message::runOnMessageContext);
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,32 @@
package io.smallrye.reactive.messaging.aws.sqs;

import java.util.function.BiConsumer;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Vertx;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

public interface SqsAckHandler {

Uni<Void> handle(SqsMessage message);
interface Strategy {
String DELETE = "delete";
String IGNORE = "ignore";
}

interface Factory {
SqsAckHandler create(SqsConnectorIncomingConfiguration conf, Vertx vertx, SqsAsyncClient client,
Uni<String> queueUrlUni,
BiConsumer<Throwable, Boolean> reportFailure);
}

default Uni<SqsMessage<?>> received(SqsMessage<?> message) {
return Uni.createFrom().item(message);
}

Uni<Void> handle(SqsMessage<?> message);

default void close(boolean graceful) {

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package io.smallrye.reactive.messaging.aws.sqs;

import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING;
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING_AND_OUTGOING;
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.OUTGOING;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow.Publisher;
Expand Down Expand Up @@ -32,26 +36,28 @@

@ApplicationScoped
@Connector(SqsConnector.CONNECTOR_NAME)
@ConnectorAttribute(name = "queue", type = "string", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The name of the SQS queue, defaults to channel name if not provided")
@ConnectorAttribute(name = "queue.url", type = "string", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The url of the SQS queue")
@ConnectorAttribute(name = "region", type = "string", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The name of the SQS region")
@ConnectorAttribute(name = "endpoint-override", type = "string", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The endpoint override")
@ConnectorAttribute(name = "credentials-provider", type = "string", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The credential provider to be used in the client")
@ConnectorAttribute(name = "health-enabled", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether health reporting is enabled (default) or disabled", defaultValue = "true")
@ConnectorAttribute(name = "queue", type = "string", direction = INCOMING_AND_OUTGOING, description = "The name of the SQS queue, defaults to channel name if not provided")
@ConnectorAttribute(name = "queue.url", type = "string", direction = INCOMING_AND_OUTGOING, description = "The url of the SQS queue")
@ConnectorAttribute(name = "region", type = "string", direction = INCOMING_AND_OUTGOING, description = "The name of the SQS region")
@ConnectorAttribute(name = "endpoint-override", type = "string", direction = INCOMING_AND_OUTGOING, description = "The endpoint override")
@ConnectorAttribute(name = "credentials-provider", type = "string", direction = INCOMING_AND_OUTGOING, description = "The credential provider to be used in the client")
@ConnectorAttribute(name = "health-enabled", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether health reporting is enabled (default) or disabled", defaultValue = "true")

@ConnectorAttribute(name = "group.id", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "When set, sends messages with the specified group id")
@ConnectorAttribute(name = "batch", type = "boolean", direction = ConnectorAttribute.Direction.OUTGOING, description = "When set, sends messages in batches of maximum 10 messages", defaultValue = "false")
@ConnectorAttribute(name = "batch-size", type = "int", direction = ConnectorAttribute.Direction.OUTGOING, description = "In batch send mode, the maximum number of messages to include in batch, currently SQS maximum is 10 messages", defaultValue = "10")
@ConnectorAttribute(name = "batch-delay", type = "int", direction = ConnectorAttribute.Direction.OUTGOING, description = "In batch send mode, the maximum delay in milliseconds to wait for messages to be included in the batch", defaultValue = "3000")

@ConnectorAttribute(name = "wait-time-seconds", type = "int", direction = ConnectorAttribute.Direction.INCOMING, description = "The maximum amount of time in seconds to wait for messages to be received", defaultValue = "1")
@ConnectorAttribute(name = "max-number-of-messages", type = "int", direction = ConnectorAttribute.Direction.INCOMING, description = "The maximum number of messages to receive", defaultValue = "10")
@ConnectorAttribute(name = "visibility-timeout", type = "int", direction = ConnectorAttribute.Direction.INCOMING, description = "The duration in seconds that the received messages are hidden from subsequent retrieve requests after being retrieved by a receive request")
@ConnectorAttribute(name = "receive.request.message-attribute-names", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "The message attribute names to retrieve when receiving messages.")
@ConnectorAttribute(name = "receive.request.customizer", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "The identifier for the bean implementing a customizer to receive requests, defaults to channel name if not provided")
@ConnectorAttribute(name = "receive.request.retries", type = "long", direction = ConnectorAttribute.Direction.INCOMING, description = "If set to a positive number, the connector will try to retry the request that was not delivered successfully (with a potentially transient error) until the number of retries is reached. If set to 0, retries are disabled.", defaultValue = "2147483647")
@ConnectorAttribute(name = "receive.request.pause.resume", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether the polling must be paused when the application does not request items and resume when it does. This allows implementing back-pressure based on the application capacity. Note that polling is not stopped, but will not retrieve any records when paused.", defaultValue = "true")
@ConnectorAttribute(name = "ack.delete", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether the acknowledgement deletes the message from the queue", defaultValue = "true")
@ConnectorAttribute(name = "wait-time-seconds", type = "int", direction = INCOMING, description = "The maximum amount of time in seconds to wait for messages to be received", defaultValue = "1")
@ConnectorAttribute(name = "max-number-of-messages", type = "int", direction = INCOMING, description = "The maximum number of messages to receive", defaultValue = "10")
@ConnectorAttribute(name = "visibility-timeout", type = "int", direction = INCOMING, description = "The duration in seconds that the received messages are hidden from subsequent retrieve requests after being retrieved by a receive request")
@ConnectorAttribute(name = "receive.request.message-attribute-names", type = "string", direction = INCOMING, description = "The message attribute names to retrieve when receiving messages.")
@ConnectorAttribute(name = "receive.request.customizer", type = "string", direction = INCOMING, description = "The identifier for the bean implementing a customizer to receive requests, defaults to channel name if not provided")
@ConnectorAttribute(name = "receive.request.retries", type = "long", direction = INCOMING, description = "If set to a positive number, the connector will try to retry the request that was not delivered successfully (with a potentially transient error) until the number of retries is reached. If set to 0, retries are disabled.", defaultValue = "2147483647")
@ConnectorAttribute(name = "receive.request.pause.resume", type = "boolean", direction = INCOMING, description = "Whether the polling must be paused when the application does not request items and resume when it does. This allows implementing back-pressure based on the application capacity. Note that polling is not stopped, but will not retrieve any records when paused.", defaultValue = "true")
@ConnectorAttribute(name = "ack.delete", type = "boolean", direction = INCOMING, description = "Whether the acknowledgement deletes the message from the queue. Deprecated, use ack-strategy instead", deprecated = true)
@ConnectorAttribute(name = "ack-strategy", type = "string", direction = INCOMING, description = "The identifier for the bean implementing ack strategy factory. Strategies: 'delete', 'ignore'", defaultValue = "delete")
@ConnectorAttribute(name = "failure-strategy", type = "string", direction = INCOMING, description = "The identifier for the bean implementing failure strategy factory. Strategies: 'ignore', 'fail', 'visibility', 'delete'", defaultValue = "ignore")
public class SqsConnector implements InboundConnector, OutboundConnector, HealthReporter {

@Inject
Expand All @@ -67,6 +73,14 @@ public class SqsConnector implements InboundConnector, OutboundConnector, Health
@Inject
Instance<JsonMapping> jsonMappers;

@Inject
@Any
Instance<SqsAckHandler.Factory> ackHandlerFactories;

@Inject
@Any
Instance<SqsFailureHandler.Factory> failureHandlerFactories;

Vertx vertx;

private static final List<SqsInboundChannel> INBOUND_CHANNELS = new CopyOnWriteArrayList<>();
Expand All @@ -93,7 +107,8 @@ public Publisher<? extends Message<?>> getPublisher(Config config) {
var conf = new SqsConnectorIncomingConfiguration(config);
var customizer = CDIUtils.getInstanceById(customizers, conf.getReceiveRequestCustomizer().orElse(conf.getChannel()),
() -> null);
var channel = new SqsInboundChannel(conf, vertx, sqsManager, customizer, jsonMapping);
var channel = new SqsInboundChannel(conf, vertx, sqsManager, customizer, jsonMapping, ackHandlerFactories,
failureHandlerFactories);
INBOUND_CHANNELS.add(channel);
return channel.getStream();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.smallrye.reactive.messaging.aws.sqs;

import java.util.function.BiConsumer;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.mutiny.Uni;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

public interface SqsFailureHandler {

interface Strategy {
String DELETE = "delete";
String VISIBILITY = "visibility";
String FAIL = "fail";
String IGNORE = "ignore";
}

interface Factory {
SqsFailureHandler create(String channel, SqsAsyncClient client, Uni<String> queueUrlUni,
BiConsumer<Throwable, Boolean> reportFailure);
}

Uni<Void> handle(SqsMessage<?> message, Metadata metadata, Throwable throwable);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import jakarta.enterprise.inject.Instance;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.aws.sqs.ack.SqsDeleteAckHandler;
import io.smallrye.reactive.messaging.aws.sqs.ack.SqsNothingAckHandler;
import io.smallrye.reactive.messaging.aws.sqs.ack.SqsIgnoreAckHandler;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.smallrye.reactive.messaging.providers.helpers.PausablePollingStream;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.core.Context;
Expand Down Expand Up @@ -47,7 +49,9 @@ public class SqsInboundChannel {
private final Integer visibilityTimeout;

public SqsInboundChannel(SqsConnectorIncomingConfiguration conf, Vertx vertx, SqsManager sqsManager,
SqsReceiveMessageRequestCustomizer customizer, JsonMapping jsonMapper) {
SqsReceiveMessageRequestCustomizer customizer, JsonMapping jsonMapper,
Instance<SqsAckHandler.Factory> ackHandlerFactories,
Instance<SqsFailureHandler.Factory> failureHandlerFactories) {
this.channel = conf.getChannel();
this.healthEnabled = conf.getHealthEnabled();
this.retries = conf.getReceiveRequestRetries();
Expand All @@ -62,8 +66,8 @@ public SqsInboundChannel(SqsConnectorIncomingConfiguration conf, Vertx vertx, Sq
this.messageAttributeNames = getMessageAttributeNames(conf);
this.customizer = customizer;

SqsAckHandler ackHandler = conf.getAckDelete() ? new SqsDeleteAckHandler(client, queueUrlUni)
: new SqsNothingAckHandler();
SqsAckHandler ackHandler = createAckHandler(ackHandlerFactories, conf, vertx, client, queueUrlUni);
SqsFailureHandler failureHandler = createFailureHandler(failureHandlerFactories, conf, client, queueUrlUni);
PausablePollingStream<List<software.amazon.awssdk.services.sqs.model.Message>, software.amazon.awssdk.services.sqs.model.Message> pollingStream = new PausablePollingStream<>(
channel, request(null, 0), (messages, processor) -> {
if (messages != null) {
Expand All @@ -74,14 +78,32 @@ channel, request(null, 0), (messages, processor) -> {
}, requestExecutor, maxNumberOfMessages * 2, conf.getReceiveRequestPauseResume());
this.stream = Multi.createFrom()
.deferred(() -> queueUrlUni.onItem().transformToMulti(queueUrl -> pollingStream.getStream()))
.emitOn(r -> context.runOnContext(r))
.onItem().transform(message -> new SqsMessage<>(message, jsonMapper, ackHandler))
.emitOn(context::runOnContext)
.onItem().transform(message -> new SqsMessage<>(message, jsonMapper, ackHandler, failureHandler))
.onFailure().invoke(throwable -> {
log.errorReceivingMessage(channel, throwable);
reportFailure(throwable, false);
});
}

private SqsFailureHandler createFailureHandler(Instance<SqsFailureHandler.Factory> failureHandlerFactories,
SqsConnectorIncomingConfiguration conf,
SqsAsyncClient client, Uni<String> queueUrlUni) {
return CDIUtils.getInstanceById(failureHandlerFactories, conf.getFailureStrategy())
.get()
.create(channel, client, queueUrlUni, this::reportFailure);
}

private SqsAckHandler createAckHandler(Instance<SqsAckHandler.Factory> ackHandlerFactories,
SqsConnectorIncomingConfiguration conf, Vertx vertx, SqsAsyncClient client, Uni<String> queueUrlUni) {
if (!conf.getAckDelete().orElse(true)) {
// nothing to do
return new SqsIgnoreAckHandler();
}
return CDIUtils.getInstanceById(ackHandlerFactories, conf.getAckStrategy()).get()
.create(conf, vertx, client, queueUrlUni, this::reportFailure);
}

private List<String> getMessageAttributeNames(SqsConnectorIncomingConfiguration conf) {
List<String> names = new ArrayList<>();
names.add(SqsConnector.CLASS_NAME_ATTRIBUTE);
Expand Down
Loading
Loading