Skip to content

Commit

Permalink
xds: Unconditionally apply backoff on LRS stream recreation
Browse files Browse the repository at this point in the history
This would limit LRS stream creation to one per second, even if the
old stream was considered good as it received a response. This is the
same change as made to ADS in 9570791.

b/224833499
  • Loading branch information
ejona86 committed Apr 7, 2022
1 parent 054cb49 commit 569b7b0
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 14 deletions.
14 changes: 5 additions & 9 deletions xds/src/main/java/io/grpc/xds/LoadReportClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,20 +259,16 @@ private void handleStreamClosed(Status status) {
closed = true;
cleanUp();

long delayNanos = 0;
if (initialResponseReceived || lrsRpcRetryPolicy == null) {
// Reset the backoff sequence if balancer has sent the initial response, or backoff sequence
// has never been initialized.
lrsRpcRetryPolicy = backoffPolicyProvider.get();
}
// Backoff only when balancer wasn't working previously.
if (!initialResponseReceived) {
// The back-off policy determines the interval between consecutive RPC upstarts, thus the
// actual delay may be smaller than the value from the back-off policy, or even negative,
// depending how much time was spent in the previous RPC.
delayNanos =
lrsRpcRetryPolicy.nextBackoffNanos() - retryStopwatch.elapsed(TimeUnit.NANOSECONDS);
}
// The back-off policy determines the interval between consecutive RPC upstarts, thus the
// actual delay may be smaller than the value from the back-off policy, or even negative,
// depending on how much time was spent in the previous RPC.
long delayNanos =
lrsRpcRetryPolicy.nextBackoffNanos() - retryStopwatch.elapsed(TimeUnit.NANOSECONDS);
logger.log(XdsLogLevel.INFO, "Retry LRS stream in {0} ns", delayNanos);
if (delayNanos <= 0) {
startLrsRpc();
Expand Down
18 changes: 13 additions & 5 deletions xds/src/test/java/io/grpc/xds/LoadReportClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,15 @@ public void lrsStreamClosedAndRetried() {
// Then breaks the RPC
responseObserver.onError(Status.UNAVAILABLE.asException());

// Will reset the retry sequence and retry immediately, because balancer has responded.
// Will reset the retry sequence and retry after a delay. We want to always delay, to restrict
// any accidental closed loop of retries to 1 QPS.
inOrder.verify(backoffPolicyProvider).get();
inOrder.verify(backoffPolicy2).nextBackoffNanos();
// Fast-forward to a moment before the retry of backoff sequence 2 (2s)
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(2) - 1);
verifyNoMoreInteractions(mockLoadReportingService);
// Then time for retry
fakeClock.forwardNanos(1);
inOrder.verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture());
responseObserver = lrsResponseObserverCaptor.getValue();
assertThat(lrsRequestObservers).hasSize(1);
Expand All @@ -446,12 +453,12 @@ public void lrsStreamClosedAndRetried() {
fakeClock.forwardNanos(4);
responseObserver.onError(Status.UNAVAILABLE.asException());

// Will be on the first retry (2s) of backoff sequence 2.
// Will be on the second retry (20s) of backoff sequence 2.
inOrder.verify(backoffPolicy2).nextBackoffNanos();
assertEquals(1, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));

// Fast-forward to a moment before the retry, the time spent in the last try is deducted.
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(2) - 4 - 1);
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(20) - 4 - 1);
verifyNoMoreInteractions(mockLoadReportingService);
// Then time for retry
fakeClock.forwardNanos(1);
Expand All @@ -471,7 +478,8 @@ public void lrsStreamClosedAndRetried() {
ClusterStats clusterStats = Iterables.getOnlyElement(request.getClusterStatsList());
assertThat(clusterStats.getClusterName()).isEqualTo(CLUSTER1);
assertThat(clusterStats.getClusterServiceName()).isEqualTo(EDS_SERVICE_NAME1);
assertThat(Durations.toSeconds(clusterStats.getLoadReportInterval())).isEqualTo(1L + 10L + 2L);
assertThat(Durations.toSeconds(clusterStats.getLoadReportInterval()))
.isEqualTo(1L + 10L + 2L + 20L);
assertThat(Iterables.getOnlyElement(clusterStats.getDroppedRequestsList()).getCategory())
.isEqualTo("lb");
assertThat(Iterables.getOnlyElement(clusterStats.getDroppedRequestsList()).getDroppedCount())
Expand All @@ -490,7 +498,7 @@ public void lrsStreamClosedAndRetried() {
// Wrapping up
verify(backoffPolicyProvider, times(2)).get();
verify(backoffPolicy1, times(2)).nextBackoffNanos();
verify(backoffPolicy2, times(1)).nextBackoffNanos();
verify(backoffPolicy2, times(2)).nextBackoffNanos();
}

@Test
Expand Down

0 comments on commit 569b7b0

Please sign in to comment.