diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java index 4be4fb24ce..d91b462b64 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java @@ -192,16 +192,25 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext for (String key : map.keySet()) { if (StringUtils.equals(subTopic.getTopic(), key)) { ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf(); - ConsumerGroupTopicConf currentTopicConf = map.get(key); latestTopicConf.setConsumerGroup(consumerGroup); latestTopicConf.setTopic(subTopic.getTopic()); latestTopicConf.setSubscriptionItem(subTopic); latestTopicConf.setUrls(new HashSet<>(Arrays.asList(url))); - latestTopicConf.getUrls().addAll(currentTopicConf.getUrls()); + ConsumerGroupTopicConf currentTopicConf = map.get(key); + latestTopicConf.getUrls().addAll(currentTopicConf.getUrls()); latestTopicConf.setIdcUrls(idcUrls); map.put(key, latestTopicConf); + } else { + //If there are multiple topics, append it + ConsumerGroupTopicConf newTopicConf = new ConsumerGroupTopicConf(); + newTopicConf.setConsumerGroup(consumerGroup); + newTopicConf.setTopic(subTopic.getTopic()); + newTopicConf.setSubscriptionItem(subTopic); + newTopicConf.setUrls(new HashSet<>(Arrays.asList(url))); + newTopicConf.setIdcUrls(idcUrls); + map.put(subTopic.getTopic(), newTopicConf); } } }