Skip to content

Commit

Permalink
Add support for REPLICAOF and CLUSTER REPLICAS commands #2020
Browse files Browse the repository at this point in the history
Also, reorder cluster API methods alphabetically.
  • Loading branch information
mp911de committed Feb 25, 2022
1 parent e1e24ca commit 9311410
Show file tree
Hide file tree
Showing 18 changed files with 626 additions and 366 deletions.
16 changes: 16 additions & 0 deletions src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,11 @@ public RedisFuture<String> clusterReplicate(String nodeId) {
return dispatch(commandBuilder.clusterReplicate(nodeId));
}

@Override
public RedisFuture<List<String>> clusterReplicas(String nodeId) {
return dispatch(commandBuilder.clusterReplicas(nodeId));
}

@Override
public RedisFuture<String> clusterReset(boolean hard) {
return dispatch(commandBuilder.clusterReset(hard));
Expand Down Expand Up @@ -1372,6 +1377,16 @@ public RedisFuture<Boolean> renamenx(K key, K newKey) {
return dispatch(commandBuilder.renamenx(key, newKey));
}

@Override
public RedisFuture<String> replicaof(String host, int port) {
return dispatch(commandBuilder.replicaof(host, port));
}

@Override
public RedisFuture<String> replicaofNoOne() {
return dispatch(commandBuilder.replicaofNoOne());
}

@Override
public void reset() {
getConnection().reset();
Expand Down Expand Up @@ -1546,6 +1561,7 @@ public void setAutoFlushCommands(boolean autoFlush) {
connection.setAutoFlushCommands(autoFlush);
}

@Override
public void setTimeout(Duration timeout) {
connection.setTimeout(timeout);
}
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,11 @@ public Mono<String> clusterReplicate(String nodeId) {
return createMono(() -> commandBuilder.clusterReplicate(nodeId));
}

@Override
public Flux<String> clusterReplicas(String nodeId) {
return createDissolvingFlux(() -> commandBuilder.clusterSlaves(nodeId));
}

@Override
public Mono<String> clusterReset(boolean hard) {
return createMono(() -> commandBuilder.clusterReset(hard));
Expand Down Expand Up @@ -1447,6 +1452,16 @@ public Mono<Boolean> renamenx(K key, K newKey) {
return createMono(() -> commandBuilder.renamenx(key, newKey));
}

@Override
public Mono<String> replicaof(String host, int port) {
return createMono(() -> commandBuilder.replicaof(host, port));
}

@Override
public Mono<String> replicaofNoOne() {
return createMono(() -> commandBuilder.replicaofNoOne());
}

@Override
public void reset() {
getConnection().reset();
Expand Down Expand Up @@ -1621,6 +1636,7 @@ public void setAutoFlushCommands(boolean autoFlush) {
connection.setAutoFlushCommands(autoFlush);
}

@Override
public void setTimeout(Duration timeout) {
connection.setTimeout(timeout);
}
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/io/lettuce/core/RedisCommandBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,13 @@ Command<K, V, String> clusterReplicate(String nodeId) {
return createCommand(CLUSTER, new StatusOutput<>(codec), args);
}

Command<K, V, List<String>> clusterReplicas(String nodeId) {
assertNodeId(nodeId);

CommandArgs<K, V> args = new CommandArgs<>(codec).add(REPLICAS).add(nodeId);
return createCommand(CLUSTER, new StringListOutput<>(codec), args);
}

Command<K, V, String> clusterReset(boolean hard) {

CommandArgs<K, V> args = new CommandArgs<>(codec).add(RESET);
Expand Down Expand Up @@ -1913,6 +1920,19 @@ Command<K, V, Boolean> renamenx(K key, K newKey) {
return createCommand(RENAMENX, new BooleanOutput<>(codec), args);
}

Command<K, V, String> replicaof(String host, int port) {
LettuceAssert.notNull(host, "Host " + MUST_NOT_BE_NULL);
LettuceAssert.notEmpty(host, "Host " + MUST_NOT_BE_EMPTY);

CommandArgs<K, V> args = new CommandArgs<>(codec).add(host).add(port);
return createCommand(REPLICAOF, new StatusOutput<>(codec), args);
}

Command<K, V, String> replicaofNoOne() {
CommandArgs<K, V> args = new CommandArgs<>(codec).add(NO).add(ONE);
return createCommand(REPLICAOF, new StatusOutput<>(codec), args);
}

Command<K, V, String> restore(K key, byte[] value, RestoreArgs restoreArgs) {
notNullKey(key);
LettuceAssert.notNull(value, "Value " + MUST_NOT_BE_NULL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,24 @@ public interface RedisServerAsyncCommands<K, V> {
*/
RedisFuture<Long> memoryUsage(K key);

/**
* Make the server a replica of another instance.
*
* @param host the host type: string.
* @param port the port type: string.
* @return String simple-string-reply.
* @since 6.1.7
*/
RedisFuture<String> replicaof(String host, int port);

/**
* Promote server as master.
*
* @return String simple-string-reply.
* @since 6.1.7
*/
RedisFuture<String> replicaofNoOne();

/**
* Synchronously save the dataset to disk.
*
Expand All @@ -369,18 +387,20 @@ public interface RedisServerAsyncCommands<K, V> {
void shutdown(boolean save);

/**
* Make the server a replica of another instance, or promote it as master.
* Make the server a replica of another instance.
*
* @param host the host type: string.
* @param port the port type: string.
* @return String simple-string-reply.
* @deprecated since 6.1.7, use {@link #replicaof(String, int)} instead.
*/
RedisFuture<String> slaveof(String host, int port);

/**
* Promote server as master.
*
* @return String simple-string-reply.
* @deprecated since 6.1.7, use {@link #replicaofNoOne()} instead.
*/
RedisFuture<String> slaveofNoOne();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,24 @@ public interface RedisServerReactiveCommands<K, V> {
*/
Mono<Long> memoryUsage(K key);

/**
* Make the server a replica of another instance.
*
* @param host the host type: string.
* @param port the port type: string.
* @return String simple-string-reply.
* @since 6.1.7
*/
Mono<String> replicaof(String host, int port);

/**
* Promote server as master.
*
* @return String simple-string-reply.
* @since 6.1.7
*/
Mono<String> replicaofNoOne();

/**
* Synchronously save the dataset to disk.
*
Expand All @@ -369,18 +387,20 @@ public interface RedisServerReactiveCommands<K, V> {
Mono<Void> shutdown(boolean save);

/**
* Make the server a replica of another instance, or promote it as master.
* Make the server a replica of another instance.
*
* @param host the host type: string.
* @param port the port type: string.
* @return String simple-string-reply.
* @deprecated since 6.1.7, use {@link #replicaof(String, int)} instead.
*/
Mono<String> slaveof(String host, int port);

/**
* Promote server as master.
*
* @return String simple-string-reply.
* @deprecated since 6.1.7, use {@link #replicaofNoOne()} instead.
*/
Mono<String> slaveofNoOne();

Expand Down
22 changes: 21 additions & 1 deletion src/main/java/io/lettuce/core/api/sync/RedisServerCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,24 @@ public interface RedisServerCommands<K, V> {
*/
Long memoryUsage(K key);

/**
* Make the server a replica of another instance.
*
* @param host the host type: string.
* @param port the port type: string.
* @return String simple-string-reply.
* @since 6.1.7
*/
String replicaof(String host, int port);

/**
* Promote server as master.
*
* @return String simple-string-reply.
* @since 6.1.7
*/
String replicaofNoOne();

/**
* Synchronously save the dataset to disk.
*
Expand All @@ -368,18 +386,20 @@ public interface RedisServerCommands<K, V> {
void shutdown(boolean save);

/**
* Make the server a replica of another instance, or promote it as master.
* Make the server a replica of another instance.
*
* @param host the host type: string.
* @param port the port type: string.
* @return String simple-string-reply.
* @deprecated since 6.1.7, use {@link #replicaof(String, int)} instead.
*/
String slaveof(String host, int port);

/**
* Promote server as master.
*
* @return String simple-string-reply.
* @deprecated since 6.1.7, use {@link #replicaofNoOne()} instead.
*/
String slaveofNoOne();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,24 @@ public interface NodeSelectionServerAsyncCommands<K, V> {
*/
AsyncExecutions<Long> memoryUsage(K key);

/**
* Make the server a replica of another instance.
*
* @param host the host type: string.
* @param port the port type: string.
* @return String simple-string-reply.
* @since 6.1.7
*/
AsyncExecutions<String> replicaof(String host, int port);

/**
* Promote server as master.
*
* @return String simple-string-reply.
* @since 6.1.7
*/
AsyncExecutions<String> replicaofNoOne();

/**
* Synchronously save the dataset to disk.
*
Expand All @@ -347,18 +365,20 @@ public interface NodeSelectionServerAsyncCommands<K, V> {
AsyncExecutions<String> save();

/**
* Make the server a replica of another instance, or promote it as master.
* Make the server a replica of another instance.
*
* @param host the host type: string.
* @param port the port type: string.
* @return String simple-string-reply.
* @deprecated since 6.1.7, use {@link #replicaof(String, int)} instead.
*/
AsyncExecutions<String> slaveof(String host, int port);

/**
* Promote server as master.
*
* @return String simple-string-reply.
* @deprecated since 6.1.7, use {@link #replicaofNoOne()} instead.
*/
AsyncExecutions<String> slaveofNoOne();

Expand Down
Loading

0 comments on commit 9311410

Please sign in to comment.