From 4a0cc253138277e963a2a1aeb56bdc414e7cf841 Mon Sep 17 00:00:00 2001 From: mike_xwm Date: Fri, 26 Nov 2021 19:04:02 +0800 Subject: [PATCH] Java sdk update (#615) * update java sdk * fix compile error * fix sdk error 1.remove the openmessage from connector-api 2.fix the standalone connector * 1.fix the standalone connector 2.fix the sdk --- .../org/apache/eventmesh/common/Constants.java | 6 +++--- .../consumer/StandaloneConsumerAdaptor.java | 16 ++++++++-------- .../demo/pub/eventmeshmessage/AsyncPublish.java | 1 - .../sub/eventmeshmessage/AsyncSubscribe.java | 12 ++++++++++-- ...apache.eventmesh.protocol.api.ProtocolAdaptor | 0 .../impl/cloudevent/CloudEventTCPPubClient.java | 5 +++-- .../EventMeshMessageTCPPubClient.java | 15 ++++++++------- .../EventMeshMessageTCPSubClient.java | 8 ++++---- 8 files changed, 36 insertions(+), 27 deletions(-) rename eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/resources/{META-INF.eventmesh => META-INF/eventmesh}/org.apache.eventmesh.protocol.api.ProtocolAdaptor (100%) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java index f5fa2cd698..20c1b4e742 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java @@ -29,11 +29,11 @@ public class Constants { public static final String HTTPS_PROTOCOL_PREFIX = "https://"; - public static final String PROTOCOL_TYPE = "protocol_type"; + public static final String PROTOCOL_TYPE = "protocoltype"; - public static final String PROTOCOL_VERSION = "protocol_version"; + public static final String PROTOCOL_VERSION = "protocolversion"; - public static final String PROTOCOL_DESC = "protocol_desc"; + public static final String PROTOCOL_DESC = "protocoldesc"; public static final int DEFAULT_HTTP_TIME_OUT = 3000; diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java index 56518a75d2..716b4a21d3 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java +++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java @@ -40,42 +40,42 @@ public StandaloneConsumerAdaptor() { @Override public boolean isStarted() { - return false; + return consumer.isStarted(); } @Override public boolean isClosed() { - return false; + return consumer.isClosed(); } @Override public void start() { - + consumer.start(); } @Override public void shutdown() { - + consumer.shutdown(); } @Override public void init(Properties keyValue) throws Exception { - + consumer = new StandaloneConsumer(keyValue); } @Override public void updateOffset(List cloudEvents, AbstractContext context) { - + consumer.updateOffset(cloudEvents, context); } @Override public void subscribe(String topic, EventListener listener) throws Exception { - + consumer.subscribe(topic, listener); } @Override public void unsubscribe(String topic) { - + consumer.unsubscribe(topic); } // @Override diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java index a01bb250b4..bc393523de 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java @@ -63,7 +63,6 @@ public static void main(String[] agrs) throws Exception { Thread.sleep(1000); } - client.listen(); Thread.sleep(2000); } catch (Exception e) { logger.warn("AsyncPublish failed", e); diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java index f27dbd1b75..9df4548932 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java @@ -21,6 +21,7 @@ import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory; +import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPClient; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; @@ -39,6 +40,8 @@ public class AsyncSubscribe implements ReceiveMsgHook { public static AsyncSubscribe handler = new AsyncSubscribe(); + private static EventMeshTCPClient client; + public static void main(String[] agrs) throws Exception { Properties properties = Utils.readPropertiesFile("application.properties"); final String eventMeshIp = properties.getProperty("eventmesh.ip"); @@ -49,8 +52,8 @@ public static void main(String[] agrs) throws Exception { .port(eventMeshTcpPort) .userAgent(userAgent) .build(); - try (EventMeshTCPClient client = EventMeshTCPClientFactory.createEventMeshTCPClient( - eventMeshTcpClientConfig, EventMeshMessage.class)) { + try { + client = EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, EventMeshMessage.class); client.init(); client.heartbeat(); @@ -59,6 +62,11 @@ public static void main(String[] agrs) throws Exception { client.listen(); + //client.unsubscribe(); + + // release resource and close client + // client.close(); + } catch (Exception e) { log.warn("AsyncSubscribe failed", e); } diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/resources/META-INF.eventmesh/org.apache.eventmesh.protocol.api.ProtocolAdaptor b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.protocol.api.ProtocolAdaptor similarity index 100% rename from eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/resources/META-INF.eventmesh/org.apache.eventmesh.protocol.api.ProtocolAdaptor rename to eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.protocol.api.ProtocolAdaptor diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java index d4fbec7b36..ca5901a7ea 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java @@ -26,6 +26,7 @@ import org.apache.eventmesh.client.tcp.common.RequestContext; import org.apache.eventmesh.client.tcp.common.TcpClient; import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; +import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.exception.EventMeshException; import org.apache.eventmesh.common.protocol.tcp.Command; import org.apache.eventmesh.common.protocol.tcp.Package; @@ -128,7 +129,7 @@ public Package publish(CloudEvent cloudEvent, long timeout) throws EventMeshExce Package msg = MessageUtils.buildPackage(cloudEvent, Command.ASYNC_MESSAGE_TO_SERVER); log.info("SimplePubClientImpl cloud event|{}|publish|send|type={}|protocol={}|msg={}", clientNo, msg.getHeader().getCommand(), - msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg); + msg.getHeader().getProperty(Constants.PROTOCOL_TYPE), msg); return io(msg, timeout); } catch (Exception ex) { throw new EventMeshException("publish error", ex); @@ -141,7 +142,7 @@ public void broadcast(CloudEvent cloudEvent, long timeout) throws EventMeshExcep // todo: transform EventMeshMessage to Package Package msg = MessageUtils.buildPackage(cloudEvent, Command.BROADCAST_MESSAGE_TO_SERVER); log.info("{}|publish|send|type={}|protocol={}|msg={}", clientNo, msg.getHeader().getCommand(), - msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg); + msg.getHeader().getProperty(Constants.PROTOCOL_TYPE), msg); super.send(msg); } catch (Exception ex) { throw new EventMeshException("Broadcast message error", ex); diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java index a3dd6a41bd..18f555fcaa 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java @@ -26,6 +26,7 @@ import org.apache.eventmesh.client.tcp.common.RequestContext; import org.apache.eventmesh.client.tcp.common.TcpClient; import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; +import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.exception.EventMeshException; import org.apache.eventmesh.common.protocol.tcp.Command; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; @@ -72,8 +73,8 @@ public void init() throws EventMeshException { @Override public void heartbeat() throws EventMeshException { - if (task != null) { - synchronized (EventMeshMessageTCPPubClient.class) { +// if (task != null) { +// synchronized (EventMeshMessageTCPPubClient.class) { task = scheduler.scheduleAtFixedRate(() -> { try { if (!isActive()) { @@ -86,8 +87,8 @@ public void heartbeat() throws EventMeshException { } }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS); } - } - } +// } +// } @Override public void reconnect() throws EventMeshException { @@ -128,9 +129,9 @@ public Package publish(EventMeshMessage eventMeshMessage, long timeout) throws E try { // todo: transform EventMeshMessage to Package Package msg = MessageUtils.buildPackage(eventMeshMessage, Command.ASYNC_MESSAGE_TO_SERVER); - log.info("SimplePubClientImpl cloud event|{}|publish|send|type={}|protocol={}|msg={}", + log.info("SimplePubClientImpl em message|{}|publish|send|type={}|protocol={}|msg={}", clientNo, msg.getHeader().getCommand(), - msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg); + msg.getHeader().getProperty(Constants.PROTOCOL_TYPE), msg); return io(msg, timeout); } catch (Exception ex) { throw new EventMeshException("publish error", ex); @@ -143,7 +144,7 @@ public void broadcast(EventMeshMessage eventMeshMessage, long timeout) throws Ev // todo: transform EventMeshMessage to Package Package msg = MessageUtils.buildPackage(eventMeshMessage, Command.BROADCAST_MESSAGE_TO_SERVER); log.info("{}|publish|send|type={}|protocol={}|msg={}", clientNo, msg.getHeader().getCommand(), - msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg); + msg.getHeader().getProperty(Constants.PROTOCOL_TYPE), msg); super.send(msg); } catch (Exception ex) { throw new EventMeshException("Broadcast message error", ex); diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java index add12a5d85..f9da554efd 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java @@ -71,8 +71,8 @@ public void init() throws EventMeshException { @Override public void heartbeat() throws EventMeshException { - if (task == null) { - synchronized (EventMeshMessageTCPSubClient.class) { +// if (task == null) { +// synchronized (EventMeshMessageTCPSubClient.class) { task = scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run() { @@ -88,8 +88,8 @@ public void run() { } }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS); } - } - } +// } +// } @Override public void reconnect() throws EventMeshException {