Skip to content

Commit

Permalink
Refactor clientfactory customizer (#27653)
Browse files Browse the repository at this point in the history
* refactor the ClientFactoryCustomizer into two separate factory customizers
  • Loading branch information
saragluna authored Mar 15, 2022
1 parent 1311533 commit 92e6bca
Show file tree
Hide file tree
Showing 13 changed files with 335 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*
* @param <T> The type of the service client builder.
*/
@FunctionalInterface
public interface AzureServiceClientBuilderCustomizer<T> {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,30 @@

import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.spring.cloud.core.implementation.util.AzurePropertiesUtils;
import com.azure.spring.cloud.stream.binder.eventhubs.config.ClientFactoryCustomizer;
import com.azure.spring.cloud.stream.binder.eventhubs.config.EventHubsProcessorFactoryCustomizer;
import com.azure.spring.cloud.stream.binder.eventhubs.config.EventHubsProducerFactoryCustomizer;
import com.azure.spring.cloud.stream.binder.eventhubs.core.properties.EventHubsConsumerProperties;
import com.azure.spring.cloud.stream.binder.eventhubs.core.properties.EventHubsExtendedBindingProperties;
import com.azure.spring.cloud.stream.binder.eventhubs.core.properties.EventHubsProducerProperties;
import com.azure.spring.cloud.stream.binder.eventhubs.core.provisioning.EventHubsChannelProvisioner;
import com.azure.spring.messaging.eventhubs.core.EventHubsProcessorFactory;
import com.azure.spring.messaging.eventhubs.core.EventHubsTemplate;
import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer;
import com.azure.spring.messaging.eventhubs.core.properties.EventHubsContainerProperties;
import com.azure.spring.messaging.eventhubs.core.properties.NamespaceProperties;
import com.azure.spring.messaging.eventhubs.core.properties.ProducerProperties;
import com.azure.spring.messaging.eventhubs.implementation.core.DefaultEventHubsNamespaceProcessorFactory;
import com.azure.spring.messaging.eventhubs.implementation.core.DefaultEventHubsNamespaceProducerFactory;
import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter;
import com.azure.spring.integration.eventhubs.implementation.health.EventHubsProcessorInstrumentation;
import com.azure.spring.integration.core.handler.DefaultMessageHandler;
import com.azure.spring.integration.core.implementation.instrumentation.DefaultInstrumentation;
import com.azure.spring.integration.core.implementation.instrumentation.DefaultInstrumentationManager;
import com.azure.spring.integration.core.implementation.instrumentation.InstrumentationSendCallback;
import com.azure.spring.integration.core.instrumentation.Instrumentation;
import com.azure.spring.integration.core.instrumentation.InstrumentationManager;
import com.azure.spring.messaging.ConsumerIdentifier;
import com.azure.spring.integration.eventhubs.implementation.health.EventHubsProcessorInstrumentation;
import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter;
import com.azure.spring.messaging.ListenerMode;
import com.azure.spring.messaging.PropertiesSupplier;
import com.azure.spring.messaging.eventhubs.core.EventHubsProcessorFactory;
import com.azure.spring.messaging.eventhubs.core.EventHubsTemplate;
import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer;
import com.azure.spring.messaging.eventhubs.core.properties.EventHubsContainerProperties;
import com.azure.spring.messaging.eventhubs.core.properties.NamespaceProperties;
import com.azure.spring.messaging.eventhubs.core.properties.ProducerProperties;
import com.azure.spring.messaging.eventhubs.implementation.core.DefaultEventHubsNamespaceProcessorFactory;
import com.azure.spring.messaging.eventhubs.implementation.core.DefaultEventHubsNamespaceProducerFactory;
import com.azure.spring.messaging.eventhubs.implementation.properties.merger.ProcessorPropertiesMerger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -82,10 +82,9 @@ public class EventHubsMessageChannelBinder extends
private EventHubsExtendedBindingProperties bindingProperties = new EventHubsExtendedBindingProperties();
private final Map<String, ExtendedProducerProperties<EventHubsProducerProperties>>
extendedProducerPropertiesMap = new ConcurrentHashMap<>();
private final Map<ConsumerIdentifier, ExtendedConsumerProperties<EventHubsConsumerProperties>>
extendedConsumerPropertiesMap = new ConcurrentHashMap<>();

private List<ClientFactoryCustomizer> clientFactoryCustomizers = new ArrayList<>();
private List<EventHubsProducerFactoryCustomizer> producerFactoryCustomizers = new ArrayList<>();
private List<EventHubsProcessorFactoryCustomizer> processorFactoryCustomizers = new ArrayList<>();

/**
* Construct a {@link EventHubsMessageChannelBinder} with the specified headers to embed and {@link EventHubsChannelProvisioner}.
Expand Down Expand Up @@ -127,7 +126,6 @@ protected MessageHandler createProducerMessageHandler(
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
ExtendedConsumerProperties<EventHubsConsumerProperties> properties) {
extendedConsumerPropertiesMap.put(new ConsumerIdentifier(destination.getName(), group), properties);
Assert.notNull(getProcessorFactory(), "processor factory can't be null when create a consumer");

boolean anonymous = !StringUtils.hasText(group);
Expand Down Expand Up @@ -225,7 +223,7 @@ private EventHubsTemplate getEventHubTemplate() {
DefaultEventHubsNamespaceProducerFactory factory = new DefaultEventHubsNamespaceProducerFactory(
this.namespaceProperties, getProducerPropertiesSupplier());

clientFactoryCustomizers.forEach(customizer -> customizer.customize(factory));
producerFactoryCustomizers.forEach(customizer -> customizer.customize(factory));

factory.addListener((name, producerAsyncClient) -> {
DefaultInstrumentation instrumentation = new DefaultInstrumentation(name, PRODUCER);
Expand All @@ -242,7 +240,7 @@ private EventHubsProcessorFactory getProcessorFactory() {
this.processorFactory = new DefaultEventHubsNamespaceProcessorFactory(
this.checkpointStore, this.namespaceProperties);

clientFactoryCustomizers.forEach(customizer -> customizer.customize(processorFactory));
processorFactoryCustomizers.forEach(customizer -> customizer.customize(processorFactory));

processorFactory.addListener((name, consumerGroup, processorClient) -> {
String instrumentationName = name + "/" + consumerGroup;
Expand Down Expand Up @@ -283,11 +281,21 @@ InstrumentationManager getInstrumentationManager() {
}

/**
* Set the client factory customizers.
* @param clientFactoryCustomizers The client factory customizers.
* Set the producer factory customizers.
*
* @param producerFactoryCustomizers The producer factory customizers.
*/
public void setProducerFactoryCustomizers(List<EventHubsProducerFactoryCustomizer> producerFactoryCustomizers) {
this.producerFactoryCustomizers = producerFactoryCustomizers;
}

/**
* Set the processor factory customizers.
*
* @param processorFactoryCustomizers The processor factory customizers.
*/
public void setClientFactoryCustomizers(List<ClientFactoryCustomizer> clientFactoryCustomizers) {
this.clientFactoryCustomizers = clientFactoryCustomizers;
public void setProcessorFactoryCustomizers(List<EventHubsProcessorFactoryCustomizer> processorFactoryCustomizers) {
this.processorFactoryCustomizers = processorFactoryCustomizers;
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -59,26 +59,26 @@ public class EventHubsBinderConfiguration {


/**
* Declare Event Hubs Channel Provisioner bean.
* Declare the ARM implementation of {@link EventHubsChannelProvisioner}.
*
* @param eventHubsProperties the event Hubs Properties
* @param eventHubsProvisioner the event Hubs Provisioner
* @return EventHubsChannelProvisioner bean the Event Hubs Channel Provisioner bean
* @param eventHubsProperties the event Hubs Properties.
* @param eventHubsProvisioner the event Hubs Provisioner.
*
* @return the {@link EventHubsChannelResourceManagerProvisioner}.
*/
@Bean
@ConditionalOnMissingBean
@ConditionalOnBean({ EventHubsProvisioner.class, AzureEventHubsProperties.class })
EventHubsChannelProvisioner eventHubChannelArmProvisioner(
AzureEventHubsProperties eventHubsProperties, EventHubsProvisioner eventHubsProvisioner) {

return new EventHubsChannelResourceManagerProvisioner(eventHubsProperties.getNamespace(),
eventHubsProvisioner);
return new EventHubsChannelResourceManagerProvisioner(eventHubsProperties.getNamespace(), eventHubsProvisioner);
}

/**
* Declare Event Hubs Channel Provisioner bean.
* Declare the {@link EventHubsChannelProvisioner} bean.
*
* @return EventHubsChannelProvisioner bean the Event Hubs Channel Provisioner bean
* @return the {@link EventHubsChannelProvisioner} bean.
*/
@Bean
@ConditionalOnMissingBean({ EventHubsProvisioner.class, EventHubsChannelProvisioner.class })
Expand All @@ -87,60 +87,69 @@ public EventHubsChannelProvisioner eventHubChannelProvisioner() {
}

/**
* Declare Event Hubs Message Channel Binder bean.
* Declare the {@link EventHubsMessageChannelBinder} bean.
*
* @param channelProvisioner the channel provisioner.
* @param bindingProperties the binding properties.
* @param namespaceProperties the namespace properties.
* @param checkpointStores the checkpoint stores.
* @param producerFactoryCustomizers customizers to customize producer factories.
* @param processorFactoryCustomizers customizers to customize processor factories.
*
* @param channelProvisioner the channel Provisioner
* @param bindingProperties the binding Properties
* @param namespaceProperties the namespace Properties
* @param checkpointStores the checkpoint Stores
* @param customizers customizers to customize client factories
* @return EventHubsMessageChannelBinder bean the Event Hubs Message Channel Binder bean
* @return the {@link EventHubsMessageChannelBinder} bean.
*/
@Bean
@ConditionalOnMissingBean
public EventHubsMessageChannelBinder eventHubBinder(EventHubsChannelProvisioner channelProvisioner,
EventHubsExtendedBindingProperties bindingProperties,
ObjectProvider<NamespaceProperties> namespaceProperties,
ObjectProvider<CheckpointStore> checkpointStores,
ObjectProvider<ClientFactoryCustomizer> customizers) {
ObjectProvider<EventHubsProducerFactoryCustomizer> producerFactoryCustomizers,
ObjectProvider<EventHubsProcessorFactoryCustomizer> processorFactoryCustomizers) {
EventHubsMessageChannelBinder binder = new EventHubsMessageChannelBinder(null, channelProvisioner);
binder.setBindingProperties(bindingProperties);
binder.setNamespaceProperties(namespaceProperties.getIfAvailable());
checkpointStores.ifAvailable(binder::setCheckpointStore);
binder.setClientFactoryCustomizers(customizers.orderedStream().collect(Collectors.toList()));
binder.setProducerFactoryCustomizers(producerFactoryCustomizers.orderedStream().collect(Collectors.toList()));
binder.setProcessorFactoryCustomizers(processorFactoryCustomizers.orderedStream().collect(Collectors.toList()));
return binder;
}

@Bean
@ConditionalOnMissingBean
ClientFactoryCustomizer defaultClientFactoryCustomizer(
EventHubsProducerFactoryCustomizer defaultEventHubsProducerFactoryCustomizer(
AzureTokenCredentialResolver azureTokenCredentialResolver,
@Qualifier(DEFAULT_TOKEN_CREDENTIAL_BEAN_NAME) TokenCredential defaultAzureCredential,
ObjectProvider<AzureServiceClientBuilderCustomizer<EventHubClientBuilder>> clientBuilderCustomizers) {

return new DefaultProducerFactoryCustomizer(defaultAzureCredential, azureTokenCredentialResolver, clientBuilderCustomizers);
}

@Bean
@ConditionalOnMissingBean
EventHubsProcessorFactoryCustomizer defaultEventHubsProcessorFactoryCustomizer(
AzureTokenCredentialResolver azureTokenCredentialResolver,
@Qualifier(DEFAULT_TOKEN_CREDENTIAL_BEAN_NAME) TokenCredential defaultAzureCredential,
ObjectProvider<AzureServiceClientBuilderCustomizer<EventHubClientBuilder>> clientBuilderCustomizers,
ObjectProvider<AzureServiceClientBuilderCustomizer<EventProcessorClientBuilder>> processorClientBuilderCustomizers) {

return new DefaultClientFactoryCustomizer(defaultAzureCredential, azureTokenCredentialResolver,
clientBuilderCustomizers, processorClientBuilderCustomizers);
return new DefaultProcessorFactoryCustomizer(defaultAzureCredential, azureTokenCredentialResolver, processorClientBuilderCustomizers);
}

/**
* The {@link ClientFactoryCustomizer} to configure the credential related properties.
* The default {@link EventHubsProducerFactoryCustomizer} to configure the credential related properties and client builder customizers.
*/
static class DefaultClientFactoryCustomizer implements ClientFactoryCustomizer {
static class DefaultProducerFactoryCustomizer implements EventHubsProducerFactoryCustomizer {

private final TokenCredential defaultAzureCredential;
private final AzureTokenCredentialResolver tokenCredentialResolver;
private final ObjectProvider<AzureServiceClientBuilderCustomizer<EventHubClientBuilder>> clientBuilderCustomizers;
private final ObjectProvider<AzureServiceClientBuilderCustomizer<EventProcessorClientBuilder>> processorClientBuilderCustomizers;

DefaultClientFactoryCustomizer(TokenCredential defaultAzureCredential,
AzureTokenCredentialResolver azureTokenCredentialResolver,
ObjectProvider<AzureServiceClientBuilderCustomizer<EventHubClientBuilder>> clientBuilderCustomizers,
ObjectProvider<AzureServiceClientBuilderCustomizer<EventProcessorClientBuilder>> processorClientBuilderCustomizers) {
DefaultProducerFactoryCustomizer(TokenCredential defaultAzureCredential,
AzureTokenCredentialResolver azureTokenCredentialResolver,
ObjectProvider<AzureServiceClientBuilderCustomizer<EventHubClientBuilder>> clientBuilderCustomizers) {
this.defaultAzureCredential = defaultAzureCredential;
this.tokenCredentialResolver = azureTokenCredentialResolver;
this.clientBuilderCustomizers = clientBuilderCustomizers;
this.processorClientBuilderCustomizers = processorClientBuilderCustomizers;
}

@Override
Expand All @@ -155,6 +164,28 @@ public void customize(EventHubsProducerFactory factory) {
}
}

ObjectProvider<AzureServiceClientBuilderCustomizer<EventHubClientBuilder>> getClientBuilderCustomizers() {
return clientBuilderCustomizers;
}
}

/**
* The default {@link EventHubsProcessorFactoryCustomizer} to configure the credential related properties and client builder customizers.
*/
static class DefaultProcessorFactoryCustomizer implements EventHubsProcessorFactoryCustomizer {

private final TokenCredential defaultAzureCredential;
private final AzureTokenCredentialResolver tokenCredentialResolver;
private final ObjectProvider<AzureServiceClientBuilderCustomizer<EventProcessorClientBuilder>> processorClientBuilderCustomizers;

DefaultProcessorFactoryCustomizer(TokenCredential defaultAzureCredential,
AzureTokenCredentialResolver azureTokenCredentialResolver,
ObjectProvider<AzureServiceClientBuilderCustomizer<EventProcessorClientBuilder>> processorClientBuilderCustomizers) {
this.defaultAzureCredential = defaultAzureCredential;
this.tokenCredentialResolver = azureTokenCredentialResolver;
this.processorClientBuilderCustomizers = processorClientBuilderCustomizers;
}

@Override
public void customize(EventHubsProcessorFactory factory) {
if (factory instanceof DefaultEventHubsNamespaceProcessorFactory) {
Expand All @@ -167,10 +198,6 @@ public void customize(EventHubsProcessorFactory factory) {
}
}

ObjectProvider<AzureServiceClientBuilderCustomizer<EventHubClientBuilder>> getClientBuilderCustomizers() {
return clientBuilderCustomizers;
}

ObjectProvider<AzureServiceClientBuilderCustomizer<EventProcessorClientBuilder>> getProcessorClientBuilderCustomizers() {
return processorClientBuilderCustomizers;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.spring.cloud.stream.binder.eventhubs.config;


import com.azure.spring.messaging.eventhubs.core.EventHubsProcessorFactory;

/**
* Called by the binder to customize the {@link EventHubsProcessorFactory}.
*/
@FunctionalInterface
public interface EventHubsProcessorFactoryCustomizer {

/**
* Customize the processor factory.
*
* @param factory The processor factory.
*/
void customize(EventHubsProcessorFactory factory);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.spring.cloud.stream.binder.eventhubs.config;


import com.azure.spring.messaging.eventhubs.core.EventHubsProducerFactory;

/**
* Called by the binder to customize the {@link EventHubsProducerFactory}.
*/
@FunctionalInterface
public interface EventHubsProducerFactoryCustomizer {

/**
* Customize the producer factory.
*
* @param factory The producer factory.
*/
void customize(EventHubsProducerFactory factory);

}
Loading

0 comments on commit 92e6bca

Please sign in to comment.