Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue #780] Modify the define level of EventListener from Topic to Consumer #781

Merged
merged 8 commits into from
Feb 23, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ public interface Consumer extends LifeCycle {

void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context);

void subscribe(String topic, final EventListener listener) throws Exception;
//void subscribe(String topic, final EventListener listener) throws Exception;

void subscribe(String topic) throws Exception;

void unsubscribe(String topic);

void registerEventListener(EventListener listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class PushConsumerImpl {
private final DefaultMQPushConsumer rocketmqPushConsumer;
private final Properties properties;
private AtomicBoolean started = new AtomicBoolean(false);
private final Map<String, EventListener> subscribeTable = new ConcurrentHashMap<>();
private EventListener eventListener;
private final ClientConfig clientConfig;

public PushConsumerImpl(final Properties properties) {
Expand Down Expand Up @@ -134,9 +134,7 @@ public DefaultMQPushConsumer getRocketmqPushConsumer() {
return rocketmqPushConsumer;
}


public void subscribe(String topic, String subExpression, EventListener listener) {
this.subscribeTable.put(topic, listener);
public void subscribe(String topic, String subExpression) {
try {
this.rocketmqPushConsumer.subscribe(topic, subExpression);
} catch (MQClientException e) {
Expand All @@ -146,7 +144,6 @@ public void subscribe(String topic, String subExpression, EventListener listener


public void unsubscribe(String topic) {
this.subscribeTable.remove(topic);
try {
this.rocketmqPushConsumer.unsubscribe(topic);
} catch (Exception e) {
Expand Down Expand Up @@ -197,9 +194,7 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg,
cloudEvent = cloudEventBuilder.build();
}

EventListener listener = PushConsumerImpl.this.subscribeTable.get(msg.getTopic());

if (listener == null) {
if (eventListener == null) {
throw new ConnectorRuntimeException(String.format("The topic/queue %s isn't attached to this consumer",
msg.getTopic()));
}
Expand Down Expand Up @@ -231,7 +226,7 @@ public void commit(EventMeshAction action) {

eventMeshAsyncConsumeContext.setAbstractContext(context);

listener.consume(cloudEvent, eventMeshAsyncConsumeContext);
eventListener.consume(cloudEvent, eventMeshAsyncConsumeContext);

return EventMeshConsumeConcurrentlyStatus.valueOf(
contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
Expand Down Expand Up @@ -270,9 +265,7 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg,
cloudEvent = cloudEventBuilder.build();
}

EventListener listener = PushConsumerImpl.this.subscribeTable.get(msg.getTopic());

if (listener == null) {
if (eventListener == null) {
throw new ConnectorRuntimeException(String.format("The topic/queue %s isn't attached to this consumer",
msg.getTopic()));
}
Expand Down Expand Up @@ -306,12 +299,14 @@ public void commit(EventMeshAction action) {

eventMeshAsyncConsumeContext.setAbstractContext(context);

listener.consume(cloudEvent, eventMeshAsyncConsumeContext);
eventListener.consume(cloudEvent, eventMeshAsyncConsumeContext);

return EventMeshConsumeConcurrentlyStatus.valueOf(
contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
}
}


public void registerEventListener(EventListener listener) {
this.eventListener = listener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ public synchronized void init(Properties keyValue) throws Exception {
}

@Override
public void subscribe(String topic, EventListener listener) throws Exception {
pushConsumer.subscribe(topic, "*", listener);
public void subscribe(String topic) throws Exception {
pushConsumer.subscribe(topic, "*");
}

@Override
Expand Down Expand Up @@ -99,6 +99,11 @@ public void unsubscribe(String topic) {
pushConsumer.unsubscribe(topic);
}

@Override
public void registerEventListener(EventListener listener) {
pushConsumer.registerEventListener(listener);
}

@Override
public synchronized void shutdown() {
pushConsumer.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,7 @@ public void testConsumeMessage() {
consumedMsg.setBody(testBody);
consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
consumedMsg.setTopic("HELLO_QUEUE");
consumer.subscribe("HELLO_QUEUE", "*", new EventListener() {

@Override
public void consume(CloudEvent cloudEvent, org.apache.eventmesh.api.AsyncConsumeContext context) {
assertThat(cloudEvent.getExtension("MESSAGE_ID")).isEqualTo("NewMsgId");
assertThat(cloudEvent.getData()).isEqualTo(testBody);
context.commit(EventMeshAction.CommitMessage);
}
});
consumer.subscribe("HELLO_QUEUE", "*");
((MessageListenerConcurrently) rocketmqPushConsumer
.getMessageListener()).consumeMessage(Collections.singletonList(consumedMsg), null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class StandaloneConsumer implements Consumer {

private StandaloneBroker standaloneBroker;

private EventListener listener;

private AtomicBoolean isStarted;

private final ConcurrentHashMap<String, SubScribeTask> subscribeTaskTable;
Expand Down Expand Up @@ -90,10 +92,8 @@ public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context)
}

@Override
public void subscribe(String topic, EventListener listener) throws Exception {
if (listener == null) {
throw new IllegalArgumentException("listener cannot be null");
}
public void subscribe(String topic) throws Exception {

if (subscribeTaskTable.containsKey(topic)) {
return;
}
Expand All @@ -116,4 +116,9 @@ public void unsubscribe(String topic) {
subscribeTaskTable.remove(topic);
}
}

@Override
public void registerEventListener(EventListener listener) {
this.listener = listener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,104 +69,17 @@ public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context)
}

@Override
public void subscribe(String topic, EventListener listener) throws Exception {
consumer.subscribe(topic, listener);
public void subscribe(String topic) throws Exception {
consumer.subscribe(topic);
}

@Override
public void unsubscribe(String topic) {
consumer.unsubscribe(topic);
}

//@Override
//public void init(Properties keyValue) throws Exception {
// String producerGroup = keyValue.getProperty("producerGroup");
//
// MessagingAccessPointImpl messagingAccessPoint = new MessagingAccessPointImpl(keyValue);
// consumer = (StandaloneConsumer) messagingAccessPoint.createConsumer(keyValue);
//
//}
//
//@Override
//public void updateOffset(List<Message> msgs, AbstractContext context) {
// for(Message message : msgs) {
// consumer.updateOffset(message);
// }
//}
//
//@Override
//public void subscribe(String topic, AsyncMessageListener listener) throws Exception {
// // todo: support subExpression
// consumer.subscribe(topic, "*", listener);
//}
//
//@Override
//public void unsubscribe(String topic) {
// consumer.unsubscribe(topic);
//}
//
//@Override
//public void subscribe(String topic, String subExpression, MessageListener listener) {
// throw new UnsupportedOperationException("not supported yet");
//}
//
//@Override
//public void subscribe(String topic, MessageSelector selector, MessageListener listener) {
// throw new UnsupportedOperationException("not supported yet");
//}
//
//@Override
//public <T> void subscribe(String topic, String subExpression, GenericMessageListener<T> listener) {
// throw new UnsupportedOperationException("not supported yet");
//}
//
//@Override
//public <T> void subscribe(String topic, MessageSelector selector, GenericMessageListener<T> listener) {
// throw new UnsupportedOperationException("not supported yet");
//}
//
//@Override
//public void subscribe(String topic, String subExpression, AsyncMessageListener listener) {
// throw new UnsupportedOperationException("not supported yet");
//}
//
//@Override
//public void subscribe(String topic, MessageSelector selector, AsyncMessageListener listener) {
// throw new UnsupportedOperationException("not supported yet");
//}
//
//@Override
//public <T> void subscribe(String topic, String subExpression, AsyncGenericMessageListener<T> listener) {
// throw new UnsupportedOperationException("not supported yet");
//}
//
//@Override
//public <T> void subscribe(String topic, MessageSelector selector, AsyncGenericMessageListener<T> listener) {
// throw new UnsupportedOperationException("not supported yet");
//}
//
//@Override
//public void updateCredential(Properties credentialProperties) {
//
//}
//
//@Override
//public boolean isStarted() {
// return consumer.isStarted();
//}
//
//@Override
//public boolean isClosed() {
// return consumer.isClosed();
//}
//
//@Override
//public void start() {
// consumer.start();
//}
//
//@Override
//public void shutdown() {
// consumer.shutdown();
//}
@Override
public void registerEventListener(EventListener listener) {
consumer.registerEventListener(listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public MQConsumerWrapper(String connectorPluginType) {
}
}

public void subscribe(String topic, EventListener listener) throws Exception {
meshMQPushConsumer.subscribe(topic, listener);
public void subscribe(String topic) throws Exception {
meshMQPushConsumer.subscribe(topic);
}

public void unsubscribe(String topic) throws Exception {
Expand All @@ -69,9 +69,9 @@ public synchronized void shutdown() throws Exception {
started.compareAndSet(false, true);
}

//public void registerMessageListener(MessageListenerConcurrently messageListenerConcurrently) {
// meshMQPushConsumer.registerMessageListener(messageListenerConcurrently);
//}
public void registerEventListener(EventListener listener) {
meshMQPushConsumer.registerEventListener(listener);
}

public void updateOffset(List<CloudEvent> events, AbstractContext eventMeshConsumeConcurrentlyContext) {
meshMQPushConsumer.updateOffset(events, eventMeshConsumeConcurrentlyContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ public synchronized void init() throws Exception {
keyValue.put("instanceName", EventMeshUtil.buildMeshClientID(consumerGroup,
eventMeshGrpcConfiguration.eventMeshCluster));
persistentMqConsumer.init(keyValue);
EventListener clusterEventListner = createEventListener(SubscriptionMode.CLUSTERING);
persistentMqConsumer.registerEventListener(clusterEventListner);

Properties broadcastKeyValue = new Properties();
broadcastKeyValue.put("isBroadcast", "true");
Expand All @@ -148,6 +150,8 @@ public synchronized void init() throws Exception {
broadcastKeyValue.put("instanceName", EventMeshUtil.buildMeshClientID(consumerGroup,
eventMeshGrpcConfiguration.eventMeshCluster));
broadcastMqConsumer.init(broadcastKeyValue);
EventListener broadcastEventListner = createEventListener(SubscriptionMode.BROADCASTING);
broadcastMqConsumer.registerEventListener(broadcastEventListner);

serviceState = ServiceState.INITED;
logger.info("EventMeshConsumer [{}] initialized.............", consumerGroup);
Expand Down Expand Up @@ -184,9 +188,9 @@ public ServiceState getStatus() {

public void subscribe(String topic, SubscriptionMode subscriptionMode) throws Exception {
if (SubscriptionMode.CLUSTERING.equals(subscriptionMode)) {
persistentMqConsumer.subscribe(topic, createEventListener(subscriptionMode));
persistentMqConsumer.subscribe(topic);
} else if (SubscriptionMode.BROADCASTING.equals(subscriptionMode)) {
broadcastMqConsumer.subscribe(topic, createEventListener(subscriptionMode));
broadcastMqConsumer.subscribe(topic);
} else {
logger.error("Subscribe Failed. Incorrect Subscription Mode");
throw new Exception("Subscribe Failed. Incorrect Subscription Mode");
Expand Down
Loading