From 8f96175623767a0c53eb6ee1faf5674382662b1a Mon Sep 17 00:00:00 2001 From: Mike Jensen Date: Wed, 6 Mar 2019 20:38:13 -0700 Subject: [PATCH] DelegatingAuroraConnection: Delay connect failures till connection is used This changes DelegatingAuroraConnection so that as long as at least one connection works, failures to connect wont propagate out of the constructor. Instead the invalid connection will appear valid until it is attempted to be used, at which point an exception will be thrown with the original failure as the cause. This is to improve the ability to use the driver and establish new connections while the aurora cluster is partially unhealthy. Without this Hikari (or other pools) may end up fully removing the connections from the pool as they appear to be invalid / unhealthy, and be also unable to replace them with new connections, even though the aurora cluster is partially usable. This does NOT delay errors when starting up the AuroraClusterMonitor, so starting up a service and needing to monitor a cluster that is partially unhealthy still wont work. This is better than lazily establishing the connections because we don't want the connection establish delay to occur at the time of the operation / request (rather have the overhead while the pool is filling / replenishing). --- .../org/threadly/db/ErrorSqlConnection.java | 345 ++++++++++++++++++ .../db/aurora/AuroraClusterMonitor.java | 9 +- .../db/aurora/DelegatingAuroraConnection.java | 99 +++-- .../threadly/db/ErrorSqlConnectionTest.java | 251 +++++++++++++ .../threadly/db/aurora/DriverLocalDbTest.java | 21 ++ 5 files changed, 702 insertions(+), 23 deletions(-) create mode 100644 arcCommon/src/main/java/org/threadly/db/ErrorSqlConnection.java create mode 100644 arcCommon/src/test/java/org/threadly/db/ErrorSqlConnectionTest.java diff --git a/arcCommon/src/main/java/org/threadly/db/ErrorSqlConnection.java b/arcCommon/src/main/java/org/threadly/db/ErrorSqlConnection.java new file mode 100644 index 0000000..4fd5c26 --- /dev/null +++ b/arcCommon/src/main/java/org/threadly/db/ErrorSqlConnection.java @@ -0,0 +1,345 @@ +package org.threadly.db; + +import java.sql.Array; +import java.sql.Blob; +import java.sql.CallableStatement; +import java.sql.Clob; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.NClob; +import java.sql.PreparedStatement; +import java.sql.SQLClientInfoException; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Savepoint; +import java.sql.Statement; +import java.sql.Struct; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + +import org.threadly.concurrent.DoNothingRunnable; +import org.threadly.util.ArgumentVerifier; + +/** + * Implementation of {@link Connection} which is perpetually in a state of error. Any operation on + * this connection will result in an exception being thrown. + *

