Skip to content

Commit

Permalink
Add custom connection validation to ConnectionPoolSupport
Browse files Browse the repository at this point in the history
  • Loading branch information
big-cir committed Feb 4, 2025
1 parent 319e315 commit 064d3f7
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 15 deletions.
52 changes: 48 additions & 4 deletions src/main/java/io/lettuce/core/support/ConnectionPoolSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.function.Supplier;

import org.apache.commons.pool2.BasePooledObjectFactory;
Expand Down Expand Up @@ -60,6 +61,7 @@
* </pre>
*
* @author Mark Paluch
* @author dae won
* @since 4.3
*/
public abstract class ConnectionPoolSupport {
Expand All @@ -77,8 +79,8 @@ private ConnectionPoolSupport() {
* @return the connection pool.
*/
public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
Supplier<T> connectionSupplier, GenericObjectPoolConfig<T> config) {
return createGenericObjectPool(connectionSupplier, config, true);
Supplier<T> connectionSupplier, GenericObjectPoolConfig<T> config, Predicate<T> connectionValidator) {
return createGenericObjectPool(connectionSupplier, config, true, connectionValidator);
}

/**
Expand All @@ -94,14 +96,17 @@ private ConnectionPoolSupport() {
*/
@SuppressWarnings("unchecked")
public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
Supplier<T> connectionSupplier, GenericObjectPoolConfig<T> config, boolean wrapConnections) {
Supplier<T> connectionSupplier, GenericObjectPoolConfig<T> config, boolean wrapConnections,
Predicate<T> connectionValidator) {

LettuceAssert.notNull(connectionSupplier, "Connection supplier must not be null");
LettuceAssert.notNull(config, "GenericObjectPoolConfig must not be null");
LettuceAssert.notNull(connectionValidator, "Connection validator must not be null");

AtomicReference<Origin<T>> poolRef = new AtomicReference<>();

GenericObjectPool<T> pool = new GenericObjectPool<T>(new RedisPooledObjectFactory<T>(connectionSupplier), config) {
GenericObjectPool<T> pool = new GenericObjectPool<T>(
new EnhancedRedisPooledObjectFactory<T>(connectionSupplier, connectionValidator), config) {

@Override
public T borrowObject() throws Exception {
Expand Down Expand Up @@ -249,4 +254,43 @@ public CompletableFuture<Void> returnObjectAsync(T o) throws Exception {

}

private static class EnhancedRedisPooledObjectFactory<T extends StatefulConnection<?, ?>>
extends BasePooledObjectFactory<T> {

private final Supplier<T> connectionSupplier;

private final Predicate<T> connectionValidator;

EnhancedRedisPooledObjectFactory(Supplier<T> connectionSupplier, Predicate<T> connectionValidator) {
this.connectionSupplier = connectionSupplier;
this.connectionValidator = connectionValidator;
}

@Override
public T create() throws Exception {
return connectionSupplier.get();
}

@Override
public PooledObject<T> wrap(T obj) {
return new DefaultPooledObject<>(obj);
}

@Override
public boolean validateObject(PooledObject<T> p) {
T connection = p.getObject();
return connection.isOpen() && connectionValidator.test(connection);
}

@Override
public void destroyObject(PooledObject<T> p) throws Exception {
try {
p.getObject().close();
} catch (Exception e) {
e.printStackTrace();
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,13 @@ void verifierShouldCatchTooFewParametersDeclarations() {
void shouldWorkWithPooledConnection() throws Exception {

GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
.createGenericObjectPool(client::connect, new GenericObjectPoolConfig<>());
.createGenericObjectPool(client::connect, new GenericObjectPoolConfig<>(), connection -> {
try {
return "PONG".equals(connection.sync().ping());
} catch (Exception e) {
return false;
}
});

try (StatefulRedisConnection<String, String> connection = pool.borrowObject()) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,13 @@ static void afterClass() {
void genericPoolShouldWorkWithWrappedConnections() throws Exception {

GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>());
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), connection -> {
try {
return "PONG".equals(connection.sync().ping());
} catch (Exception e) {
return false;
}
});

borrowAndReturn(pool);
borrowAndClose(pool);
Expand All @@ -91,7 +97,13 @@ void genericPoolShouldCloseConnectionsAboveMaxIdleSize() throws Exception {
poolConfig.setMaxIdle(2);

GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
.createGenericObjectPool(() -> client.connect(), poolConfig);
.createGenericObjectPool(() -> client.connect(), poolConfig, connection -> {
try {
return "PONG".equals(connection.sync().ping());
} catch (Exception e) {
return false;
}
});

borrowAndReturn(pool);
borrowAndClose(pool);
Expand Down Expand Up @@ -120,7 +132,13 @@ void genericPoolShouldCloseConnectionsAboveMaxIdleSize() throws Exception {
void genericPoolShouldWorkWithPlainConnections() throws Exception {

GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), false);
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), false, connection -> {
try {
return "PONG".equals(connection.sync().ping());
} catch (Exception e) {
return false;
}
});

borrowAndReturn(pool);

Expand Down Expand Up @@ -151,7 +169,13 @@ void softReferencePoolShouldWorkWithPlainConnections() throws Exception {
void genericPoolUsingWrappingShouldPropagateExceptionsCorrectly() throws Exception {

GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>());
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), connection -> {
try {
return "PONG".equals(connection.sync().ping());
} catch (Exception e) {
return false;
}
});

StatefulRedisConnection<String, String> connection = pool.borrowObject();
RedisCommands<String, String> sync = connection.sync();
Expand All @@ -172,7 +196,13 @@ void genericPoolUsingWrappingShouldPropagateExceptionsCorrectly() throws Excepti
void wrappedConnectionShouldUseWrappers() throws Exception {

GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>());
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), connection -> {
try {
return "PONG".equals(connection.sync().ping());
} catch (Exception e) {
return false;
}
});

StatefulRedisConnection<String, String> connection = pool.borrowObject();
RedisCommands<String, String> sync = connection.sync();
Expand All @@ -197,7 +227,13 @@ void wrappedMasterSlaveConnectionShouldUseWrappers() throws Exception {

GenericObjectPool<StatefulRedisMasterReplicaConnection<String, String>> pool = ConnectionPoolSupport
.createGenericObjectPool(() -> MasterReplica.connect(client, new StringCodec(), RedisURI.create(host, port)),
new GenericObjectPoolConfig<>());
new GenericObjectPoolConfig<>(), connection -> {
try {
return "PONG".equals(connection.sync().ping());
} catch (Exception e) {
return false;
}
});

StatefulRedisMasterReplicaConnection<String, String> connection = pool.borrowObject();
RedisCommands<String, String> sync = connection.sync();
Expand All @@ -223,7 +259,13 @@ void wrappedClusterConnectionShouldUseWrappers() throws Exception {
RedisURI.create(TestSettings.host(), 7379));

GenericObjectPool<StatefulRedisClusterConnection<String, String>> pool = ConnectionPoolSupport
.createGenericObjectPool(redisClusterClient::connect, new GenericObjectPoolConfig<>());
.createGenericObjectPool(redisClusterClient::connect, new GenericObjectPoolConfig<>(), connection -> {
try {
return "PONG".equals(connection.sync().ping());
} catch (Exception e) {
return false;
}
});

StatefulRedisClusterConnection<String, String> connection = pool.borrowObject();
RedisAdvancedClusterCommands<String, String> sync = connection.sync();
Expand All @@ -250,7 +292,13 @@ void wrappedClusterConnectionShouldUseWrappers() throws Exception {
void plainConnectionShouldNotUseWrappers() throws Exception {

GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), false);
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), false, connection -> {
try {
return "PONG".equals(connection.sync().ping());
} catch (Exception e) {
return false;
}
});

StatefulRedisConnection<String, String> connection = pool.borrowObject();
RedisCommands<String, String> sync = connection.sync();
Expand Down Expand Up @@ -295,7 +343,13 @@ void softRefPoolShouldWorkWithWrappedConnections() throws Exception {
void wrappedObjectClosedAfterReturn() throws Exception {

GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), true);
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), true, connection -> {
try {
return "PONG".equals(connection.sync().ping());
} catch (Exception e) {
return false;
}
});

StatefulRedisConnection<String, String> connection = pool.borrowObject();
RedisCommands<String, String> sync = connection.sync();
Expand All @@ -317,7 +371,13 @@ void wrappedObjectClosedAfterReturn() throws Exception {
void tryWithResourcesReturnsConnectionToPool() throws Exception {

GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>());
.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), connection -> {
try {
return "PONG".equals(connection.sync().ping());
} catch (Exception e) {
return false;
}
});

StatefulRedisConnection<String, String> usedConnection = null;
try (StatefulRedisConnection<String, String> connection = pool.borrowObject()) {
Expand Down

0 comments on commit 064d3f7

Please sign in to comment.