Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update changelog and add test for webSocket in service bus #22778

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sdk/spring/azure-spring-cloud-autoconfigure/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
## 2.7.0-beta.1 (Unreleased)
### Key Bug Fixes
- Fixed `EventHubMessageConverter` to load all system properties of `EventData` and put in the header of org.springframework.messaging.Message.([#22683](https://github.com/Azure/azure-sdk-for-java/pull/22683/))

### New Features
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add it in README as well

- Support configure `AmqpTransportType` by adding `spring.cloud.azure.servicebus.transportType`.

## 2.6.0 (2021-06-23)
### Breaking Changes
Expand Down
2 changes: 2 additions & 0 deletions sdk/spring/azure-spring-cloud-autoconfigure/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ This project provides auto-configuration for the following Azure services:
- [Storage][storage]
- [Storage Queue][storage_queue]

`AmqpTransportType` can be specified by configuring `spring.cloud.azure.servicebus.transportType`, the default value is `AMQP`.

## Examples

The following section provides sample projects illustrating how to use the Azure Spring Cloud starters.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.spring.sample.servicebus.binder;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.context.ActiveProfiles;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest(classes = { ServiceBusQueueAndTopicWebSocketBinderIT.TestQueueConfig.class,
ServiceBusQueueAndTopicWebSocketBinderIT.TestTopicConfig.class })
@ActiveProfiles("websocket")
public class ServiceBusQueueAndTopicWebSocketBinderIT {

private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueAndTopicWebSocketBinderIT.class);

private static String message = UUID.randomUUID().toString();

private static CountDownLatch latch = new CountDownLatch(2);

@Autowired
private Sinks.Many<Message<String>> manyQueue;

@Autowired
private Sinks.Many<Message<String>> manyTopic;

@EnableAutoConfiguration
public static class TestQueueConfig {

@Bean
public Sinks.Many<Message<String>> manyQueue() {
return Sinks.many().unicast().onBackpressureBuffer();
}

@Bean
public Supplier<Flux<Message<String>>> queueSupply(Sinks.Many<Message<String>> manyQueue) {
return () -> manyQueue.asFlux()
.doOnNext(m -> LOGGER.info("Manually sending message {}", m))
.doOnError(t -> LOGGER.error("Error encountered", t));
}

@Bean
public Consumer<Message<String>> queueConsume() {
return message -> {
LOGGER.info("---Test queue new message received: '{}'", message);
if (message.getPayload().equals(ServiceBusQueueAndTopicWebSocketBinderIT.message)) {
latch.countDown();
}
};
}
}

@EnableAutoConfiguration
public static class TestTopicConfig {

@Bean
public Sinks.Many<Message<String>> manyTopic() {
return Sinks.many().unicast().onBackpressureBuffer();
}

@Bean
public Supplier<Flux<Message<String>>> topicSupply(Sinks.Many<Message<String>> manyTopic) {
return () -> manyTopic.asFlux()
.doOnNext(m -> LOGGER.info("Manually sending message {}", m))
.doOnError(t -> LOGGER.error("Error encountered", t));
}

@Bean
public Consumer<Message<String>> topicConsume() {
return message -> {
LOGGER.info("---Test topic new message received: '{}'", message);
if (message.getPayload().equals(ServiceBusQueueAndTopicWebSocketBinderIT.message)) {
latch.countDown();
}
};
}
}

@Test
public void testServiceBusSendAndReceiveMessage() throws InterruptedException {
LOGGER.info("SingleServiceBusQueueAndTopicBinderIT begin.");
GenericMessage<String> genericMessage = new GenericMessage<>(message);

LOGGER.info("Send a message:" + message + " to the queue.");
manyQueue.emitNext(genericMessage, Sinks.EmitFailureHandler.FAIL_FAST);
LOGGER.info("Send a message:" + message + " to the topic.");
manyTopic.emitNext(genericMessage, Sinks.EmitFailureHandler.FAIL_FAST);

assertThat(ServiceBusQueueAndTopicWebSocketBinderIT.latch.await(15, TimeUnit.SECONDS)).isTrue();
LOGGER.info("SingleServiceBusQueueAndTopicBinderIT end.");
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
spring:
cloud:
stream:
function:
definition: queueConsume;queueSupply;topicConsume;topicSupply;
bindings:
topicConsume-in-0:
destination: topic1
group: topicSub
topicSupply-out-0:
destination: topic1
queueConsume-in-0:
binder: servicebus-2
destination: queue1
queueSupply-out-0:
binder: servicebus-2
destination: queue1
binders:
servicebus-1:
type: servicebus-topic
default-candidate: true
environment:
spring:
cloud:
azure:
servicebus:
connection-string: ${SERVICEBUS4_BINDER_TEST_CONNECTION_STRING}
transportType: AMQP_WEB_SOCKETS
servicebus-2:
type: servicebus-queue
default-candidate: false
environment:
spring:
cloud:
azure:
servicebus:
connection-string: ${SERVICEBUS4_BINDER_TEST_CONNECTION_STRING}
transportType: AMQP_WEB_SOCKETS
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"namespaces_name_standard_1": "[concat(parameters('baseName'), '-Standard-1')]",
"namespaces_name_standard_2": "[concat(parameters('baseName'), '-Standard-2')]",
"namespaces_name_standard_3": "[concat(parameters('baseName'), '-Standard-3')]",
"namespaces_name_standard_4": "[concat(parameters('baseName'), '-Standard-4')]",
"location": "[resourceGroup().location]"
},
"resources": [
Expand Down Expand Up @@ -53,6 +54,19 @@
"zoneRedundant": false
}
},
{
"type": "Microsoft.ServiceBus/namespaces",
"apiVersion": "2018-01-01-preview",
"name": "[variables('namespaces_name_standard_4')]",
"location": "[variables('location')]",
"sku": {
"name": "Standard",
"tier": "Standard"
},
"properties": {
"zoneRedundant": false
}
},
{
"type": "Microsoft.ServiceBus/namespaces/AuthorizationRules",
"apiVersion": "2017-04-01",
Expand Down Expand Up @@ -101,6 +115,22 @@
]
}
},
{
"type": "Microsoft.ServiceBus/namespaces/AuthorizationRules",
"apiVersion": "2017-04-01",
"name": "[concat(variables('namespaces_name_standard_4'), '/RootManageSharedAccessKey')]",
"location": "[variables('location')]",
"dependsOn": [
"[resourceId('Microsoft.ServiceBus/namespaces', variables('namespaces_name_standard_4'))]"
],
"properties": {
"rights": [
"Listen",
"Manage",
"Send"
]
}
},
{
"type": "Microsoft.ServiceBus/namespaces/queues",
"apiVersion": "2017-04-01",
Expand Down Expand Up @@ -149,6 +179,30 @@
"enableExpress": false
}
},
{
"type": "Microsoft.ServiceBus/namespaces/queues",
"apiVersion": "2017-04-01",
"name": "[concat(variables('namespaces_name_standard_4'), '/queue1')]",
"location": "[variables('location')]",
"dependsOn": [
"[resourceId('Microsoft.ServiceBus/namespaces', variables('namespaces_name_standard_4'))]"
],
"properties": {
"lockDuration": "PT30S",
"maxSizeInMegabytes": 1024,
"requiresDuplicateDetection": false,
"requiresSession": false,
"defaultMessageTimeToLive": "P14D",
"deadLetteringOnMessageExpiration": false,
"enableBatchedOperations": true,
"duplicateDetectionHistoryTimeWindow": "PT10M",
"maxDeliveryCount": 10,
"status": "Active",
"autoDeleteOnIdle": "P10675199DT2H48M5.4775807S",
"enablePartitioning": false,
"enableExpress": false
}
},
{
"type": "Microsoft.ServiceBus/namespaces/topics",
"apiVersion": "2017-04-01",
Expand Down Expand Up @@ -191,6 +245,27 @@
"enableExpress": false
}
},
{
"type": "Microsoft.ServiceBus/namespaces/topics",
"apiVersion": "2017-04-01",
"name": "[concat(variables('namespaces_name_standard_4'), '/topic1')]",
"location": "[variables('location')]",
"dependsOn": [
"[resourceId('Microsoft.ServiceBus/namespaces', variables('namespaces_name_standard_4'))]"
],
"properties": {
"defaultMessageTimeToLive": "P14D",
"maxSizeInMegabytes": 1024,
"requiresDuplicateDetection": false,
"duplicateDetectionHistoryTimeWindow": "PT10M",
"enableBatchedOperations": true,
"status": "Active",
"supportOrdering": true,
"autoDeleteOnIdle": "P10675199DT2H48M5.4775807S",
"enablePartitioning": false,
"enableExpress": false
}
},
{
"type": "Microsoft.ServiceBus/namespaces/topics/subscriptions",
"apiVersion": "2018-01-01-preview",
Expand Down Expand Up @@ -232,6 +307,27 @@
"enableBatchedOperations": true,
"autoDeleteOnIdle": "P14D"
}
},
{
"type": "Microsoft.ServiceBus/namespaces/topics/subscriptions",
"apiVersion": "2018-01-01-preview",
"name": "[concat(variables('namespaces_name_standard_4'), '/topic1/topicSub')]",
"location": "[variables('location')]",
"dependsOn": [
"[resourceId('Microsoft.ServiceBus/namespaces/topics', variables('namespaces_name_standard_4'), 'topic1')]",
"[resourceId('Microsoft.ServiceBus/namespaces', variables('namespaces_name_standard_4'))]"
],
"properties": {
"lockDuration": "PT30S",
"requiresSession": false,
"defaultMessageTimeToLive": "P14D",
"deadLetteringOnMessageExpiration": false,
"deadLetteringOnFilterEvaluationExceptions": false,
"maxDeliveryCount": 1,
"status": "Active",
"enableBatchedOperations": true,
"autoDeleteOnIdle": "P14D"
}
}
],
"outputs": {
Expand All @@ -246,6 +342,10 @@
"SERVICEBUS3_BINDER_TEST_CONNECTION_STRING": {
"type": "string",
"value": "[listKeys(resourceId('Microsoft.ServiceBus/namespaces/authorizationRules', variables('namespaces_name_standard_3'), 'RootManageSharedAccessKey'), '2017-04-01').primaryConnectionString]"
},
"SERVICEBUS4_BINDER_TEST_CONNECTION_STRING": {
"type": "string",
"value": "[listKeys(resourceId('Microsoft.ServiceBus/namespaces/authorizationRules', variables('namespaces_name_standard_4'), 'RootManageSharedAccessKey'), '2017-04-01').primaryConnectionString]"
}
}
}