diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index cb94195cce1..c8c45c61f85 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -107,6 +107,8 @@ public void uncaughtException(Thread t, Throwable e) { */ private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean(); private final AtomicInteger localOnlyTransparentRetries = new AtomicInteger(); + private final AtomicInteger inFlightSubStreams = new AtomicInteger(); + private Status savedCancellationReason; // Used for recording the share of buffer used for the current call out of the channel buffer. // This field would not be necessary if there is no channel buffer limit. @@ -220,7 +222,12 @@ private void commitAndRun(Substream winningSubstream) { } } + @Nullable // returns null when cancelled private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry) { + // increment only when >= 0, i.e. not cancelled + if (inFlightSubStreams.updateAndGet(value -> value < 0 ? value : value + 1) < 0) { + return null; + } Substream sub = new Substream(previousAttemptCount); // one tracer per substream final ClientStreamTracer bufferSizeTracer = new BufferSizeTracer(sub); @@ -367,6 +374,9 @@ public final void start(ClientStreamListener listener) { } Substream substream = createSubstream(0, false); + if (substream == null) { + return; + } if (isHedging) { FutureCanceller scheduledHedgingRef = null; @@ -434,16 +444,19 @@ private final class HedgingRunnable implements Runnable { @Override public void run() { + // It's safe to read state.hedgingAttemptCount here. + // If this run is not cancelled, the value of state.hedgingAttemptCount won't change + // until state.addActiveHedge() is called subsequently, even the state could possibly + // change. + Substream newSubstream = createSubstream(state.hedgingAttemptCount, false); + if (newSubstream == null) { + return; + } callExecutor.execute( new Runnable() { @SuppressWarnings("GuardedBy") @Override public void run() { - // It's safe to read state.hedgingAttemptCount here. - // If this run is not cancelled, the value of state.hedgingAttemptCount won't change - // until state.addActiveHedge() is called subsequently, even the state could possibly - // change. - Substream newSubstream = createSubstream(state.hedgingAttemptCount, false); boolean cancelled = false; FutureCanceller future = null; @@ -489,16 +502,11 @@ public final void cancel(final Status reason) { Runnable runnable = commit(noopSubstream); if (runnable != null) { + savedCancellationReason = reason; runnable.run(); - listenerSerializeExecutor.execute( - new Runnable() { - @Override - public void run() { - isClosed = true; - masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata()); - - } - }); + if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) { + safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata()); + } return; } @@ -803,6 +811,17 @@ private void freezeHedging() { } } + private void safeCloseMasterListener(Status status, RpcProgress progress, Metadata metadata) { + listenerSerializeExecutor.execute( + new Runnable() { + @Override + public void run() { + isClosed = true; + masterListener.closed(status, progress, metadata); + } + }); + } + private interface BufferEntry { /** Replays the buffer entry with the given stream. */ void runWith(Substream substream); @@ -840,19 +859,18 @@ public void closed( closedSubstreamsInsight.append(status.getCode()); } + if (inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) { + assert savedCancellationReason != null; + safeCloseMasterListener(savedCancellationReason, RpcProgress.PROCESSED, new Metadata()); + return; + } + // handle a race between buffer limit exceeded and closed, when setting // substream.bufferLimitExceeded = true happens before state.substreamClosed(substream). if (substream.bufferLimitExceeded) { commitAndRun(substream); if (state.winningSubstream == substream) { - listenerSerializeExecutor.execute( - new Runnable() { - @Override - public void run() { - isClosed = true; - masterListener.closed(status, rpcProgress, trailers); - } - }); + safeCloseMasterListener(status, rpcProgress, trailers); } return; } @@ -863,14 +881,7 @@ public void run() { Status tooManyTransparentRetries = Status.INTERNAL .withDescription("Too many transparent retries. Might be a bug in gRPC") .withCause(status.asRuntimeException()); - listenerSerializeExecutor.execute( - new Runnable() { - @Override - public void run() { - isClosed = true; - masterListener.closed(tooManyTransparentRetries, rpcProgress, trailers); - } - }); + safeCloseMasterListener(tooManyTransparentRetries, rpcProgress, trailers); } return; } @@ -881,6 +892,9 @@ public void run() { && noMoreTransparentRetry.compareAndSet(false, true))) { // transparent retry final Substream newSubstream = createSubstream(substream.previousAttemptCount, true); + if (newSubstream == null) { + return; + } if (isHedging) { boolean commit = false; synchronized (lock) { @@ -942,6 +956,11 @@ public void run() { } else { RetryPlan retryPlan = makeRetryDecision(status, trailers); if (retryPlan.shouldRetry) { + // retry + Substream newSubstream = createSubstream(substream.previousAttemptCount + 1, false); + if (newSubstream == null) { + return; + } // The check state.winningSubstream == null, checking if is not already committed, is // racy, but is still safe b/c the retry will also handle committed/cancellation FutureCanceller scheduledRetryCopy; @@ -955,10 +974,6 @@ public void run() { new Runnable() { @Override public void run() { - // retry - Substream newSubstream = createSubstream( - substream.previousAttemptCount + 1, - false); drain(newSubstream); } }); @@ -978,14 +993,7 @@ public void run() { commitAndRun(substream); if (state.winningSubstream == substream) { - listenerSerializeExecutor.execute( - new Runnable() { - @Override - public void run() { - isClosed = true; - masterListener.closed(status, rpcProgress, trailers); - } - }); + safeCloseMasterListener(status, rpcProgress, trailers); } } diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index f20e772e92b..12bf697027c 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -283,6 +283,7 @@ public Void answer(InvocationOnMock in) { doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata()); + inOrder.verify(retriableStreamRecorder).newSubstream(1); assertEquals(1, fakeClock.numPendingTasks()); // send more messages during backoff @@ -294,7 +295,6 @@ public Void answer(InvocationOnMock in) { assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); - inOrder.verify(retriableStreamRecorder).newSubstream(1); inOrder.verify(mockStream2).setAuthority(AUTHORITY); inOrder.verify(mockStream2).setCompressor(COMPRESSOR); inOrder.verify(mockStream2).setDecompressorRegistry(DECOMPRESSOR_REGISTRY); @@ -339,6 +339,7 @@ public Void answer(InvocationOnMock in) { doReturn(mockStream3).when(retriableStreamRecorder).newSubstream(2); sublistenerCaptor2.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); + inOrder.verify(retriableStreamRecorder).newSubstream(2); assertEquals(1, fakeClock.numPendingTasks()); // send more messages during backoff @@ -353,7 +354,6 @@ public Void answer(InvocationOnMock in) { assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); - inOrder.verify(retriableStreamRecorder).newSubstream(2); inOrder.verify(mockStream3).setAuthority(AUTHORITY); inOrder.verify(mockStream3).setCompressor(COMPRESSOR); inOrder.verify(mockStream3).setDecompressorRegistry(DECOMPRESSOR_REGISTRY); @@ -792,6 +792,8 @@ public boolean isReady() { public void cancelWhileDraining() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); + ArgumentCaptor sublistenerCaptor2 = + ArgumentCaptor.forClass(ClientStreamListener.class); ClientStream mockStream1 = mock(ClientStream.class); ClientStream mockStream2 = mock( @@ -818,7 +820,7 @@ public void request(int numMessages) { Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); - inOrder.verify(mockStream2).start(any(ClientStreamListener.class)); + inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); inOrder.verify(mockStream2).request(3); inOrder.verify(retriableStreamRecorder).postCommit(); ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); @@ -826,6 +828,7 @@ public void request(int numMessages) { assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); assertThat(statusCaptor.getValue().getDescription()) .isEqualTo("Stream thrown away because RetriableStream committed"); + sublistenerCaptor2.getValue().closed(Status.CANCELLED, PROCESSED, new Metadata()); verify(masterListener).closed( statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class)); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); @@ -848,6 +851,8 @@ public void start(ClientStreamListener listener) { Status.CANCELLED.withDescription("cancelled while retry start")); } })); + ArgumentCaptor sublistenerCaptor2 = + ArgumentCaptor.forClass(ClientStreamListener.class); InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2); doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); @@ -860,13 +865,14 @@ public void start(ClientStreamListener listener) { Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); - inOrder.verify(mockStream2).start(any(ClientStreamListener.class)); + inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); inOrder.verify(retriableStreamRecorder).postCommit(); ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); inOrder.verify(mockStream2).cancel(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); assertThat(statusCaptor.getValue().getDescription()) .isEqualTo("Stream thrown away because RetriableStream committed"); + sublistenerCaptor2.getValue().closed(Status.CANCELLED, PROCESSED, new Metadata()); verify(masterListener).closed( statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class)); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); @@ -1121,7 +1127,6 @@ public void perRpcBufferLimitExceededDuringBackoff() { sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); - // bufferSizeTracer.outboundWireSize() quits immediately while backoff b/c substream1 is closed assertEquals(1, fakeClock.numPendingTasks()); bufferSizeTracer.outboundWireSize(2); verify(retriableStreamRecorder, never()).postCommit(); @@ -1132,8 +1137,6 @@ public void perRpcBufferLimitExceededDuringBackoff() { // bufferLimitExceeded bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1); - verify(retriableStreamRecorder, never()).postCommit(); - bufferSizeTracer.outboundWireSize(2); verify(retriableStreamRecorder).postCommit(); verifyNoMoreInteractions(mockStream1); @@ -2464,6 +2467,8 @@ public void hedging_cancelled() { assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription()); inOrder.verify(retriableStreamRecorder).postCommit(); + sublistenerCaptor1.getValue().closed(Status.CANCELLED, PROCESSED, new Metadata()); + sublistenerCaptor2.getValue().closed(Status.CANCELLED, PROCESSED, new Metadata()); inOrder.verify(masterListener).closed( any(Status.class), any(RpcProgress.class), any(Metadata.class)); inOrder.verifyNoMoreInteractions(); diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java index 157fd79524e..eca563fb7c1 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java @@ -351,8 +351,8 @@ public void statsRecorded() throws Exception { call.request(1); assertInboundMessageRecorded(); assertInboundWireSizeRecorded(1); - assertRpcStatusRecorded(Status.Code.OK, 2000, 2); - assertRetryStatsRecorded(1, 0, 10_000); + assertRpcStatusRecorded(Status.Code.OK, 12000, 2); + assertRetryStatsRecorded(1, 0, 0); } @Test @@ -410,13 +410,14 @@ public void streamClosed(Status status) { serverCall.request(2); assertOutboundWireSizeRecorded(message.length()); fakeClock.forwardTime(7, SECONDS); - call.cancel("Cancelled before commit", null); // A noop substream will commit. - // The call listener is closed, but the netty substream listener is not yet closed. - verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class)); + // A noop substream will commit. But call is not yet closed. + call.cancel("Cancelled before commit", null); // Let the netty substream listener be closed. streamClosedLatch.countDown(); - assertRetryStatsRecorded(1, 0, 10_000); - assertRpcStatusRecorded(Code.CANCELLED, 7_000, 1); + // The call listener is closed. + verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class)); + assertRpcStatusRecorded(Code.CANCELLED, 17_000, 1); + assertRetryStatsRecorded(1, 0, 0); } @Test