+ * The connection will appear valid (from {@link #isValid(int)} and non-closed UNTIL the exception + * is thrown. After that point the connection will appear as if it was closed. + * + * @since 0.9 + */ +public class ErrorSqlConnection implements Connection { + private final Runnable errorThrownListener; + private final SQLException sqlError; + private final RuntimeException runtimeError; + private volatile boolean closed = false; + private volatile boolean errorThrown = false; + + public ErrorSqlConnection(Runnable errorThrownListener, SQLException error) { + ArgumentVerifier.assertNotNull(error, "error"); + + if (errorThrownListener == null) { + errorThrownListener = DoNothingRunnable.instance(); + } + + this.errorThrownListener = errorThrownListener; + this.sqlError = error; + this.runtimeError = null; + } + + public ErrorSqlConnection(Runnable errorThrownListener, RuntimeException error) { + ArgumentVerifier.assertNotNull(error, "error"); + + if (errorThrownListener == null) { + errorThrownListener = DoNothingRunnable.instance(); + } + + this.errorThrownListener = errorThrownListener; + this.sqlError = null; + this.runtimeError = error; + } + + protected SQLException error() throws SQLException { + errorThrown = true; + errorThrownListener.run(); + + if (sqlError != null) { + throw new SQLException(sqlError); + } else { + throw new SQLException(runtimeError); + } + } + + @Override + public void close() { + closed = true; + } + + @Override + public boolean isClosed() { + return errorThrown || closed; + } + + @Override + public boolean isValid(int timeout) { + return ! isClosed(); + } + + @Override + public void setClientInfo(Properties arg0) throws SQLClientInfoException { + // ignored + } + + @Override + public void setClientInfo(String arg0, String arg1) throws SQLClientInfoException { + // ignored + } + + @Override + public boolean isWrapperFor(Class arg0) throws SQLException { + throw error(); + } + + @Override + public T unwrap(Class arg0) throws SQLException { + throw error(); + } + + @Override + public void abort(Executor arg0) throws SQLException { + throw error(); + } + + @Override + public void clearWarnings() throws SQLException { + throw error(); + } + + @Override + public void commit() throws SQLException { + throw error(); + } + + @Override + public Array createArrayOf(String arg0, Object[] arg1) throws SQLException { + throw error(); + } + + @Override + public Blob createBlob() throws SQLException { + throw error(); + } + + @Override + public Clob createClob() throws SQLException { + throw error(); + } + + @Override + public NClob createNClob() throws SQLException { + throw error(); + } + + @Override + public SQLXML createSQLXML() throws SQLException { + throw error(); + } + + @Override + public Statement createStatement() throws SQLException { + throw error(); + } + + @Override + public Statement createStatement(int arg0, int arg1) throws SQLException { + throw error(); + } + + @Override + public Statement createStatement(int arg0, int arg1, int arg2) throws SQLException { + throw error(); + } + + @Override + public Struct createStruct(String arg0, Object[] arg1) throws SQLException { + throw error(); + } + + @Override + public boolean getAutoCommit() throws SQLException { + throw error(); + } + + @Override + public String getCatalog() throws SQLException { + throw error(); + } + + @Override + public Properties getClientInfo() throws SQLException { + throw error(); + } + + @Override + public String getClientInfo(String arg0) throws SQLException { + throw error(); + } + + @Override + public int getHoldability() throws SQLException { + throw error(); + } + + @Override + public DatabaseMetaData getMetaData() throws SQLException { + throw error(); + } + + @Override + public int getNetworkTimeout() throws SQLException { + throw error(); + } + + @Override + public String getSchema() throws SQLException { + throw error(); + } + + @Override + public int getTransactionIsolation() throws SQLException { + throw error(); + } + + @Override + public Map> getTypeMap() throws SQLException { + throw error(); + } + + @Override + public SQLWarning getWarnings() throws SQLException { + throw error(); + } + + @Override + public boolean isReadOnly() throws SQLException { + throw error(); + } + + @Override + public String nativeSQL(String arg0) throws SQLException { + throw error(); + } + + @Override + public CallableStatement prepareCall(String arg0) throws SQLException { + throw error(); + } + + @Override + public CallableStatement prepareCall(String arg0, int arg1, int arg2) throws SQLException { + throw error(); + } + + @Override + public CallableStatement prepareCall(String arg0, int arg1, int arg2, int arg3) throws SQLException { + throw error(); + } + + @Override + public PreparedStatement prepareStatement(String arg0) throws SQLException { + throw error(); + } + + @Override + public PreparedStatement prepareStatement(String arg0, int arg1) throws SQLException { + throw error(); + } + + @Override + public PreparedStatement prepareStatement(String arg0, int[] arg1) throws SQLException { + throw error(); + } + + @Override + public PreparedStatement prepareStatement(String arg0, String[] arg1) throws SQLException { + throw error(); + } + + @Override + public PreparedStatement prepareStatement(String arg0, int arg1, int arg2) throws SQLException { + throw error(); + } + + @Override + public PreparedStatement prepareStatement(String arg0, int arg1, int arg2, int arg3) throws SQLException { + throw error(); + } + + @Override + public void releaseSavepoint(Savepoint arg0) throws SQLException { + throw error(); + } + + @Override + public void rollback() throws SQLException { + throw error(); + } + + @Override + public void rollback(Savepoint arg0) throws SQLException { + throw error(); + } + + @Override + public void setAutoCommit(boolean arg0) throws SQLException { + throw error(); + } + + @Override + public void setCatalog(String arg0) throws SQLException { + throw error(); + } + + @Override + public void setHoldability(int arg0) throws SQLException { + throw error(); + } + + @Override + public void setNetworkTimeout(Executor arg0, int arg1) throws SQLException { + throw error(); + } + + @Override + public void setReadOnly(boolean arg0) throws SQLException { + throw error(); + } + + @Override + public Savepoint setSavepoint() throws SQLException { + throw error(); + } + + @Override + public Savepoint setSavepoint(String arg0) throws SQLException { + throw error(); + } + + @Override + public void setSchema(String arg0) throws SQLException { + throw error(); + } + + @Override + public void setTransactionIsolation(int arg0) throws SQLException { + throw error(); + } + + @Override + public void setTypeMap(Map> arg0) throws SQLException { + throw error(); + } +} diff --git a/arcCommon/src/main/java/org/threadly/db/aurora/AuroraClusterMonitor.java b/arcCommon/src/main/java/org/threadly/db/aurora/AuroraClusterMonitor.java index 54f0151..27301ec 100644 --- a/arcCommon/src/main/java/org/threadly/db/aurora/AuroraClusterMonitor.java +++ b/arcCommon/src/main/java/org/threadly/db/aurora/AuroraClusterMonitor.java @@ -97,7 +97,12 @@ protected static AuroraClusterMonitor getMonitor(DelegateAuroraDriver driver, Au protected AuroraClusterMonitor(SchedulerService scheduler, long checkIntervalMillis, DelegateAuroraDriver driver, AuroraServer[] clusterServers) { - clusterStateChecker = new ClusterChecker(scheduler, checkIntervalMillis, driver, clusterServers); + this(new ClusterChecker(scheduler, checkIntervalMillis, driver, clusterServers)); + } + + // used in testing + protected AuroraClusterMonitor(ClusterChecker clusterChecker) { + this.clusterStateChecker = clusterChecker; replicaIndex = new AtomicLong(); } @@ -183,7 +188,7 @@ public void expediteServerCheck(AuroraServer auroraServer) { * * @since 0.8 */ - private static class AuroraServersKey { + protected static class AuroraServersKey { private final AuroraServer[] clusterServers; private final int hashCode; diff --git a/arcCommon/src/main/java/org/threadly/db/aurora/DelegatingAuroraConnection.java b/arcCommon/src/main/java/org/threadly/db/aurora/DelegatingAuroraConnection.java index e4d9d25..71abd79 100644 --- a/arcCommon/src/main/java/org/threadly/db/aurora/DelegatingAuroraConnection.java +++ b/arcCommon/src/main/java/org/threadly/db/aurora/DelegatingAuroraConnection.java @@ -12,8 +12,11 @@ import java.util.concurrent.Executor; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; import org.threadly.db.AbstractDelegatingConnection; +import org.threadly.db.ErrorSqlConnection; import org.threadly.db.aurora.DelegatingAuroraConnection.ConnectionStateManager.ConnectionHolder; import org.threadly.util.Clock; import org.threadly.util.Pair; @@ -112,6 +115,8 @@ public class DelegatingAuroraConnection extends AbstractDelegatingConnection imp protected static final String CLIENT_INFO_VALUE_DELEGATE_CHOICE_DEFAULT = // may be set by providing null or empty CLIENT_INFO_VALUE_DELEGATE_CHOICE_SMART; + protected static final Logger LOG = Logger.getLogger(AuroraClusterMonitor.class.getSimpleName()); + /** * Check if a given URL is accepted by this connection. * @@ -194,21 +199,35 @@ public DelegatingAuroraConnection(String url, Properties info) throws SQLExcepti Pair connectException = null; for (int i = 0; i < serverCount; i++) { this.servers[i] = new AuroraServer(servers[i], info); - if (connectException == null) { - try { - connections[i] = connectionStateManager.wrapConnection(dDriver.connect(servers[i] + urlArgs, info)); - if (firstConnectionHolder == null) { - firstConnectionHolder = connections[i]; - } - } catch (SQLException e) { + try { + connections[i] = connectionStateManager.wrapConnection(dDriver.connect(servers[i] + urlArgs, info)); + if (firstConnectionHolder == null) { + firstConnectionHolder = connections[i]; + } + } catch (SQLException e) { + LOG.log(Level.WARNING, "Delaying connect error for server: " + this.servers[i], e); + if (connectException == null) { connectException = new Pair<>(this.servers[i], e); } + connections[i] = + new ConnectionStateManager.UnverifiedConnectionHolder( + new ErrorSqlConnection(this::closeSilently, e)); + } catch (RuntimeException e) { + LOG.log(Level.WARNING, "Delaying connect error for server: " + this.servers[i], e); + if (connectException == null) { + connectException = new Pair<>(this.servers[i], new SQLException(e)); + } + connections[i] = + new ConnectionStateManager.UnverifiedConnectionHolder( + new ErrorSqlConnection(this::closeSilently, e)); } } clusterMonitor = AuroraClusterMonitor.getMonitor(dDriver, this.servers); if (connectException != null) { clusterMonitor.expediteServerCheck(connectException.getLeft()); - throw connectException.getRight(); + if (firstConnectionHolder == null) { // all connections are in error, throw now + throw connectException.getRight(); + } } referenceConnection = firstConnectionHolder.uncheckedState(); closed = new AtomicBoolean(); @@ -235,15 +254,36 @@ public String toString() { // TODO - I think we can produce a better string than this return DelegatingAuroraConnection.class.getSimpleName() + ":" + Arrays.toString(servers); } + + protected void closeSilently() { + try { + close(); + } catch (SQLException e) { + // ignored + } + } @Override public void close() throws SQLException { if (closed.compareAndSet(false, true)) { + SQLException sqlError = null; + RuntimeException runtimeError = null; synchronized (connections) { for (ConnectionHolder ch : connections) { - ch.uncheckedState().close(); + try { + ch.uncheckedState().close(); + } catch (SQLException e) { + sqlError = e; + } catch (RuntimeException e) { + runtimeError = e; + } } } + if (sqlError != null) { + throw sqlError; + } else if (runtimeError != null) { + throw runtimeError; + } } } @@ -517,23 +557,26 @@ public void setHoldability(int holdability) throws SQLException { @Override public boolean isValid(int timeout) throws SQLException { - long startTime = timeout == 0 ? 0 : Clock.accurateForwardProgressingMillis(); - for (ConnectionHolder ch : connections) { - int remainingTimeout; - if (timeout == 0) { - remainingTimeout = 0; - } else { - // seconds are gross - remainingTimeout = timeout - (int)Math.floor((Clock.lastKnownForwardProgressingMillis() - startTime) / 1000.); + if (timeout > 0) { + long startTime = Clock.accurateForwardProgressingMillis(); + for (ConnectionHolder ch : connections) { + int remainingTimeout = // seconds are gross + timeout - (int)Math.floor((Clock.lastKnownForwardProgressingMillis() - startTime) / 1000.); if (remainingTimeout <= 0) { return false; + } else if (! ch.uncheckedState().isValid(remainingTimeout)) { + return false; } } - if (! ch.uncheckedState().isValid(remainingTimeout)) { - return false; + return true; + } else { + for (ConnectionHolder ch : connections) { + if (! ch.uncheckedState().isValid(timeout)) { + return false; + } } + return true; } - return true; } @Override @@ -658,7 +701,7 @@ public int getTransactionIsolationLevel() { * @return Holder specific to the connection provided * @throws SQLException Thrown if delegate connection throws while initializing the state */ - public ConnectionHolder wrapConnection(Connection connection) throws SQLException { + protected ConnectionHolder wrapConnection(Connection connection) throws SQLException { if (transactionIsolationLevel == Integer.MIN_VALUE) { // startup state from first connection we see readOnly = connection.isReadOnly(); @@ -689,6 +732,20 @@ public Connection uncheckedState() { public abstract Connection verifiedState() throws SQLException; } + + /** + * Implementation of {@link ConnectionHolder} that will never do any state verifications. + */ + public static class UnverifiedConnectionHolder extends ConnectionHolder { + public UnverifiedConnectionHolder(Connection connection) { + super(connection); + } + + @Override + public Connection verifiedState() { + return connection; + } + } } /** diff --git a/arcCommon/src/test/java/org/threadly/db/ErrorSqlConnectionTest.java b/arcCommon/src/test/java/org/threadly/db/ErrorSqlConnectionTest.java new file mode 100644 index 0000000..3911c53 --- /dev/null +++ b/arcCommon/src/test/java/org/threadly/db/ErrorSqlConnectionTest.java @@ -0,0 +1,251 @@ +package org.threadly.db; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.sql.SQLException; +import java.util.Collections; +import java.util.concurrent.Callable; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.threadly.test.concurrent.TestRunnable; +import org.threadly.util.ExceptionUtils; + +public class ErrorSqlConnectionTest { + private static final SQLException ERROR = new SQLException(); + + private TestRunnable testListener; + private ErrorSqlConnection connection; + + @Before + public void setup() { + testListener = new TestRunnable(); + connection = new ErrorSqlConnection(testListener, ERROR); + } + + @After + public void cleanup() { + testListener = null; + connection = null; + } + + @Test + public void closeTest() { + assertFalse(connection.isClosed()); + + connection.close(); + + assertTrue(connection.isClosed()); + } + + @Test + public void isValidTest() { + assertTrue(connection.isValid(0)); + + + verifyAction(connection::error); + + assertFalse(connection.isValid(0)); + assertTrue(connection.isClosed()); + } + + private void verifyAction(Callable operation) { + try { + operation.call(); + fail("Exception should have thrown"); + } catch (SQLException e) { + assertTrue(e.getCause() == ERROR); + assertTrue(testListener.ranOnce()); + } catch (Exception e) { + fail("Unexpected error: \n" + ExceptionUtils.stackToString(e)); + } + } + + @Test + public void isWrapperForTest() { + verifyAction(() -> connection.isWrapperFor(null)); + } + + @Test + public void unwrapTest() { + verifyAction(() -> connection.unwrap(null)); + } + + @Test + public void abortTest() { + verifyAction(() -> { connection.abort(null); return null; }); + } + + @Test + public void clearWarningsTest() { + verifyAction(() -> { connection.clearWarnings(); return null; }); + } + + @Test + public void commitTest() { + verifyAction(() -> { connection.commit(); return null; }); + } + + @Test + public void createArrayOfTest() { + verifyAction(() -> connection.createArrayOf(null, null)); + } + + @Test + public void createBlobTest() { + verifyAction(connection::createBlob); + } + + @Test + public void createClobTest() { + verifyAction(connection::createClob); + } + + @Test + public void createNClobTest() { + verifyAction(connection::createNClob); + } + + @Test + public void createSQLXMLTest() { + verifyAction(connection::createSQLXML); + } + + @Test + public void createStatementTest() { + verifyAction(connection::createStatement); + } + + @Test + public void createStructTest() { + verifyAction(() -> connection.createStruct(null, null)); + } + + @Test + public void getAutoCommitTest() { + verifyAction(connection::getAutoCommit); + } + + @Test + public void getCatalogTest() { + verifyAction(connection::getCatalog); + } + + @Test + public void getClientInfoTest() { + verifyAction(connection::getClientInfo); + } + + @Test + public void getHoldabilityTest() { + verifyAction(connection::getHoldability); + } + + @Test + public void getMetaDataTest() { + verifyAction(connection::getMetaData); + } + + @Test + public void getNetworkTimeoutTest() { + verifyAction(connection::getNetworkTimeout); + } + + @Test + public void getSchemaTest() { + verifyAction(connection::getSchema); + } + + @Test + public void getTransactionIsolationTest() { + verifyAction(connection::getTransactionIsolation); + } + + @Test + public void getTypeMapTest() { + verifyAction(connection::getTypeMap); + } + + @Test + public void getWarningsTest() { + verifyAction(connection::getWarnings); + } + + @Test + public void isReadOnlyTest() { + verifyAction(connection::isReadOnly); + } + + @Test + public void nativeSQLTest() { + verifyAction(() -> connection.nativeSQL(null)); + } + + @Test + public void prepareCallTest() { + verifyAction(() -> connection.prepareCall(null)); + } + + @Test + public void prepareStatementTest() { + verifyAction(() -> connection.prepareStatement(null)); + } + + @Test + public void releaseSavepointTest() { + verifyAction(() -> { connection.releaseSavepoint(null); return null; }); + } + + @Test + public void rollbackTest() { + verifyAction(() -> { connection.rollback(); return null; }); + } + + @Test + public void setAutoCommitTest() { + verifyAction(() -> { connection.setAutoCommit(true); return null; }); + } + + @Test + public void setCatalogTest() { + verifyAction(() -> { connection.setCatalog(null); return null; }); + } + + @Test + public void setHoldabilityTest() { + verifyAction(() -> { connection.setHoldability(-1); return null; }); + } + + @Test + public void setNetworkTimeoutTest() { + verifyAction(() -> { connection.setNetworkTimeout(null, -1); return null; }); + } + + @Test + public void setReadOnlyTest() { + verifyAction(() -> { connection.setReadOnly(true); return null; }); + } + + @Test + public void setSavepointTest() { + verifyAction(connection::setSavepoint); + } + + @Test + public void setSchemaTest() { + verifyAction(() -> { connection.setSchema(null); return null; }); + } + + @Test + public void setTransactionIsolationTest() { + verifyAction(() -> { connection.setTransactionIsolation(-1); return null; }); + } + + @Test + public void setTypeMapTest() { + verifyAction(() -> { connection.setTypeMap(Collections.emptyMap()); return null; }); + } +} diff --git a/mysqlAuroraArc/src/test/java/org/threadly/db/aurora/DriverLocalDbTest.java b/mysqlAuroraArc/src/test/java/org/threadly/db/aurora/DriverLocalDbTest.java index 058094d..e0967f8 100644 --- a/mysqlAuroraArc/src/test/java/org/threadly/db/aurora/DriverLocalDbTest.java +++ b/mysqlAuroraArc/src/test/java/org/threadly/db/aurora/DriverLocalDbTest.java @@ -1,12 +1,14 @@ package org.threadly.db.aurora; import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; import java.sql.ResultSet; import java.sql.SQLClientInfoException; import java.sql.SQLException; import java.sql.Timestamp; import java.util.List; +import java.util.Properties; import org.junit.After; import org.junit.Before; @@ -31,6 +33,8 @@ import org.threadly.concurrent.UnfairExecutor; import org.threadly.db.LoggingDriver; import org.threadly.db.aurora.Driver; +import org.threadly.db.aurora.AuroraClusterMonitor.AuroraServersKey; +import org.threadly.db.aurora.AuroraClusterMonitor.ClusterChecker; import org.threadly.util.Clock; import org.threadly.util.ExceptionUtils; import org.threadly.util.Pair; @@ -252,6 +256,23 @@ public void transactionInsertAndLookupExceptionThrownAfterDone() { throw new SuppressedStackRuntimeException(); }); } + + @Test + public void connectErrorDelayedTest() throws SQLException { + AuroraServer[] servers = new AuroraServer[] { new AuroraServer("127.0.0.1:3306", new Properties()), + new AuroraServer("127.0.0.2:6603", new Properties()) }; + // put in monitor so we don't fail trying to establish cluster monitor + AuroraClusterMonitor.MONITORS.put(new AuroraServersKey(servers), + new AuroraClusterMonitor(mock(ClusterChecker.class))); + + DBI dbi = new DBI("jdbc:mysql:aurora://127.0.0.1:3306,127.0.0.2:6603/auroraArc?" + + "useUnicode=yes&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false", + "auroraArc", ""); + + try (Handle h = dbi.open()) { + assertTrue(h.getConnection().isValid(0)); + } + } @Test public void z_lookupRecordsPaged() throws InterruptedException {