Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: fix RejectedExecutionException in Retriable Stream #9626

Merged
merged 4 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 39 additions & 9 deletions core/src/main/java/io/grpc/internal/RetriableStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ public void uncaughtException(Thread t, Throwable e) {
*/
private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean();
private final AtomicInteger localOnlyTransparentRetries = new AtomicInteger();
private final AtomicInteger inFlightSubStreams = new AtomicInteger();
private Status savedCancellationReason;

// Used for recording the share of buffer used for the current call out of the channel buffer.
// This field would not be necessary if there is no channel buffer limit.
Expand Down Expand Up @@ -366,6 +368,9 @@ public final void start(ClientStreamListener listener) {
state.buffer.add(new StartEntry());
}

if (inFlightSubStreams.incrementAndGet() < 0) {
return;
}
Substream substream = createSubstream(0, false);
if (isHedging) {
FutureCanceller scheduledHedgingRef = null;
Expand Down Expand Up @@ -434,6 +439,9 @@ private final class HedgingRunnable implements Runnable {

@Override
public void run() {
if (inFlightSubStreams.incrementAndGet() < 0) {
return;
}
callExecutor.execute(
new Runnable() {
@SuppressWarnings("GuardedBy")
Expand Down Expand Up @@ -489,16 +497,18 @@ public final void cancel(final Status reason) {
Runnable runnable = commit(noopSubstream);

if (runnable != null) {
savedCancellationReason = reason;
runnable.run();
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata());

}
});
if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata());
}
});
}
return;
}

Expand Down Expand Up @@ -840,6 +850,20 @@ public void closed(
closedSubstreamsInsight.append(status.getCode());
}

if (inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
assert savedCancellationReason != null;
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(savedCancellationReason, RpcProgress.PROCESSED,
new Metadata());
}
});
return;
}

// handle a race between buffer limit exceeded and closed, when setting
// substream.bufferLimitExceeded = true happens before state.substreamClosed(substream).
if (substream.bufferLimitExceeded) {
Expand Down Expand Up @@ -880,6 +904,9 @@ public void run() {
|| (rpcProgress == RpcProgress.REFUSED
&& noMoreTransparentRetry.compareAndSet(false, true))) {
// transparent retry
if (inFlightSubStreams.incrementAndGet() < 0) {
return;
}
final Substream newSubstream = createSubstream(substream.previousAttemptCount, true);
if (isHedging) {
boolean commit = false;
Expand Down Expand Up @@ -942,6 +969,9 @@ public void run() {
} else {
RetryPlan retryPlan = makeRetryDecision(status, trailers);
if (retryPlan.shouldRetry) {
if (inFlightSubStreams.incrementAndGet() < 0) {
return;
}
// The check state.winningSubstream == null (i.e. the call is not yet committed) is
// racy, but it is still safe because the retry will also handle commit/cancellation.
FutureCanceller scheduledRetryCopy;
Expand Down
12 changes: 10 additions & 2 deletions core/src/test/java/io/grpc/internal/RetriableStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,8 @@ public boolean isReady() {
public void cancelWhileDraining() {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
ClientStream mockStream1 = mock(ClientStream.class);
ClientStream mockStream2 =
mock(
Expand All @@ -818,14 +820,15 @@ public void request(int numMessages) {
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);

inOrder.verify(mockStream2).start(any(ClientStreamListener.class));
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(mockStream2).request(3);
inOrder.verify(retriableStreamRecorder).postCommit();
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
inOrder.verify(mockStream2).cancel(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED);
assertThat(statusCaptor.getValue().getDescription())
.isEqualTo("Stream thrown away because RetriableStream committed");
sublistenerCaptor2.getValue().closed(Status.CANCELLED, PROCESSED, new Metadata());
verify(masterListener).closed(
statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class));
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED);
Expand All @@ -848,6 +851,8 @@ public void start(ClientStreamListener listener) {
Status.CANCELLED.withDescription("cancelled while retry start"));
}
}));
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);

InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2);
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
Expand All @@ -860,13 +865,14 @@ public void start(ClientStreamListener listener) {
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);

inOrder.verify(mockStream2).start(any(ClientStreamListener.class));
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(retriableStreamRecorder).postCommit();
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
inOrder.verify(mockStream2).cancel(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED);
assertThat(statusCaptor.getValue().getDescription())
.isEqualTo("Stream thrown away because RetriableStream committed");
sublistenerCaptor2.getValue().closed(Status.CANCELLED, PROCESSED, new Metadata());
verify(masterListener).closed(
statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class));
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED);
Expand Down Expand Up @@ -2464,6 +2470,8 @@ public void hedging_cancelled() {
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());

inOrder.verify(retriableStreamRecorder).postCommit();
sublistenerCaptor1.getValue().closed(Status.CANCELLED, PROCESSED, new Metadata());
sublistenerCaptor2.getValue().closed(Status.CANCELLED, PROCESSED, new Metadata());
inOrder.verify(masterListener).closed(
any(Status.class), any(RpcProgress.class), any(Metadata.class));
inOrder.verifyNoMoreInteractions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,13 +410,14 @@ public void streamClosed(Status status) {
serverCall.request(2);
assertOutboundWireSizeRecorded(message.length());
fakeClock.forwardTime(7, SECONDS);
call.cancel("Cancelled before commit", null); // A noop substream will commit.
// The call listener is closed, but the netty substream listener is not yet closed.
verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class));
// A noop substream will commit. But call is not yet closed.
call.cancel("Cancelled before commit", null);
// Let the netty substream listener be closed.
streamClosedLatch.countDown();
assertRetryStatsRecorded(1, 0, 10_000);
// The call listener is closed.
verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class));
assertRpcStatusRecorded(Code.CANCELLED, 7_000, 1);
assertRetryStatsRecorded(1, 0, 10_000);
}

@Test
Expand Down