diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsSubscribeReply.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsSubscribeReply.java index 3d1b7df206..d76c99e86a 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsSubscribeReply.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsSubscribeReply.java @@ -37,7 +37,7 @@ public static void main(String[] args) throws InterruptedException { SubscriptionItem subscriptionItem = new SubscriptionItem(); subscriptionItem.setTopic(topic); subscriptionItem.setMode(SubscriptionMode.CLUSTERING); - subscriptionItem.setType(SubscriptionType.ASYNC); + subscriptionItem.setType(SubscriptionType.SYNC); EventMeshGrpcConsumer eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig); diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshSubscribeReply.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshSubscribeReply.java index cd0405018a..cebb8f708a 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshSubscribeReply.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshSubscribeReply.java @@ -37,7 +37,7 @@ public static void main(String[] args) throws InterruptedException { SubscriptionItem subscriptionItem = new SubscriptionItem(); subscriptionItem.setTopic(topic); subscriptionItem.setMode(SubscriptionMode.CLUSTERING); - subscriptionItem.setType(SubscriptionType.ASYNC); + subscriptionItem.setType(SubscriptionType.SYNC); EventMeshGrpcConsumer eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/ConsumerManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/ConsumerManager.java index b4fa9715a1..9849553b06 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/ConsumerManager.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/ConsumerManager.java @@ -150,11 +150,7 @@ public synchronized void deregisterClient(ConsumerGroupClient client) { private void closeEventStream(ConsumerGroupClient client) { if (client.getEventEmitter() != null) { - try { - client.getEventEmitter().onCompleted(); - } catch (Exception e) { - logger.warn("GRPC client {} already closed.", client.toString()); - } + client.getEventEmitter().onCompleted(); } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java index 8b8223c997..9f4d54381c 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java @@ -107,6 +107,11 @@ public synchronized boolean deregisterClient(ConsumerGroupClient client) { } public synchronized void init() throws Exception { + if (consumerGroupTopicConfig.size() == 0) { + // no topics, don't init the consumer + return; + } + Properties keyValue = new Properties(); keyValue.put("isBroadcast", "false"); keyValue.put("consumerGroup", consumerGroup); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/UnsubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/UnsubscribeProcessor.java index be25e30c6d..69dc8a7d11 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/UnsubscribeProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/UnsubscribeProcessor.java @@ -63,7 +63,6 @@ public void process(Subscription subscription, EventEmitter emitter) t .lastUpTime(new Date()) .build(); removeClients.add(newClient); - consumerManager.deregisterClient(newClient); } // deregister clients from ConsumerManager diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java index 6ba2bac5c2..dc47741d16 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java @@ -3,11 +3,9 @@ import io.grpc.stub.StreamObserver; import org.apache.eventmesh.common.protocol.grpc.common.StatusCode; import org.apache.eventmesh.common.protocol.grpc.protos.ConsumerServiceGrpc; -import org.apache.eventmesh.common.protocol.grpc.protos.RequestHeader; import org.apache.eventmesh.common.protocol.grpc.protos.Response; import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage; import org.apache.eventmesh.common.protocol.grpc.protos.Subscription; -import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.grpc.processor.ReplyMessageProcessor; @@ -16,9 +14,6 @@ import org.apache.eventmesh.runtime.core.protocol.grpc.processor.UnsubscribeProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.ThreadPoolExecutor; public class ConsumerService extends ConsumerServiceGrpc.ConsumerServiceImplBase { @@ -81,13 +76,13 @@ public void onNext(Subscription subscription) { @Override public void onError(Throwable t) { logger.error("Receive error from client: " + t.getMessage()); - responseObserver.onCompleted(); + emitter.onCompleted(); } @Override public void onCompleted() { logger.info("Client finish sending messages"); - responseObserver.onCompleted(); + emitter.onCompleted(); } }; } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java index 67e140505d..b1959a49f7 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java @@ -40,7 +40,7 @@ public void sendSubscription(Subscription subscription) { this.sender = consumerAsyncClient.subscribeStream(createReceiver()); } } - this.sender.onNext(subscription); + senderOnNext(subscription); } private StreamObserver createReceiver() { @@ -65,7 +65,7 @@ public void onNext(SimpleMessage message) { if (streamReply != null) { logger.info("Sending reply message to Server.|seq={}|uniqueId={}|", streamReply.getReply().getSeqNum(), streamReply.getReply().getUniqueId()); - sender.onNext(streamReply); + senderOnNext(streamReply); } } } @@ -113,10 +113,26 @@ public void run() { public void close() { if (this.sender != null) { - this.sender.onCompleted(); + senderOnComplete(); this.sender = null; } latch.countDown(); logger.info("SubStreamHandler closed."); } + + private void senderOnNext(Subscription subscription) { + try { + sender.onNext(subscription); + } catch (Throwable t) { + logger.warn("StreamObserver Error onNext {}", t.getMessage()); + } + } + + private void senderOnComplete() { + try { + sender.onCompleted(); + } catch (Throwable t) { + logger.warn("StreamObserver Error onComplete {}", t.getMessage()); + } + } }