diff --git a/sdk/core/azure-core-amqp/CHANGELOG.md b/sdk/core/azure-core-amqp/CHANGELOG.md index 757b27150d977..f4a0ae9edcd2f 100644 --- a/sdk/core/azure-core-amqp/CHANGELOG.md +++ b/sdk/core/azure-core-amqp/CHANGELOG.md @@ -8,6 +8,8 @@ ### Bugs Fixed +- Fixed issue where EndpointStates were not emitted serially. ([#24762](https://github.com/Azure/azure-sdk-for-java/issues/24762)) + ### Other Changes ## 2.4.0 (2022-02-04) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java index 2c8504721d243..9a20c22b70aed 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java @@ -87,10 +87,17 @@ void onNext(EndpointState state) { } endpointStates.emitNext(state, (signalType, emitResult) -> { - addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult) - .log("could not emit endpoint state."); + if (emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED) { + addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult) + .log("Could not emit endpoint state. Non-serial access. Retrying."); - return false; + return true; + } else { + addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult) + .log("Could not emit endpoint state."); + + return false; + } }); } @@ -105,10 +112,17 @@ void onError(Throwable error) { } endpointStates.emitError(error, (signalType, emitResult) -> { - addSignalTypeAndResult(logger.atWarning(), signalType, emitResult) - .log("could not emit error.", error); + if (emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED) { + addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult) + .log("Could not emit error. Non-serial access. Retrying.", error); + + return true; + } else { + addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult) + .log("Could not emit error.", error); - return false; + return false; + } }); } @@ -125,17 +139,31 @@ public void close() { // This is fine in the case that someone called onNext(EndpointState.CLOSED) and then called handler.close(). // We want to ensure that the next endpoint subscriber does not believe the handler is alive still. endpointStates.emitNext(EndpointState.CLOSED, (signalType, emitResult) -> { - addSignalTypeAndResult(logger.atInfo(), signalType, emitResult) - .log("Could not emit closed endpoint state."); + if (emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED) { + addSignalTypeAndResult(logger.atInfo(), signalType, emitResult) + .log("Could not emit closed endpoint state. Non-serial access. Retrying."); - return false; + return true; + } else { + addSignalTypeAndResult(logger.atInfo(), signalType, emitResult) + .log("Could not emit closed endpoint state."); + + return false; + } }); endpointStates.emitComplete((signalType, emitResult) -> { - addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult) - .log("Could not emit complete."); + if (emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED) { + addSignalTypeAndResult(logger.atInfo(), signalType, emitResult) + .log("Could not emit complete. Non-serial access. Retrying."); + + return true; + } else { + addSignalTypeAndResult(logger.atInfo(), signalType, emitResult) + .log("Could not emit complete."); - return false; + return false; + } }); } } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/HandlerTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/HandlerTest.java index 56a2e35115fca..d18b42d7c84ad 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/HandlerTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/HandlerTest.java @@ -10,6 +10,10 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -150,6 +154,52 @@ public void completesOnce() { .verify(); } + /** + * Test that we are handling signals serially. Otherwise, we get an exception: + * EmissionException: Spec. Rule 1.3 encountered when trying to recover receiver. + * + * @see #24762 + */ + @Test + public void serialSignalling() { + final int parallelism = Runtime.getRuntime().availableProcessors(); + final Disposable subscription = handler.getEndpointStates().subscribe(); + + Flux.range(0, 500) + .parallel(parallelism) + .runOn(Schedulers.parallel()) + .flatMap(index -> { + final EndpointState state; + final int current = index % 3; + + switch (current) { + case 0: + state = EndpointState.ACTIVE; + break; + case 1: + state = EndpointState.CLOSED; + break; + case 2: + state = EndpointState.UNINITIALIZED; + break; + default: + throw new IllegalStateException("This shouldn't have happened. value:" + index); + } + + if (index == 500) { + handler.onError(new RuntimeException("Test Error.")); + } else { + handler.onNext(state); + } + + return Mono.empty(); + }) + .then() + .block(); + + subscription.dispose(); + } + private static class TestHandler extends Handler { static final String CONNECTION_ID = "test-connection-id"; static final String HOSTNAME = "test-hostname";