Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook #343

Merged
merged 8 commits into from
May 14, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public class AsyncPublishInstance {

public static Logger logger = LoggerFactory.getLogger(AsyncPublishInstance.class);

// This messageSize is also used in SubService.java (Subscriber)
public static int messageSize = 5;

public static void main(String[] args) throws Exception {
Properties properties = Utils.readPropertiesFile("application.properties");
final String eventMeshIp = properties.getProperty("eventmesh.ip");
Expand Down Expand Up @@ -62,7 +65,7 @@ public static void main(String[] args) throws Exception {

liteProducer = new LiteProducer(eventMeshClientConfig);
liteProducer.start();
for (int i = 0; i < 5; i++) {
for (int i = 0; i < messageSize; i++) {
LiteMessage liteMessage = new LiteMessage();
liteMessage.setBizSeqNo(RandomStringUtils.randomNumeric(30))
// .setContent("contentStr with special protocal")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import com.alibaba.fastjson.JSONObject;

import org.apache.eventmesh.http.demo.sub.service.SubService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
Expand All @@ -33,12 +35,17 @@ public class SubController {

public static Logger logger = LoggerFactory.getLogger(SubController.class);

@Autowired
private SubService subService;

@RequestMapping(value = "/test", method = RequestMethod.POST)
public String subTest(@RequestBody String message) {
logger.info("=======receive message======= {}", JSONObject.toJSONString(message));
JSONObject result = new JSONObject();
result.put("retCode", 1);
return result.toJSONString();
String strResult = result.toJSONString();
subService.consumeMessage(strResult);
Copy link
Contributor

@xwm1992 xwm1992 May 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semantics of the 47 code line looks like it should pass in the received message as a parameter, not the result of consumption

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good comment! I will make the change to pass the message instead of result.

return strResult;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,25 @@
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.client.http.conf.LiteClientConfig;
import org.apache.eventmesh.client.http.consumer.LiteConsumer;
import org.apache.eventmesh.common.EventMeshException;
import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.ThreadUtil;
import org.apache.eventmesh.http.demo.AsyncPublishInstance;
import org.apache.eventmesh.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;

@Component
public class SubService implements InitializingBean {

Expand All @@ -38,6 +44,11 @@ public class SubService implements InitializingBean {
final String dcn = "FT0";
final String subsys = "1234";

// CountDownLatch size is the same as messageSize in AsyncPublishInstance.java (Publisher)
private CountDownLatch countDownLatch = new CountDownLatch(AsyncPublishInstance.messageSize);

private ExecutorService executorService = Executors.newFixedThreadPool(5);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of cource it won't make a difference, but I wonder here is it necessary to use a thread pool?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a best practice to use the JDK Concurrency library (ExecutorService + ThreadPool) to manage the lifecycle of Thread.

Just suggestion. I can rollback to create Thread manually. Thanks for review comment.


@Override
public void afterPropertiesSet() throws Exception {

Expand All @@ -59,31 +70,41 @@ public void afterPropertiesSet() throws Exception {
liteConsumer.heartBeat(topicList, url);
liteConsumer.subscribe(topicList, url);

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("start destory ....");
try {
liteConsumer.unsubscribe(topicList, url);
} catch (EventMeshException e) {
e.printStackTrace();
}
// Wait for all messaged to be consumed
executorService.submit(() ->{
try {
liteConsumer.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
logger.info("end destory.");
}));

Thread stopThread = new Thread(() -> {
try {
Thread.sleep(5 * 60 * 1000);
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("stopThread start....");
System.exit(0);
});
}

@PreDestroy
public void cleanup() {
logger.info("start destory ....");
try {
liteConsumer.unsubscribe(topicList, url);
} catch (EventMeshException e) {
e.printStackTrace();
}
try {
liteConsumer.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
executorService.shutdown();
logger.info("end destory.");
}

stopThread.start();
/**
* Count the message already consumed
*/
public void consumeMessage(String msg) {
logger.info("consume message {}", msg);
countDownLatch.countDown();
logger.info("remaining number of messages to be consumed {}", countDownLatch.getCount());
}
}