Skip to content

Commit

Permalink
Polishing #90
Browse files Browse the repository at this point in the history
- Update docs
- Use ArrayDeq as queue instead of LinkedBlockingQueue
- Improve partition cache handling when updating
  • Loading branch information
mp911de committed Jul 1, 2015
1 parent 59a5860 commit f7dbbb0
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 22 deletions.
9 changes: 5 additions & 4 deletions src/main/java/com/lambdaworks/redis/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

import java.net.ConnectException;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;

import com.google.common.base.Supplier;
Expand Down Expand Up @@ -294,7 +296,7 @@ public RedisAsyncConnection<String, String> connectAsync(RedisURI redisURI) {
}

private <K, V> RedisAsyncConnectionImpl<K, V> connectAsync(RedisCodec<K, V> codec, RedisURI redisURI) {
BlockingQueue<RedisCommand<K, V, ?>> queue = new LinkedBlockingQueue<RedisCommand<K, V, ?>>();
Queue<RedisCommand<K, V, ?>> queue = new ArrayDeque<RedisCommand<K, V, ?>>();

CommandHandler<K, V> handler = new CommandHandler<K, V>(clientOptions, queue);
RedisAsyncConnectionImpl<K, V> connection = newRedisAsyncConnectionImpl(handler, codec, timeout, unit);
Expand Down Expand Up @@ -366,8 +368,7 @@ public <K, V> RedisPubSubConnection<K, V> connectPubSub(RedisCodec<K, V> codec)
protected <K, V> RedisPubSubConnection<K, V> connectPubSub(RedisCodec<K, V> codec, RedisURI redisURI) {

checkArgument(codec != null, "RedisCodec must not be null");
BlockingQueue<RedisCommand<K, V, ?>> queue = new LinkedBlockingQueue<RedisCommand<K, V, ?>>();

Queue<RedisCommand<K, V, ?>> queue = new ArrayDeque<RedisCommand<K, V, ?>>();
PubSubCommandHandler<K, V> handler = new PubSubCommandHandler<K, V>(clientOptions, queue, codec);
RedisPubSubConnectionImpl<K, V> connection = newRedisPubSubConnectionImpl(handler, codec, timeout, unit);

Expand Down Expand Up @@ -411,7 +412,7 @@ public RedisSentinelAsyncConnection<String, String> connectSentinelAsync(RedisUR
}

private <K, V> RedisSentinelAsyncConnection<K, V> connectSentinelAsyncImpl(RedisCodec<K, V> codec, RedisURI redisURI) {
BlockingQueue<RedisCommand<K, V, ?>> queue = new LinkedBlockingQueue<RedisCommand<K, V, ?>>();
Queue<RedisCommand<K, V, ?>> queue = new ArrayDeque<RedisCommand<K, V, ?>>();

ConnectionBuilder connectionBuilder = ConnectionBuilder.connectionBuilder();
connectionBuilder.clientOptions(ClientOptions.copyOf(getOptions()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

import java.io.Closeable;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Supplier;
Expand Down Expand Up @@ -128,7 +128,7 @@ protected RedisAsyncConnectionImpl<String, String> connectAsyncImpl(SocketAddres
<K, V> RedisAsyncConnectionImpl<K, V> connectAsyncImpl(RedisCodec<K, V> codec, final SocketAddress socketAddress) {

logger.debug("connectAsyncImpl(" + socketAddress + ")");
BlockingQueue<RedisCommand<K, V, ?>> queue = new LinkedBlockingQueue<RedisCommand<K, V, ?>>();
Queue<RedisCommand<K, V, ?>> queue = new ArrayDeque<RedisCommand<K, V, ?>>();

CommandHandler<K, V> handler = new CommandHandler<K, V>(clientOptions, queue);
RedisAsyncConnectionImpl<K, V> connection = newRedisAsyncConnectionImpl(handler, codec, timeout, unit);
Expand Down Expand Up @@ -166,7 +166,7 @@ <K, V> RedisAdvancedClusterAsyncConnectionImpl<K, V> connectClusterAsyncImpl(Red
}

logger.debug("connectCluster(" + socketAddressSupplier.get() + ")");
BlockingQueue<RedisCommand<K, V, ?>> queue = new LinkedBlockingQueue<RedisCommand<K, V, ?>>();
Queue<RedisCommand<K, V, ?>> queue = new ArrayDeque<RedisCommand<K, V, ?>>();

CommandHandler<K, V> handler = new CommandHandler<K, V>(clientOptions, queue);

Expand Down Expand Up @@ -221,7 +221,15 @@ protected void initializePartitions() {
this.partitions = loadedPartitions;
}

protected Partitions getPartitions() {
/**
* Retrieve the cluster view. Partitions are shared amongst all connections opened by this client instance.
*
* @return the partitions.
*/
public Partitions getPartitions() {
if (partitions == null) {
initializePartitions();
}
return partitions;
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/lambdaworks/redis/cluster/SlotHash.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public static final int getSlot(byte[] key) {
System.arraycopy(key, start + 1, finalKey, 0, finalKey.length);
}
}
return CRC16.crc16(finalKey) % 16384;
return CRC16.crc16(finalKey) % SLOT_COUNT;
}

private static int indexOf(byte[] haystack, byte needle) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import com.lambdaworks.redis.RedisURI;

/**
* Parser for node information output (CLUSTER NODES).
* Parser for node information output of {@code CLUSTER NODES} and {@code CLUSTER SLAVES}.
*
* @author <a href="mailto:mpaluch@paluch.biz">Mark Paluch</a>
* @since 3.0
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package com.lambdaworks.redis.cluster.models.partitions;

import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.*;

import com.google.common.collect.Lists;
import com.lambdaworks.redis.cluster.SlotHash;
Expand Down Expand Up @@ -32,6 +29,8 @@ public RedisClusterNode getPartitionBySlot(int slot) {
public void updateCache() {
if (slotCache == null) {
slotCache = new RedisClusterNode[SlotHash.SLOT_COUNT];
} else {
Arrays.fill(slotCache, null);
}

for (RedisClusterNode partition : partitions) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.lambdaworks.redis.cluster.models.partitions;

import static com.google.common.base.Preconditions.checkArgument;

import java.io.Serializable;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -48,6 +50,7 @@ public RedisURI getUri() {
}

public void setUri(RedisURI uri) {
checkArgument(uri != null, "uri must not be null");
this.uri = uri;
}

Expand All @@ -56,6 +59,7 @@ public String getNodeId() {
}

public void setNodeId(String nodeId) {
checkArgument(nodeId != null, "nodeId must not be null");
this.nodeId = nodeId;
}

Expand Down Expand Up @@ -104,6 +108,8 @@ public List<Integer> getSlots() {
}

public void setSlots(List<Integer> slots) {
checkArgument(slots != null, "slots must not be null");

this.slots = slots;
}

Expand All @@ -126,9 +132,6 @@ public boolean equals(Object o) {

RedisClusterNode that = (RedisClusterNode) o;

if (uri != null ? !uri.equals(that.uri) : that.uri != null) {
return false;
}
if (nodeId != null ? !nodeId.equals(that.nodeId) : that.nodeId != null) {
return false;
}
Expand All @@ -138,8 +141,7 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
int result = uri != null ? uri.hashCode() : 0;
result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0);
int result = 31 * (nodeId != null ? nodeId.hashCode() : 0);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

package com.lambdaworks.redis.pubsub;

import java.util.concurrent.BlockingQueue;
import java.util.Queue;

import com.lambdaworks.redis.ClientOptions;
import com.lambdaworks.redis.codec.RedisCodec;
Expand Down Expand Up @@ -33,7 +33,7 @@ public class PubSubCommandHandler<K, V> extends CommandHandler<K, V> {
* @param queue Command queue.
* @param codec Codec.
*/
public PubSubCommandHandler(ClientOptions clientOptions, BlockingQueue<RedisCommand<K, V, ?>> queue, RedisCodec<K, V> codec) {
public PubSubCommandHandler(ClientOptions clientOptions, Queue<RedisCommand<K, V, ?>> queue, RedisCodec<K, V> codec) {
super(clientOptions, queue);
this.codec = codec;
this.output = new PubSubOutput<K, V, V>(codec);
Expand Down

0 comments on commit f7dbbb0

Please sign in to comment.