Skip to content

Commit

Permalink
[ISSUE #783] clean useless code in runtime module (#787)
Browse files Browse the repository at this point in the history
* [ISSUE #783] clean useless code in runtime module

* format code

close #783
  • Loading branch information
sarihuangshanrong authored Feb 24, 2022
1 parent 6c8c0c5 commit 9e42e65
Show file tree
Hide file tree
Showing 17 changed files with 0 additions and 567 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public class Acl {
private static AclService aclService;

public void init(String aclPluginType) throws AclException {
//aclService = getSpiAclService();
aclService = EventMeshExtensionFactory.getExtension(AclService.class, aclPluginType);
if (aclService == null) {
logger.error("can't load the aclService plugin, please check.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,12 @@ public synchronized void init() throws Exception {
@Override
public void consume(CloudEvent event, AsyncConsumeContext context) {
String topic = event.getSubject();
//String topic = message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
String bizSeqNo = (String) event.getExtension(Constants.PROPERTY_MESSAGE_SEARCH_KEYS);
String uniqueId = (String) event.getExtension(Constants.RMB_UNIQ_ID);

event = CloudEventBuilder.from(event)
.withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
.build();
//message.getUserProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
if (messageLogger.isDebugEnabled()) {
messageLogger.debug("message|mq2eventMesh|topic={}|event={}", topic, event);
} else {
Expand All @@ -118,9 +116,6 @@ public void consume(CloudEvent event, AsyncConsumeContext context) {
logger.error("no topicConfig found, consumerGroup:{} topic:{}", consumerGroupConf.getConsumerGroup(), topic);
try {
sendMessageBack(event, uniqueId, bizSeqNo);
//context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
// EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
//context.ack();
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
return;
} catch (Exception ex) {
Expand All @@ -135,19 +130,13 @@ public void consume(CloudEvent event, AsyncConsumeContext context) {
consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);

if (httpMessageHandler.handle(handleMsgContext)) {
//context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
// EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
//context.ack();
eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
} else {
try {
sendMessageBack(event, uniqueId, bizSeqNo);
} catch (Exception e) {
//ignore
}
//context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
// EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
//context.ack();
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
}
}
Expand Down Expand Up @@ -192,9 +181,6 @@ public void consume(CloudEvent event, AsyncConsumeContext context) {
consumerGroupConf.getConsumerGroup(), topic);
try {
sendMessageBack(event, uniqueId, bizSeqNo);
//context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
// EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
//context.ack();
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
return;
} catch (Exception ex) {
Expand All @@ -209,19 +195,13 @@ public void consume(CloudEvent event, AsyncConsumeContext context) {
consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);

if (httpMessageHandler.handle(handleMsgContext)) {
//context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
// EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
//context.ack();
eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
} else {
try {
sendMessageBack(event, uniqueId, bizSeqNo);
} catch (Exception e) {
//ignore
}
//context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
// EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
//context.ack();
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
}
}
Expand Down Expand Up @@ -256,15 +236,6 @@ public void unsubscribe(String topic, SubscriptionMode subscriptionMode) throws
}
}

//public boolean isPause() {
// return persistentMqConsumer.isPause() && broadcastMqConsumer.isPause();
//}
//
//public void pause() {
// persistentMqConsumer.pause();
// broadcastMqConsumer.pause();
//}

public synchronized void shutdown() throws Exception {
persistentMqConsumer.shutdown();
started4Persistent.compareAndSet(true, false);
Expand Down Expand Up @@ -313,12 +284,6 @@ public void onException(OnExceptionContext context) {
logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqno:{}, uniqueId:{}",
consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
}

//@Override
//public void onException(Throwable e) {
// logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqno:{}, uniqueId:{}",
// consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
//}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,6 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
try {
Acl.doAclCheckInHttpHeartbeat(remoteAddr, user, pass, sys, client.topic, requestCode);
} catch (Exception e) {
//String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp);

responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
heartbeatResponseHeader,
SendMessageResponseBody
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,11 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>

long startTime = System.currentTimeMillis();

//Message rocketMQMsg;
//Message omsMsg = new Message();
String replyTopic = EventMeshConstants.RR_REPLY_TOPIC;

String origTopic = event.getSubject();

//Map<String, String> extFields = replyMessageRequestBody.getExtFields();
final String replyMQCluster = event.getExtension(EventMeshConstants.PROPERTY_MESSAGE_CLUSTER).toString();
//final String replyMQCluster = MapUtils.getString(extFields, EventMeshConstants.PROPERTY_MESSAGE_CLUSTER, null);
if (!org.apache.commons.lang3.StringUtils.isEmpty(replyMQCluster)) {
replyTopic = replyMQCluster + "-" + replyTopic;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,6 @@ private void doRedirect(String group, String purpose, int judge, List<String> ev
Collections.shuffle(new ArrayList<>(sessionList));

for (int i = 0; i < judge; i++) {
//String redirectSessionAddr = ProxyTcp2Client.redirectClientForRebalance(sessionList.get(i),
// eventMeshTCPServer.getClientSessionGroupMapping());
String newProxyIp = eventMeshRecommendResult.get(i).split(":")[0];
String newProxyPort = eventMeshRecommendResult.get(i).split(":")[1];
String redirectSessionAddr = EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer, newProxyIp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ public void push(final DownStreamMsgContext downStreamMsgContext) {
downStreamMsgContext.event = CloudEventBuilder.from(downStreamMsgContext.event)
.withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
.build();
//downStreamMsgContext.event.getSystemProperties().put(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP,
//String.valueOf(System.currentTimeMillis()));
EventMeshMessage body = null;
int retCode = 0;
String retMsg = null;
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ public void start() throws Exception {
}), delay, period, TimeUnit.MILLISECONDS);

monitorThreadPoolTask = eventMeshTCPServer.getScheduler().scheduleAtFixedRate(() -> {
//ThreadPoolHelper.printThreadPoolState();
eventMeshTCPServer.getEventMeshRebalanceService().printRebalanceThreadPoolState();
eventMeshTCPServer.getEventMeshTcpRetryer().printRetryThreadPoolState();

Expand Down
Loading

0 comments on commit 9e42e65

Please sign in to comment.