Skip to content

Commit

Permalink
[Issue apache#744] Code optimization for Grpc Request-Reply API
Browse files Browse the repository at this point in the history
  • Loading branch information
jinrongluo committed Feb 2, 2022
1 parent 7d05921 commit 2e085d8
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static void main(String[] args) throws InterruptedException {
SubscriptionItem subscriptionItem = new SubscriptionItem();
subscriptionItem.setTopic(topic);
subscriptionItem.setMode(SubscriptionMode.CLUSTERING);
subscriptionItem.setType(SubscriptionType.ASYNC);
subscriptionItem.setType(SubscriptionType.SYNC);


EventMeshGrpcConsumer eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static void main(String[] args) throws InterruptedException {
SubscriptionItem subscriptionItem = new SubscriptionItem();
subscriptionItem.setTopic(topic);
subscriptionItem.setMode(SubscriptionMode.CLUSTERING);
subscriptionItem.setType(SubscriptionType.ASYNC);
subscriptionItem.setType(SubscriptionType.SYNC);

EventMeshGrpcConsumer eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,7 @@ public synchronized void deregisterClient(ConsumerGroupClient client) {

private void closeEventStream(ConsumerGroupClient client) {
if (client.getEventEmitter() != null) {
try {
client.getEventEmitter().onCompleted();
} catch (Exception e) {
logger.warn("GRPC client {} already closed.", client.toString());
}
client.getEventEmitter().onCompleted();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ public synchronized boolean deregisterClient(ConsumerGroupClient client) {
}

public synchronized void init() throws Exception {
if (consumerGroupTopicConfig.size() == 0) {
// no topics, don't init the consumer
return;
}

Properties keyValue = new Properties();
keyValue.put("isBroadcast", "false");
keyValue.put("consumerGroup", consumerGroup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ public void process(Subscription subscription, EventEmitter<Response> emitter) t
.lastUpTime(new Date())
.build();
removeClients.add(newClient);
consumerManager.deregisterClient(newClient);
}

// deregister clients from ConsumerManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
import io.grpc.stub.StreamObserver;
import org.apache.eventmesh.common.protocol.grpc.common.StatusCode;
import org.apache.eventmesh.common.protocol.grpc.protos.ConsumerServiceGrpc;
import org.apache.eventmesh.common.protocol.grpc.protos.RequestHeader;
import org.apache.eventmesh.common.protocol.grpc.protos.Response;
import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage;
import org.apache.eventmesh.common.protocol.grpc.protos.Subscription;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.grpc.processor.ReplyMessageProcessor;
Expand All @@ -16,9 +14,6 @@
import org.apache.eventmesh.runtime.core.protocol.grpc.processor.UnsubscribeProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;

public class ConsumerService extends ConsumerServiceGrpc.ConsumerServiceImplBase {
Expand Down Expand Up @@ -81,13 +76,13 @@ public void onNext(Subscription subscription) {
@Override
public void onError(Throwable t) {
logger.error("Receive error from client: " + t.getMessage());
responseObserver.onCompleted();
emitter.onCompleted();
}

@Override
public void onCompleted() {
logger.info("Client finish sending messages");
responseObserver.onCompleted();
emitter.onCompleted();
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void sendSubscription(Subscription subscription) {
this.sender = consumerAsyncClient.subscribeStream(createReceiver());
}
}
this.sender.onNext(subscription);
senderOnNext(subscription);
}

private StreamObserver<SimpleMessage> createReceiver() {
Expand All @@ -65,7 +65,7 @@ public void onNext(SimpleMessage message) {
if (streamReply != null) {
logger.info("Sending reply message to Server.|seq={}|uniqueId={}|", streamReply.getReply().getSeqNum(),
streamReply.getReply().getUniqueId());
sender.onNext(streamReply);
senderOnNext(streamReply);
}
}
}
Expand Down Expand Up @@ -113,10 +113,26 @@ public void run() {

public void close() {
if (this.sender != null) {
this.sender.onCompleted();
senderOnComplete();
this.sender = null;
}
latch.countDown();
logger.info("SubStreamHandler closed.");
}

private void senderOnNext(Subscription subscription) {
try {
sender.onNext(subscription);
} catch (Throwable t) {
logger.warn("StreamObserver Error onNext {}", t.getMessage());
}
}

private void senderOnComplete() {
try {
sender.onCompleted();
} catch (Throwable t) {
logger.warn("StreamObserver Error onComplete {}", t.getMessage());
}
}
}

0 comments on commit 2e085d8

Please sign in to comment.