Skip to content

Commit

Permalink
Update LockRenewalOperation API (#14705)
Browse files Browse the repository at this point in the history
* Removing public from LockRenewalOperation.

* Update LockRenewalOperation to return a completion Mono.
  • Loading branch information
conniey authored Sep 4, 2020
1 parent f01cc0b commit a50c4f0
Show file tree
Hide file tree
Showing 7 changed files with 273 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.models.LockRenewalStatus;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
Expand All @@ -23,13 +22,14 @@
/**
* Represents a renewal session or message lock renewal operation that.
*/
public class LockRenewalOperation implements AutoCloseable {
class LockRenewalOperation implements AutoCloseable {
private final ClientLogger logger = new ClientLogger(LockRenewalOperation.class);
private final AtomicBoolean isDisposed = new AtomicBoolean();
private final AtomicReference<OffsetDateTime> lockedUntil = new AtomicReference<>();
private final AtomicReference<Throwable> throwable = new AtomicReference<>();
private final AtomicReference<LockRenewalStatus> status = new AtomicReference<>(LockRenewalStatus.RUNNING);
private final MonoProcessor<Void> cancellationProcessor = MonoProcessor.create();
private final Mono<Void> completionMono;

private final String lockToken;
private final boolean isSession;
Expand All @@ -53,35 +53,63 @@ public class LockRenewalOperation implements AutoCloseable {
* Creates a new lock renewal operation.
*
* @param lockToken Lock or session id to renew.
* @param lockedUntil The initial period the message or session is locked until.
* @param tokenLockedUntil The initial period the message or session is locked until.
* @param maxLockRenewalDuration The maximum duration this lock should be renewed.
* @param isSession Whether the lock represents a session lock or message lock.
* @param renewalOperation The renewal operation to call.
*/
LockRenewalOperation(String lockToken, Duration maxLockRenewalDuration, boolean isSession,
Function<String, Mono<OffsetDateTime>> renewalOperation, OffsetDateTime lockedUntil) {
Function<String, Mono<OffsetDateTime>> renewalOperation, OffsetDateTime tokenLockedUntil) {
this.lockToken = Objects.requireNonNull(lockToken, "'lockToken' cannot be null.");
this.renewalOperation = Objects.requireNonNull(renewalOperation, "'renewalOperation' cannot be null.");
this.isSession = isSession;

Objects.requireNonNull(lockedUntil, "'lockedUntil cannot be null.'");
Objects.requireNonNull(tokenLockedUntil, "'lockedUntil cannot be null.'");
Objects.requireNonNull(maxLockRenewalDuration, "'maxLockRenewalDuration' cannot be null.");

if (maxLockRenewalDuration.isNegative()) {
throw logger.logThrowableAsError(new IllegalArgumentException(
throw logger.logExceptionAsError(new IllegalArgumentException(
"'maxLockRenewalDuration' cannot be negative."));
}

this.lockedUntil.set(lockedUntil);
this.subscription = getRenewLockOperation(lockedUntil, maxLockRenewalDuration);
this.lockedUntil.set(tokenLockedUntil);

final Flux<OffsetDateTime> renewLockOperation = getRenewLockOperation(tokenLockedUntil,
maxLockRenewalDuration)
.takeUntilOther(cancellationProcessor)
.cache(Duration.ofMinutes(2));

this.completionMono = renewLockOperation.then();
this.subscription = renewLockOperation.subscribe(until -> this.lockedUntil.set(until),
error -> {
logger.error("token[{}]. Error occurred while renewing lock token.", error);
status.set(LockRenewalStatus.FAILED);
throwable.set(error);
cancellationProcessor.onComplete();
}, () -> {
if (status.compareAndSet(LockRenewalStatus.RUNNING, LockRenewalStatus.COMPLETE)) {
logger.verbose("token[{}]. Renewing session lock task completed.", lockToken);
}

cancellationProcessor.onComplete();
});
}

/**
* Gets a mono that completes when the operation does.
*
* @return A mono that completes when the renewal operation does.
*/
Mono<Void> getCompletionOperation() {
return completionMono;
}

/**
* Gets the current datetime the message or session is locked until.
*
* @return the datetime the message or session is locked until.
*/
public OffsetDateTime getLockedUntil() {
OffsetDateTime getLockedUntil() {
return lockedUntil.get();
}

Expand All @@ -90,7 +118,7 @@ public OffsetDateTime getLockedUntil() {
*
* @return The message lock token or {@code null} if a session is being renewed instead.
*/
public String getLockToken() {
String getLockToken() {
return isSession ? null : lockToken;
}

Expand All @@ -99,7 +127,7 @@ public String getLockToken() {
*
* @return The session id or {@code null} if it is not a session renewal.
*/
public String getSessionId() {
String getSessionId() {
return isSession ? lockToken : null;
}

Expand All @@ -108,7 +136,7 @@ public String getSessionId() {
*
* @return The current status of the renewal operation.
*/
public LockRenewalStatus getStatus() {
LockRenewalStatus getStatus() {
return status.get();
}

Expand All @@ -117,7 +145,7 @@ public LockRenewalStatus getStatus() {
*
* @return the exception if an error occurred whilst renewing the message or session lock, otherwise {@code null}.
*/
public Throwable getThrowable() {
Throwable getThrowable() {
return throwable.get();
}

Expand Down Expand Up @@ -146,10 +174,11 @@ public void close() {
* @param maxLockRenewalDuration Duration to renew lock for.
* @return The subscription for the operation.
*/
private Disposable getRenewLockOperation(OffsetDateTime initialLockedUntil, Duration maxLockRenewalDuration) {
private Flux<OffsetDateTime> getRenewLockOperation(OffsetDateTime initialLockedUntil,
Duration maxLockRenewalDuration) {
if (maxLockRenewalDuration.isZero()) {
status.set(LockRenewalStatus.COMPLETE);
return Disposables.single();
return Flux.empty();
}

final OffsetDateTime now = OffsetDateTime.now();
Expand All @@ -174,7 +203,6 @@ private Disposable getRenewLockOperation(OffsetDateTime initialLockedUntil, Dura
sink.next(initialInterval);

final Flux<Object> cancellationSignals = Flux.first(cancellationProcessor, Mono.delay(maxLockRenewalDuration));

return Flux.switchOnNext(emitterProcessor.map(interval -> Mono.delay(interval)
.thenReturn(Flux.create(s -> s.next(interval)))))
.takeUntilOther(cancellationSignals)
Expand All @@ -189,19 +217,6 @@ private Disposable getRenewLockOperation(OffsetDateTime initialLockedUntil, Dura

sink.next(MessageUtils.adjustServerTimeout(next));
return offsetDateTime;
})
.subscribe(until -> lockedUntil.set(until),
error -> {
logger.error("token[{}]. Error occurred while renewing lock token.", error);
status.set(LockRenewalStatus.FAILED);
throwable.set(error);
cancellationProcessor.onComplete();
}, () -> {
if (status.compareAndSet(LockRenewalStatus.RUNNING, LockRenewalStatus.COMPLETE)) {
logger.verbose("token[{}]. Renewing session lock task completed.", lockToken);
}

cancellationProcessor.onComplete();
});
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ public Mono<Void> deadLetter(ServiceBusReceivedMessage message, DeadLetterOption
* @throws IllegalArgumentException if {@code lockToken} is an empty string.
* @throws IllegalStateException if the receiver is a session receiver or the receiver is disposed.
*/
public LockRenewalOperation getAutoRenewMessageLock(String lockToken, Duration maxLockRenewalDuration) {
public Mono<Void> getAutoRenewMessageLock(String lockToken, Duration maxLockRenewalDuration) {
if (isDisposed.get()) {
throw logger.logExceptionAsError(new IllegalStateException(
String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "getAutoRenewMessageLock")));
Expand All @@ -470,7 +470,8 @@ public LockRenewalOperation getAutoRenewMessageLock(String lockToken, Duration m
final LockRenewalOperation operation = new LockRenewalOperation(lockToken, maxLockRenewalDuration, false,
this::renewMessageLock);
renewalContainer.addOrUpdate(lockToken, Instant.now().plus(maxLockRenewalDuration), operation);
return operation;

return operation.getCompletionOperation();
}

/**
Expand All @@ -484,7 +485,7 @@ public LockRenewalOperation getAutoRenewMessageLock(String lockToken, Duration m
* @throws IllegalArgumentException if {@code lockToken} is an empty string.
* @throws IllegalStateException if the receiver is a non-session receiver or the receiver is disposed.
*/
public LockRenewalOperation getAutoRenewSessionLock(String sessionId, Duration maxLockRenewalDuration) {
public Mono<Void> getAutoRenewSessionLock(String sessionId, Duration maxLockRenewalDuration) {
if (isDisposed.get()) {
throw logger.logExceptionAsError(new IllegalStateException(
String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "getAutoRenewSessionLock")));
Expand All @@ -506,7 +507,7 @@ public LockRenewalOperation getAutoRenewSessionLock(String sessionId, Duration m
this::renewSessionLock);

renewalContainer.addOrUpdate(sessionId, Instant.now().plus(maxLockRenewalDuration), operation);
return operation;
return operation.getCompletionOperation();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/**
* A <b>synchronous</b> receiver responsible for receiving {@link ServiceBusReceivedMessage} from a specific queue or
Expand Down Expand Up @@ -254,27 +255,41 @@ public void deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions dead
*
* @param lockToken Lock token of the message.
* @param maxLockRenewalDuration Maximum duration to keep renewing the lock token.
* @return A lock renewal operation for the message.
* @param onError A function to call when an error occurs during lock renewal.
* @throws NullPointerException if {@code lockToken} or {@code maxLockRenewalDuration} is null.
* @throws IllegalArgumentException if {@code lockToken} is an empty string.
* @throws IllegalStateException if the receiver is a session receiver or the receiver is disposed.
*/
public LockRenewalOperation getAutoRenewMessageLock(String lockToken, Duration maxLockRenewalDuration) {
return asyncClient.getAutoRenewMessageLock(lockToken, maxLockRenewalDuration);
public void getAutoRenewMessageLock(String lockToken, Duration maxLockRenewalDuration,
Consumer<Throwable> onError) {
final Consumer<Throwable> throwableConsumer = onError != null
? onError
: error -> logger.warning("Exception occurred while renewing lock token: '{}'.", lockToken, error);

asyncClient.getAutoRenewMessageLock(lockToken, maxLockRenewalDuration).subscribe(
v -> logger.verbose("Completed renewing lock token: '{}'", lockToken),
throwableConsumer);
}

/**
* Gets and starts the auto lock renewal for a session with the given lock.
*
* @param sessionId Id for the session to renew.
* @param maxLockRenewalDuration Maximum duration to keep renewing the lock token.
* @return A lock renewal operation for the message.
* @param onError A function to call when an error occurs during lock renewal.
* @throws NullPointerException if {@code sessionId} or {@code maxLockRenewalDuration} is null.
* @throws IllegalArgumentException if {@code lockToken} is an empty string.
* @throws IllegalArgumentException if {@code sessionId} is an empty string.
* @throws IllegalStateException if the receiver is a non-session receiver or the receiver is disposed.
*/
public LockRenewalOperation getAutoRenewSessionLock(String sessionId, Duration maxLockRenewalDuration) {
return asyncClient.getAutoRenewSessionLock(sessionId, maxLockRenewalDuration);
public void getAutoRenewSessionLock(String sessionId, Duration maxLockRenewalDuration,
Consumer<Throwable> onError) {
final Consumer<Throwable> throwableConsumer = onError != null
? onError
: error -> logger.warning("Exception occurred while renewing session: '{}'.", sessionId, error);

asyncClient.getAutoRenewSessionLock(sessionId, maxLockRenewalDuration).subscribe(
v -> logger.verbose("Completed renewing session: '{}'", sessionId),
throwableConsumer);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

Expand Down Expand Up @@ -87,7 +87,7 @@ void constructor(boolean isSession) {
* Verify that when an error occurs, it is displayed.
*/
@Test
void errors() throws InterruptedException {
void errors() {
// Arrange
final boolean isSession = true;
final Duration renewalPeriod = Duration.ofSeconds(2);
Expand Down Expand Up @@ -119,7 +119,11 @@ void errors() throws InterruptedException {
operation = new LockRenewalOperation(A_LOCK_TOKEN, maxDuration, isSession, renewalOperation, lockedUntil);

// Act
TimeUnit.MILLISECONDS.sleep(totalSleepPeriod.toMillis());
StepVerifier.create(operation.getCompletionOperation())
.thenAwait(totalSleepPeriod)
.expectErrorMatches(e -> e instanceof IllegalAccessException
&& e.getMessage().equals(testError.getMessage()))
.verify();

// Assert
assertEquals(LockRenewalStatus.FAILED, operation.getStatus());
Expand All @@ -131,7 +135,7 @@ void errors() throws InterruptedException {
* Verifies that it stops renewing after the duration has elapsed.
*/
@Test
void completes() throws InterruptedException {
void completes() {
// Arrange
final Duration maxDuration = Duration.ofSeconds(8);
final Duration renewalPeriod = Duration.ofSeconds(3);
Expand All @@ -147,10 +151,11 @@ void completes() throws InterruptedException {
operation = new LockRenewalOperation(A_LOCK_TOKEN, maxDuration, false, renewalOperation, lockedUntil);

// Act
Thread.sleep(totalSleepPeriod.toMillis());
logger.info("Finished renewals for first sleep.");
Thread.sleep(2000);
System.out.println("Finished second sleep. Should not have any more renewals.");
StepVerifier.create(operation.getCompletionOperation())
.thenAwait(totalSleepPeriod)
.then(() -> logger.info("Finished renewals for first sleep."))
.expectComplete()
.verify(Duration.ofMillis(2000));

// Assert
assertEquals(LockRenewalStatus.COMPLETE, operation.getStatus());
Expand All @@ -165,7 +170,7 @@ void completes() throws InterruptedException {
* Verify that we can cancel the operation.
*/
@Test
void cancellation() throws InterruptedException {
void cancellation() {
// Arrange
final Duration maxDuration = Duration.ofSeconds(20);
final Duration renewalPeriod = Duration.ofSeconds(3);
Expand All @@ -181,12 +186,14 @@ void cancellation() throws InterruptedException {
operation = new LockRenewalOperation(A_LOCK_TOKEN, maxDuration, false, renewalOperation, lockedUntil);

// Act
Thread.sleep(totalSleepPeriod.toMillis());
logger.info("Finished renewals for first sleep. Cancelling");
operation.close();

Thread.sleep(2000);
System.out.println("Finished second sleep. Should not have any more renewals.");
StepVerifier.create(operation.getCompletionOperation())
.thenAwait(totalSleepPeriod)
.then(() -> {
logger.info("Finished renewals for first sleep. Cancelling");
operation.close();
})
.expectComplete()
.verify(Duration.ofMillis(1000));

// Assert
assertEquals(LockRenewalStatus.CANCELLED, operation.getStatus());
Expand Down
Loading

0 comments on commit a50c4f0

Please sign in to comment.