diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java index bbd62c0b81..5829e4941a 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java @@ -369,7 +369,7 @@ public synchronized void initClientGroupPersistentConsumer() throws Exception { keyValue.put("isBroadcast", "false"); keyValue.put("consumerGroup", consumerGroup); keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC); - keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId,"SUB", eventMeshTCPConfiguration.eventMeshCluster)); + keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster)); persistentMsgConsumer.init(keyValue); // persistentMsgConsumer.registerMessageListener(new EventMeshMessageListenerConcurrently() { @@ -458,7 +458,7 @@ public synchronized void initClientGroupBroadcastConsumer() throws Exception { keyValue.put("isBroadcast", "true"); keyValue.put("consumerGroup", consumerGroup); keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC); - keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId,"SUB", eventMeshTCPConfiguration.eventMeshCluster)); + keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster)); broadCastMsgConsumer.init(keyValue); // broadCastMsgConsumer.registerMessageListener(new EventMeshMessageListenerConcurrently() { // @Override @@ -536,7 +536,7 @@ public void consume(Message message, AsyncConsumeContext context) { message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())); message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, eventMeshTCPConfiguration.eventMeshServerIp); - EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context; + EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context; if (CollectionUtils.isEmpty(groupConsumerSessions)) { logger.warn("found no session to downstream broadcast msg"); // context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); @@ -586,7 +586,7 @@ public void consume(Message message, AsyncConsumeContext context) { message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())); message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, eventMeshTCPConfiguration.eventMeshServerIp); - EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context; + EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context; Session session = downstreamDispatchStrategy.select(consumerGroup, topic, groupConsumerSessions); String bizSeqNo = EventMeshUtil.getMessageBizSeq(message); if (session == null) { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java index 280ff3bf86..ddfced04fc 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java @@ -326,10 +326,10 @@ public boolean isAvailable(String topic) { return true; } - @Override - public int hashCode() { - int code = 37 + (client != null ? client.hashCode() : 0) + (context != null ? context.hashCode() : 0) - + (sessionState != null ? sessionState.hashCode() : 0); - return code; - } +// @Override +// public int hashCode() { +// int code = 37 + (client != null ? client.hashCode() : 0) + (context != null ? context.hashCode() : 0) +// + (sessionState != null ? sessionState.hashCode() : 0); +// return code; +// } } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java index 950c06be95..2d6c7ca834 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java @@ -99,6 +99,7 @@ public static Package responseToClientAck(Package in) { public static UserAgent generateSubClient(UserAgent agent) { UserAgent user = new UserAgent(); + user.setEnv(agent.getEnv()); user.setHost(agent.getHost()); user.setPassword(agent.getPassword()); user.setUsername(agent.getUsername()); @@ -116,6 +117,7 @@ public static UserAgent generateSubClient(UserAgent agent) { public static UserAgent generatePubClient(UserAgent agent) { UserAgent user = new UserAgent(); + user.setEnv(agent.getEnv()); user.setHost(agent.getHost()); user.setPassword(agent.getPassword()); user.setUsername(agent.getUsername()); diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java index 88578f5a24..88f5016e9d 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java @@ -36,6 +36,7 @@ public class EventMeshTestUtils { public static UserAgent generateClient1() { UserAgent user = new UserAgent(); + user.setEnv("test"); user.setHost("127.0.0.1"); user.setPassword(generateRandomString(8)); user.setUsername("PU4283"); @@ -52,6 +53,7 @@ public static UserAgent generateClient1() { public static UserAgent generateClient2() { UserAgent user = new UserAgent(); + user.setEnv("test"); user.setHost("127.0.0.1"); user.setPassword(generateRandomString(8)); user.setUsername("PU4283");