From f0f987b8365e59436bcac189f66b59395088d3c6 Mon Sep 17 00:00:00 2001
From: Connie Yau
Date: Mon, 6 Apr 2020 11:44:05 -0700
Subject: [PATCH] Fixes issues in auto-* operations (#9860)
* Adding ReceiveAsyncOptions
* Pulling in autocomplete and lock renewal.
* Extract isAutoComplete and maxAutoRenewDuration into class.
* Change UUID to all String.
---
.../servicebus/ReceiveMessageOptions.java | 74 -----
.../messaging/servicebus/ReceiverOptions.java | 37 +++
.../servicebus/ServiceBusClientBuilder.java | 83 +----
.../ServiceBusReceiverAsyncClient.java | 85 ++++--
.../implementation/ManagementChannel.java | 6 +-
.../implementation/MessageLockContainer.java | 9 +-
.../ServiceBusAsyncConsumer.java | 15 +-
.../ServiceBusManagementNode.java | 2 +-
.../ServiceBusMessageProcessor.java | 284 ++++++++++++------
.../ServiceBusReceiveLinkProcessor.java | 16 +-
.../models/ReceiveAsyncOptions.java | 61 ++++
.../ReceiveMessageAndSettleAsyncSample.java | 10 +-
.../servicebus/ReceiveMessageSyncSample.java | 3 -
.../servicebus/ProxyReceiveTest.java | 1 -
.../ServiceBusClientBuilderTest.java | 74 -----
...BusReceiverAsyncClientIntegrationTest.java | 57 ++--
.../ServiceBusReceiverAsyncClientTest.java | 92 ++++--
.../ServiceBusAsyncConsumerTest.java | 13 +-
.../ServiceBusMessageProcessorTest.java | 238 ++++++++++++---
19 files changed, 704 insertions(+), 456 deletions(-)
delete mode 100644 sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiveMessageOptions.java
create mode 100644 sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java
create mode 100644 sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/ReceiveAsyncOptions.java
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiveMessageOptions.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiveMessageOptions.java
deleted file mode 100644
index 9e853e694538..000000000000
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiveMessageOptions.java
+++ /dev/null
@@ -1,74 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.messaging.servicebus;
-
-import com.azure.messaging.servicebus.models.ReceiveMode;
-
-import java.time.Duration;
-
-/**
- * Options set when receiving a message.
- */
-class ReceiveMessageOptions {
- private final boolean autoComplete;
- private final ReceiveMode receiveMode;
- private final int prefetchCount;
- private final boolean isLockAutoRenewed;
- private final Duration maxAutoRenewDuration;
-
- ReceiveMessageOptions(boolean autoComplete, ReceiveMode receiveMode, int prefetchCount, boolean isLockAutoRenewed,
- Duration maxAutoRenewDuration) {
- this.autoComplete = autoComplete;
- this.receiveMode = receiveMode;
- this.prefetchCount = prefetchCount;
- this.isLockAutoRenewed = isLockAutoRenewed;
- this.maxAutoRenewDuration = maxAutoRenewDuration;
- }
-
- /**
- * Gets whether or not to autocomplete messages after they have been processed.
- *
- * @return {@code true} if the message should be completed/abandoned; {@code false} otherwise.
- */
- boolean isAutoComplete() {
- return autoComplete;
- }
-
- /**
- * Gets if lock should be automatically renewed.
- *
- * @return {@code true} if the lock should be automatically renewed; {@code false} otherwise.
- */
- boolean isLockAutoRenewed() {
- return isLockAutoRenewed;
- }
-
- /**
- * Gets the receive mode for the message.
- *
- * @return the receive mode for the message.
- */
- ReceiveMode getReceiveMode() {
- return receiveMode;
- }
-
- /**
- * Gets the prefetch count of the receiver.
- *
- * @return The prefetch count of the receiver.
- */
- int getPrefetchCount() {
- return prefetchCount;
- }
-
- /**
- * Gets the maximum duration within which the lock will be renewed automatically. This value should be greater than
- * the longest message lock duration.
- *
- * @return The maximum duration within which the lock will be renewed automatically.
- */
- Duration getMaxAutoRenewDuration() {
- return maxAutoRenewDuration;
- }
-}
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java
new file mode 100644
index 000000000000..13cec4511a3f
--- /dev/null
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java
@@ -0,0 +1,37 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.servicebus;
+
+import com.azure.messaging.servicebus.models.ReceiveMode;
+
+/**
+ * Options set when creating a service bus receiver.
+ */
+class ReceiverOptions {
+ private final ReceiveMode receiveMode;
+ private final int prefetchCount;
+
+ ReceiverOptions(ReceiveMode receiveMode, int prefetchCount) {
+ this.receiveMode = receiveMode;
+ this.prefetchCount = prefetchCount;
+ }
+
+ /**
+ * Gets the receive mode for the message.
+ *
+ * @return the receive mode for the message.
+ */
+ ReceiveMode getReceiveMode() {
+ return receiveMode;
+ }
+
+ /**
+ * Gets the prefetch count of the receiver.
+ *
+ * @return The prefetch count of the receiver.
+ */
+ int getPrefetchCount() {
+ return prefetchCount;
+ }
+}
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java
index 6c55c76feaf3..dbb28b2417f0 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java
@@ -39,7 +39,6 @@
import java.net.InetSocketAddress;
import java.net.Proxy;
-import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
@@ -473,60 +472,14 @@ public final class ServiceBusReceiverClientBuilder {
// Using 0 pre-fetch count for both receive modes, to avoid message lock lost exceptions in application
// receiving messages at a slow rate. Applications can set it to a higher value if they need better performance.
private static final int DEFAULT_PREFETCH_COUNT = 1;
- private static final boolean DEFAULT_AUTO_COMPLETE = true;
- private boolean isAutoComplete;
- private Duration maxAutoLockRenewalDuration;
private int prefetchCount = DEFAULT_PREFETCH_COUNT;
- private boolean isLockAutoRenewed;
private String queueName;
private String subscriptionName;
private String topicName;
private ReceiveMode receiveMode = ReceiveMode.PEEK_LOCK;
private ServiceBusReceiverClientBuilder() {
- isAutoComplete = DEFAULT_AUTO_COMPLETE;
- }
-
- /**
- * Sets whether or not to automatically complete a received message after it has been processed. Only supported
- * when using the asynchronous {@link ServiceBusReceiverAsyncClient receiver client}.
- *
- * @param autoComplete {@code true} to automatically complete a received message after it has been
- * processed; {@code false} otherwise.
- *
- * @return The modified {@link ServiceBusReceiverClientBuilder} object.
- */
- public ServiceBusReceiverClientBuilder isAutoComplete(boolean autoComplete) {
- this.isAutoComplete = autoComplete;
- return this;
- }
-
- /**
- * Sets if lock should be automatically renewed. Only supported when using the asynchronous
- * {@link ServiceBusReceiverAsyncClient receiver client}.
- *
- * @param isLockAutoRenewed {@code true} if the lock should be automatically renewed; {@code false}
- * otherwise.
- *
- * @return The updated {@link ServiceBusReceiverClientBuilder} object.
- */
- public ServiceBusReceiverClientBuilder isLockAutoRenewed(boolean isLockAutoRenewed) {
- this.isLockAutoRenewed = isLockAutoRenewed;
- return this;
- }
-
- /**
- * Sets the maximum duration within which the lock will be renewed automatically. This value should be greater
- * than the longest message lock duration.
- *
- * @param renewalDuration The maximum duration within which the lock will be renewed automatically.
- *
- * @return The modified {@link ServiceBusReceiverClientBuilder} object.
- */
- public ServiceBusReceiverClientBuilder maxAutoLockRenewalDuration(Duration renewalDuration) {
- this.maxAutoLockRenewalDuration = renewalDuration;
- return this;
}
/**
@@ -605,8 +558,8 @@ public ServiceBusReceiverClientBuilder topicName(String topicName) {
* #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in
* {@link #queueName(String) queueName} or {@link #topicName(String) topicName}. Lastly, if a {@link
* #topicName(String) topicName} is set, but {@link #subscriptionName(String) subscriptionName} is not.
- * @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String) queueName()}
- * or {@link #topicName(String) topicName()}, respectively.
+ * @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
+ * queueName()} or {@link #topicName(String) topicName()}, respectively.
*/
public ServiceBusReceiverAsyncClient buildAsyncClient() {
final MessagingEntityType entityType = validateEntityPaths(logger, connectionStringEntityName, topicName,
@@ -635,24 +588,12 @@ public ServiceBusReceiverAsyncClient buildAsyncClient() {
"prefetchCount (%s) cannot be less than 1.", prefetchCount)));
}
- if (isLockAutoRenewed) {
- if (maxAutoLockRenewalDuration == null) {
- throw logger.logExceptionAsError(new IllegalStateException(
- "'maxAutoLockRenewalDuration' is required when 'isLockAutoRenewed' is enabled."));
- } else if (maxAutoLockRenewalDuration.isZero() || maxAutoLockRenewalDuration.isNegative()) {
- throw logger.logExceptionAsError(new IllegalArgumentException(String.format(
- "maxAutoLockRenewalDuration (%s) cannot be less than or equal to a duration of zero.",
- maxAutoLockRenewalDuration)));
- }
- }
-
final MessageLockContainer messageLockContainer = new MessageLockContainer();
final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
- final ReceiveMessageOptions receiveMessageOptions = new ReceiveMessageOptions(isAutoComplete, receiveMode,
- prefetchCount, isLockAutoRenewed, maxAutoLockRenewalDuration);
+ final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount);
return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath,
- entityType, false, receiveMessageOptions, connectionProcessor, tracerProvider,
+ entityType, false, receiverOptions, connectionProcessor, tracerProvider,
messageSerializer, messageLockContainer, ServiceBusClientBuilder.this::onClientClose);
}
@@ -666,21 +607,11 @@ public ServiceBusReceiverAsyncClient buildAsyncClient() {
* #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in
* {@link #queueName(String) queueName} or {@link #topicName(String) topicName}. Lastly, if a {@link
* #topicName(String) topicName} is set, but {@link #subscriptionName(String) subscriptionName} is not.
- * @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String) queueName()}
- * or {@link #topicName(String) topicName()}, respectively.
+ * @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
+ * queueName()} or {@link #topicName(String) topicName()}, respectively.
*/
public ServiceBusReceiverClient buildClient() {
- final ServiceBusReceiverAsyncClient client = buildAsyncClient();
-
- if (isLockAutoRenewed) {
- throw logger.logExceptionAsError(new IllegalStateException(
- "Cannot use 'isLockAutoRenewed' when using synchronous client."));
- } else if (isAutoComplete) {
- throw logger.logExceptionAsError(new IllegalStateException(
- "Cannot use 'isAutoComplete' when using synchronous client."));
- }
-
- return new ServiceBusReceiverClient(client, retryOptions.getTryTimeout());
+ return new ServiceBusReceiverClient(buildAsyncClient(), retryOptions.getTryTimeout());
}
}
}
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java
index eab2790b7499..73aa6200e233 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java
@@ -3,9 +3,11 @@
package com.azure.messaging.servicebus;
+import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
+import com.azure.core.amqp.exception.LinkErrorContext;
import com.azure.core.amqp.exception.SessionErrorContext;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.MessageSerializer;
@@ -20,11 +22,13 @@
import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor;
import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor;
+import com.azure.messaging.servicebus.models.ReceiveAsyncOptions;
import com.azure.messaging.servicebus.models.ReceiveMode;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
@@ -54,8 +58,7 @@
*
* Rate limiting consumption of messages from Service Bus resource
* For message receivers that need to limit the number of messages they receive at a given time, they can use
- * {@link BaseSubscriber#request(long)}.
- * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.receive#basesubscriber}
+ * {@link BaseSubscriber#request(long)}.
{@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.receive#basesubscriber}
*
* @see ServiceBusClientBuilder
* @see ServiceBusReceiverClient To communicate with a Service Bus resource using a synchronous client.
@@ -70,29 +73,30 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable {
private final String entityPath;
private final MessagingEntityType entityType;
private final boolean isSessionEnabled;
- private final ReceiveMessageOptions receiveOptions;
+ private final ReceiverOptions receiverOptions;
private final ServiceBusConnectionProcessor connectionProcessor;
private final TracerProvider tracerProvider;
private final MessageSerializer messageSerializer;
private final int prefetch;
private final ReceiveMode receiveMode;
private final MessageLockContainer messageLockContainer;
+ private final ReceiveAsyncOptions defaultReceiveOptions;
private final Runnable onClientClose;
/**
- * Map containing linkNames and their associated consumers.
- * Key: linkName
- * Value: consumer associated with that linkName.
+ * Map containing linkNames and their associated consumers. Key: linkName Value: consumer associated with that
+ * linkName.
*/
private final ConcurrentHashMap openConsumers = new ConcurrentHashMap<>();
/**
* Creates a receiver that listens to a Service Bus resource.
+ *
* @param fullyQualifiedNamespace The fully qualified domain name for the Service Bus resource.
* @param entityPath The name of the topic or queue.
* @param entityType The type of the Service Bus resource.
* @param isSessionEnabled {@code true} if sessions are enabled; {@code false} otherwise.
- * @param receiveOptions Options when receiving messages.
+ * @param receiverOptions Options when receiving messages.
* @param connectionProcessor The AMQP connection to the Service Bus resource.
* @param tracerProvider Tracer for telemetry.
* @param messageSerializer Serializes and deserializes Service Bus messages.
@@ -100,25 +104,29 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable {
* @param onClientClose Operation to run when the client completes.
*/
ServiceBusReceiverAsyncClient(String fullyQualifiedNamespace, String entityPath, MessagingEntityType entityType,
- boolean isSessionEnabled, ReceiveMessageOptions receiveOptions,
+ boolean isSessionEnabled, ReceiverOptions receiverOptions,
ServiceBusConnectionProcessor connectionProcessor, TracerProvider tracerProvider,
MessageSerializer messageSerializer, MessageLockContainer messageLockContainer, Runnable onClientClose) {
this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace,
"'fullyQualifiedNamespace' cannot be null.");
this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
- this.receiveOptions = Objects.requireNonNull(receiveOptions, "'receiveMessageOptions' cannot be null.");
+ this.receiverOptions = Objects.requireNonNull(receiverOptions, "'receiveMessageOptions' cannot be null.");
this.connectionProcessor = Objects.requireNonNull(connectionProcessor, "'connectionProcessor' cannot be null.");
this.tracerProvider = Objects.requireNonNull(tracerProvider, "'tracerProvider' cannot be null.");
this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
- this.prefetch = receiveOptions.getPrefetchCount();
- this.receiveMode = receiveOptions.getReceiveMode();
+ this.prefetch = receiverOptions.getPrefetchCount();
+ this.receiveMode = receiverOptions.getReceiveMode();
this.entityType = entityType;
this.isSessionEnabled = isSessionEnabled;
this.messageLockContainer = messageLockContainer;
this.onClientClose = onClientClose;
+
+ this.defaultReceiveOptions = new ReceiveAsyncOptions()
+ .setEnableAutoComplete(true)
+ .setMaxAutoRenewDuration(connectionProcessor.getRetryOptions().getTryTimeout());
}
/**
@@ -346,25 +354,57 @@ public Flux peekBatchAt(int maxMessages, long sequenc
}
/**
- * Receives a stream of {@link ServiceBusReceivedMessage messages} from the Service Bus entity.
+ * Receives a stream of {@link ServiceBusReceivedMessage messages} from the Service Bus entity and completes them
+ * when they are finished processing.
+ *
+ *
+ * By default, each successfully consumed message is {@link #complete(MessageLockToken) auto-completed} and {@link
+ * #renewMessageLock(MessageLockToken) auto-renewed}. When downstream consumers throw an exception, the
+ * auto-completion feature will {@link #abandon(MessageLockToken) abandon} the message. {@link
+ * #renewMessageLock(MessageLockToken) Auto-renewal} occurs until the {@link AmqpRetryOptions#getTryTimeout()
+ * operation timeout} has elapsed.
+ *
*
* @return A stream of messages from the Service Bus entity.
+ * @throws AmqpException if {@link AmqpRetryOptions#getTryTimeout() operation timeout} has elapsed and
+ * downstream consumers are still processing the message.
*/
public Flux receive() {
+ return receive(defaultReceiveOptions);
+ }
+
+ /**
+ * Receives a stream of {@link ServiceBusReceivedMessage messages} from the Service Bus entity with a set of
+ * options. To disable lock auto-renewal, set {@link ReceiveAsyncOptions#setMaxAutoRenewDuration(Duration)
+ * setMaxAutoRenewDuration} to {@link Duration#ZERO} or {@code null}.
+ *
+ * @param options Set of options to set when receiving messages.
+ * @return A stream of messages from the Service Bus entity.
+ * @throws NullPointerException if {@code options} is null.
+ * @throws IllegalArgumentException if {@link ReceiveAsyncOptions#getMaxAutoRenewDuration() max auto-renew
+ * duration} is negative.
+ */
+ public Flux receive(ReceiveAsyncOptions options) {
if (isDisposed.get()) {
return fluxError(logger, new IllegalStateException(
String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "receive")));
}
- if (receiveMode != ReceiveMode.PEEK_LOCK && receiveOptions.isAutoComplete()) {
+ if (Objects.isNull(options)) {
+ return fluxError(logger, new NullPointerException("'options' cannot be null"));
+ } else if (options.getMaxAutoRenewDuration() != null && options.getMaxAutoRenewDuration().isNegative()) {
+ return fluxError(logger, new IllegalArgumentException("'maxAutoRenewDuration' cannot be negative."));
+ }
+
+ if (receiveMode != ReceiveMode.PEEK_LOCK && options.isEnableAutoComplete()) {
return Flux.error(logger.logExceptionAsError(new UnsupportedOperationException(
- "Autocomplete is not supported on a receiver opened in ReceiveMode.RECEIVE_AND_DELETE.")));
+ "Auto-complete is not supported on a receiver opened in ReceiveMode.RECEIVE_AND_DELETE.")));
}
// TODO (conniey): This returns the same consumer instance because the entityPath is not unique.
// Python and .NET does not have the same behaviour.
return Flux.usingWhen(
- Mono.fromCallable(() -> getOrCreateConsumer(entityPath)),
+ Mono.fromCallable(() -> getOrCreateConsumer(entityPath, options)),
consumer -> consumer.receive(),
consumer -> {
final String linkName = consumer.getLinkName();
@@ -484,7 +524,7 @@ public void close() {
onClientClose.run();
}
- private Mono isLockTokenValid(UUID lockToken) {
+ private Mono isLockTokenValid(String lockToken) {
final Instant lockedUntilUtc = messageLockContainer.getLockTokenExpiration(lockToken);
if (lockedUntilUtc == null) {
logger.warning("lockToken[{}] is not owned by this receiver.", lockToken);
@@ -527,7 +567,7 @@ private Mono updateDisposition(MessageLockToken message, DispositionStatus
return monoError(logger, new IllegalArgumentException("'message.lockToken' cannot be empty."));
}
- final UUID lockToken = UUID.fromString(message.getLockToken());
+ final String lockToken = message.getLockToken();
final Instant instant = messageLockContainer.getLockTokenExpiration(lockToken);
logger.info("{}: Update started. Disposition: {}. Lock: {}. Expiration: {}",
entityPath, dispositionStatus, lockToken, instant);
@@ -551,7 +591,7 @@ private Mono updateDisposition(MessageLockToken message, DispositionStatus
}));
}
- private ServiceBusAsyncConsumer getOrCreateConsumer(String linkName) {
+ private ServiceBusAsyncConsumer getOrCreateConsumer(String linkName, ReceiveAsyncOptions options) {
return openConsumers.computeIfAbsent(linkName, name -> {
logger.info("{}: Creating consumer for link '{}'", entityPath, linkName);
@@ -566,13 +606,16 @@ private ServiceBusAsyncConsumer getOrCreateConsumer(String linkName) {
})
.repeat();
+ final LinkErrorContext context = new LinkErrorContext(fullyQualifiedNamespace, entityPath, linkName, null);
final AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(connectionProcessor.getRetryOptions());
final ServiceBusReceiveLinkProcessor linkMessageProcessor = receiveLink.subscribeWith(
- new ServiceBusReceiveLinkProcessor(prefetch, retryPolicy, connectionProcessor));
+ new ServiceBusReceiveLinkProcessor(prefetch, retryPolicy, connectionProcessor, context));
+ final boolean isAutoLockRenewal = options.getMaxAutoRenewDuration() != null
+ && !options.getMaxAutoRenewDuration().isZero();
return new ServiceBusAsyncConsumer(linkName, linkMessageProcessor, messageSerializer,
- receiveOptions.isAutoComplete(), receiveOptions.isLockAutoRenewed(),
- receiveOptions.getMaxAutoRenewDuration(), connectionProcessor.getRetryOptions(), messageLockContainer,
+ options.isEnableAutoComplete(), isAutoLockRenewal, options.getMaxAutoRenewDuration(),
+ connectionProcessor.getRetryOptions(), messageLockContainer,
this::complete, this::abandon, this::renewMessageLock);
});
}
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementChannel.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementChannel.java
index eccb58d7fc5f..5b551a8e1832 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementChannel.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementChannel.java
@@ -89,10 +89,12 @@ public class ManagementChannel implements ServiceBusManagementNode {
}
@Override
- public Mono updateDisposition(UUID lockToken, DispositionStatus dispositionStatus, String deadLetterReason,
+ public Mono updateDisposition(String lockToken, DispositionStatus dispositionStatus, String deadLetterReason,
String deadLetterErrorDescription, Map propertiesToModify) {
+
+ final UUID token = UUID.fromString(lockToken);
return isAuthorized(UPDATE_DISPOSITION_OPERATION).then(createRequestResponse.flatMap(channel -> {
- final Message message = createDispositionMessage(new UUID[] {lockToken}, dispositionStatus,
+ final Message message = createDispositionMessage(new UUID[] {token}, dispositionStatus,
null, null, null, channel.getReceiveLinkName());
return channel.sendWithAck(message);
}).flatMap(response -> {
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageLockContainer.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageLockContainer.java
index 3e3b55f91361..70ed664c9a3f 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageLockContainer.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageLockContainer.java
@@ -4,14 +4,13 @@
package com.azure.messaging.servicebus.implementation;
import java.time.Instant;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
* Container of Message lock and related metadata related.
*/
public class MessageLockContainer {
- private final ConcurrentHashMap lockTokenExpirationMap = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap lockTokenExpirationMap = new ConcurrentHashMap<>();
/**
* Adds or updates the expiration time on a lock token. If the expiration time in the container is larger than
@@ -23,7 +22,7 @@ public class MessageLockContainer {
* @return The updated value in the container. If the expiration time in the container is larger than
* {@code lockTokenExpiration}, then the current container value is used.
*/
- public Instant addOrUpdate(UUID lockToken, Instant lockTokenExpiration) {
+ public Instant addOrUpdate(String lockToken, Instant lockTokenExpiration) {
return lockTokenExpirationMap.compute(lockToken, (key, existing) -> {
if (existing == null) {
return lockTokenExpiration;
@@ -42,7 +41,7 @@ public Instant addOrUpdate(UUID lockToken, Instant lockTokenExpiration) {
*
* @return An {@link Instant} for when the lock expires or {@code null} if the {@code lockToken} does not exist.
*/
- public Instant getLockTokenExpiration(UUID lockToken) {
+ public Instant getLockTokenExpiration(String lockToken) {
return lockTokenExpirationMap.get(lockToken);
}
@@ -51,7 +50,7 @@ public Instant getLockTokenExpiration(UUID lockToken) {
*
* @param lockToken Token to remove.
*/
- public void remove(UUID lockToken) {
+ public void remove(String lockToken) {
lockTokenExpirationMap.remove(lockToken);
}
}
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusAsyncConsumer.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusAsyncConsumer.java
index 96f8ff7eee6c..bedf8caf8ae1 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusAsyncConsumer.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusAsyncConsumer.java
@@ -5,6 +5,7 @@
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.implementation.MessageSerializer;
+import com.azure.messaging.servicebus.MessageLockToken;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import reactor.core.publisher.Flux;
@@ -27,18 +28,20 @@ public class ServiceBusAsyncConsumer implements AutoCloseable {
public ServiceBusAsyncConsumer(String linkName, ServiceBusReceiveLinkProcessor amqpReceiveLinkProcessor,
MessageSerializer messageSerializer, boolean isAutoComplete, boolean autoLockRenewal,
- Duration autoLockRenewalDuration, AmqpRetryOptions retryOptions, MessageLockContainer messageLockContainer,
- Function> onComplete,
- Function> onAbandon,
- Function> onRenewLock) {
+ Duration maxAutoLockRenewDuration, AmqpRetryOptions retryOptions, MessageLockContainer messageLockContainer,
+ Function> onComplete,
+ Function> onAbandon,
+ Function> onRenewLock) {
this.linkName = linkName;
this.amqpReceiveLinkProcessor = amqpReceiveLinkProcessor;
this.messageSerializer = messageSerializer;
+
this.processor = amqpReceiveLinkProcessor
.map(message -> this.messageSerializer.deserialize(message, ServiceBusReceivedMessage.class))
- .subscribeWith(new ServiceBusMessageProcessor(isAutoComplete, autoLockRenewal, autoLockRenewalDuration,
- retryOptions, messageLockContainer, onComplete, onAbandon, onRenewLock));
+ .subscribeWith(new ServiceBusMessageProcessor(isAutoComplete, autoLockRenewal, maxAutoLockRenewDuration,
+ retryOptions, messageLockContainer, amqpReceiveLinkProcessor.getErrorContext(),
+ onComplete, onAbandon, onRenewLock));
}
public String getLinkName() {
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusManagementNode.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusManagementNode.java
index 9b4bc2b12bfe..5818dd4239e7 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusManagementNode.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusManagementNode.java
@@ -23,7 +23,7 @@ public interface ServiceBusManagementNode extends AutoCloseable {
*
* @return Mono that completes successfully when the message is completed. Otherwise, returns an error.
*/
- Mono updateDisposition(UUID lockToken, DispositionStatus dispositionStatus, String deadLetterReason,
+ Mono updateDisposition(String lockToken, DispositionStatus dispositionStatus, String deadLetterReason,
String deadLetterErrorDescription, Map propertiesToModify);
/**
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusMessageProcessor.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusMessageProcessor.java
index 6a1aabd02ac7..01e34beddcf8 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusMessageProcessor.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusMessageProcessor.java
@@ -3,14 +3,22 @@
package com.azure.messaging.servicebus.implementation;
import com.azure.core.amqp.AmqpRetryOptions;
+import com.azure.core.amqp.exception.AmqpErrorCondition;
+import com.azure.core.amqp.exception.AmqpErrorContext;
+import com.azure.core.amqp.exception.AmqpException;
+import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
+import com.azure.messaging.servicebus.MessageLockToken;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
+import reactor.core.Exceptions;
+import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
+import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;
@@ -19,8 +27,8 @@
import java.time.Instant;
import java.util.Deque;
import java.util.Objects;
-import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -34,24 +42,26 @@ class ServiceBusMessageProcessor extends FluxProcessor> completeFunction;
- private final Function> onAbandon;
- private final Function> onRenewLock;
+ private final AmqpErrorContext errorContext;
+ private final Function> completeFunction;
+ private final Function> onAbandon;
+ private final Function> onRenewLock;
private final Deque messageQueue = new ConcurrentLinkedDeque<>();
private final boolean isAutoRenewLock;
private final Duration maxAutoLockRenewal;
private final MessageLockContainer messageLockContainer;
ServiceBusMessageProcessor(boolean isAutoComplete, boolean isAutoRenewLock, Duration maxAutoLockRenewal,
- AmqpRetryOptions retryOptions, MessageLockContainer messageLockContainer,
- Function> completeFunction,
- Function> onAbandon,
- Function> onRenewLock) {
+ AmqpRetryOptions retryOptions, MessageLockContainer messageLockContainer, AmqpErrorContext errorContext,
+ Function> onComplete,
+ Function> onAbandon,
+ Function> onRenewLock) {
super();
this.retryOptions = Objects.requireNonNull(retryOptions, "'retryOptions' cannot be null.");
- this.completeFunction = Objects.requireNonNull(completeFunction, "'completeFunction' cannot be null.");
+ this.errorContext = Objects.requireNonNull(errorContext, "'errorContext' cannot be null.");
+ this.completeFunction = Objects.requireNonNull(onComplete, "'onComplete' cannot be null.");
this.onAbandon = Objects.requireNonNull(onAbandon, "'onAbandon' cannot be null.");
this.onRenewLock = Objects.requireNonNull(onRenewLock, "'onRenewLock' cannot be null.");
this.messageLockContainer = Objects.requireNonNull(messageLockContainer,
@@ -83,7 +93,9 @@ class ServiceBusMessageProcessor extends FluxProcessor REQUESTED =
AtomicLongFieldUpdater.newUpdater(ServiceBusMessageProcessor.class, "requested");
- private volatile Throwable error;
+ volatile Throwable error;
+ static final AtomicReferenceFieldUpdater ERROR =
+ AtomicReferenceFieldUpdater.newUpdater(ServiceBusMessageProcessor.class, Throwable.class, "error");
/**
* Invoked when this subscribes to an upstream publisher.
@@ -135,8 +147,11 @@ public void onError(Throwable throwable) {
return;
}
- error = throwable;
- isDone = true;
+ if (Exceptions.addThrowable(ERROR, this, throwable)) {
+ isDone = true;
+ } else {
+ Operators.onErrorDropped(throwable, currentContext());
+ }
drain();
}
@@ -163,6 +178,7 @@ public void request(long request) {
if (upstream != null) {
upstream.request(request);
}
+
drain();
}
}
@@ -189,12 +205,17 @@ public void dispose() {
drain();
}
+ @Override
+ public boolean isDisposed() {
+ return isDone || isCancelled;
+ }
+
@Override
public void subscribe(CoreSubscriber super ServiceBusReceivedMessage> downstream) {
Objects.requireNonNull(downstream, "'downstream' cannot be null.");
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
- downstream.onSubscribe(this);
this.downstream = downstream;
+ downstream.onSubscribe(this);
if (isCancelled) {
this.downstream = null;
} else {
@@ -226,117 +247,200 @@ private void drainQueue() {
return;
}
- final ServiceBusReceivedMessage lastMessage = messageQueue.peekLast();
- if (lastMessage == null) {
- if (isDone) {
- downstream.onComplete();
+ while (true) {
+ if (messageQueue.isEmpty()) {
+ break;
}
- return;
- }
- ServiceBusReceivedMessage message = messageQueue.poll();
- while (message != lastMessage) {
- if (message == null) {
- logger.warning("The last message is not null, but the head node is null. lastMessage: {}", lastMessage);
- message = messageQueue.poll();
- continue;
- }
+ long amountRequested = REQUESTED.get(this);
+ long emitted = drainRequested(amountRequested);
- if (isCancelled) {
- Operators.onDiscard(message, downstream.currentContext());
- Operators.onDiscardQueueWithClear(messageQueue, downstream.currentContext(), null);
- return;
+ // We emitted the correct number that was requested.
+ // Nothing more requested since.
+ if (REQUESTED.addAndGet(this, -emitted) == 0) {
+ break;
}
- next(message);
-
- message = messageQueue.poll();
+ if (isDone) {
+ break;
+ }
}
- // Emit the message which is equal to lastMessage.
- next(message);
-
if (isDone) {
if (error != null) {
downstream.onError(error);
} else if (messageQueue.peekLast() == null) {
downstream.onComplete();
+ } else {
+ Operators.onDiscardQueueWithClear(messageQueue, downstream.currentContext(), null);
}
+
+ downstream = null;
}
}
- private void next(ServiceBusReceivedMessage message) {
- final long sequenceNumber = message.getSequenceNumber();
- UUID lockToken = null;
- if (!Objects.isNull(message.getLockToken())) {
- lockToken = UUID.fromString(message.getLockToken());
+ /**
+ * Drains the queue of the requested amount and returns the number taken.
+ *
+ * @param numberRequested Number of items requested.
+ *
+ * @return The number of items emitted.
+ */
+ private long drainRequested(long numberRequested) {
+ long numberEmitted = 0L;
+
+ if (numberRequested == 0L) {
+ return numberEmitted;
}
- boolean isCompleteMessage = isAutoComplete && lockToken != null
- && !MessageUtils.ZERO_LOCK_TOKEN.equals(lockToken);
- final Instant initialLockedUntil;
- if (lockToken != null && !MessageUtils.ZERO_LOCK_TOKEN.equals(lockToken)) {
- initialLockedUntil = messageLockContainer.addOrUpdate(lockToken, message.getLockedUntil());
- } else {
- initialLockedUntil = message.getLockedUntil();
+ for (; numberEmitted < numberRequested; numberEmitted++) {
+ if (isDone) {
+ return numberEmitted;
+ }
+
+ final ServiceBusReceivedMessage message = messageQueue.poll();
+ if (message == null) {
+ break;
+ }
+
+ if (isCancelled) {
+ Operators.onDiscard(message, downstream.currentContext());
+ Operators.onDiscardQueueWithClear(messageQueue, downstream.currentContext(), null);
+ break;
+ }
+
+ try {
+ next(message);
+ } catch (Exception e) {
+ setInternalError(e);
+ break;
+ }
}
- logger.info("seq[{}]. lock[{}]. lockedUntil[{}].", sequenceNumber, initialLockedUntil, initialLockedUntil);
-
- final Disposable renewLockSubscription;
- final UUID finalLockToken = lockToken;
- if (isAutoRenewLock) {
- logger.info("seq[{}]. lockToken[{}]. lockedUntil[{}]. Renewing lock every: {}", sequenceNumber, lockToken,
- message.getLockedUntil(), maxAutoLockRenewal);
- renewLockSubscription = Flux.interval(maxAutoLockRenewal)
- .flatMap(interval -> onRenewLock.apply(message))
- .subscribe(lockedUntil -> {
- final Instant updated = messageLockContainer.addOrUpdate(finalLockToken, lockedUntil);
-
- logger.info("seq[{}]. lockToken[{}]. lockedUntil[{}]. Lock renewal successful.",
- sequenceNumber, finalLockToken, updated);
- }, error -> logger.error("Error occurred while renewing lock token.", error),
- () -> logger.info("Renewing lock token task completed."));
- } else {
- renewLockSubscription = Disposables.disposed();
+ return numberEmitted;
+ }
+
+ private void next(ServiceBusReceivedMessage message) {
+ final long sequenceNumber = message.getSequenceNumber();
+ final String lockToken = message.getLockToken();
+ final Instant initialLockedUntil = !CoreUtils.isNullOrEmpty(lockToken)
+ ? messageLockContainer.addOrUpdate(lockToken, message.getLockedUntil())
+ : message.getLockedUntil();
+
+ if (isAutoComplete && CoreUtils.isNullOrEmpty(lockToken)) {
+ throw logger.logExceptionAsError(new IllegalStateException(
+ "Cannot auto-complete message without a lock token on message. Sequence number: " + sequenceNumber));
}
+ final AtomicBoolean hasError = new AtomicBoolean();
+ final Disposable renewLockOperation = getRenewLockOperation(message, initialLockedUntil, hasError);
+
try {
downstream.onNext(message);
} catch (Exception e) {
+ hasError.set(true);
logger.error("Exception occurred while handling downstream onNext operation.", e);
- if (isCompleteMessage) {
+ if (isAutoComplete) {
logger.info("Abandoning message lock: {}", lockToken);
onAbandon.apply(message)
- .onErrorStop()
- .doOnError(error -> logger.warning("Could not abandon message with lock: {}", finalLockToken,
- error))
- .doFinally(signal -> logger.info("lock[{}]. Abandon status: [{}]", finalLockToken, signal))
+ .onErrorContinue((error, item) -> {
+ logger.warning("Could not abandon message with lock: {}", lockToken, error);
+ setInternalError(error);
+ })
+ .doFinally(signal -> logger.info("lock[{}]. Abandon status: [{}]", lockToken, signal))
.block(retryOptions.getTryTimeout());
+ } else {
+ setInternalError(e);
}
-
- downstream.onError(Operators.onOperatorError(upstream, e, message, downstream.currentContext()));
} finally {
- renewLockSubscription.dispose();
+ renewLockOperation.dispose();
}
- try {
- // check that the pending operation is in the queue and not running yet.
- if (isCompleteMessage) {
- logger.info("sequenceNumber[{}]. lock[{}]. Completing message.", sequenceNumber, lockToken);
-
- completeFunction.apply(message)
- .onErrorStop()
- .doOnError(error -> logger.warning("Could not complete message with lock: {}",
- message.getLockToken(), error))
- .doFinally(signal -> logger.info("lock[{}]. Complete status: [{}]", finalLockToken, signal))
- .block(retryOptions.getTryTimeout());
- }
- } catch (Exception e) {
- logger.error("Exception occurred while auto-completing message. Sequence: {}. Lock token: {}",
- sequenceNumber, lockToken, e);
- downstream.onError(Operators.onOperatorError(upstream, e, message, downstream.currentContext()));
+ // An error occurred in downstream.onNext, while abandoning the message,
+ // or timed out while processing. We return.
+ if (hasError.get()) {
+ return;
+ }
+
+ if (isAutoComplete) {
+ logger.info("sequenceNumber[{}]. lock[{}]. Completing message.", sequenceNumber, lockToken);
+
+ completeFunction.apply(message)
+ .onErrorResume(error -> {
+ logger.warning("Could not complete message with lock: {}", lockToken, error);
+ setInternalError(error);
+ return Mono.empty();
+ })
+ .doFinally(signal -> logger.info("lock[{}]. Complete status: [{}]", lockToken, signal))
+ .block(retryOptions.getTryTimeout());
+ }
+ }
+
+ private Disposable getRenewLockOperation(ServiceBusReceivedMessage message, Instant initialLockedUntil,
+ AtomicBoolean hasError) {
+
+ if (!isAutoRenewLock) {
+ return Disposables.disposed();
+ }
+
+ final long sequenceNumber = message.getSequenceNumber();
+ final String lockToken = message.getLockToken();
+
+ if (isAutoComplete && initialLockedUntil == null) {
+ throw logger.logExceptionAsError(new IllegalStateException(
+ "Cannot renew lock token without a value for 'message.getLockedUntil()'"));
+ }
+
+ final Duration initialInterval = Duration.between(Instant.now(), initialLockedUntil);
+
+ logger.info("lock[{}]. lockedUntil[{}]. interval[{}]", lockToken, initialLockedUntil, initialInterval);
+
+ final EmitterProcessor emitterProcessor = EmitterProcessor.create();
+ final FluxSink sink = emitterProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
+
+ // Adjust the interval, so we can buffer time for the time it'll take to refresh.
+ sink.next(MessageUtils.adjustServerTimeout(initialInterval));
+
+ final Disposable timeoutOperation = Mono.delay(maxAutoLockRenewal)
+ .subscribe(l -> {
+ if (!sink.isCancelled()) {
+ sink.error(new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR,
+ "Could not complete within renewal time. Max renewal time: " + maxAutoLockRenewal,
+ errorContext));
+ }
+ });
+
+ final Disposable renewLockSubscription = Flux.switchOnNext(emitterProcessor.map(i -> Flux.interval(i)))
+ .flatMap(delay -> onRenewLock.apply(message))
+ .map(instant -> {
+ final Instant updated = messageLockContainer.addOrUpdate(lockToken, instant);
+ final Duration next = Duration.between(Instant.now(), updated);
+ logger.info("lockToken[{}]. given[{}]. updated[{}]. Next renewal: [{}]",
+ lockToken, instant, updated, next);
+
+ sink.next(MessageUtils.adjustServerTimeout(next));
+ return updated;
+ })
+ .subscribe(lockedUntil -> {
+ logger.verbose("seq[{}]. lockToken[{}]. lockedUntil[{}]. Lock renewal successful.",
+ sequenceNumber, lockToken, lockedUntil);
+ },
+ error -> {
+ logger.error("Error occurred while renewing lock token.", error);
+ hasError.set(true);
+ setInternalError(error);
+ },
+ () -> logger.info("Renewing lock token task completed."));
+
+ return Disposables.composite(renewLockSubscription, timeoutOperation);
+ }
+
+ private void setInternalError(Throwable error) {
+ if (Exceptions.addThrowable(ERROR, this, error)) {
+ isDone = true;
+ } else {
+ Operators.onErrorDropped(error, downstream.currentContext());
}
}
}
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java
index 7a2b6e872abf..bbcbd9dc62b2 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java
@@ -5,6 +5,7 @@
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
+import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.message.Message;
@@ -41,6 +42,7 @@ public class ServiceBusReceiveLinkProcessor extends FluxProcessor downstream;
@@ -60,9 +62,12 @@ public class ServiceBusReceiveLinkProcessor extends FluxProcessor>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<>" will be the name of the Service Bus queue instance you created
// inside the Service Bus namespace.
-
ServiceBusReceiverAsyncClient receiverAsyncClient = new ServiceBusClientBuilder()
.connectionString(connectionString)
.receiver()
.receiveMode(ReceiveMode.PEEK_LOCK)
- .isLockAutoRenewed(true)
.queueName("<>")
- .isAutoComplete(false)
- .maxAutoLockRenewalDuration(Duration.ofSeconds(2))
.buildAsyncClient();
Disposable subscription = receiverAsyncClient.receive()
.flatMap(message -> {
- boolean messageProcessed = false;
+ boolean messageProcessed = false;
+
// Process the message here.
// Change the `messageProcessed` according to you business logic and if you are able to process the
// message successfully.
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageSyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageSyncSample.java
index 0b6c2791c6f8..5d79ea5ab879 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageSyncSample.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageSyncSample.java
@@ -5,8 +5,6 @@
import com.azure.core.util.IterableStream;
-import java.time.Duration;
-
/**
* Sample demonstrates how to receive a batch of {@link ServiceBusReceivedMessage} from an Azure Service Bus Queue
* using sync client.
@@ -36,7 +34,6 @@ public static void main(String[] args) {
.connectionString(connectionString)
.receiver()
.queueName("<>")
- .maxAutoLockRenewalDuration(Duration.ofSeconds(2))
.buildClient();
final IterableStream receivedMessages = receiverClient.receive(5);
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ProxyReceiveTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ProxyReceiveTest.java
index 4b995d6a5b8d..37ca189209da 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ProxyReceiveTest.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ProxyReceiveTest.java
@@ -87,7 +87,6 @@ public void testReceiverStartOfStreamFilters() {
final ServiceBusReceiverAsyncClient receiver = builder.receiver()
.receiveMode(ReceiveMode.RECEIVE_AND_DELETE)
.queueName(queueName)
- .isAutoComplete(false)
.buildAsyncClient();
// Act & Assert
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java
index 1e3c49034d12..609b0975b3f8 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java
@@ -18,7 +18,6 @@
import java.net.Proxy;
import java.net.URI;
import java.net.URISyntaxException;
-import java.time.Duration;
import java.util.Locale;
import java.util.stream.Stream;
@@ -149,51 +148,6 @@ void throwsWhenSubscriptionNameNotSet() {
assertThrows(IllegalStateException.class, receiverBuilder::buildAsyncClient);
}
- private static Stream cannotCreateAutoSyncReceivers() {
- return Stream.of(
- Arguments.of(true, false),
- Arguments.of(false, true)
- );
- }
-
- /**
- * Throws when auto-renewal or auto-complete is set on the sync receiver.
- */
- @ParameterizedTest
- @MethodSource
- void cannotCreateAutoSyncReceivers(boolean isAutoComplete, boolean isAutoRenew) {
- // Arrange
- final ServiceBusReceiverClientBuilder receiverBuilder = new ServiceBusClientBuilder()
- .connectionString(NAMESPACE_CONNECTION_STRING)
- .receiver()
- .topicName("baz").subscriptionName("bar")
- .receiveMode(ReceiveMode.PEEK_LOCK)
- .isAutoComplete(isAutoComplete)
- .isLockAutoRenewed(isAutoRenew)
- .maxAutoLockRenewalDuration(Duration.ofSeconds(10));
-
- // Act & Assert
- assertThrows(IllegalStateException.class, receiverBuilder::buildClient);
- }
-
- /**
- * Throws when auto-renewal is set, we also need a duration.
- */
- @Test
- void cannotAutoRenewLockWithoutDuration() {
- // Arrange
- final ServiceBusReceiverClientBuilder receiverBuilder = new ServiceBusClientBuilder()
- .connectionString(NAMESPACE_CONNECTION_STRING)
- .receiver()
- .topicName("baz").subscriptionName("bar")
- .receiveMode(ReceiveMode.PEEK_LOCK)
- .isAutoComplete(false)
- .isLockAutoRenewed(true);
-
- // Act & Assert
- assertThrows(IllegalStateException.class, receiverBuilder::buildAsyncClient);
- }
-
/**
* Throws when the prefetch is less than 1.
*/
@@ -205,40 +159,12 @@ void invalidPrefetch() {
.receiver()
.topicName("baz").subscriptionName("bar")
.receiveMode(ReceiveMode.PEEK_LOCK)
- .isAutoComplete(true)
.prefetchCount(0);
// Act & Assert
assertThrows(IllegalArgumentException.class, receiverBuilder::buildAsyncClient);
}
- private static Stream cannotAutoRenewLockWithInvalidDuration() {
- return Stream.of(
- Arguments.of(Duration.ZERO),
- Arguments.of(Duration.ofSeconds(-1))
- );
- }
-
- /**
- * Throws when auto-renewal is set, we also need a positive duration.
- */
- @ParameterizedTest
- @MethodSource
- void cannotAutoRenewLockWithInvalidDuration(Duration duration) {
- // Arrange
- final ServiceBusReceiverClientBuilder receiverBuilder = new ServiceBusClientBuilder()
- .connectionString(NAMESPACE_CONNECTION_STRING)
- .receiver()
- .topicName("baz").subscriptionName("bar")
- .receiveMode(ReceiveMode.PEEK_LOCK)
- .isAutoComplete(false)
- .isLockAutoRenewed(true)
- .maxAutoLockRenewalDuration(duration);
-
- // Act & Assert
- assertThrows(IllegalArgumentException.class, receiverBuilder::buildAsyncClient);
- }
-
private static URI getUri(String endpointFormat, String namespace, String domainName) {
try {
return new URI(String.format(Locale.US, endpointFormat, namespace, domainName));
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java
index 1e3eaf0dcbcc..62e318eeebbb 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java
@@ -4,6 +4,7 @@
package com.azure.messaging.servicebus;
import com.azure.core.util.logging.ClientLogger;
+import com.azure.messaging.servicebus.models.ReceiveAsyncOptions;
import com.azure.messaging.servicebus.models.ReceiveMode;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
@@ -25,7 +26,6 @@ class ServiceBusReceiverAsyncClientIntegrationTest extends IntegrationTestBase {
private final ClientLogger logger = new ClientLogger(ServiceBusReceiverAsyncClientIntegrationTest.class);
private ServiceBusReceiverAsyncClient receiver;
- private ServiceBusReceiverAsyncClient receiverManual;
private ServiceBusSenderAsyncClient sender;
ServiceBusReceiverAsyncClientIntegrationTest() {
@@ -41,19 +41,12 @@ protected void beforeTest() {
receiver = createBuilder()
.receiver()
.queueName(queueName)
- .isAutoComplete(true)
- .buildAsyncClient();
-
- receiverManual = createBuilder()
- .receiver()
- .queueName(queueName)
- .isAutoComplete(false)
.buildAsyncClient();
}
@Override
protected void afterTest() {
- dispose(receiver, receiverManual, sender);
+ dispose(receiver, sender);
}
/**
@@ -65,10 +58,11 @@ void receiveTwoMessagesAutoComplete() {
// Arrange
final String messageId = UUID.randomUUID().toString();
final ServiceBusMessage message = TestUtils.getServiceBusMessage(CONTENTS, messageId, 0);
+ final ReceiveAsyncOptions options = new ReceiveAsyncOptions().setEnableAutoComplete(false);
// Assert & Act
StepVerifier.create(sender.send(message).then(sender.send(message))
- .thenMany(receiverManual.receive()))
+ .thenMany(receiver.receive(options)))
.assertNext(receivedMessage ->
Assertions.assertTrue(receivedMessage.getProperties().containsKey(MESSAGE_TRACKING_ID)))
.assertNext(receivedMessage ->
@@ -85,9 +79,10 @@ void receiveMessageAutoComplete() {
// Arrange
final String messageId = UUID.randomUUID().toString();
final ServiceBusMessage message = TestUtils.getServiceBusMessage(CONTENTS, messageId, 0);
+ final ReceiveAsyncOptions options = new ReceiveAsyncOptions().setEnableAutoComplete(false);
// Assert & Act
- StepVerifier.create(sender.send(message).thenMany(receiverManual.receive()))
+ StepVerifier.create(sender.send(message).thenMany(receiver.receive(options)))
.assertNext(receivedMessage ->
Assertions.assertTrue(receivedMessage.getProperties().containsKey(MESSAGE_TRACKING_ID)))
.thenCancel()
@@ -140,7 +135,7 @@ void scheduleMessage() {
* Verifies that we can cancel a scheduled message.
*/
@Test
- void cancelScheduleMessage() {
+ void cancelScheduledMessage() {
// Arrange
final String messageId = UUID.randomUUID().toString();
final String contents = "Some-contents";
@@ -151,8 +146,10 @@ void cancelScheduleMessage() {
final Long sequenceNumber = sender.scheduleMessage(message, scheduledEnqueueTime).block();
logger.verbose("Scheduled the message, sequence number {}.", sequenceNumber);
+ Assertions.assertNotNull(sequenceNumber);
+
Mono.delay(delayDuration)
- .then(sender.cancelScheduledMessage(sequenceNumber.longValue()))
+ .then(sender.cancelScheduledMessage(sequenceNumber))
.block();
logger.verbose("Cancelled the scheduled message, sequence number {}.", sequenceNumber);
@@ -223,15 +220,16 @@ void deadLetterMessage() {
// Arrange
final String messageId = UUID.randomUUID().toString();
final ServiceBusMessage message = TestUtils.getServiceBusMessage(CONTENTS, messageId, 0);
+ final ReceiveAsyncOptions options = new ReceiveAsyncOptions().setEnableAutoComplete(false);
final ServiceBusReceivedMessage receivedMessage = sender.send(message)
- .then(receiverManual.receive().next())
+ .then(receiver.receive(options).next())
.block(Duration.ofSeconds(30));
Assertions.assertNotNull(receivedMessage);
// Assert & Act
- StepVerifier.create(receiverManual.deadLetter(receivedMessage))
+ StepVerifier.create(receiver.deadLetter(receivedMessage))
.verifyComplete();
}
@@ -245,23 +243,25 @@ void renewMessageLock() {
final AtomicReference receivedMessage = new AtomicReference<>();
final AtomicReference initialLock = new AtomicReference<>();
+ final ReceiveAsyncOptions options = new ReceiveAsyncOptions()
+ .setEnableAutoComplete(false)
+ .setMaxAutoRenewDuration(null);
// Blocking here because it is not part of the scenario we want to test.
- sender.send(message).block(Duration.ofSeconds(20));
+ sender.send(message).block(TIMEOUT);
+ ServiceBusReceivedMessage m = receiver.receive(options).next().block(TIMEOUT);
+ Assertions.assertNotNull(m);
+ Assertions.assertNotNull(m.getLockedUntil());
+ receivedMessage.set(m);
+ initialLock.set(m.getLockedUntil());
// Assert & Act
- StepVerifier.create(
- receiverManual.receive().take(1).map(m -> {
- Assertions.assertNotNull(m.getLockedUntil());
- receivedMessage.set(m);
- initialLock.set(m.getLockedUntil());
- return m;
- }).then(Mono.delay(Duration.ofSeconds(10))
- .then(Mono.defer(() -> receiverManual.renewMessageLock(receivedMessage.get())))))
+ StepVerifier.create(Mono.delay(Duration.ofSeconds(10))
+ .then(Mono.defer(() -> receiver.renewMessageLock(receivedMessage.get()))))
.assertNext(lockedUntil -> {
Assertions.assertTrue(lockedUntil.isAfter(initialLock.get()),
String.format("Updated lock is not after the initial Lock. updated: [%s]. initial:[%s]",
- lockedUntil, initialLock.get()));
+ lockedUntil, initialLock.get()));
Assertions.assertEquals(receivedMessage.get().getLockedUntil(), lockedUntil);
})
@@ -276,6 +276,9 @@ void autoRenewLockOnReceiveMessage() {
// Arrange
final String messageId = UUID.randomUUID().toString();
final ServiceBusMessage message = getServiceBusMessage(CONTENTS, messageId, 0);
+ final ReceiveAsyncOptions options = new ReceiveAsyncOptions()
+ .setEnableAutoComplete(true)
+ .setMaxAutoRenewDuration(Duration.ofSeconds(2));
// Send the message to verify.
sender.send(message).block(TIMEOUT);
@@ -284,14 +287,12 @@ void autoRenewLockOnReceiveMessage() {
.connectionString(getConnectionString())
.receiver()
.receiveMode(ReceiveMode.PEEK_LOCK)
- .isLockAutoRenewed(true)
.queueName(getQueueName())
- .maxAutoLockRenewalDuration(Duration.ofSeconds(2))
.buildAsyncClient();
try {
// Act & Assert
- StepVerifier.create(receiver.receive())
+ StepVerifier.create(receiver.receive(options))
.assertNext(received -> {
Assertions.assertNotNull(received.getLockedUntil());
Assertions.assertNotNull(received.getLockToken());
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java
index 5b19b960be63..ab4c644a2b78 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java
@@ -21,6 +21,7 @@
import com.azure.messaging.servicebus.implementation.ServiceBusAmqpConnection;
import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor;
import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode;
+import com.azure.messaging.servicebus.models.ReceiveAsyncOptions;
import com.azure.messaging.servicebus.models.ReceiveMode;
import org.apache.qpid.proton.message.Message;
import org.junit.jupiter.api.AfterAll;
@@ -69,9 +70,12 @@ class ServiceBusReceiverAsyncClientTest {
private static final String PAYLOAD = "hello";
private static final byte[] PAYLOAD_BYTES = PAYLOAD.getBytes(UTF_8);
private static final int PREFETCH = 5;
- private static final String NAMESPACE = "my-namespace-foo";
+ private static final String NAMESPACE = "my-namespace-foo.net";
private static final String ENTITY_PATH = "queue-name";
private static final MessagingEntityType ENTITY_TYPE = MessagingEntityType.QUEUE;
+ private static final String NAMESPACE_CONNECTION_STRING = String.format(
+ "Endpoint=sb://%s;SharedAccessKeyName=%s;SharedAccessKey=%s",
+ NAMESPACE, "some-name", "something-else");
private final ClientLogger logger = new ClientLogger(ServiceBusReceiverAsyncClientTest.class);
private final String messageTrackingUUID = UUID.randomUUID().toString();
@@ -82,7 +86,7 @@ class ServiceBusReceiverAsyncClientTest {
private ServiceBusConnectionProcessor connectionProcessor;
private ServiceBusReceiverAsyncClient consumer;
- private ReceiveMessageOptions receiveOptions;
+ private ReceiverOptions receiveOptions;
private MessageLockContainer messageContainer;
@Mock
@@ -141,8 +145,7 @@ CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP, new AmqpRe
.subscribeWith(new ServiceBusConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(),
connectionOptions.getRetry()));
- receiveOptions = new ReceiveMessageOptions(false, ReceiveMode.PEEK_LOCK, PREFETCH, false,
- Duration.ofSeconds(10));
+ receiveOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, PREFETCH);
messageContainer = new MessageLockContainer();
consumer = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE,
@@ -204,15 +207,17 @@ void receivesNumberOfEvents() {
// Arrange
final int numberOfEvents = 1;
final List messages = getMessages(10);
+ final ReceiveAsyncOptions options = new ReceiveAsyncOptions()
+ .setMaxAutoRenewDuration(Duration.ZERO)
+ .setEnableAutoComplete(false);
ServiceBusReceivedMessage receivedMessage = mock(ServiceBusReceivedMessage.class);
when(receivedMessage.getLockToken()).thenReturn(UUID.randomUUID().toString());
-
when(messageSerializer.deserialize(any(Message.class), eq(ServiceBusReceivedMessage.class)))
.thenReturn(receivedMessage);
// Act & Assert
- StepVerifier.create(consumer.receive().take(numberOfEvents))
+ StepVerifier.create(consumer.receive(options).take(numberOfEvents))
.then(() -> messages.forEach(m -> messageSink.next(m)))
.expectNextCount(numberOfEvents)
.verifyComplete();
@@ -226,8 +231,7 @@ void receivesNumberOfEvents() {
@Test
void receivesAndAutoCompletes() {
// Arrange
- final ReceiveMessageOptions options = new ReceiveMessageOptions(true, ReceiveMode.PEEK_LOCK,
- PREFETCH, false, null);
+ final ReceiverOptions options = new ReceiverOptions(ReceiveMode.PEEK_LOCK, PREFETCH);
final ServiceBusReceiverAsyncClient consumer2 = new ServiceBusReceiverAsyncClient(
NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, false, options, connectionProcessor,
tracerProvider, messageSerializer, messageContainer, onClientClose);
@@ -253,9 +257,9 @@ void receivesAndAutoCompletes() {
when(connection.getManagementNode(ENTITY_PATH, ENTITY_TYPE))
.thenReturn(Mono.just(managementNode));
- when(managementNode.updateDisposition(eq(lockToken1), eq(DispositionStatus.COMPLETED), isNull(), isNull(), isNull()))
+ when(managementNode.updateDisposition(eq(lockToken1.toString()), eq(DispositionStatus.COMPLETED), isNull(), isNull(), isNull()))
.thenReturn(Mono.empty());
- when(managementNode.updateDisposition(eq(lockToken2), eq(DispositionStatus.COMPLETED), isNull(), isNull(), isNull()))
+ when(managementNode.updateDisposition(eq(lockToken2.toString()), eq(DispositionStatus.COMPLETED), isNull(), isNull(), isNull()))
.thenReturn(Mono.empty());
// Act and Assert
@@ -270,18 +274,16 @@ void receivesAndAutoCompletes() {
.verifyComplete();
logger.info("Verifying assertions.");
- verify(managementNode).updateDisposition(eq(lockToken1), eq(DispositionStatus.COMPLETED), isNull(), isNull(), isNull());
+ verify(managementNode).updateDisposition(eq(lockToken1.toString()), eq(DispositionStatus.COMPLETED), isNull(), isNull(), isNull());
}
-
/**
- * Verifies that if there is no lock token, the message is not completed.
+ * Verifies that if there is no lock token, and auto-complete is requested. It errors.
*/
@Test
- void receivesAndAutoCompleteWithoutLockToken() {
+ void receivesAndAutoCompleteWithoutLockTokenErrors() {
// Arrange
- final ReceiveMessageOptions options = new ReceiveMessageOptions(true, ReceiveMode.PEEK_LOCK,
- PREFETCH, false, null);
+ final ReceiverOptions options = new ReceiverOptions(ReceiveMode.PEEK_LOCK, PREFETCH);
final ServiceBusReceiverAsyncClient consumer2 = new ServiceBusReceiverAsyncClient(
NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, false, options, connectionProcessor,
tracerProvider, messageSerializer, messageContainer, onClientClose);
@@ -292,6 +294,15 @@ void receivesAndAutoCompleteWithoutLockToken() {
when(messageSerializer.deserialize(message, ServiceBusReceivedMessage.class)).thenReturn(receivedMessage);
when(messageSerializer.deserialize(message2, ServiceBusReceivedMessage.class)).thenReturn(receivedMessage2);
+ final Instant lockedUntil = Instant.now().plusSeconds(30);
+ final Instant lockedUntil2 = Instant.now().plusSeconds(30);
+
+ when(receivedMessage.getLockToken()).thenReturn(null);
+ when(receivedMessage.getLockedUntil()).thenReturn(lockedUntil);
+
+ when(receivedMessage2.getLockToken()).thenReturn(UUID.randomUUID().toString());
+ when(receivedMessage2.getLockedUntil()).thenReturn(lockedUntil2);
+
when(connection.getManagementNode(ENTITY_PATH, ENTITY_TYPE))
.thenReturn(Mono.just(managementNode));
@@ -303,11 +314,9 @@ void receivesAndAutoCompleteWithoutLockToken() {
StepVerifier.create(consumer2.receive().take(2))
.then(() -> {
messageSink.next(message);
- messageSink.next(message2);
})
- .expectNext(receivedMessage)
- .expectNext(receivedMessage2)
- .verifyComplete();
+ .expectError(IllegalStateException.class)
+ .verify();
} finally {
consumer2.close();
}
@@ -348,8 +357,7 @@ void completeNullMessage() {
*/
@Test
void completeInReceiveAndDeleteMode() {
- final ReceiveMessageOptions options = new ReceiveMessageOptions(false,
- ReceiveMode.RECEIVE_AND_DELETE, PREFETCH, false, null);
+ final ReceiverOptions options = new ReceiverOptions(ReceiveMode.RECEIVE_AND_DELETE, PREFETCH);
ServiceBusReceiverAsyncClient client = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH,
MessagingEntityType.QUEUE, false, options, connectionProcessor, tracerProvider,
messageSerializer, messageContainer, onClientClose);
@@ -407,7 +415,7 @@ void peekBatchWithSequenceNumberMessages() {
*/
@Test
void deadLetterWithDescription() {
- final UUID lockToken1 = UUID.randomUUID();
+ final String lockToken1 = UUID.randomUUID().toString();
final String description = "some-dead-letter-description";
final String reason = "dead-letter-reason";
final Map propertiesToModify = new HashMap<>();
@@ -448,9 +456,12 @@ void deadLetterWithDescription() {
@EnumSource(DispositionStatus.class)
void settleMessage(DispositionStatus dispositionStatus) {
// Arrange
- final UUID lockToken1 = UUID.randomUUID();
- final UUID lockToken2 = UUID.randomUUID();
+ final String lockToken1 = UUID.randomUUID().toString();
+ final String lockToken2 = UUID.randomUUID().toString();
final Instant expiration = Instant.now().plus(Duration.ofMinutes(5));
+ final ReceiveAsyncOptions options = new ReceiveAsyncOptions()
+ .setEnableAutoComplete(false)
+ .setMaxAutoRenewDuration(Duration.ZERO);
final MessageWithLockToken message = mock(MessageWithLockToken.class);
final MessageWithLockToken message2 = mock(MessageWithLockToken.class);
@@ -458,9 +469,9 @@ void settleMessage(DispositionStatus dispositionStatus) {
when(messageSerializer.deserialize(message, ServiceBusReceivedMessage.class)).thenReturn(receivedMessage);
when(messageSerializer.deserialize(message2, ServiceBusReceivedMessage.class)).thenReturn(receivedMessage2);
- when(receivedMessage.getLockToken()).thenReturn(lockToken1.toString());
+ when(receivedMessage.getLockToken()).thenReturn(lockToken1);
when(receivedMessage.getLockedUntil()).thenReturn(expiration);
- when(receivedMessage2.getLockToken()).thenReturn(lockToken2.toString());
+ when(receivedMessage2.getLockToken()).thenReturn(lockToken2);
when(receivedMessage2.getLockedUntil()).thenReturn(expiration);
when(connection.getManagementNode(ENTITY_PATH, ENTITY_TYPE))
@@ -473,7 +484,7 @@ void settleMessage(DispositionStatus dispositionStatus) {
// Pretend we receive these before. This is to simulate that so that the receiver keeps track of them in
// the lock map.
- StepVerifier.create(consumer.receive().take(2))
+ StepVerifier.create(consumer.receive(options).take(2))
.then(() -> {
messageSink.next(message);
messageSink.next(message2);
@@ -569,6 +580,31 @@ void callsClientCloseOnce() {
verify(onClientClose).run();
}
+ /**
+ * Tests that invalid options throws and null options.
+ */
+ @Test
+ void receiveIllegalOptions() {
+ // Arrange
+ final ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
+ .connectionString(NAMESPACE_CONNECTION_STRING)
+ .receiver()
+ .topicName("baz").subscriptionName("bar")
+ .receiveMode(ReceiveMode.PEEK_LOCK)
+ .buildAsyncClient();
+ final ReceiveAsyncOptions options = new ReceiveAsyncOptions()
+ .setMaxAutoRenewDuration(Duration.ofSeconds(-1));
+
+ // Act & Assert
+ StepVerifier.create(receiver.receive(options))
+ .expectError(IllegalArgumentException.class)
+ .verify();
+
+ StepVerifier.create(receiver.receive(null))
+ .expectError(NullPointerException.class)
+ .verify();
+ }
+
private List getMessages(int numberOfEvents) {
final Map map = Collections.singletonMap("SAMPLE_HEADER", "foo");
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusAsyncConsumerTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusAsyncConsumerTest.java
index dd6b485cad0c..7e49b547305a 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusAsyncConsumerTest.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusAsyncConsumerTest.java
@@ -6,9 +6,11 @@
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
+import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.util.logging.ClientLogger;
+import com.azure.messaging.servicebus.MessageLockToken;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import org.apache.qpid.proton.message.Message;
import org.junit.jupiter.api.AfterAll;
@@ -63,11 +65,11 @@ class ServiceBusAsyncConsumerTest {
@Mock
private MessageSerializer serializer;
@Mock
- private Function> onComplete;
+ private Function> onComplete;
@Mock
- private Function> onAbandon;
+ private Function> onAbandon;
@Mock
- private Function> onRenewLock;
+ private Function> onRenewLock;
@BeforeAll
static void beforeAll() {
@@ -85,7 +87,8 @@ void setup(TestInfo testInfo) {
MockitoAnnotations.initMocks(this);
linkProcessor = Flux.create(sink -> sink.next(link))
- .subscribeWith(new ServiceBusReceiveLinkProcessor(10, retryPolicy, parentConnection));
+ .subscribeWith(new ServiceBusReceiveLinkProcessor(10, retryPolicy, parentConnection,
+ new AmqpErrorContext("a-namespace")));
when(connection.getEndpointStates()).thenReturn(Flux.create(sink -> sink.next(AmqpEndpointState.ACTIVE)));
@@ -187,7 +190,7 @@ void receiveNoAutoComplete() {
void canDispose() {
// Arrange
final boolean isAutoComplete = false;
- final Function> onComplete = (message) -> {
+ final Function> onComplete = (message) -> {
Assertions.fail("Should not complete");
return Mono.empty();
};
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusMessageProcessorTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusMessageProcessorTest.java
index 4f70324aa8fc..4b7e435a2b4c 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusMessageProcessorTest.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusMessageProcessorTest.java
@@ -3,6 +3,12 @@
package com.azure.messaging.servicebus.implementation;
import com.azure.core.amqp.AmqpRetryOptions;
+import com.azure.core.amqp.exception.AmqpErrorCondition;
+import com.azure.core.amqp.exception.AmqpErrorContext;
+import com.azure.core.amqp.exception.AmqpException;
+import com.azure.core.amqp.exception.LinkErrorContext;
+import com.azure.core.util.logging.ClientLogger;
+import com.azure.messaging.servicebus.MessageLockToken;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -16,12 +22,15 @@
import java.time.Duration;
import java.time.Instant;
-import java.util.HashSet;
-import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static com.azure.messaging.servicebus.TestUtils.createMessageSink;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
class ServiceBusMessageProcessorTest {
@@ -35,24 +44,22 @@ class ServiceBusMessageProcessorTest {
private ServiceBusReceivedMessage message4;
@Mock
- private Function> onComplete;
+ private Function> onComplete;
@Mock
- private Function> onAbandon;
+ private Function> onAbandon;
@Mock
- private Function> onRenewLock;
+ private Function> onRenewLock;
+ private final AmqpErrorContext errorContext = new LinkErrorContext("foo", "bar", "link-name", 10);
+ private final ClientLogger logger = new ClientLogger(ServiceBusMessageProcessorTest.class);
private final AmqpRetryOptions retryOptions = new AmqpRetryOptions();
- private final Duration renewDuration = Duration.ofSeconds(10);
private final MessageLockContainer messageContainer = new MessageLockContainer();
@BeforeEach
void setup() {
MockitoAnnotations.initMocks(this);
- when(message1.getLockToken()).thenReturn(UUID.randomUUID().toString());
- when(message2.getLockToken()).thenReturn(UUID.randomUUID().toString());
- when(message3.getLockToken()).thenReturn(UUID.randomUUID().toString());
- when(message4.getLockToken()).thenReturn(UUID.randomUUID().toString());
+ when(onComplete.apply(any())).thenReturn(Mono.empty());
}
@AfterEach
@@ -64,59 +71,222 @@ void teardown() {
* Verifies that all messages are emitted downstream.
*/
@Test
- void emitsAndAutoCompletes() {
+ void autoCompletesNoAutoRenew() {
// Arrange
- final Set expected = new HashSet<>();
- expected.add(message1);
- expected.add(message2);
- expected.add(message3);
- expected.add(message4);
-
final String lock1 = UUID.randomUUID().toString();
final String lock2 = UUID.randomUUID().toString();
final String lock3 = UUID.randomUUID().toString();
final String lock4 = UUID.randomUUID().toString();
+
when(message1.getLockToken()).thenReturn(lock1);
when(message2.getLockToken()).thenReturn(lock2);
when(message3.getLockToken()).thenReturn(lock3);
when(message4.getLockToken()).thenReturn(lock4);
- final Function> onCompleteMethod = (item) -> {
- final boolean removed = expected.remove(item);
- Assertions.assertTrue(removed, "Should have been able to remove item from set.");
- return Mono.empty();
- };
-
final ServiceBusMessageProcessor processor = createMessageSink(message1, message2, message3, message4)
- .subscribeWith(new ServiceBusMessageProcessor(true, false, renewDuration,
- retryOptions, messageContainer, onCompleteMethod, onAbandon, onRenewLock));
+ .subscribeWith(new ServiceBusMessageProcessor(true, false, Duration.ZERO,
+ retryOptions, messageContainer, errorContext, onComplete, onAbandon, onRenewLock));
// Act & Assert
StepVerifier.create(processor)
.expectNext(message1, message2, message3, message4)
.verifyComplete();
- Assertions.assertTrue(expected.isEmpty(), "There should be no more values in the expected set.");
+ verify(onComplete).apply(message1);
+ verify(onComplete).apply(message2);
+ verify(onComplete).apply(message3);
+ verify(onComplete).apply(message4);
}
/**
- * Verifies that all messages are emitted downstream and auto complete is not invoked.
+ * Verifies that all messages are emitted downstream.
*/
@Test
- void emitsDoesNotAutoComplete() {
+ void autoCompletesAndAutoRenews() {
// Arrange
- final Function> onCompleteMethod = (item) -> {
- Assertions.fail("Should not have called complete() method. item:" + item);
- return Mono.empty();
- };
+ final Duration maxRenewDuration = Duration.ofSeconds(60);
+
+ final String lock1 = UUID.randomUUID().toString();
+ final String lock2 = UUID.randomUUID().toString();
+ when(message1.getLockToken()).thenReturn(lock1);
+ when(message1.getLockedUntil()).thenAnswer(invocationOnMock -> Instant.now().plusSeconds(1));
+
+ when(message2.getLockToken()).thenReturn(lock2);
+ when(message2.getLockedUntil()).thenAnswer(invocationOnMock -> Instant.now().plusSeconds(5));
+
+ when(onRenewLock.apply(message1)).thenAnswer(invocationOnMock -> Mono.just(Instant.now().plusSeconds(3)));
+
+ final ServiceBusMessageProcessor processor = createMessageSink(message1, message2)
+ .subscribeWith(new ServiceBusMessageProcessor(true, true, maxRenewDuration,
+ retryOptions, messageContainer, errorContext, onComplete, onAbandon, onRenewLock));
+
+ // Act & Assert
+ StepVerifier.create(processor)
+ .assertNext(m -> {
+ Assertions.assertSame(message1, m);
+
+ logger.info("Now: {}", Instant.now());
+ try {
+ TimeUnit.SECONDS.sleep(8);
+ } catch (InterruptedException ignored) {
+ }
+ logger.info("After: {}", Instant.now());
+ })
+ .expectNext(message2)
+ .verifyComplete();
+
+ verify(onRenewLock, times(3)).apply(message1);
+
+ verify(onComplete).apply(message1);
+ verify(onComplete).apply(message2);
+ }
+ /**
+ * Verifies that all messages are emitted downstream and auto complete is not invoked.
+ */
+ @Test
+ void emitsDoesNotAutoCompleteOrRenew() {
+ // Arrange
final ServiceBusMessageProcessor processor = createMessageSink(message1, message2, message3, message4)
- .subscribeWith(new ServiceBusMessageProcessor(false, false, renewDuration,
- retryOptions, messageContainer, onCompleteMethod, onAbandon, onRenewLock));
+ .subscribeWith(new ServiceBusMessageProcessor(false, false, Duration.ZERO,
+ retryOptions, messageContainer, errorContext, onComplete, onAbandon, onRenewLock));
// Act & Assert
StepVerifier.create(processor)
.expectNext(message1, message2, message3, message4)
.verifyComplete();
+
+ verifyZeroInteractions(onComplete);
+ }
+
+ /**
+ * When the max auto-renewal time has elapsed, we throw an error.
+ */
+ @Test
+ void autoRenewExpires() {
+ // Arrange
+ final Duration maxRenewDuration = Duration.ofSeconds(4);
+ final String lock1 = UUID.randomUUID().toString();
+ final String lock2 = UUID.randomUUID().toString();
+ when(message1.getLockToken()).thenReturn(lock1);
+ when(message1.getLockedUntil()).thenAnswer(invocationOnMock -> Instant.now().plusSeconds(1));
+
+ when(message2.getLockToken()).thenReturn(lock2);
+ when(message2.getLockedUntil()).thenAnswer(invocationOnMock -> Instant.now().plusSeconds(5));
+
+ when(onComplete.apply(any())).thenReturn(Mono.empty());
+
+ when(onRenewLock.apply(message1)).thenAnswer(invocationOnMock -> Mono.just(Instant.now().plusSeconds(7)));
+
+ final ServiceBusMessageProcessor processor = createMessageSink(message1, message2)
+ .subscribeWith(new ServiceBusMessageProcessor(true, true, maxRenewDuration,
+ retryOptions, messageContainer, errorContext, onComplete, onAbandon, onRenewLock));
+
+ // Act & Assert
+ StepVerifier.create(processor)
+ .assertNext(m -> {
+ Assertions.assertSame(message1, m);
+
+ logger.info("Now: {}", Instant.now());
+ try {
+ TimeUnit.SECONDS.sleep(6);
+ } catch (InterruptedException ignored) {
+ }
+ logger.info("After: {}", Instant.now());
+ })
+ .expectErrorSatisfies(error -> {
+ Assertions.assertTrue(error instanceof AmqpException);
+ Assertions.assertEquals(AmqpErrorCondition.TIMEOUT_ERROR, ((AmqpException) error).getErrorCondition());
+ })
+ .verify();
+
+ verify(onRenewLock).apply(message1);
+ verifyZeroInteractions(message2);
+ verifyZeroInteractions(onComplete);
+ }
+
+ /**
+ * When an error occurs in auto-renew lock we stop processing the next items.
+ */
+ @Test
+ void autoRenewOperationErrors() {
+ // Arrange
+ final Duration maxRenewDuration = Duration.ofSeconds(10);
+ final String lock1 = UUID.randomUUID().toString();
+ final String lock2 = UUID.randomUUID().toString();
+ when(message1.getLockToken()).thenReturn(lock1);
+ when(message1.getLockedUntil()).thenAnswer(invocationOnMock -> Instant.now().plusSeconds(1));
+
+ when(message2.getLockToken()).thenReturn(lock2);
+ when(message2.getLockedUntil()).thenAnswer(invocationOnMock -> Instant.now().plusSeconds(5));
+
+ when(onComplete.apply(any())).thenReturn(Mono.empty());
+
+ when(onRenewLock.apply(message1)).thenAnswer(invocationOnMock -> Mono.error(new IllegalArgumentException("Test error occurred.")));
+
+ final ServiceBusMessageProcessor processor = createMessageSink(message1, message2)
+ .subscribeWith(new ServiceBusMessageProcessor(true, true, maxRenewDuration,
+ retryOptions, messageContainer, errorContext, onComplete, onAbandon, onRenewLock));
+
+ // Act & Assert
+ StepVerifier.create(processor)
+ .assertNext(m -> {
+ Assertions.assertSame(message1, m);
+
+ logger.info("Now: {}", Instant.now());
+ try {
+ TimeUnit.SECONDS.sleep(3);
+ } catch (InterruptedException ignored) {
+ }
+ logger.info("After: {}", Instant.now());
+ })
+ .expectErrorSatisfies(error -> {
+ Assertions.assertTrue(error instanceof IllegalArgumentException);
+ })
+ .verify();
+
+ verify(onRenewLock).apply(message1);
+ verifyZeroInteractions(message2);
+ verifyZeroInteractions(onComplete);
+ }
+
+ /**
+ * When an error occurs in complete, we stop processing the next items.
+ */
+ @Test
+ void completeOperationErrors() {
+ // Arrange
+ final Duration maxRenewDuration = Duration.ofSeconds(10);
+ final String lock1 = UUID.randomUUID().toString();
+ final String lock2 = UUID.randomUUID().toString();
+ when(message1.getLockToken()).thenReturn(lock1);
+ when(message1.getLockedUntil()).thenAnswer(invocationOnMock -> Instant.now().plusSeconds(1));
+
+ when(message2.getLockToken()).thenReturn(lock2);
+ when(message2.getLockedUntil()).thenAnswer(invocationOnMock -> Instant.now().plusSeconds(5));
+
+ when(onComplete.apply(message1)).thenAnswer(
+ invocationOnMock -> {
+ return Mono.error(new IllegalArgumentException("Test error occurred."));
+ });
+
+ when(onRenewLock.apply(any())).thenReturn(Mono.empty());
+
+ final ServiceBusMessageProcessor processor = createMessageSink(message1, message2)
+ .subscribeWith(new ServiceBusMessageProcessor(true, true, maxRenewDuration,
+ retryOptions, messageContainer, errorContext, onComplete, onAbandon, onRenewLock));
+
+ // Act & Assert
+ StepVerifier.create(processor)
+ .expectNext(message1)
+ .expectErrorSatisfies(error -> {
+ Assertions.assertTrue(error instanceof IllegalArgumentException);
+ })
+ .verify();
+
+ verify(onComplete).apply(message1);
+
+ verifyZeroInteractions(message2);
+ verifyZeroInteractions(onRenewLock);
}
}