diff --git a/src/main/java/com/lambdaworks/redis/RedisClient.java b/src/main/java/com/lambdaworks/redis/RedisClient.java index 8a4eae4dc4..5058205468 100644 --- a/src/main/java/com/lambdaworks/redis/RedisClient.java +++ b/src/main/java/com/lambdaworks/redis/RedisClient.java @@ -7,7 +7,9 @@ import java.net.ConnectException; import java.net.SocketAddress; +import java.util.ArrayDeque; import java.util.List; +import java.util.Queue; import java.util.concurrent.*; import com.google.common.base.Supplier; @@ -294,7 +296,7 @@ public RedisAsyncConnection connectAsync(RedisURI redisURI) { } private RedisAsyncConnectionImpl connectAsync(RedisCodec codec, RedisURI redisURI) { - BlockingQueue> queue = new LinkedBlockingQueue>(); + Queue> queue = new ArrayDeque>(); CommandHandler handler = new CommandHandler(clientOptions, queue); RedisAsyncConnectionImpl connection = newRedisAsyncConnectionImpl(handler, codec, timeout, unit); @@ -366,8 +368,7 @@ public RedisPubSubConnection connectPubSub(RedisCodec codec) protected RedisPubSubConnection connectPubSub(RedisCodec codec, RedisURI redisURI) { checkArgument(codec != null, "RedisCodec must not be null"); - BlockingQueue> queue = new LinkedBlockingQueue>(); - + Queue> queue = new ArrayDeque>(); PubSubCommandHandler handler = new PubSubCommandHandler(clientOptions, queue, codec); RedisPubSubConnectionImpl connection = newRedisPubSubConnectionImpl(handler, codec, timeout, unit); @@ -411,7 +412,7 @@ public RedisSentinelAsyncConnection connectSentinelAsync(RedisUR } private RedisSentinelAsyncConnection connectSentinelAsyncImpl(RedisCodec codec, RedisURI redisURI) { - BlockingQueue> queue = new LinkedBlockingQueue>(); + Queue> queue = new ArrayDeque>(); ConnectionBuilder connectionBuilder = ConnectionBuilder.connectionBuilder(); connectionBuilder.clientOptions(ClientOptions.copyOf(getOptions())); diff --git a/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java b/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java index 01fe47a0b1..08f10be91c 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java +++ b/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java @@ -4,10 +4,10 @@ import java.io.Closeable; import java.net.SocketAddress; +import java.util.ArrayDeque; import java.util.Collections; import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.Queue; import java.util.concurrent.TimeUnit; import com.google.common.base.Supplier; @@ -128,7 +128,7 @@ protected RedisAsyncConnectionImpl connectAsyncImpl(SocketAddres RedisAsyncConnectionImpl connectAsyncImpl(RedisCodec codec, final SocketAddress socketAddress) { logger.debug("connectAsyncImpl(" + socketAddress + ")"); - BlockingQueue> queue = new LinkedBlockingQueue>(); + Queue> queue = new ArrayDeque>(); CommandHandler handler = new CommandHandler(clientOptions, queue); RedisAsyncConnectionImpl connection = newRedisAsyncConnectionImpl(handler, codec, timeout, unit); @@ -166,7 +166,7 @@ RedisAdvancedClusterAsyncConnectionImpl connectClusterAsyncImpl(Red } logger.debug("connectCluster(" + socketAddressSupplier.get() + ")"); - BlockingQueue> queue = new LinkedBlockingQueue>(); + Queue> queue = new ArrayDeque>(); CommandHandler handler = new CommandHandler(clientOptions, queue); @@ -221,7 +221,15 @@ protected void initializePartitions() { this.partitions = loadedPartitions; } - protected Partitions getPartitions() { + /** + * Retrieve the cluster view. Partitions are shared amongst all connections opened by this client instance. + * + * @return the partitions. + */ + public Partitions getPartitions() { + if (partitions == null) { + initializePartitions(); + } return partitions; } diff --git a/src/main/java/com/lambdaworks/redis/cluster/SlotHash.java b/src/main/java/com/lambdaworks/redis/cluster/SlotHash.java index 54c263b65e..5c143ff253 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/SlotHash.java +++ b/src/main/java/com/lambdaworks/redis/cluster/SlotHash.java @@ -57,7 +57,7 @@ public static final int getSlot(byte[] key) { System.arraycopy(key, start + 1, finalKey, 0, finalKey.length); } } - return CRC16.crc16(finalKey) % 16384; + return CRC16.crc16(finalKey) % SLOT_COUNT; } private static int indexOf(byte[] haystack, byte needle) { diff --git a/src/main/java/com/lambdaworks/redis/cluster/models/partitions/ClusterPartitionParser.java b/src/main/java/com/lambdaworks/redis/cluster/models/partitions/ClusterPartitionParser.java index 412868695d..4371ff5c26 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/models/partitions/ClusterPartitionParser.java +++ b/src/main/java/com/lambdaworks/redis/cluster/models/partitions/ClusterPartitionParser.java @@ -12,7 +12,7 @@ import com.lambdaworks.redis.RedisURI; /** - * Parser for node information output (CLUSTER NODES). + * Parser for node information output of {@code CLUSTER NODES} and {@code CLUSTER SLAVES}. * * @author Mark Paluch * @since 3.0 diff --git a/src/main/java/com/lambdaworks/redis/cluster/models/partitions/Partitions.java b/src/main/java/com/lambdaworks/redis/cluster/models/partitions/Partitions.java index 2e74fd7e75..f150c3e650 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/models/partitions/Partitions.java +++ b/src/main/java/com/lambdaworks/redis/cluster/models/partitions/Partitions.java @@ -1,9 +1,6 @@ package com.lambdaworks.redis.cluster.models.partitions; -import java.util.AbstractCollection; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; +import java.util.*; import com.google.common.collect.Lists; import com.lambdaworks.redis.cluster.SlotHash; @@ -32,6 +29,8 @@ public RedisClusterNode getPartitionBySlot(int slot) { public void updateCache() { if (slotCache == null) { slotCache = new RedisClusterNode[SlotHash.SLOT_COUNT]; + } else { + Arrays.fill(slotCache, null); } for (RedisClusterNode partition : partitions) { diff --git a/src/main/java/com/lambdaworks/redis/cluster/models/partitions/RedisClusterNode.java b/src/main/java/com/lambdaworks/redis/cluster/models/partitions/RedisClusterNode.java index 4a5b901dc4..9a4d52d9f2 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/models/partitions/RedisClusterNode.java +++ b/src/main/java/com/lambdaworks/redis/cluster/models/partitions/RedisClusterNode.java @@ -1,5 +1,7 @@ package com.lambdaworks.redis.cluster.models.partitions; +import static com.google.common.base.Preconditions.checkArgument; + import java.io.Serializable; import java.util.List; import java.util.Set; @@ -48,6 +50,7 @@ public RedisURI getUri() { } public void setUri(RedisURI uri) { + checkArgument(uri != null, "uri must not be null"); this.uri = uri; } @@ -56,6 +59,7 @@ public String getNodeId() { } public void setNodeId(String nodeId) { + checkArgument(nodeId != null, "nodeId must not be null"); this.nodeId = nodeId; } @@ -104,6 +108,8 @@ public List getSlots() { } public void setSlots(List slots) { + checkArgument(slots != null, "slots must not be null"); + this.slots = slots; } @@ -126,9 +132,6 @@ public boolean equals(Object o) { RedisClusterNode that = (RedisClusterNode) o; - if (uri != null ? !uri.equals(that.uri) : that.uri != null) { - return false; - } if (nodeId != null ? !nodeId.equals(that.nodeId) : that.nodeId != null) { return false; } @@ -138,8 +141,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - int result = uri != null ? uri.hashCode() : 0; - result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0); + int result = 31 * (nodeId != null ? nodeId.hashCode() : 0); return result; } diff --git a/src/main/java/com/lambdaworks/redis/pubsub/PubSubCommandHandler.java b/src/main/java/com/lambdaworks/redis/pubsub/PubSubCommandHandler.java index ea4a8e2126..4444692a41 100644 --- a/src/main/java/com/lambdaworks/redis/pubsub/PubSubCommandHandler.java +++ b/src/main/java/com/lambdaworks/redis/pubsub/PubSubCommandHandler.java @@ -2,7 +2,7 @@ package com.lambdaworks.redis.pubsub; -import java.util.concurrent.BlockingQueue; +import java.util.Queue; import com.lambdaworks.redis.ClientOptions; import com.lambdaworks.redis.codec.RedisCodec; @@ -33,7 +33,7 @@ public class PubSubCommandHandler extends CommandHandler { * @param queue Command queue. * @param codec Codec. */ - public PubSubCommandHandler(ClientOptions clientOptions, BlockingQueue> queue, RedisCodec codec) { + public PubSubCommandHandler(ClientOptions clientOptions, Queue> queue, RedisCodec codec) { super(clientOptions, queue); this.codec = codec; this.output = new PubSubOutput(codec);