Skip to content

Commit

Permalink
[Issue apache#718] add synchronized calls for grpc streamObserver
Browse files Browse the repository at this point in the history
  • Loading branch information
jinrongluo authored and xwm1992 committed Feb 16, 2022
1 parent 651850f commit c22bd1e
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.apache.eventmesh.runtime.core.protocol.grpc.push;

import io.grpc.stub.StreamObserver;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.RandomUtils;
Expand Down Expand Up @@ -54,7 +55,10 @@ public void tryPushRequest() {
.build();
try {
// catch the error and retry, don't use eventEmitter.onNext() to hide the error
eventEmitter.getEmitter().onNext(simpleMessage);
StreamObserver<SimpleMessage> emitter = eventEmitter.getEmitter();
synchronized (emitter) {
emitter.onNext(simpleMessage);
}

long cost = System.currentTimeMillis() - lastPushTime;
messageLogger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,23 @@ public EventEmitter(StreamObserver<T> emitter) {
this.emitter = emitter;
}

public void onNext(T event) {
public synchronized void onNext(T event) {
try {
emitter.onNext(event);
} catch (Throwable t) {
logger.warn("StreamObserver Error onNext. {}", t.getMessage());
}
}

public void onCompleted() {
public synchronized void onCompleted() {
try {
emitter.onCompleted();
} catch (Throwable t) {
logger.warn("StreamObserver Error onCompleted. {}", t.getMessage());
}
}

public void onError(Throwable t) {
public synchronized void onError(Throwable t) {
try {
emitter.onError(t);
} catch (Throwable t1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,19 @@ public void close() {

private void senderOnNext(Subscription subscription) {
try {
sender.onNext(subscription);
synchronized (sender) {
sender.onNext(subscription);
}
} catch (Throwable t) {
logger.warn("StreamObserver Error onNext {}", t.getMessage());
}
}

private void senderOnComplete() {
try {
sender.onCompleted();
synchronized (sender) {
sender.onCompleted();
}
} catch (Throwable t) {
logger.warn("StreamObserver Error onComplete {}", t.getMessage());
}
Expand Down

0 comments on commit c22bd1e

Please sign in to comment.