diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index f301eee1f98..ba9424ea25c 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -149,11 +149,10 @@ public void uncaughtException(Thread t, Throwable e) { this.throttle = throttle; } - @SuppressWarnings("GuardedBy") + @SuppressWarnings("GuardedBy") // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok @Nullable // null if already committed @CheckReturnValue private Runnable commit(final Substream winningSubstream) { - synchronized (lock) { if (state.winningSubstream != null) { return null; @@ -165,10 +164,9 @@ private Runnable commit(final Substream winningSubstream) { // subtract the share of this RPC from channelBufferUsed. channelBufferUsed.addAndGet(-perRpcBufferUsed); + final boolean wasCancelled = (scheduledRetry != null) ? scheduledRetry.isCancelled() : false; final Future retryFuture; if (scheduledRetry != null) { - // TODO(b/145386688): This access should be guarded by 'this.scheduledRetry.lock'; instead - // found: 'this.lock' retryFuture = scheduledRetry.markCancelled(); scheduledRetry = null; } else { @@ -177,8 +175,6 @@ private Runnable commit(final Substream winningSubstream) { // cancel the scheduled hedging if it is scheduled prior to the commitment final Future hedgingFuture; if (scheduledHedging != null) { - // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead - // found: 'this.lock' hedgingFuture = scheduledHedging.markCancelled(); scheduledHedging = null; } else { @@ -196,7 +192,21 @@ public void run() { } if (retryFuture != null) { retryFuture.cancel(false); + if (!wasCancelled && inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) { + assert savedCloseMasterListenerReason != null; + listenerSerializeExecutor.execute( + new Runnable() { + @Override + public void run() { + isClosed = true; + masterListener.closed(savedCloseMasterListenerReason.status, + savedCloseMasterListenerReason.progress, + savedCloseMasterListenerReason.metadata); + } + }); + } } + if (hedgingFuture != null) { hedgingFuture.cancel(false); } @@ -415,7 +425,7 @@ public final void start(ClientStreamListener listener) { drain(substream); } - @SuppressWarnings("GuardedBy") + @SuppressWarnings("GuardedBy") // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok private void pushbackHedging(@Nullable Integer delayMillis) { if (delayMillis == null) { return; @@ -434,8 +444,6 @@ private void pushbackHedging(@Nullable Integer delayMillis) { return; } - // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead - // found: 'this.lock' futureToBeCancelled = scheduledHedging.markCancelled(); scheduledHedging = future = new FutureCanceller(lock); } @@ -469,16 +477,13 @@ public void run() { } callExecutor.execute( new Runnable() { - @SuppressWarnings("GuardedBy") + @SuppressWarnings("GuardedBy") //TODO(b/145386688) lock==ScheduledCancellor.lock so ok @Override public void run() { boolean cancelled = false; FutureCanceller future = null; synchronized (lock) { - // TODO(b/145386688): This access should be guarded by - // 'HedgingRunnable.this.scheduledHedgingRef.lock'; instead found: - // 'RetriableStream.this.lock' if (scheduledHedgingRef.isCancelled()) { cancelled = true; } else { @@ -810,13 +815,11 @@ private boolean hasPotentialHedging(State state) { && !state.hedgingFrozen; } - @SuppressWarnings("GuardedBy") + @SuppressWarnings("GuardedBy") // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok private void freezeHedging() { Future futureToBeCancelled = null; synchronized (lock) { if (scheduledHedging != null) { - // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead - // found: 'this.lock' futureToBeCancelled = scheduledHedging.markCancelled(); scheduledHedging = null; } @@ -999,9 +1002,19 @@ public void run() { synchronized (lock) { scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock); } + class RetryBackoffRunnable implements Runnable { @Override + @SuppressWarnings("FutureReturnValueIgnored") public void run() { + synchronized (scheduledRetryCopy.lock) { + if (scheduledRetryCopy.isCancelled()) { + return; + } else { + scheduledRetryCopy.markCancelled(); + } + } + callExecutor.execute( new Runnable() { @Override @@ -1563,11 +1576,16 @@ private static final class FutureCanceller { } void setFuture(Future future) { + boolean wasCancelled; synchronized (lock) { - if (!cancelled) { + wasCancelled = cancelled; + if (!wasCancelled) { this.future = future; } } + if (wasCancelled) { + future.cancel(false); + } } @GuardedBy("lock") diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index 3487ef02b46..21ec46668fc 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -705,6 +705,7 @@ public void retry_unretriableClosed_cancel() { // cancel retriableStream.cancel(Status.CANCELLED); inOrder.verify(retriableStreamRecorder, never()).postCommit(); + verify(masterListener, times(1)).closed(any(), any(), any()); } @Test @@ -733,6 +734,7 @@ public void retry_cancelWhileBackoff() { verifyNoMoreInteractions(mockStream1); verifyNoMoreInteractions(mockStream2); + verify(masterListener, times(1)).closed(any(), any(), any()); } @Test