diff --git a/alloydb-jdbc-connector/pom.xml b/alloydb-jdbc-connector/pom.xml index bf072883..503aed29 100644 --- a/alloydb-jdbc-connector/pom.xml +++ b/alloydb-jdbc-connector/pom.xml @@ -129,6 +129,12 @@ google-auth-library-credentials + + io.grpc + grpc-api + ${grpc.version} + + org.slf4j @@ -187,13 +193,6 @@ test - - io.grpc - grpc-api - ${grpc.version} - test - - io.grpc grpc-inprocess diff --git a/alloydb-jdbc-connector/src/main/java/com/google/cloud/alloydb/DefaultConnectionInfoRepository.java b/alloydb-jdbc-connector/src/main/java/com/google/cloud/alloydb/DefaultConnectionInfoRepository.java index 3c950dec..4f1c53e0 100644 --- a/alloydb-jdbc-connector/src/main/java/com/google/cloud/alloydb/DefaultConnectionInfoRepository.java +++ b/alloydb-jdbc-connector/src/main/java/com/google/cloud/alloydb/DefaultConnectionInfoRepository.java @@ -27,6 +27,8 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.protobuf.ByteString; import com.google.protobuf.Duration; +import io.grpc.Status; +import io.grpc.Status.Code; import java.io.ByteArrayInputStream; import java.io.Closeable; import java.security.KeyPair; @@ -34,6 +36,7 @@ import java.security.cert.CertificateFactory; import java.security.cert.X509Certificate; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; class DefaultConnectionInfoRepository implements ConnectionInfoRepository, Closeable { @@ -42,6 +45,8 @@ class DefaultConnectionInfoRepository implements ConnectionInfoRepository, Close private static final String OPENSSL_PUBLIC_KEY_END = "-----END RSA PUBLIC KEY-----"; private static final String X_509 = "X.509"; private static final int PEM_LINE_LENGTH = 64; + private static final List TERMINAL_STATUS_CODES = + Arrays.asList(Code.NOT_FOUND, Code.PERMISSION_DENIED, Code.INVALID_ARGUMENT); private final ListeningScheduledExecutorService executor; private final AlloyDBAdminClient alloyDBAdminClient; @@ -94,7 +99,11 @@ public void close() { private com.google.cloud.alloydb.v1alpha.ConnectionInfo getConnectionInfo( InstanceName instanceName) { - return alloyDBAdminClient.getConnectionInfo(instanceName); + try { + return alloyDBAdminClient.getConnectionInfo(instanceName); + } catch (Exception e) { + throw handleException(e); + } } private GenerateClientCertificateResponse getGenerateClientCertificateResponse( @@ -107,7 +116,24 @@ private GenerateClientCertificateResponse getGenerateClientCertificateResponse( .setUseMetadataExchange(true) .build(); - return alloyDBAdminClient.generateClientCertificate(request); + try { + return alloyDBAdminClient.generateClientCertificate(request); + } catch (Exception e) { + throw handleException(e); + } + } + + private RuntimeException handleException(Exception e) { + Status status = Status.fromThrowable(e); + String message = + String.format( + "AlloyDB Admin API failed to return the connection info. Reason: %s", e.getMessage()); + + if (TERMINAL_STATUS_CODES.contains(status.getCode())) { + return new TerminalException(message, e); + } + + return new RuntimeException(message, e); } private String getParent(InstanceName instanceName) { diff --git a/alloydb-jdbc-connector/src/main/java/com/google/cloud/alloydb/Refresher.java b/alloydb-jdbc-connector/src/main/java/com/google/cloud/alloydb/Refresher.java index d1acf3da..a0dd8c40 100644 --- a/alloydb-jdbc-connector/src/main/java/com/google/cloud/alloydb/Refresher.java +++ b/alloydb-jdbc-connector/src/main/java/com/google/cloud/alloydb/Refresher.java @@ -257,6 +257,14 @@ private ListenableFuture handleRefreshResult( } } catch (ExecutionException | InterruptedException e) { + + // No refresh retry when the TerminalException is raised. + final Throwable cause = e.getCause(); + if (cause instanceof TerminalException) { + logger.info(String.format("[%s] Refresh Operation: Failed! No retry.", name), e); + throw (TerminalException) cause; + } + logger.info( String.format( "[%s] Refresh Operation: Failed! Starting next refresh operation immediately.", name), diff --git a/alloydb-jdbc-connector/src/main/java/com/google/cloud/alloydb/TerminalException.java b/alloydb-jdbc-connector/src/main/java/com/google/cloud/alloydb/TerminalException.java new file mode 100644 index 00000000..3cd70993 --- /dev/null +++ b/alloydb-jdbc-connector/src/main/java/com/google/cloud/alloydb/TerminalException.java @@ -0,0 +1,30 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.alloydb; + +class TerminalException extends RuntimeException { + public TerminalException() { + super(); + } + + public TerminalException(String message) { + super(message); + } + + public TerminalException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/alloydb-jdbc-connector/src/test/java/com/google/cloud/alloydb/ConnectorTest.java b/alloydb-jdbc-connector/src/test/java/com/google/cloud/alloydb/ConnectorTest.java index f98e7c81..b9f0c0cd 100644 --- a/alloydb-jdbc-connector/src/test/java/com/google/cloud/alloydb/ConnectorTest.java +++ b/alloydb-jdbc-connector/src/test/java/com/google/cloud/alloydb/ConnectorTest.java @@ -51,6 +51,9 @@ public class ConnectorTest { private static final String PRIVATE_IP = "127.0.0.2"; private static final String SERVER_MESSAGE = "HELLO"; private static final String ERROR_MESSAGE_NOT_FOUND = "Resource 'instance' was not found"; + private static final String ERROR_MESSAGE_PERMISSION_DENIED = + "Location not found or access is unauthorized."; + private static final String ERROR_MESSAGE_INTERNAL = "Internal Error"; ListeningScheduledExecutorService defaultExecutor; @@ -82,7 +85,7 @@ public void create_successfulPrivateConnection() } @Test - public void create_throwsException_notFound() + public void create_throwsTerminalException_notFound() throws IOException, NoSuchAlgorithmException, InvalidKeySpecException { MockAlloyDBAdminGrpc mock = new MockAlloyDBAdminGrpc(Code.NOT_FOUND.getNumber(), ERROR_MESSAGE_NOT_FOUND); @@ -91,11 +94,39 @@ public void create_throwsException_notFound() new ConnectionConfig.Builder().withInstanceName(InstanceName.parse(INSTANCE_NAME)).build(); Connector connector = newConnector(config.getConnectorConfig(), mock); - RuntimeException ex = assertThrows(RuntimeException.class, () -> connector.connect(config)); - + TerminalException ex = assertThrows(TerminalException.class, () -> connector.connect(config)); assertThat(ex).hasMessageThat().contains(ERROR_MESSAGE_NOT_FOUND); } + @Test + public void create_throwsTerminalException_notAuthorized() + throws IOException, NoSuchAlgorithmException, InvalidKeySpecException { + MockAlloyDBAdminGrpc mock = + new MockAlloyDBAdminGrpc( + Code.PERMISSION_DENIED.getNumber(), ERROR_MESSAGE_PERMISSION_DENIED); + + ConnectionConfig config = + new ConnectionConfig.Builder().withInstanceName(InstanceName.parse(INSTANCE_NAME)).build(); + Connector connector = newConnector(config.getConnectorConfig(), mock); + + TerminalException ex = assertThrows(TerminalException.class, () -> connector.connect(config)); + assertThat(ex).hasMessageThat().contains(ERROR_MESSAGE_PERMISSION_DENIED); + } + + @Test + public void create_throwsNonTerminalException_internalError() + throws IOException, NoSuchAlgorithmException, InvalidKeySpecException { + MockAlloyDBAdminGrpc mock = + new MockAlloyDBAdminGrpc(Code.INTERNAL.getNumber(), ERROR_MESSAGE_INTERNAL); + + ConnectionConfig config = + new ConnectionConfig.Builder().withInstanceName(InstanceName.parse(INSTANCE_NAME)).build(); + Connector connector = newConnector(config.getConnectorConfig(), mock); + + RuntimeException ex = assertThrows(RuntimeException.class, () -> connector.connect(config)); + assertThat(ex).hasMessageThat().contains(ERROR_MESSAGE_INTERNAL); + } + private Connector newConnector(ConnectorConfig config, MockAlloyDBAdminGrpc mock) throws NoSuchAlgorithmException, InvalidKeySpecException { CredentialFactoryProvider stubCredentialFactoryProvider = diff --git a/alloydb-jdbc-connector/src/test/java/com/google/cloud/alloydb/RefresherTest.java b/alloydb-jdbc-connector/src/test/java/com/google/cloud/alloydb/RefresherTest.java index ab8d339d..be692ddc 100644 --- a/alloydb-jdbc-connector/src/test/java/com/google/cloud/alloydb/RefresherTest.java +++ b/alloydb-jdbc-connector/src/test/java/com/google/cloud/alloydb/RefresherTest.java @@ -58,7 +58,10 @@ public void testDataRetrievedSuccessfully() { ExampleData data = new ExampleData(Instant.now().plus(1, ChronoUnit.HOURS)); Refresher r = new Refresher( - "testcase", executorService, () -> Futures.immediateFuture(data), rateLimiter); + "RefresherTest.testDataRetrievedSuccessfully", + executorService, + () -> Futures.immediateFuture(data), + rateLimiter); ConnectionInfo gotInfo = r.getConnectionInfo(TEST_TIMEOUT_MS); assertThat(gotInfo).isSameInstanceAs(data); } @@ -82,7 +85,11 @@ public void testRateLimiterInUse() { ExampleData data = new ExampleData(Instant.now().plus(1, ChronoUnit.HOURS)); SpyRateLimiter rl = new SpyRateLimiter(10); Refresher r = - new Refresher("testcase", executorService, () -> Futures.immediateFuture(data), rl); + new Refresher( + "RefresherTest.testRateLimiterInUse", + executorService, + () -> Futures.immediateFuture(data), + rl); ConnectionInfo gotInfo = r.getConnectionInfo(TEST_TIMEOUT_MS); assertThat(gotInfo).isSameInstanceAs(data); assertThat(rl.counter).isNotEqualTo(0); @@ -92,7 +99,7 @@ public void testRateLimiterInUse() { public void testInstanceFailsOnConnectionError() { Refresher r = new Refresher( - "testcase", + "RefresherTest.testInstanceFailsOnConnectionError", executorService, () -> Futures.immediateFailedFuture(new RuntimeException("always fails")), rateLimiter); @@ -107,7 +114,7 @@ public void testInstanceFailsOnTooLongToRetrieve() { ExampleData data = new ExampleData(Instant.now().plus(1, ChronoUnit.HOURS)); Refresher r = new Refresher( - "testcase", + "RefresherTest.testInstanceFailsOnTooLongToRetrieve", executorService, () -> { cond.pause(); @@ -126,7 +133,7 @@ public void testForcesRefresh() throws Exception { final PauseCondition cond = new PauseCondition(); Refresher r = new Refresher( - "testcase", + "RefresherTest.testForcesRefresh", executorService, () -> { int c = refreshCount.get(); @@ -168,7 +175,7 @@ public void testRetriesOnInitialFailures() throws Exception { Refresher r = new Refresher( - "testcase", + "RefresherTest.testRetriesOnInitialFailures", executorService, () -> { int c = refreshCount.get(); @@ -199,7 +206,7 @@ public void testRefreshesExpiredData() throws Exception { Refresher r = new Refresher( - "testcase", + "RefresherTest.testRefreshesExpiredData", executorService, () -> { int c = refreshCount.get(); @@ -253,7 +260,7 @@ public void testThatForceRefreshBalksWhenAScheduledRefreshIsInProgress() throws Refresher r = new Refresher( - "testcase", + "RefresherTest.testThatForceRefreshBalksWhenAScheduledRefreshIsInProgress", executorService, () -> { int c = refreshCount.get(); @@ -310,7 +317,7 @@ public void testThatForceRefreshBalksWhenAForceRefreshIsInProgress() throws Exce Refresher r = new Refresher( - "testcase", + "RefresherTest.testThatForceRefreshBalksWhenAForceRefreshIsInProgress", executorService, () -> { int c = refreshCount.get(); @@ -363,7 +370,7 @@ public void testRefreshRetriesOnAfterFailedAttempts() throws Exception { Refresher r = new Refresher( - "testcase", + "RefresherTest.testRefreshRetriesOnAfterFailedAttempts", executorService, () -> { int c = refreshCount.get(); @@ -424,7 +431,10 @@ public void testClosedInstanceDataThrowsException() { ExampleData data = new ExampleData(Instant.now().plus(1, ChronoUnit.HOURS)); Refresher r = new Refresher( - "testcase", executorService, () -> Futures.immediateFuture(data), rateLimiter); + "RefresherTest.testClosedInstanceDataThrowsException", + executorService, + () -> Futures.immediateFuture(data), + rateLimiter); r.close(); assertThrows(IllegalStateException.class, () -> r.getConnectionInfo(TEST_TIMEOUT_MS)); @@ -440,7 +450,7 @@ public void testClosedInstanceDataStopsRefreshTasks() throws Exception { Refresher r = new Refresher( - "testcase", + "RefresherTest.testClosedInstanceDataStopsRefreshTasks", executorService, () -> { int c = refreshCount.get(); @@ -510,6 +520,66 @@ public void testRefreshesTokenIfExpired() throws Exception { assertThat(refreshCount.get()).isEqualTo(2); } + @Test + public void testGetConnectionInfo_throwsTerminalException_refreshOperationNotScheduled() { + ExampleData data = new ExampleData(Instant.now().plus(1, ChronoUnit.HOURS)); + AtomicInteger refreshCount = new AtomicInteger(); + + Refresher r = + new Refresher( + "RefresherTest.testGetConnectionInfo_throwsTerminalException_refreshOperationNotScheduled", + executorService, + () -> { + int c = refreshCount.get(); + ExampleData refreshResult = data; + switch (c) { + case 0: + // refresh 0 should throw an exception + refreshCount.incrementAndGet(); + throw new TerminalException("Not authorized"); + } + // refresh 2 and on should return data immediately + refreshCount.incrementAndGet(); + return Futures.immediateFuture(refreshResult); + }, + rateLimiter); + + // Raising TerminalException stops the refresher's executor from running the next task. + assertThrows(TerminalException.class, () -> r.getConnectionInfo(TEST_TIMEOUT_MS)); + assertThat(refreshCount.get()).isEqualTo(1); + } + + @Test + public void testGetConnectionInfo_throwsRuntimeException_refreshOperationScheduled() + throws Exception { + ExampleData data = new ExampleData(Instant.now().plus(1, ChronoUnit.HOURS)); + AtomicInteger refreshCount = new AtomicInteger(); + final PauseCondition refresh1 = new PauseCondition(); + + Refresher r = + new Refresher( + "RefresherTest.testGetConnectionInfo_throwsRuntimeException_refreshOperationScheduled", + executorService, + () -> { + int c = refreshCount.get(); + ExampleData refreshResult = data; + switch (c) { + case 0: + // refresh 0 should throw an exception + refreshCount.incrementAndGet(); + throw new RuntimeException("Bad Gateway"); + } + // refresh 2 and on should return data immediately + refreshCount.incrementAndGet(); + return Futures.immediateFuture(refreshResult); + }, + rateLimiter); + + // getConnectionInfo again, and assert the refresh operation completed. + refresh1.waitForCondition(() -> r.getConnectionInfo(TEST_TIMEOUT_MS) == data, 1000L); + assertThat(refreshCount.get()).isEqualTo(2); + } + private static class ExampleData extends ConnectionInfo { private final Instant expiration;