Skip to content

Commit

Permalink
Fixes issues in auto-* operations (#9860)
Browse files Browse the repository at this point in the history
* Adding ReceiveAsyncOptions

* Pulling in autocomplete and lock renewal.

* Extract isAutoComplete and maxAutoRenewDuration into class.

* Change UUID  to all String.
  • Loading branch information
conniey authored Apr 6, 2020
1 parent 6a2e933 commit f0f987b
Show file tree
Hide file tree
Showing 19 changed files with 704 additions and 456 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.servicebus;

import com.azure.messaging.servicebus.models.ReceiveMode;

/**
* Options set when creating a service bus receiver.
*/
class ReceiverOptions {
private final ReceiveMode receiveMode;
private final int prefetchCount;

ReceiverOptions(ReceiveMode receiveMode, int prefetchCount) {
this.receiveMode = receiveMode;
this.prefetchCount = prefetchCount;
}

/**
* Gets the receive mode for the message.
*
* @return the receive mode for the message.
*/
ReceiveMode getReceiveMode() {
return receiveMode;
}

/**
* Gets the prefetch count of the receiver.
*
* @return The prefetch count of the receiver.
*/
int getPrefetchCount() {
return prefetchCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@

import java.net.InetSocketAddress;
import java.net.Proxy;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
Expand Down Expand Up @@ -473,60 +472,14 @@ public final class ServiceBusReceiverClientBuilder {
// Using 0 pre-fetch count for both receive modes, to avoid message lock lost exceptions in application
// receiving messages at a slow rate. Applications can set it to a higher value if they need better performance.
private static final int DEFAULT_PREFETCH_COUNT = 1;
private static final boolean DEFAULT_AUTO_COMPLETE = true;

private boolean isAutoComplete;
private Duration maxAutoLockRenewalDuration;
private int prefetchCount = DEFAULT_PREFETCH_COUNT;
private boolean isLockAutoRenewed;
private String queueName;
private String subscriptionName;
private String topicName;
private ReceiveMode receiveMode = ReceiveMode.PEEK_LOCK;

private ServiceBusReceiverClientBuilder() {
isAutoComplete = DEFAULT_AUTO_COMPLETE;
}

/**
* Sets whether or not to automatically complete a received message after it has been processed. Only supported
* when using the <b>asynchronous</b> {@link ServiceBusReceiverAsyncClient receiver client}.
*
* @param autoComplete {@code true} to automatically complete a received message after it has been
* processed; {@code false} otherwise.
*
* @return The modified {@link ServiceBusReceiverClientBuilder} object.
*/
public ServiceBusReceiverClientBuilder isAutoComplete(boolean autoComplete) {
this.isAutoComplete = autoComplete;
return this;
}

/**
* Sets if lock should be automatically renewed. Only supported when using the <b>asynchronous</b>
* {@link ServiceBusReceiverAsyncClient receiver client}.
*
* @param isLockAutoRenewed {@code true} if the lock should be automatically renewed; {@code false}
* otherwise.
*
* @return The updated {@link ServiceBusReceiverClientBuilder} object.
*/
public ServiceBusReceiverClientBuilder isLockAutoRenewed(boolean isLockAutoRenewed) {
this.isLockAutoRenewed = isLockAutoRenewed;
return this;
}

/**
* Sets the maximum duration within which the lock will be renewed automatically. This value should be greater
* than the longest message lock duration.
*
* @param renewalDuration The maximum duration within which the lock will be renewed automatically.
*
* @return The modified {@link ServiceBusReceiverClientBuilder} object.
*/
public ServiceBusReceiverClientBuilder maxAutoLockRenewalDuration(Duration renewalDuration) {
this.maxAutoLockRenewalDuration = renewalDuration;
return this;
}

/**
Expand Down Expand Up @@ -605,8 +558,8 @@ public ServiceBusReceiverClientBuilder topicName(String topicName) {
* #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in
* {@link #queueName(String) queueName} or {@link #topicName(String) topicName}. Lastly, if a {@link
* #topicName(String) topicName} is set, but {@link #subscriptionName(String) subscriptionName} is not.
* @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String) queueName()}
* or {@link #topicName(String) topicName()}, respectively.
* @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
* queueName()} or {@link #topicName(String) topicName()}, respectively.
*/
public ServiceBusReceiverAsyncClient buildAsyncClient() {
final MessagingEntityType entityType = validateEntityPaths(logger, connectionStringEntityName, topicName,
Expand Down Expand Up @@ -635,24 +588,12 @@ public ServiceBusReceiverAsyncClient buildAsyncClient() {
"prefetchCount (%s) cannot be less than 1.", prefetchCount)));
}

if (isLockAutoRenewed) {
if (maxAutoLockRenewalDuration == null) {
throw logger.logExceptionAsError(new IllegalStateException(
"'maxAutoLockRenewalDuration' is required when 'isLockAutoRenewed' is enabled."));
} else if (maxAutoLockRenewalDuration.isZero() || maxAutoLockRenewalDuration.isNegative()) {
throw logger.logExceptionAsError(new IllegalArgumentException(String.format(
"maxAutoLockRenewalDuration (%s) cannot be less than or equal to a duration of zero.",
maxAutoLockRenewalDuration)));
}
}

final MessageLockContainer messageLockContainer = new MessageLockContainer();
final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
final ReceiveMessageOptions receiveMessageOptions = new ReceiveMessageOptions(isAutoComplete, receiveMode,
prefetchCount, isLockAutoRenewed, maxAutoLockRenewalDuration);
final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount);

return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath,
entityType, false, receiveMessageOptions, connectionProcessor, tracerProvider,
entityType, false, receiverOptions, connectionProcessor, tracerProvider,
messageSerializer, messageLockContainer, ServiceBusClientBuilder.this::onClientClose);
}

Expand All @@ -666,21 +607,11 @@ public ServiceBusReceiverAsyncClient buildAsyncClient() {
* #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in
* {@link #queueName(String) queueName} or {@link #topicName(String) topicName}. Lastly, if a {@link
* #topicName(String) topicName} is set, but {@link #subscriptionName(String) subscriptionName} is not.
* @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String) queueName()}
* or {@link #topicName(String) topicName()}, respectively.
* @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
* queueName()} or {@link #topicName(String) topicName()}, respectively.
*/
public ServiceBusReceiverClient buildClient() {
final ServiceBusReceiverAsyncClient client = buildAsyncClient();

if (isLockAutoRenewed) {
throw logger.logExceptionAsError(new IllegalStateException(
"Cannot use 'isLockAutoRenewed' when using synchronous client."));
} else if (isAutoComplete) {
throw logger.logExceptionAsError(new IllegalStateException(
"Cannot use 'isAutoComplete' when using synchronous client."));
}

return new ServiceBusReceiverClient(client, retryOptions.getTryTimeout());
return new ServiceBusReceiverClient(buildAsyncClient(), retryOptions.getTryTimeout());
}
}
}
Loading

0 comments on commit f0f987b

Please sign in to comment.