Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue #344] Fixing racing condition issue in SubscribeProcessor and UnSubscribeProcessor #345

Merged
merged 19 commits into from
May 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
ba67c5c
Merge pull request #1 from apache/develop
jinrongluo May 10, 2021
d638ec4
[Issue #337] Fix HttpSubscriber startup issue
May 10, 2021
5ebfb54
[Issue #337] test commit
jinrongluo May 10, 2021
a3afff3
[Issue #337] revert test commit
jinrongluo May 10, 2021
50f959d
[Issue #337] Enhance Http Demo Subscriber by using ExecutorService, C…
jinrongluo May 11, 2021
7adc322
Merge remote-tracking branch 'origin/develop' into develop
jinrongluo May 11, 2021
d48ead5
[Issue #337] Enhance Http Demo Subscriber by using ExecutorService, C…
jinrongluo May 11, 2021
0abe76f
[Issue #344] Fixing racing condition issue in SubscribeProcessor and …
jinrongluo May 11, 2021
3402f71
[Issue #344] Fix import statements
jinrongluo May 12, 2021
c9021fe
[Issue #337] Address code review comment for Subscriber Demo App
jinrongluo May 12, 2021
96986ef
[Issue #344] Enhance client registration logic in SubscribeProcessor …
jinrongluo May 12, 2021
d077ff8
[Issue #344] Minor code clean up in SubscribeProcessor and Unsubscrib…
jinrongluo May 12, 2021
c6d732e
Merge branch 'apache:develop' into develop
jinrongluo May 14, 2021
09bda4b
Merge remote-tracking branch 'origin/develop' into develop-2
jinrongluo May 14, 2021
8ec0e22
[Issue #344] Fix NullPointerException in ConsumerManager occurs durin…
jinrongluo May 14, 2021
1ae7d0a
[Issue #344] Fix bugs in subscribe/unsunscribe code path
jinrongluo May 14, 2021
c613be8
Merge branch 'apache:develop' into develop
jinrongluo May 18, 2021
fc15c45
Merge remote-tracking branch 'origin/develop' into develop-2
jinrongluo May 18, 2021
1b27668
[Issue #344] use client.pid instead of client.ip for client comparasi…
jinrongluo May 18, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,19 +156,19 @@ public void run() {
public void notifyConsumerManager(String consumerGroup, ConsumerGroupConf latestConsumerGroupConfig,
ConcurrentHashMap<String, ConsumerGroupConf> localConsumerGroupMapping) throws Exception {
ConsumerGroupManager cgm = eventMeshHTTPServer.getConsumerManager().getConsumer(consumerGroup);
if (cgm == null) {
if (latestConsumerGroupConfig == null) {
ConsumerGroupStateEvent notification = new ConsumerGroupStateEvent();
notification.action = ConsumerGroupStateEvent.ConsumerGroupStateAction.NEW;
notification.action = ConsumerGroupStateEvent.ConsumerGroupStateAction.DELETE;
notification.consumerGroup = consumerGroup;
notification.consumerGroupConfig = latestConsumerGroupConfig;
eventMeshHTTPServer.getEventBus().post(notification);
return;
}

if (latestConsumerGroupConfig == null) {
if (cgm == null) {
ConsumerGroupStateEvent notification = new ConsumerGroupStateEvent();
notification.action = ConsumerGroupStateEvent.ConsumerGroupStateAction.DELETE;
notification.action = ConsumerGroupStateEvent.ConsumerGroupStateAction.NEW;
notification.consumerGroup = consumerGroup;
notification.consumerGroupConfig = latestConsumerGroupConfig;
eventMeshHTTPServer.getEventBus().post(notification);
return;
}
Expand Down Expand Up @@ -217,8 +217,10 @@ public synchronized void addConsumer(String consumerGroup, ConsumerGroupConf con
* restart consumer
*/
public synchronized void restartConsumer(String consumerGroup, ConsumerGroupConf consumerGroupConfig) throws Exception {
ConsumerGroupManager cgm = consumerTable.get(consumerGroup);
cgm.refresh(consumerGroupConfig);
if(consumerTable.containsKey(consumerGroup)) {
ConsumerGroupManager cgm = consumerTable.get(consumerGroup);
cgm.refresh(consumerGroupConfig);
}
}

/**
Expand All @@ -235,8 +237,10 @@ public ConsumerGroupManager getConsumer(String consumerGroup) throws Exception {
* @param consumerGroup
*/
public synchronized void delConsumer(String consumerGroup) throws Exception {
ConsumerGroupManager cgm = consumerTable.remove(consumerGroup);
cgm.shutdown();
if(consumerTable.containsKey(consumerGroup)) {
ConsumerGroupManager cgm = consumerTable.remove(consumerGroup);
cgm.shutdown();
}
}

@Subscribe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import com.alibaba.fastjson.JSONObject;

import io.netty.channel.ChannelHandlerContext;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.IPUtil;
Expand Down Expand Up @@ -108,6 +106,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>

synchronized (eventMeshHTTPServer.localClientInfoMapping) {

registerClient(subscribeRequestHeader, consumerGroup, subTopicList, url);

for (String subTopic : subTopicList) {
List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping.get(consumerGroup + "@" + subTopic);
Expand Down Expand Up @@ -206,4 +205,41 @@ public boolean rejectRequest() {
return false;
}

private void registerClient(SubscribeRequestHeader subscribeRequestHeader, String consumerGroup,
List<String> topicList, String url) {
for(String topic: topicList) {
Client client = new Client();
client.env = subscribeRequestHeader.getEnv();
client.dcn = subscribeRequestHeader.getDcn();
client.idc = subscribeRequestHeader.getIdc();
client.sys = subscribeRequestHeader.getSys();
client.ip = subscribeRequestHeader.getIp();
client.pid = subscribeRequestHeader.getPid();
client.consumerGroup = consumerGroup;
client.topic = topic;
client.url = url;
client.lastUpTime = new Date();

String groupTopicKey = client.consumerGroup + "@" + client.topic;

if (eventMeshHTTPServer.localClientInfoMapping.containsKey(groupTopicKey)) {
List<Client> localClients = eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
boolean isContains = false;
for (Client localClient : localClients) {
if (StringUtils.equals(localClient.url, client.url)) {
isContains = true;
localClient.lastUpTime = client.lastUpTime;
break;
}
}
if (!isContains) {
localClients.add(client);
}
} else {
List<Client> clients = new ArrayList<>();
clients.add(client);
eventMeshHTTPServer.localClientInfoMapping.put(groupTopicKey, clients);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,15 @@
package org.apache.eventmesh.runtime.core.protocol.http.processor;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.alibaba.fastjson.JSONObject;

import io.netty.channel.ChannelHandlerContext;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.IPUtil;
Expand All @@ -44,10 +41,8 @@
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
import org.apache.eventmesh.runtime.core.consumergroup.event.ConsumerGroupStateEvent;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler;
import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerGroupManager;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
Expand Down Expand Up @@ -131,12 +126,15 @@ public void onResponse(HttpCommand httpCommand) {

synchronized (eventMeshHTTPServer.localClientInfoMapping) {
boolean isChange = true;

registerClient(unSubscribeRequestHeader, consumerGroup, unSubTopicList, unSubscribeUrl);

for (String unSubTopic : unSubTopicList) {
List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping.get(consumerGroup + "@" + unSubTopic);
Iterator<Client> clientIterator = groupTopicClients.iterator();
while (clientIterator.hasNext()) {
Client client = clientIterator.next();
if (StringUtils.equals(client.ip, ip)) {
if (StringUtils.equals(client.pid, pid) && StringUtils.equals(client.url, unSubscribeUrl)) {
httpLogger.warn("client {} start unsubscribe", JSONObject.toJSONString(client));
clientIterator.remove();
}
Expand Down Expand Up @@ -239,4 +237,41 @@ public void onResponse(HttpCommand httpCommand) {
public boolean rejectRequest() {
return false;
}

private void registerClient(UnSubscribeRequestHeader unSubscribeRequestHeader, String consumerGroup,
List<String> topicList, String url) {
for(String topic: topicList) {
Client client = new Client();
client.env = unSubscribeRequestHeader.getEnv();
client.dcn = unSubscribeRequestHeader.getDcn();
client.idc = unSubscribeRequestHeader.getIdc();
client.sys = unSubscribeRequestHeader.getSys();
client.ip = unSubscribeRequestHeader.getIp();
client.pid = unSubscribeRequestHeader.getPid();
client.consumerGroup = consumerGroup;
client.topic = topic;
client.url = url;
client.lastUpTime = new Date();

String groupTopicKey = client.consumerGroup + "@" + client.topic;
if (eventMeshHTTPServer.localClientInfoMapping.containsKey(groupTopicKey)) {
List<Client> localClients = eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
boolean isContains = false;
for (Client localClient : localClients) {
if (StringUtils.equals(localClient.url, client.url)) {
isContains = true;
localClient.lastUpTime = client.lastUpTime;
break;
}
}
if (!isContains) {
localClients.add(client);
}
} else {
List<Client> clients = new ArrayList<>();
clients.add(client);
eventMeshHTTPServer.localClientInfoMapping.put(groupTopicKey, clients);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,12 @@ public boolean subscribe(List<String> topicList, String url) throws Exception {
start();
}

RequestParam heartBeatParam = generateHeartBeatRequestParam(topicList, url);
RequestParam subscribeParam = generateSubscribeRequestParam(topicList, url);

long startTime = System.currentTimeMillis();
String target = selectEventMesh();
String subRes = "";
String heartRes = "";
try {
heartRes = HttpUtil.post(httpClient, target, heartBeatParam);
subRes = HttpUtil.post(httpClient, target, subscribeParam);
} catch (Exception ex) {
throw new EventMeshException(ex);
Expand Down Expand Up @@ -239,15 +236,12 @@ public void run() {

public boolean unsubscribe(List<String> topicList, String url) throws EventMeshException {
subscription.removeAll(topicList);
RequestParam heartBeatParam = generateHeartBeatRequestParam(topicList, url);
RequestParam unSubscribeParam = generateUnSubscribeRequestParam(topicList, url);

long startTime = System.currentTimeMillis();
String target = selectEventMesh();
String unSubRes = "";
String heartRes = "";
try {
heartRes = HttpUtil.post(httpClient, target, heartBeatParam);
unSubRes = HttpUtil.post(httpClient, target, unSubscribeParam);
} catch (Exception ex) {
throw new EventMeshException(ex);
Expand Down