Skip to content

Commit

Permalink
[Issue #386] fixing ConsumerGroup Queue Consumer Offset not synced up…
Browse files Browse the repository at this point in the history
… issue (#387)

* [Issue #337] Fix HttpSubscriber startup issue

* [Issue #337] test commit

* [Issue #337] revert test commit

* [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook

* [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook

* [Issue #337] Address code review comment for Subscriber Demo App

* [Issue #386] fixing ConsumerGroup Queuen Consumer Offset not synced up issue

* [Issue #386] adding license header to new file

* [Issue #386] Fix license header missing issue

Co-authored-by: j00441484 <jin.rong.luo@huawei.com>
  • Loading branch information
jinrongluo and j00441484 authored Jun 16, 2021
1 parent d3000dc commit 904956c
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.api;

import io.openmessaging.api.AsyncConsumeContext;

public abstract class MeshAsyncConsumeContext extends AsyncConsumeContext {
private AbstractContext context;

public AbstractContext getContext() {
return context;
}

public void setContext(AbstractContext context) {
this.context = context;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,4 @@ public interface MeshMQPushConsumer extends Consumer {

@Override
void unsubscribe(String topic);

AbstractContext getContext();
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.openmessaging.api.exception.OMSRuntimeException;

import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.MeshAsyncConsumeContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants;
import org.apache.eventmesh.connector.rocketmq.config.ClientConfig;
Expand All @@ -55,7 +56,6 @@ public class PushConsumerImpl implements Consumer {
private AtomicBoolean started = new AtomicBoolean(false);
private final Map<String, AsyncMessageListener> subscribeTable = new ConcurrentHashMap<>();
private final ClientConfig clientConfig;
private EventMeshConsumeConcurrentlyContext context;

public PushConsumerImpl(final Properties properties) {
this.rocketmqPushConsumer = new DefaultMQPushConsumer();
Expand Down Expand Up @@ -93,7 +93,6 @@ public PushConsumerImpl(final Properties properties) {

@Override
public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, EventMeshConsumeConcurrentlyContext context) {
PushConsumerImpl.this.setContext(context);
if (msg == null) {
return EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
Expand All @@ -117,12 +116,13 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, EventMes

final Properties contextProperties = new Properties();
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
AsyncConsumeContext omsContext = new AsyncConsumeContext() {
MeshAsyncConsumeContext omsContext = new MeshAsyncConsumeContext() {
@Override
public void commit(Action action) {
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
}
};
omsContext.setContext(context);
listener.consume(omsMsg, omsContext);

return EventMeshConsumeConcurrentlyStatus.valueOf(contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
Expand All @@ -133,7 +133,6 @@ public void commit(Action action) {

@Override
public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, EventMeshConsumeConcurrentlyContext context) {
PushConsumerImpl.this.setContext(context);
if (msg == null) {
return EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
Expand All @@ -157,13 +156,14 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, EventMes

contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());

AsyncConsumeContext omsContext = new AsyncConsumeContext() {
MeshAsyncConsumeContext omsContext = new MeshAsyncConsumeContext() {
@Override
public void commit(Action action) {
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
}
};
omsContext.setContext(context);
listener.consume(omsMsg, omsContext);

return EventMeshConsumeConcurrentlyStatus.valueOf(contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
Expand Down Expand Up @@ -254,14 +254,6 @@ public DefaultMQPushConsumer getRocketmqPushConsumer() {
// }
// }

public AbstractContext getContext() {
return this.context;
}

public void setContext(EventMeshConsumeConcurrentlyContext context) {
this.context = context;
}

@Override
public void subscribe(String topic, String subExpression, MessageListener listener) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,6 @@ public void subscribe(String topic, AsyncMessageListener listener) throws Except
pushConsumer.subscribe(topic, "*", listener);
}

@Override
public AbstractContext getContext() {
return pushConsumer.getContext();
}


@Override
public boolean isStarted() {
return pushConsumer.isStarted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,4 @@ public synchronized void shutdown() throws Exception {
public void updateOffset(List<Message> msgs, AbstractContext eventMeshConsumeConcurrentlyContext) {
meshMQPushConsumer.updateOffset(msgs, eventMeshConsumeConcurrentlyContext);
}

public AbstractContext getContext() {
return meshMQPushConsumer.getContext();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import org.apache.commons.collections4.MapUtils;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.MeshAsyncConsumeContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
Expand Down Expand Up @@ -138,7 +139,7 @@ public void consume(Message message, AsyncConsumeContext context) {
}
}
HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
topic, message, persistentMqConsumer.getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
topic, message, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);

if (httpMessageHandler.handle(handleMsgContext)) {
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
Expand Down Expand Up @@ -187,7 +188,7 @@ public void consume(Message message, AsyncConsumeContext context) {
}
}
HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
topic, message, broadcastMqConsumer.getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
topic, message, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);

if (httpMessageHandler.handle(handleMsgContext)) {
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.api.MeshAsyncConsumeContext;
import org.apache.eventmesh.api.RRCallback;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
Expand Down Expand Up @@ -533,7 +534,7 @@ public void consume(Message message, AsyncConsumeContext context) {
Iterator<Session> sessionsItr = groupConsumerSessions.iterator();

DownStreamMsgContext downStreamMsgContext =
new DownStreamMsgContext(message, null, broadCastMsgConsumer, broadCastMsgConsumer.getContext(), false);
new DownStreamMsgContext(message, null, broadCastMsgConsumer, ((MeshAsyncConsumeContext)context).getContext(), false);

while (sessionsItr.hasNext()) {
Session session = sessionsItr.next();
Expand Down Expand Up @@ -605,7 +606,7 @@ public void consume(Message message, AsyncConsumeContext context) {
}

DownStreamMsgContext downStreamMsgContext =
new DownStreamMsgContext(message, session, persistentMsgConsumer, persistentMsgConsumer.getContext(), false);
new DownStreamMsgContext(message, session, persistentMsgConsumer, ((MeshAsyncConsumeContext)context).getContext(), false);
//msg put in eventmesh,waiting client ack
session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
session.downstreamMsg(downStreamMsgContext);
Expand Down

0 comments on commit 904956c

Please sign in to comment.