diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java index c205f5514b..59406f8917 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java @@ -42,28 +42,40 @@ @Slf4j public class AsyncPublishInstance { - + // This messageSize is also used in SubService.java (Subscriber) - public static int messageSize = 5; - + public static final int MESSAGE_SIZE = 5; + + public static final String DEFAULT_IP_PORT = "127.0.0.1:10105"; + + public static final String FILE_NAME = "application.properties"; + + public static final String IP_KEY = "eventmesh.ip"; + + public static final String PORT_KEY = "eventmesh.http.port"; + + public static final String TEST_TOPIC = "TEST-TOPIC-HTTP-ASYNC"; + + public static final String TEST_GROUP = "EventMeshTest-producerGroup"; + + public static final String CONTENT_TYPE = "application/cloudevents+json"; + + public static void main(String[] args) throws Exception { - Properties properties = Utils.readPropertiesFile("application.properties"); - final String eventMeshIp = properties.getProperty("eventmesh.ip"); - final String eventMeshHttpPort = properties.getProperty("eventmesh.http.port"); - - final String eventMeshIPPort; - if (StringUtils.isBlank(eventMeshIp) || StringUtils.isBlank(eventMeshHttpPort)) { - // if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105 - eventMeshIPPort = "127.0.0.1:10105"; - } else { + + Properties properties = Utils.readPropertiesFile(FILE_NAME); + final String eventMeshIp = properties.getProperty(IP_KEY); + final String eventMeshHttpPort = properties.getProperty(PORT_KEY); + + // if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105 + String eventMeshIPPort = DEFAULT_IP_PORT; + if (StringUtils.isNotBlank(eventMeshIp) || StringUtils.isNotBlank(eventMeshHttpPort)) { eventMeshIPPort = eventMeshIp + ":" + eventMeshHttpPort; } - final String topic = "TEST-TOPIC-HTTP-ASYNC"; - EventMeshHttpClientConfig eventMeshClientConfig = EventMeshHttpClientConfig.builder() .liteEventMeshAddr(eventMeshIPPort) - .producerGroup("EventMeshTest-producerGroup") + .producerGroup(TEST_GROUP) .env("env") .idc("idc") .ip(IPUtils.getLocalAddress()) @@ -74,20 +86,21 @@ public static void main(String[] args) throws Exception { .build(); try (EventMeshHttpProducer eventMeshHttpProducer = new EventMeshHttpProducer(eventMeshClientConfig)) { - for (int i = 0; i < messageSize; i++) { + for (int i = 0; i < MESSAGE_SIZE; i++) { Map content = new HashMap<>(); content.put("content", "testAsyncMessage"); CloudEvent event = CloudEventBuilder.v1() .withId(UUID.randomUUID().toString()) - .withSubject(topic) + .withSubject(TEST_TOPIC) .withSource(URI.create("/")) - .withDataContentType("application/cloudevents+json") + .withDataContentType(CONTENT_TYPE) .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME) .withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8)) .withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000)) .build(); eventMeshHttpProducer.publish(event); + log.info("publish event success content:{}",content); } Thread.sleep(30000); } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/util/Utils.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/util/Utils.java index b872c3af8d..d64f629765 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/util/Utils.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/util/Utils.java @@ -32,9 +32,8 @@ public class Utils { /** * Get local IP address * - * @throws SocketException */ - public static String getLocalIP() throws UnknownHostException, SocketException { + public static String getLocalIP() throws UnknownHostException { if (isWindowsOS()) { return InetAddress.getLocalHost().getHostAddress(); } else { @@ -55,9 +54,8 @@ public static boolean isWindowsOS() { * Get local IP address under Linux system * * @return IP address - * @throws SocketException */ - private static String getLinuxLocalIp() throws SocketException { + private static String getLinuxLocalIp() { String ip = ""; try { for (Enumeration en = NetworkInterface.getNetworkInterfaces(); en.hasMoreElements(); ) { diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpUtils.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpUtils.java index 000a5c0451..23e1546dd3 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpUtils.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpUtils.java @@ -23,7 +23,6 @@ import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHost; -import org.apache.http.HttpResponse; import org.apache.http.NameValuePair; import org.apache.http.client.ResponseHandler; import org.apache.http.client.config.RequestConfig; @@ -57,17 +56,14 @@ public static String post(CloseableHttpClient client, RequestParam requestParam) throws Exception { final ResponseHolder responseHolder = new ResponseHolder(); final CountDownLatch countDownLatch = new CountDownLatch(1); - post(client, null, uri, requestParam, new ResponseHandler() { - @Override - public String handleResponse(HttpResponse response) throws IOException { - responseHolder.response = - EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET)); - countDownLatch.countDown(); - if (log.isDebugEnabled()) { - log.debug("{}", responseHolder); - } - return responseHolder.response; + post(client, null, uri, requestParam, response -> { + responseHolder.response = + EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET)); + countDownLatch.countDown(); + if (log.isDebugEnabled()) { + log.debug("{}", responseHolder); } + return responseHolder.response; }); try { @@ -85,17 +81,14 @@ public static String post(CloseableHttpClient client, RequestParam requestParam) throws Exception { final ResponseHolder responseHolder = new ResponseHolder(); final CountDownLatch countDownLatch = new CountDownLatch(1); - post(client, forwardAgent, uri, requestParam, new ResponseHandler() { - @Override - public String handleResponse(HttpResponse response) throws IOException { - responseHolder.response = - EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET)); - countDownLatch.countDown(); - if (log.isDebugEnabled()) { - log.debug("{}", responseHolder); - } - return responseHolder.response; + post(client, forwardAgent, uri, requestParam, response -> { + responseHolder.response = + EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET)); + countDownLatch.countDown(); + if (log.isDebugEnabled()) { + log.debug("{}", responseHolder); } + return responseHolder.response; }); try { @@ -203,17 +196,14 @@ public static String get(CloseableHttpClient client, RequestParam requestParam) throws Exception { final ResponseHolder responseHolder = new ResponseHolder(); final CountDownLatch countDownLatch = new CountDownLatch(1); - get(client, null, url, requestParam, new ResponseHandler() { - @Override - public String handleResponse(HttpResponse response) throws IOException { - responseHolder.response = - EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET)); - countDownLatch.countDown(); - if (log.isDebugEnabled()) { - log.debug("{}", responseHolder); - } - return responseHolder.response; + get(client, null, url, requestParam, response -> { + responseHolder.response = + EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET)); + countDownLatch.countDown(); + if (log.isDebugEnabled()) { + log.debug("{}", responseHolder); } + return responseHolder.response; }); try { @@ -231,17 +221,14 @@ public static String get(CloseableHttpClient client, RequestParam requestParam) throws Exception { final ResponseHolder responseHolder = new ResponseHolder(); final CountDownLatch countDownLatch = new CountDownLatch(1); - get(client, forwardAgent, url, requestParam, new ResponseHandler() { - @Override - public String handleResponse(HttpResponse response) throws IOException { - responseHolder.response = - EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET)); - countDownLatch.countDown(); - if (log.isDebugEnabled()) { - log.debug("{}", responseHolder); - } - return responseHolder.response; + get(client, forwardAgent, url, requestParam, response -> { + responseHolder.response = + EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET)); + countDownLatch.countDown(); + if (log.isDebugEnabled()) { + log.debug("{}", responseHolder); } + return responseHolder.response; }); try {