Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storage] Queue message encoding #19328

Merged
merged 25 commits into from
Feb 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions sdk/storage/azure-storage-queue/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,28 @@

## 12.9.0-beta.2 (Unreleased)

### Support for binary data, custom shapes and Base64 encoding
This release adds a convinient way to send and receive binary data and custom shapes as a payload.
Additionally, support for Base64 encoding in HTTP requests and reponses has been added that makes interoperability with V11 and prior Storage SDK easier to implement.

The `QueueClient.sendMessage` and `QueueAsyncClient.sendMessage` consume `com.azure.core.util.BinaryData` in addition to `String`.
`QueueMessageItem` and `PeekedMessageItem` expose new property `getBody()` of `com.azure.core.util.BinaryData` type to access message payload and should be used instead of `getMessageText()`.

See [BinaryData](https://docs.microsoft.com/java/api/com.azure.core.util.binarydata?view=azure-java-stable) for more information about handling `String`, binary data and custom shapes.

#### Receiving message as string
Before:
```java
QueueMessageItem message = queueClient.receiveMessage();
String messageText = message.getMessageText();
```

After:
```java
QueueMessageItem message = queueClient.receiveMessage();
BinaryData body = message.getBody();
String messageText = body.toString();
```

## 12.9.0-beta.1 (2021-02-10)
- Added support for the 2020-06-12 service version.
Expand Down
4 changes: 2 additions & 2 deletions sdk/storage/azure-storage-queue/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ QueueClient queueClient = new QueueClientBuilder().endpoint(queueURL).sasToken(S
// @param key: The key with which the specified value should be associated.
// @param value: The value to be associated with the specified key.
queueClient.peekMessages(5, Duration.ofSeconds(1), new Context(key, value)).forEach(message -> {
System.out.println(message.getMessageText());
System.out.println(message.getBody().toString());
});
```

Expand All @@ -378,7 +378,7 @@ QueueClient queueClient = new QueueClientBuilder().endpoint(queueURL).sasToken(S
.buildClient();
// Try to receive 10 mesages: Maximum number of messages to get
queueClient.receiveMessages(10).forEach(message -> {
System.out.println(message.getMessageText());
System.out.println(message.getBody().toString());
});
```

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.rest.PagedIterable;
import com.azure.core.http.rest.Response;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.common.implementation.StorageImplUtils;
Expand Down Expand Up @@ -69,6 +70,15 @@ public QueueServiceVersion getServiceVersion() {
return client.getServiceVersion();
}

/**
* Gets the message encoding the client is using.
*
* @return the message encoding the client is using.
*/
public QueueMessageEncoding getMessageEncoding() {
return client.getMessageEncoding();
}

/**
* Gets the {@link HttpPipeline} powering this client.
*
Expand Down Expand Up @@ -410,6 +420,29 @@ public SendMessageResult sendMessage(String messageText) {
return sendMessageWithResponse(messageText, null, null, null, Context.NONE).getValue();
}

/**
* Sends a message that has a time-to-live of 7 days and is instantly visible.
*
* <p><strong>Code Samples</strong></p>
*
* <p>Sends a message of "Hello, Azure"</p>
*
* {@codesnippet com.azure.storage.queue.queueClient.sendMessage#BinaryData}
*
* <p>For more information, see the
* <a href="https://docs.microsoft.com/rest/api/storageservices/put-message">Azure Docs</a>.</p>
*
* @param message Message content
* @return A {@link SendMessageResult} value that contains the {@link SendMessageResult#getMessageId() messageId}
* and {@link SendMessageResult#getPopReceipt() popReceipt} that are used to interact with the message
* and other metadata about the enqueued message.
* @throws QueueStorageException If the queue doesn't exist
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public SendMessageResult sendMessage(BinaryData message) {
return sendMessageWithResponse(message, null, null, null, Context.NONE).getValue();
}

/**
* Sends a message with a given time-to-live and a timeout period where the message is invisible in the queue.
*
Expand All @@ -431,7 +464,8 @@ public SendMessageResult sendMessage(String messageText) {
* unset the value will default to 0 and the message will be instantly visible. The timeout must be between 0
* seconds and 7 days.
* @param timeToLive Optional. How long the message will stay alive in the queue. If unset the value will default to
* 7 days, if -1 is passed the message will not expire. The time to live must be -1 or any positive number.
* 7 days, if {@code Duration.ofSeconds(-1)} is passed the message will not expire.
* The time to live must be {@code Duration.ofSeconds(-1)} or any positive number of seconds.
* @param timeout An optional timeout applied to the operation. If a response is not returned before the timeout
* concludes a {@link RuntimeException} will be thrown.
* @param context Additional context that is passed through the Http pipeline during the service call.
Expand All @@ -446,7 +480,49 @@ public SendMessageResult sendMessage(String messageText) {
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<SendMessageResult> sendMessageWithResponse(String messageText, Duration visibilityTimeout,
Duration timeToLive, Duration timeout, Context context) {
Mono<Response<SendMessageResult>> response = client.sendMessageWithResponse(messageText,
Mono<Response<SendMessageResult>> response = client.sendMessageWithResponse(BinaryData.fromString(messageText),
visibilityTimeout, timeToLive, context);
return StorageImplUtils.blockWithOptionalTimeout(response, timeout);
}

/**
* Sends a message with a given time-to-live and a timeout period where the message is invisible in the queue.
*
* <p><strong>Code Samples</strong></p>
*
* <p>Add a message of "Hello, Azure" that has a timeout of 5 seconds</p>
*
* {@codesnippet com.azure.storage.queue.QueueClient.sendMessageWithResponse#BinaryData-Duration-Duration-Duration-Context1}
*
* <p>Add a message of "Goodbye, Azure" that has a time to live of 5 seconds</p>
*
* {@codesnippet com.azure.storage.queue.QueueClient.sendMessageWithResponse#BinaryData-Duration-Duration-Duration-Context2}
*
* <p>For more information, see the
* <a href="https://docs.microsoft.com/rest/api/storageservices/put-message">Azure Docs</a>.</p>
*
* @param message Message content
* @param visibilityTimeout Optional. The timeout period for how long the message is invisible in the queue. If
* unset the value will default to 0 and the message will be instantly visible. The timeout must be between 0
* seconds and 7 days.
* @param timeToLive Optional. How long the message will stay alive in the queue. If unset the value will default to
* 7 days, if {@code Duration.ofSeconds(-1)} is passed the message will not expire.
* The time to live must be {@code Duration.ofSeconds(-1)} or any positive number of seconds.
* @param timeout An optional timeout applied to the operation. If a response is not returned before the timeout
* concludes a {@link RuntimeException} will be thrown.
* @param context Additional context that is passed through the Http pipeline during the service call.
* @return A response containing the {@link SendMessageResult} value that contains the
* {@link SendMessageResult#getMessageId() messageId} and
* {@link SendMessageResult#getPopReceipt() popReceipt} that are used to
* interact with the message and other metadata about the enqueued message.
* @throws QueueStorageException If the queue doesn't exist or the {@code visibilityTimeout} or {@code timeToLive}
* are outside of the allowed limits.
* @throws RuntimeException if the operation doesn't complete before the timeout concludes.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<SendMessageResult> sendMessageWithResponse(BinaryData message, Duration visibilityTimeout,
Duration timeToLive, Duration timeout, Context context) {
Mono<Response<SendMessageResult>> response = client.sendMessageWithResponse(message,
visibilityTimeout, timeToLive, context);
return StorageImplUtils.blockWithOptionalTimeout(response, timeout);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@
import com.azure.storage.queue.implementation.AzureQueueStorageImpl;
import com.azure.storage.queue.implementation.AzureQueueStorageImplBuilder;
import com.azure.storage.queue.implementation.util.BuilderHelper;
import com.azure.storage.queue.models.QueueMessageDecodingError;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* This class provides a fluent builder API to help aid the configuration and instantiation of the {@link QueueClient
Expand Down Expand Up @@ -94,6 +99,10 @@ public final class QueueClientBuilder {
private Configuration configuration;
private QueueServiceVersion version;

private QueueMessageEncoding messageEncoding = QueueMessageEncoding.NONE;
private Function<QueueMessageDecodingError, Mono<Void>> processMessageDecodingErrorAsyncHandler;
private Consumer<QueueMessageDecodingError> processMessageDecodingErrorHandler;

/**
* Creates a builder instance that is able to configure and construct {@link QueueClient QueueClients} and {@link
* QueueAsyncClient QueueAsyncClients}.
Expand Down Expand Up @@ -140,6 +149,13 @@ public QueueClient buildClient() {
*/
public QueueAsyncClient buildAsyncClient() {
StorageImplUtils.assertNotNull("queueName", queueName);
if (processMessageDecodingErrorAsyncHandler != null && processMessageDecodingErrorHandler != null) {
throw logger.logExceptionAsError(new IllegalStateException(
"Either processMessageDecodingError or processMessageDecodingAsyncError should be specified"
+ "but not both.")
);
}

QueueServiceVersion serviceVersion = version != null ? version : QueueServiceVersion.getLatest();

HttpPipeline pipeline = (httpPipeline != null) ? httpPipeline : BuilderHelper.buildPipeline(
Expand All @@ -153,7 +169,8 @@ public QueueAsyncClient buildAsyncClient() {
.version(serviceVersion.getVersion())
.buildClient();

return new QueueAsyncClient(azureQueueStorage, queueName, accountName, serviceVersion);
return new QueueAsyncClient(azureQueueStorage, queueName, accountName, serviceVersion,
messageEncoding, processMessageDecodingErrorAsyncHandler, processMessageDecodingErrorHandler);
}

/**
Expand Down Expand Up @@ -387,6 +404,74 @@ public QueueClientBuilder clientOptions(ClientOptions clientOptions) {
return this;
}

/**
* Sets the queue message encoding.
*
* @param messageEncoding {@link QueueMessageEncoding}.
* @return the updated QueueClientBuilder object
* @throws NullPointerException If {@code messageEncoding} is {@code null}.
*/
public QueueClientBuilder messageEncoding(QueueMessageEncoding messageEncoding) {
this.messageEncoding = Objects.requireNonNull(messageEncoding, "'messageEncoding' cannot be null.");
return this;
}

/**
* Sets the asynchronous handler that performs the tasks needed when a message is received or peaked from the queue
* but cannot be decoded.
* <p>
* Such message can be received or peaked when queue is expecting certain {@link QueueMessageEncoding}
* but there's another producer that is not encoding messages in expected way.
* I.e. the queue contains messages with different encoding.
* <p>
* {@link QueueMessageDecodingError} contains {@link QueueAsyncClient} for the queue that has received
* the message as well as {@link QueueMessageDecodingError#getQueueMessageItem()} or
* {@link QueueMessageDecodingError#getPeekedMessageItem()} with raw body, i.e. no decoding will be attempted
* so that body can be inspected as has been received from the queue.
* <p>
* The handler won't attempt to remove the message from the queue. Therefore such handling should be included into
* handler itself.
* <p><strong>Code Samples</strong></p>
*
* {@codesnippet com.azure.storage.queue.QueueClientBuilder#processMessageDecodingErrorAsyncHandler}
*
* @param processMessageDecodingErrorAsyncHandler the handler.
* @return the updated QueueClientBuilder object
*/
public QueueClientBuilder processMessageDecodingErrorAsync(
Function<QueueMessageDecodingError, Mono<Void>> processMessageDecodingErrorAsyncHandler) {
this.processMessageDecodingErrorAsyncHandler = processMessageDecodingErrorAsyncHandler;
return this;
}

/**
* Sets the handler that performs the tasks needed when a message is received or peaked from the queue
* but cannot be decoded.
* <p>
* Such message can be received or peaked when queue is expecting certain {@link QueueMessageEncoding}
* but there's another producer that is not encoding messages in expected way.
* I.e. the queue contains messages with different encoding.
* <p>
* {@link QueueMessageDecodingError} contains {@link QueueAsyncClient} for the queue that has received
* the message as well as {@link QueueMessageDecodingError#getQueueMessageItem()} or
* {@link QueueMessageDecodingError#getPeekedMessageItem()} with raw body, i.e. no decoding will be attempted
* so that body can be inspected as has been received from the queue.
* <p>
* The handler won't attempt to remove the message from the queue. Therefore such handling should be included into
* handler itself.
* <p><strong>Code Samples</strong></p>
*
* {@codesnippet com.azure.storage.queue.QueueClientBuilder#processMessageDecodingErrorHandler}
*
* @param processMessageDecodingErrorHandler the handler.
* @return the updated QueueClientBuilder object
*/
public QueueClientBuilder processMessageDecodingError(
Consumer<QueueMessageDecodingError> processMessageDecodingErrorHandler) {
this.processMessageDecodingErrorHandler = processMessageDecodingErrorHandler;
return this;
}

/**
* Sets the {@link QueueServiceVersion} that is used when making API requests.
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
// Code generated by Microsoft (R) AutoRest Code Generator.

package com.azure.storage.queue;

/**
* Determines how queue message body is represented in HTTP requests and responses.
*/
public enum QueueMessageEncoding {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we plan on adding more encoding formats? If so, this should be ExpandableStringEnum.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unlikely. Besides per https://azure.github.io/azure-sdk/java_introduction.html#enumerations I don't think this is a case for ExpandableStringEnum . The message encoding is client side concept and we don't expect neither service nor user to provide values that are unknown at given SDK version. Therefore I think normal enum is good fit.

/**
* The queue message body is represented verbatim in HTTP requests and responses. I.e. message is not transformed.
*/
NONE,

/**
* The queue message body is represented as Base64 encoded string in HTTP requests and responses.
* <p>
* This was the default behavior in the prior v8 and v11 library.
* Using this option can make interop with an existing application easier.
*/
BASE64
}
Loading