Skip to content

Commit

Permalink
PP-13575 Investigate java AWS SDK v2 upgrade (#979)
Browse files Browse the repository at this point in the history
* PP-13575 Investigate java AWS SDK v2 upgrade
  • Loading branch information
hjvoid authored Feb 13, 2025
1 parent edeceec commit b7bdd7a
Show file tree
Hide file tree
Showing 7 changed files with 436 additions and 0 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
<module>validation</module>
<module>logging-dropwizard-3</module>
<module>queue</module>
<module>queue-java-aws-sdk-v2</module>
</modules>

<dependencyManagement>
Expand Down
53 changes: 53 additions & 0 deletions queue-java-aws-sdk-v2/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>pay-java-commons</artifactId>
<groupId>uk.gov.service.payments</groupId>
<version>1.0.0</version>
</parent>

<artifactId>queue-java-aws-sdk-v2</artifactId>
<name>Classes for receiving and sending queue messages</name>
<packaging>jar</packaging>

<dependencies>
<!-- Main dependencies that are imported from one of the BOMs specified
in <dependencyManagement> in the parent POM, so no explicit versions needed -->
<dependency>
<groupId>io.dropwizard</groupId>
<artifactId>dropwizard-core</artifactId>
</dependency>
<dependency>
<groupId>io.dropwizard</groupId>
<artifactId>dropwizard-json-logging</artifactId>
</dependency>

<!-- Main dependencies that need explicit versions -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
<version>2.30.13</version>
</dependency>

<!-- Test dependencies that are imported from one of the BOMs specified
in <dependencyManagement> in the parent POM, so no explicit versions needed -->

<!-- Test dependencies that need explicit versions -->
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
<version>${hamcrest.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<version>3.0</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package uk.gov.service.payments.commons.queue.exception;

public class QueueException extends Exception {

public QueueException(){

}

public QueueException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package uk.gov.service.payments.commons.queue.model;

import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;

import java.util.List;
import java.util.stream.Collectors;

public class QueueMessage {

private String messageId;
private String receiptHandle;
private String messageBody;

private QueueMessage(String messageId, String receiptHandle, String messageBody) {
this.messageId = messageId;
this.receiptHandle = receiptHandle;
this.messageBody = messageBody;
}

private QueueMessage(String messageId, String messageBody) {
this(messageId, null, messageBody);
}

public static List<QueueMessage> of(ReceiveMessageResponse receiveMessageResult) {

return receiveMessageResult.messages()
.stream()
.map(c -> new QueueMessage(c.messageId(), c.receiptHandle(), c.body()))
.collect(Collectors.toList());
}

public static QueueMessage of(SendMessageResponse sendMessageResult, String messageBody) {
return new QueueMessage(sendMessageResult.messageId(), messageBody);
}

public String getMessageId() {
return messageId;
}

public String getReceiptHandle() {
return receiptHandle;
}

public String getMessageBody() {
return messageBody;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package uk.gov.service.payments.commons.queue.sqs;

import com.fasterxml.jackson.databind.ObjectMapper;
import uk.gov.service.payments.commons.queue.exception.QueueException;
import uk.gov.service.payments.commons.queue.model.QueueMessage;

import java.util.List;

public abstract class AbstractQueue {

private static final String MESSAGE_ATTRIBUTES_TO_RECEIVE = "All";
protected ObjectMapper objectMapper;
private String queueUrl;
private int failedMessageRetryDelayInSeconds;
private SqsQueueService sqsQueueService;

public AbstractQueue(SqsQueueService sqsQueueService, ObjectMapper objectMapper,
String queueUrl, int failedMessageRetryDelayInSeconds) {
this.sqsQueueService = sqsQueueService;
this.queueUrl = queueUrl;
this.failedMessageRetryDelayInSeconds = failedMessageRetryDelayInSeconds;
this.objectMapper = objectMapper;
}

public QueueMessage sendMessageToQueue(String message) throws QueueException {
return sqsQueueService.sendMessage(queueUrl, message);
}

public QueueMessage sendMessageToQueueWithDelay(String message, int delayInSeconds) throws QueueException {
return sqsQueueService.sendMessage(queueUrl, message, delayInSeconds);
}

public List<QueueMessage> retrieveMessages() throws QueueException {
return sqsQueueService
.receiveMessages(this.queueUrl, MESSAGE_ATTRIBUTES_TO_RECEIVE);
}

public void markMessageAsProcessed(QueueMessage queueMessage) throws QueueException {
sqsQueueService.deleteMessage(this.queueUrl, queueMessage.getReceiptHandle());
}

public void scheduleMessageForRetry(QueueMessage queueMessage) throws QueueException {
sqsQueueService.deferMessage(this.queueUrl, queueMessage.getReceiptHandle(), failedMessageRetryDelayInSeconds);
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package uk.gov.service.payments.commons.queue.sqs;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityResponse;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageResponse;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
import software.amazon.awssdk.services.sqs.model.SqsException;
import uk.gov.service.payments.commons.queue.exception.QueueException;
import uk.gov.service.payments.commons.queue.model.QueueMessage;

import java.util.List;

public class SqsQueueService {

private final Logger logger = LoggerFactory.getLogger(getClass());

private SqsClient sqsClient;

private final int messageMaximumWaitTimeInSeconds;
private final int messageMaximumBatchSize;

public SqsQueueService(SqsClient sqsClient, int messageMaximumWaitTimeInSeconds, int messageMaximumBatchSize) {
this.sqsClient = sqsClient;
this.messageMaximumWaitTimeInSeconds = messageMaximumWaitTimeInSeconds;
this.messageMaximumBatchSize = messageMaximumBatchSize;
}

public QueueMessage sendMessage(String queueUrl, String messageBody) throws QueueException {
SendMessageRequest sendMessageRequest = SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody(messageBody)
.build();

try {
return sendMessage(sendMessageRequest);
} catch (SqsException e) {
throw new QueueException(e.getMessage());
}
}


public QueueMessage sendMessage(String queueUrl, String messageBody, int delayInSeconds) throws QueueException {
SendMessageRequest sendMessageRequest = SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody(messageBody)
.delaySeconds(delayInSeconds)
.build();
return sendMessage(sendMessageRequest);
}

private QueueMessage sendMessage(SendMessageRequest sendMessageRequest) throws QueueException {
try {
SendMessageResponse sendMessageResult = sqsClient.sendMessage(sendMessageRequest);

logger.info("Message sent to SQS queue - {}", sendMessageResult);
return QueueMessage.of(sendMessageResult, sendMessageRequest.messageBody());
} catch (SqsException | UnsupportedOperationException e) {
logger.error("Failed sending message to SQS queue - {}", e.getMessage());
throw new QueueException(e.getMessage());
}
}

public List<QueueMessage> receiveMessages(String queueUrl, String messageAttributeName) throws QueueException {
try {
ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.messageAttributeNames(messageAttributeName)
.waitTimeSeconds(messageMaximumWaitTimeInSeconds)
.maxNumberOfMessages(messageMaximumBatchSize)
.build();

ReceiveMessageResponse receiveMessageResult = sqsClient.receiveMessage(receiveMessageRequest);

return QueueMessage.of(receiveMessageResult);
} catch (SqsException | UnsupportedOperationException e) {
logger.error("Failed to receive messages from SQS queue - {}", e.getMessage());
throw new QueueException(e.getMessage());
}
}

public DeleteMessageResponse deleteMessage(String queueUrl, String messageReceiptHandle) throws QueueException {
try {
DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder()
.queueUrl(queueUrl)
.receiptHandle(messageReceiptHandle)
.build();
return sqsClient.deleteMessage(deleteMessageRequest);
} catch (SqsException | UnsupportedOperationException e) {
logger.error("Failed to delete message from SQS queue - {}", e.getMessage());
throw new QueueException(e.getMessage());
} catch (AwsServiceException e) {
logger.error("Failed to delete message from SQS queue - [errorMessage={}] [awsErrorCode={}]", e.getMessage(), e.awsErrorDetails().errorCode());
String errorMessage = String.format("%s [%s]", e.getMessage(), e.awsErrorDetails().errorCode());
throw new QueueException(errorMessage);
}
}

public ChangeMessageVisibilityResponse deferMessage(String queueUrl, String messageReceiptHandle, int timeoutInSeconds) throws QueueException {
try {
ChangeMessageVisibilityRequest changeVisibilityRequest = ChangeMessageVisibilityRequest.builder()
.queueUrl(queueUrl)
.receiptHandle(messageReceiptHandle)
.visibilityTimeout(timeoutInSeconds)
.build();

return sqsClient.changeMessageVisibility(changeVisibilityRequest);
} catch (SqsException | UnsupportedOperationException e) {
logger.error("Failed to defer message from SQS queue - {}", e.getMessage());
throw new QueueException(e.getMessage());
}
}
}
Loading

0 comments on commit b7bdd7a

Please sign in to comment.