Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #397]Remove subscription session failed error #398

Merged
merged 22 commits into from
Jun 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
c7c62f0
[ISSUE #325]Update gradle configuration for publishing package to mav…
xwm1992 Apr 30, 2021
327f149
update build.gradle
xwm1992 Apr 30, 2021
ed78353
update build.gradle and gradle.properties
xwm1992 Apr 30, 2021
ee89cca
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 May 6, 2021
f80d3a7
update build.gradle and gradle.properties for publish to maven reposi…
xwm1992 May 7, 2021
1eb7a8f
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 May 7, 2021
0605542
* update gradle version for instructions
xwm1992 May 8, 2021
58ec68f
[ISSUE #329]Missing Log4j dependency
xwm1992 May 10, 2021
16d0af9
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 May 10, 2021
72c05bc
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 May 11, 2021
a8fe7ff
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 May 19, 2021
d68186d
update eventmesh-runtime.png
xwm1992 May 19, 2021
e9e8a04
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 May 24, 2021
246fbd0
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 May 31, 2021
b7a19f7
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 Jun 8, 2021
8323f89
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 Jun 9, 2021
82711e2
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 Jun 21, 2021
7a099c1
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 Jun 23, 2021
44bb6fa
support unsubscribe topics while delconsumer in http mode
xwm1992 Jun 23, 2021
5877d78
[ISSUE #397]Remove subscription session failed error
xwm1992 Jun 25, 2021
ade47ee
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 Jun 25, 2021
9054f10
[ISSUE #397]Remove subscription session failed error
xwm1992 Jun 25, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ public synchronized void initClientGroupPersistentConsumer() throws Exception {
keyValue.put("isBroadcast", "false");
keyValue.put("consumerGroup", consumerGroup);
keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC);
keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId,"SUB", eventMeshTCPConfiguration.eventMeshCluster));
keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));

persistentMsgConsumer.init(keyValue);
// persistentMsgConsumer.registerMessageListener(new EventMeshMessageListenerConcurrently() {
Expand Down Expand Up @@ -458,7 +458,7 @@ public synchronized void initClientGroupBroadcastConsumer() throws Exception {
keyValue.put("isBroadcast", "true");
keyValue.put("consumerGroup", consumerGroup);
keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC);
keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId,"SUB", eventMeshTCPConfiguration.eventMeshCluster));
keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
broadCastMsgConsumer.init(keyValue);
// broadCastMsgConsumer.registerMessageListener(new EventMeshMessageListenerConcurrently() {
// @Override
Expand Down Expand Up @@ -536,7 +536,7 @@ public void consume(Message message, AsyncConsumeContext context) {
message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, eventMeshTCPConfiguration.eventMeshServerIp);

EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context;
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
if (CollectionUtils.isEmpty(groupConsumerSessions)) {
logger.warn("found no session to downstream broadcast msg");
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
Expand Down Expand Up @@ -586,7 +586,7 @@ public void consume(Message message, AsyncConsumeContext context) {
message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, eventMeshTCPConfiguration.eventMeshServerIp);

EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context;
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
Session session = downstreamDispatchStrategy.select(consumerGroup, topic, groupConsumerSessions);
String bizSeqNo = EventMeshUtil.getMessageBizSeq(message);
if (session == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,10 @@ public boolean isAvailable(String topic) {
return true;
}

@Override
public int hashCode() {
int code = 37 + (client != null ? client.hashCode() : 0) + (context != null ? context.hashCode() : 0)
+ (sessionState != null ? sessionState.hashCode() : 0);
return code;
}
// @Override
// public int hashCode() {
// int code = 37 + (client != null ? client.hashCode() : 0) + (context != null ? context.hashCode() : 0)
// + (sessionState != null ? sessionState.hashCode() : 0);
// return code;
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public static Package responseToClientAck(Package in) {

public static UserAgent generateSubClient(UserAgent agent) {
UserAgent user = new UserAgent();
user.setEnv(agent.getEnv());
user.setHost(agent.getHost());
user.setPassword(agent.getPassword());
user.setUsername(agent.getUsername());
Expand All @@ -116,6 +117,7 @@ public static UserAgent generateSubClient(UserAgent agent) {

public static UserAgent generatePubClient(UserAgent agent) {
UserAgent user = new UserAgent();
user.setEnv(agent.getEnv());
user.setHost(agent.getHost());
user.setPassword(agent.getPassword());
user.setUsername(agent.getUsername());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class EventMeshTestUtils {

public static UserAgent generateClient1() {
UserAgent user = new UserAgent();
user.setEnv("test");
user.setHost("127.0.0.1");
user.setPassword(generateRandomString(8));
user.setUsername("PU4283");
Expand All @@ -52,6 +53,7 @@ public static UserAgent generateClient1() {

public static UserAgent generateClient2() {
UserAgent user = new UserAgent();
user.setEnv("test");
user.setHost("127.0.0.1");
user.setPassword(generateRandomString(8));
user.setUsername("PU4283");
Expand Down