Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support unsubscribe topics while delconsumer in http mode #396

Merged
merged 19 commits into from
Jun 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
c7c62f0
[ISSUE #325]Update gradle configuration for publishing package to mav…
xwm1992 Apr 30, 2021
327f149
update build.gradle
xwm1992 Apr 30, 2021
ed78353
update build.gradle and gradle.properties
xwm1992 Apr 30, 2021
ee89cca
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 May 6, 2021
f80d3a7
update build.gradle and gradle.properties for publish to maven reposi…
xwm1992 May 7, 2021
1eb7a8f
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 May 7, 2021
0605542
* update gradle version for instructions
xwm1992 May 8, 2021
58ec68f
[ISSUE #329]Missing Log4j dependency
xwm1992 May 10, 2021
16d0af9
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 May 10, 2021
72c05bc
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 May 11, 2021
a8fe7ff
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 May 19, 2021
d68186d
update eventmesh-runtime.png
xwm1992 May 19, 2021
e9e8a04
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 May 24, 2021
246fbd0
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 May 31, 2021
b7a19f7
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 Jun 8, 2021
8323f89
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 Jun 9, 2021
82711e2
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 Jun 21, 2021
7a099c1
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 Jun 23, 2021
44bb6fa
support unsubscribe topics while delconsumer in http mode
xwm1992 Jun 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package org.apache.eventmesh.runtime.core.protocol.http.consumer;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
Expand Down Expand Up @@ -82,4 +84,14 @@ public synchronized void refresh(ConsumerGroupConf consumerGroupConfig) throws E
public ConsumerGroupConf getConsumerGroupConfig() {
return consumerGroupConfig;
}

public void unsubscribe(String consumerGroup) throws Exception {
if(StringUtils.equals(consumerGroupConfig.getConsumerGroup(), consumerGroup)){
Set<String> topics = consumerGroupConfig.getConsumerGroupTopicConf().keySet();
for (String topic : topics){
ConsumerGroupTopicConf consumerGroupTopicConf = consumerGroupConfig.getConsumerGroupTopicConf().get(topic);
eventMeshConsumer.unsubscribe(topic, consumerGroupTopicConf.getSubscriptionItem().getMode());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,14 @@ public ConsumerGroupManager getConsumer(String consumerGroup) throws Exception {
* @param consumerGroup
*/
public synchronized void delConsumer(String consumerGroup) throws Exception {
logger.info("start delConsumer with consumerGroup {}", consumerGroup);
if(consumerTable.containsKey(consumerGroup)) {
ConsumerGroupManager cgm = consumerTable.remove(consumerGroup);
logger.info("start unsubscribe topic with consumer group manager {}", JSONObject.toJSONString(cgm));
cgm.unsubscribe(consumerGroup);
cgm.shutdown();
}
logger.info("end delConsumer with consumerGroup {}", consumerGroup);
}

@Subscribe
Expand Down