Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: fix RejectedExecutionException in Retriable Stream #9626

Merged
merged 4 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 52 additions & 42 deletions core/src/main/java/io/grpc/internal/RetriableStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ public void uncaughtException(Thread t, Throwable e) {
*/
private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean();
private final AtomicInteger localOnlyTransparentRetries = new AtomicInteger();
private final AtomicInteger inFlightSubStreams = new AtomicInteger();
private Status savedCancellationReason;

// Used for recording the share of buffer used for the current call out of the channel buffer.
// This field would not be necessary if there is no channel buffer limit.
Expand Down Expand Up @@ -220,7 +222,12 @@ private void commitAndRun(Substream winningSubstream) {
}
}

@Nullable // returns null when cancelled
private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry) {
// increment only when >= 0, i.e. not cancelled
if (inFlightSubStreams.updateAndGet(value -> value < 0 ? value : value + 1) < 0) {
return null;
}
Substream sub = new Substream(previousAttemptCount);
// one tracer per substream
final ClientStreamTracer bufferSizeTracer = new BufferSizeTracer(sub);
Expand Down Expand Up @@ -367,6 +374,9 @@ public final void start(ClientStreamListener listener) {
}

Substream substream = createSubstream(0, false);
if (substream == null) {
return;
}
if (isHedging) {
FutureCanceller scheduledHedgingRef = null;

Expand Down Expand Up @@ -434,16 +444,19 @@ private final class HedgingRunnable implements Runnable {

@Override
public void run() {
// It's safe to read state.hedgingAttemptCount here.
// If this run is not cancelled, the value of state.hedgingAttemptCount won't change
// until state.addActiveHedge() is called subsequently, even the state could possibly
// change.
Substream newSubstream = createSubstream(state.hedgingAttemptCount, false);
if (newSubstream == null) {
return;
}
callExecutor.execute(
new Runnable() {
@SuppressWarnings("GuardedBy")
@Override
public void run() {
// It's safe to read state.hedgingAttemptCount here.
// If this run is not cancelled, the value of state.hedgingAttemptCount won't change
// until state.addActiveHedge() is called subsequently, even the state could possibly
// change.
Substream newSubstream = createSubstream(state.hedgingAttemptCount, false);
boolean cancelled = false;
FutureCanceller future = null;

Expand Down Expand Up @@ -489,16 +502,11 @@ public final void cancel(final Status reason) {
Runnable runnable = commit(noopSubstream);

if (runnable != null) {
savedCancellationReason = reason;
runnable.run();
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata());

}
});
if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
}
return;
}

Expand Down Expand Up @@ -808,6 +816,17 @@ private interface BufferEntry {
void runWith(Substream substream);
}

private void safeCloseMasterListener(Status status, RpcProgress progress, Metadata metadata) {
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(status, progress, metadata);
}
});
}

private final class Sublistener implements ClientStreamListener {
final Substream substream;

Expand Down Expand Up @@ -840,19 +859,18 @@ public void closed(
closedSubstreamsInsight.append(status.getCode());
}

if (inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
assert savedCancellationReason != null;
safeCloseMasterListener(savedCancellationReason, RpcProgress.PROCESSED, new Metadata());
return;
}

// handle a race between buffer limit exceeded and closed, when setting
// substream.bufferLimitExceeded = true happens before state.substreamClosed(substream).
if (substream.bufferLimitExceeded) {
commitAndRun(substream);
if (state.winningSubstream == substream) {
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(status, rpcProgress, trailers);
}
});
safeCloseMasterListener(status, rpcProgress, trailers);
}
return;
}
Expand All @@ -863,14 +881,7 @@ public void run() {
Status tooManyTransparentRetries = Status.INTERNAL
.withDescription("Too many transparent retries. Might be a bug in gRPC")
.withCause(status.asRuntimeException());
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(tooManyTransparentRetries, rpcProgress, trailers);
}
});
safeCloseMasterListener(tooManyTransparentRetries, rpcProgress, trailers);
}
return;
}
Expand All @@ -881,6 +892,9 @@ public void run() {
&& noMoreTransparentRetry.compareAndSet(false, true))) {
// transparent retry
final Substream newSubstream = createSubstream(substream.previousAttemptCount, true);
if (newSubstream == null) {
return;
}
if (isHedging) {
boolean commit = false;
synchronized (lock) {
Expand Down Expand Up @@ -942,6 +956,13 @@ public void run() {
} else {
RetryPlan retryPlan = makeRetryDecision(status, trailers);
if (retryPlan.shouldRetry) {
// retry
Substream newSubstream = createSubstream(
substream.previousAttemptCount + 1,
false);
if (newSubstream == null) {
return;
}
// The check state.winningSubstream == null, checking if is not already committed, is
// racy, but is still safe b/c the retry will also handle committed/cancellation
FutureCanceller scheduledRetryCopy;
Expand All @@ -955,10 +976,6 @@ public void run() {
new Runnable() {
@Override
public void run() {
// retry
Substream newSubstream = createSubstream(
substream.previousAttemptCount + 1,
false);
drain(newSubstream);
}
});
Expand All @@ -978,14 +995,7 @@ public void run() {

commitAndRun(substream);
if (state.winningSubstream == substream) {
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(status, rpcProgress, trailers);
}
});
safeCloseMasterListener(status, rpcProgress, trailers);
}
}

Expand Down
19 changes: 12 additions & 7 deletions core/src/test/java/io/grpc/internal/RetriableStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ public Void answer(InvocationOnMock in) {
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata());
inOrder.verify(retriableStreamRecorder).newSubstream(1);
assertEquals(1, fakeClock.numPendingTasks());

// send more messages during backoff
Expand All @@ -294,7 +295,6 @@ public Void answer(InvocationOnMock in) {
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
inOrder.verify(retriableStreamRecorder).newSubstream(1);
inOrder.verify(mockStream2).setAuthority(AUTHORITY);
inOrder.verify(mockStream2).setCompressor(COMPRESSOR);
inOrder.verify(mockStream2).setDecompressorRegistry(DECOMPRESSOR_REGISTRY);
Expand Down Expand Up @@ -339,6 +339,7 @@ public Void answer(InvocationOnMock in) {
doReturn(mockStream3).when(retriableStreamRecorder).newSubstream(2);
sublistenerCaptor2.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
inOrder.verify(retriableStreamRecorder).newSubstream(2);
assertEquals(1, fakeClock.numPendingTasks());

// send more messages during backoff
Expand All @@ -353,7 +354,6 @@ public Void answer(InvocationOnMock in) {
assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks());
inOrder.verify(retriableStreamRecorder).newSubstream(2);
inOrder.verify(mockStream3).setAuthority(AUTHORITY);
inOrder.verify(mockStream3).setCompressor(COMPRESSOR);
inOrder.verify(mockStream3).setDecompressorRegistry(DECOMPRESSOR_REGISTRY);
Expand Down Expand Up @@ -792,6 +792,8 @@ public boolean isReady() {
public void cancelWhileDraining() {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
ClientStream mockStream1 = mock(ClientStream.class);
ClientStream mockStream2 =
mock(
Expand All @@ -818,14 +820,15 @@ public void request(int numMessages) {
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);

inOrder.verify(mockStream2).start(any(ClientStreamListener.class));
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(mockStream2).request(3);
inOrder.verify(retriableStreamRecorder).postCommit();
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
inOrder.verify(mockStream2).cancel(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED);
assertThat(statusCaptor.getValue().getDescription())
.isEqualTo("Stream thrown away because RetriableStream committed");
sublistenerCaptor2.getValue().closed(Status.CANCELLED, PROCESSED, new Metadata());
verify(masterListener).closed(
statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class));
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED);
Expand All @@ -848,6 +851,8 @@ public void start(ClientStreamListener listener) {
Status.CANCELLED.withDescription("cancelled while retry start"));
}
}));
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);

InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2);
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
Expand All @@ -860,13 +865,14 @@ public void start(ClientStreamListener listener) {
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);

inOrder.verify(mockStream2).start(any(ClientStreamListener.class));
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(retriableStreamRecorder).postCommit();
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
inOrder.verify(mockStream2).cancel(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED);
assertThat(statusCaptor.getValue().getDescription())
.isEqualTo("Stream thrown away because RetriableStream committed");
sublistenerCaptor2.getValue().closed(Status.CANCELLED, PROCESSED, new Metadata());
verify(masterListener).closed(
statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class));
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED);
Expand Down Expand Up @@ -1121,7 +1127,6 @@ public void perRpcBufferLimitExceededDuringBackoff() {
sublistenerCaptor1.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());

// bufferSizeTracer.outboundWireSize() quits immediately while backoff b/c substream1 is closed
assertEquals(1, fakeClock.numPendingTasks());
bufferSizeTracer.outboundWireSize(2);
verify(retriableStreamRecorder, never()).postCommit();
Expand All @@ -1132,8 +1137,6 @@ public void perRpcBufferLimitExceededDuringBackoff() {

// bufferLimitExceeded
bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1);
verify(retriableStreamRecorder, never()).postCommit();
bufferSizeTracer.outboundWireSize(2);
verify(retriableStreamRecorder).postCommit();

verifyNoMoreInteractions(mockStream1);
Expand Down Expand Up @@ -2464,6 +2467,8 @@ public void hedging_cancelled() {
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());

inOrder.verify(retriableStreamRecorder).postCommit();
sublistenerCaptor1.getValue().closed(Status.CANCELLED, PROCESSED, new Metadata());
sublistenerCaptor2.getValue().closed(Status.CANCELLED, PROCESSED, new Metadata());
inOrder.verify(masterListener).closed(
any(Status.class), any(RpcProgress.class), any(Metadata.class));
inOrder.verifyNoMoreInteractions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,8 @@ public void statsRecorded() throws Exception {
call.request(1);
assertInboundMessageRecorded();
assertInboundWireSizeRecorded(1);
assertRpcStatusRecorded(Status.Code.OK, 2000, 2);
assertRetryStatsRecorded(1, 0, 10_000);
assertRpcStatusRecorded(Status.Code.OK, 12000, 2);
assertRetryStatsRecorded(1, 0, 0);
}

@Test
Expand Down Expand Up @@ -410,13 +410,14 @@ public void streamClosed(Status status) {
serverCall.request(2);
assertOutboundWireSizeRecorded(message.length());
fakeClock.forwardTime(7, SECONDS);
call.cancel("Cancelled before commit", null); // A noop substream will commit.
// The call listener is closed, but the netty substream listener is not yet closed.
verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class));
// A noop substream will commit. But call is not yet closed.
call.cancel("Cancelled before commit", null);
// Let the netty substream listener be closed.
streamClosedLatch.countDown();
assertRetryStatsRecorded(1, 0, 10_000);
assertRpcStatusRecorded(Code.CANCELLED, 7_000, 1);
// The call listener is closed.
verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class));
assertRpcStatusRecorded(Code.CANCELLED, 17_000, 1);
assertRetryStatsRecorded(1, 0, 0);
}

@Test
Expand Down