From d638ec44b83f0f6b79e74077b5c24b63c57f30bf Mon Sep 17 00:00:00 2001 From: j00441484 Date: Mon, 10 May 2021 17:30:47 -0400 Subject: [PATCH 1/6] [Issue #337] Fix HttpSubscriber startup issue --- .../eventmesh/http/demo/sub/service/SubService.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index e20fea98be..84432ca003 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -75,12 +75,15 @@ public void afterPropertiesSet() throws Exception { })); Thread stopThread = new Thread(() -> { + try { + Thread.sleep(5 * 60 * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } logger.info("stopThread start...."); System.exit(0); }); - Thread.sleep(5 * 60 * 1000); - -// stopThread.start(); + stopThread.start(); } } From 5ebfb54ece6b2020fb4d1b5fe2be26a3a2a6bbd8 Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Mon, 10 May 2021 17:46:45 -0400 Subject: [PATCH 2/6] [Issue #337] test commit --- .../org/apache/eventmesh/http/demo/sub/service/SubService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index 84432ca003..c75cef7c51 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -84,6 +84,6 @@ public void afterPropertiesSet() throws Exception { System.exit(0); }); - stopThread.start(); + // stopThread.start(); } } From a3afff3582a3a0e9baf600276116f34d9e95e191 Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Mon, 10 May 2021 17:49:04 -0400 Subject: [PATCH 3/6] [Issue #337] revert test commit --- .../org/apache/eventmesh/http/demo/sub/service/SubService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index c75cef7c51..84432ca003 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -84,6 +84,6 @@ public void afterPropertiesSet() throws Exception { System.exit(0); }); - // stopThread.start(); + stopThread.start(); } } From 50f959d6c7416da01d51da0b33e8c9b47cee0611 Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Tue, 11 May 2021 11:40:24 -0400 Subject: [PATCH 4/6] [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook --- .../http/demo/AsyncPublishInstance.java | 5 +- .../demo/sub/controller/SubController.java | 9 ++- .../http/demo/sub/service/SubService.java | 57 ++++++++++++------- 3 files changed, 50 insertions(+), 21 deletions(-) diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java index b718bcc2e0..558773fc56 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java @@ -35,6 +35,9 @@ public class AsyncPublishInstance { public static Logger logger = LoggerFactory.getLogger(AsyncPublishInstance.class); + // This messageSize is also used in SubService.java (Subscriber) + public static int messageSize = 5; + public static void main(String[] args) throws Exception { Properties properties = Utils.readPropertiesFile("application.properties"); final String eventMeshIp = properties.getProperty("eventmesh.ip"); @@ -62,7 +65,7 @@ public static void main(String[] args) throws Exception { liteProducer = new LiteProducer(eventMeshClientConfig); liteProducer.start(); - for (int i = 0; i < 5; i++) { + for (int i = 0; i < messageSize; i++) { LiteMessage liteMessage = new LiteMessage(); liteMessage.setBizSeqNo(RandomStringUtils.randomNumeric(30)) // .setContent("contentStr with special protocal") diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java index 8f8a7a7f9b..a3b9f4ede4 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java @@ -19,8 +19,10 @@ import com.alibaba.fastjson.JSONObject; +import org.apache.eventmesh.http.demo.sub.service.SubService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; @@ -33,12 +35,17 @@ public class SubController { public static Logger logger = LoggerFactory.getLogger(SubController.class); + @Autowired + private SubService subService; + @RequestMapping(value = "/test", method = RequestMethod.POST) public String subTest(@RequestBody String message) { logger.info("=======receive message======= {}", JSONObject.toJSONString(message)); JSONObject result = new JSONObject(); result.put("retCode", 1); - return result.toJSONString(); + String strResult = result.toJSONString(); + subService.consumeMessage(strResult); + return strResult; } } diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index 84432ca003..8fb4746cef 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -3,18 +3,22 @@ import java.util.Arrays; import java.util.List; import java.util.Properties; - +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.client.http.conf.LiteClientConfig; import org.apache.eventmesh.client.http.consumer.LiteConsumer; import org.apache.eventmesh.common.EventMeshException; import org.apache.eventmesh.common.IPUtil; import org.apache.eventmesh.common.ThreadUtil; +import org.apache.eventmesh.http.demo.AsyncPublishInstance; import org.apache.eventmesh.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; +import javax.annotation.PreDestroy; @Component public class SubService implements InitializingBean { @@ -38,6 +42,11 @@ public class SubService implements InitializingBean { final String dcn = "FT0"; final String subsys = "1234"; + // CountDownLatch size is the same as messageSize in AsyncPublishInstance.java (Publisher) + private CountDownLatch countDownLatch = new CountDownLatch(AsyncPublishInstance.messageSize); + + private ExecutorService executorService = Executors.newFixedThreadPool(5); + @Override public void afterPropertiesSet() throws Exception { @@ -59,31 +68,41 @@ public void afterPropertiesSet() throws Exception { liteConsumer.heartBeat(topicList, url); liteConsumer.subscribe(topicList, url); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - logger.info("start destory ...."); - try { - liteConsumer.unsubscribe(topicList, url); - } catch (EventMeshException e) { - e.printStackTrace(); - } + // Wait for all messaged to be consumed + executorService.submit(() ->{ try { - liteConsumer.shutdown(); - } catch (Exception e) { - e.printStackTrace(); - } - logger.info("end destory."); - })); - - Thread stopThread = new Thread(() -> { - try { - Thread.sleep(5 * 60 * 1000); + countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } logger.info("stopThread start...."); System.exit(0); }); + } + + @PreDestroy + public void cleanup() { + logger.info("start destory ...."); + try { + liteConsumer.unsubscribe(topicList, url); + } catch (EventMeshException e) { + e.printStackTrace(); + } + try { + liteConsumer.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + executorService.shutdown(); + logger.info("end destory."); + } - stopThread.start(); + /** + * Count the message already consumed + */ + public void consumeMessage(String msg) { + logger.info("consume message {}", msg); + countDownLatch.countDown(); + logger.info("remaining number of messages to be consumed {}", countDownLatch.getCount()); } } From d48ead5aac61d602880a6d8715e7bc131c908bf8 Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Tue, 11 May 2021 12:11:20 -0400 Subject: [PATCH 5/6] [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook --- .../http/demo/sub/service/SubService.java | 57 +++++++++++++------ 1 file changed, 39 insertions(+), 18 deletions(-) diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index 84432ca003..213ea40078 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -3,6 +3,9 @@ import java.util.Arrays; import java.util.List; import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.client.http.conf.LiteClientConfig; @@ -10,12 +13,15 @@ import org.apache.eventmesh.common.EventMeshException; import org.apache.eventmesh.common.IPUtil; import org.apache.eventmesh.common.ThreadUtil; +import org.apache.eventmesh.http.demo.AsyncPublishInstance; import org.apache.eventmesh.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; +import javax.annotation.PreDestroy; + @Component public class SubService implements InitializingBean { @@ -38,6 +44,11 @@ public class SubService implements InitializingBean { final String dcn = "FT0"; final String subsys = "1234"; + // CountDownLatch size is the same as messageSize in AsyncPublishInstance.java (Publisher) + private CountDownLatch countDownLatch = new CountDownLatch(AsyncPublishInstance.messageSize); + + private ExecutorService executorService = Executors.newFixedThreadPool(5); + @Override public void afterPropertiesSet() throws Exception { @@ -59,31 +70,41 @@ public void afterPropertiesSet() throws Exception { liteConsumer.heartBeat(topicList, url); liteConsumer.subscribe(topicList, url); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - logger.info("start destory ...."); - try { - liteConsumer.unsubscribe(topicList, url); - } catch (EventMeshException e) { - e.printStackTrace(); - } + // Wait for all messaged to be consumed + executorService.submit(() ->{ try { - liteConsumer.shutdown(); - } catch (Exception e) { - e.printStackTrace(); - } - logger.info("end destory."); - })); - - Thread stopThread = new Thread(() -> { - try { - Thread.sleep(5 * 60 * 1000); + countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } logger.info("stopThread start...."); System.exit(0); }); + } + + @PreDestroy + public void cleanup() { + logger.info("start destory ...."); + try { + liteConsumer.unsubscribe(topicList, url); + } catch (EventMeshException e) { + e.printStackTrace(); + } + try { + liteConsumer.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + executorService.shutdown(); + logger.info("end destory."); + } - stopThread.start(); + /** + * Count the message already consumed + */ + public void consumeMessage(String msg) { + logger.info("consume message {}", msg); + countDownLatch.countDown(); + logger.info("remaining number of messages to be consumed {}", countDownLatch.getCount()); } } From c9021fea94148c118ab27608fc8d34404a0c9a3f Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Wed, 12 May 2021 11:25:11 -0400 Subject: [PATCH 6/6] [Issue #337] Address code review comment for Subscriber Demo App --- .../http/demo/sub/controller/SubController.java | 6 +++--- .../eventmesh/http/demo/sub/service/SubService.java | 10 ++-------- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java index a3b9f4ede4..92ca09d700 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java @@ -41,11 +41,11 @@ public class SubController { @RequestMapping(value = "/test", method = RequestMethod.POST) public String subTest(@RequestBody String message) { logger.info("=======receive message======= {}", JSONObject.toJSONString(message)); + subService.consumeMessage(message); + JSONObject result = new JSONObject(); result.put("retCode", 1); - String strResult = result.toJSONString(); - subService.consumeMessage(strResult); - return strResult; + return result.toJSONString(); } } diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index 213ea40078..9a51e4d2fa 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -4,9 +4,6 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.client.http.conf.LiteClientConfig; import org.apache.eventmesh.client.http.consumer.LiteConsumer; @@ -19,7 +16,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; - import javax.annotation.PreDestroy; @Component @@ -47,8 +43,6 @@ public class SubService implements InitializingBean { // CountDownLatch size is the same as messageSize in AsyncPublishInstance.java (Publisher) private CountDownLatch countDownLatch = new CountDownLatch(AsyncPublishInstance.messageSize); - private ExecutorService executorService = Executors.newFixedThreadPool(5); - @Override public void afterPropertiesSet() throws Exception { @@ -71,7 +65,7 @@ public void afterPropertiesSet() throws Exception { liteConsumer.subscribe(topicList, url); // Wait for all messaged to be consumed - executorService.submit(() ->{ + Thread stopThread = new Thread(() -> { try { countDownLatch.await(); } catch (InterruptedException e) { @@ -80,6 +74,7 @@ public void afterPropertiesSet() throws Exception { logger.info("stopThread start...."); System.exit(0); }); + stopThread.start(); } @PreDestroy @@ -95,7 +90,6 @@ public void cleanup() { } catch (Exception e) { e.printStackTrace(); } - executorService.shutdown(); logger.info("end destory."); }