Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor message listener interface #27543

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
import com.azure.spring.cloud.autoconfigure.condition.ConditionalOnAnyProperty;
import com.azure.spring.cloud.autoconfigure.implementation.eventhubs.properties.AzureEventHubsProperties;
import com.azure.spring.cloud.core.AzureSpringIdentifier;
import com.azure.spring.cloud.core.provider.connectionstring.ServiceConnectionStringProvider;
import com.azure.spring.cloud.core.customizer.AzureServiceClientBuilderCustomizer;
import com.azure.spring.cloud.core.provider.connectionstring.ServiceConnectionStringProvider;
import com.azure.spring.cloud.core.service.AzureServiceType;
import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsBatchMessageListener;
import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsErrorHandler;
import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsMessageListener;
import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsRecordMessageListener;
import com.azure.spring.cloud.service.implementation.eventhubs.factory.EventProcessorClientBuilderFactory;
import com.azure.spring.cloud.service.listener.MessageListener;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.AllNestedConditions;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
Expand All @@ -24,13 +26,14 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Assert;

/**
* Configures a {@link EventProcessorClient}.
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EventProcessorClientBuilder.class)
@ConditionalOnBean({ EventHubsMessageListener.class, CheckpointStore.class, EventHubsErrorHandler.class })
@ConditionalOnBean({ MessageListener.class, CheckpointStore.class, EventHubsErrorHandler.class })
@Conditional(AzureEventHubsProcessorClientConfiguration.ProcessorAvailableCondition.class)
class AzureEventHubsProcessorClientConfiguration {

Expand All @@ -51,12 +54,17 @@ public EventProcessorClient eventProcessorClient(EventProcessorClientBuilder bui
@ConditionalOnMissingBean
EventProcessorClientBuilderFactory eventProcessorClientBuilderFactory(
CheckpointStore checkpointStore,
EventHubsMessageListener messageListener,
EventHubsErrorHandler errorHandler,
ObjectProvider<EventHubsRecordMessageListener> recordMessageListeners,
ObjectProvider<EventHubsBatchMessageListener> batchMessageListeners,
ObjectProvider<ServiceConnectionStringProvider<AzureServiceType.EventHubs>> connectionStringProviders,
ObjectProvider<AzureServiceClientBuilderCustomizer<EventProcessorClientBuilder>> customizers) {

MessageListener<?> listener = getMessageListener(recordMessageListeners, batchMessageListeners);
Assert.notNull(listener, "Expect only one record / batch message listener for Event Hubs.");

final EventProcessorClientBuilderFactory factory =
new EventProcessorClientBuilderFactory(this.processorProperties, checkpointStore, messageListener, errorHandler);
new EventProcessorClientBuilderFactory(this.processorProperties, checkpointStore, listener, errorHandler);

factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_EVENT_HUBS);
connectionStringProviders.orderedStream().findFirst().ifPresent(factory::setConnectionStringProvider);
Expand All @@ -70,6 +78,26 @@ EventProcessorClientBuilder eventProcessorClientBuilder(EventProcessorClientBuil
return factory.build();
}

private MessageListener<?> getMessageListener(ObjectProvider<EventHubsRecordMessageListener> recordListeners,
ObjectProvider<EventHubsBatchMessageListener> batchListeners) {

boolean isRecordListenerPresent = recordListeners.stream().findAny().isPresent();
boolean isBatchListenerPresent = batchListeners.stream().findAny().isPresent();
if (isRecordListenerPresent && isBatchListenerPresent) {
throw new IllegalArgumentException("Only one type of Event Hubs message listener can be provided, either a "
+ "'EventHubsRecordMessageListener'' or a 'EventHubsBatchMessageListener', but found both.");
}
if (!isRecordListenerPresent && !isBatchListenerPresent) {
throw new IllegalArgumentException("One listener of type 'EventHubsRecordMessageListener' or "
+ "'EventHubsBatchMessageListener' must be provided.");
}
if (isRecordListenerPresent) {
return recordListeners.getIfUnique();
}

return batchListeners.getIfUnique();
}

static class ProcessorAvailableCondition extends AllNestedConditions {

ProcessorAvailableCondition() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
import com.azure.spring.cloud.autoconfigure.condition.ConditionalOnAnyProperty;
import com.azure.spring.cloud.autoconfigure.implementation.servicebus.properties.AzureServiceBusProperties;
import com.azure.spring.cloud.core.AzureSpringIdentifier;
import com.azure.spring.cloud.core.provider.connectionstring.ServiceConnectionStringProvider;
import com.azure.spring.cloud.core.customizer.AzureServiceClientBuilderCustomizer;
import com.azure.spring.cloud.core.provider.connectionstring.ServiceConnectionStringProvider;
import com.azure.spring.cloud.core.service.AzureServiceType;
import com.azure.spring.cloud.service.implementation.servicebus.factory.ServiceBusProcessorClientBuilderFactory;
import com.azure.spring.cloud.service.implementation.servicebus.factory.ServiceBusSessionProcessorClientBuilderFactory;
import com.azure.spring.cloud.service.servicebus.consumer.ServiceBusErrorHandler;
import com.azure.spring.cloud.service.servicebus.consumer.ServiceBusMessageListener;
import com.azure.spring.cloud.service.servicebus.consumer.ServiceBusRecordMessageListener;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
Expand All @@ -28,7 +28,7 @@
* Configuration for a {@link ServiceBusProcessorClient}.
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnBean({ ServiceBusMessageListener.class, ServiceBusErrorHandler.class })
@ConditionalOnBean({ ServiceBusRecordMessageListener.class, ServiceBusErrorHandler.class })
@ConditionalOnAnyProperty(prefix = "spring.cloud.azure.servicebus", name = { "entity-name", "processor.entity-name" })
@Import({
AzureServiceBusProcessorClientConfiguration.SessionProcessorClientConfiguration.class,
Expand All @@ -46,7 +46,7 @@ static class NoneSessionProcessorClientConfiguration {
@ConditionalOnMissingBean
ServiceBusProcessorClientBuilderFactory serviceBusProcessorClientBuilderFactory(
AzureServiceBusProperties serviceBusProperties,
ServiceBusMessageListener messageListener,
ServiceBusRecordMessageListener messageListener,
ServiceBusErrorHandler errorHandler,
ObjectProvider<ServiceBusClientBuilder> serviceBusClientBuilders,
ObjectProvider<ServiceConnectionStringProvider<AzureServiceType.ServiceBus>> connectionStringProviders,
Expand Down Expand Up @@ -97,7 +97,7 @@ static class SessionProcessorClientConfiguration {
@ConditionalOnMissingBean
ServiceBusSessionProcessorClientBuilderFactory serviceBusSessionProcessorClientBuilderFactory(
AzureServiceBusProperties serviceBusProperties,
ServiceBusMessageListener messageListener,
ServiceBusRecordMessageListener messageListener,
ServiceBusErrorHandler errorHandler,
ObjectProvider<ServiceBusClientBuilder> serviceBusClientBuilders,
ObjectProvider<ServiceConnectionStringProvider<AzureServiceType.ServiceBus>> connectionStringProviders,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.spring.cloud.autoconfigure.TestBuilderCustomizer;
import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsBatchMessageListener;
import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsErrorHandler;
import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsMessageListener;
import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsRecordMessageListener;
import com.azure.spring.cloud.service.implementation.eventhubs.factory.EventProcessorClientBuilderFactory;
import com.azure.spring.cloud.service.listener.MessageListener;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
Expand All @@ -24,7 +23,6 @@ class AzureEventHubsProcessorClientConfigurationTests {
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(AzureEventHubsProcessorClientConfiguration.class));


@Test
void noMessageListenerAndErrorHandlerShouldNotConfigure() {
contextRunner
Expand All @@ -49,7 +47,7 @@ void noMessageListenerShouldNotConfigure() {
@Test
void noErrorHandlerShouldNotConfigure() {
contextRunner
.withBean(EventHubsMessageListener.class, TestEventHubsRecordMessageListener::new)
.withBean(MessageListener.class, TestEventHubsRecordMessageListener::new)
.withPropertyValues(
"spring.cloud.azure.eventhubs.namespace=test-namespace",
"spring.cloud.azure.eventhubs.event-hub-name=test-eventhub"
Expand All @@ -61,7 +59,7 @@ void noErrorHandlerShouldNotConfigure() {
void noEventHubNameProvidedShouldNotConfigure() {
contextRunner
.withBean(EventHubsErrorHandler.class, () -> errorContext -> { })
.withBean(EventHubsMessageListener.class, TestEventHubsRecordMessageListener::new)
.withBean(MessageListener.class, TestEventHubsRecordMessageListener::new)
.withPropertyValues("spring.cloud.azure.eventhubs.consumer-group=test-cg")
.run(context -> assertThat(context).doesNotHaveBean(AzureEventHubsProcessorClientConfiguration.class));
}
Expand All @@ -70,7 +68,7 @@ void noEventHubNameProvidedShouldNotConfigure() {
void noConsumerGroupProvidedShouldNotConfigure() {
contextRunner
.withBean(EventHubsErrorHandler.class, () -> errorContext -> { })
.withBean(EventHubsMessageListener.class, TestEventHubsRecordMessageListener::new)
.withBean(MessageListener.class, TestEventHubsRecordMessageListener::new)
.withPropertyValues("spring.cloud.azure.eventhubs.event-hub-name=test-eventhub")
.run(context -> assertThat(context).doesNotHaveBean(AzureEventHubsProcessorClientConfiguration.class));
}
Expand All @@ -79,7 +77,7 @@ void noConsumerGroupProvidedShouldNotConfigure() {
void eventHubNameAndConsumerGroupProvidedShouldConfigure() {
contextRunner
.withBean(EventHubsErrorHandler.class, () -> errorContext -> { })
.withBean(EventHubsMessageListener.class, TestEventHubsRecordMessageListener::new)
.withBean(MessageListener.class, TestEventHubsRecordMessageListener::new)
.withBean(CheckpointStore.class, TestCheckpointStore::new)
.withUserConfiguration(AzureEventHubsPropertiesTestConfiguration.class)
.withPropertyValues(
Expand All @@ -100,7 +98,7 @@ void customizerShouldBeCalled() {
EventProcessorBuilderCustomizer customizer = new EventProcessorBuilderCustomizer();
this.contextRunner
.withBean(EventHubsErrorHandler.class, () -> errorContext -> { })
.withBean(EventHubsMessageListener.class, TestEventHubsRecordMessageListener::new)
.withBean(MessageListener.class, TestEventHubsRecordMessageListener::new)
.withBean(CheckpointStore.class, TestCheckpointStore::new)
.withUserConfiguration(AzureEventHubsPropertiesTestConfiguration.class)
.withPropertyValues(
Expand All @@ -119,7 +117,7 @@ void otherCustomizerShouldNotBeCalled() {
OtherBuilderCustomizer otherBuilderCustomizer = new OtherBuilderCustomizer();
this.contextRunner
.withBean(EventHubsErrorHandler.class, () -> errorContext -> { })
.withBean(EventHubsMessageListener.class, TestEventHubsRecordMessageListener::new)
.withBean(MessageListener.class, TestEventHubsRecordMessageListener::new)
.withBean(CheckpointStore.class, TestCheckpointStore::new)
.withUserConfiguration(AzureEventHubsPropertiesTestConfiguration.class)
.withPropertyValues(
Expand All @@ -136,20 +134,92 @@ void otherCustomizerShouldNotBeCalled() {
});
}

private static class EventProcessorBuilderCustomizer extends TestBuilderCustomizer<EventProcessorClientBuilder> {
@Test
void bothRecordAndBatchListenersProvidedShouldThrowException() {
this.contextRunner
.withBean(EventHubsErrorHandler.class, () -> errorContext -> { })
.withBean(CheckpointStore.class, TestCheckpointStore::new)
.withBean("recordListener", MessageListener.class, TestEventHubsRecordMessageListener::new)
.withBean("batchListener", MessageListener.class, () -> (EventHubsBatchMessageListener) message -> { })
.withUserConfiguration(AzureEventHubsPropertiesTestConfiguration.class)
.withPropertyValues(
"spring.cloud.azure.eventhubs.namespace=test-namespace",
"spring.cloud.azure.eventhubs.event-hub-name=test-eventhub",
"spring.cloud.azure.eventhubs.processor.consumer-group=test-consumer-group"
)
.run(context -> {
assertThat(context).hasFailed();
assertThat(context.getStartupFailure()).isNotNull();
assertThat(context.getStartupFailure().getMessage()).contains("Only one type of Event Hubs message listener can be provided");
});
}

@Test
void multipleRecordListenersProvidedShouldThrowException() {
this.contextRunner
.withBean(EventHubsErrorHandler.class, () -> errorContext -> { })
.withBean(CheckpointStore.class, TestCheckpointStore::new)
.withBean("recordListener1", MessageListener.class, TestEventHubsRecordMessageListener::new)
.withBean("recordListener2", MessageListener.class, TestEventHubsRecordMessageListener::new)
.withUserConfiguration(AzureEventHubsPropertiesTestConfiguration.class)
.withPropertyValues(
"spring.cloud.azure.eventhubs.namespace=test-namespace",
"spring.cloud.azure.eventhubs.event-hub-name=test-eventhub",
"spring.cloud.azure.eventhubs.processor.consumer-group=test-consumer-group"
)
.run(context -> {
assertThat(context).hasFailed();
assertThat(context.getStartupFailure()).isNotNull();
assertThat(context.getStartupFailure().getMessage()).contains("Expect only one record / batch message listener for Event Hubs.");
});
}

private static class OtherBuilderCustomizer extends TestBuilderCustomizer<ConfigurationClientBuilder> {
@Test
void multipleBatchListenersProvidedShouldThrowException() {
this.contextRunner
.withBean(EventHubsErrorHandler.class, () -> errorContext -> { })
.withBean(CheckpointStore.class, TestCheckpointStore::new)
.withBean("batchListener1", MessageListener.class, () -> (EventHubsBatchMessageListener) message -> { })
.withBean("batchListener2", MessageListener.class, () -> (EventHubsBatchMessageListener) message -> { })
.withUserConfiguration(AzureEventHubsPropertiesTestConfiguration.class)
.withPropertyValues(
"spring.cloud.azure.eventhubs.namespace=test-namespace",
"spring.cloud.azure.eventhubs.event-hub-name=test-eventhub",
"spring.cloud.azure.eventhubs.processor.consumer-group=test-consumer-group"
)
.run(context -> {
assertThat(context).hasFailed();
assertThat(context.getStartupFailure()).isNotNull();
assertThat(context.getStartupFailure().getMessage()).contains("Expect only one record / batch message listener for Event Hubs.");
});
}

@Test
void noCorrectListenersProvidedShouldThrowException() {
this.contextRunner
.withBean(EventHubsErrorHandler.class, () -> errorContext -> { })
.withBean(CheckpointStore.class, TestCheckpointStore::new)
.withBean(MessageListener.class, () -> (MessageListener<String>) message -> { })
.withUserConfiguration(AzureEventHubsPropertiesTestConfiguration.class)
.withPropertyValues(
"spring.cloud.azure.eventhubs.namespace=test-namespace",
"spring.cloud.azure.eventhubs.event-hub-name=test-eventhub",
"spring.cloud.azure.eventhubs.processor.consumer-group=test-consumer-group"
)
.run(context -> {
assertThat(context).hasFailed();
assertThat(context.getStartupFailure()).isNotNull();
assertThat(context.getStartupFailure().getMessage()).contains("One listener of type 'EventHubsRecordMessageListener' or 'EventHubsBatchMessageListener' must be provided");
});
}

private static class TestEventHubsRecordMessageListener implements EventHubsRecordMessageListener {
private static class EventProcessorBuilderCustomizer extends TestBuilderCustomizer<EventProcessorClientBuilder> {

@Override
public void onEvent(EventContext eventContext) {
}

private static class OtherBuilderCustomizer extends TestBuilderCustomizer<ConfigurationClientBuilder> {

}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
public class TestEventHubsRecordMessageListener implements EventHubsRecordMessageListener {

@Override
public void onEvent(EventContext eventContext) {
public void onMessage(EventContext eventContext) {

}
}
Loading