Skip to content

Commit

Permalink
feat: Background refresh stops on unrecoverable Admin API response (#403
Browse files Browse the repository at this point in the history
)
  • Loading branch information
ttosta-google authored Feb 5, 2024
1 parent 15dde74 commit 107f80c
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 24 deletions.
13 changes: 6 additions & 7 deletions alloydb-jdbc-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@
<artifactId>google-auth-library-credentials</artifactId>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
<version>${grpc.version}</version>
</dependency>

<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
Expand Down Expand Up @@ -187,13 +193,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
<version>${grpc.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-inprocess</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import io.grpc.Status;
import io.grpc.Status.Code;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.security.KeyPair;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DefaultConnectionInfoRepository implements ConnectionInfoRepository, Closeable {
Expand All @@ -42,6 +45,8 @@ class DefaultConnectionInfoRepository implements ConnectionInfoRepository, Close
private static final String OPENSSL_PUBLIC_KEY_END = "-----END RSA PUBLIC KEY-----";
private static final String X_509 = "X.509";
private static final int PEM_LINE_LENGTH = 64;
private static final List<Code> TERMINAL_STATUS_CODES =
Arrays.asList(Code.NOT_FOUND, Code.PERMISSION_DENIED, Code.INVALID_ARGUMENT);
private final ListeningScheduledExecutorService executor;
private final AlloyDBAdminClient alloyDBAdminClient;

Expand Down Expand Up @@ -94,7 +99,11 @@ public void close() {

private com.google.cloud.alloydb.v1alpha.ConnectionInfo getConnectionInfo(
InstanceName instanceName) {
return alloyDBAdminClient.getConnectionInfo(instanceName);
try {
return alloyDBAdminClient.getConnectionInfo(instanceName);
} catch (Exception e) {
throw handleException(e);
}
}

private GenerateClientCertificateResponse getGenerateClientCertificateResponse(
Expand All @@ -107,7 +116,24 @@ private GenerateClientCertificateResponse getGenerateClientCertificateResponse(
.setUseMetadataExchange(true)
.build();

return alloyDBAdminClient.generateClientCertificate(request);
try {
return alloyDBAdminClient.generateClientCertificate(request);
} catch (Exception e) {
throw handleException(e);
}
}

private RuntimeException handleException(Exception e) {
Status status = Status.fromThrowable(e);
String message =
String.format(
"AlloyDB Admin API failed to return the connection info. Reason: %s", e.getMessage());

if (TERMINAL_STATUS_CODES.contains(status.getCode())) {
return new TerminalException(message, e);
}

return new RuntimeException(message, e);
}

private String getParent(InstanceName instanceName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,14 @@ private ListenableFuture<ConnectionInfo> handleRefreshResult(
}

} catch (ExecutionException | InterruptedException e) {

// No refresh retry when the TerminalException is raised.
final Throwable cause = e.getCause();
if (cause instanceof TerminalException) {
logger.info(String.format("[%s] Refresh Operation: Failed! No retry.", name), e);
throw (TerminalException) cause;
}

logger.info(
String.format(
"[%s] Refresh Operation: Failed! Starting next refresh operation immediately.", name),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.alloydb;

class TerminalException extends RuntimeException {
public TerminalException() {
super();
}

public TerminalException(String message) {
super(message);
}

public TerminalException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ public class ConnectorTest {
private static final String PRIVATE_IP = "127.0.0.2";
private static final String SERVER_MESSAGE = "HELLO";
private static final String ERROR_MESSAGE_NOT_FOUND = "Resource 'instance' was not found";
private static final String ERROR_MESSAGE_PERMISSION_DENIED =
"Location not found or access is unauthorized.";
private static final String ERROR_MESSAGE_INTERNAL = "Internal Error";

ListeningScheduledExecutorService defaultExecutor;

Expand Down Expand Up @@ -82,7 +85,7 @@ public void create_successfulPrivateConnection()
}

@Test
public void create_throwsException_notFound()
public void create_throwsTerminalException_notFound()
throws IOException, NoSuchAlgorithmException, InvalidKeySpecException {
MockAlloyDBAdminGrpc mock =
new MockAlloyDBAdminGrpc(Code.NOT_FOUND.getNumber(), ERROR_MESSAGE_NOT_FOUND);
Expand All @@ -91,11 +94,39 @@ public void create_throwsException_notFound()
new ConnectionConfig.Builder().withInstanceName(InstanceName.parse(INSTANCE_NAME)).build();
Connector connector = newConnector(config.getConnectorConfig(), mock);

RuntimeException ex = assertThrows(RuntimeException.class, () -> connector.connect(config));

TerminalException ex = assertThrows(TerminalException.class, () -> connector.connect(config));
assertThat(ex).hasMessageThat().contains(ERROR_MESSAGE_NOT_FOUND);
}

@Test
public void create_throwsTerminalException_notAuthorized()
throws IOException, NoSuchAlgorithmException, InvalidKeySpecException {
MockAlloyDBAdminGrpc mock =
new MockAlloyDBAdminGrpc(
Code.PERMISSION_DENIED.getNumber(), ERROR_MESSAGE_PERMISSION_DENIED);

ConnectionConfig config =
new ConnectionConfig.Builder().withInstanceName(InstanceName.parse(INSTANCE_NAME)).build();
Connector connector = newConnector(config.getConnectorConfig(), mock);

TerminalException ex = assertThrows(TerminalException.class, () -> connector.connect(config));
assertThat(ex).hasMessageThat().contains(ERROR_MESSAGE_PERMISSION_DENIED);
}

@Test
public void create_throwsNonTerminalException_internalError()
throws IOException, NoSuchAlgorithmException, InvalidKeySpecException {
MockAlloyDBAdminGrpc mock =
new MockAlloyDBAdminGrpc(Code.INTERNAL.getNumber(), ERROR_MESSAGE_INTERNAL);

ConnectionConfig config =
new ConnectionConfig.Builder().withInstanceName(InstanceName.parse(INSTANCE_NAME)).build();
Connector connector = newConnector(config.getConnectorConfig(), mock);

RuntimeException ex = assertThrows(RuntimeException.class, () -> connector.connect(config));
assertThat(ex).hasMessageThat().contains(ERROR_MESSAGE_INTERNAL);
}

private Connector newConnector(ConnectorConfig config, MockAlloyDBAdminGrpc mock)
throws NoSuchAlgorithmException, InvalidKeySpecException {
CredentialFactoryProvider stubCredentialFactoryProvider =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ public void testDataRetrievedSuccessfully() {
ExampleData data = new ExampleData(Instant.now().plus(1, ChronoUnit.HOURS));
Refresher r =
new Refresher(
"testcase", executorService, () -> Futures.immediateFuture(data), rateLimiter);
"RefresherTest.testDataRetrievedSuccessfully",
executorService,
() -> Futures.immediateFuture(data),
rateLimiter);
ConnectionInfo gotInfo = r.getConnectionInfo(TEST_TIMEOUT_MS);
assertThat(gotInfo).isSameInstanceAs(data);
}
Expand All @@ -82,7 +85,11 @@ public void testRateLimiterInUse() {
ExampleData data = new ExampleData(Instant.now().plus(1, ChronoUnit.HOURS));
SpyRateLimiter rl = new SpyRateLimiter(10);
Refresher r =
new Refresher("testcase", executorService, () -> Futures.immediateFuture(data), rl);
new Refresher(
"RefresherTest.testRateLimiterInUse",
executorService,
() -> Futures.immediateFuture(data),
rl);
ConnectionInfo gotInfo = r.getConnectionInfo(TEST_TIMEOUT_MS);
assertThat(gotInfo).isSameInstanceAs(data);
assertThat(rl.counter).isNotEqualTo(0);
Expand All @@ -92,7 +99,7 @@ public void testRateLimiterInUse() {
public void testInstanceFailsOnConnectionError() {
Refresher r =
new Refresher(
"testcase",
"RefresherTest.testInstanceFailsOnConnectionError",
executorService,
() -> Futures.immediateFailedFuture(new RuntimeException("always fails")),
rateLimiter);
Expand All @@ -107,7 +114,7 @@ public void testInstanceFailsOnTooLongToRetrieve() {
ExampleData data = new ExampleData(Instant.now().plus(1, ChronoUnit.HOURS));
Refresher r =
new Refresher(
"testcase",
"RefresherTest.testInstanceFailsOnTooLongToRetrieve",
executorService,
() -> {
cond.pause();
Expand All @@ -126,7 +133,7 @@ public void testForcesRefresh() throws Exception {
final PauseCondition cond = new PauseCondition();
Refresher r =
new Refresher(
"testcase",
"RefresherTest.testForcesRefresh",
executorService,
() -> {
int c = refreshCount.get();
Expand Down Expand Up @@ -168,7 +175,7 @@ public void testRetriesOnInitialFailures() throws Exception {

Refresher r =
new Refresher(
"testcase",
"RefresherTest.testRetriesOnInitialFailures",
executorService,
() -> {
int c = refreshCount.get();
Expand Down Expand Up @@ -199,7 +206,7 @@ public void testRefreshesExpiredData() throws Exception {

Refresher r =
new Refresher(
"testcase",
"RefresherTest.testRefreshesExpiredData",
executorService,
() -> {
int c = refreshCount.get();
Expand Down Expand Up @@ -253,7 +260,7 @@ public void testThatForceRefreshBalksWhenAScheduledRefreshIsInProgress() throws

Refresher r =
new Refresher(
"testcase",
"RefresherTest.testThatForceRefreshBalksWhenAScheduledRefreshIsInProgress",
executorService,
() -> {
int c = refreshCount.get();
Expand Down Expand Up @@ -310,7 +317,7 @@ public void testThatForceRefreshBalksWhenAForceRefreshIsInProgress() throws Exce

Refresher r =
new Refresher(
"testcase",
"RefresherTest.testThatForceRefreshBalksWhenAForceRefreshIsInProgress",
executorService,
() -> {
int c = refreshCount.get();
Expand Down Expand Up @@ -363,7 +370,7 @@ public void testRefreshRetriesOnAfterFailedAttempts() throws Exception {

Refresher r =
new Refresher(
"testcase",
"RefresherTest.testRefreshRetriesOnAfterFailedAttempts",
executorService,
() -> {
int c = refreshCount.get();
Expand Down Expand Up @@ -424,7 +431,10 @@ public void testClosedInstanceDataThrowsException() {
ExampleData data = new ExampleData(Instant.now().plus(1, ChronoUnit.HOURS));
Refresher r =
new Refresher(
"testcase", executorService, () -> Futures.immediateFuture(data), rateLimiter);
"RefresherTest.testClosedInstanceDataThrowsException",
executorService,
() -> Futures.immediateFuture(data),
rateLimiter);
r.close();

assertThrows(IllegalStateException.class, () -> r.getConnectionInfo(TEST_TIMEOUT_MS));
Expand All @@ -440,7 +450,7 @@ public void testClosedInstanceDataStopsRefreshTasks() throws Exception {

Refresher r =
new Refresher(
"testcase",
"RefresherTest.testClosedInstanceDataStopsRefreshTasks",
executorService,
() -> {
int c = refreshCount.get();
Expand Down Expand Up @@ -510,6 +520,66 @@ public void testRefreshesTokenIfExpired() throws Exception {
assertThat(refreshCount.get()).isEqualTo(2);
}

@Test
public void testGetConnectionInfo_throwsTerminalException_refreshOperationNotScheduled() {
ExampleData data = new ExampleData(Instant.now().plus(1, ChronoUnit.HOURS));
AtomicInteger refreshCount = new AtomicInteger();

Refresher r =
new Refresher(
"RefresherTest.testGetConnectionInfo_throwsTerminalException_refreshOperationNotScheduled",
executorService,
() -> {
int c = refreshCount.get();
ExampleData refreshResult = data;
switch (c) {
case 0:
// refresh 0 should throw an exception
refreshCount.incrementAndGet();
throw new TerminalException("Not authorized");
}
// refresh 2 and on should return data immediately
refreshCount.incrementAndGet();
return Futures.immediateFuture(refreshResult);
},
rateLimiter);

// Raising TerminalException stops the refresher's executor from running the next task.
assertThrows(TerminalException.class, () -> r.getConnectionInfo(TEST_TIMEOUT_MS));
assertThat(refreshCount.get()).isEqualTo(1);
}

@Test
public void testGetConnectionInfo_throwsRuntimeException_refreshOperationScheduled()
throws Exception {
ExampleData data = new ExampleData(Instant.now().plus(1, ChronoUnit.HOURS));
AtomicInteger refreshCount = new AtomicInteger();
final PauseCondition refresh1 = new PauseCondition();

Refresher r =
new Refresher(
"RefresherTest.testGetConnectionInfo_throwsRuntimeException_refreshOperationScheduled",
executorService,
() -> {
int c = refreshCount.get();
ExampleData refreshResult = data;
switch (c) {
case 0:
// refresh 0 should throw an exception
refreshCount.incrementAndGet();
throw new RuntimeException("Bad Gateway");
}
// refresh 2 and on should return data immediately
refreshCount.incrementAndGet();
return Futures.immediateFuture(refreshResult);
},
rateLimiter);

// getConnectionInfo again, and assert the refresh operation completed.
refresh1.waitForCondition(() -> r.getConnectionInfo(TEST_TIMEOUT_MS) == data, 1000L);
assertThat(refreshCount.get()).isEqualTo(2);
}

private static class ExampleData extends ConnectionInfo {

private final Instant expiration;
Expand Down

0 comments on commit 107f80c

Please sign in to comment.