Skip to content

Commit

Permalink
Add support for Redis Sentinel authentication #1002
Browse files Browse the repository at this point in the history
Redis now supports authentication against Redis Sentinel that was introduced with Redis 5.0.1. The password can be only set programmatically as
URI-based user-info applies to the actual Redis server.

RedisURI sentinelUrl = RedisURI.Builder.sentinel("host", 26379, "my-master", "some-password").build();
  • Loading branch information
mp911de committed Mar 15, 2019
1 parent 89054be commit 8e21d2e
Show file tree
Hide file tree
Showing 8 changed files with 323 additions and 69 deletions.
120 changes: 75 additions & 45 deletions src/main/java/io/lettuce/core/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import java.net.SocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -283,8 +285,7 @@ private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandalone

@SuppressWarnings("unchecked")
private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection,
RedisCodec<K, V> codec, Endpoint endpoint,
RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {
RedisCodec<K, V> codec, Endpoint endpoint, RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {

ConnectionBuilder connectionBuilder;
if (redisURI.isSsl()) {
Expand Down Expand Up @@ -432,8 +433,7 @@ private <K, V> ConnectionFuture<StatefulRedisPubSubConnection<K, V>> connectPubS
StatefulRedisPubSubConnectionImpl<K, V> connection = newStatefulRedisPubSubConnection(endpoint, writer, codec, timeout);

ConnectionFuture<StatefulRedisPubSubConnection<K, V>> future = connectStatefulAsync(connection, codec, endpoint,
redisURI,
() -> new PubSubCommandHandler<>(clientOptions, clientResources, codec, endpoint));
redisURI, () -> new PubSubCommandHandler<>(clientOptions, clientResources, codec, endpoint));

return future.whenComplete((conn, throwable) -> {

Expand Down Expand Up @@ -523,6 +523,59 @@ private <K, V> CompletableFuture<StatefulRedisSentinelConnection<K, V>> connectS
assertNotNull(codec);
checkValidRedisURI(redisURI);

logger.debug("Trying to get a Redis Sentinel connection for one of: " + redisURI.getSentinels());

if (redisURI.getSentinels().isEmpty() && (isNotEmpty(redisURI.getHost()) || !isEmpty(redisURI.getSocket()))) {
return doConnectSentinelAsync(codec, redisURI.getClientName(), redisURI, timeout).toCompletableFuture();
}

List<RedisURI> sentinels = redisURI.getSentinels();
Queue<Throwable> exceptionCollector = new LinkedBlockingQueue<>();
validateUrisAreOfSameConnectionType(sentinels);

Mono<StatefulRedisSentinelConnection<K, V>> connectionLoop = null;

for (RedisURI uri : sentinels) {

String clientName = LettuceStrings.isNotEmpty(uri.getClientName()) ? uri.getClientName() : redisURI.getClientName();

Mono<StatefulRedisSentinelConnection<K, V>> connectionMono = Mono
.defer(() -> Mono.fromCompletionStage(doConnectSentinelAsync(codec, clientName, uri, timeout)))
.onErrorMap(CompletionException.class, Throwable::getCause)
.onErrorMap(e -> new RedisConnectionException("Cannot connect Redis Sentinel at " + uri, e))
.doOnError(exceptionCollector::add);

if (connectionLoop == null) {
connectionLoop = connectionMono;
} else {
connectionLoop = connectionLoop.onErrorResume(t -> connectionMono);
}
}

if (connectionLoop == null) {
return Mono.<StatefulRedisSentinelConnection<K, V>> error(
new RedisConnectionException("Cannot connect to a Redis Sentinel: " + redisURI.getSentinels())).toFuture();
}

return connectionLoop.onErrorMap(
e -> {

RedisConnectionException ex = new RedisConnectionException("Cannot connect to a Redis Sentinel: "
+ redisURI.getSentinels(), e);

for (Throwable throwable : exceptionCollector) {
if (e != throwable) {
ex.addSuppressed(throwable);
}
}

return ex;
}).toFuture();
}

private <K, V> ConnectionFuture<StatefulRedisSentinelConnection<K, V>> doConnectSentinelAsync(RedisCodec<K, V> codec,
String clientName, RedisURI redisURI, Duration timeout) {

ConnectionBuilder connectionBuilder = ConnectionBuilder.connectionBuilder();
connectionBuilder.clientOptions(ClientOptions.copyOf(getOptions()));
connectionBuilder.clientResources(clientResources);
Expand All @@ -536,7 +589,7 @@ private <K, V> CompletableFuture<StatefulRedisSentinelConnection<K, V>> connectS

StatefulRedisSentinelConnectionImpl<K, V> connection = newStatefulRedisSentinelConnection(writer, codec, timeout);

logger.debug("Trying to get a Redis Sentinel connection for one of: " + redisURI.getSentinels());
logger.debug("Connecting to Redis Sentinel, address: " + redisURI);

connectionBuilder.endpoint(endpoint).commandHandler(() -> new CommandHandler(clientOptions, clientResources, endpoint))
.connection(connection);
Expand All @@ -546,55 +599,32 @@ private <K, V> CompletableFuture<StatefulRedisSentinelConnection<K, V>> connectS
connectionBuilder.enablePingBeforeConnect();
}

Mono<StatefulRedisSentinelConnection<K, V>> connect;
if (redisURI.getSentinels().isEmpty() && (isNotEmpty(redisURI.getHost()) || !isEmpty(redisURI.getSocket()))) {

channelType(connectionBuilder, redisURI);
connect = Mono.fromCompletionStage(initializeChannelAsync(connectionBuilder));
} else {
channelType(connectionBuilder, redisURI);
ConnectionFuture<?> sync = initializeChannelAsync(connectionBuilder);

List<RedisURI> sentinels = redisURI.getSentinels();
validateUrisAreOfSameConnectionType(sentinels);
if (!clientOptions.isPingBeforeActivateConnection() && hasPassword(redisURI)) {

Mono<StatefulRedisSentinelConnection<K, V>> connectionLoop = Mono.defer(() -> {
sync = sync.thenCompose(channelHandler -> {

RedisURI uri = sentinels.get(0);
channelType(connectionBuilder, uri);
return connectSentinel(connectionBuilder, uri);
CommandArgs<K, V> args = new CommandArgs<>(codec).add(redisURI.getPassword());
return connection.async().dispatch(CommandType.AUTH, new StatusOutput<>(codec), args).toCompletableFuture();
});

for (int i = 1; i < sentinels.size(); i++) {

RedisURI uri = sentinels.get(i);
connectionLoop = connectionLoop.onErrorResume(t -> connectSentinel(connectionBuilder, uri));
}

connect = connectionLoop;
}

if (LettuceStrings.isNotEmpty(redisURI.getClientName())) {
connect = connect.doOnNext(c -> connection.setClientName(redisURI.getClientName()));
if (LettuceStrings.isNotEmpty(clientName)) {
sync = sync.thenApply(channelHandler -> {
connection.setClientName(clientName);
return channelHandler;
});
}

return connect.doOnError(e -> {

connection.close();
throw new RedisConnectionException("Cannot connect to a Redis Sentinel: " + redisURI.getSentinels(), e);
}).toFuture();
}

private <K, V> Mono<StatefulRedisSentinelConnection<K, V>> connectSentinel(ConnectionBuilder connectionBuilder, RedisURI uri) {

connectionBuilder.socketAddressSupplier(getSocketAddressSupplier(uri));
SocketAddress socketAddress = clientResources.socketAddressResolver().resolve(uri);
logger.debug("Connecting to Redis Sentinel, address: " + socketAddress);
return sync.thenApply(ignore -> (StatefulRedisSentinelConnection<K, V>) connection).whenComplete((ignore, e) -> {

Mono<StatefulRedisSentinelConnection<K, V>> connectionMono = Mono
.fromCompletionStage(initializeChannelAsync(connectionBuilder));

return connectionMono.onErrorMap(CompletionException.class, Throwable::getCause) //
.doOnError(t -> logger.warn("Cannot connect Redis Sentinel at " + uri + ": " + t.toString())) //
.onErrorMap(e -> new RedisConnectionException("Cannot connect Redis Sentinel at " + uri, e));
if (e != null) {
logger.warn("Cannot connect Redis Sentinel at " + redisURI + ": " + e.toString());
connection.close();
}
});
}

/**
Expand Down
91 changes: 79 additions & 12 deletions src/main/java/io/lettuce/core/RedisURI.java
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,19 @@ public char[] getPassword() {
* @param password the password, must not be {@literal null}.
*/
public void setPassword(String password) {
setPassword((CharSequence) password);
}

/**
* Sets the password. Use empty string to skip authentication.
*
* @param password the password, must not be {@literal null}.
* @since 5.2
*/
public void setPassword(CharSequence password) {

LettuceAssert.notNull(password, "Password must not be null");
this.password = password.toCharArray();
this.password = password.toString().toCharArray();
}

/**
Expand Down Expand Up @@ -958,11 +968,12 @@ public static class Builder {
private int database;
private String clientName;
private char[] password;
private char[] sentinelPassword;
private boolean ssl = false;
private boolean verifyPeer = true;
private boolean startTls = false;
private Duration timeout = DEFAULT_TIMEOUT_DURATION;
private final List<HostAndPort> sentinels = new ArrayList<>();
private final List<RedisURI> sentinels = new ArrayList<>();

private Builder() {
}
Expand All @@ -971,7 +982,7 @@ private Builder() {
* Set Redis socket. Creates a new builder.
*
* @param socket the host name
* @return New builder with Redis socket.
* @return new builder with Redis socket.
*/
public static Builder socket(String socket) {

Expand All @@ -986,7 +997,7 @@ public static Builder socket(String socket) {
* Set Redis host. Creates a new builder.
*
* @param host the host name
* @return New builder with Redis host/port.
* @return new builder with Redis host/port.
*/
public static Builder redis(String host) {
return redis(host, DEFAULT_REDIS_PORT);
Expand All @@ -997,7 +1008,7 @@ public static Builder redis(String host) {
*
* @param host the host name
* @param port the port
* @return New builder with Redis host/port.
* @return new builder with Redis host/port.
*/
public static Builder redis(String host, int port) {

Expand All @@ -1012,7 +1023,7 @@ public static Builder redis(String host, int port) {
* Set Sentinel host. Creates a new builder.
*
* @param host the host name
* @return New builder with Sentinel host/port.
* @return new builder with Sentinel host/port.
*/
public static Builder sentinel(String host) {

Expand All @@ -1027,7 +1038,7 @@ public static Builder sentinel(String host) {
*
* @param host the host name
* @param port the port
* @return New builder with Sentinel host/port.
* @return new builder with Sentinel host/port.
*/
public static Builder sentinel(String host, int port) {

Expand All @@ -1043,7 +1054,7 @@ public static Builder sentinel(String host, int port) {
*
* @param host the host name
* @param masterId sentinel master id
* @return New builder with Sentinel host/port.
* @return new builder with Sentinel host/port.
*/
public static Builder sentinel(String host, String masterId) {
return sentinel(host, DEFAULT_SENTINEL_PORT, masterId);
Expand All @@ -1055,14 +1066,30 @@ public static Builder sentinel(String host, String masterId) {
* @param host the host name
* @param port the port
* @param masterId sentinel master id
* @return New builder with Sentinel host/port.
* @return new builder with Sentinel host/port.
*/
public static Builder sentinel(String host, int port, String masterId) {
return sentinel(host, port, masterId, null);
}

/**
* Set Sentinel host, port, master id and Sentinel authentication. Creates a new builder.
*
* @param host the host name
* @param port the port
* @param masterId sentinel master id
* @param password the Sentinel password (supported since Redis 5.0.1)
* @return new builder with Sentinel host/port.
*/
public static Builder sentinel(String host, int port, String masterId, CharSequence password) {

LettuceAssert.notEmpty(host, "Host must not be empty");
LettuceAssert.isTrue(isValidPort(port), String.format("Port out of range: %s", port));

Builder builder = RedisURI.builder();
if (password != null) {
builder.sentinelPassword = password.toString().toCharArray();
}
return builder.withSentinelMasterId(masterId).withSentinel(host, port);
}

Expand All @@ -1085,11 +1112,49 @@ public Builder withSentinel(String host) {
*/
public Builder withSentinel(String host, int port) {

if (this.sentinelPassword != null) {
return withSentinel(host, port, new String(this.sentinelPassword));
}

return withSentinel(host, port, null);
}

/**
* Add a withSentinel host/port and Sentinel authentication to the existing builder.
*
* @param host the host name
* @param port the port
* @param password the Sentinel password (supported since Redis 5.0.1)
* @return the builder
* @since 5.2
*/
public Builder withSentinel(String host, int port, CharSequence password) {

LettuceAssert.assertState(this.host == null, "Cannot use with Redis mode.");
LettuceAssert.notEmpty(host, "Host must not be empty");
LettuceAssert.isTrue(isValidPort(port), String.format("Port out of range: %s", port));

sentinels.add(HostAndPort.of(host, port));
RedisURI redisURI = RedisURI.create(host, port);

if (password != null) {
redisURI.setPassword(password.toString());
}

return withSentinel(redisURI);
}

/**
* Add a withSentinel RedisURI to the existing builder.
*
* @param redisURI the sentinel URI
* @return the builder
* @since 5.2
*/
public Builder withSentinel(RedisURI redisURI) {

LettuceAssert.notNull(redisURI, "Redis URI must not be null");

sentinels.add(redisURI);
return this;
}

Expand Down Expand Up @@ -1290,8 +1355,10 @@ public RedisURI build() {

redisURI.setSentinelMasterId(sentinelMasterId);

for (HostAndPort sentinel : sentinels) {
redisURI.getSentinels().add(new RedisURI(sentinel.getHostText(), sentinel.getPort(), timeout));
for (RedisURI sentinel : sentinels) {

sentinel.setTimeout(timeout);
redisURI.getSentinels().add(sentinel);
}

redisURI.setSocket(socket);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.protocol.AsyncCommand;
import io.lettuce.core.protocol.RedisCommand;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.output.CommandOutput;
import io.lettuce.core.protocol.*;
import io.lettuce.core.sentinel.api.StatefulRedisSentinelConnection;
import io.lettuce.core.sentinel.api.async.RedisSentinelAsyncCommands;

Expand Down Expand Up @@ -136,6 +137,25 @@ public RedisFuture<String> info(String section) {
return dispatch(commandBuilder.info(section));
}

@Override
public <T> RedisFuture<T> dispatch(ProtocolKeyword type, CommandOutput<K, V, T> output) {

LettuceAssert.notNull(type, "Command type must not be null");
LettuceAssert.notNull(output, "CommandOutput type must not be null");

return dispatch(new AsyncCommand<>(new Command<>(type, output)));
}

@Override
public <T> RedisFuture<T> dispatch(ProtocolKeyword type, CommandOutput<K, V, T> output, CommandArgs<K, V> args) {

LettuceAssert.notNull(type, "Command type must not be null");
LettuceAssert.notNull(output, "CommandOutput type must not be null");
LettuceAssert.notNull(args, "CommandArgs type must not be null");

return dispatch(new AsyncCommand<>(new Command<>(type, output, args)));
}

public <T> AsyncCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
return (AsyncCommand<K, V, T>) connection.dispatch(new AsyncCommand<>(cmd));
}
Expand Down
Loading

0 comments on commit 8e21d2e

Please sign in to comment.