Skip to content

Commit

Permalink
Recreating connections on transient failures (#7090)
Browse files Browse the repository at this point in the history
* Adding EventHubConnectionProcessor.

* Add tests for fetching new connection.

* Add tests to ensure on error conditions.

* Remove circuar dependency on EventHubConnection from EventHubConnectionProcessor

* Update AmqpConnection interface to create send and receive links.

* Fixing errors in ConnectionProcessor and update ReactorAmqpConnection to keep track of send links.

* Removing EventHubConnection.

* Update clients to use ConnectionProcessor.

* Fixing EHubClientBuilder to repeat the mono, so we have a potentially infinite stream of connections.

* Add test to verify that on non-transient errors, the connection is not revived.

* Delay retrying EventHubConnection creation on a transient error.

* Consolidate request from upstream for new connection.

* Not requesting new connection unless needed.
  • Loading branch information
conniey authored Jan 7, 2020
1 parent c036768 commit cac2ac9
Show file tree
Hide file tree
Showing 19 changed files with 1,237 additions and 659 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public int getMaxRetries() {
* @return The amount of time to delay before retrying the associated operation; if {@code null}, then the operation
* is no longer eligible to be retried.
*/
public Duration calculateRetryDelay(Exception lastException, int retryCount) {
public Duration calculateRetryDelay(Throwable lastException, int retryCount) {
if (retryOptions.getDelay() == Duration.ZERO
|| retryOptions.getMaxDelay() == Duration.ZERO
|| retryCount > retryOptions.getMaxRetries()) {
Expand Down Expand Up @@ -138,7 +138,7 @@ public boolean equals(Object obj) {
* @param exception An exception that was observed for the operation to be retried.
* @return true if the exception is a retriable exception, otherwise false.
*/
private static boolean isRetriableException(Exception exception) {
private static boolean isRetriableException(Throwable exception) {
return (exception instanceof AmqpException) && ((AmqpException) exception).isTransient();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor;
import com.azure.messaging.eventhubs.implementation.EventHubManagementNode;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -25,15 +26,16 @@
class EventHubAsyncClient implements Closeable {
private final ClientLogger logger = new ClientLogger(EventHubAsyncClient.class);
private final MessageSerializer messageSerializer;
private final EventHubConnection connection;
private final EventHubConnectionProcessor connectionProcessor;
private final boolean isSharedConnection;
private final TracerProvider tracerProvider;

EventHubAsyncClient(EventHubConnection connection, TracerProvider tracerProvider,
EventHubAsyncClient(EventHubConnectionProcessor connectionProcessor, TracerProvider tracerProvider,
MessageSerializer messageSerializer, boolean isSharedConnection) {
this.tracerProvider = Objects.requireNonNull(tracerProvider, "'tracerProvider' cannot be null.");
this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
this.connection = Objects.requireNonNull(connection, "'connection' cannot be null.");
this.connectionProcessor = Objects.requireNonNull(connectionProcessor,
"'connectionProcessor' cannot be null.");
this.isSharedConnection = isSharedConnection;
}

Expand All @@ -43,7 +45,7 @@ class EventHubAsyncClient implements Closeable {
* @return The fully qualified namespace of this Event Hub.
*/
String getFullyQualifiedNamespace() {
return connection.getFullyQualifiedNamespace();
return connectionProcessor.getFullyQualifiedNamespace();
}

/**
Expand All @@ -52,7 +54,7 @@ String getFullyQualifiedNamespace() {
* @return The Event Hub name this client interacts with.
*/
String getEventHubName() {
return connection.getEventHubName();
return connectionProcessor.getEventHubName();
}

/**
Expand All @@ -61,7 +63,9 @@ String getEventHubName() {
* @return The set of information for the Event Hub that this client is associated with.
*/
Mono<EventHubProperties> getProperties() {
return connection.getManagementNode().flatMap(EventHubManagementNode::getEventHubProperties);
return connectionProcessor
.flatMap(connection -> connection.getManagementNode())
.flatMap(EventHubManagementNode::getEventHubProperties);
}

/**
Expand All @@ -81,7 +85,9 @@ Flux<String> getPartitionIds() {
* @return The set of information for the requested partition under the Event Hub this client is associated with.
*/
Mono<PartitionProperties> getPartitionProperties(String partitionId) {
return connection.getManagementNode().flatMap(node -> node.getPartitionProperties(partitionId));
return connectionProcessor
.flatMap(connection -> connection.getManagementNode())
.flatMap(node -> node.getPartitionProperties(partitionId));
}

/**
Expand All @@ -91,8 +97,9 @@ Mono<PartitionProperties> getPartitionProperties(String partitionId) {
* @return A new {@link EventHubProducerAsyncClient}.
*/
EventHubProducerAsyncClient createProducer() {
return new EventHubProducerAsyncClient(connection.getFullyQualifiedNamespace(), getEventHubName(), connection,
connection.getRetryOptions(), tracerProvider, messageSerializer, isSharedConnection);
return new EventHubProducerAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), getEventHubName(),
connectionProcessor, connectionProcessor.getRetryOptions(), tracerProvider, messageSerializer,
isSharedConnection);
}

/**
Expand All @@ -115,8 +122,8 @@ EventHubConsumerAsyncClient createConsumer(String consumerGroup, int prefetchCou
new IllegalArgumentException("'consumerGroup' cannot be an empty string."));
}

return new EventHubConsumerAsyncClient(connection.getFullyQualifiedNamespace(), getEventHubName(),
connection, messageSerializer, consumerGroup, prefetchCount, isSharedConnection);
return new EventHubConsumerAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), getEventHubName(),
connectionProcessor, messageSerializer, consumerGroup, prefetchCount, isSharedConnection);
}

/**
Expand All @@ -126,6 +133,6 @@ EventHubConsumerAsyncClient createConsumer(String consumerGroup, int prefetchCou
*/
@Override
public void close() {
connection.close();
connectionProcessor.dispose();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.eventhubs.implementation.ClientConstants;
import com.azure.messaging.eventhubs.implementation.EventHubAmqpConnection;
import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor;
import com.azure.messaging.eventhubs.implementation.EventHubReactorAmqpConnection;
import com.azure.messaging.eventhubs.implementation.EventHubSharedKeyCredential;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
Expand Down Expand Up @@ -125,7 +127,7 @@ public class EventHubClientBuilder {
private String fullyQualifiedNamespace;
private String eventHubName;
private String consumerGroup;
private EventHubConnection eventHubConnection;
private EventHubConnectionProcessor eventHubConnectionProcessor;
private int prefetchCount;
private boolean isSharedConnection;

Expand Down Expand Up @@ -464,17 +466,17 @@ EventHubAsyncClient buildAsyncClient() {

final MessageSerializer messageSerializer = new EventHubMessageSerializer();

if (isSharedConnection && eventHubConnection == null) {
eventHubConnection = buildConnection(messageSerializer);
if (isSharedConnection && eventHubConnectionProcessor == null) {
eventHubConnectionProcessor = buildConnectionProcessor(messageSerializer);
}

final EventHubConnection connection = isSharedConnection
? eventHubConnection
: buildConnection(messageSerializer);
final EventHubConnectionProcessor processor = isSharedConnection
? eventHubConnectionProcessor
: buildConnectionProcessor(messageSerializer);

final TracerProvider tracerProvider = new TracerProvider(ServiceLoader.load(Tracer.class));

return new EventHubAsyncClient(connection, tracerProvider, messageSerializer, isSharedConnection);
return new EventHubAsyncClient(processor, tracerProvider, messageSerializer, isSharedConnection);
}

/**
Expand Down Expand Up @@ -508,25 +510,28 @@ EventHubClient buildClient() {
return new EventHubClient(client, retryOptions);
}

private EventHubConnection buildConnection(MessageSerializer messageSerializer) {
private EventHubConnectionProcessor buildConnectionProcessor(MessageSerializer messageSerializer) {
final ConnectionOptions connectionOptions = getConnectionOptions();
final TokenManagerProvider tokenManagerProvider = new AzureTokenManagerProvider(
connectionOptions.getAuthorizationType(), connectionOptions.getFullyQualifiedNamespace(),
ClientConstants.AZURE_ACTIVE_DIRECTORY_SCOPE);
final ReactorProvider provider = new ReactorProvider();
final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(provider);

Map<String, String> properties = CoreUtils.getProperties(EVENTHUBS_PROPERTIES_FILE);
String product = properties.getOrDefault(NAME_KEY, UNKNOWN);
String clientVersion = properties.getOrDefault(VERSION_KEY, UNKNOWN);
final Map<String, String> properties = CoreUtils.getProperties(EVENTHUBS_PROPERTIES_FILE);
final String product = properties.getOrDefault(NAME_KEY, UNKNOWN);
final String clientVersion = properties.getOrDefault(VERSION_KEY, UNKNOWN);

final Mono<EventHubAmqpConnection> connectionMono = Mono.fromCallable(() -> {
final Flux<EventHubAmqpConnection> connectionFlux = Mono.fromCallable(() -> {
final String connectionId = StringUtil.getRandomString("MF");
return new EventHubReactorAmqpConnection(connectionId, connectionOptions, provider, handlerProvider,
tokenManagerProvider, messageSerializer, product, clientVersion);
});

return new EventHubConnection(connectionMono, connectionOptions);
return (EventHubAmqpConnection) new EventHubReactorAmqpConnection(connectionId, connectionOptions, provider,
handlerProvider, tokenManagerProvider, messageSerializer, product, clientVersion);
}).repeat();

return connectionFlux.subscribeWith(new EventHubConnectionProcessor(
connectionOptions.getFullyQualifiedNamespace(), connectionOptions.getEntityPath(),
connectionOptions.getRetry()));
}

private ConnectionOptions getConnectionOptions() {
Expand Down

This file was deleted.

Loading

0 comments on commit cac2ac9

Please sign in to comment.