Skip to content

Commit

Permalink
feat: support jedis cluster pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
Roiocam committed Mar 21, 2024
1 parent 94c5a19 commit 945818d
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.alicp.jetcache.external.AbstractExternalCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.ClusterPipeline;
import redis.clients.jedis.Connection;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
Expand All @@ -20,6 +21,7 @@
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.commands.KeyBinaryCommands;
import redis.clients.jedis.commands.StringBinaryCommands;
import redis.clients.jedis.commands.StringPipelineBinaryCommands;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.util.Pool;

Expand Down Expand Up @@ -280,32 +282,32 @@ protected CacheResult do_PUT_ALL(Map<? extends K, ? extends V> map, long expireA
try {
commands = (StringBinaryCommands) writeCommands();
int failCount = 0;
if(commands instanceof Jedis || commands instanceof JedisPooled) {
List<Response<String>> responses = new ArrayList<>();
Pipeline pipeline = null;
if(commands instanceof JedisPooled) {
connection = ((JedisPooled) commands).getPool().getResource();
pipeline = new Pipeline(connection);
} else {
pipeline = new Pipeline((Jedis) commands);
}
for (Map.Entry<? extends K, ? extends V> en : map.entrySet()) {
CacheValueHolder<V> holder = new CacheValueHolder(en.getValue(), timeUnit.toMillis(expireAfterWrite));
Response<String> resp = pipeline.psetex(buildKey(en.getKey()), timeUnit.toMillis(expireAfterWrite), valueEncoder.apply(holder));
responses.add(resp);
}
pipeline.sync();
for (Response<String> resp : responses) {
if (!"OK".equals(resp.get())) {
failCount++;
}
}
List<Response<String>> responses = new ArrayList<>();
StringPipelineBinaryCommands pipeline;
if (commands instanceof JedisPooled) {
connection = ((JedisPooled) commands).getPool().getResource();
pipeline = new Pipeline(connection);
} else if (commands instanceof JedisCluster) {
JedisCluster cluster = (JedisCluster) commands;
pipeline = cluster.pipelined();
} else {
for (Map.Entry<? extends K, ? extends V> en : map.entrySet()) {
CacheResult r = do_PUT(en.getKey(), en.getValue(), expireAfterWrite, timeUnit);
if (!r.isSuccess()) {
failCount++;
}
pipeline = new Pipeline((Jedis) commands);
}
for (Map.Entry<? extends K, ? extends V> en : map.entrySet()) {
CacheValueHolder<V> holder = new CacheValueHolder(en.getValue(), timeUnit.toMillis(expireAfterWrite));
Response<String> resp = pipeline.psetex(buildKey(en.getKey()), timeUnit.toMillis(expireAfterWrite), valueEncoder.apply(holder));
responses.add(resp);
}
if (pipeline instanceof Pipeline) {
((Pipeline) pipeline).sync();
} else if (pipeline instanceof ClusterPipeline) {
((ClusterPipeline) pipeline).sync();
} else {
throw new UnsupportedOperationException("unrecognized pipeline type");
}
for (Response<String> resp : responses) {
if (!"OK".equals(resp.get())) {
failCount++;
}
}
return failCount == 0 ? CacheResult.SUCCESS_WITHOUT_MSG :
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@
<spring.boot.version>3.1.3</spring.boot.version>
<lettuce.version>6.2.6.RELEASE</lettuce.version>

<jedis.version>4.3.2</jedis.version>
<jedis.version>4.4.0</jedis.version>
<redisson.version>3.18.0</redisson.version>

<!--
Expand Down

0 comments on commit 945818d

Please sign in to comment.