diff --git a/sdk/spring/azure-spring-boot/README.md b/sdk/spring/azure-spring-boot/README.md
index fb29c306c4c7..378eda2ea4ef 100644
--- a/sdk/spring/azure-spring-boot/README.md
+++ b/sdk/spring/azure-spring-boot/README.md
@@ -34,6 +34,97 @@ variable and setting the appropriate properties used by auto-configuration code.
For details, please see sample code in the [azure-spring-boot-sample-cloud-foundry](https://github.com/Azure-Samples/azure-spring-boot-samples/tree/tag_azure-spring-boot_3.6.0/cloudfoundry/azure-cloud-foundry-service-sample)
+## Health indicator
+
+You can use health information to check the status of your running application. It is often used by
+monitoring software to alert someone when a production system goes down. The information exposed by
+the health endpoint depends on the management.endpoint.health.show-details and
+management.endpoint.health.show-components properties which can be configured with one of the
+following values:
+
+| key | Name |
+| ---- | ---- |
+| never | Details are never shown. |
+| when-authorized | Details are only shown to authorized users. Authorized roles can be configured using management.endpoint.health.roles. |
+| always |Details are shown to all users. |
+
+The default value is never. A user is considered to be authorized when they are in one or more of
+the endpoint’s roles. If the endpoint has no configured roles (the default) all authenticated users
+are considered to be authorized. The roles can be configured using the
+management.endpoint.health.roles property.
+
+**NOTE:** If you have secured your application and wish to use `always`, your security configuration
+must permit access to the health endpoint for both authenticated and unauthenticated users.
+
+### Auto-configured HealthIndicators
+
+The following HealthIndicators are auto-configured by Azure Spring Boot when appropriate. You can
+also enable/disable selected indicators by configuring management.health.key.enabled, with the key
+listed in the table below.
+
+| key | Name | Description |
+| ---- | ---- | ---- |
+| azure-cosmos | CosmosHealthIndicator | Checks that a cosmos database is up. |
+| azure-key-vault | KeyVaultHealthIndicator | Checks that a key vault is up. |
+| azure-storage | BlobStorageHealthIndicator | Checks that a storage blob is up. |
+| azure-storage | FileStorageHealthIndicator | Checks that a storage file is up. |
+
+### Add the dependent
+
+```yaml
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+```
+
+### Enabling the Actuator
+
+When you do the following configuration in `application.yml` , you can access the endpoint to get
+the health of the component.
+
+```yaml
+management:
+ health:
+ azure-cosmos:
+ enabled: true
+ azure-key-vault:
+ enabled: true
+ azure-storage:
+ enabled: true
+ endpoint:
+ health:
+ show-details: always
+```
+
+Access the Health Endpoint:
+
+```json
+{
+ "status": "UP",
+ "components": {
+ "blobStorage": {
+ "status": "UP",
+ "details": {
+ "URL": "https://xxxx.blob.core.windows.net"
+ }
+ },
+ "cosmos": {
+ "status": "UP",
+ "details": {
+ "database": "xxx"
+ }
+ },
+ "keyVault": {
+ "status": "UP"
+ }
+ }
+}
+```
+
+
## Examples
The following section provides sample projects illustrating how to use the Azure Spring Boot starters.
### More sample code
diff --git a/sdk/spring/azure-spring-cloud-autoconfigure/README.md b/sdk/spring/azure-spring-cloud-autoconfigure/README.md
index 145a9b7a0715..f307a75d73fe 100644
--- a/sdk/spring/azure-spring-cloud-autoconfigure/README.md
+++ b/sdk/spring/azure-spring-cloud-autoconfigure/README.md
@@ -28,6 +28,87 @@ This project provides auto-configuration for the following Azure services:
- [Service Bus][service_bus]
- [Storage Queue][storage_queue]
+## Health indicator
+
+You can use health information to check the status of your running application. It is often used by
+monitoring software to alert someone when a production system goes down. The information exposed by
+the health endpoint depends on the management.endpoint.health.show-details and
+management.endpoint.health.show-components properties which can be configured with one of the
+following values:
+
+| key | Name |
+| ---- | ---- |
+| never | Details are never shown. |
+| when-authorized | Details are only shown to authorized users. Authorized roles can be configured using management.endpoint.health.roles. |
+| always |Details are shown to all users. |
+
+The default value is never. A user is considered to be authorized when they are in one or more of
+the endpoint’s roles. If the endpoint has no configured roles (the default) all authenticated users
+are considered to be authorized. The roles can be configured using the
+management.endpoint.health.roles property.
+
+**NOTE:** If you have secured your application and wish to use `always`, your security configuration
+must permit access to the health endpoint for both authenticated and unauthenticated users.
+
+### Auto-configured HealthIndicators
+
+The following HealthIndicators are auto-configured by Azure Spring Boot when appropriate. You can
+also enable/disable selected indicators by configuring management.health.key.enabled, with the key
+listed in the table below.
+
+| key | Name | Description |
+| ---- | ---- | ---- |
+| binders | EventHubHealthIndicator | Checks that an event hub is up. |
+| binders | ServiceBusQueueHealthIndicator | Checks that a service bus queue is up. |
+| binders | ServiceBusTopicHealthIndicator | Checks that a service bus topic is up. |
+
+### Add the dependent
+
+```yaml
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+```
+
+### Enabling the Actuator
+
+When you do the following configuration in `application.yml` , you can access the endpoint to get
+the health of the component.
+
+```yaml
+management:
+ health:
+ binders:
+ enabled: true
+ endpoint:
+ health:
+ show-details: always
+```
+
+Access the Health Endpoint:
+
+```json
+{
+ "status": "UP",
+ "components": {
+ "binders": {
+ "status": "UP",
+ "components": {
+ "eventhub-1": {
+ "status": "UP"
+ },
+ "servicebus-1": {
+ "status": "UP"
+ }
+ }
+ }
+ }
+}
+```
+
## Examples
The following section provides sample projects illustrating how to use the Spring Cloud for Azure starters.
diff --git a/sdk/spring/azure-spring-cloud-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/servicebus/AzureServiceBusAutoConfiguration.java b/sdk/spring/azure-spring-cloud-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/servicebus/AzureServiceBusAutoConfiguration.java
index e35b8153733b..4b203cebc4ee 100644
--- a/sdk/spring/azure-spring-cloud-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/servicebus/AzureServiceBusAutoConfiguration.java
+++ b/sdk/spring/azure-spring-cloud-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/servicebus/AzureServiceBusAutoConfiguration.java
@@ -76,5 +76,4 @@ public ServiceBusConnectionStringProvider serviceBusConnectionStringProvider(
return null;
}
-
}
diff --git a/sdk/spring/azure-spring-cloud-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/servicebus/AzureServiceBusTopicAutoConfiguration.java b/sdk/spring/azure-spring-cloud-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/servicebus/AzureServiceBusTopicAutoConfiguration.java
index 4a5d15d6c751..29acae439e21 100644
--- a/sdk/spring/azure-spring-cloud-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/servicebus/AzureServiceBusTopicAutoConfiguration.java
+++ b/sdk/spring/azure-spring-cloud-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/servicebus/AzureServiceBusTopicAutoConfiguration.java
@@ -94,5 +94,4 @@ public ServiceBusTopicOperation topicOperation(ServiceBusTopicClientFactory fact
ServiceBusMessageConverter messageConverter) {
return new ServiceBusTopicTemplate(factory, messageConverter);
}
-
}
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/pom.xml b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/pom.xml
index f32135542fd6..800b54ce5ec5 100644
--- a/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/pom.xml
+++ b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/pom.xml
@@ -37,6 +37,13 @@
true
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+ 2.5.3
+ true
+
+
com.azure.spring
azure-spring-cloud-stream-binder-test
@@ -65,6 +72,7 @@
org.springframework.boot:spring-boot-configuration-processor:[2.5.3]
+ org.springframework.boot:spring-boot-starter-actuator:[2.5.3]
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/src/main/java/com/azure/spring/servicebus/stream/binder/ServiceBusQueueHealthIndicator.java b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/src/main/java/com/azure/spring/servicebus/stream/binder/ServiceBusQueueHealthIndicator.java
new file mode 100644
index 000000000000..2a51b0ea839a
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/src/main/java/com/azure/spring/servicebus/stream/binder/ServiceBusQueueHealthIndicator.java
@@ -0,0 +1,47 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.azure.spring.servicebus.stream.binder;
+
+import com.azure.spring.integration.servicebus.health.Instrumentation;
+import com.azure.spring.integration.servicebus.health.InstrumentationManager;
+import com.azure.spring.integration.servicebus.queue.ServiceBusQueueOperation;
+import org.springframework.boot.actuate.health.AbstractHealthIndicator;
+import org.springframework.boot.actuate.health.Health;
+
+/**
+ * Implementation of a {@link AbstractHealthIndicator} returning status information for
+ * service bus queue.
+ */
+public class ServiceBusQueueHealthIndicator extends AbstractHealthIndicator {
+
+ private final InstrumentationManager instrumentationManager;
+
+ public ServiceBusQueueHealthIndicator(ServiceBusQueueOperation serviceBusQueueOperation) {
+ super("Service bus health check failed");
+ this.instrumentationManager = serviceBusQueueOperation.getInstrumentationManager();
+ }
+
+ @Override
+ protected void doHealthCheck(Health.Builder builder) {
+ if (instrumentationManager == null || instrumentationManager.getHealthInstrumentations().isEmpty()) {
+ builder.unknown();
+ return;
+ }
+ if (instrumentationManager.getHealthInstrumentations().stream()
+ .allMatch(Instrumentation::isUp)) {
+ builder.up();
+ return;
+ }
+ if (instrumentationManager.getHealthInstrumentations().stream()
+ .allMatch(Instrumentation::isOutOfService)) {
+ builder.outOfService();
+ return;
+ }
+ builder.down();
+ instrumentationManager.getHealthInstrumentations().stream()
+ .filter(instrumentation -> !instrumentation.isStarted())
+ .forEach(instrumentation -> builder
+ .withDetail(instrumentation.getName() + ":" + instrumentation.getType().getTypeName(),
+ instrumentation.getStartException()));
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/src/main/java/com/azure/spring/servicebus/stream/binder/config/ServiceBusQueueBinderConfiguration.java b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/src/main/java/com/azure/spring/servicebus/stream/binder/config/ServiceBusQueueBinderConfiguration.java
index 9d5fa36be0c1..6ddc4dd48870 100644
--- a/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/src/main/java/com/azure/spring/servicebus/stream/binder/config/ServiceBusQueueBinderConfiguration.java
+++ b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/src/main/java/com/azure/spring/servicebus/stream/binder/config/ServiceBusQueueBinderConfiguration.java
@@ -32,7 +32,9 @@
AzureEnvironmentAutoConfiguration.class,
AzureContextAutoConfiguration.class,
AzureServiceBusAutoConfiguration.class,
- AzureServiceBusQueueAutoConfiguration.class })
+ AzureServiceBusQueueAutoConfiguration.class,
+ ServiceBusQueueBinderHealthIndicatorConfiguration.class
+})
@EnableConfigurationProperties({ AzureServiceBusProperties.class, ServiceBusQueueExtendedBindingProperties.class })
public class ServiceBusQueueBinderConfiguration {
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/src/main/java/com/azure/spring/servicebus/stream/binder/config/ServiceBusQueueBinderHealthIndicatorConfiguration.java b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/src/main/java/com/azure/spring/servicebus/stream/binder/config/ServiceBusQueueBinderHealthIndicatorConfiguration.java
new file mode 100644
index 000000000000..88a1670db2bd
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/src/main/java/com/azure/spring/servicebus/stream/binder/config/ServiceBusQueueBinderHealthIndicatorConfiguration.java
@@ -0,0 +1,25 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.azure.spring.servicebus.stream.binder.config;
+
+import com.azure.spring.integration.servicebus.queue.ServiceBusQueueOperation;
+import com.azure.spring.servicebus.stream.binder.ServiceBusQueueHealthIndicator;
+import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Auto configuration for {@link ServiceBusQueueHealthIndicator}.
+ */
+@Configuration
+@ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator")
+@ConditionalOnEnabledHealthIndicator("binders")
+public class ServiceBusQueueBinderHealthIndicatorConfiguration {
+
+ @Bean
+ public ServiceBusQueueHealthIndicator serviceBusQueueHealthIndicator(ServiceBusQueueOperation serviceBusQueueOperation) {
+ return new ServiceBusQueueHealthIndicator(serviceBusQueueOperation);
+ }
+
+}
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/src/test/java/com/azure/spring/servicebus/stream/binder/ServiceBusQueueBinderHealthIndicatorTest.java b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/src/test/java/com/azure/spring/servicebus/stream/binder/ServiceBusQueueBinderHealthIndicatorTest.java
new file mode 100644
index 000000000000..4c51dd67e291
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/src/test/java/com/azure/spring/servicebus/stream/binder/ServiceBusQueueBinderHealthIndicatorTest.java
@@ -0,0 +1,80 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.spring.servicebus.stream.binder;
+
+import com.azure.messaging.servicebus.ServiceBusProcessorClient;
+import com.azure.spring.integration.servicebus.ServiceBusClientConfig;
+import com.azure.spring.integration.servicebus.ServiceBusMessageProcessor;
+import com.azure.spring.integration.servicebus.ServiceBusRuntimeException;
+import com.azure.spring.integration.servicebus.factory.ServiceBusQueueClientFactory;
+import com.azure.spring.integration.servicebus.queue.ServiceBusQueueTemplate;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.springframework.boot.actuate.health.Health;
+import org.springframework.boot.actuate.health.Status;
+import org.springframework.messaging.Message;
+
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
+public class ServiceBusQueueBinderHealthIndicatorTest {
+
+ @Mock
+ private ServiceBusQueueClientFactory serviceBusQueueClientFactory;
+
+ @Mock
+ private ServiceBusProcessorClient processorClient;
+
+ private ServiceBusQueueHealthIndicator serviceBusQueueHealthIndicator;
+
+ private ServiceBusQueueTemplate serviceBusQueueTemplate;
+
+ private Consumer> consumer = message -> {
+ };
+
+ @BeforeEach
+ public void init() {
+ MockitoAnnotations.openMocks(this);
+ serviceBusQueueTemplate = new ServiceBusQueueTemplate(serviceBusQueueClientFactory);
+ serviceBusQueueHealthIndicator = new ServiceBusQueueHealthIndicator(serviceBusQueueTemplate);
+ }
+
+ @Test
+ public void testNoInstrumentationInUse() {
+ final Health health = serviceBusQueueHealthIndicator.health();
+ assertThat(health.getStatus()).isEqualTo(Status.UNKNOWN);
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Test
+ public void testServiceBusQueueIsUp() {
+ when(serviceBusQueueClientFactory.getOrCreateProcessor(anyString(), any(ServiceBusClientConfig.class),
+ any(ServiceBusMessageProcessor.class))).thenReturn(processorClient);
+ serviceBusQueueTemplate.subscribe("queue-test-1", consumer, byte[].class);
+ final Health health = serviceBusQueueHealthIndicator.health();
+ assertThat(health.getStatus()).isEqualTo(Status.UP);
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Test
+ public void testServiceBusQueueIsDown() {
+ when(serviceBusQueueClientFactory.getOrCreateProcessor(anyString(), any(ServiceBusClientConfig.class),
+ any(ServiceBusMessageProcessor.class))).thenReturn(processorClient);
+ doThrow(NullPointerException.class).when(processorClient).start();
+ assertThrows(ServiceBusRuntimeException.class, () -> {
+ serviceBusQueueTemplate.subscribe("queue-test-1", consumer, byte[].class);
+ });
+ final Health health = serviceBusQueueHealthIndicator.health();
+ assertThat(health.getStatus()).isEqualTo(Status.DOWN);
+ }
+
+}
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/src/test/java/com/azure/spring/servicebus/stream/binder/ServiceBusQueuePartitionBinderTests.java b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/src/test/java/com/azure/spring/servicebus/stream/binder/ServiceBusQueuePartitionBinderTests.java
index 3c99e26dece5..d9b848df251b 100644
--- a/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/src/test/java/com/azure/spring/servicebus/stream/binder/ServiceBusQueuePartitionBinderTests.java
+++ b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/src/test/java/com/azure/spring/servicebus/stream/binder/ServiceBusQueuePartitionBinderTests.java
@@ -24,9 +24,10 @@
@RunWith(MockitoJUnitRunner.class)
public class ServiceBusQueuePartitionBinderTests
extends AzurePartitionBinderTests,
- ExtendedProducerProperties> {
- //TODO (Xiaobing Zhu): It is currently impossible to upgrade JUnit 4 to JUnit 5 due to the inheritance of Spring unit tests.
+ ExtendedConsumerProperties,
+ ExtendedProducerProperties> {
+ //TODO (Xiaobing Zhu): It is currently impossible to upgrade JUnit 4 to JUnit 5 due to the inheritance of Spring
+ // unit tests.
@Mock
ServiceBusQueueClientFactory clientFactory;
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/src/test/java/com/azure/spring/servicebus/stream/binder/support/ServiceBusQueueTestOperation.java b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/src/test/java/com/azure/spring/servicebus/stream/binder/support/ServiceBusQueueTestOperation.java
index 05492fc5144f..80231d62a442 100644
--- a/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/src/test/java/com/azure/spring/servicebus/stream/binder/support/ServiceBusQueueTestOperation.java
+++ b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-queue/src/test/java/com/azure/spring/servicebus/stream/binder/support/ServiceBusQueueTestOperation.java
@@ -13,11 +13,11 @@
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
-import java.util.Arrays;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
-import java.util.List;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/pom.xml b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/pom.xml
index a746d3f1436d..28db213445e8 100644
--- a/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/pom.xml
+++ b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/pom.xml
@@ -37,6 +37,13 @@
true
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+ 2.5.3
+ true
+
+
com.azure.spring
azure-spring-cloud-stream-binder-test
@@ -65,6 +72,7 @@
org.springframework.boot:spring-boot-configuration-processor:[2.5.3]
+ org.springframework.boot:spring-boot-starter-actuator:[2.5.3]
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/src/main/java/com/azure/spring/servicebus/stream/binder/ServiceBusTopicHealthIndicator.java b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/src/main/java/com/azure/spring/servicebus/stream/binder/ServiceBusTopicHealthIndicator.java
new file mode 100644
index 000000000000..2a09b91360a1
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/src/main/java/com/azure/spring/servicebus/stream/binder/ServiceBusTopicHealthIndicator.java
@@ -0,0 +1,45 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.azure.spring.servicebus.stream.binder;
+
+import com.azure.spring.integration.servicebus.health.Instrumentation;
+import com.azure.spring.integration.servicebus.health.InstrumentationManager;
+import com.azure.spring.integration.servicebus.topic.ServiceBusTopicOperation;
+import org.springframework.boot.actuate.health.AbstractHealthIndicator;
+import org.springframework.boot.actuate.health.Health;
+
+/**
+ * Implementation of a {@link AbstractHealthIndicator} returning status information for service bus topic.
+ */
+public class ServiceBusTopicHealthIndicator extends AbstractHealthIndicator {
+ private final InstrumentationManager instrumentationManager;
+
+ public ServiceBusTopicHealthIndicator(ServiceBusTopicOperation serviceBusTopicOperation) {
+ super("Service bus health check failed");
+ this.instrumentationManager = serviceBusTopicOperation.getInstrumentationManager();
+ }
+
+ @Override
+ protected void doHealthCheck(Health.Builder builder) {
+ if (instrumentationManager == null || instrumentationManager.getHealthInstrumentations().isEmpty()) {
+ builder.unknown();
+ return;
+ }
+ if (instrumentationManager.getHealthInstrumentations().stream()
+ .allMatch(Instrumentation::isUp)) {
+ builder.up();
+ return;
+ }
+ if (instrumentationManager.getHealthInstrumentations().stream()
+ .allMatch(Instrumentation::isOutOfService)) {
+ builder.outOfService();
+ return;
+ }
+ builder.down();
+ instrumentationManager.getHealthInstrumentations().stream()
+ .filter(instrumentation -> !instrumentation.isStarted())
+ .forEach(instrumentation -> builder
+ .withDetail(instrumentation.getName() + ":" + instrumentation.getType().getTypeName(),
+ instrumentation.getStartException()));
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/src/main/java/com/azure/spring/servicebus/stream/binder/config/ServiceBusTopicBinderConfiguration.java b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/src/main/java/com/azure/spring/servicebus/stream/binder/config/ServiceBusTopicBinderConfiguration.java
index dd7e3c15bac3..1cd98d4bee64 100644
--- a/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/src/main/java/com/azure/spring/servicebus/stream/binder/config/ServiceBusTopicBinderConfiguration.java
+++ b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/src/main/java/com/azure/spring/servicebus/stream/binder/config/ServiceBusTopicBinderConfiguration.java
@@ -33,7 +33,8 @@
AzureEnvironmentAutoConfiguration.class,
AzureContextAutoConfiguration.class,
AzureServiceBusAutoConfiguration.class,
- AzureServiceBusTopicAutoConfiguration.class
+ AzureServiceBusTopicAutoConfiguration.class,
+ ServiceBusTopicBinderHealthIndicatorConfiguration.class
})
@EnableConfigurationProperties({ AzureServiceBusProperties.class, ServiceBusTopicExtendedBindingProperties.class })
public class ServiceBusTopicBinderConfiguration {
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/src/main/java/com/azure/spring/servicebus/stream/binder/config/ServiceBusTopicBinderHealthIndicatorConfiguration.java b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/src/main/java/com/azure/spring/servicebus/stream/binder/config/ServiceBusTopicBinderHealthIndicatorConfiguration.java
new file mode 100644
index 000000000000..f7c81faa82dc
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/src/main/java/com/azure/spring/servicebus/stream/binder/config/ServiceBusTopicBinderHealthIndicatorConfiguration.java
@@ -0,0 +1,25 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.azure.spring.servicebus.stream.binder.config;
+
+import com.azure.spring.integration.servicebus.topic.ServiceBusTopicOperation;
+import com.azure.spring.servicebus.stream.binder.ServiceBusTopicHealthIndicator;
+import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Auto configuration for {@link ServiceBusTopicHealthIndicator}.
+ */
+@Configuration
+@ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator")
+@ConditionalOnEnabledHealthIndicator("binders")
+public class ServiceBusTopicBinderHealthIndicatorConfiguration {
+
+ @Bean
+ public ServiceBusTopicHealthIndicator serviceBusQueueHealthIndicator(ServiceBusTopicOperation serviceBusTopicOperation) {
+ return new ServiceBusTopicHealthIndicator(serviceBusTopicOperation);
+ }
+
+}
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/src/test/java/com/azure/spring/servicebus/stream/binder/ServiceBusTopicBinderHealthIndicatorTest.java b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/src/test/java/com/azure/spring/servicebus/stream/binder/ServiceBusTopicBinderHealthIndicatorTest.java
new file mode 100644
index 000000000000..89a0e212acfc
--- /dev/null
+++ b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/src/test/java/com/azure/spring/servicebus/stream/binder/ServiceBusTopicBinderHealthIndicatorTest.java
@@ -0,0 +1,80 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.spring.servicebus.stream.binder;
+
+import com.azure.messaging.servicebus.ServiceBusProcessorClient;
+import com.azure.spring.integration.servicebus.ServiceBusClientConfig;
+import com.azure.spring.integration.servicebus.ServiceBusMessageProcessor;
+import com.azure.spring.integration.servicebus.ServiceBusRuntimeException;
+import com.azure.spring.integration.servicebus.factory.ServiceBusTopicClientFactory;
+import com.azure.spring.integration.servicebus.topic.ServiceBusTopicTemplate;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.springframework.boot.actuate.health.Health;
+import org.springframework.boot.actuate.health.Status;
+import org.springframework.messaging.Message;
+
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
+public class ServiceBusTopicBinderHealthIndicatorTest {
+ @Mock
+ private ServiceBusTopicClientFactory serviceBusTopicClientFactory;
+
+ @Mock
+ private ServiceBusProcessorClient processorClient;
+
+ private ServiceBusTopicHealthIndicator serviceBusTopicHealthIndicator;
+
+ private ServiceBusTopicTemplate serviceBusTopicTemplate;
+
+ private Consumer> consumer = message -> {
+ };
+
+ @BeforeEach
+ public void init() {
+ MockitoAnnotations.openMocks(this);
+ serviceBusTopicTemplate = new ServiceBusTopicTemplate(serviceBusTopicClientFactory);
+ serviceBusTopicHealthIndicator = new ServiceBusTopicHealthIndicator(serviceBusTopicTemplate);
+ }
+
+ @Test
+ public void testNoTopicInstrumentationInUse() {
+ final Health health = serviceBusTopicHealthIndicator.health();
+ assertThat(health.getStatus()).isEqualTo(Status.UNKNOWN);
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Test
+ public void testServiceBusTopicIsUp() {
+ when(serviceBusTopicClientFactory.getOrCreateProcessor(anyString(), anyString(),
+ any(ServiceBusClientConfig.class),
+ any(ServiceBusMessageProcessor.class))).thenReturn(processorClient);
+ serviceBusTopicTemplate.subscribe("topic-test-1", "topicSubTest", consumer, byte[].class);
+ final Health health = serviceBusTopicHealthIndicator.health();
+ assertThat(health.getStatus()).isEqualTo(Status.UP);
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Test
+ public void testServiceBusTopicIsDown() {
+ when(serviceBusTopicClientFactory.getOrCreateProcessor(anyString(), anyString(),
+ any(ServiceBusClientConfig.class),
+ any(ServiceBusMessageProcessor.class))).thenReturn(processorClient);
+ doThrow(NullPointerException.class).when(processorClient).start();
+ assertThrows(ServiceBusRuntimeException.class, () -> {
+ serviceBusTopicTemplate.subscribe("topic-test-1", "topicSubTest", consumer, byte[].class);
+ });
+ final Health health = serviceBusTopicHealthIndicator.health();
+ assertThat(health.getStatus()).isEqualTo(Status.DOWN);
+ }
+}
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/src/test/java/com/azure/spring/servicebus/stream/binder/ServiceBusTopicPartitionBinderTests.java b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/src/test/java/com/azure/spring/servicebus/stream/binder/ServiceBusTopicPartitionBinderTests.java
index 1de3436d94e0..e32741d8f98b 100644
--- a/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/src/test/java/com/azure/spring/servicebus/stream/binder/ServiceBusTopicPartitionBinderTests.java
+++ b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/src/test/java/com/azure/spring/servicebus/stream/binder/ServiceBusTopicPartitionBinderTests.java
@@ -24,8 +24,8 @@
@RunWith(MockitoJUnitRunner.class)
public class ServiceBusTopicPartitionBinderTests
extends AzurePartitionBinderTests,
- ExtendedProducerProperties> {
+ ExtendedConsumerProperties,
+ ExtendedProducerProperties> {
//TODO (Xiaobing Zhu): It is currently impossible to upgrade JUnit 4 to JUnit 5 due to the inheritance of Spring unit tests.
diff --git a/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/src/test/java/com/azure/spring/servicebus/stream/binder/support/ServiceBusTopicTestOperation.java b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/src/test/java/com/azure/spring/servicebus/stream/binder/support/ServiceBusTopicTestOperation.java
index 181360945234..8f7a49aa4adf 100644
--- a/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/src/test/java/com/azure/spring/servicebus/stream/binder/support/ServiceBusTopicTestOperation.java
+++ b/sdk/spring/azure-spring-cloud-stream-binder-servicebus-topic/src/test/java/com/azure/spring/servicebus/stream/binder/support/ServiceBusTopicTestOperation.java
@@ -13,8 +13,8 @@
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
-import java.util.Arrays;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
diff --git a/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/ServiceBusTemplate.java b/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/ServiceBusTemplate.java
index c576fb4b596f..e828f1748d3a 100644
--- a/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/ServiceBusTemplate.java
+++ b/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/ServiceBusTemplate.java
@@ -4,12 +4,15 @@
package com.azure.spring.integration.servicebus;
import com.azure.messaging.servicebus.ServiceBusMessage;
+import com.azure.messaging.servicebus.ServiceBusSenderAsyncClient;
import com.azure.spring.integration.core.api.CheckpointConfig;
import com.azure.spring.integration.core.api.CheckpointMode;
import com.azure.spring.integration.core.api.PartitionSupplier;
import com.azure.spring.integration.core.api.SendOperation;
import com.azure.spring.integration.servicebus.converter.ServiceBusMessageConverter;
import com.azure.spring.integration.servicebus.factory.ServiceBusSenderFactory;
+import com.azure.spring.integration.servicebus.health.Instrumentation;
+import com.azure.spring.integration.servicebus.health.InstrumentationManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.NonNull;
@@ -34,6 +37,7 @@ public class ServiceBusTemplate implements Se
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusTemplate.class);
private static final CheckpointConfig CHECKPOINT_RECORD = CheckpointConfig.builder().checkpointMode(RECORD).build();
private static final ServiceBusMessageConverter DEFAULT_CONVERTER = new ServiceBusMessageConverter();
+ protected InstrumentationManager instrumentationManager = new InstrumentationManager();
protected final T clientFactory;
protected CheckpointConfig checkpointConfig = CHECKPOINT_RECORD;
protected ServiceBusClientConfig clientConfig = ServiceBusClientConfig.builder().build();
@@ -54,13 +58,29 @@ public CompletableFuture sendAsync(String destination,
Message message,
PartitionSupplier partitionSupplier) {
Assert.hasText(destination, "destination can't be null or empty");
+ ServiceBusSenderAsyncClient senderAsyncClient = null;
ServiceBusMessage serviceBusMessage = messageConverter.fromMessage(message, ServiceBusMessage.class);
if (Objects.nonNull(serviceBusMessage) && !StringUtils.hasText(serviceBusMessage.getPartitionKey())) {
String partitionKey = getPartitionKey(partitionSupplier);
serviceBusMessage.setPartitionKey(partitionKey);
}
- return this.clientFactory.getOrCreateSender(destination).sendMessage(serviceBusMessage).toFuture();
+ Instrumentation instrumentation = new Instrumentation(destination, Instrumentation.Type.PRODUCE);
+ try {
+ instrumentationManager.addHealthInstrumentation(instrumentation);
+ senderAsyncClient = this.clientFactory.getOrCreateSender(destination);
+ instrumentationManager.getHealthInstrumentation(instrumentation).markStartedSuccessfully();
+ } catch (Exception e) {
+ instrumentationManager.getHealthInstrumentation(instrumentation).markStartFailed(e);
+ LOGGER.error("ServiceBus senderAsyncClient startup failed, Caused by " + e.getMessage());
+ throw new ServiceBusRuntimeException("ServiceBus send client startup failed, Caused by " + e.getMessage(), e);
+ }
+
+ return senderAsyncClient.sendMessage(serviceBusMessage).toFuture();
+ }
+
+ public InstrumentationManager getInstrumentationManager() {
+ return instrumentationManager;
}
public CheckpointConfig getCheckpointConfig() {
diff --git a/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/health/Instrumentation.java b/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/health/Instrumentation.java
new file mode 100644
index 000000000000..84e2840a882c
--- /dev/null
+++ b/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/health/Instrumentation.java
@@ -0,0 +1,83 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.spring.integration.servicebus.health;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * ServiceBus health details entity class.
+ */
+public class Instrumentation {
+
+ /**
+ * Specifies the type of queue and topic currently in use.
+ */
+ public enum Type {
+
+ CONSUME("consume"),
+
+ PRODUCE("produce");
+
+ private String typeName;
+
+ Type(String typeName) {
+ this.typeName = typeName;
+ }
+
+ public String getTypeName() {
+ return typeName;
+ }
+ }
+
+ private final String name;
+
+ private final Type type;
+
+ protected final AtomicBoolean started = new AtomicBoolean(false);
+
+ protected Exception startException = null;
+
+ public Instrumentation(String name, Type type) {
+ this.name = name;
+ this.type = type;
+ }
+
+ public Type getType() {
+ return type;
+ }
+
+ public boolean isDown() {
+ return startException != null;
+ }
+
+ public boolean isUp() {
+ return started.get();
+ }
+
+ public boolean isOutOfService() {
+ return !started.get() && startException == null;
+ }
+
+ public void markStartedSuccessfully() {
+ started.set(true);
+ }
+
+ public void markStartFailed(Exception e) {
+ started.set(false);
+ startException = e;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public boolean isStarted() {
+ return started.get();
+ }
+
+ public Exception getStartException() {
+ return startException;
+ }
+
+}
diff --git a/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/health/InstrumentationManager.java b/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/health/InstrumentationManager.java
new file mode 100644
index 000000000000..825ca7d361a7
--- /dev/null
+++ b/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/health/InstrumentationManager.java
@@ -0,0 +1,32 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.spring.integration.servicebus.health;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * ServiceBus health details management class.
+ */
+public class InstrumentationManager {
+
+ private final Map healthInstrumentations = new HashMap<>();
+
+ public Set getHealthInstrumentations() {
+ return healthInstrumentations.entrySet().stream().map(Map.Entry::getValue)
+ .collect(Collectors.toSet());
+ }
+
+ public void addHealthInstrumentation(Instrumentation instrumentation) {
+ healthInstrumentations.put(instrumentation.getName() + ":" + instrumentation.getType().getTypeName(),
+ instrumentation);
+ }
+
+ public Instrumentation getHealthInstrumentation(Instrumentation instrumentation) {
+ return healthInstrumentations.get(instrumentation.getName() + ":" + instrumentation.getType().getTypeName());
+ }
+
+}
diff --git a/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/health/package-info.java b/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/health/package-info.java
new file mode 100644
index 000000000000..86d2616c57ac
--- /dev/null
+++ b/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/health/package-info.java
@@ -0,0 +1,7 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+/**
+ * Package com.azure.spring.integration.servicebus.health;
+ */
+package com.azure.spring.integration.servicebus.health;
diff --git a/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/queue/ServiceBusQueueOperation.java b/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/queue/ServiceBusQueueOperation.java
index 81e49a066c86..2c35b72b4058 100644
--- a/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/queue/ServiceBusQueueOperation.java
+++ b/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/queue/ServiceBusQueueOperation.java
@@ -6,6 +6,7 @@
import com.azure.spring.integration.core.api.SendOperation;
import com.azure.spring.integration.core.api.SubscribeOperation;
import com.azure.spring.integration.servicebus.ServiceBusClientConfig;
+import com.azure.spring.integration.servicebus.health.InstrumentationManager;
import org.springframework.messaging.Message;
/**
@@ -17,6 +18,8 @@
*/
public interface ServiceBusQueueOperation extends SendOperation, SubscribeOperation {
+ InstrumentationManager getInstrumentationManager();
+
void setClientConfig(ServiceBusClientConfig clientConfig);
/**
diff --git a/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/queue/ServiceBusQueueTemplate.java b/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/queue/ServiceBusQueueTemplate.java
index cbaca1add070..4ac9111631a1 100644
--- a/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/queue/ServiceBusQueueTemplate.java
+++ b/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/queue/ServiceBusQueueTemplate.java
@@ -12,6 +12,7 @@
import com.azure.spring.integration.servicebus.converter.ServiceBusMessageConverter;
import com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders;
import com.azure.spring.integration.servicebus.factory.ServiceBusQueueClientFactory;
+import com.azure.spring.integration.servicebus.health.Instrumentation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.NonNull;
@@ -68,10 +69,18 @@ protected String buildCheckpointSuccessMessage(Message> message) {
return String.format(MSG_SUCCESS_CHECKPOINT, message, name, getCheckpointConfig().getCheckpointMode());
}
};
-
- ServiceBusProcessorClient processorClient = this.clientFactory.getOrCreateProcessor(name, clientConfig,
- messageProcessor);
- processorClient.start();
+ Instrumentation instrumentation = new Instrumentation(name, Instrumentation.Type.CONSUME);
+ try {
+ instrumentationManager.addHealthInstrumentation(instrumentation);
+ ServiceBusProcessorClient processorClient = this.clientFactory.getOrCreateProcessor(name, clientConfig,
+ messageProcessor);
+ processorClient.start();
+ instrumentationManager.getHealthInstrumentation(instrumentation).markStartedSuccessfully();
+ } catch (Exception e) {
+ instrumentationManager.getHealthInstrumentation(instrumentation).markStartFailed(e);
+ LOGGER.error("ServiceBus processorClient startup failed, Caused by " + e.getMessage());
+ throw new ServiceBusRuntimeException("ServiceBus processor client startup failed, Caused by " + e.getMessage(), e);
+ }
}
@Override
diff --git a/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/topic/ServiceBusTopicOperation.java b/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/topic/ServiceBusTopicOperation.java
index c615f469b4df..e255031c6079 100644
--- a/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/topic/ServiceBusTopicOperation.java
+++ b/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/topic/ServiceBusTopicOperation.java
@@ -6,6 +6,7 @@
import com.azure.spring.integration.core.api.SendOperation;
import com.azure.spring.integration.core.api.SubscribeByGroupOperation;
import com.azure.spring.integration.servicebus.ServiceBusClientConfig;
+import com.azure.spring.integration.servicebus.health.InstrumentationManager;
/**
* Azure service bus topic operation to support sending {@link org.springframework.messaging.Message} asynchronously
@@ -15,6 +16,8 @@
*/
public interface ServiceBusTopicOperation extends SendOperation, SubscribeByGroupOperation {
+ InstrumentationManager getInstrumentationManager();
+
void setClientConfig(ServiceBusClientConfig clientConfig);
}
diff --git a/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/topic/ServiceBusTopicTemplate.java b/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/topic/ServiceBusTopicTemplate.java
index cd0a2f904789..73652d9cf821 100644
--- a/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/topic/ServiceBusTopicTemplate.java
+++ b/sdk/spring/azure-spring-integration-servicebus/src/main/java/com/azure/spring/integration/servicebus/topic/ServiceBusTopicTemplate.java
@@ -11,6 +11,7 @@
import com.azure.spring.integration.servicebus.ServiceBusTemplate;
import com.azure.spring.integration.servicebus.converter.ServiceBusMessageConverter;
import com.azure.spring.integration.servicebus.factory.ServiceBusTopicClientFactory;
+import com.azure.spring.integration.servicebus.health.Instrumentation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.NonNull;
@@ -104,13 +105,21 @@ protected String buildCheckpointFailMessage(Message> message) {
@Override
protected String buildCheckpointSuccessMessage(Message> message) {
return String.format(MSG_SUCCESS_CHECKPOINT, consumer, name, message,
- getCheckpointConfig().getCheckpointMode());
+ getCheckpointConfig().getCheckpointMode());
}
};
- ServiceBusProcessorClient processorClient = this.clientFactory.getOrCreateProcessor(name, consumerGroup,
- this.clientConfig,
- messageProcessor);
- processorClient.start();
+ Instrumentation instrumentation = new Instrumentation(name + consumerGroup, Instrumentation.Type.CONSUME);
+ try {
+ instrumentationManager.addHealthInstrumentation(instrumentation);
+ ServiceBusProcessorClient processorClient = this.clientFactory.getOrCreateProcessor(name, consumerGroup,
+ this.clientConfig, messageProcessor);
+ processorClient.start();
+ instrumentationManager.getHealthInstrumentation(instrumentation).markStartedSuccessfully();
+ } catch (Exception e) {
+ instrumentationManager.getHealthInstrumentation(instrumentation).markStartFailed(e);
+ LOGGER.error("ServiceBus processorClient startup failed, Caused by " + e.getMessage());
+ throw new ServiceBusRuntimeException("ServiceBus processor client startup failed, Caused by " + e.getMessage(), e);
+ }
}
diff --git a/sdk/spring/azure-spring-integration-servicebus/src/test/java/com/azure/spring/integration/servicebus/queue/QueueTemplateSubscribeTest.java b/sdk/spring/azure-spring-integration-servicebus/src/test/java/com/azure/spring/integration/servicebus/queue/QueueTemplateSubscribeTest.java
index 2b0652dffb3c..6e7d4e6e1277 100644
--- a/sdk/spring/azure-spring-integration-servicebus/src/test/java/com/azure/spring/integration/servicebus/queue/QueueTemplateSubscribeTest.java
+++ b/sdk/spring/azure-spring-integration-servicebus/src/test/java/com/azure/spring/integration/servicebus/queue/QueueTemplateSubscribeTest.java
@@ -31,6 +31,7 @@ public class QueueTemplateSubscribeTest extends SubscribeOperationTest