-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Only emit AMQP channels downstream when they are active. #24582
Comments
Log Analysisazure-amqp-core version: 2.4.0 We can define 5 phases for the closing logs logs.md:
Call graphGraph 1 - CBS node start to close and timeout exception Graph 2 - CBS node constantly retry and ReactorExecutor closed Issue 1: Two threads are created to close CBS nodeReason We manually try to close an closed CBS node twice. and both got timeout. First time: ...
//RequestResponseChannel
this.subscriptions = Disposables.composite(
...
amqpConnection.getShutdownSignals().next().flatMap(signal -> {
logger.verbose("Shutdown signal received.");
return closeAsync();
}).subscribe()
); Second time: //ReactorConnection
final Mono<Void> cbsCloseOperation;
if (cbsChannelProcessor != null) {
cbsCloseOperation = cbsChannelProcessor.flatMap(channel -> channel.closeAsync());
} else {
cbsCloseOperation = Mono.empty();
} These two manually calls both encounter the timeout exceptions and create a new thread to resume. Issue 2: CBS node close timeout exception
Why Because When we use the debug mode, sometimes there is no timeout issue. This is because Issue 3: CBS node retry infinite loop which cause massive logsReason When CBS node (RequestResponseChannel) is closed, it will emit a complete signal, and that signal is catch by But when new instance of This back and forth process cause massive log in console. Code details //RequestResponseChannel
private void onTerminalState() {
...
endpointStates.emitComplete(((signalType, emitResult) -> onEmitSinkFailure(...)));
...
} The complete signal is catched by //AmqpChannelProcessor
connectionSubscription = endpointStatesFunction.apply(amqpChannel).subscribe(
...
() -> {
...
requestUpstream();
}); Because CBS channel is requested from a repeat Flux inside //ReactorConnection
protected AmqpChannelProcessor<RequestResponseChannel> createRequestResponseChannel(
final Flux<RequestResponseChannel> createChannel = ...
.map(reactorSession -> new RequestResponseChannel(...))
.doOnNext(e -> {...})
.repeat(); However, inside ...
//RequestResponseChannel
this.subscriptions = Disposables.composite(
...
amqpConnection.getShutdownSignals().next().flatMap(signal -> {
logger.verbose("Shutdown signal received.");
return closeAsync();
}).subscribe()
); As new channel is closed, it will emit a complete signal to request new upstream, which goes to the beginning step. |
Currently in AMQPChannelProcessor, when the CBS node is closed, it'll make a request upstream for a new instance. This instance is immediately emitted even if it's not active.
This should alleviate the mass of logs accumulated about retries and fix retry policy-related issues.
/cc @anuchandy
The text was updated successfully, but these errors were encountered: