-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Storage] Queue message encoding #19328
Conversation
This pull request is protected by Check Enforcer. What is Check Enforcer?Check Enforcer helps ensure all pull requests are covered by at least one check-run (typically an Azure Pipeline). When all check-runs associated with this pull request pass then Check Enforcer itself will pass. Why am I getting this message?You are getting this message because Check Enforcer did not detect any check-runs being associated with this pull request within five minutes. This may indicate that your pull request is not covered by any pipelines and so Check Enforcer is correctly blocking the pull request being merged. What should I do now?If the check-enforcer check-run is not passing and all other check-runs associated with this PR are passing (excluding license-cla) then you could try telling Check Enforcer to evaluate your pull request again. You can do this by adding a comment to this pull request as follows: What if I am onboarding a new service?Often, new services do not have validation pipelines associated with them, in order to bootstrap pipelines for a new service, you can issue the following command as a pull request comment: |
} catch (RuntimeException ex) { | ||
return monoError(logger, ex); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: This can be removed as the method being called into handles this
@@ -586,26 +638,81 @@ public HttpPipeline getHttpPipeline() { | |||
public Mono<Response<SendMessageResult>> sendMessageWithResponse(String messageText, Duration visibilityTimeout, | |||
Duration timeToLive) { | |||
try { | |||
return withContext(context -> sendMessageWithResponse(messageText, visibilityTimeout, timeToLive, context)); | |||
BinaryData message = messageText == null ? null : BinaryData.fromString(messageText); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When messageText
is empty do we want null BinaryData
or an empty BinaryData
? BinaryData.fromString
is null safe and will return an empty BinaryData
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be fine. In tests I discovered that previously passing "null" resulted in "XML format exception or so" so producing empty binarydata should be fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Other thing is that when message ends up being enqueued on the service side then worst case is service returning empty string at peek/receive.
null, null, | ||
context.addData(AZ_TRACING_NAMESPACE_KEY, STORAGE_TRACING_NAMESPACE_VALUE)) | ||
.map(response -> new SimpleResponse<>(response, response.getValue().get(0))); | ||
} | ||
|
||
private String encodeMessage(BinaryData message) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we turn this into a Mono<String>
and have it return Mono.error
on an invalid input? This will prevent introducing another scenario where we throw instead of return an error in a reactive stream (reducing the likelihood of the try/catch blocks being used in caller APIs)
null) | ||
)); | ||
} else { | ||
throw logger.logExceptionAsError(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: return FluxUtil.monoError(logger, e)
sdk/storage/azure-storage-queue/src/main/java/com/azure/storage/queue/QueueAsyncClient.java
Show resolved
Hide resolved
List<QueueMessageItemInternal> queueMessageInternalItems = response.getValue(); | ||
List<QueueMessageItem> queueMessageItems = Collections.emptyList(); | ||
Mono<Void> mono = Mono.empty(); | ||
if (queueMessageInternalItems != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we inverted this to queueMessageInternalItems == null
-> return empty paged response, I think we could streamline the conversion logic to use a fully reactive structure.
return Flux.fromIterable(queueMessageInternalItems)
.map(/* conversion logic */)
.collectList()
.map(/* create paged response */);
queueMessageItems.add(transformQueueMessageItemInternal(queueMessageItemInternal, messageEncoding)); | ||
} catch (IllegalArgumentException e) { | ||
if (messageDecodingFailedHandler != null) { | ||
mono = mono.then(messageDecodingFailedHandler.apply( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this failed handler return an error or just log a message?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's customer provided so we expect that error handling is implemented inside that handler. However if they get it wrong we want to surface errors in receive/peek API call result (like we do in .NET).
See Peek with handler exception
test.
*/ | ||
public QueueClientBuilder messageDecodingFailedHandler( | ||
Function<QueueMessageDecodingFailure, Mono<Void>> messageDecodingFailedHandler) { | ||
this.messageDecodingFailedHandler = Objects.requireNonNull(messageDecodingFailedHandler, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed in the client code this was null checked and by default this is null in the builder, given that should null be an allowed value?
@@ -41,8 +42,7 @@ | |||
/* | |||
* The content of the Message. | |||
*/ | |||
@JsonProperty(value = "MessageText", required = true) | |||
private String messageText; | |||
private BinaryData body; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the serializer/deserialization logic going to handle this correctly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PeekedMessageItemInternal
and QueueMessageItemInteral
has been created to handle that part.
The data flow is:
receive from wire -> from XML ->PeekedMessageItemInternal -> none/base64 decoding -> PeekedMessageItem
* unset the value will default to 0 and the message will be instantly visible. The timeout must be between 0 | ||
* seconds and 7 days. | ||
* @param timeToLive Optional. How long the message will stay alive in the queue. If unset the value will default to | ||
* 7 days, if -1 is passed the message will not expire. The time to live must be -1 or any positive number. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since timeToLive
is of type Duration
, setting -1 is not an option. It might be better to make it clear how to set this - Duration.ofDays(-1)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually it's -1 seconds.
* @return the updated QueueClientBuilder object | ||
* @throws NullPointerException If {@code messageDecodingFailedHandler} is {@code null}. | ||
*/ | ||
public QueueClientBuilder messageDecodingFailedHandler( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have used process*
for handlers. So, here we would name this as processMessageDecodingError()
. Also, does this need to be a Function<>
? Can this instead be a Consumer<QueueMessageDecodingFailure>
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
discussed offline, will provide both sync and async handler.
/** | ||
* Determines how queue message body is represented in HTTP requests and responses. | ||
*/ | ||
public enum QueueMessageEncoding { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we plan on adding more encoding formats? If so, this should be ExpandableStringEnum.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unlikely. Besides per https://azure.github.io/azure-sdk/java_introduction.html#enumerations I don't think this is a case for ExpandableStringEnum . The message encoding is client side concept and we don't expect neither service nor user to provide values that are unknown at given SDK version. Therefore I think normal enum is good fit.
*/ | ||
public QueueServiceClientBuilder messageDecodingFailedHandler( | ||
Function<QueueMessageDecodingFailure, Mono<Void>> messageDecodingFailedHandler) { | ||
this.messageDecodingFailedHandler = Objects.requireNonNull(messageDecodingFailedHandler, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to Alan's comment above, this should allow setting null
.
* @throws NullPointerException If {@code messageDecodingFailedHandler} is {@code null}. | ||
*/ | ||
public QueueServiceClientBuilder messageDecodingFailedHandler( | ||
Function<QueueMessageDecodingFailure, Mono<Void>> messageDecodingFailedHandler) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consumer<QueueMessageDecodingFailure>
* @return the updated QueueServiceClientBuilder object | ||
* @throws NullPointerException If {@code messageDecodingFailedHandler} is {@code null}. | ||
*/ | ||
public QueueServiceClientBuilder messageDecodingFailedHandler( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
processMessageDecodingError
* Contains information about message that could not be decoded. | ||
*/ | ||
@Immutable | ||
public class QueueMessageDecodingFailure { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make this final
. Also, consider naming this type as QueueMessageDecodingError
.
} catch (IllegalArgumentException e) { | ||
if (messageDecodingFailedHandler != null) { | ||
mono = mono.then(messageDecodingFailedHandler.apply( | ||
new QueueMessageDecodingFailure( | ||
this, | ||
null, | ||
transformPeekedMessageItemInternal( | ||
peekedMessageItemInternal, QueueMessageEncoding.NONE)) | ||
)); | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When calling the user's error handler, we should include the exception information that was thrown when decoding.
public QueueMessageDecodingFailure( | ||
QueueAsyncClient queueAsyncClient, | ||
QueueMessageItem queueMessageItem, | ||
PeekedMessageItem peekedMessageItem) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider including the exception that was thrown when decoding here, so the customer can actually take action based on the exception.
sdk/storage/azure-storage-queue/src/main/java/com/azure/storage/queue/QueueAsyncClient.java
Show resolved
Hide resolved
sdk/storage/azure-storage-queue/src/main/java/com/azure/storage/queue/QueueAsyncClient.java
Show resolved
Hide resolved
sdk/storage/azure-storage-queue/src/main/java/com/azure/storage/queue/QueueAsyncClient.java
Show resolved
Hide resolved
peekedMessageInternalItems = Collections.emptyList(); | ||
} | ||
return Flux.fromIterable(peekedMessageInternalItems) | ||
.flatMap(peekedMessageItemInternal -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flatMap
can potentially mess up the order of peeked messages. If the order is important, use flatMapSequential
or concatMap
instead.
Resolves #17460
APIView
This is a port from .NET SDK see:
Changelog
QueueMessageEncoding
QueueClientOptions.MessageEncoding
Badly encoded message handler - Problem statement
Badly encoded message handler - .NET implementation
PR1
PR2