Skip to content

Commit

Permalink
feat(amqp): scan all queues
Browse files Browse the repository at this point in the history
In addition to the routingKeys
  • Loading branch information
timonback committed Sep 3, 2024
1 parent 11cf3a2 commit 05b074e
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import io.github.springwolf.asyncapi.v3.bindings.amqp.AMQPChannelType;
import io.github.springwolf.asyncapi.v3.bindings.amqp.AMQPMessageBinding;
import io.github.springwolf.asyncapi.v3.bindings.amqp.AMQPOperationBinding;
import io.github.springwolf.asyncapi.v3.model.ReferenceUtil;
import io.github.springwolf.asyncapi.v3.model.channel.ChannelObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
Expand All @@ -33,6 +35,7 @@

@Slf4j
public class RabbitListenerUtil {
public static final String BINDING_NAME = "amqp";
private static final Boolean DEFAULT_AUTO_DELETE = false;
private static final Boolean DEFAULT_DURABLE = true;
private static final Boolean DEFAULT_EXCLUSIVE = false;
Expand Down Expand Up @@ -98,7 +101,7 @@ public static Map<String, ChannelBinding> buildChannelBinding(
channelBinding.exchange(buildExchangeProperties(annotation, exchangeName, context));
}

return Map.of("amqp", channelBinding.build());
return Map.of(BINDING_NAME, channelBinding.build());
}

private static AMQPChannelExchangeProperties buildExchangeProperties(
Expand Down Expand Up @@ -163,6 +166,24 @@ private static AMQPChannelQueueProperties buildQueueProperties(
.build();
}

public static ChannelObject buildChannelObject(org.springframework.amqp.core.Queue queue) {
return ChannelObject.builder()
.channelId(ReferenceUtil.toValidId(queue.getName()))
.address(queue.getName())
.bindings(Map.of(
BINDING_NAME,
AMQPChannelBinding.builder()
.is(AMQPChannelType.QUEUE)
.queue(AMQPChannelQueueProperties.builder()
.name(queue.getName())
.autoDelete(queue.isAutoDelete())
.durable(queue.isDurable())
.exclusive(queue.isExclusive())
.build())
.build()))
.build();
}

private static Boolean parse(String value, Boolean defaultIfEmpty) {
if ("".equals(value)) {
return defaultIfEmpty;
Expand Down Expand Up @@ -194,7 +215,7 @@ private static String getExchangeName(
public static Map<String, OperationBinding> buildOperationBinding(
RabbitListener annotation, StringValueResolver resolver, RabbitListenerUtilContext context) {
return Map.of(
"amqp",
BINDING_NAME,
AMQPOperationBinding.builder()
.cc(getRoutingKeys(annotation, resolver, context))
.build());
Expand Down Expand Up @@ -234,7 +255,7 @@ private static List<String> getRoutingKeys(

public static Map<String, MessageBinding> buildMessageBinding() {
// currently the feature to define amqp message binding is not implemented.
return Map.of("amqp", new AMQPMessageBinding());
return Map.of(BINDING_NAME, new AMQPMessageBinding());
}

public record RabbitListenerUtilContext(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.springwolf.plugins.amqp.asyncapi.scanners.channels;

import io.github.springwolf.asyncapi.v3.bindings.amqp.AMQPChannelBinding;
import io.github.springwolf.asyncapi.v3.model.channel.ChannelObject;
import io.github.springwolf.core.asyncapi.scanners.ChannelsScanner;
import io.github.springwolf.plugins.amqp.asyncapi.scanners.bindings.RabbitListenerUtil;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.Queue;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@RequiredArgsConstructor
public class RabbitQueueBeanScanner implements ChannelsScanner {
private final List<Queue> queues;

@Override
public Map<String, ChannelObject> scan() {
return queues.stream()
.map(RabbitListenerUtil::buildChannelObject)
.collect(Collectors.toMap(
o -> ((AMQPChannelBinding) o.getBindings().get(RabbitListenerUtil.BINDING_NAME))
.getQueue()
.getName(),
c -> c,
(a, b) -> a));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.github.springwolf.core.asyncapi.scanners.operations.annotations.SpringAnnotationClassLevelOperationsScanner;
import io.github.springwolf.core.asyncapi.scanners.operations.annotations.SpringAnnotationMethodLevelOperationsScanner;
import io.github.springwolf.plugins.amqp.asyncapi.scanners.bindings.AmqpBindingFactory;
import io.github.springwolf.plugins.amqp.asyncapi.scanners.channels.RabbitQueueBeanScanner;
import io.github.springwolf.plugins.amqp.asyncapi.scanners.common.headers.AsyncHeadersForAmqpBuilder;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
Expand Down Expand Up @@ -158,4 +159,14 @@ public SpringAnnotationOperationsScanner simpleRabbitMethodLevelListenerAnnotati

return new SpringAnnotationOperationsScanner(springwolfClassScanner, strategy);
}

@Bean
@ConditionalOnProperty(
name = SPRINGWOLF_SCANNER_RABBIT_LISTENER_ENABLED,
havingValue = "true",
matchIfMissing = true)
@Order(value = ChannelPriority.AUTO_DISCOVERED)
public RabbitQueueBeanScanner rabbitQueueBeanScanner(List<Queue> queues) {
return new RabbitQueueBeanScanner(queues);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.springwolf.plugins.amqp.asyncapi.scanners.channels;

import io.github.springwolf.asyncapi.v3.bindings.amqp.AMQPChannelBinding;
import io.github.springwolf.asyncapi.v3.bindings.amqp.AMQPChannelQueueProperties;
import io.github.springwolf.asyncapi.v3.bindings.amqp.AMQPChannelType;
import io.github.springwolf.asyncapi.v3.model.channel.ChannelObject;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Queue;

import java.util.List;
import java.util.Map;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

class RabbitQueueBeanScannerTest {

@Test
void scan() {
// given
var queue = new Queue("name");
var scanner = new RabbitQueueBeanScanner(List.of(queue));

// when
var result = scanner.scan();

// then
assertThat(result)
.isEqualTo(Map.of(
"name",
ChannelObject.builder()
.channelId("name")
.address("name")
.bindings(Map.of(
"amqp",
AMQPChannelBinding.builder()
.is(AMQPChannelType.QUEUE)
.queue(AMQPChannelQueueProperties.builder()
.name("name")
.autoDelete(false)
.durable(true)
.exclusive(false)
.build())
.build()))
.build()));
}
}

0 comments on commit 05b074e

Please sign in to comment.