From 0fcb40127fb60fe37f51f7e1b1e0df1b2f09f24e Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Wed, 1 Dec 2021 10:13:47 +0800 Subject: [PATCH] Change TCP Decoder and Encoder (#621) --- .../common/protocol/tcp/EventMeshMessage.java | 55 +---- .../eventmesh/common/protocol/tcp/Header.java | 25 +-- .../common/protocol/tcp/Package.java | 11 +- .../common/protocol/tcp/codec/Codec.java | 210 +++++++++++------- .../tcp/common/EventMeshTestUtils.java | 5 +- .../demo/pub/cloudevents/AsyncPublish.java | 1 - .../pub/eventmeshmessage/AsyncPublish.java | 1 - .../AsyncPublishBroadcast.java | 1 - .../pub/eventmeshmessage/SyncRequest.java | 1 - .../demo/sub/cloudevents/AsyncSubscribe.java | 13 +- .../sub/eventmeshmessage/AsyncSubscribe.java | 21 +- .../AsyncSubscribeBroadcast.java | 10 +- .../sub/eventmeshmessage/SyncResponse.java | 7 +- .../CloudEventsProtocolAdaptor.java | 11 +- .../tcp/TcpMessageProtocolResolver.java | 8 +- .../MeshMessageProtocolAdaptor.java | 13 +- .../tcp/TcpMessageProtocolResolver.java | 6 +- .../runtime/boot/EventMeshServer.java | 15 +- .../group/ClientSessionGroupMapping.java | 4 +- .../client/tcp/EventMeshTCPClient.java | 2 - .../client/tcp/EventMeshTCPPubClient.java | 2 - .../client/tcp/EventMeshTCPSubClient.java | 2 - .../client/tcp/common/ReceiveMsgHook.java | 17 +- .../client/tcp/common/TcpClient.java | 25 +++ .../impl/cloudevent/CloudEventTCPClient.java | 6 - .../cloudevent/CloudEventTCPPubClient.java | 37 ++- .../cloudevent/CloudEventTCPSubClient.java | 69 +++--- .../EventMeshMessageTCPClient.java | 8 +- .../EventMeshMessageTCPPubClient.java | 24 +- .../EventMeshMessageTCPSubClient.java | 56 ++--- .../openmessage/OpenMessageTCPClient.java | 6 - .../openmessage/OpenMessageTCPPubClient.java | 5 - .../openmessage/OpenMessageTCPSubClient.java | 5 - 33 files changed, 315 insertions(+), 367 deletions(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/EventMeshMessage.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/EventMeshMessage.java index 01d744eea1..a51f9e022a 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/EventMeshMessage.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/EventMeshMessage.java @@ -20,51 +20,16 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -public class EventMeshMessage { - - private String topic; - Map properties = new ConcurrentHashMap<>(); - private String body; - - public EventMeshMessage() { - } - - public EventMeshMessage(String topic, Map properties, String body) { - this.topic = topic; - this.properties = properties; - this.body = body; - } - - public String getTopic() { - return topic; - } +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; - public void setTopic(String topic) { - this.topic = topic; - } - - public Map getProperties() { - return properties; - } - - public void setProperties(Map properties) { - this.properties = properties; - } - - public String getBody() { - return body; - } - - public void setBody(String body) { - this.body = body; - } +@Data +@NoArgsConstructor +@AllArgsConstructor +public class EventMeshMessage { - @Override - public String toString() { - return "EventMeshMessage{" + - "topic='" + topic + '\'' + - ", properties=" + properties + - ", body='" + body + '\'' + - '}'; - } + private String topic; + private Map properties = new ConcurrentHashMap<>(); + private String body; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Header.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Header.java index d3d5cc65e8..da30aea710 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Header.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Header.java @@ -20,13 +20,16 @@ import java.util.HashMap; import java.util.Map; +import lombok.Data; + +@Data public class Header { - private Command cmd; - private int code; - private String desc; - private String seq; - private Map properties; + private Command cmd; + private int code; + private String desc; + private String seq; + private Map properties = new HashMap<>(); public Header() { } @@ -45,6 +48,7 @@ public Header(int code, String desc, String seq, Map properties) this.properties = properties; } + public Command getCommand() { return cmd; } @@ -97,18 +101,7 @@ public Object getProperty(final String name) { if (null == this.properties) { this.properties = new HashMap<>(); } - return this.properties.get(name); } - @Override - public String toString() { - return "Header{" + - "cmd=" + cmd + - ", code=" + code + - ", desc='" + desc + '\'' + - ", seq='" + seq + '\'' + - ", properties=" + properties + - '}'; - } } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Package.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Package.java index f277d06adf..1a2a223bf2 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Package.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Package.java @@ -19,23 +19,20 @@ import org.apache.eventmesh.common.protocol.ProtocolTransportObject; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; @Data +@NoArgsConstructor +@AllArgsConstructor public class Package implements ProtocolTransportObject { private Header header; private Object body; - public Package() { - } - public Package(Header header) { this.header = header; } - public Package(Header header, Object body) { - this.header = header; - this.body = body; - } } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java index d520e04dbc..191a09961d 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java @@ -17,12 +17,23 @@ package org.apache.eventmesh.common.protocol.tcp.codec; +import org.apache.eventmesh.common.Constants; +import org.apache.eventmesh.common.protocol.tcp.Command; +import org.apache.eventmesh.common.protocol.tcp.Header; +import org.apache.eventmesh.common.protocol.tcp.Package; +import org.apache.eventmesh.common.protocol.tcp.RedirectInfo; +import org.apache.eventmesh.common.protocol.tcp.Subscription; +import org.apache.eventmesh.common.protocol.tcp.UserAgent; + +import org.apache.commons.lang3.ArrayUtils; + import java.nio.charset.Charset; import java.util.Arrays; import java.util.List; import java.util.TimeZone; import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; @@ -31,53 +42,48 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import io.netty.handler.codec.ReplayingDecoder; +import lombok.extern.slf4j.Slf4j; -import org.apache.eventmesh.common.protocol.tcp.Command; -import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; -import org.apache.eventmesh.common.protocol.tcp.Header; -import org.apache.eventmesh.common.protocol.tcp.Package; -import org.apache.eventmesh.common.protocol.tcp.RedirectInfo; -import org.apache.eventmesh.common.protocol.tcp.Subscription; -import org.apache.eventmesh.common.protocol.tcp.UserAgent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +@Slf4j public class Codec { - private final static Logger logger = LoggerFactory.getLogger(Codec.class); - private static final int FRAME_MAX_LENGTH = 1024 * 1024 * 4; - private static Charset UTF8 = Charset.forName("UTF-8"); + private static final int FRAME_MAX_LENGTH = 1024 * 1024 * 4; + private static final Charset DEFAULT_CHARSET = Charset.forName(Constants.DEFAULT_CHARSET); - private static final byte[] CONSTANT_MAGIC_FLAG = "EventMesh".getBytes(UTF8); + private static final byte[] CONSTANT_MAGIC_FLAG = serializeBytes("EventMesh"); + private static final byte[] VERSION = serializeBytes("0000"); - private static final byte[] VERSION = "0000".getBytes(UTF8); + // todo: move to constants + public static String CLOUD_EVENTS_PROTOCOL_NAME = "cloudevents"; + public static String EM_MESSAGE_PROTOCOL_NAME = "eventmeshmessage"; + public static String OPEN_MESSAGE_PROTOCOL_NAME = "openmessage"; - private static ObjectMapper jsonMapper; + // todo: use json util + private static ObjectMapper OBJECT_MAPPER; static { - jsonMapper = new ObjectMapper(); - jsonMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); - jsonMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); - jsonMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); - jsonMapper.setTimeZone(TimeZone.getDefault()); + OBJECT_MAPPER = new ObjectMapper(); + OBJECT_MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL); + OBJECT_MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + OBJECT_MAPPER.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); + OBJECT_MAPPER.setTimeZone(TimeZone.getDefault()); } public static class Encoder extends MessageToByteEncoder { @Override public void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf out) throws Exception { - byte[] headerData; - byte[] bodyData; + final String headerJson = pkg != null ? OBJECT_MAPPER.writeValueAsString(pkg.getHeader()) : null; + final String bodyJson = pkg != null ? OBJECT_MAPPER.writeValueAsString(pkg.getBody()) : null; - final String headerJson = pkg != null ? jsonMapper.writeValueAsString(pkg.getHeader()) : null; - final String bodyJson = pkg != null ? jsonMapper.writeValueAsString(pkg.getBody()) : null; + final byte[] headerData = serializeBytes(headerJson); + final byte[] bodyData = serializeBytes(bodyJson); - headerData = headerJson == null ? null : headerJson.getBytes(UTF8); - bodyData = bodyJson == null ? null : bodyJson.getBytes(UTF8); - - logger.debug("headerJson={}|bodyJson={}", headerJson, bodyJson); + if (log.isDebugEnabled()) { + log.debug("Encoder headerJson={}|bodyJson={}", headerJson, bodyJson); + } - int headerLength = headerData == null ? 0 : headerData.length; - int bodyLength = bodyData == null ? 0 : bodyData.length; + int headerLength = ArrayUtils.getLength(headerData); + int bodyLength = ArrayUtils.getLength(bodyData); int length = 4 + 4 + headerLength + bodyLength; @@ -89,94 +95,140 @@ public void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf out) throws E out.writeBytes(VERSION); out.writeInt(length); out.writeInt(headerLength); - if (headerData != null) + if (headerData != null) { out.writeBytes(headerData); - if (bodyData != null) + } + if (bodyData != null) { out.writeBytes(bodyData); + } } } - public static class Decoder extends ReplayingDecoder { + public static class Decoder extends ReplayingDecoder { @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - Header header = null; - Object body = null; - - int length = 0; - int headerLength = 0; - int bodyLength = 0; - try { - if (null == in) + if (null == in) { return; - - byte[] flagBytes = new byte[CONSTANT_MAGIC_FLAG.length]; - byte[] versionBytes = new byte[VERSION.length]; - - in.readBytes(flagBytes); - in.readBytes(versionBytes); - if (!Arrays.equals(flagBytes, CONSTANT_MAGIC_FLAG) || !Arrays.equals(versionBytes, VERSION)) { - String errorMsg = String.format("invalid magic flag or " + - "version|flag=%s|version=%s|remoteAddress=%s", new String(flagBytes, UTF8), new String - (versionBytes, UTF8), ctx.channel().remoteAddress()); - throw new Exception(errorMsg); } - length = in.readInt(); - headerLength = in.readInt(); - bodyLength = length - 8 - headerLength; - byte[] headerData = new byte[headerLength]; - byte[] bodyData = new byte[bodyLength]; + byte[] flagBytes = parseFlag(in); + byte[] versionBytes = parseVersion(in); + validateFlag(flagBytes, versionBytes, ctx); - if (headerLength > 0) { - in.readBytes(headerData); - header = jsonMapper.readValue(new String(headerData, UTF8), Header.class); - } - - if (bodyLength > 0 && header != null) { - in.readBytes(bodyData); - body = parseFromJson(header.getCommand(), new String(bodyData, UTF8)); - } - - logger.debug("headerJson={}|bodyJson={}", new String(headerData, UTF8), new String(bodyData, UTF8)); + final int length = in.readInt(); + final int headerLength = in.readInt(); + final int bodyLength = length - 8 - headerLength; + Header header = parseHeader(in, headerLength); + Object body = parseBody(in, header, bodyLength); Package pkg = new Package(header, body); out.add(pkg); } catch (Exception e) { - logger.error("decode|length={}|headerLength={}|bodyLength={}|header={}|body={}.", length, - headerLength, bodyLength, header, body); + log.error("decode error| receive: {}.", deserializeBytes(in.array())); throw e; } } + + private byte[] parseFlag(ByteBuf in) { + final byte[] flagBytes = new byte[CONSTANT_MAGIC_FLAG.length]; + in.readBytes(flagBytes); + return flagBytes; + } + + private byte[] parseVersion(ByteBuf in) { + final byte[] versionBytes = new byte[VERSION.length]; + in.readBytes(versionBytes); + return versionBytes; + } + + private Header parseHeader(ByteBuf in, int headerLength) throws JsonProcessingException { + if (headerLength <= 0) { + return null; + } + final byte[] headerData = new byte[headerLength]; + in.readBytes(headerData); + if (log.isDebugEnabled()) { + log.debug("Decode headerJson={}", deserializeBytes(headerData)); + } + return OBJECT_MAPPER.readValue(deserializeBytes(headerData), Header.class); + } + + private Object parseBody(ByteBuf in, Header header, int bodyLength) throws JsonProcessingException { + if (bodyLength <= 0 || header == null) { + return null; + } + final byte[] bodyData = new byte[bodyLength]; + in.readBytes(bodyData); + if (log.isDebugEnabled()) { + log.debug("Decode bodyJson={}", deserializeBytes(bodyData)); + } + return deserializeBody(deserializeBytes(bodyData), header); + } + + private void validateFlag(byte[] flagBytes, byte[] versionBytes, ChannelHandlerContext ctx) { + if (!Arrays.equals(flagBytes, CONSTANT_MAGIC_FLAG) || !Arrays.equals(versionBytes, VERSION)) { + String errorMsg = String.format( + "invalid magic flag or version|flag=%s|version=%s|remoteAddress=%s", + deserializeBytes(flagBytes), deserializeBytes(versionBytes), ctx.channel().remoteAddress()); + throw new IllegalArgumentException(errorMsg); + } + } } - private static Object parseFromJson(Command cmd, String data) throws Exception { - switch (cmd) { + private static Object deserializeBody(String bodyJsonString, Header header) throws JsonProcessingException { + Command command = header.getCommand(); + switch (command) { case HELLO_REQUEST: case RECOMMEND_REQUEST: - return jsonMapper.readValue(data, UserAgent.class); + return OBJECT_MAPPER.readValue(bodyJsonString, UserAgent.class); case SUBSCRIBE_REQUEST: case UNSUBSCRIBE_REQUEST: - return jsonMapper.readValue(data, Subscription.class); + return OBJECT_MAPPER.readValue(bodyJsonString, Subscription.class); case REQUEST_TO_SERVER: case RESPONSE_TO_SERVER: case ASYNC_MESSAGE_TO_SERVER: case BROADCAST_MESSAGE_TO_SERVER: - return jsonMapper.readValue(data, EventMeshMessage.class); case REQUEST_TO_CLIENT: case RESPONSE_TO_CLIENT: case ASYNC_MESSAGE_TO_CLIENT: case BROADCAST_MESSAGE_TO_CLIENT: - return jsonMapper.readValue(data, EventMeshMessage.class); case REQUEST_TO_CLIENT_ACK: case RESPONSE_TO_CLIENT_ACK: case ASYNC_MESSAGE_TO_CLIENT_ACK: case BROADCAST_MESSAGE_TO_CLIENT_ACK: - return jsonMapper.readValue(data, EventMeshMessage.class); + // The message json will be deserialized by protocol plugin + return bodyJsonString; case REDIRECT_TO_CLIENT: - return jsonMapper.readValue(data, RedirectInfo.class); + return OBJECT_MAPPER.readValue(bodyJsonString, RedirectInfo.class); default: + log.error("Invalidate TCP command: {}", command); return null; } } + + /** + * Deserialize bytes to String. + * + * @param bytes + * @return + */ + private static String deserializeBytes(byte[] bytes) { + return new String(bytes, DEFAULT_CHARSET); + } + + /** + * Serialize String to bytes. + * + * @param str + * @return + */ + private static byte[] serializeBytes(String str) { + if (str == null) { + return null; + } + return str.getBytes(DEFAULT_CHARSET); + } + + } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java index 8c3a06f513..43598324a8 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java @@ -32,6 +32,7 @@ import org.apache.eventmesh.common.protocol.tcp.Header; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.tcp.UserAgent; +import org.apache.eventmesh.common.utils.JsonUtils; import java.net.URI; import java.nio.charset.StandardCharsets; @@ -107,10 +108,10 @@ public static Package broadcastMessage() { return msg; } - public static Package rrResponse(Package request) { + public static Package rrResponse(EventMeshMessage request) { Package msg = new Package(); msg.setHeader(new Header(RESPONSE_TO_SERVER, 0, null, generateRandomString(seqLength))); - msg.setBody(request.getBody()); + msg.setBody(request); return msg; } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java index f279308a53..6864bf1258 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java @@ -55,7 +55,6 @@ public static void main(String[] agrs) throws Exception { client = EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class); client.init(); - client.heartbeat(); for (int i = 0; i < 5; i++) { CloudEvent event = EventMeshTestUtils.generateCloudEventV1(); diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java index bc393523de..7cf1a6469a 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java @@ -53,7 +53,6 @@ public static void main(String[] agrs) throws Exception { client = EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, EventMeshMessage.class); client.init(); - client.heartbeat(); for (int i = 0; i < 5; i++) { EventMeshMessage eventMeshMessage = EventMeshTestUtils.generateAsyncEventMqMsg(); diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java index 07c4de5a3b..642a2c8764 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java @@ -48,7 +48,6 @@ public static void main(String[] agrs) throws Exception { try (final EventMeshTCPClient client = EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, EventMeshMessage.class)) { client.init(); - client.heartbeat(); EventMeshMessage eventMeshMessage = EventMeshTestUtils.generateBroadcastMqMsg(); logger.info("begin send broadcast msg============={}", eventMeshMessage); diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java index f13ee46757..30f2ac130e 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java @@ -41,7 +41,6 @@ public static void main(String[] agrs) throws Exception { try (EventMeshTCPClient client = EventMeshTCPClientFactory.createEventMeshTCPClient( eventMeshTcpClientConfig, EventMeshMessage.class)) { client.init(); - client.heartbeat(); EventMeshMessage eventMeshMessage = EventMeshTestUtils.generateSyncRRMqMsg(); log.info("begin send rr msg=================={}", eventMeshMessage); diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/AsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/AsyncSubscribe.java index b94d12fb44..b41a65c234 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/AsyncSubscribe.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/AsyncSubscribe.java @@ -42,7 +42,7 @@ public class AsyncSubscribe implements ReceiveMsgHook { public static AsyncSubscribe handler = new AsyncSubscribe(); - private static EventMeshTCPClient client; + private static EventMeshTCPClient client; public static void main(String[] agrs) throws Exception { Properties properties = Utils.readPropertiesFile("application.properties"); @@ -57,7 +57,6 @@ public static void main(String[] agrs) throws Exception { try { client = EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class); client.init(); - client.heartbeat(); client.subscribe("TEST-TOPIC-TCP-ASYNC", SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC); client.registerSubBusiHandler(handler); @@ -75,13 +74,7 @@ public static void main(String[] agrs) throws Exception { } @Override - public void handle(Package msg, ChannelHandlerContext ctx) { - CloudEvent event = convertToProtocolMessage(msg); - log.info("receive async msg====================={}", event); - } - - @Override - public CloudEvent convertToProtocolMessage(Package pkg) { - return CloudEventBuilder.from((CloudEvent) pkg.getBody()).build(); + public void handle(CloudEvent msg, ChannelHandlerContext ctx) { + log.info("receive async msg====================={}", msg); } } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java index 9df4548932..dcc90820d9 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java @@ -21,12 +21,12 @@ import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory; -import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPClient; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.tcp.UserAgent; +import org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet; import org.apache.eventmesh.tcp.common.EventMeshTestUtils; import org.apache.eventmesh.util.Utils; @@ -40,7 +40,7 @@ public class AsyncSubscribe implements ReceiveMsgHook { public static AsyncSubscribe handler = new AsyncSubscribe(); - private static EventMeshTCPClient client; + private static EventMeshTCPClient client; public static void main(String[] agrs) throws Exception { Properties properties = Utils.readPropertiesFile("application.properties"); @@ -53,11 +53,12 @@ public static void main(String[] agrs) throws Exception { .userAgent(userAgent) .build(); try { - client = EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, EventMeshMessage.class); + client = + EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, EventMeshMessage.class); client.init(); - client.heartbeat(); - client.subscribe("TEST-TOPIC-TCP-ASYNC", SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC); + client.subscribe(EventMeshTestCaseTopicSet.TOPIC_PRX_WQ2ClientUniCast, SubscriptionMode.CLUSTERING, + SubscriptionType.ASYNC); client.registerSubBusiHandler(handler); client.listen(); @@ -73,13 +74,7 @@ public static void main(String[] agrs) throws Exception { } @Override - public void handle(Package msg, ChannelHandlerContext ctx) { - EventMeshMessage eventMeshMessage = convertToProtocolMessage(msg); - log.info("receive async msg====================={}", eventMeshMessage); - } - - @Override - public EventMeshMessage convertToProtocolMessage(Package pkg) { - return (EventMeshMessage) pkg.getBody(); + public void handle(EventMeshMessage msg, ChannelHandlerContext ctx) { + log.info("receive async msg====================={}", msg); } } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java index 31355e0980..09890a38c2 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java @@ -52,7 +52,6 @@ public static void main(String[] agrs) throws Exception { try (EventMeshTCPClient client = EventMeshTCPClientFactory.createEventMeshTCPClient( eventMeshTcpClientConfig, EventMeshMessage.class)) { client.init(); - client.heartbeat(); client.subscribe("TEST-TOPIC-TCP-BROADCAST", SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC); client.registerSubBusiHandler(handler); @@ -65,13 +64,8 @@ public static void main(String[] agrs) throws Exception { } @Override - public void handle(Package msg, ChannelHandlerContext ctx) { - EventMeshMessage eventMeshMessage = convertToProtocolMessage(msg); - log.info("receive broadcast msg==============={}", eventMeshMessage); + public void handle(EventMeshMessage msg, ChannelHandlerContext ctx) { + log.info("receive broadcast msg==============={}", msg); } - @Override - public EventMeshMessage convertToProtocolMessage(Package pkg) { - return (EventMeshMessage) pkg.getBody(); - } } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java index 5ed9dae349..1c5effd133 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java @@ -46,7 +46,6 @@ public static void main(String[] agrs) throws Exception { try (EventMeshTCPClient client = EventMeshTCPClientFactory .createEventMeshTCPClient(eventMeshTcpClientConfig, EventMeshMessage.class)) { client.init(); - client.heartbeat(); client.subscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC); // Synchronize RR messages @@ -60,14 +59,10 @@ public static void main(String[] agrs) throws Exception { } @Override - public void handle(Package msg, ChannelHandlerContext ctx) { + public void handle(EventMeshMessage msg, ChannelHandlerContext ctx) { log.info("receive sync rr msg================{}", msg); Package pkg = EventMeshTestUtils.rrResponse(msg); ctx.writeAndFlush(pkg); } - @Override - public EventMeshMessage convertToProtocolMessage(Package pkg) { - return null; - } } diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java index 2ca1e0845a..b670ef9084 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java @@ -50,10 +50,11 @@ public class CloudEventsProtocolAdaptor public CloudEvent toCloudEvent(ProtocolTransportObject cloudEvent) throws ProtocolHandleException { if (cloudEvent instanceof Package) { - Header header = ((Package) cloudEvent).getHeader(); - Object body = ((Package) cloudEvent).getBody(); + Package tcpPackage = (Package) cloudEvent; + Header header = tcpPackage.getHeader(); + String cloudEventJson = tcpPackage.getBody().toString(); - return deserializeTcpProtocol(header, body); + return deserializeTcpProtocol(header, cloudEventJson); } else if (cloudEvent instanceof HttpCommand) { org.apache.eventmesh.common.protocol.http.header.Header header = ((HttpCommand) cloudEvent).getHeader(); @@ -66,8 +67,8 @@ public CloudEvent toCloudEvent(ProtocolTransportObject cloudEvent) throws Protoc } } - private CloudEvent deserializeTcpProtocol(Header header, Object body) throws ProtocolHandleException { - return TcpMessageProtocolResolver.buildEvent(header, body); + private CloudEvent deserializeTcpProtocol(Header header, String cloudEventJson) throws ProtocolHandleException { + return TcpMessageProtocolResolver.buildEvent(header, cloudEventJson); } private CloudEvent deserializeHttpProtocol(String requestCode, org.apache.eventmesh.common.protocol.http.header.Header header, Body body) throws ProtocolHandleException { diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java index e5fa7f218c..0e2b303562 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java @@ -11,7 +11,7 @@ public class TcpMessageProtocolResolver { - public static CloudEvent buildEvent(Header header, Object body) throws ProtocolHandleException { + public static CloudEvent buildEvent(Header header, String cloudEventJson) throws ProtocolHandleException { CloudEventBuilder cloudEventBuilder; String protocolType = header.getProperty(Constants.PROTOCOL_TYPE).toString(); @@ -29,7 +29,8 @@ public static CloudEvent buildEvent(Header header, Object body) throws ProtocolH throw new ProtocolHandleException(String.format("Unsupported protocolType: %s", protocolType)); } if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) { - cloudEventBuilder = CloudEventBuilder.v1((CloudEvent) body); + // todo: transform cloudEventJson to cloudEvent + cloudEventBuilder = CloudEventBuilder.v1(null); for (String propKey : header.getProperties().keySet()) { cloudEventBuilder.withExtension(propKey, header.getProperty(propKey).toString()); @@ -38,7 +39,8 @@ public static CloudEvent buildEvent(Header header, Object body) throws ProtocolH return cloudEventBuilder.build(); } else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) { - cloudEventBuilder = CloudEventBuilder.v03((CloudEvent) body); + // todo: transform cloudEventJson to cloudEvent + cloudEventBuilder = CloudEventBuilder.v03(null); for (String propKey : header.getProperties().keySet()) { cloudEventBuilder.withExtension(propKey, header.getProperty(propKey).toString()); diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java index 9dac2952c6..75a99fd51c 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java @@ -22,8 +22,10 @@ import org.apache.eventmesh.common.protocol.http.HttpCommand; import org.apache.eventmesh.common.protocol.http.body.Body; import org.apache.eventmesh.common.protocol.http.common.RequestCode; +import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.Header; import org.apache.eventmesh.common.protocol.tcp.Package; +import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.protocol.api.ProtocolAdaptor; import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException; import org.apache.eventmesh.protocol.meshmessage.resolver.http.SendMessageBatchProtocolResolver; @@ -45,10 +47,11 @@ public class MeshMessageProtocolAdaptor implements ProtocolAdaptor> prepareProxyClientDistributionData() { if (!clientGroupMap.isEmpty()) { result = new HashMap<>(); for (Map.Entry entry : clientGroupMap.entrySet()) { - Map map = new HashMap(); + Map map = new HashMap<>(); map.put(EventMeshConstants.PURPOSE_SUB, entry.getValue().getGroupConsumerSessions().size()); map.put(EventMeshConstants.PURPOSE_PUB, entry.getValue().getGroupProducerSessions().size()); result.put(entry.getKey(), map); diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPClient.java index bec64ca08b..f378469462 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPClient.java @@ -42,8 +42,6 @@ public interface EventMeshTCPClient extends AutoCloseable { void broadcast(ProtocolMessage msg, long timeout) throws EventMeshException; - void heartbeat() throws EventMeshException; - void listen() throws EventMeshException; void subscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPPubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPPubClient.java index bbe9a837cd..652c99ac94 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPPubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPPubClient.java @@ -34,8 +34,6 @@ public interface EventMeshTCPPubClient extends AutoCloseable { void init() throws EventMeshException; - void heartbeat() throws EventMeshException; - void reconnect() throws EventMeshException; // todo: Hide package method, use ProtocolMessage diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPSubClient.java index 21186486a5..97cedd8e55 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPSubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPSubClient.java @@ -34,8 +34,6 @@ public interface EventMeshTCPSubClient extends AutoCloseable { void init() throws EventMeshException; - void heartbeat() throws EventMeshException; - void reconnect() throws EventMeshException; void subscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/ReceiveMsgHook.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/ReceiveMsgHook.java index eb29f174cd..3110dccdb8 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/ReceiveMsgHook.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/ReceiveMsgHook.java @@ -17,8 +17,6 @@ package org.apache.eventmesh.client.tcp.common; -import org.apache.eventmesh.common.protocol.tcp.Package; - import io.netty.channel.ChannelHandlerContext; /** @@ -26,19 +24,8 @@ * * @param receive message type. */ +@FunctionalInterface public interface ReceiveMsgHook { - void handle(Package msg, ChannelHandlerContext ctx); + void handle(ProtocolMessage msg, ChannelHandlerContext ctx); - /** - * Convert tcp package to protocolMessage. - *
    - *
  • {@link org.apache.eventmesh.common.EventMeshMessage}
  • - *
  • {@link io.openmessaging.api.Message}
  • - *
  • {@link io.cloudevents.CloudEvent}
  • - *
- * - * @param pkg - * @return - */ - ProtocolMessage convertToProtocolMessage(Package pkg); } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java index 61098ad50f..c3c7e65916 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java @@ -18,6 +18,7 @@ package org.apache.eventmesh.client.tcp.common; import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; +import org.apache.eventmesh.common.exception.EventMeshException; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.tcp.codec.Codec; @@ -26,6 +27,7 @@ import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -66,6 +68,8 @@ public abstract class TcpClient implements Closeable { private Channel channel; + private ScheduledFuture heartTask; + protected static final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder().setNameFormat("TCPClientScheduler").setDaemon(true).build()); @@ -107,12 +111,33 @@ public void close() { try { channel.disconnect().sync(); workers.shutdownGracefully(); + if (heartTask != null) { + heartTask.cancel(false); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("close tcp client failed.|remote address={}", channel.remoteAddress(), e); } } + protected void heartbeat() { + if (heartTask != null) { + synchronized (TcpClient.class) { + heartTask = scheduler.scheduleAtFixedRate(() -> { + try { + if (!isActive()) { + reconnect(); + } + Package msg = MessageUtils.heartBeat(); + io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); + } catch (Exception ignore) { + // ignore + } + }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS); + } + } + } + protected synchronized void reconnect() throws Exception { ChannelFuture f = bootstrap.connect(host, port).sync(); channel = f.channel(); diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPClient.java index 39faa72ddd..abe029825c 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPClient.java @@ -67,12 +67,6 @@ public void broadcast(CloudEvent cloudEvent, long timeout) throws EventMeshExcep cloudEventTCPPubClient.broadcast(cloudEvent, timeout); } - @Override - public void heartbeat() throws EventMeshException { - cloudEventTCPPubClient.heartbeat(); - cloudEventTCPSubClient.heartbeat(); - } - @Override public void listen() throws EventMeshException { cloudEventTCPSubClient.listen(); diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java index ca713ad3ce..56dcfa52f0 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java @@ -28,8 +28,11 @@ import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.exception.EventMeshException; import org.apache.eventmesh.common.protocol.tcp.Command; +import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; +import org.apache.eventmesh.common.protocol.tcp.Header; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.tcp.UserAgent; +import org.apache.eventmesh.common.utils.JsonUtils; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; @@ -64,30 +67,12 @@ public void init() throws EventMeshException { try { open(new Handler()); hello(); + heartbeat(); } catch (Exception ex) { throw new EventMeshException("Initialize EventMeshMessageTCPPubClient error", ex); } } - @Override - public void heartbeat() throws EventMeshException { - if (task != null) { - synchronized (CloudEventTCPPubClient.class) { - task = scheduler.scheduleAtFixedRate(() -> { - try { - if (!isActive()) { - reconnect(); - } - Package msg = MessageUtils.heartBeat(); - io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - } catch (Exception ignore) { - // ignore - } - }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS); - } - } - } - @Override public void reconnect() throws EventMeshException { try { @@ -154,9 +139,6 @@ public void registerBusiHandler(ReceiveMsgHook handler) throws Event @Override public void close() { try { - if (task != null) { - task.cancel(false); - } goodbye(); super.close(); } catch (Exception ex) { @@ -173,10 +155,10 @@ protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Excep Command cmd = msg.getHeader().getCommand(); if (cmd == Command.RESPONSE_TO_CLIENT) { + Package pkg = responseToClientAck(msg); if (callback != null) { - callback.handle(msg, ctx); + callback.handle((CloudEvent) pkg.getBody(), ctx); } - Package pkg = MessageUtils.responseToClientAck(msg); send(pkg); } else if (cmd == Command.SERVER_GOODBYE_REQUEST) { //TODO @@ -201,4 +183,11 @@ private void goodbye() throws Exception { Package msg = MessageUtils.goodbye(); this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); } + + private Package responseToClientAck(Package in) { + Package msg = new Package(); + msg.setHeader(new Header(Command.RESPONSE_TO_CLIENT_ACK, 0, null, in.getHeader().getSeq())); + msg.setBody(JsonUtils.deserialize(in.getBody().toString(), EventMeshMessage.class)); + return msg; + } } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java index d19f66326b..9ea509be3c 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java @@ -29,18 +29,20 @@ import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.Command; +import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; +import org.apache.eventmesh.common.protocol.tcp.Header; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.tcp.UserAgent; +import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.commons.collections4.CollectionUtils; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import lombok.extern.slf4j.Slf4j; @@ -54,7 +56,6 @@ class CloudEventTCPSubClient extends TcpClient implements EventMeshTCPSubClient< private final UserAgent userAgent; private final List subscriptionItems = Collections.synchronizedList(new ArrayList<>()); private ReceiveMsgHook callback; - private ScheduledFuture task; public CloudEventTCPSubClient(EventMeshTCPClientConfig eventMeshTcpClientConfig) { super(eventMeshTcpClientConfig); @@ -66,34 +67,13 @@ public void init() throws EventMeshException { try { open(new Handler()); hello(); + heartbeat(); log.info("SimpleSubClientImpl|{}|started!", clientNo); } catch (Exception ex) { throw new EventMeshException("Initialize EventMeshMessageTcpSubClient error", ex); } } - @Override - public void heartbeat() throws EventMeshException { - if (task == null) { - synchronized (CloudEventTCPSubClient.class) { - task = scheduler.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - if (!isActive()) { - reconnect(); - } - Package msg = MessageUtils.heartBeat(); - io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - } catch (Exception ignore) { - // - } - } - }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS); - } - } - } - @Override public void reconnect() throws EventMeshException { try { @@ -161,7 +141,6 @@ public void registerBusiHandler(ReceiveMsgHook handler) throws Event @Override public void close() { try { - task.cancel(false); goodbye(); super.close(); } catch (Exception ex) { @@ -173,24 +152,24 @@ private class Handler extends SimpleChannelInboundHandler { @SuppressWarnings("Duplicates") @Override protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Exception { - Command cmd = msg.getHeader().getCommand(); + Command cmd = msg.getHeader().getCmd(); log.info("|receive|type={}|msg={}", cmd, msg); if (cmd == Command.REQUEST_TO_CLIENT) { + Package pkg = requestToClientAck(msg); if (callback != null) { - callback.handle(msg, ctx); + callback.handle((CloudEvent) pkg.getBody(), ctx); } - Package pkg = MessageUtils.requestToClientAck(msg); send(pkg); } else if (cmd == Command.ASYNC_MESSAGE_TO_CLIENT) { - Package pkg = MessageUtils.asyncMessageAck(msg); + Package pkg = asyncMessageAck(msg); if (callback != null) { - callback.handle(msg, ctx); + callback.handle((CloudEvent) msg, ctx); } send(pkg); } else if (cmd == Command.BROADCAST_MESSAGE_TO_CLIENT) { - Package pkg = MessageUtils.broadcastMessageAck(msg); + Package pkg = broadcastMessageAck(msg); if (callback != null) { - callback.handle(msg, ctx); + callback.handle((CloudEvent) msg, ctx); } send(pkg); } else if (cmd == Command.SERVER_GOODBYE_REQUEST) { @@ -208,4 +187,28 @@ protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Excep } } + private Package requestToClientAck(Package tcpPackage) { + Package msg = new Package(); + msg.setHeader(new Header(Command.REQUEST_TO_CLIENT_ACK, 0, null, tcpPackage.getHeader().getSeq())); + // todo: Transform json to CloudEvents + msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(), EventMeshMessage.class)); + return msg; + } + + private Package asyncMessageAck(Package tcpPackage) { + Package msg = new Package(); + msg.setHeader(new Header(Command.ASYNC_MESSAGE_TO_CLIENT_ACK, 0, null, tcpPackage.getHeader().getSeq())); + // todo: Transform to CloudEvents + msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(), EventMeshMessage.class)); + return msg; + } + + private Package broadcastMessageAck(Package tcpPackage) { + Package msg = new Package(); + msg.setHeader(new Header(Command.BROADCAST_MESSAGE_TO_CLIENT_ACK, 0, null, tcpPackage.getHeader().getSeq())); + // todo: Transform to CloudEvents + msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(), EventMeshMessage.class)); + return msg; + } + } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPClient.java index 0011435f84..6faf050c06 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPClient.java @@ -41,8 +41,8 @@ public EventMeshMessageTCPClient(EventMeshTCPClientConfig eventMeshTcpClientConf @Override public void init() throws EventMeshException { - eventMeshMessageTCPSubClient.init(); eventMeshMessageTCPPubClient.init(); + eventMeshMessageTCPSubClient.init(); } @Override @@ -66,12 +66,6 @@ public void broadcast(EventMeshMessage eventMeshMessage, long timeout) throws Ev eventMeshMessageTCPPubClient.broadcast(eventMeshMessage, timeout); } - @Override - public void heartbeat() throws EventMeshException { - eventMeshMessageTCPPubClient.heartbeat(); - eventMeshMessageTCPSubClient.heartbeat(); - } - @Override public void listen() throws EventMeshException { eventMeshMessageTCPSubClient.listen(); diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java index 990322cd06..fb7cd55626 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java @@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; @@ -64,28 +63,13 @@ public void init() throws EventMeshException { try { open(new Handler()); hello(); + heartbeat(); } catch (Exception ex) { throw new EventMeshException("Initialize EventMeshMessageTCPPubClient error", ex); } } - @Override - public void heartbeat() throws EventMeshException { - task = scheduler.scheduleAtFixedRate(() -> { - try { - if (!isActive()) { - reconnect(); - } - Package msg = MessageUtils.heartBeat(); - io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - log.debug("heartbeat to server from pub client|package {}", msg); - } catch (Exception ignore) { - // ignore - } - }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS); - } - @Override public void reconnect() throws EventMeshException { try { @@ -124,7 +108,6 @@ public void asyncRR(EventMeshMessage eventMeshMessage, AsyncRRCallback callback, @Override public Package publish(EventMeshMessage eventMeshMessage, long timeout) throws EventMeshException { try { - // todo: transform EventMeshMessage to Package Package msg = MessageUtils.buildPackage(eventMeshMessage, Command.ASYNC_MESSAGE_TO_SERVER); log.info("SimplePubClientImpl em message|{}|publish|send|type={}|protocol={}|msg={}", clientNo, msg.getHeader().getCommand(), @@ -175,10 +158,11 @@ protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Excep Command cmd = msg.getHeader().getCommand(); if (cmd == Command.RESPONSE_TO_CLIENT) { + // todo: Transform to CloudEvents + Package pkg = MessageUtils.responseToClientAck(msg); if (callback != null) { - callback.handle(msg, ctx); + callback.handle((EventMeshMessage) pkg.getBody(), ctx); } - Package pkg = MessageUtils.responseToClientAck(msg); send(pkg); } else if (cmd == Command.SERVER_GOODBYE_REQUEST) { //TODO diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java index c1887b6249..01b9b0d3f1 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java @@ -30,8 +30,10 @@ import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.Command; import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; +import org.apache.eventmesh.common.protocol.tcp.Header; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.tcp.UserAgent; +import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.commons.collections4.CollectionUtils; @@ -39,7 +41,6 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; @@ -63,31 +64,13 @@ public void init() throws EventMeshException { try { open(new Handler()); hello(); + heartbeat(); log.info("SimpleSubClientImpl|{}|started!", clientNo); } catch (Exception ex) { throw new EventMeshException("Initialize EventMeshMessageTcpSubClient error", ex); } } - @Override - public void heartbeat() throws EventMeshException { - task = scheduler.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - if (!isActive()) { - reconnect(); - } - Package msg = MessageUtils.heartBeat(); - io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - log.debug("heartbeat to server from sub client|package {}", msg); - } catch (Exception ignore) { - // - } - } - }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS); - } - @Override public void reconnect() throws EventMeshException { try { @@ -170,21 +153,21 @@ protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Excep Command cmd = msg.getHeader().getCommand(); log.info("|receive|type={}|msg={}", cmd, msg); if (cmd == Command.REQUEST_TO_CLIENT) { + Package pkg = requestToClientAck(msg); if (callback != null) { - callback.handle(msg, ctx); + callback.handle((EventMeshMessage) pkg.getBody(), ctx); } - Package pkg = MessageUtils.requestToClientAck(msg); send(pkg); } else if (cmd == Command.ASYNC_MESSAGE_TO_CLIENT) { - Package pkg = MessageUtils.asyncMessageAck(msg); + Package pkg = asyncMessageAck(msg); if (callback != null) { - callback.handle(msg, ctx); + callback.handle((EventMeshMessage) pkg.getBody(), ctx); } send(pkg); } else if (cmd == Command.BROADCAST_MESSAGE_TO_CLIENT) { - Package pkg = MessageUtils.broadcastMessageAck(msg); + Package pkg = broadcastMessageAck(msg); if (callback != null) { - callback.handle(msg, ctx); + callback.handle((EventMeshMessage) pkg.getBody(), ctx); } send(pkg); } else if (cmd == Command.SERVER_GOODBYE_REQUEST) { @@ -202,4 +185,25 @@ protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Excep } } + private Package requestToClientAck(Package tcpPackage) { + Package msg = new Package(); + msg.setHeader(new Header(Command.REQUEST_TO_CLIENT_ACK, 0, null, tcpPackage.getHeader().getSeq())); + msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(), EventMeshMessage.class)); + return msg; + } + + private Package asyncMessageAck(Package tcpPackage) { + Package msg = new Package(); + msg.setHeader(new Header(Command.ASYNC_MESSAGE_TO_CLIENT_ACK, 0, null, tcpPackage.getHeader().getSeq())); + msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(), EventMeshMessage.class)); + return msg; + } + + private Package broadcastMessageAck(Package tcpPackage) { + Package msg = new Package(); + msg.setHeader(new Header(Command.BROADCAST_MESSAGE_TO_CLIENT_ACK, 0, null, tcpPackage.getHeader().getSeq())); + msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(), EventMeshMessage.class)); + return msg; + } + } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPClient.java index 6381b6f0ed..25a929e5c5 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPClient.java @@ -68,12 +68,6 @@ public void broadcast(Message openMessage, long timeout) throws EventMeshExcepti eventMeshTCPPubClient.broadcast(openMessage, timeout); } - @Override - public void heartbeat() throws EventMeshException { - eventMeshTCPPubClient.heartbeat(); - eventMeshTCPSubClient.heartbeat(); - } - @Override public void listen() throws EventMeshException { eventMeshTCPSubClient.listen(); diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPPubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPPubClient.java index 7e3d40ddde..3eebdd206d 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPPubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPPubClient.java @@ -41,11 +41,6 @@ public void init() throws EventMeshException { } - @Override - public void heartbeat() throws EventMeshException { - - } - @Override public void reconnect() throws EventMeshException { diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPSubClient.java index c0de8b14bd..34c3ad0380 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPSubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPSubClient.java @@ -41,11 +41,6 @@ public void init() throws EventMeshException { } - @Override - public void heartbeat() throws EventMeshException { - - } - @Override public void reconnect() throws EventMeshException {