Skip to content

Commit

Permalink
Refactor message listener interface (#27543)
Browse files Browse the repository at this point in the history
* add generic message listener as top interface for all azure messaging listeners
* change MessageListenerContainer.setupMessageListener from accepting Object to MessageListener<?>
  • Loading branch information
saragluna authored Mar 10, 2022
1 parent 5ccc9ba commit 18db3b8
Show file tree
Hide file tree
Showing 42 changed files with 367 additions and 277 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
import com.azure.spring.cloud.autoconfigure.condition.ConditionalOnAnyProperty;
import com.azure.spring.cloud.autoconfigure.implementation.eventhubs.properties.AzureEventHubsProperties;
import com.azure.spring.cloud.core.AzureSpringIdentifier;
import com.azure.spring.cloud.core.provider.connectionstring.ServiceConnectionStringProvider;
import com.azure.spring.cloud.core.customizer.AzureServiceClientBuilderCustomizer;
import com.azure.spring.cloud.core.provider.connectionstring.ServiceConnectionStringProvider;
import com.azure.spring.cloud.core.service.AzureServiceType;
import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsBatchMessageListener;
import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsErrorHandler;
import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsMessageListener;
import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsRecordMessageListener;
import com.azure.spring.cloud.service.implementation.eventhubs.factory.EventProcessorClientBuilderFactory;
import com.azure.spring.cloud.service.listener.MessageListener;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.AllNestedConditions;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
Expand All @@ -24,13 +26,14 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Assert;

/**
* Configures a {@link EventProcessorClient}.
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EventProcessorClientBuilder.class)
@ConditionalOnBean({ EventHubsMessageListener.class, CheckpointStore.class, EventHubsErrorHandler.class })
@ConditionalOnBean({ MessageListener.class, CheckpointStore.class, EventHubsErrorHandler.class })
@Conditional(AzureEventHubsProcessorClientConfiguration.ProcessorAvailableCondition.class)
class AzureEventHubsProcessorClientConfiguration {

Expand All @@ -51,12 +54,17 @@ public EventProcessorClient eventProcessorClient(EventProcessorClientBuilder bui
@ConditionalOnMissingBean
EventProcessorClientBuilderFactory eventProcessorClientBuilderFactory(
CheckpointStore checkpointStore,
EventHubsMessageListener messageListener,
EventHubsErrorHandler errorHandler,
ObjectProvider<EventHubsRecordMessageListener> recordMessageListeners,
ObjectProvider<EventHubsBatchMessageListener> batchMessageListeners,
ObjectProvider<ServiceConnectionStringProvider<AzureServiceType.EventHubs>> connectionStringProviders,
ObjectProvider<AzureServiceClientBuilderCustomizer<EventProcessorClientBuilder>> customizers) {

MessageListener<?> listener = getMessageListener(recordMessageListeners, batchMessageListeners);
Assert.notNull(listener, "Expect only one record / batch message listener for Event Hubs.");

final EventProcessorClientBuilderFactory factory =
new EventProcessorClientBuilderFactory(this.processorProperties, checkpointStore, messageListener, errorHandler);
new EventProcessorClientBuilderFactory(this.processorProperties, checkpointStore, listener, errorHandler);

factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_EVENT_HUBS);
connectionStringProviders.orderedStream().findFirst().ifPresent(factory::setConnectionStringProvider);
Expand All @@ -70,6 +78,26 @@ EventProcessorClientBuilder eventProcessorClientBuilder(EventProcessorClientBuil
return factory.build();
}

private MessageListener<?> getMessageListener(ObjectProvider<EventHubsRecordMessageListener> recordListeners,
ObjectProvider<EventHubsBatchMessageListener> batchListeners) {

boolean isRecordListenerPresent = recordListeners.stream().findAny().isPresent();
boolean isBatchListenerPresent = batchListeners.stream().findAny().isPresent();
if (isRecordListenerPresent && isBatchListenerPresent) {
throw new IllegalArgumentException("Only one type of Event Hubs message listener can be provided, either a "
+ "'EventHubsRecordMessageListener'' or a 'EventHubsBatchMessageListener', but found both.");
}
if (!isRecordListenerPresent && !isBatchListenerPresent) {
throw new IllegalArgumentException("One listener of type 'EventHubsRecordMessageListener' or "
+ "'EventHubsBatchMessageListener' must be provided.");
}
if (isRecordListenerPresent) {
return recordListeners.getIfUnique();
}

return batchListeners.getIfUnique();
}

static class ProcessorAvailableCondition extends AllNestedConditions {

ProcessorAvailableCondition() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
import com.azure.spring.cloud.autoconfigure.condition.ConditionalOnAnyProperty;
import com.azure.spring.cloud.autoconfigure.implementation.servicebus.properties.AzureServiceBusProperties;
import com.azure.spring.cloud.core.AzureSpringIdentifier;
import com.azure.spring.cloud.core.provider.connectionstring.ServiceConnectionStringProvider;
import com.azure.spring.cloud.core.customizer.AzureServiceClientBuilderCustomizer;
import com.azure.spring.cloud.core.provider.connectionstring.ServiceConnectionStringProvider;
import com.azure.spring.cloud.core.service.AzureServiceType;
import com.azure.spring.cloud.service.implementation.servicebus.factory.ServiceBusProcessorClientBuilderFactory;
import com.azure.spring.cloud.service.implementation.servicebus.factory.ServiceBusSessionProcessorClientBuilderFactory;
import com.azure.spring.cloud.service.servicebus.consumer.ServiceBusErrorHandler;
import com.azure.spring.cloud.service.servicebus.consumer.ServiceBusMessageListener;
import com.azure.spring.cloud.service.servicebus.consumer.ServiceBusRecordMessageListener;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
Expand All @@ -28,7 +28,7 @@
* Configuration for a {@link ServiceBusProcessorClient}.
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnBean({ ServiceBusMessageListener.class, ServiceBusErrorHandler.class })
@ConditionalOnBean({ ServiceBusRecordMessageListener.class, ServiceBusErrorHandler.class })
@ConditionalOnAnyProperty(prefix = "spring.cloud.azure.servicebus", name = { "entity-name", "processor.entity-name" })
@Import({
AzureServiceBusProcessorClientConfiguration.SessionProcessorClientConfiguration.class,
Expand All @@ -46,7 +46,7 @@ static class NoneSessionProcessorClientConfiguration {
@ConditionalOnMissingBean
ServiceBusProcessorClientBuilderFactory serviceBusProcessorClientBuilderFactory(
AzureServiceBusProperties serviceBusProperties,
ServiceBusMessageListener messageListener,
ServiceBusRecordMessageListener messageListener,
ServiceBusErrorHandler errorHandler,
ObjectProvider<ServiceBusClientBuilder> serviceBusClientBuilders,
ObjectProvider<ServiceConnectionStringProvider<AzureServiceType.ServiceBus>> connectionStringProviders,
Expand Down Expand Up @@ -97,7 +97,7 @@ static class SessionProcessorClientConfiguration {
@ConditionalOnMissingBean
ServiceBusSessionProcessorClientBuilderFactory serviceBusSessionProcessorClientBuilderFactory(
AzureServiceBusProperties serviceBusProperties,
ServiceBusMessageListener messageListener,
ServiceBusRecordMessageListener messageListener,
ServiceBusErrorHandler errorHandler,
ObjectProvider<ServiceBusClientBuilder> serviceBusClientBuilders,
ObjectProvider<ServiceConnectionStringProvider<AzureServiceType.ServiceBus>> connectionStringProviders,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.spring.cloud.autoconfigure.TestBuilderCustomizer;
import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsBatchMessageListener;
import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsErrorHandler;
import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsMessageListener;
import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsRecordMessageListener;
import com.azure.spring.cloud.service.implementation.eventhubs.factory.EventProcessorClientBuilderFactory;
import com.azure.spring.cloud.service.listener.MessageListener;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
Expand All @@ -24,7 +23,6 @@ class AzureEventHubsProcessorClientConfigurationTests {
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(AzureEventHubsProcessorClientConfiguration.class));


@Test
void noMessageListenerAndErrorHandlerShouldNotConfigure() {
contextRunner
Expand All @@ -49,7 +47,7 @@ void noMessageListenerShouldNotConfigure() {
@Test
void noErrorHandlerShouldNotConfigure() {
contextRunner
.withBean(EventHubsMessageListener.class, TestEventHubsRecordMessageListener::new)
.withBean(MessageListener.class, TestEventHubsRecordMessageListener::new)
.withPropertyValues(
"spring.cloud.azure.eventhubs.namespace=test-namespace",
"spring.cloud.azure.eventhubs.event-hub-name=test-eventhub"
Expand All @@ -61,7 +59,7 @@ void noErrorHandlerShouldNotConfigure() {
void noEventHubNameProvidedShouldNotConfigure() {
contextRunner
.withBean(EventHubsErrorHandler.class, () -> errorContext -> { })
.withBean(EventHubsMessageListener.class, TestEventHubsRecordMessageListener::new)
.withBean(MessageListener.class, TestEventHubsRecordMessageListener::new)
.withPropertyValues("spring.cloud.azure.eventhubs.consumer-group=test-cg")
.run(context -> assertThat(context).doesNotHaveBean(AzureEventHubsProcessorClientConfiguration.class));
}
Expand All @@ -70,7 +68,7 @@ void noEventHubNameProvidedShouldNotConfigure() {
void noConsumerGroupProvidedShouldNotConfigure() {
contextRunner
.withBean(EventHubsErrorHandler.class, () -> errorContext -> { })
.withBean(EventHubsMessageListener.class, TestEventHubsRecordMessageListener::new)
.withBean(MessageListener.class, TestEventHubsRecordMessageListener::new)
.withPropertyValues("spring.cloud.azure.eventhubs.event-hub-name=test-eventhub")
.run(context -> assertThat(context).doesNotHaveBean(AzureEventHubsProcessorClientConfiguration.class));
}
Expand All @@ -79,7 +77,7 @@ void noConsumerGroupProvidedShouldNotConfigure() {
void eventHubNameAndConsumerGroupProvidedShouldConfigure() {
contextRunner
.withBean(EventHubsErrorHandler.class, () -> errorContext -> { })
.withBean(EventHubsMessageListener.class, TestEventHubsRecordMessageListener::new)
.withBean(MessageListener.class, TestEventHubsRecordMessageListener::new)
.withBean(CheckpointStore.class, TestCheckpointStore::new)
.withUserConfiguration(AzureEventHubsPropertiesTestConfiguration.class)
.withPropertyValues(
Expand All @@ -100,7 +98,7 @@ void customizerShouldBeCalled() {
EventProcessorBuilderCustomizer customizer = new EventProcessorBuilderCustomizer();
this.contextRunner
.withBean(EventHubsErrorHandler.class, () -> errorContext -> { })
.withBean(EventHubsMessageListener.class, TestEventHubsRecordMessageListener::new)
.withBean(MessageListener.class, TestEventHubsRecordMessageListener::new)
.withBean(CheckpointStore.class, TestCheckpointStore::new)
.withUserConfiguration(AzureEventHubsPropertiesTestConfiguration.class)
.withPropertyValues(
Expand All @@ -119,7 +117,7 @@ void otherCustomizerShouldNotBeCalled() {
OtherBuilderCustomizer otherBuilderCustomizer = new OtherBuilderCustomizer();
this.contextRunner
.withBean(EventHubsErrorHandler.class, () -> errorContext -> { })
.withBean(EventHubsMessageListener.class, TestEventHubsRecordMessageListener::new)
.withBean(MessageListener.class, TestEventHubsRecordMessageListener::new)
.withBean(CheckpointStore.class, TestCheckpointStore::new)
.withUserConfiguration(AzureEventHubsPropertiesTestConfiguration.class)
.withPropertyValues(
Expand All @@ -136,20 +134,92 @@ void otherCustomizerShouldNotBeCalled() {
});
}

private static class EventProcessorBuilderCustomizer extends TestBuilderCustomizer<EventProcessorClientBuilder> {
@Test
void bothRecordAndBatchListenersProvidedShouldThrowException() {
this.contextRunner
.withBean(EventHubsErrorHandler.class, () -> errorContext -> { })
.withBean(CheckpointStore.class, TestCheckpointStore::new)
.withBean("recordListener", MessageListener.class, TestEventHubsRecordMessageListener::new)
.withBean("batchListener", MessageListener.class, () -> (EventHubsBatchMessageListener) message -> { })
.withUserConfiguration(AzureEventHubsPropertiesTestConfiguration.class)
.withPropertyValues(
"spring.cloud.azure.eventhubs.namespace=test-namespace",
"spring.cloud.azure.eventhubs.event-hub-name=test-eventhub",
"spring.cloud.azure.eventhubs.processor.consumer-group=test-consumer-group"
)
.run(context -> {
assertThat(context).hasFailed();
assertThat(context.getStartupFailure()).isNotNull();
assertThat(context.getStartupFailure().getMessage()).contains("Only one type of Event Hubs message listener can be provided");
});
}

@Test
void multipleRecordListenersProvidedShouldThrowException() {
this.contextRunner
.withBean(EventHubsErrorHandler.class, () -> errorContext -> { })
.withBean(CheckpointStore.class, TestCheckpointStore::new)
.withBean("recordListener1", MessageListener.class, TestEventHubsRecordMessageListener::new)
.withBean("recordListener2", MessageListener.class, TestEventHubsRecordMessageListener::new)
.withUserConfiguration(AzureEventHubsPropertiesTestConfiguration.class)
.withPropertyValues(
"spring.cloud.azure.eventhubs.namespace=test-namespace",
"spring.cloud.azure.eventhubs.event-hub-name=test-eventhub",
"spring.cloud.azure.eventhubs.processor.consumer-group=test-consumer-group"
)
.run(context -> {
assertThat(context).hasFailed();
assertThat(context.getStartupFailure()).isNotNull();
assertThat(context.getStartupFailure().getMessage()).contains("Expect only one record / batch message listener for Event Hubs.");
});
}

private static class OtherBuilderCustomizer extends TestBuilderCustomizer<ConfigurationClientBuilder> {
@Test
void multipleBatchListenersProvidedShouldThrowException() {
this.contextRunner
.withBean(EventHubsErrorHandler.class, () -> errorContext -> { })
.withBean(CheckpointStore.class, TestCheckpointStore::new)
.withBean("batchListener1", MessageListener.class, () -> (EventHubsBatchMessageListener) message -> { })
.withBean("batchListener2", MessageListener.class, () -> (EventHubsBatchMessageListener) message -> { })
.withUserConfiguration(AzureEventHubsPropertiesTestConfiguration.class)
.withPropertyValues(
"spring.cloud.azure.eventhubs.namespace=test-namespace",
"spring.cloud.azure.eventhubs.event-hub-name=test-eventhub",
"spring.cloud.azure.eventhubs.processor.consumer-group=test-consumer-group"
)
.run(context -> {
assertThat(context).hasFailed();
assertThat(context.getStartupFailure()).isNotNull();
assertThat(context.getStartupFailure().getMessage()).contains("Expect only one record / batch message listener for Event Hubs.");
});
}

@Test
void noCorrectListenersProvidedShouldThrowException() {
this.contextRunner
.withBean(EventHubsErrorHandler.class, () -> errorContext -> { })
.withBean(CheckpointStore.class, TestCheckpointStore::new)
.withBean(MessageListener.class, () -> (MessageListener<String>) message -> { })
.withUserConfiguration(AzureEventHubsPropertiesTestConfiguration.class)
.withPropertyValues(
"spring.cloud.azure.eventhubs.namespace=test-namespace",
"spring.cloud.azure.eventhubs.event-hub-name=test-eventhub",
"spring.cloud.azure.eventhubs.processor.consumer-group=test-consumer-group"
)
.run(context -> {
assertThat(context).hasFailed();
assertThat(context.getStartupFailure()).isNotNull();
assertThat(context.getStartupFailure().getMessage()).contains("One listener of type 'EventHubsRecordMessageListener' or 'EventHubsBatchMessageListener' must be provided");
});
}

private static class TestEventHubsRecordMessageListener implements EventHubsRecordMessageListener {
private static class EventProcessorBuilderCustomizer extends TestBuilderCustomizer<EventProcessorClientBuilder> {

@Override
public void onEvent(EventContext eventContext) {
}

private static class OtherBuilderCustomizer extends TestBuilderCustomizer<ConfigurationClientBuilder> {

}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
public class TestEventHubsRecordMessageListener implements EventHubsRecordMessageListener {

@Override
public void onEvent(EventContext eventContext) {
public void onMessage(EventContext eventContext) {

}
}
Loading

0 comments on commit 18db3b8

Please sign in to comment.