Skip to content

Commit

Permalink
Fixes EmissionException when outputting endpoint states. (#27250)
Browse files Browse the repository at this point in the history
* Update CHANGELOG
* Add testcase
* Change to use FailureHandler
  • Loading branch information
conniey authored Feb 23, 2022
1 parent e3f9498 commit c2fcbb5
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 12 deletions.
2 changes: 2 additions & 0 deletions sdk/core/azure-core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### Bugs Fixed

- Fixed issue where EndpointStates were not emitted serially. ([#24762](https://github.com/Azure/azure-sdk-for-java/issues/24762))

### Other Changes

## 2.4.0 (2022-02-04)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,17 @@ void onNext(EndpointState state) {
}

endpointStates.emitNext(state, (signalType, emitResult) -> {
addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult)
.log("could not emit endpoint state.");
if (emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED) {
addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult)
.log("Could not emit endpoint state. Non-serial access. Retrying.");

return false;
return true;
} else {
addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult)
.log("Could not emit endpoint state.");

return false;
}
});
}

Expand All @@ -105,10 +112,17 @@ void onError(Throwable error) {
}

endpointStates.emitError(error, (signalType, emitResult) -> {
addSignalTypeAndResult(logger.atWarning(), signalType, emitResult)
.log("could not emit error.", error);
if (emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED) {
addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult)
.log("Could not emit error. Non-serial access. Retrying.", error);

return true;
} else {
addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult)
.log("Could not emit error.", error);

return false;
return false;
}
});
}

Expand All @@ -125,17 +139,31 @@ public void close() {
// This is fine in the case that someone called onNext(EndpointState.CLOSED) and then called handler.close().
// We want to ensure that the next endpoint subscriber does not believe the handler is alive still.
endpointStates.emitNext(EndpointState.CLOSED, (signalType, emitResult) -> {
addSignalTypeAndResult(logger.atInfo(), signalType, emitResult)
.log("Could not emit closed endpoint state.");
if (emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED) {
addSignalTypeAndResult(logger.atInfo(), signalType, emitResult)
.log("Could not emit closed endpoint state. Non-serial access. Retrying.");

return false;
return true;
} else {
addSignalTypeAndResult(logger.atInfo(), signalType, emitResult)
.log("Could not emit closed endpoint state.");

return false;
}
});

endpointStates.emitComplete((signalType, emitResult) -> {
addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult)
.log("Could not emit complete.");
if (emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED) {
addSignalTypeAndResult(logger.atInfo(), signalType, emitResult)
.log("Could not emit complete. Non-serial access. Retrying.");

return true;
} else {
addSignalTypeAndResult(logger.atInfo(), signalType, emitResult)
.log("Could not emit complete.");

return false;
return false;
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -150,6 +154,52 @@ public void completesOnce() {
.verify();
}

/**
* Test that we are handling signals serially. Otherwise, we get an exception:
* EmissionException: Spec. Rule 1.3 encountered when trying to recover receiver.
*
* @see <a href="https://github.com/Azure/azure-sdk-for-java/issues/24762">#24762</a>
*/
@Test
public void serialSignalling() {
final int parallelism = Runtime.getRuntime().availableProcessors();
final Disposable subscription = handler.getEndpointStates().subscribe();

Flux.range(0, 500)
.parallel(parallelism)
.runOn(Schedulers.parallel())
.flatMap(index -> {
final EndpointState state;
final int current = index % 3;

switch (current) {
case 0:
state = EndpointState.ACTIVE;
break;
case 1:
state = EndpointState.CLOSED;
break;
case 2:
state = EndpointState.UNINITIALIZED;
break;
default:
throw new IllegalStateException("This shouldn't have happened. value:" + index);
}

if (index == 500) {
handler.onError(new RuntimeException("Test Error."));
} else {
handler.onNext(state);
}

return Mono.empty();
})
.then()
.block();

subscription.dispose();
}

private static class TestHandler extends Handler {
static final String CONNECTION_ID = "test-connection-id";
static final String HOSTNAME = "test-hostname";
Expand Down

0 comments on commit c2fcbb5

Please sign in to comment.