From f1b7fc79ad3fd9006e430e48430331b360bb22e3 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Tue, 19 Dec 2023 12:14:03 -0500 Subject: [PATCH] feat: handle retry info so client respect the delay server sets (#2026) * feat: add a flag to add / remove routing cookie from callable chain --- google-cloud-bigtable/pom.xml | 5 +- .../data/v2/stub/EnhancedBigtableStub.java | 33 +- .../v2/stub/EnhancedBigtableStubSettings.java | 34 ++ .../mutaterows/MutateRowsAttemptCallable.java | 4 +- .../bigtable/gaxx/retrying/ApiExceptions.java | 34 ++ .../bigtable/gaxx/retrying/Callables.java | 37 +- .../retrying/RetryInfoRetryAlgorithm.java | 109 ++++ .../EnhancedBigtableStubSettingsTest.java | 55 +- .../bigtable/data/v2/stub/RetryInfoTest.java | 514 ++++++++++++++++++ 9 files changed, 780 insertions(+), 45 deletions(-) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ApiExceptions.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryInfoRetryAlgorithm.java create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RetryInfoTest.java diff --git a/google-cloud-bigtable/pom.xml b/google-cloud-bigtable/pom.xml index e669a2006c..dda6694b65 100644 --- a/google-cloud-bigtable/pom.xml +++ b/google-cloud-bigtable/pom.xml @@ -161,7 +161,10 @@ grpc-alts runtime - + + org.checkerframework + checker-qual + com.google.http-client google-http-client diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index f1339d083e..a575aa8607 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -28,6 +28,7 @@ import com.google.api.gax.grpc.GrpcCallSettings; import com.google.api.gax.grpc.GrpcRawCallableFactory; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.api.gax.retrying.BasicResultRetryAlgorithm; import com.google.api.gax.retrying.ExponentialRetryAlgorithm; import com.google.api.gax.retrying.RetryAlgorithm; import com.google.api.gax.retrying.RetryingExecutorWithContext; @@ -108,6 +109,7 @@ import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable; import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm; +import com.google.cloud.bigtable.gaxx.retrying.RetryInfoRetryAlgorithm; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -762,11 +764,19 @@ public Map extract(MutateRowsRequest mutateRowsRequest) { ServerStreamingCallable withBigtableTracer = new BigtableTracerStreamingCallable<>(convertException); + BasicResultRetryAlgorithm resultRetryAlgorithm; + if (settings.getEnableRetryInfo()) { + resultRetryAlgorithm = new RetryInfoRetryAlgorithm<>(); + } else { + resultRetryAlgorithm = new ApiResultRetryAlgorithm<>(); + } + RetryAlgorithm retryAlgorithm = new RetryAlgorithm<>( - new ApiResultRetryAlgorithm(), + resultRetryAlgorithm, new ExponentialRetryAlgorithm( settings.bulkMutateRowsSettings().getRetrySettings(), clientContext.getClock())); + RetryingExecutorWithContext retryingExecutor = new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor()); @@ -1056,8 +1066,14 @@ public Map extract(PingAndWarmRequest request) { private UnaryCallable withRetries( UnaryCallable innerCallable, UnaryCallSettings unaryCallSettings) { - UnaryCallable retrying = - Callables.retrying(innerCallable, unaryCallSettings, clientContext); + UnaryCallable retrying; + if (settings.getEnableRetryInfo()) { + retrying = + com.google.cloud.bigtable.gaxx.retrying.Callables.retrying( + innerCallable, unaryCallSettings, clientContext); + } else { + retrying = Callables.retrying(innerCallable, unaryCallSettings, clientContext); + } if (settings.getEnableRoutingCookie()) { return new CookiesUnaryCallable<>(retrying); } @@ -1067,8 +1083,15 @@ private UnaryCallable withRetries( private ServerStreamingCallable withRetries( ServerStreamingCallable innerCallable, ServerStreamingCallSettings serverStreamingCallSettings) { - ServerStreamingCallable retrying = - Callables.retrying(innerCallable, serverStreamingCallSettings, clientContext); + + ServerStreamingCallable retrying; + if (settings.getEnableRetryInfo()) { + retrying = + com.google.cloud.bigtable.gaxx.retrying.Callables.retrying( + innerCallable, serverStreamingCallSettings, clientContext); + } else { + retrying = Callables.retrying(innerCallable, serverStreamingCallSettings, clientContext); + } if (settings.getEnableRoutingCookie()) { return new CookiesServerStreamingCallable<>(retrying); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index 64f44bb52f..c9587964c8 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -213,6 +213,7 @@ public class EnhancedBigtableStubSettings extends StubSettings primedTableIds; private final Map jwtAudienceMapping; private final boolean enableRoutingCookie; + private final boolean enableRetryInfo; private final ServerStreamingCallSettings readRowsSettings; private final UnaryCallSettings readRowSettings; @@ -255,6 +256,7 @@ private EnhancedBigtableStubSettings(Builder builder) { primedTableIds = builder.primedTableIds; jwtAudienceMapping = builder.jwtAudienceMapping; enableRoutingCookie = builder.enableRoutingCookie; + enableRetryInfo = builder.enableRetryInfo; // Per method settings. readRowsSettings = builder.readRowsSettings.build(); @@ -325,6 +327,15 @@ public boolean getEnableRoutingCookie() { return enableRoutingCookie; } + /** + * Gets if RetryInfo is enabled. If true, client bases retry decision and back off time on server + * returned RetryInfo value. Otherwise, client uses {@link RetrySettings}. + */ + @BetaApi("RetryInfo is not currently stable and may change in the future") + public boolean getEnableRetryInfo() { + return enableRetryInfo; + } + /** Returns a builder for the default ChannelProvider for this service. */ public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() { return BigtableStubSettings.defaultGrpcTransportProviderBuilder() @@ -608,6 +619,7 @@ public static class Builder extends StubSettings.Builder primedTableIds; private Map jwtAudienceMapping; private boolean enableRoutingCookie; + private boolean enableRetryInfo; private final ServerStreamingCallSettings.Builder readRowsSettings; private final UnaryCallSettings.Builder readRowSettings; @@ -641,6 +653,7 @@ private Builder() { jwtAudienceMapping = DEFAULT_JWT_AUDIENCE_MAPPING; setCredentialsProvider(defaultCredentialsProviderBuilder().build()); this.enableRoutingCookie = true; + this.enableRetryInfo = true; // Defaults provider BigtableStubSettings.Builder baseDefaults = BigtableStubSettings.newBuilder(); @@ -760,6 +773,7 @@ private Builder(EnhancedBigtableStubSettings settings) { primedTableIds = settings.primedTableIds; jwtAudienceMapping = settings.jwtAudienceMapping; enableRoutingCookie = settings.enableRoutingCookie; + enableRetryInfo = settings.enableRetryInfo; // Per method settings. readRowsSettings = settings.readRowsSettings.toBuilder(); @@ -927,6 +941,25 @@ public boolean getEnableRoutingCookie() { return enableRoutingCookie; } + /** + * Sets if RetryInfo is enabled. If true, client bases retry decision and back off time on + * server returned RetryInfo value. Otherwise, client uses {@link RetrySettings}. + */ + @BetaApi("RetryInfo is not currently stable and may change in the future") + public Builder setEnableRetryInfo(boolean enableRetryInfo) { + this.enableRetryInfo = enableRetryInfo; + return this; + } + + /** + * Gets if RetryInfo is enabled. If true, client bases retry decision and back off time on + * server returned RetryInfo value. Otherwise, client uses {@link RetrySettings}. + */ + @BetaApi("RetryInfo is not currently stable and may change in the future") + public boolean getEnableRetryInfo() { + return enableRetryInfo; + } + /** Returns the builder for the settings used for calls to readRows. */ public ServerStreamingCallSettings.Builder readRowsSettings() { return readRowsSettings; @@ -1054,6 +1087,7 @@ public String toString() { .add("primedTableIds", primedTableIds) .add("jwtAudienceMapping", jwtAudienceMapping) .add("enableRoutingCookie", enableRoutingCookie) + .add("enableRetryInfo", enableRetryInfo) .add("readRowsSettings", readRowsSettings) .add("readRowSettings", readRowSettings) .add("sampleRowKeysSettings", sampleRowKeysSettings) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java index b049219a95..269ce79031 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java @@ -31,6 +31,7 @@ import com.google.bigtable.v2.MutateRowsResponse.Entry; import com.google.cloud.bigtable.data.v2.models.MutateRowsException; import com.google.cloud.bigtable.data.v2.models.MutateRowsException.FailedMutation; +import com.google.cloud.bigtable.gaxx.retrying.ApiExceptions; import com.google.cloud.bigtable.gaxx.retrying.NonCancellableFuture; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -235,7 +236,8 @@ private void handleAttemptError(Throwable rpcError) { FailedMutation failedMutation = FailedMutation.create(origIndex, entryError); allFailures.add(failedMutation); - if (!failedMutation.getError().isRetryable()) { + if (!ApiExceptions.isRetryable2(failedMutation.getError()) + && !failedMutation.getError().isRetryable()) { permanentFailures.add(failedMutation); } else { // Schedule the mutation entry for the next RPC by adding it to the request builder and diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ApiExceptions.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ApiExceptions.java new file mode 100644 index 0000000000..4e794fa41a --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ApiExceptions.java @@ -0,0 +1,34 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.gaxx.retrying; + +import com.google.api.core.InternalApi; + +// TODO: move this to gax later +@InternalApi +public class ApiExceptions { + + private ApiExceptions() {} + + // TODO: this should replace the existing ApiException#isRetryable() method, + // but that cant be done in bigtable, so this lives here for now. + public static boolean isRetryable2(Throwable e) { + if (RetryInfoRetryAlgorithm.extractRetryDelay(e) != null) { + return true; + } + return false; + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java index e62d371ac3..a78e7643b0 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/Callables.java @@ -18,16 +18,13 @@ import com.google.api.core.InternalApi; import com.google.api.gax.retrying.ExponentialRetryAlgorithm; import com.google.api.gax.retrying.RetryAlgorithm; -import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.retrying.ScheduledRetryingExecutor; import com.google.api.gax.retrying.StreamingRetryAlgorithm; import com.google.api.gax.rpc.ClientContext; import com.google.api.gax.rpc.ServerStreamingCallSettings; import com.google.api.gax.rpc.ServerStreamingCallable; -import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.api.gax.rpc.UnaryCallable; -import java.util.Collection; // TODO: remove this once ApiResultRetryAlgorithm is added to gax. /** @@ -48,23 +45,14 @@ public static UnaryCallable retrying( UnaryCallSettings settings = callSettings; - if (areRetriesDisabled(settings.getRetryableCodes(), settings.getRetrySettings())) { - // When retries are disabled, the total timeout can be treated as the rpc timeout. - settings = - settings - .toBuilder() - .setSimpleTimeoutNoRetries(settings.getRetrySettings().getTotalTimeout()) - .build(); - } - RetryAlgorithm retryAlgorithm = new RetryAlgorithm<>( - new ApiResultRetryAlgorithm(), + new RetryInfoRetryAlgorithm<>(), new ExponentialRetryAlgorithm(settings.getRetrySettings(), clientContext.getClock())); - ScheduledRetryingExecutor retryingExecutor = + ScheduledRetryingExecutor executor = new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor()); - return new RetryingCallable<>( - clientContext.getDefaultCallContext(), innerCallable, retryingExecutor); + + return new RetryingCallable<>(clientContext.getDefaultCallContext(), innerCallable, executor); } public static ServerStreamingCallable retrying( @@ -73,18 +61,10 @@ public static ServerStreamingCallable ClientContext clientContext) { ServerStreamingCallSettings settings = callSettings; - if (areRetriesDisabled(settings.getRetryableCodes(), settings.getRetrySettings())) { - // When retries are disabled, the total timeout can be treated as the rpc timeout. - settings = - settings - .toBuilder() - .setSimpleTimeoutNoRetries(settings.getRetrySettings().getTotalTimeout()) - .build(); - } StreamingRetryAlgorithm retryAlgorithm = new StreamingRetryAlgorithm<>( - new ApiResultRetryAlgorithm(), + new RetryInfoRetryAlgorithm<>(), new ExponentialRetryAlgorithm(settings.getRetrySettings(), clientContext.getClock())); ScheduledRetryingExecutor retryingExecutor = @@ -93,11 +73,4 @@ public static ServerStreamingCallable return new RetryingServerStreamingCallable<>( innerCallable, retryingExecutor, settings.getResumptionStrategy()); } - - private static boolean areRetriesDisabled( - Collection retryableCodes, RetrySettings retrySettings) { - return retrySettings.getMaxAttempts() == 1 - || retryableCodes.isEmpty() - || (retrySettings.getMaxAttempts() == 0 && retrySettings.getTotalTimeout().isZero()); - } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryInfoRetryAlgorithm.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryInfoRetryAlgorithm.java new file mode 100644 index 0000000000..71457f7e9a --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryInfoRetryAlgorithm.java @@ -0,0 +1,109 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.gaxx.retrying; + +import com.google.api.core.InternalApi; +import com.google.api.gax.retrying.BasicResultRetryAlgorithm; +import com.google.api.gax.retrying.RetryingContext; +import com.google.api.gax.retrying.TimedAttemptSettings; +import com.google.api.gax.rpc.ApiException; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.util.Durations; +import com.google.rpc.RetryInfo; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.protobuf.ProtoUtils; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.threeten.bp.Duration; + +// TODO move this algorithm to gax +/** + * This retry algorithm checks the metadata of an exception for additional error details. If the + * metadata has a RetryInfo field, use the retry delay to set the wait time between attempts. + */ +@InternalApi +public class RetryInfoRetryAlgorithm extends BasicResultRetryAlgorithm { + + @VisibleForTesting + public static final Metadata.Key RETRY_INFO_KEY = + ProtoUtils.keyForProto(RetryInfo.getDefaultInstance()); + + @Override + public TimedAttemptSettings createNextAttempt( + Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) { + Duration retryDelay = extractRetryDelay(prevThrowable); + if (retryDelay != null) { + return prevSettings + .toBuilder() + .setRandomizedRetryDelay(retryDelay) + .setAttemptCount(prevSettings.getAttemptCount() + 1) + .build(); + } + return null; + } + + /** Returns true if previousThrowable is an {@link ApiException} that is retryable. */ + @Override + public boolean shouldRetry(Throwable previousThrowable, ResponseT previousResponse) { + return shouldRetry(null, previousThrowable, previousResponse); + } + + /** + * If {@link RetryingContext#getRetryableCodes()} is not null: Returns true if the status code of + * previousThrowable is in the list of retryable code of the {@link RetryingContext}. + * + *

Otherwise it returns the result of {@link #shouldRetry(Throwable, Object)}. + */ + @Override + public boolean shouldRetry( + @Nullable RetryingContext context, Throwable previousThrowable, ResponseT previousResponse) { + if (extractRetryDelay(previousThrowable) != null) { + // First check if server wants us to retry + return true; + } + if (context != null && context.getRetryableCodes() != null) { + // Ignore the isRetryable() value of the throwable if the RetryingContext has a specific list + // of codes that should be retried. + return ((previousThrowable instanceof ApiException) + && context + .getRetryableCodes() + .contains(((ApiException) previousThrowable).getStatusCode().getCode())); + } + // Server didn't have retry information and there's no retry context, use the local status + // code config. + return previousThrowable instanceof ApiException + && ((ApiException) previousThrowable).isRetryable(); + } + + @Nullable + static Duration extractRetryDelay(@Nullable Throwable throwable) { + if (throwable == null) { + return null; + } + Metadata trailers = Status.trailersFromThrowable(throwable); + if (trailers == null) { + return null; + } + RetryInfo retryInfo = trailers.get(RETRY_INFO_KEY); + if (retryInfo == null) { + return null; + } + if (!retryInfo.hasRetryDelay()) { + return null; + } + return Duration.ofMillis(Durations.toMillis(retryInfo.getRetryDelay())); + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java index 2c05ca4ee8..22b4aed612 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java @@ -78,6 +78,7 @@ public void settingsAreNotLostTest() { WatchdogProvider watchdogProvider = Mockito.mock(WatchdogProvider.class); Duration watchdogInterval = Duration.ofSeconds(12); boolean enableRoutingCookie = false; + boolean enableRetryInfo = false; EnhancedBigtableStubSettings.Builder builder = EnhancedBigtableStubSettings.newBuilder() @@ -89,7 +90,8 @@ public void settingsAreNotLostTest() { .setCredentialsProvider(credentialsProvider) .setStreamWatchdogProvider(watchdogProvider) .setStreamWatchdogCheckInterval(watchdogInterval) - .setEnableRoutingCookie(enableRoutingCookie); + .setEnableRoutingCookie(enableRoutingCookie) + .setEnableRetryInfo(enableRetryInfo); verifyBuilder( builder, @@ -101,7 +103,8 @@ public void settingsAreNotLostTest() { credentialsProvider, watchdogProvider, watchdogInterval, - enableRoutingCookie); + enableRoutingCookie, + enableRetryInfo); verifySettings( builder.build(), projectId, @@ -112,7 +115,8 @@ public void settingsAreNotLostTest() { credentialsProvider, watchdogProvider, watchdogInterval, - enableRoutingCookie); + enableRoutingCookie, + enableRetryInfo); verifyBuilder( builder.build().toBuilder(), projectId, @@ -123,7 +127,8 @@ public void settingsAreNotLostTest() { credentialsProvider, watchdogProvider, watchdogInterval, - enableRoutingCookie); + enableRoutingCookie, + enableRetryInfo); } private void verifyBuilder( @@ -136,7 +141,8 @@ private void verifyBuilder( CredentialsProvider credentialsProvider, WatchdogProvider watchdogProvider, Duration watchdogInterval, - boolean enableRoutingCookie) { + boolean enableRoutingCookie, + boolean enableRetryInfo) { assertThat(builder.getProjectId()).isEqualTo(projectId); assertThat(builder.getInstanceId()).isEqualTo(instanceId); assertThat(builder.getAppProfileId()).isEqualTo(appProfileId); @@ -146,6 +152,7 @@ private void verifyBuilder( assertThat(builder.getStreamWatchdogProvider()).isSameInstanceAs(watchdogProvider); assertThat(builder.getStreamWatchdogCheckInterval()).isEqualTo(watchdogInterval); assertThat(builder.getEnableRoutingCookie()).isEqualTo(enableRoutingCookie); + assertThat(builder.getEnableRetryInfo()).isEqualTo(enableRetryInfo); } private void verifySettings( @@ -158,7 +165,8 @@ private void verifySettings( CredentialsProvider credentialsProvider, WatchdogProvider watchdogProvider, Duration watchdogInterval, - boolean enableRoutingCookie) { + boolean enableRoutingCookie, + boolean enableRetryInfo) { assertThat(settings.getProjectId()).isEqualTo(projectId); assertThat(settings.getInstanceId()).isEqualTo(instanceId); assertThat(settings.getAppProfileId()).isEqualTo(appProfileId); @@ -168,6 +176,7 @@ private void verifySettings( assertThat(settings.getStreamWatchdogProvider()).isSameInstanceAs(watchdogProvider); assertThat(settings.getStreamWatchdogCheckInterval()).isEqualTo(watchdogInterval); assertThat(settings.getEnableRoutingCookie()).isEqualTo(enableRoutingCookie); + assertThat(settings.getEnableRetryInfo()).isEqualTo(enableRetryInfo); } @Test @@ -801,11 +810,27 @@ public void routingCookieIsEnabled() throws IOException { .setProjectId(dummyProjectId) .setInstanceId(dummyInstanceId) .setCredentialsProvider(credentialsProvider); + assertThat(builder.getEnableRoutingCookie()).isTrue(); assertThat(builder.build().getEnableRoutingCookie()).isTrue(); assertThat(builder.build().toBuilder().getEnableRoutingCookie()).isTrue(); } + public void enableRetryInfoDefaultValueTest() throws IOException { + String dummyProjectId = "my-project"; + String dummyInstanceId = "my-instance"; + CredentialsProvider credentialsProvider = Mockito.mock(CredentialsProvider.class); + Mockito.when(credentialsProvider.getCredentials()).thenReturn(new FakeCredentials()); + EnhancedBigtableStubSettings.Builder builder = + EnhancedBigtableStubSettings.newBuilder() + .setProjectId(dummyProjectId) + .setInstanceId(dummyInstanceId) + .setCredentialsProvider(credentialsProvider); + assertThat(builder.getEnableRetryInfo()).isTrue(); + assertThat(builder.build().getEnableRetryInfo()).isTrue(); + assertThat(builder.build().toBuilder().getEnableRetryInfo()).isTrue(); + } + @Test public void routingCookieFalseValueSet() throws IOException { String dummyProjectId = "my-project"; @@ -823,6 +848,23 @@ public void routingCookieFalseValueSet() throws IOException { assertThat(builder.build().toBuilder().getEnableRoutingCookie()).isFalse(); } + @Test + public void enableRetryInfoFalseValueTest() throws IOException { + String dummyProjectId = "my-project"; + String dummyInstanceId = "my-instance"; + CredentialsProvider credentialsProvider = Mockito.mock(CredentialsProvider.class); + Mockito.when(credentialsProvider.getCredentials()).thenReturn(new FakeCredentials()); + EnhancedBigtableStubSettings.Builder builder = + EnhancedBigtableStubSettings.newBuilder() + .setProjectId(dummyProjectId) + .setInstanceId(dummyInstanceId) + .setEnableRetryInfo(false) + .setCredentialsProvider(credentialsProvider); + assertThat(builder.getEnableRetryInfo()).isFalse(); + assertThat(builder.build().getEnableRetryInfo()).isFalse(); + assertThat(builder.build().toBuilder().getEnableRetryInfo()).isFalse(); + } + static final String[] SETTINGS_LIST = { "projectId", "instanceId", @@ -831,6 +873,7 @@ public void routingCookieFalseValueSet() throws IOException { "primedTableIds", "jwtAudienceMapping", "enableRoutingCookie", + "enableRetryInfo", "readRowsSettings", "readRowSettings", "sampleRowKeysSettings", diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RetryInfoTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RetryInfoTest.java new file mode 100644 index 0000000000..b38e53480c --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RetryInfoTest.java @@ -0,0 +1,514 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static com.google.cloud.bigtable.gaxx.retrying.RetryInfoRetryAlgorithm.RETRY_INFO_KEY; +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.InternalException; +import com.google.api.gax.rpc.UnavailableException; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.CheckAndMutateRowRequest; +import com.google.bigtable.v2.CheckAndMutateRowResponse; +import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsRequest; +import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsResponse; +import com.google.bigtable.v2.MutateRowRequest; +import com.google.bigtable.v2.MutateRowResponse; +import com.google.bigtable.v2.MutateRowsRequest; +import com.google.bigtable.v2.MutateRowsResponse; +import com.google.bigtable.v2.ReadChangeStreamRequest; +import com.google.bigtable.v2.ReadChangeStreamResponse; +import com.google.bigtable.v2.ReadModifyWriteRowRequest; +import com.google.bigtable.v2.ReadModifyWriteRowResponse; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.ReadRowsResponse; +import com.google.bigtable.v2.SampleRowKeysRequest; +import com.google.bigtable.v2.SampleRowKeysResponse; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.models.BulkMutation; +import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; +import com.google.cloud.bigtable.data.v2.models.Filters; +import com.google.cloud.bigtable.data.v2.models.MutateRowsException; +import com.google.cloud.bigtable.data.v2.models.Mutation; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery; +import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow; +import com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Queues; +import com.google.rpc.RetryInfo; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcServerRule; +import java.io.IOException; +import java.time.Duration; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class RetryInfoTest { + + @Rule public GrpcServerRule serverRule = new GrpcServerRule(); + + private FakeBigtableService service; + private BigtableDataClient client; + private BigtableDataSettings.Builder settings; + + private AtomicInteger attemptCounter = new AtomicInteger(); + private com.google.protobuf.Duration delay = + com.google.protobuf.Duration.newBuilder().setSeconds(1).setNanos(0).build(); + + @Before + public void setUp() throws IOException { + service = new FakeBigtableService(); + serverRule.getServiceRegistry().addService(service); + + settings = + BigtableDataSettings.newBuilder() + .setProjectId("fake-project") + .setInstanceId("fake-instance") + .setCredentialsProvider(NoCredentialsProvider.create()); + + settings + .stubSettings() + .setTransportChannelProvider( + FixedTransportChannelProvider.create( + GrpcTransportChannel.create(serverRule.getChannel()))) + // channel priming doesn't work with FixedTransportChannelProvider. Disable it for the test + .setRefreshingChannel(false) + .build(); + + this.client = BigtableDataClient.create(settings.build()); + } + + @Test + public void testReadRow() { + verifyRetryInfoIsUsed(() -> client.readRow("table", "row"), true); + } + + @Test + public void testReadRowNonRetryableErrorWithRetryInfo() { + verifyRetryInfoIsUsed(() -> client.readRow("table", "row"), false); + } + + @Test + public void testReadRowDisableRetryInfo() throws IOException { + settings.stubSettings().setEnableRetryInfo(false); + + try (BigtableDataClient newClient = BigtableDataClient.create(settings.build())) { + verifyRetryInfoCanBeDisabled(() -> newClient.readRow("table", "row")); + } + } + + @Test + public void testReadRows() { + verifyRetryInfoIsUsed(() -> client.readRows(Query.create("table")).iterator().hasNext(), true); + } + + @Test + public void testReadRowsNonRetraybleErrorWithRetryInfo() { + verifyRetryInfoIsUsed(() -> client.readRows(Query.create("table")).iterator().hasNext(), false); + } + + @Test + public void testReadRowsDisableRetryInfo() throws IOException { + settings.stubSettings().setEnableRetryInfo(false); + + try (BigtableDataClient newClient = BigtableDataClient.create(settings.build())) { + verifyRetryInfoCanBeDisabled( + () -> newClient.readRows(Query.create("table")).iterator().hasNext()); + } + } + + @Test + public void testMutateRows() { + verifyRetryInfoIsUsed( + () -> + client.bulkMutateRows( + BulkMutation.create("fake-table") + .add(RowMutationEntry.create("row-key-1").setCell("cf", "q", "v"))), + true); + } + + @Test + public void testMutateRowsNonRetryableErrorWithRetryInfo() { + verifyRetryInfoIsUsed( + () -> + client.bulkMutateRows( + BulkMutation.create("fake-table") + .add(RowMutationEntry.create("row-key-1").setCell("cf", "q", "v"))), + false); + } + + @Test + public void testMutateRowsDisableRetryInfo() throws IOException { + settings.stubSettings().setEnableRetryInfo(false); + + try (BigtableDataClient newClient = BigtableDataClient.create(settings.build())) { + verifyRetryInfoCanBeDisabled( + () -> + newClient.bulkMutateRows( + BulkMutation.create("fake-table") + .add(RowMutationEntry.create("row-key-1").setCell("cf", "q", "v")))); + } + } + + @Test + public void testMutateRow() { + verifyRetryInfoIsUsed( + () -> client.mutateRow(RowMutation.create("table", "key").setCell("cf", "q", "v")), true); + } + + @Test + public void testMutateRowNonRetryableErrorWithRetryInfo() { + verifyRetryInfoIsUsed( + () -> client.mutateRow(RowMutation.create("table", "key").setCell("cf", "q", "v")), false); + } + + @Test + public void testMutateRowDisableRetryInfo() throws IOException { + settings.stubSettings().setEnableRetryInfo(false); + + try (BigtableDataClient newClient = BigtableDataClient.create(settings.build())) { + + verifyRetryInfoCanBeDisabled( + () -> newClient.mutateRow(RowMutation.create("table", "key").setCell("cf", "q", "v"))); + } + } + + @Test + public void testSampleRowKeys() { + verifyRetryInfoIsUsed(() -> client.sampleRowKeys("table"), true); + } + + @Test + public void testSampleRowKeysNonRetryableErrorWithRetryInfo() { + verifyRetryInfoIsUsed(() -> client.sampleRowKeys("table"), false); + } + + @Test + public void testSampleRowKeysDisableRetryInfo() throws IOException { + settings.stubSettings().setEnableRetryInfo(false); + + try (BigtableDataClient newClient = BigtableDataClient.create(settings.build())) { + verifyRetryInfoCanBeDisabled(() -> newClient.sampleRowKeys("table")); + } + } + + @Test + public void testCheckAndMutateRow() { + verifyRetryInfoIsUsed( + () -> + client.checkAndMutateRow( + ConditionalRowMutation.create("table", "key") + .condition(Filters.FILTERS.value().regex("old-value")) + .then(Mutation.create().setCell("cf", "q", "v"))), + true); + } + + @Test + public void testCheckAndMutateDisableRetryInfo() throws IOException { + settings.stubSettings().setEnableRetryInfo(false); + + try (BigtableDataClient client = BigtableDataClient.create(settings.build())) { + ApiException exception = enqueueNonRetryableExceptionWithDelay(delay); + try { + client.checkAndMutateRow( + ConditionalRowMutation.create("table", "key") + .condition(Filters.FILTERS.value().regex("old-value")) + .then(Mutation.create().setCell("cf", "q", "v"))); + } catch (ApiException e) { + assertThat(e.getStatusCode()).isEqualTo(exception.getStatusCode()); + } + assertThat(attemptCounter.get()).isEqualTo(1); + } + } + + @Test + public void testReadModifyWrite() { + verifyRetryInfoIsUsed( + () -> + client.readModifyWriteRow( + ReadModifyWriteRow.create("table", "row").append("cf", "q", "v")), + true); + } + + @Test + public void testReadModifyWriteDisableRetryInfo() throws IOException { + settings.stubSettings().setEnableRetryInfo(false); + + try (BigtableDataClient client = BigtableDataClient.create(settings.build())) { + ApiException exception = enqueueNonRetryableExceptionWithDelay(delay); + try { + client.readModifyWriteRow(ReadModifyWriteRow.create("table", "row").append("cf", "q", "v")); + } catch (ApiException e) { + assertThat(e.getStatusCode()).isEqualTo(exception.getStatusCode()); + } + assertThat(attemptCounter.get()).isEqualTo(1); + } + } + + @Test + public void testReadChangeStream() { + verifyRetryInfoIsUsed( + () -> client.readChangeStream(ReadChangeStreamQuery.create("table")).iterator().hasNext(), + true); + } + + @Test + public void testReadChangeStreamNonRetryableErrorWithRetryInfo() { + verifyRetryInfoIsUsed( + () -> client.readChangeStream(ReadChangeStreamQuery.create("table")).iterator().hasNext(), + false); + } + + @Test + public void testReadChangeStreamDisableRetryInfo() throws IOException { + settings.stubSettings().setEnableRetryInfo(false); + + try (BigtableDataClient newClient = BigtableDataClient.create(settings.build())) { + verifyRetryInfoCanBeDisabled( + () -> + newClient + .readChangeStream(ReadChangeStreamQuery.create("table")) + .iterator() + .hasNext()); + } + } + + @Test + public void testGenerateInitialChangeStreamPartition() { + verifyRetryInfoIsUsed( + () -> client.generateInitialChangeStreamPartitions("table").iterator().hasNext(), true); + } + + @Test + public void testGenerateInitialChangeStreamPartitionNonRetryableError() { + verifyRetryInfoIsUsed( + () -> client.generateInitialChangeStreamPartitions("table").iterator().hasNext(), false); + } + + @Test + public void testGenerateInitialChangeStreamPartitionDisableRetryInfo() throws IOException { + settings.stubSettings().setEnableRetryInfo(false); + + try (BigtableDataClient newClient = BigtableDataClient.create(settings.build())) { + verifyRetryInfoCanBeDisabled( + () -> newClient.generateInitialChangeStreamPartitions("table").iterator().hasNext()); + } + } + + private void verifyRetryInfoIsUsed(Runnable runnable, boolean retryableError) { + if (retryableError) { + enqueueRetryableExceptionWithDelay(delay); + } else { + enqueueNonRetryableExceptionWithDelay(delay); + } + Stopwatch stopwatch = Stopwatch.createStarted(); + runnable.run(); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isAtLeast(Duration.ofSeconds(delay.getSeconds())); + } + + private void verifyRetryInfoCanBeDisabled(Runnable runnable) { + enqueueRetryableExceptionWithDelay(delay); + Stopwatch stopwatch = Stopwatch.createStarted(); + runnable.run(); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isLessThan(Duration.ofSeconds(delay.getSeconds())); + + attemptCounter.set(0); + ApiException exception = enqueueNonRetryableExceptionWithDelay(delay); + try { + runnable.run(); + } catch (ApiException e) { + if (e instanceof MutateRowsException) { + assertThat(((MutateRowsException) e).getFailedMutations().get(0).getError().getStatusCode()) + .isEqualTo(exception.getStatusCode()); + } else { + assertThat(e.getStatusCode()).isEqualTo(exception.getStatusCode()); + } + } + assertThat(attemptCounter.get()).isEqualTo(1); + } + + private void enqueueRetryableExceptionWithDelay(com.google.protobuf.Duration delay) { + Metadata trailers = new Metadata(); + RetryInfo retryInfo = RetryInfo.newBuilder().setRetryDelay(delay).build(); + trailers.put(RETRY_INFO_KEY, retryInfo); + + ApiException exception = + new UnavailableException( + new StatusRuntimeException(Status.UNAVAILABLE, trailers), + GrpcStatusCode.of(Status.Code.UNAVAILABLE), + true); + + service.expectations.add(exception); + } + + private ApiException enqueueNonRetryableExceptionWithDelay(com.google.protobuf.Duration delay) { + Metadata trailers = new Metadata(); + RetryInfo retryInfo = RetryInfo.newBuilder().setRetryDelay(delay).build(); + trailers.put(RETRY_INFO_KEY, retryInfo); + + ApiException exception = + new InternalException( + new StatusRuntimeException(Status.INTERNAL, trailers), + GrpcStatusCode.of(Status.Code.INTERNAL), + false); + + service.expectations.add(exception); + + return exception; + } + + private class FakeBigtableService extends BigtableGrpc.BigtableImplBase { + Queue expectations = Queues.newArrayDeque(); + + @Override + public void readRows( + ReadRowsRequest request, StreamObserver responseObserver) { + attemptCounter.incrementAndGet(); + if (expectations.isEmpty()) { + responseObserver.onNext(ReadRowsResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } else { + Exception expectedRpc = expectations.poll(); + responseObserver.onError(expectedRpc); + } + } + + @Override + public void mutateRow( + MutateRowRequest request, StreamObserver responseObserver) { + attemptCounter.incrementAndGet(); + if (expectations.isEmpty()) { + responseObserver.onNext(MutateRowResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } else { + Exception expectedRpc = expectations.poll(); + responseObserver.onError(expectedRpc); + } + } + + @Override + public void mutateRows( + MutateRowsRequest request, StreamObserver responseObserver) { + attemptCounter.incrementAndGet(); + if (expectations.isEmpty()) { + MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder(); + for (int i = 0; i < request.getEntriesCount(); i++) { + builder.addEntriesBuilder().setIndex(i); + } + responseObserver.onNext(builder.build()); + responseObserver.onCompleted(); + } else { + Exception expectedRpc = expectations.poll(); + responseObserver.onError(expectedRpc); + } + } + + @Override + public void sampleRowKeys( + SampleRowKeysRequest request, StreamObserver responseObserver) { + attemptCounter.incrementAndGet(); + if (expectations.isEmpty()) { + responseObserver.onNext(SampleRowKeysResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } else { + Exception expectedRpc = expectations.poll(); + responseObserver.onError(expectedRpc); + } + } + + @Override + public void checkAndMutateRow( + CheckAndMutateRowRequest request, + StreamObserver responseObserver) { + attemptCounter.incrementAndGet(); + if (expectations.isEmpty()) { + responseObserver.onNext(CheckAndMutateRowResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } else { + Exception expectedRpc = expectations.poll(); + responseObserver.onError(expectedRpc); + } + } + + @Override + public void readModifyWriteRow( + ReadModifyWriteRowRequest request, + StreamObserver responseObserver) { + attemptCounter.incrementAndGet(); + if (expectations.isEmpty()) { + responseObserver.onNext(ReadModifyWriteRowResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } else { + Exception expectedRpc = expectations.poll(); + responseObserver.onError(expectedRpc); + } + } + + @Override + public void generateInitialChangeStreamPartitions( + GenerateInitialChangeStreamPartitionsRequest request, + StreamObserver responseObserver) { + attemptCounter.incrementAndGet(); + if (expectations.isEmpty()) { + responseObserver.onNext(GenerateInitialChangeStreamPartitionsResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } else { + Exception expectedRpc = expectations.poll(); + responseObserver.onError(expectedRpc); + } + } + + @Override + public void readChangeStream( + ReadChangeStreamRequest request, + StreamObserver responseObserver) { + attemptCounter.incrementAndGet(); + if (expectations.isEmpty()) { + responseObserver.onNext( + ReadChangeStreamResponse.newBuilder() + .setCloseStream(ReadChangeStreamResponse.CloseStream.getDefaultInstance()) + .build()); + responseObserver.onCompleted(); + } else { + Exception expectedRpc = expectations.poll(); + responseObserver.onError(expectedRpc); + } + } + } +}