Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add producer / processor client builder customizer to the producer / processor factory #27452

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,27 @@

import com.azure.identity.DefaultAzureCredential;
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.spring.cloud.autoconfigure.context.AzureGlobalPropertiesAutoConfiguration;
import com.azure.spring.cloud.autoconfigure.context.AzureTokenCredentialAutoConfiguration;
import com.azure.spring.cloud.autoconfigure.eventhubs.AzureEventHubsAutoConfiguration;
import com.azure.spring.cloud.autoconfigure.eventhubs.AzureEventHubsMessagingAutoConfiguration;
import com.azure.spring.cloud.autoconfigure.implementation.eventhubs.properties.AzureEventHubsProperties;
import com.azure.spring.cloud.autoconfigure.resourcemanager.AzureEventHubsResourceManagerAutoConfiguration;
import com.azure.spring.cloud.autoconfigure.resourcemanager.AzureResourceManagerAutoConfiguration;
import com.azure.spring.cloud.core.customizer.AzureServiceClientBuilderCustomizer;
import com.azure.spring.cloud.core.implementation.credential.resolver.AzureTokenCredentialResolver;
import com.azure.spring.cloud.resourcemanager.provisioning.EventHubsProvisioner;
import com.azure.spring.cloud.stream.binder.eventhubs.EventHubsMessageChannelBinder;
import com.azure.spring.cloud.stream.binder.eventhubs.core.properties.EventHubsExtendedBindingProperties;
import com.azure.spring.cloud.stream.binder.eventhubs.core.provisioning.EventHubsChannelProvisioner;
import com.azure.spring.cloud.stream.binder.eventhubs.provisioning.EventHubsChannelResourceManagerProvisioner;
import com.azure.spring.cloud.core.implementation.credential.resolver.AzureTokenCredentialResolver;
import com.azure.spring.messaging.eventhubs.implementation.core.DefaultEventHubsNamespaceProcessorFactory;
import com.azure.spring.messaging.eventhubs.core.EventHubsProcessorFactory;
import com.azure.spring.messaging.eventhubs.implementation.core.DefaultEventHubsNamespaceProducerFactory;
import com.azure.spring.messaging.eventhubs.core.EventHubsProducerFactory;
import com.azure.spring.messaging.eventhubs.core.properties.NamespaceProperties;
import com.azure.spring.cloud.resourcemanager.provisioning.EventHubsProvisioner;
import com.azure.spring.messaging.eventhubs.implementation.core.DefaultEventHubsNamespaceProcessorFactory;
import com.azure.spring.messaging.eventhubs.implementation.core.DefaultEventHubsNamespaceProducerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
Expand Down Expand Up @@ -112,23 +115,32 @@ public EventHubsMessageChannelBinder eventHubBinder(EventHubsChannelProvisioner
@ConditionalOnMissingBean
ClientFactoryCustomizer defaultClientFactoryCustomizer(
AzureTokenCredentialResolver azureTokenCredentialResolver,
@Qualifier(DEFAULT_TOKEN_CREDENTIAL_BEAN_NAME)DefaultAzureCredential defaultAzureCredential) {
@Qualifier(DEFAULT_TOKEN_CREDENTIAL_BEAN_NAME)DefaultAzureCredential defaultAzureCredential,
ObjectProvider<AzureServiceClientBuilderCustomizer<EventHubClientBuilder>> clientBuilderCustomizers,
ObjectProvider<AzureServiceClientBuilderCustomizer<EventProcessorClientBuilder>> processorClientBuilderCustomizers) {

return new CredentialClientFactoryCustomizer(defaultAzureCredential, azureTokenCredentialResolver);
return new DefaultClientFactoryCustomizer(defaultAzureCredential, azureTokenCredentialResolver,
clientBuilderCustomizers, processorClientBuilderCustomizers);
}

/**
* The {@link ClientFactoryCustomizer} to configure the credential related properties.
*/
private static class CredentialClientFactoryCustomizer implements ClientFactoryCustomizer {
static class DefaultClientFactoryCustomizer implements ClientFactoryCustomizer {

private final DefaultAzureCredential defaultAzureCredential;
private final AzureTokenCredentialResolver tokenCredentialResolver;
private final ObjectProvider<AzureServiceClientBuilderCustomizer<EventHubClientBuilder>> clientBuilderCustomizers;
private final ObjectProvider<AzureServiceClientBuilderCustomizer<EventProcessorClientBuilder>> processorClientBuilderCustomizers;

CredentialClientFactoryCustomizer(DefaultAzureCredential defaultAzureCredential,
AzureTokenCredentialResolver azureTokenCredentialResolver) {
DefaultClientFactoryCustomizer(DefaultAzureCredential defaultAzureCredential,
AzureTokenCredentialResolver azureTokenCredentialResolver,
ObjectProvider<AzureServiceClientBuilderCustomizer<EventHubClientBuilder>> clientBuilderCustomizers,
ObjectProvider<AzureServiceClientBuilderCustomizer<EventProcessorClientBuilder>> processorClientBuilderCustomizers) {
this.defaultAzureCredential = defaultAzureCredential;
this.tokenCredentialResolver = azureTokenCredentialResolver;
this.clientBuilderCustomizers = clientBuilderCustomizers;
this.processorClientBuilderCustomizers = processorClientBuilderCustomizers;
}

@Override
Expand All @@ -139,6 +151,7 @@ public void customize(EventHubsProducerFactory factory) {

defaultFactory.setDefaultAzureCredential(defaultAzureCredential);
defaultFactory.setTokenCredentialResolver(tokenCredentialResolver);
clientBuilderCustomizers.orderedStream().forEach(defaultFactory::addBuilderCustomizer);
}
}

Expand All @@ -150,8 +163,16 @@ public void customize(EventHubsProcessorFactory factory) {

defaultFactory.setDefaultAzureCredential(defaultAzureCredential);
defaultFactory.setTokenCredentialResolver(tokenCredentialResolver);
processorClientBuilderCustomizers.orderedStream().forEach(defaultFactory::addBuilderCustomizer);
}
}

ObjectProvider<AzureServiceClientBuilderCustomizer<EventHubClientBuilder>> getClientBuilderCustomizers() {
return clientBuilderCustomizers;
}

ObjectProvider<AzureServiceClientBuilderCustomizer<EventProcessorClientBuilder>> getProcessorClientBuilderCustomizers() {
return processorClientBuilderCustomizers;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
package com.azure.spring.cloud.stream.binder.eventhubs.config;

import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.spring.cloud.autoconfigure.implementation.eventhubs.properties.AzureEventHubsProperties;
import com.azure.spring.cloud.core.customizer.AzureServiceClientBuilderCustomizer;
import com.azure.spring.cloud.resourcemanager.provisioning.EventHubsProvisioner;
import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsErrorHandler;
import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsMessageListener;
import com.azure.spring.cloud.stream.binder.eventhubs.EventHubsMessageChannelBinder;
import com.azure.spring.cloud.stream.binder.eventhubs.EventHubsMessageChannelTestBinder;
import com.azure.spring.cloud.stream.binder.eventhubs.core.properties.EventHubsConsumerProperties;
Expand All @@ -17,26 +17,19 @@
import com.azure.spring.cloud.stream.binder.eventhubs.core.provisioning.EventHubsChannelProvisioner;
import com.azure.spring.cloud.stream.binder.eventhubs.provisioning.EventHubsChannelResourceManagerProvisioner;
import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter;
import com.azure.spring.messaging.ConsumerIdentifier;
import com.azure.spring.messaging.PropertiesSupplier;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.messaging.eventhubs.core.EventHubsProcessorFactory;
import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer;
import com.azure.spring.messaging.eventhubs.core.properties.EventHubsContainerProperties;
import com.azure.spring.messaging.eventhubs.core.properties.NamespaceProperties;
import com.azure.spring.messaging.eventhubs.core.properties.ProcessorProperties;
import com.azure.spring.messaging.eventhubs.implementation.core.DefaultEventHubsNamespaceProcessorFactory;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;

import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -188,6 +181,39 @@ void testExtendedBindingPropertiesShouldBind() {
});
}

@Test
void clientFactoryCustomizerShouldBeConfigured() {
AzureEventHubsProperties properties = new AzureEventHubsProperties();
properties.setNamespace("fake-namespace");
this.contextRunner
.withBean(EventHubsProvisioner.class, () -> mock(EventHubsProvisioner.class))
.withBean(AzureEventHubsProperties.class, () -> properties)
.run(context -> assertThat(context).hasSingleBean(ClientFactoryCustomizer.class));
}

@Test
void builderCustomizerShouldBeConfiguredToClientFactoryCustomizer() {
AzureEventHubsProperties properties = new AzureEventHubsProperties();
properties.setNamespace("fake-namespace");
this.contextRunner
.withBean(EventHubsProvisioner.class, () -> mock(EventHubsProvisioner.class))
.withBean(AzureEventHubsProperties.class, () -> properties)
.withBean("producer-customizer1", EventHubBuilderCustomizer.class, EventHubBuilderCustomizer::new)
.withBean("processor-customizer1", EventProcessorBuilderCustomizer.class, EventProcessorBuilderCustomizer::new)
.withBean("processor-customizer2", EventProcessorBuilderCustomizer.class, EventProcessorBuilderCustomizer::new)
.withBean("other-customizer1", OtherBuilderCustomizer.class, OtherBuilderCustomizer::new)
.withBean("other-customizer2", OtherBuilderCustomizer.class, OtherBuilderCustomizer::new)
.run(context -> {
assertThat(context).hasSingleBean(ClientFactoryCustomizer.class);
ClientFactoryCustomizer clientFactoryCustomizer = context.getBean(ClientFactoryCustomizer.class);

EventHubsBinderConfiguration.DefaultClientFactoryCustomizer defaultFactoryCustomizer = (EventHubsBinderConfiguration.DefaultClientFactoryCustomizer) clientFactoryCustomizer;

assertEquals(1, (int) defaultFactoryCustomizer.getClientBuilderCustomizers().stream().count());
assertEquals(2, (int) defaultFactoryCustomizer.getProcessorClientBuilderCustomizers().stream().count());
});
}

@Configuration
@EnableConfigurationProperties(EventHubsExtendedBindingProperties.class)
static class TestProcessorContainerConfiguration {
Expand All @@ -200,13 +226,13 @@ public EventHubsMessageChannelTestBinder eventHubBinder(EventHubsExtendedBinding
EventHubsConsumerProperties consumerProperties = bindingProperties.getExtendedConsumerProperties(
"consume-in-0");
CheckpointStore checkpointStore = mock(CheckpointStore.class);
TestDefaultEventHubsNamespaceProcessorFactory factory = spy(new TestDefaultEventHubsNamespaceProcessorFactory(
DefaultEventHubsNamespaceProcessorFactory factory = spy(new DefaultEventHubsNamespaceProcessorFactory(
checkpointStore, new NamespaceProperties(), (key) -> {
consumerProperties.setEventHubName(key.getDestination());
consumerProperties.setConsumerGroup(key.getGroup());
return consumerProperties;
}));
TestEventHubsMessageListenerContainer container = spy(new TestEventHubsMessageListenerContainer(factory));
EventHubsMessageListenerContainer container = spy(new EventHubsMessageListenerContainer(factory, new EventHubsContainerProperties()));
EventHubsInboundChannelAdapter messageProducer = spy(new EventHubsInboundChannelAdapter(container));
EventHubsMessageChannelTestBinder binder = new EventHubsMessageChannelTestBinder(null, new EventHubsChannelProvisioner(), null, messageProducer);
binder.setBindingProperties(bindingProperties);
Expand All @@ -216,68 +242,28 @@ checkpointStore, new NamespaceProperties(), (key) -> {
}
}

static class TestDefaultEventHubsNamespaceProcessorFactory implements EventHubsProcessorFactory, DisposableBean {
private DefaultEventHubsNamespaceProcessorFactory delegate;

/**
* Construct a factory with the provided {@link CheckpointStore}, namespace level properties and processor {@link PropertiesSupplier}.
* @param checkpointStore the checkpoint store.
* @param namespaceProperties the namespace properties.
* @param supplier the {@link PropertiesSupplier} to supply {@link ProcessorProperties} for each event hub.
*/
TestDefaultEventHubsNamespaceProcessorFactory(CheckpointStore checkpointStore,
NamespaceProperties namespaceProperties,
PropertiesSupplier<ConsumerIdentifier,
ProcessorProperties> supplier) {
Assert.notNull(checkpointStore, "CheckpointStore must be provided.");
this.delegate = new DefaultEventHubsNamespaceProcessorFactory(checkpointStore, namespaceProperties, supplier);
}
private static class EventHubBuilderCustomizer implements AzureServiceClientBuilderCustomizer<EventHubClientBuilder> {

@Override
public EventProcessorClient createProcessor(@NonNull String eventHub,
@NonNull String consumerGroup,
@NonNull EventHubsMessageListener listener,
@NonNull EventHubsErrorHandler errorHandler) {
return this.delegate.createProcessor(eventHub, consumerGroup, listener, errorHandler);
}
public void customize(EventHubClientBuilder builder) {

@Override
public EventProcessorClient createProcessor(String eventHub, String consumerGroup, EventHubsContainerProperties containerProperties) {
return createProcessor(eventHub, consumerGroup, containerProperties.getMessageListener(), containerProperties.getErrorHandler());
}
}

@Override
public void destroy() {
this.delegate.destroy();
}
private static class EventProcessorBuilderCustomizer implements AzureServiceClientBuilderCustomizer<EventProcessorClientBuilder> {

@Override
public void addListener(EventHubsProcessorFactory.Listener listener) {
this.delegate.addListener(listener);
}
public void customize(EventProcessorClientBuilder builder) {

@Override
public boolean removeListener(EventHubsProcessorFactory.Listener listener) {
return this.delegate.removeListener(listener);
}
}

static class TestEventHubsMessageListenerContainer extends EventHubsMessageListenerContainer {

private EventHubsProcessorFactory processorFactory;
private static class OtherBuilderCustomizer implements AzureServiceClientBuilderCustomizer<Object> {

/**
* Create an instance using the supplied processor factory.
*
* @param processorFactory the processor factory.
*/
TestEventHubsMessageListenerContainer(EventHubsProcessorFactory processorFactory) {
super(processorFactory, null);
this.processorFactory = processorFactory;
}
@Override
public void customize(Object builder) {

public EventHubsProcessorFactory getProcessorFactory() {
return processorFactory;
}
}

Expand Down
Loading