diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/StreamPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/StreamPushRequest.java index 1c3fc93d15..636ff79471 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/StreamPushRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/StreamPushRequest.java @@ -1,5 +1,6 @@ package org.apache.eventmesh.runtime.core.protocol.grpc.push; +import io.grpc.stub.StreamObserver; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.RandomUtils; @@ -54,7 +55,10 @@ public void tryPushRequest() { .build(); try { // catch the error and retry, don't use eventEmitter.onNext() to hide the error - eventEmitter.getEmitter().onNext(simpleMessage); + StreamObserver emitter = eventEmitter.getEmitter(); + synchronized (emitter) { + emitter.onNext(simpleMessage); + } long cost = System.currentTimeMillis() - lastPushTime; messageLogger.info( diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/EventEmitter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/EventEmitter.java index 221aaf795a..fd84170acb 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/EventEmitter.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/EventEmitter.java @@ -13,7 +13,7 @@ public EventEmitter(StreamObserver emitter) { this.emitter = emitter; } - public void onNext(T event) { + public synchronized void onNext(T event) { try { emitter.onNext(event); } catch (Throwable t) { @@ -21,7 +21,7 @@ public void onNext(T event) { } } - public void onCompleted() { + public synchronized void onCompleted() { try { emitter.onCompleted(); } catch (Throwable t) { @@ -29,7 +29,7 @@ public void onCompleted() { } } - public void onError(Throwable t) { + public synchronized void onError(Throwable t) { try { emitter.onError(t); } catch (Throwable t1) { diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java index b1959a49f7..0b549a7e16 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java @@ -122,7 +122,9 @@ public void close() { private void senderOnNext(Subscription subscription) { try { - sender.onNext(subscription); + synchronized (sender) { + sender.onNext(subscription); + } } catch (Throwable t) { logger.warn("StreamObserver Error onNext {}", t.getMessage()); } @@ -130,7 +132,9 @@ private void senderOnNext(Subscription subscription) { private void senderOnComplete() { try { - sender.onCompleted(); + synchronized (sender) { + sender.onCompleted(); + } } catch (Throwable t) { logger.warn("StreamObserver Error onComplete {}", t.getMessage()); }