diff --git a/jetcache-starter/jetcache-autoconfigure/src/main/java/com/alicp/jetcache/autoconfigure/RedisAutoConfiguration.java b/jetcache-starter/jetcache-autoconfigure/src/main/java/com/alicp/jetcache/autoconfigure/RedisAutoConfiguration.java index 1321a0b6..6abbd055 100644 --- a/jetcache-starter/jetcache-autoconfigure/src/main/java/com/alicp/jetcache/autoconfigure/RedisAutoConfiguration.java +++ b/jetcache-starter/jetcache-autoconfigure/src/main/java/com/alicp/jetcache/autoconfigure/RedisAutoConfiguration.java @@ -2,6 +2,7 @@ import com.alicp.jetcache.CacheBuilder; import com.alicp.jetcache.CacheConfigException; +import com.alicp.jetcache.redis.JedisClusterWrapper; import com.alicp.jetcache.redis.RedisCacheBuilder; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.springframework.beans.PropertyValues; @@ -13,7 +14,6 @@ import org.springframework.util.ClassUtils; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisSentinelPool; import redis.clients.jedis.Protocol; @@ -137,7 +137,8 @@ private Object parsePool(ConfigTree ct) { .map(uri -> uri.toString().split(":")) .map(hostAndPort -> new HostAndPort(hostAndPort[0], Integer.parseInt(hostAndPort[1]))) .collect(Collectors.toSet()); - return new JedisCluster(hostAndPortSet, connectionTimeout, soTimeout, maxAttempt, user, password, + // we use our wrapper here to support pipeline + return new JedisClusterWrapper(hostAndPortSet, connectionTimeout, soTimeout, maxAttempt, user, password, clientName, poolConfig, ssl); } } else { diff --git a/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/JedisClusterWrapper.java b/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/JedisClusterWrapper.java new file mode 100644 index 00000000..dbf316ad --- /dev/null +++ b/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/JedisClusterWrapper.java @@ -0,0 +1,29 @@ +package com.alicp.jetcache.redis; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import redis.clients.jedis.ClusterPipeline; +import redis.clients.jedis.DefaultJedisClientConfig; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.providers.ClusterConnectionProvider; + +import java.util.Set; + +/** + * Support for pipeline in JedisCluster. + */ +public class JedisClusterWrapper extends JedisCluster { + + private final ClusterConnectionProvider provider; + + public JedisClusterWrapper(Set hostAndPortSet, int connectionTimeout, int soTimeout, int maxAttempt, String user, String password, String clientName, GenericObjectPoolConfig poolConfig, boolean ssl) { + super(hostAndPortSet, connectionTimeout, soTimeout, maxAttempt, user, password, clientName, poolConfig, ssl); + DefaultJedisClientConfig jedisClientConfig = DefaultJedisClientConfig.builder().connectionTimeoutMillis(connectionTimeout) + .socketTimeoutMillis(soTimeout).user(user).password(password).clientName(clientName).ssl(ssl).build(); + this.provider = new ClusterConnectionProvider(hostAndPortSet, jedisClientConfig, poolConfig); + } + + public ClusterPipeline getPipeline() { + return new ClusterPipeline(provider); + } +} diff --git a/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisCache.java b/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisCache.java index 84e8fcda..4e34cabb 100644 --- a/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisCache.java +++ b/jetcache-support/jetcache-redis/src/main/java/com/alicp/jetcache/redis/RedisCache.java @@ -145,7 +145,7 @@ static int randomIndex(int[] weights) { int x = 0; for (int i = 0; i < weights.length; i++) { x += weights[i]; - if(r < x){ + if (r < x) { return i; } } @@ -287,11 +287,13 @@ protected CacheResult do_PUT_ALL(Map map, long expireA if (commands instanceof JedisPooled) { connection = ((JedisPooled) commands).getPool().getResource(); pipeline = new Pipeline(connection); - } else if (commands instanceof JedisCluster) { - JedisCluster cluster = (JedisCluster) commands; - pipeline = cluster.pipelined(); - } else { + } else if (commands instanceof JedisClusterWrapper) { + JedisClusterWrapper cluster = (JedisClusterWrapper) commands; + pipeline = cluster.getPipeline(); + } else if (commands instanceof Jedis) { pipeline = new Pipeline((Jedis) commands); + } else { + throw new IllegalStateException("Jedis type can not be unWrapper JedisCluster"); } for (Map.Entry en : map.entrySet()) { CacheValueHolder holder = new CacheValueHolder(en.getValue(), timeUnit.toMillis(expireAfterWrite)); diff --git a/pom.xml b/pom.xml index f659dd98..b36eba86 100644 --- a/pom.xml +++ b/pom.xml @@ -204,7 +204,7 @@ 3.1.3 6.2.6.RELEASE - 4.4.0 + 4.3.2 3.18.0