diff --git a/pom.xml b/pom.xml index 221fc293..049f513d 100644 --- a/pom.xml +++ b/pom.xml @@ -43,6 +43,7 @@ validation logging-dropwizard-3 queue + queue-java-aws-sdk-v2 diff --git a/queue-java-aws-sdk-v2/pom.xml b/queue-java-aws-sdk-v2/pom.xml new file mode 100644 index 00000000..296d0f65 --- /dev/null +++ b/queue-java-aws-sdk-v2/pom.xml @@ -0,0 +1,53 @@ + + + 4.0.0 + + pay-java-commons + uk.gov.service.payments + 1.0.0 + + + queue-java-aws-sdk-v2 + Classes for receiving and sending queue messages + jar + + + + + io.dropwizard + dropwizard-core + + + io.dropwizard + dropwizard-json-logging + + + + + software.amazon.awssdk + sqs + 2.30.13 + + + + + + + org.hamcrest + hamcrest + ${hamcrest.version} + test + + + org.hamcrest + hamcrest-library + 3.0 + test + + + + diff --git a/queue-java-aws-sdk-v2/src/main/java/uk/gov/service/payments/commons/queue/exception/QueueException.java b/queue-java-aws-sdk-v2/src/main/java/uk/gov/service/payments/commons/queue/exception/QueueException.java new file mode 100644 index 00000000..afda6e2b --- /dev/null +++ b/queue-java-aws-sdk-v2/src/main/java/uk/gov/service/payments/commons/queue/exception/QueueException.java @@ -0,0 +1,12 @@ +package uk.gov.service.payments.commons.queue.exception; + +public class QueueException extends Exception { + + public QueueException(){ + + } + + public QueueException(String message) { + super(message); + } +} diff --git a/queue-java-aws-sdk-v2/src/main/java/uk/gov/service/payments/commons/queue/model/QueueMessage.java b/queue-java-aws-sdk-v2/src/main/java/uk/gov/service/payments/commons/queue/model/QueueMessage.java new file mode 100644 index 00000000..cb0e9c17 --- /dev/null +++ b/queue-java-aws-sdk-v2/src/main/java/uk/gov/service/payments/commons/queue/model/QueueMessage.java @@ -0,0 +1,48 @@ +package uk.gov.service.payments.commons.queue.model; + +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; +import software.amazon.awssdk.services.sqs.model.SendMessageResponse; + +import java.util.List; +import java.util.stream.Collectors; + +public class QueueMessage { + + private String messageId; + private String receiptHandle; + private String messageBody; + + private QueueMessage(String messageId, String receiptHandle, String messageBody) { + this.messageId = messageId; + this.receiptHandle = receiptHandle; + this.messageBody = messageBody; + } + + private QueueMessage(String messageId, String messageBody) { + this(messageId, null, messageBody); + } + + public static List of(ReceiveMessageResponse receiveMessageResult) { + + return receiveMessageResult.messages() + .stream() + .map(c -> new QueueMessage(c.messageId(), c.receiptHandle(), c.body())) + .collect(Collectors.toList()); + } + + public static QueueMessage of(SendMessageResponse sendMessageResult, String messageBody) { + return new QueueMessage(sendMessageResult.messageId(), messageBody); + } + + public String getMessageId() { + return messageId; + } + + public String getReceiptHandle() { + return receiptHandle; + } + + public String getMessageBody() { + return messageBody; + } +} diff --git a/queue-java-aws-sdk-v2/src/main/java/uk/gov/service/payments/commons/queue/sqs/AbstractQueue.java b/queue-java-aws-sdk-v2/src/main/java/uk/gov/service/payments/commons/queue/sqs/AbstractQueue.java new file mode 100644 index 00000000..02f3db26 --- /dev/null +++ b/queue-java-aws-sdk-v2/src/main/java/uk/gov/service/payments/commons/queue/sqs/AbstractQueue.java @@ -0,0 +1,46 @@ +package uk.gov.service.payments.commons.queue.sqs; + +import com.fasterxml.jackson.databind.ObjectMapper; +import uk.gov.service.payments.commons.queue.exception.QueueException; +import uk.gov.service.payments.commons.queue.model.QueueMessage; + +import java.util.List; + +public abstract class AbstractQueue { + + private static final String MESSAGE_ATTRIBUTES_TO_RECEIVE = "All"; + protected ObjectMapper objectMapper; + private String queueUrl; + private int failedMessageRetryDelayInSeconds; + private SqsQueueService sqsQueueService; + + public AbstractQueue(SqsQueueService sqsQueueService, ObjectMapper objectMapper, + String queueUrl, int failedMessageRetryDelayInSeconds) { + this.sqsQueueService = sqsQueueService; + this.queueUrl = queueUrl; + this.failedMessageRetryDelayInSeconds = failedMessageRetryDelayInSeconds; + this.objectMapper = objectMapper; + } + + public QueueMessage sendMessageToQueue(String message) throws QueueException { + return sqsQueueService.sendMessage(queueUrl, message); + } + + public QueueMessage sendMessageToQueueWithDelay(String message, int delayInSeconds) throws QueueException { + return sqsQueueService.sendMessage(queueUrl, message, delayInSeconds); + } + + public List retrieveMessages() throws QueueException { + return sqsQueueService + .receiveMessages(this.queueUrl, MESSAGE_ATTRIBUTES_TO_RECEIVE); + } + + public void markMessageAsProcessed(QueueMessage queueMessage) throws QueueException { + sqsQueueService.deleteMessage(this.queueUrl, queueMessage.getReceiptHandle()); + } + + public void scheduleMessageForRetry(QueueMessage queueMessage) throws QueueException { + sqsQueueService.deferMessage(this.queueUrl, queueMessage.getReceiptHandle(), failedMessageRetryDelayInSeconds); + } +} + diff --git a/queue-java-aws-sdk-v2/src/main/java/uk/gov/service/payments/commons/queue/sqs/SqsQueueService.java b/queue-java-aws-sdk-v2/src/main/java/uk/gov/service/payments/commons/queue/sqs/SqsQueueService.java new file mode 100644 index 00000000..47bda8b9 --- /dev/null +++ b/queue-java-aws-sdk-v2/src/main/java/uk/gov/service/payments/commons/queue/sqs/SqsQueueService.java @@ -0,0 +1,120 @@ +package uk.gov.service.payments.commons.queue.sqs; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityResponse; +import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageResponse; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageResponse; +import software.amazon.awssdk.services.sqs.model.SqsException; +import uk.gov.service.payments.commons.queue.exception.QueueException; +import uk.gov.service.payments.commons.queue.model.QueueMessage; + +import java.util.List; + +public class SqsQueueService { + + private final Logger logger = LoggerFactory.getLogger(getClass()); + + private SqsClient sqsClient; + + private final int messageMaximumWaitTimeInSeconds; + private final int messageMaximumBatchSize; + + public SqsQueueService(SqsClient sqsClient, int messageMaximumWaitTimeInSeconds, int messageMaximumBatchSize) { + this.sqsClient = sqsClient; + this.messageMaximumWaitTimeInSeconds = messageMaximumWaitTimeInSeconds; + this.messageMaximumBatchSize = messageMaximumBatchSize; + } + + public QueueMessage sendMessage(String queueUrl, String messageBody) throws QueueException { + SendMessageRequest sendMessageRequest = SendMessageRequest.builder() + .queueUrl(queueUrl) + .messageBody(messageBody) + .build(); + + try { + return sendMessage(sendMessageRequest); + } catch (SqsException e) { + throw new QueueException(e.getMessage()); + } + } + + + public QueueMessage sendMessage(String queueUrl, String messageBody, int delayInSeconds) throws QueueException { + SendMessageRequest sendMessageRequest = SendMessageRequest.builder() + .queueUrl(queueUrl) + .messageBody(messageBody) + .delaySeconds(delayInSeconds) + .build(); + return sendMessage(sendMessageRequest); + } + + private QueueMessage sendMessage(SendMessageRequest sendMessageRequest) throws QueueException { + try { + SendMessageResponse sendMessageResult = sqsClient.sendMessage(sendMessageRequest); + + logger.info("Message sent to SQS queue - {}", sendMessageResult); + return QueueMessage.of(sendMessageResult, sendMessageRequest.messageBody()); + } catch (SqsException | UnsupportedOperationException e) { + logger.error("Failed sending message to SQS queue - {}", e.getMessage()); + throw new QueueException(e.getMessage()); + } + } + + public List receiveMessages(String queueUrl, String messageAttributeName) throws QueueException { + try { + ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder() + .queueUrl(queueUrl) + .messageAttributeNames(messageAttributeName) + .waitTimeSeconds(messageMaximumWaitTimeInSeconds) + .maxNumberOfMessages(messageMaximumBatchSize) + .build(); + + ReceiveMessageResponse receiveMessageResult = sqsClient.receiveMessage(receiveMessageRequest); + + return QueueMessage.of(receiveMessageResult); + } catch (SqsException | UnsupportedOperationException e) { + logger.error("Failed to receive messages from SQS queue - {}", e.getMessage()); + throw new QueueException(e.getMessage()); + } + } + + public DeleteMessageResponse deleteMessage(String queueUrl, String messageReceiptHandle) throws QueueException { + try { + DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder() + .queueUrl(queueUrl) + .receiptHandle(messageReceiptHandle) + .build(); + return sqsClient.deleteMessage(deleteMessageRequest); + } catch (SqsException | UnsupportedOperationException e) { + logger.error("Failed to delete message from SQS queue - {}", e.getMessage()); + throw new QueueException(e.getMessage()); + } catch (AwsServiceException e) { + logger.error("Failed to delete message from SQS queue - [errorMessage={}] [awsErrorCode={}]", e.getMessage(), e.awsErrorDetails().errorCode()); + String errorMessage = String.format("%s [%s]", e.getMessage(), e.awsErrorDetails().errorCode()); + throw new QueueException(errorMessage); + } + } + + public ChangeMessageVisibilityResponse deferMessage(String queueUrl, String messageReceiptHandle, int timeoutInSeconds) throws QueueException { + try { + ChangeMessageVisibilityRequest changeVisibilityRequest = ChangeMessageVisibilityRequest.builder() + .queueUrl(queueUrl) + .receiptHandle(messageReceiptHandle) + .visibilityTimeout(timeoutInSeconds) + .build(); + + return sqsClient.changeMessageVisibility(changeVisibilityRequest); + } catch (SqsException | UnsupportedOperationException e) { + logger.error("Failed to defer message from SQS queue - {}", e.getMessage()); + throw new QueueException(e.getMessage()); + } + } +} diff --git a/queue-java-aws-sdk-v2/src/test/java/uk/gov/service/payments/commons/queue/sqs/SqsQueueServiceTest.java b/queue-java-aws-sdk-v2/src/test/java/uk/gov/service/payments/commons/queue/sqs/SqsQueueServiceTest.java new file mode 100644 index 00000000..384a54a6 --- /dev/null +++ b/queue-java-aws-sdk-v2/src/test/java/uk/gov/service/payments/commons/queue/sqs/SqsQueueServiceTest.java @@ -0,0 +1,156 @@ +package uk.gov.service.payments.commons.queue.sqs; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.classic.spi.LoggingEvent; +import ch.qos.logback.core.Appender; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageResponse; +import software.amazon.awssdk.services.sqs.model.SqsException; +import uk.gov.service.payments.commons.queue.exception.QueueException; +import uk.gov.service.payments.commons.queue.model.QueueMessage; + +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class SqsQueueServiceTest { + + private static final String QUEUE_URL = "http://queue-url"; + private static final String MESSAGE = "{chargeId: 123}"; + private static final String MESSAGE_ATTRIBUTE_NAME = "All"; + + @Mock + private SqsClient mockSqsClient; + @Mock + private Appender mockAppender; + + private ArgumentCaptor loggingEventArgumentCaptor = ArgumentCaptor.forClass(LoggingEvent.class); + + private SqsQueueService sqsQueueService; + + @Before + public void setUp() { + sqsQueueService = new SqsQueueService(mockSqsClient, 20, 10); + + Logger root = (Logger) LoggerFactory.getLogger(SqsQueueService.class); + root.setLevel(Level.INFO); + root.addAppender(mockAppender); + } + + @Test + public void shouldSendMessageToQueueSuccessfully() throws QueueException { + SendMessageResponse sendMessageResult = SendMessageResponse.builder() + .messageId("test-message-id") + .build(); + + SendMessageRequest sendMessageRequest = SendMessageRequest.builder() + .queueUrl(QUEUE_URL) + .messageBody(MESSAGE) + .build(); + + when(mockSqsClient.sendMessage(sendMessageRequest)).thenReturn(sendMessageResult); + + QueueMessage message = sqsQueueService.sendMessage(QUEUE_URL, MESSAGE); + assertEquals("test-message-id", message.getMessageId()); + + verify(mockAppender, times(1)).doAppend(loggingEventArgumentCaptor.capture()); + List logEvents = loggingEventArgumentCaptor.getAllValues(); + + assertThat(logEvents.stream().anyMatch(e -> e.getFormattedMessage().contains("Message sent to SQS queue - SendMessageResponse(MessageId=test-message-id)")), is(true)); + } + + @Test + public void shouldSendMessageWithDelayToQueueSuccessfully() throws QueueException { + SendMessageResponse sendMessageResult = SendMessageResponse.builder() + .messageId("test-message-id") + .build(); + + SendMessageRequest sendMessageRequest = SendMessageRequest.builder() + .queueUrl(QUEUE_URL) + .messageBody(MESSAGE) + .delaySeconds(2) + .build(); + + when(mockSqsClient.sendMessage(sendMessageRequest)).thenReturn(sendMessageResult); + + QueueMessage message = sqsQueueService.sendMessage(QUEUE_URL, MESSAGE, 2); + assertEquals("test-message-id", message.getMessageId()); + + verify(mockAppender, times(1)).doAppend(loggingEventArgumentCaptor.capture()); + List logEvents = loggingEventArgumentCaptor.getAllValues(); + + assertThat(logEvents.stream().anyMatch(e -> e.getFormattedMessage().contains("Message sent to SQS queue - SendMessageResponse(MessageId=test-message-id)")), is(true)); + } + + @Test(expected = QueueException.class) + public void shouldThrowExceptionIfMessageIsNotSentToQueue() throws QueueException { + SendMessageRequest sendMessageRequest = SendMessageRequest.builder() + .queueUrl(QUEUE_URL) + .messageBody(MESSAGE) + .build(); + + when(mockSqsClient.sendMessage(sendMessageRequest)).thenThrow(SqsException.class); + + sqsQueueService.sendMessage(QUEUE_URL, MESSAGE); + } + + @Test + public void shouldReceiveMessagesFromQueueSuccessfully() throws QueueException { + Message message = Message.builder() + .messageId("test-message-id") + .receiptHandle("test-receipt-handle") + .body("test-message-body") + .build(); + + ReceiveMessageResponse receiveMessageResult = ReceiveMessageResponse.builder() + .messages(message) + .build(); + + when(mockSqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult); + + List queueMessages = sqsQueueService.receiveMessages(QUEUE_URL, MESSAGE_ATTRIBUTE_NAME); + Assert.assertThat(queueMessages.size(), is(1)); + Assert.assertThat(queueMessages.get(0).getMessageId(), is("test-message-id")); + Assert.assertThat(queueMessages.get(0).getReceiptHandle(), is("test-receipt-handle")); + Assert.assertThat(queueMessages.get(0).getMessageBody(), is("test-message-body")); + } + + @Test + public void shouldReturnEmptyListWhenReceiveDoesNotReturnAnyMessages() throws QueueException { + ReceiveMessageResponse receiveMessageResult = ReceiveMessageResponse.builder() + .build(); + when(mockSqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult); + + List queueMessages = sqsQueueService.receiveMessages(QUEUE_URL, MESSAGE_ATTRIBUTE_NAME); + assertTrue(queueMessages.isEmpty()); + } + + @Test(expected = QueueException.class) + public void shouldThrowExceptionIfMessageCannotBeReceivedFromQueue() throws QueueException { + when(mockSqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenThrow(SqsException.class); + + sqsQueueService.receiveMessages(QUEUE_URL, MESSAGE_ATTRIBUTE_NAME); + } +}