diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java index 7d07aafb42..72105db737 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java @@ -36,7 +36,6 @@ public class Acl { private static AclService aclService; public void init(String aclPluginType) throws AclException { - //aclService = getSpiAclService(); aclService = EventMeshExtensionFactory.getExtension(AclService.class, aclPluginType); if (aclService == null) { logger.error("can't load the aclService plugin, please check."); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java index b4e5386604..347e6e8609 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java @@ -96,14 +96,12 @@ public synchronized void init() throws Exception { @Override public void consume(CloudEvent event, AsyncConsumeContext context) { String topic = event.getSubject(); - //String topic = message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION); String bizSeqNo = (String) event.getExtension(Constants.PROPERTY_MESSAGE_SEARCH_KEYS); String uniqueId = (String) event.getExtension(Constants.RMB_UNIQ_ID); event = CloudEventBuilder.from(event) .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())) .build(); - //message.getUserProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())); if (messageLogger.isDebugEnabled()) { messageLogger.debug("message|mq2eventMesh|topic={}|event={}", topic, event); } else { @@ -118,9 +116,6 @@ public void consume(CloudEvent event, AsyncConsumeContext context) { logger.error("no topicConfig found, consumerGroup:{} topic:{}", consumerGroupConf.getConsumerGroup(), topic); try { sendMessageBack(event, uniqueId, bizSeqNo); - //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, - // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); - //context.ack(); eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); return; } catch (Exception ex) { @@ -135,9 +130,6 @@ public void consume(CloudEvent event, AsyncConsumeContext context) { consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig); if (httpMessageHandler.handle(handleMsgContext)) { - //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, - // EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name()); - //context.ack(); eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck); } else { try { @@ -145,9 +137,6 @@ public void consume(CloudEvent event, AsyncConsumeContext context) { } catch (Exception e) { //ignore } - //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, - // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); - //context.ack(); eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); } } @@ -192,9 +181,6 @@ public void consume(CloudEvent event, AsyncConsumeContext context) { consumerGroupConf.getConsumerGroup(), topic); try { sendMessageBack(event, uniqueId, bizSeqNo); - //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, - // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); - //context.ack(); eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); return; } catch (Exception ex) { @@ -209,9 +195,6 @@ public void consume(CloudEvent event, AsyncConsumeContext context) { consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig); if (httpMessageHandler.handle(handleMsgContext)) { - //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, - // EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name()); - //context.ack(); eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck); } else { try { @@ -219,9 +202,6 @@ public void consume(CloudEvent event, AsyncConsumeContext context) { } catch (Exception e) { //ignore } - //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, - // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); - //context.ack(); eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); } } @@ -256,15 +236,6 @@ public void unsubscribe(String topic, SubscriptionMode subscriptionMode) throws } } - //public boolean isPause() { - // return persistentMqConsumer.isPause() && broadcastMqConsumer.isPause(); - //} - // - //public void pause() { - // persistentMqConsumer.pause(); - // broadcastMqConsumer.pause(); - //} - public synchronized void shutdown() throws Exception { persistentMqConsumer.shutdown(); started4Persistent.compareAndSet(true, false); @@ -313,12 +284,6 @@ public void onException(OnExceptionContext context) { logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqno:{}, uniqueId:{}", consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId); } - - //@Override - //public void onException(Throwable e) { - // logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqno:{}, uniqueId:{}", - // consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId); - //} }); } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java index 24c99ad6c8..cd0b8c463b 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java @@ -139,8 +139,6 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext try { Acl.doAclCheckInHttpHeartbeat(remoteAddr, user, pass, sys, client.topic, requestCode); } catch (Exception e) { - //String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp); - responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( heartbeatResponseHeader, SendMessageResponseBody diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java index f35eb0e2bf..a65f7f7ee8 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java @@ -179,15 +179,11 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext long startTime = System.currentTimeMillis(); - //Message rocketMQMsg; - //Message omsMsg = new Message(); String replyTopic = EventMeshConstants.RR_REPLY_TOPIC; String origTopic = event.getSubject(); - //Map extFields = replyMessageRequestBody.getExtFields(); final String replyMQCluster = event.getExtension(EventMeshConstants.PROPERTY_MESSAGE_CLUSTER).toString(); - //final String replyMQCluster = MapUtils.getString(extFields, EventMeshConstants.PROPERTY_MESSAGE_CLUSTER, null); if (!org.apache.commons.lang3.StringUtils.isEmpty(replyMQCluster)) { replyTopic = replyMQCluster + "-" + replyTopic; } else { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java index afadcee553..ad787f8e70 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java @@ -164,8 +164,6 @@ private void doRedirect(String group, String purpose, int judge, List ev Collections.shuffle(new ArrayList<>(sessionList)); for (int i = 0; i < judge; i++) { - //String redirectSessionAddr = ProxyTcp2Client.redirectClientForRebalance(sessionList.get(i), - // eventMeshTCPServer.getClientSessionGroupMapping()); String newProxyIp = eventMeshRecommendResult.get(i).split(":")[0]; String newProxyPort = eventMeshRecommendResult.get(i).split(":")[1]; String redirectSessionAddr = EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer, newProxyIp, diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java index ad75a241b9..1ac0a2a95a 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java @@ -92,8 +92,6 @@ public void push(final DownStreamMsgContext downStreamMsgContext) { downStreamMsgContext.event = CloudEventBuilder.from(downStreamMsgContext.event) .withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP, String.valueOf(System.currentTimeMillis())) .build(); - //downStreamMsgContext.event.getSystemProperties().put(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP, - //String.valueOf(System.currentTimeMillis())); EventMeshMessage body = null; int retCode = 0; String retMsg = null; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/BytesMessageImpl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/BytesMessageImpl.java deleted file mode 100644 index a91c9a3381..0000000000 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/BytesMessageImpl.java +++ /dev/null @@ -1,113 +0,0 @@ -///* -// * Licensed to the Apache Software Foundation (ASF) under one or more -// * contributor license agreements. See the NOTICE file distributed with -// * this work for additional information regarding copyright ownership. -// * The ASF licenses this file to You under the Apache License, Version 2.0 -// * (the "License"); you may not use this file except in compliance with -// * the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -//package org.apache.eventmesh.runtime.domain; -// -//import io.openmessaging.BytesMessage; -//import io.openmessaging.KeyValue; -//import io.openmessaging.Message; -//import io.openmessaging.OMS; -//import io.openmessaging.exception.OMSMessageFormatException; -//import org.apache.commons.lang3.builder.ToStringBuilder; -// -//public class BytesMessageImpl implements BytesMessage { -// private KeyValue sysHeaders; -// private KeyValue userHeaders; -// private byte[] body; -// -// public BytesMessageImpl() { -// this.sysHeaders = OMS.newKeyValue(); -// this.userHeaders = OMS.newKeyValue(); -// } -// -// @Override -// public T getBody(Class type) throws OMSMessageFormatException { -// if (type == byte[].class) { -// return (T)body; -// } -// -// throw new OMSMessageFormatException("", "Cannot assign byte[] to " + type.getName()); -// } -// -// @Override -// public BytesMessage setBody(final byte[] body) { -// this.body = body; -// return this; -// } -// -// @Override -// public KeyValue sysHeaders() { -// return sysHeaders; -// } -// -// @Override -// public KeyValue userHeaders() { -// return userHeaders; -// } -// -// @Override -// public Message putSysHeaders(String key, int value) { -// sysHeaders.put(key, value); -// return this; -// } -// -// @Override -// public Message putSysHeaders(String key, long value) { -// sysHeaders.put(key, value); -// return this; -// } -// -// @Override -// public Message putSysHeaders(String key, double value) { -// sysHeaders.put(key, value); -// return this; -// } -// -// @Override -// public Message putSysHeaders(String key, String value) { -// sysHeaders.put(key, value); -// return this; -// } -// -// @Override -// public Message putUserHeaders(String key, int value) { -// userHeaders.put(key, value); -// return this; -// } -// -// @Override -// public Message putUserHeaders(String key, long value) { -// userHeaders.put(key, value); -// return this; -// } -// -// @Override -// public Message putUserHeaders(String key, double value) { -// userHeaders.put(key, value); -// return this; -// } -// -// @Override -// public Message putUserHeaders(String key, String value) { -// userHeaders.put(key, value); -// return this; -// } -// -// @Override -// public String toString() { -// return ToStringBuilder.reflectionToString(this); -// } -//} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/ConsumeRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/ConsumeRequest.java deleted file mode 100644 index 38c9b2e58c..0000000000 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/ConsumeRequest.java +++ /dev/null @@ -1,55 +0,0 @@ -///* -// * Licensed to the Apache Software Foundation (ASF) under one or more -// * contributor license agreements. See the NOTICE file distributed with -// * this work for additional information regarding copyright ownership. -// * The ASF licenses this file to You under the Apache License, Version 2.0 -// * (the "License"); you may not use this file except in compliance with -// * the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -//package com.webank.runtime.domain; -// -//import org.apache.rocketmq.client.impl.consumer.ProcessQueue; -//import org.apache.rocketmq.common.message.MessageExt; -//import org.apache.rocketmq.common.message.MessageQueue; -// -//public class ConsumeRequest { -// private final MessageExt messageExt; -// private final MessageQueue messageQueue; -// private final ProcessQueue processQueue; -// private long startConsumeTimeMillis; -// -// public ConsumeRequest(final MessageExt messageExt, final MessageQueue messageQueue, -// final ProcessQueue processQueue) { -// this.messageExt = messageExt; -// this.messageQueue = messageQueue; -// this.processQueue = processQueue; -// } -// -// public MessageExt getMessageExt() { -// return messageExt; -// } -// -// public MessageQueue getMessageQueue() { -// return messageQueue; -// } -// -// public ProcessQueue getProcessQueue() { -// return processQueue; -// } -// -// public long getStartConsumeTimeMillis() { -// return startConsumeTimeMillis; -// } -// -// public void setStartConsumeTimeMillis(final long startConsumeTimeMillis) { -// this.startConsumeTimeMillis = startConsumeTimeMillis; -// } -//} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/SendResultImpl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/SendResultImpl.java deleted file mode 100644 index 3a8d12f999..0000000000 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/SendResultImpl.java +++ /dev/null @@ -1,39 +0,0 @@ -///* -// * Licensed to the Apache Software Foundation (ASF) under one or more -// * contributor license agreements. See the NOTICE file distributed with -// * this work for additional information regarding copyright ownership. -// * The ASF licenses this file to You under the Apache License, Version 2.0 -// * (the "License"); you may not use this file except in compliance with -// * the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -//package org.apache.eventmesh.runtime.domain; -// -//import io.openmessaging.KeyValue; -//import io.openmessaging.producer.SendResult; -// -//public class SendResultImpl implements SendResult { -// private String messageId; -// private KeyValue properties; -// -// public SendResultImpl(final String messageId, final KeyValue properties) { -// this.messageId = messageId; -// this.properties = properties; -// } -// -// @Override -// public String messageId() { -// return messageId; -// } -// -// public KeyValue properties() { -// return properties; -// } -//} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java index dea72331d6..63039a7220 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java @@ -164,7 +164,6 @@ public void start() throws Exception { }), delay, period, TimeUnit.MILLISECONDS); monitorThreadPoolTask = eventMeshTCPServer.getScheduler().scheduleAtFixedRate(() -> { - //ThreadPoolHelper.printThreadPoolState(); eventMeshTCPServer.getEventMeshRebalanceService().printRebalanceThreadPoolState(); eventMeshTCPServer.getEventMeshTcpRetryer().printRetryThreadPoolState(); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/patch/EventMeshConsumeConcurrentlyContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/patch/EventMeshConsumeConcurrentlyContext.java deleted file mode 100644 index 80c90bf4a5..0000000000 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/patch/EventMeshConsumeConcurrentlyContext.java +++ /dev/null @@ -1,44 +0,0 @@ -///* -// * Licensed to the Apache Software Foundation (ASF) under one or more -// * contributor license agreements. See the NOTICE file distributed with -// * this work for additional information regarding copyright ownership. -// * The ASF licenses this file to You under the Apache License, Version 2.0 -// * (the "License"); you may not use this file except in compliance with -// * the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -//package org.apache.eventmesh.runtime.patch; -// -//import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -//import org.apache.rocketmq.client.impl.consumer.ProcessQueue; -//import org.apache.rocketmq.common.message.MessageQueue; -// -//public class EventMeshConsumeConcurrentlyContext extends ConsumeConcurrentlyContext { -// private final ProcessQueue processQueue; -// private boolean manualAck = true; -// -// public EventMeshConsumeConcurrentlyContext(MessageQueue messageQueue, ProcessQueue processQueue) { -// super(messageQueue); -// this.processQueue = processQueue; -// } -// -// public ProcessQueue getProcessQueue() { -// return processQueue; -// } -// -// public boolean isManualAck() { -// return manualAck; -// } -// -// public void setManualAck(boolean manualAck) { -// this.manualAck = manualAck; -// } -//} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/patch/EventMeshMessageListenerConcurrently.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/patch/EventMeshMessageListenerConcurrently.java deleted file mode 100644 index 202c0e19d1..0000000000 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/patch/EventMeshMessageListenerConcurrently.java +++ /dev/null @@ -1,69 +0,0 @@ -///* -// * Licensed to the Apache Software Foundation (ASF) under one or more -// * contributor license agreements. See the NOTICE file distributed with -// * this work for additional information regarding copyright ownership. -// * The ASF licenses this file to You under the Apache License, Version 2.0 -// * (the "License"); you may not use this file except in compliance with -// * the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -//package org.apache.eventmesh.runtime.patch; -// -//import org.apache.commons.collections4.CollectionUtils; -//import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -//import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -//import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; -//import org.apache.rocketmq.common.message.MessageExt; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//import java.util.List; -// -//public abstract class EventMeshMessageListenerConcurrently implements MessageListenerConcurrently { -// -// private static final Logger LOG = LoggerFactory.getLogger(EventMeshMessageListenerConcurrently.class); -// -// @Override -// public ConsumeConcurrentlyStatus consumeMessage(final List msgs, -// final ConsumeConcurrentlyContext context) { -// ConsumeConcurrentlyStatus status = null; -// -// if (CollectionUtils.isEmpty(msgs)) { -// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; -// } -// -// MessageExt msg = msgs.get(0); -// try { -// EventMeshConsumeConcurrentlyContext eventMeshConsumeConcurrentlyContext = (EventMeshConsumeConcurrentlyContext) context; -// EventMeshConsumeConcurrentlyStatus eventMeshConsumeStatus = handleMessage(msg, eventMeshConsumeConcurrentlyContext); -// try { -// switch (eventMeshConsumeStatus) { -// case CONSUME_SUCCESS: -// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; -// case RECONSUME_LATER: -// return ConsumeConcurrentlyStatus.RECONSUME_LATER; -// case CONSUME_FINISH: -// eventMeshConsumeConcurrentlyContext.setManualAck(true); -// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; -// } -// } catch (Throwable e) { -// LOG.info("handleMessage fail", e); -// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; -// } -// } catch (Throwable e) { -// LOG.info("handleMessage fail", e); -// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; -// } -// return status; -// } -// -// public abstract EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, EventMeshConsumeConcurrentlyContext context); -//} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java index a3a606faa9..5d5fc1af9f 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java @@ -159,87 +159,6 @@ public static Map getEventProp(CloudEvent event) { return prop; } - //public static org.apache.rocketmq.common.message.Message decodeMessage(AccessMessage accessMessage) { - // org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message(); - // msg.setTopic(accessMessage.getTopic()); - // msg.setBody(accessMessage.getBody().getBytes()); - // msg.getProperty("init"); - // for (Map.Entry property : accessMessage.getProperties().entrySet()) { - // msg.getProperties().put(property.getKey(), property.getValue()); - // } - // return msg; - //} - - //public static Message decodeMessage(EventMeshMessage eventMeshMessage) { - // Message omsMsg = new Message(); - // omsMsg.setBody(eventMeshMessage.getBody().getBytes()); - // omsMsg.setTopic(eventMeshMessage.getTopic()); - // Properties systemProperties = new Properties(); - // Properties userProperties = new Properties(); - // - // final Set> entries = eventMeshMessage.getProperties().entrySet(); - // - // for (final Map.Entry entry : entries) { - // if (isOMSHeader(entry.getKey())) { - // systemProperties.put(entry.getKey(), entry.getValue()); - // } else { - // userProperties.put(entry.getKey(), entry.getValue()); - // } - // } - // - // systemProperties.put(Constants.PROPERTY_MESSAGE_DESTINATION, eventMeshMessage.getTopic()); - // omsMsg.setSystemProperties(systemProperties); - // omsMsg.setUserProperties(userProperties); - // return omsMsg; - //} - // - //public static AccessMessage encodeMessage(org.apache.rocketmq.common.message.Message msg) throws Exception { - // AccessMessage accessMessage = new AccessMessage(); - // accessMessage.setBody(new String(msg.getBody(), "UTF-8")); - // accessMessage.setTopic(msg.getTopic()); - // for (Map.Entry property : msg.getProperties().entrySet()) { - // accessMessage.getProperties().put(property.getKey(), property.getValue()); - // } - // return accessMessage; - //} - - //public static EventMeshMessage encodeMessage(Message omsMessage) throws Exception { - // - // EventMeshMessage eventMeshMessage = new EventMeshMessage(); - // eventMeshMessage.setBody(new String(omsMessage.getBody(), StandardCharsets.UTF_8)); - // - // Properties sysHeaders = omsMessage.getSystemProperties(); - // Properties userHeaders = omsMessage.getUserProperties(); - // - // //All destinations in RocketMQ use Topic - // eventMeshMessage.setTopic(sysHeaders.getProperty(Constants.PROPERTY_MESSAGE_DESTINATION)); - // - // if (sysHeaders.containsKey("START_TIME")) { - // long deliverTime; - // if (StringUtils.isBlank(sysHeaders.getProperty("START_TIME"))) { - // deliverTime = 0; - // } else { - // deliverTime = Long.parseLong(sysHeaders.getProperty("START_TIME")); - // } - // - // if (deliverTime > 0) { - // // rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime)); - // eventMeshMessage.getProperties().put("START_TIME", String.valueOf(deliverTime)); - // } - // } - // - // for (String key : userHeaders.stringPropertyNames()) { - // eventMeshMessage.getProperties().put(key, userHeaders.getProperty(key)); - // } - // - // //System headers has a high priority - // for (String key : sysHeaders.stringPropertyNames()) { - // eventMeshMessage.getProperties().put(key, sysHeaders.getProperty(key)); - // } - // - // return eventMeshMessage; - //} - public static String getLocalAddr() { //priority of networkInterface when generating client ip String priority = System.getProperty("networkInterface.priority", "bond1 headers, Str conn.addRequestProperty(iter.next(), iter.next()); } } - //conn.addRequestProperty("Client-Version", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION)); conn.addRequestProperty("Content-Type", "application/x-www-form-urlencoded;charset=" + encoding); String ts = String.valueOf(System.currentTimeMillis()); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/RemotingHelper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/RemotingHelper.java index d8c339679f..a2c88dbc71 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/RemotingHelper.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/RemotingHelper.java @@ -56,98 +56,6 @@ public static SocketAddress string2SocketAddress(final String addr) { return isa; } - //public static RemotingCommand invokeSync(final String addr, final RemotingCommand request, - // final long timeoutMillis) throws InterruptedException, RemotingConnectException, - // RemotingSendRequestException, RemotingTimeoutException { - // long beginTime = System.currentTimeMillis(); - // SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr); - // SocketChannel socketChannel = RemotingUtil.connect(socketAddress); - // if (socketChannel != null) { - // boolean sendRequestOK = false; - // - // try { - // - // socketChannel.configureBlocking(true); - // - // //bugfix http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802 - // socketChannel.socket().setSoTimeout((int) timeoutMillis); - // - // ByteBuffer byteBufferRequest = request.encode(); - // while (byteBufferRequest.hasRemaining()) { - // int length = socketChannel.write(byteBufferRequest); - // if (length > 0) { - // if (byteBufferRequest.hasRemaining()) { - // if ((System.currentTimeMillis() - beginTime) > timeoutMillis) { - // - // throw new RemotingSendRequestException(addr); - // } - // } - // } else { - // throw new RemotingSendRequestException(addr); - // } - // - // Thread.sleep(1); - // } - // - // sendRequestOK = true; - // - // ByteBuffer byteBufferSize = ByteBuffer.allocate(4); - // while (byteBufferSize.hasRemaining()) { - // int length = socketChannel.read(byteBufferSize); - // if (length > 0) { - // if (byteBufferSize.hasRemaining()) { - // if ((System.currentTimeMillis() - beginTime) > timeoutMillis) { - // - // throw new RemotingTimeoutException(addr, timeoutMillis); - // } - // } - // } else { - // throw new RemotingTimeoutException(addr, timeoutMillis); - // } - // - // Thread.sleep(1); - // } - // - // int size = byteBufferSize.getInt(0); - // ByteBuffer byteBufferBody = ByteBuffer.allocate(size); - // while (byteBufferBody.hasRemaining()) { - // int length = socketChannel.read(byteBufferBody); - // if (length > 0) { - // if (byteBufferBody.hasRemaining()) { - // if ((System.currentTimeMillis() - beginTime) > timeoutMillis) { - // - // throw new RemotingTimeoutException(addr, timeoutMillis); - // } - // } - // } else { - // throw new RemotingTimeoutException(addr, timeoutMillis); - // } - // - // Thread.sleep(1); - // } - // - // byteBufferBody.flip(); - // return RemotingCommand.decode(byteBufferBody); - // } catch (IOException e) { - // log.error("invokeSync failure", e); - // - // if (sendRequestOK) { - // throw new RemotingTimeoutException(addr, timeoutMillis); - // } else { - // throw new RemotingSendRequestException(addr); - // } - // } finally { - // try { - // socketChannel.close(); - // } catch (IOException e) { - // e.printStackTrace(); - // } - // } - // } else { - // throw new RemotingConnectException(addr); - // } - //} - public static String parseChannelRemoteAddr(final Channel channel) { if (null == channel) { return ""; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java index d2b357ca71..0265dc3c3b 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java @@ -145,20 +145,6 @@ public static String printMqMessage(EventMeshMessage eventMeshMessage) { return result; } - /** - * print part of the mq message - * - * @param message - * @return - */ - //public static String printMqMessage(org.apache.rocketmq.common.message.Message message) { - // Map properties = message.getProperties(); - // String bizSeqNo = message.getKeys(); - // String result = String.format("Message [topic=%s,TTL=%s,uniqueId=%s,bizSeq=%s]", message.getTopic() - // , properties.get(EventMeshConstants.TTL), properties.get(EventMeshConstants.RR_REQUEST_UNIQ_ID), bizSeqNo); - // return result; - //} - /** * get serviceId according to topic */ diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java index 56b50a5453..0b53764c8a 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java @@ -48,18 +48,6 @@ public static void main(String[] args) throws Exception { client.justSubscribe(ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC); client.justSubscribe(BROADCAST_TOPIC, SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC); client.listen(); - //for (int i = 0; i < 10000; i++) { - // Package rr = null; - // AccessMessage rrMessage = null; - // try { - // rr = client.rr(MessageUtils.rrMesssage("TEST-TOPIC-TCP-SYNC"), 3000); - // Thread.sleep(100); - // //rrMessage = (AccessMessage) rr.getBody(); - // logger.error( "rr-reply-------------------------------------------------" + rr.toString()); - // } catch (Exception e) { - // e.printStackTrace(); - // } - //} client.registerSubBusiHandler(new ReceiveMsgHook() { @Override public void handle(Package msg, ChannelHandlerContext ctx) {