Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor spring messaging eventhubs #25057

Original file line number Diff line number Diff line change
Expand Up @@ -494,8 +494,8 @@ the main ServiceBusClientBuilder. -->
<suppress checks="MethodName" files="com.azure.spring.servicebus.support.ServiceBusClientConfig.java"/>

<!-- Suppress warnings for EventProcessorSharedAuthenticationClientBuilder and EventHubSharedAuthenticationClientBuilder. They are used by Azure Spring. -->
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientBuilderCheck" files="EventHubSharedAuthenticationClientBuilder.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientBuilderCheck" files="EventProcessorSharedAuthenticationClientBuilder.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientBuilderCheck" files="EventHubNamespaceProcessorClientBuilder.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientBuilderCheck" files="EventHubNamespaceClientBuilder.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientBuilderCheck" files="com.azure.spring.cloud.autoconfigure.servicebus.AzureServiceBusConsumerClientConfiguration.java"/>

<!-- Checkstyle suppressions for azure.spring.data.cosmos package -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.azure.storage.blob.BlobContainerAsyncClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
Expand All @@ -20,6 +21,7 @@
import java.time.Duration;

import static com.azure.spring.cloud.autoconfigure.context.AzureContextUtils.EVENT_HUB_PROCESSOR_CHECKPOINT_STORE_STORAGE_CLIENT_BUILDER_FACTORY_BEAN_NAME;
import static com.azure.spring.core.properties.AzurePropertiesUtils.mergeAzureCommonProperties;

