Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor clientfactory customizer #27653

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*
* @param <T> The type of the service client builder.
*/
@FunctionalInterface
public interface AzureServiceClientBuilderCustomizer<T> {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,30 @@

import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.spring.cloud.core.implementation.util.AzurePropertiesUtils;
import com.azure.spring.cloud.stream.binder.eventhubs.config.ClientFactoryCustomizer;
import com.azure.spring.cloud.stream.binder.eventhubs.config.EventHubsProcessorFactoryCustomizer;
import com.azure.spring.cloud.stream.binder.eventhubs.config.EventHubsProducerFactoryCustomizer;
import com.azure.spring.cloud.stream.binder.eventhubs.core.properties.EventHubsConsumerProperties;
import com.azure.spring.cloud.stream.binder.eventhubs.core.properties.EventHubsExtendedBindingProperties;
import com.azure.spring.cloud.stream.binder.eventhubs.core.properties.EventHubsProducerProperties;
import com.azure.spring.cloud.stream.binder.eventhubs.core.provisioning.EventHubsChannelProvisioner;
import com.azure.spring.messaging.eventhubs.core.EventHubsProcessorFactory;
import com.azure.spring.messaging.eventhubs.core.EventHubsTemplate;
import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer;
import com.azure.spring.messaging.eventhubs.core.properties.EventHubsContainerProperties;
import com.azure.spring.messaging.eventhubs.core.properties.NamespaceProperties;
import com.azure.spring.messaging.eventhubs.core.properties.ProducerProperties;
import com.azure.spring.messaging.eventhubs.implementation.core.DefaultEventHubsNamespaceProcessorFactory;
import com.azure.spring.messaging.eventhubs.implementation.core.DefaultEventHubsNamespaceProducerFactory;
import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter;
import com.azure.spring.integration.eventhubs.implementation.health.EventHubsProcessorInstrumentation;
import com.azure.spring.integration.core.handler.DefaultMessageHandler;
import com.azure.spring.integration.core.implementation.instrumentation.DefaultInstrumentation;
import com.azure.spring.integration.core.implementation.instrumentation.DefaultInstrumentationManager;
import com.azure.spring.integration.core.implementation.instrumentation.InstrumentationSendCallback;
import com.azure.spring.integration.core.instrumentation.Instrumentation;
import com.azure.spring.integration.core.instrumentation.InstrumentationManager;
import com.azure.spring.messaging.ConsumerIdentifier;
import com.azure.spring.integration.eventhubs.implementation.health.EventHubsProcessorInstrumentation;
import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter;
import com.azure.spring.messaging.ListenerMode;
import com.azure.spring.messaging.PropertiesSupplier;
import com.azure.spring.messaging.eventhubs.core.EventHubsProcessorFactory;
import com.azure.spring.messaging.eventhubs.core.EventHubsTemplate;
import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer;
import com.azure.spring.messaging.eventhubs.core.properties.EventHubsContainerProperties;
import com.azure.spring.messaging.eventhubs.core.properties.NamespaceProperties;
import com.azure.spring.messaging.eventhubs.core.properties.ProducerProperties;
import com.azure.spring.messaging.eventhubs.implementation.core.DefaultEventHubsNamespaceProcessorFactory;
import com.azure.spring.messaging.eventhubs.implementation.core.DefaultEventHubsNamespaceProducerFactory;
import com.azure.spring.messaging.eventhubs.implementation.properties.merger.ProcessorPropertiesMerger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -82,10 +82,9 @@ public class EventHubsMessageChannelBinder extends
private EventHubsExtendedBindingProperties bindingProperties = new EventHubsExtendedBindingProperties();
private final Map<String, ExtendedProducerProperties<EventHubsProducerProperties>>
extendedProducerPropertiesMap = new ConcurrentHashMap<>();
private final Map<ConsumerIdentifier, ExtendedConsumerProperties<EventHubsConsumerProperties>>
extendedConsumerPropertiesMap = new ConcurrentHashMap<>();

private List<ClientFactoryCustomizer> clientFactoryCustomizers = new ArrayList<>();
private List<EventHubsProducerFactoryCustomizer> producerFactoryCustomizers = new ArrayList<>();
private List<EventHubsProcessorFactoryCustomizer> processorFactoryCustomizers = new ArrayList<>();

/**
* Construct a {@link EventHubsMessageChannelBinder} with the specified headers to embed and {@link EventHubsChannelProvisioner}.
Expand Down Expand Up @@ -127,7 +126,6 @@ protected MessageHandler createProducerMessageHandler(
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
ExtendedConsumerProperties<EventHubsConsumerProperties> properties) {
extendedConsumerPropertiesMap.put(new ConsumerIdentifier(destination.getName(), group), properties);
Assert.notNull(getProcessorFactory(), "processor factory can't be null when create a consumer");

boolean anonymous = !StringUtils.hasText(group);
Expand Down Expand Up @@ -225,7 +223,7 @@ private EventHubsTemplate getEventHubTemplate() {
DefaultEventHubsNamespaceProducerFactory factory = new DefaultEventHubsNamespaceProducerFactory(
this.namespaceProperties, getProducerPropertiesSupplier());

clientFactoryCustomizers.forEach(customizer -> customizer.customize(factory));
producerFactoryCustomizers.forEach(customizer -> customizer.customize(factory));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we design the interface to accept an argument of abstract type while in the implementation use its implementation? How about always using the implementation for clarity? By this way, we avoid the is instanceof judgement then.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use API instead of impl classes in APIs.


factory.addListener((name, producerAsyncClient) -> {
DefaultInstrumentation instrumentation = new DefaultInstrumentation(name, PRODUCER);
Expand All @@ -242,7 +240,7 @@ private EventHubsProcessorFactory getProcessorFactory() {
this.processorFactory = new DefaultEventHubsNamespaceProcessorFactory(
this.checkpointStore, this.namespaceProperties);

clientFactoryCustomizers.forEach(customizer -> customizer.customize(processorFactory));
processorFactoryCustomizers.forEach(customizer -> customizer.customize(processorFactory));

processorFactory.addListener((name, consumerGroup, processorClient) -> {
String instrumentationName = name + "/" + consumerGroup;
Expand Down Expand Up @@ -283,11 +281,21 @@ InstrumentationManager getInstrumentationManager() {
}

/**
* Set the client factory customizers.
* @param clientFactoryCustomizers The client factory customizers.
* Set the producer factory customizers.
*
* @param producerFactoryCustomizers The producer factory customizers.
*/
public void setProducerFactoryCustomizers(List<EventHubsProducerFactoryCustomizer> producerFactoryCustomizers) {
this.producerFactoryCustomizers = producerFactoryCustomizers;
}

/**
* Set the processor factory customizers.
*
* @param processorFactoryCustomizers The processor factory customizers.
*/
public void setClientFactoryCustomizers(List<ClientFactoryCustomizer> clientFactoryCustomizers) {
this.clientFactoryCustomizers = clientFactoryCustomizers;
public void setProcessorFactoryCustomizers(List<EventHubsProcessorFactoryCustomizer> processorFactoryCustomizers) {
this.processorFactoryCustomizers = processorFactoryCustomizers;
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -59,26 +59,26 @@ public class EventHubsBinderConfiguration {


/**
* Declare Event Hubs Channel Provisioner bean.
* Declare the ARM implementation of {@link EventHubsChannelProvisioner}.
*
* @param eventHubsProperties the event Hubs Properties
* @param eventHubsProvisioner the event Hubs Provisioner
* @return EventHubsChannelProvisioner bean the Event Hubs Channel Provisioner bean
* @param eventHubsProperties the event Hubs Properties.
* @param eventHubsProvisioner the event Hubs Provisioner.
*
* @return the {@link EventHubsChannelResourceManagerProvisioner}.
*/
@Bean
@ConditionalOnMissingBean
@ConditionalOnBean({ EventHubsProvisioner.class, AzureEventHubsProperties.class })
EventHubsChannelProvisioner eventHubChannelArmProvisioner(
AzureEventHubsProperties eventHubsProperties, EventHubsProvisioner eventHubsProvisioner) {

return new EventHubsChannelResourceManagerProvisioner(eventHubsProperties.getNamespace(),
eventHubsProvisioner);
return new EventHubsChannelResourceManagerProvisioner(eventHubsProperties.getNamespace(), eventHubsProvisioner);
}

/**
* Declare Event Hubs Channel Provisioner bean.
* Declare the {@link EventHubsChannelProvisioner} bean.
*
* @return EventHubsChannelProvisioner bean the Event Hubs Channel Provisioner bean
* @return the {@link EventHubsChannelProvisioner} bean.
*/
@Bean
@ConditionalOnMissingBean({ EventHubsProvisioner.class, EventHubsChannelProvisioner.class })
Expand All @@ -87,60 +87,69 @@ public EventHubsChannelProvisioner eventHubChannelProvisioner() {
}

/**
* Declare Event Hubs Message Channel Binder bean.
* Declare the {@link EventHubsMessageChannelBinder} bean.
*
* @param channelProvisioner the channel provisioner.
* @param bindingProperties the binding properties.
* @param namespaceProperties the namespace properties.
* @param checkpointStores the checkpoint stores.
* @param producerFactoryCustomizers customizers to customize producer factories.
* @param processorFactoryCustomizers customizers to customize processor factories.
*
* @param channelProvisioner the channel Provisioner
* @param bindingProperties the binding Properties
* @param namespaceProperties the namespace Properties
* @param checkpointStores the checkpoint Stores
* @param customizers customizers to customize client factories
* @return EventHubsMessageChannelBinder bean the Event Hubs Message Channel Binder bean
* @return the {@link EventHubsMessageChannelBinder} bean.
*/
@Bean
@ConditionalOnMissingBean
public EventHubsMessageChannelBinder eventHubBinder(EventHubsChannelProvisioner channelProvisioner,
EventHubsExtendedBindingProperties bindingProperties,
ObjectProvider<NamespaceProperties> namespaceProperties,
ObjectProvider<CheckpointStore> checkpointStores,
ObjectProvider<ClientFactoryCustomizer> customizers) {
ObjectProvider<EventHubsProducerFactoryCustomizer> producerFactoryCustomizers,
ObjectProvider<EventHubsProcessorFactoryCustomizer> processorFactoryCustomizers) {
EventHubsMessageChannelBinder binder = new EventHubsMessageChannelBinder(null, channelProvisioner);
binder.setBindingProperties(bindingProperties);
binder.setNamespaceProperties(namespaceProperties.getIfAvailable());
checkpointStores.ifAvailable(binder::setCheckpointStore);
binder.setClientFactoryCustomizers(customizers.orderedStream().collect(Collectors.toList()));
binder.setProducerFactoryCustomizers(producerFactoryCustomizers.orderedStream().collect(Collectors.toList()));
binder.setProcessorFactoryCustomizers(processorFactoryCustomizers.orderedStream().collect(Collectors.toList()));
return binder;
}

@Bean
@ConditionalOnMissingBean
ClientFactoryCustomizer defaultClientFactoryCustomizer(
EventHubsProducerFactoryCustomizer defaultEventHubsProducerFactoryCustomizer(
AzureTokenCredentialResolver azureTokenCredentialResolver,
@Qualifier(DEFAULT_TOKEN_CREDENTIAL_BEAN_NAME) TokenCredential defaultAzureCredential,
ObjectProvider<AzureServiceClientBuilderCustomizer<EventHubClientBuilder>> clientBuilderCustomizers) {

return new DefaultProducerFactoryCustomizer(defaultAzureCredential, azureTokenCredentialResolver, clientBuilderCustomizers);
}

@Bean
@ConditionalOnMissingBean
EventHubsProcessorFactoryCustomizer defaultEventHubsProcessorFactoryCustomizer(
AzureTokenCredentialResolver azureTokenCredentialResolver,
@Qualifier(DEFAULT_TOKEN_CREDENTIAL_BEAN_NAME) TokenCredential defaultAzureCredential,
ObjectProvider<AzureServiceClientBuilderCustomizer<EventHubClientBuilder>> clientBuilderCustomizers,
ObjectProvider<AzureServiceClientBuilderCustomizer<EventProcessorClientBuilder>> processorClientBuilderCustomizers) {

return new DefaultClientFactoryCustomizer(defaultAzureCredential, azureTokenCredentialResolver,
clientBuilderCustomizers, processorClientBuilderCustomizers);
return new DefaultProcessorFactoryCustomizer(defaultAzureCredential, azureTokenCredentialResolver, processorClientBuilderCustomizers);
}

/**
* The {@link ClientFactoryCustomizer} to configure the credential related properties.
* The default {@link EventHubsProducerFactoryCustomizer} to configure the credential related properties and client builder customizers.
*/
static class DefaultClientFactoryCustomizer implements ClientFactoryCustomizer {
static class DefaultProducerFactoryCustomizer implements EventHubsProducerFactoryCustomizer {

private final TokenCredential defaultAzureCredential;
private final AzureTokenCredentialResolver tokenCredentialResolver;
private final ObjectProvider<AzureServiceClientBuilderCustomizer<EventHubClientBuilder>> clientBuilderCustomizers;
private final ObjectProvider<AzureServiceClientBuilderCustomizer<EventProcessorClientBuilder>> processorClientBuilderCustomizers;

DefaultClientFactoryCustomizer(TokenCredential defaultAzureCredential,
AzureTokenCredentialResolver azureTokenCredentialResolver,
ObjectProvider<AzureServiceClientBuilderCustomizer<EventHubClientBuilder>> clientBuilderCustomizers,
ObjectProvider<AzureServiceClientBuilderCustomizer<EventProcessorClientBuilder>> processorClientBuilderCustomizers) {
DefaultProducerFactoryCustomizer(TokenCredential defaultAzureCredential,
AzureTokenCredentialResolver azureTokenCredentialResolver,
ObjectProvider<AzureServiceClientBuilderCustomizer<EventHubClientBuilder>> clientBuilderCustomizers) {
this.defaultAzureCredential = defaultAzureCredential;
this.tokenCredentialResolver = azureTokenCredentialResolver;
this.clientBuilderCustomizers = clientBuilderCustomizers;
this.processorClientBuilderCustomizers = processorClientBuilderCustomizers;
}

@Override
Expand All @@ -155,6 +164,28 @@ public void customize(EventHubsProducerFactory factory) {
}
}

ObjectProvider<AzureServiceClientBuilderCustomizer<EventHubClientBuilder>> getClientBuilderCustomizers() {
return clientBuilderCustomizers;
}
}

/**
* The default {@link EventHubsProcessorFactoryCustomizer} to configure the credential related properties and client builder customizers.
*/
static class DefaultProcessorFactoryCustomizer implements EventHubsProcessorFactoryCustomizer {

private final TokenCredential defaultAzureCredential;
private final AzureTokenCredentialResolver tokenCredentialResolver;
private final ObjectProvider<AzureServiceClientBuilderCustomizer<EventProcessorClientBuilder>> processorClientBuilderCustomizers;

DefaultProcessorFactoryCustomizer(TokenCredential defaultAzureCredential,
AzureTokenCredentialResolver azureTokenCredentialResolver,
ObjectProvider<AzureServiceClientBuilderCustomizer<EventProcessorClientBuilder>> processorClientBuilderCustomizers) {
this.defaultAzureCredential = defaultAzureCredential;
this.tokenCredentialResolver = azureTokenCredentialResolver;
this.processorClientBuilderCustomizers = processorClientBuilderCustomizers;
}

@Override
public void customize(EventHubsProcessorFactory factory) {
if (factory instanceof DefaultEventHubsNamespaceProcessorFactory) {
Expand All @@ -167,10 +198,6 @@ public void customize(EventHubsProcessorFactory factory) {
}
}

ObjectProvider<AzureServiceClientBuilderCustomizer<EventHubClientBuilder>> getClientBuilderCustomizers() {
return clientBuilderCustomizers;
}

ObjectProvider<AzureServiceClientBuilderCustomizer<EventProcessorClientBuilder>> getProcessorClientBuilderCustomizers() {
return processorClientBuilderCustomizers;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.spring.cloud.stream.binder.eventhubs.config;


import com.azure.spring.messaging.eventhubs.core.EventHubsProcessorFactory;

/**
* Called by the binder to customize the {@link EventHubsProcessorFactory}.
*/
@FunctionalInterface
public interface EventHubsProcessorFactoryCustomizer {

/**
* Customize the processor factory.
*
* @param factory The processor factory.
*/
void customize(EventHubsProcessorFactory factory);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.spring.cloud.stream.binder.eventhubs.config;


import com.azure.spring.messaging.eventhubs.core.EventHubsProducerFactory;

/**
* Called by the binder to customize the {@link EventHubsProducerFactory}.
*/
@FunctionalInterface
public interface EventHubsProducerFactoryCustomizer {

/**
* Customize the producer factory.
*
* @param factory The producer factory.
*/
void customize(EventHubsProducerFactory factory);

}
Loading