diff --git a/eventmesh-common/build.gradle b/eventmesh-common/build.gradle index 84a03a5d39..699e9c4fdb 100644 --- a/eventmesh-common/build.gradle +++ b/eventmesh-common/build.gradle @@ -34,9 +34,9 @@ dependencies { implementation "com.lmax:disruptor" - implementation "com.fasterxml.jackson.core:jackson-databind" - implementation "com.fasterxml.jackson.core:jackson-core" - implementation "com.fasterxml.jackson.core:jackson-annotations" + api "com.fasterxml.jackson.core:jackson-databind" + api "com.fasterxml.jackson.core:jackson-core" + api "com.fasterxml.jackson.core:jackson-annotations" implementation "org.apache.httpcomponents:httpclient" diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java index fedbbe046e..2c33aec9bf 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java @@ -5,7 +5,9 @@ import io.cloudevents.core.builder.CloudEventBuilder; import io.cloudevents.core.v03.CloudEventV03; import io.cloudevents.core.v1.CloudEventV1; + import org.apache.commons.lang3.StringUtils; + import org.apache.eventmesh.common.protocol.http.body.Body; import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchV2RequestBody; import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; @@ -42,37 +44,37 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) { event = JsonUtils.deserialize(content, CloudEventV1.class); event = CloudEventBuilder.from(event) - .withExtension(ProtocolKey.REQUEST_CODE, code) - .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) - .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) - .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) - .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) - .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) - .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) - .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) - .withExtension(ProtocolKey.VERSION, version.getVersion()) - .withExtension(ProtocolKey.LANGUAGE, language) - .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) - .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) - .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) - .build(); + .withExtension(ProtocolKey.REQUEST_CODE, code) + .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) + .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) + .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) + .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) + .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) + .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) + .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) + .withExtension(ProtocolKey.VERSION, version.getVersion()) + .withExtension(ProtocolKey.LANGUAGE, language) + .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) + .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) + .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) + .build(); } else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) { event = JsonUtils.deserialize(content, CloudEventV03.class); event = CloudEventBuilder.from(event) - .withExtension(ProtocolKey.REQUEST_CODE, code) - .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) - .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) - .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) - .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) - .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) - .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) - .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) - .withExtension(ProtocolKey.VERSION, version.getVersion()) - .withExtension(ProtocolKey.LANGUAGE, language) - .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) - .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) - .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) - .build(); + .withExtension(ProtocolKey.REQUEST_CODE, code) + .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) + .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) + .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) + .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) + .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) + .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) + .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) + .withExtension(ProtocolKey.VERSION, version.getVersion()) + .withExtension(ProtocolKey.LANGUAGE, language) + .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) + .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) + .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) + .build(); } return event; } catch (Exception e) { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java index ef025a007a..5842eed21b 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java @@ -273,8 +273,11 @@ public void onSuccess(CloudEvent event) { HttpCommand succ = asyncContext.getRequest().createHttpCommandResponse( sendMessageResponseHeader, SendMessageResponseBody.buildBody(EventMeshRetCode.SUCCESS.getRetCode(), - JsonUtils.serialize(new SendMessageResponseBody.ReplyMessage(topic, - rtnMsg)))); + JsonUtils.serialize(SendMessageResponseBody.ReplyMessage.builder() + .topic(topic) + .body(rtnMsg) + .properties(EventMeshUtil.getEventProp(event)) + .build()))); asyncContext.onComplete(succ, handler); } catch (Exception ex) { HttpCommand err = asyncContext.getRequest().createHttpCommandResponse( diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java index 5fa56ef70f..0bcf02c6c6 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java @@ -36,7 +36,7 @@ import org.apache.eventmesh.protocol.api.ProtocolPluginFactory; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.http.consumer.HandleMsgContext; -import org.apache.eventmesh.runtime.util.OMSUtil; +import org.apache.eventmesh.runtime.util.EventMeshUtil; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; @@ -150,7 +150,7 @@ public void tryHTTPRequest() { body.add(new BasicNameValuePair(PushMessageRequestBody.TOPIC, handleMsgContext.getTopic())); body.add(new BasicNameValuePair(PushMessageRequestBody.EXTFIELDS, - JsonUtils.serialize(OMSUtil.getEventProp(handleMsgContext.getEvent())))); + JsonUtils.serialize(EventMeshUtil.getEventProp(handleMsgContext.getEvent())))); try { builder.setEntity(new UrlEncodedFormEntity(body)); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java index 674fb18e83..8f3e383eb4 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java @@ -46,6 +46,7 @@ import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext; import org.apache.eventmesh.runtime.util.EventMeshUtil; import org.apache.eventmesh.runtime.util.RemotingHelper; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,9 +57,11 @@ public class ClientSessionGroupMapping { private ConcurrentHashMap sessionTable = new ConcurrentHashMap<>(); - private ConcurrentHashMap clientGroupMap = new ConcurrentHashMap(); + private ConcurrentHashMap clientGroupMap = + new ConcurrentHashMap(); - private ConcurrentHashMap lockMap = new ConcurrentHashMap(); + private ConcurrentHashMap lockMap = + new ConcurrentHashMap(); private EventMeshTCPServer eventMeshTCPServer; @@ -123,7 +126,7 @@ public synchronized void closeSession(ChannelHandlerContext ctx) throws Exceptio @Override public void operationComplete(ChannelFuture future) throws Exception { logger.info("close the connection to remote address[{}] result: {}", remoteAddress, - future.isSuccess()); + future.isSuccess()); } }); sessionLogger.info("session|close|succeed|address={}|msg={}", addr, "no session was found"); @@ -169,7 +172,7 @@ private void closeSession(Session session) throws Exception { @Override public void operationComplete(ChannelFuture future) throws Exception { logger.info("close the connection to remote address[{}] result: {}", remoteAddress, - future.isSuccess()); + future.isSuccess()); } }); } @@ -179,7 +182,8 @@ public void operationComplete(ChannelFuture future) throws Exception { private ClientGroupWrapper constructClientGroupWrapper(String sysId, String producerGroup, String consumerGroup, EventMeshTCPServer eventMeshTCPServer, DownstreamDispatchStrategy downstreamDispatchStrategy) { - return new ClientGroupWrapper(sysId, producerGroup, consumerGroup, eventMeshTCPServer, downstreamDispatchStrategy); + return new ClientGroupWrapper(sysId, producerGroup, consumerGroup, eventMeshTCPServer, + downstreamDispatchStrategy); } private void initClientGroupWrapper(UserAgent user, Session session) throws Exception { @@ -192,7 +196,7 @@ private void initClientGroupWrapper(UserAgent user, Session session) throws Exce synchronized (lockMap.get(user.getSubsystem())) { if (!clientGroupMap.containsKey(user.getSubsystem())) { ClientGroupWrapper cgw = constructClientGroupWrapper(user.getSubsystem(), user.getProducerGroup(), - user.getConsumerGroup(), eventMeshTCPServer, new FreePriorityDispatchStrategy()); + user.getConsumerGroup(), eventMeshTCPServer, new FreePriorityDispatchStrategy()); clientGroupMap.put(user.getSubsystem(), cgw); logger.info("create new ClientGroupWrapper, subsystem:{}", user.getSubsystem()); } @@ -297,32 +301,40 @@ private void handleUnackMsgsInSession(Session session) { for (Map.Entry entry : unAckMsg.entrySet()) { DownStreamMsgContext downStreamMsgContext = entry.getValue(); if (SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode())) { - logger.warn("exist broadcast msg unack when closeSession,seq:{},bizSeq:{},client:{}", downStreamMsgContext.seq, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.event), session.getClient()); + logger.warn("exist broadcast msg unack when closeSession,seq:{},bizSeq:{},client:{}", + downStreamMsgContext.seq, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.event), + session.getClient()); continue; } - Session reChooseSession = session.getClientGroupWrapper().get().getDownstreamDispatchStrategy().select(session.getClientGroupWrapper().get().getConsumerGroup(), - downStreamMsgContext.event.getSubject(), Objects.requireNonNull(session.getClientGroupWrapper().get()).groupConsumerSessions); + Session reChooseSession = session.getClientGroupWrapper().get().getDownstreamDispatchStrategy() + .select(session.getClientGroupWrapper().get().getConsumerGroup(), + downStreamMsgContext.event.getSubject(), + Objects.requireNonNull(session.getClientGroupWrapper().get()).groupConsumerSessions); if (reChooseSession != null) { downStreamMsgContext.session = reChooseSession; reChooseSession.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext); reChooseSession.downstreamMsg(downStreamMsgContext); - logger.info("rePush msg form unAckMsgs,seq:{},rePushClient:{}", entry.getKey(), downStreamMsgContext.session.getClient()); + logger.info("rePush msg form unAckMsgs,seq:{},rePushClient:{}", entry.getKey(), + downStreamMsgContext.session.getClient()); } else { - logger.warn("select session fail in handleUnackMsgsInSession,seq:{},topic:{}", entry.getKey(), downStreamMsgContext.event.getSubject()); + logger.warn("select session fail in handleUnackMsgsInSession,seq:{},topic:{}", entry.getKey(), + downStreamMsgContext.event.getSubject()); } } } } private void cleanClientGroupWrapperCommon(Session session) throws Exception { - logger.info("GroupConsumerSessions size:{}", session.getClientGroupWrapper().get().getGroupConsumerSessions().size()); + logger.info("GroupConsumerSessions size:{}", + session.getClientGroupWrapper().get().getGroupConsumerSessions().size()); if (session.getClientGroupWrapper().get().getGroupConsumerSessions().size() == 0) { shutdownClientGroupConsumer(session); } - logger.info("GroupProducerSessions size:{}", session.getClientGroupWrapper().get().getGroupProducerSessions().size()); + logger.info("GroupProducerSessions size:{}", + session.getClientGroupWrapper().get().getGroupProducerSessions().size()); if ((session.getClientGroupWrapper().get().getGroupConsumerSessions().size() == 0) - && (session.getClientGroupWrapper().get().getGroupProducerSessions().size() == 0)) { + && (session.getClientGroupWrapper().get().getGroupProducerSessions().size() == 0)) { shutdownClientGroupProducer(session); clientGroupMap.remove(session.getClientGroupWrapper().get().getSysId()); @@ -350,22 +362,24 @@ private void shutdownClientGroupProducer(Session session) throws Exception { private void initSessionCleaner() { eventMeshTCPServer.getScheduler().scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - Iterator sessionIterator = sessionTable.values().iterator(); - while (sessionIterator.hasNext()) { - Session tmp = sessionIterator.next(); - if (System.currentTimeMillis() - tmp.getLastHeartbeatTime() > eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSessionExpiredInMills) { - try { - logger.warn("clean expired session,client:{}", tmp.getClient()); - closeSession(tmp.getContext()); - } catch (Exception e) { - logger.error("say goodbye to session error! {}", tmp, e); - } - } - } - } - }, 1000, eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSessionExpiredInMills, TimeUnit.MILLISECONDS); + @Override + public void run() { + Iterator sessionIterator = sessionTable.values().iterator(); + while (sessionIterator.hasNext()) { + Session tmp = sessionIterator.next(); + if (System.currentTimeMillis() - tmp.getLastHeartbeatTime() > + eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSessionExpiredInMills) { + try { + logger.warn("clean expired session,client:{}", tmp.getClient()); + closeSession(tmp.getContext()); + } catch (Exception e) { + logger.error("say goodbye to session error! {}", tmp, e); + } + } + } + } + }, 1000, eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSessionExpiredInMills, + TimeUnit.MILLISECONDS); } private void initDownStreamMsgContextCleaner() { @@ -386,7 +400,7 @@ public void run() { downStreamMsgContext.ackMsg(); tmp.getPusher().getUnAckMsg().remove(seqKey); logger.warn("remove expire downStreamMsgContext, session:{}, topic:{}, seq:{}", tmp, - downStreamMsgContext.event.getSubject(), seqKey); + downStreamMsgContext.event.getSubject(), seqKey); } } } @@ -406,8 +420,8 @@ public void start() throws Exception { public void shutdown() throws Exception { logger.info("begin to close sessions gracefully"); - for(ClientGroupWrapper clientGroupWrapper : clientGroupMap.values()){ - for(Session subSession : clientGroupWrapper.getGroupConsumerSessions()){ + for (ClientGroupWrapper clientGroupWrapper : clientGroupMap.values()) { + for (Session subSession : clientGroupWrapper.getGroupConsumerSessions()) { try { EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, subSession, this); } catch (Exception e) { @@ -415,7 +429,7 @@ public void shutdown() throws Exception { } } - for(Session pubSession : clientGroupWrapper.getGroupProducerSessions()){ + for (Session pubSession : clientGroupWrapper.getGroupProducerSessions()) { try { EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, pubSession, this); } catch (Exception e) { @@ -437,7 +451,7 @@ public void shutdown() throws Exception { sessionTable.values().parallelStream().forEach(itr -> { try { - EventMeshTcp2Client.serverGoodby2Client(this.eventMeshTCPServer,itr, this); + EventMeshTcp2Client.serverGoodby2Client(this.eventMeshTCPServer, itr, this); } catch (Exception e) { logger.error("say goodbye to session error! {}", itr, e); } @@ -470,15 +484,15 @@ public Map> prepareEventMeshClientDistributionData( return result; } - public Map> prepareProxyClientDistributionData(){ + public Map> prepareProxyClientDistributionData() { Map> result = null; - if(!clientGroupMap.isEmpty()){ + if (!clientGroupMap.isEmpty()) { result = new HashMap<>(); - for(Map.Entry entry : clientGroupMap.entrySet()){ + for (Map.Entry entry : clientGroupMap.entrySet()) { Map map = new HashMap(); - map.put(EventMeshConstants.PURPOSE_SUB,entry.getValue().getGroupConsumerSessions().size()); - map.put(EventMeshConstants.PURPOSE_PUB,entry.getValue().getGroupProducerSessions().size()); + map.put(EventMeshConstants.PURPOSE_SUB, entry.getValue().getGroupConsumerSessions().size()); + map.put(EventMeshConstants.PURPOSE_PUB, entry.getValue().getGroupProducerSessions().size()); result.put(entry.getKey(), map); } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java index 58e05e0d65..035e9d4c97 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java @@ -17,9 +17,6 @@ package org.apache.eventmesh.runtime.util; - -import static org.apache.eventmesh.runtime.util.OMSUtil.isOMSHeader; - import java.net.Inet6Address; import java.net.InetAddress; import java.net.NetworkInterface; @@ -28,6 +25,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Enumeration; +import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -149,6 +147,15 @@ public static String getMessageBizSeq(CloudEvent event) { return keys; } + public static Map getEventProp(CloudEvent event) { + Set extensionSet = event.getExtensionNames(); + Map prop = new HashMap<>(); + for (String extensionKey : extensionSet) { + prop.put(extensionKey, event.getExtension(extensionKey).toString()); + } + return prop; + } + // public static org.apache.rocketmq.common.message.Message decodeMessage(AccessMessage accessMessage) { // org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message(); // msg.setTopic(accessMessage.getTopic()); @@ -160,28 +167,28 @@ public static String getMessageBizSeq(CloudEvent event) { // return msg; // } - public static Message decodeMessage(EventMeshMessage eventMeshMessage) { - Message omsMsg = new Message(); - omsMsg.setBody(eventMeshMessage.getBody().getBytes()); - omsMsg.setTopic(eventMeshMessage.getTopic()); - Properties systemProperties = new Properties(); - Properties userProperties = new Properties(); - - final Set> entries = eventMeshMessage.getProperties().entrySet(); - - for (final Map.Entry entry : entries) { - if (isOMSHeader(entry.getKey())) { - systemProperties.put(entry.getKey(), entry.getValue()); - } else { - userProperties.put(entry.getKey(), entry.getValue()); - } - } - - systemProperties.put(Constants.PROPERTY_MESSAGE_DESTINATION, eventMeshMessage.getTopic()); - omsMsg.setSystemProperties(systemProperties); - omsMsg.setUserProperties(userProperties); - return omsMsg; - } +// public static Message decodeMessage(EventMeshMessage eventMeshMessage) { +// Message omsMsg = new Message(); +// omsMsg.setBody(eventMeshMessage.getBody().getBytes()); +// omsMsg.setTopic(eventMeshMessage.getTopic()); +// Properties systemProperties = new Properties(); +// Properties userProperties = new Properties(); +// +// final Set> entries = eventMeshMessage.getProperties().entrySet(); +// +// for (final Map.Entry entry : entries) { +// if (isOMSHeader(entry.getKey())) { +// systemProperties.put(entry.getKey(), entry.getValue()); +// } else { +// userProperties.put(entry.getKey(), entry.getValue()); +// } +// } +// +// systemProperties.put(Constants.PROPERTY_MESSAGE_DESTINATION, eventMeshMessage.getTopic()); +// omsMsg.setSystemProperties(systemProperties); +// omsMsg.setUserProperties(userProperties); +// return omsMsg; +// } // public static AccessMessage encodeMessage(org.apache.rocketmq.common.message.Message msg) throws Exception { // AccessMessage accessMessage = new AccessMessage(); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/OMSUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/OMSUtil.java deleted file mode 100644 index b8f2d1e8cb..0000000000 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/OMSUtil.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to Apache Software Foundation (ASF) under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Apache Software Foundation (ASF) licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.eventmesh.runtime.util; - -import java.lang.reflect.Field; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; -import java.util.Set; - -import io.cloudevents.CloudEvent; -import io.openmessaging.api.Message; -import io.openmessaging.api.OMSBuiltinKeys; - -public class OMSUtil { - - public static boolean isOMSHeader(String value) { - for (Field field : OMSBuiltinKeys.class.getDeclaredFields()) { - try { - if (field.get(OMSBuiltinKeys.class).equals(value)) { - return true; - } - } catch (IllegalAccessException e) { - return false; - } - } - return false; - } - -// public static Properties convertKeyValue2Prop(KeyValue keyValue){ -// Properties properties = new Properties(); -// for (String key : keyValue.keySet()){ -// properties.put(key, keyValue.getString(key)); -// } -// return properties; -// } - - @SuppressWarnings("unchecked") - public static Map combineProp(Properties p1, Properties p2) { - Properties properties = new Properties(); - properties.putAll(p1); - properties.putAll(p2); - - return new HashMap<>((Map) properties); - } - - public static Map getEventProp(CloudEvent event) { - Set extensionSet = event.getExtensionNames(); - Map prop = new HashMap<>(); - for (String extensionKey : extensionSet) { - prop.put(extensionKey, event.getExtension(extensionKey).toString()); - } - return prop; - } - -}