/**
* Configures a {@link BlobCheckpointStore}
Expand All @@ -33,18 +35,16 @@ public class AzureBlobCheckpointStoreConfiguration {

@Bean
@ConditionalOnMissingBean
public BlobCheckpointStore blobCheckpointStore(@Qualifier(EVENT_HUB_PROCESSOR_CHECKPOINT_STORE_STORAGE_CLIENT_BUILDER_FACTORY_BEAN_NAME)
BlobServiceClientBuilderFactory blobServiceClientBuilderFactory,
AzureEventHubProperties eventHubProperties,
ObjectProvider<BlobCheckpointStoreContainerInitializer> initializers) {
final AzureEventHubProperties.Processor.BlobCheckpointStore checkpointStoreProperties = eventHubProperties
.getProcessor()
.getCheckpointStore();

final BlobContainerAsyncClient blobContainerAsyncClient = blobServiceClientBuilderFactory
.build()
.buildAsyncClient()
.getBlobContainerAsyncClient(checkpointStoreProperties.getContainerName());
public BlobCheckpointStore blobCheckpointStore(
@Qualifier(EVENT_HUB_PROCESSOR_CHECKPOINT_STORE_STORAGE_CLIENT_BUILDER_FACTORY_BEAN_NAME)
BlobServiceClientBuilderFactory factory,
AzureEventHubProperties eventHubProperties,
ObjectProvider<BlobCheckpointStoreContainerInitializer> initializers) {
final AzureEventHubProperties.Processor.BlobCheckpointStore csProperties =
getCheckpointStoreProperties(eventHubProperties);

final BlobContainerAsyncClient blobContainerAsyncClient = factory
.build().buildAsyncClient().getBlobContainerAsyncClient(csProperties.getContainerName());

initializers.ifAvailable(initializer -> initializer.init(blobContainerAsyncClient));

Expand All @@ -69,4 +69,18 @@ public BlobServiceClientBuilderFactory eventHubProcessorBlobServiceClientBuilder
return new BlobServiceClientBuilderFactory(eventHubProperties.getProcessor().getCheckpointStore());
}

private AzureEventHubProperties.Processor.BlobCheckpointStore getCheckpointStoreProperties(
AzureEventHubProperties ehProperties) {

AzureEventHubProperties.Processor.BlobCheckpointStore result = new AzureEventHubProperties.Processor
.BlobCheckpointStore();
AzureEventHubProperties.Processor.BlobCheckpointStore csProperties = ehProperties.getProcessor()
.getCheckpointStore();

mergeAzureCommonProperties(ehProperties, csProperties, result);
BeanUtils.copyProperties(csProperties, result);

return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.azure.spring.core.ApplicationId;
import com.azure.spring.core.connectionstring.StaticConnectionStringProvider;
import com.azure.spring.core.service.AzureServiceType;
import com.azure.spring.service.core.PropertyMapper;
import com.azure.spring.service.eventhubs.factory.EventHubClientBuilderFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
Expand Down Expand Up @@ -43,16 +44,26 @@ class AzureEventHubConsumerClientConfiguration {
@ConditionalOnBean(EventHubClientBuilder.class)
@Configuration(proxyBeanMethods = false)
static class SharedConsumerConnectionConfiguration {

private final EventHubClientBuilder builder;
SharedConsumerConnectionConfiguration(AzureEventHubProperties properties, EventHubClientBuilder builder) {
this.builder = builder;

PropertyMapper mapper = new PropertyMapper();
mapper.from(properties.getConsumer().getConsumerGroup()).to(builder::consumerGroup);
mapper.from(properties.getConsumer().getPrefetchCount()).to(builder::prefetchCount);
}

@Bean
@ConditionalOnMissingBean
public EventHubConsumerAsyncClient eventHubConsumerAsyncClient(EventHubClientBuilder builder) {
return builder.buildAsyncConsumerClient();
public EventHubConsumerAsyncClient eventHubConsumerAsyncClient() {
return this.builder.buildAsyncConsumerClient();
}

@Bean
@ConditionalOnMissingBean
public EventHubConsumerClient eventHubConsumerClient(EventHubClientBuilder builder) {
return builder.buildConsumerClient();
return this.builder.buildConsumerClient();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.spring.cloud.autoconfigure.eventhubs;

import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.spring.cloud.autoconfigure.condition.ConditionalOnAnyProperty;
import com.azure.spring.cloud.autoconfigure.eventhubs.properties.AzureEventHubProperties;
import com.azure.spring.eventhubs.core.EventHubProcessorContainer;
import com.azure.spring.eventhubs.core.EventHubsTemplate;
import com.azure.spring.eventhubs.core.processor.DefaultEventHubNamespaceProcessorFactory;
import com.azure.spring.eventhubs.core.processor.EventHubProcessorFactory;
import com.azure.spring.eventhubs.core.producer.DefaultEventHubNamespaceProducerFactory;
import com.azure.spring.eventhubs.core.producer.EventHubProducerFactory;
import com.azure.spring.eventhubs.core.properties.NamespaceProperties;
import com.azure.spring.eventhubs.core.properties.ProcessorProperties;
import com.azure.spring.eventhubs.core.properties.ProducerProperties;
import com.azure.spring.messaging.PropertiesSupplier;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import reactor.util.function.Tuple2;

/**
* An auto-configuration for Event Hub, which provides {@link EventHubsTemplate} and {@link
* EventHubProcessorContainer}.
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EventHubsTemplate.class)
@AutoConfigureAfter(AzureEventHubAutoConfiguration.class)
@ConditionalOnProperty(value = "spring.cloud.azure.eventhubs.enabled", havingValue = "true", matchIfMissing = true)
@ConditionalOnAnyProperty(prefix = "spring.cloud.azure.eventhubs", name = { "connection-string", "namespace" })
@ConditionalOnBean(AzureEventHubProperties.class)
@Import({
AzureEventHubMessagingAutoConfiguration.EventHubsTemplateConfiguration.class,
AzureEventHubMessagingAutoConfiguration.ProcessorContainerConfiguration.class
})
public class AzureEventHubMessagingAutoConfiguration {

@Bean
@ConditionalOnMissingBean
public NamespaceProperties eventHubNamespaceProperties(AzureEventHubProperties properties) {
NamespaceProperties namespaceProperties = new NamespaceProperties();
BeanUtils.copyProperties(properties, namespaceProperties);
return namespaceProperties;
}

/**
* Configure the {@link EventHubProcessorContainer}
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnBean(CheckpointStore.class)
public static class ProcessorContainerConfiguration {


@Bean
@ConditionalOnMissingBean
public EventHubProcessorFactory defaultEventProcessorFactory(
NamespaceProperties properties, CheckpointStore checkpointStore,
ObjectProvider<PropertiesSupplier<Tuple2<String, String>, ProcessorProperties>> suppliers) {
return new DefaultEventHubNamespaceProcessorFactory(checkpointStore, properties, suppliers.getIfAvailable());
}

@Bean
@ConditionalOnMissingBean
public EventHubProcessorContainer eventProcessorContainer(EventHubProcessorFactory processorFactory) {
return new EventHubProcessorContainer(processorFactory);
}

}

/**
* Configure the {@link EventHubsTemplate}
*/
@Configuration(proxyBeanMethods = false)
public static class EventHubsTemplateConfiguration {

@Bean
@ConditionalOnMissingBean
public EventHubProducerFactory defaultEventHubProducerFactory(
NamespaceProperties properties,
ObjectProvider<PropertiesSupplier<String, ProducerProperties>> suppliers) {
return new DefaultEventHubNamespaceProducerFactory(properties, suppliers.getIfAvailable());
}

@Bean
@ConditionalOnMissingBean
public EventHubsTemplate eventHubsTemplate(EventHubProducerFactory producerFactory) {
return new EventHubsTemplate(producerFactory);
}

}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.azure.spring.core.ApplicationId;
import com.azure.spring.core.connectionstring.StaticConnectionStringProvider;
import com.azure.spring.core.service.AzureServiceType;
import com.azure.spring.eventhubs.core.EventHubOperation;
import com.azure.spring.service.eventhubs.factory.EventHubClientBuilderFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
Expand All @@ -24,7 +23,8 @@
import static com.azure.spring.cloud.autoconfigure.context.AzureContextUtils.EVENT_HUB_PRODUCER_CLIENT_BUILDER_FACTORY_BEAN_NAME;

/**
* An auto-configuration for Event Hub, which provides {@link EventHubOperation}
* An auto-configuration for Event Hub, which provides {@link EventHubProducerAsyncClient} and
* {@link EventHubProducerClient}.
*
*/
@Configuration(proxyBeanMethods = false)
Expand Down

This file was deleted.

Loading