Installing & Using
dyno-queues is built on top of Dynomite, so a Dynomite cluster must be installed and provisioned first. In a distributed environment, dyno-queues relies on the DC_QUORUM_SAFE consistency offered by Dynomite to ensure that writes are performed with full DC quorum.
Prerequisites:
- Dyno client version 1.4.7
- JDK 1.8+
Queues are sharded by availability zone. When an element is pushed to the queue, its shard is chosen round-robin, so the shards even out over time. When the queues are configured, a ShardSupplier provides the set of available shards and identifies the current shard.
See DynoShardSupplier and RedisQueues for a reference implementation that uses Dynomite host supplier to determine the shards.
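The round-robin shard choice described above can be sketched in a few lines. This is an illustration only, not the dyno-queues implementation: `RoundRobinShardPicker` and its `next()` method are hypothetical names, and the shard names stand in for availability-zone suffixes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not part of dyno-queues): hands out shard names in rotation.
final class RoundRobinShardPicker {
    private final List<String> shards;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinShardPicker(List<String> shards) {
        if (shards.isEmpty()) {
            throw new IllegalArgumentException("at least one shard is required");
        }
        this.shards = new ArrayList<>(shards); // defensive copy
    }

    // Returns the shard for the next push; floorMod keeps the index valid
    // even after the counter overflows to a negative value.
    String next() {
        int index = Math.floorMod(counter.getAndIncrement(), shards.size());
        return shards.get(index);
    }

    public static void main(String[] args) {
        List<String> zones = new ArrayList<>();
        zones.add("c");
        zones.add("d");
        zones.add("e");
        RoundRobinShardPicker picker = new RoundRobinShardPicker(zones);
        for (int i = 0; i < 6; i++) {
            System.out.println("push " + i + " goes to shard " + picker.next());
        }
    }
}
```

Because every shard receives every n-th push, the shard sizes differ by at most one element at any time, assuming all pushes go through the same picker.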
Messages are polled only from the current shard, so at least one server must be serving the queue in each availability zone.
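To see why every availability zone needs a consumer, here is a small in-memory sketch of the behaviour. It is an assumption-laden model, not the library code: `ShardedQueueSketch`, `pop(currentShard, count)` and `pending` are hypothetical names, and plain strings stand in for messages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model (not part of dyno-queues).
final class ShardedQueueSketch {
    private final Map<String, Deque<String>> shards = new LinkedHashMap<>();
    private final List<String> shardNames;
    private int nextShard = 0;

    ShardedQueueSketch(List<String> shardNames) {
        if (shardNames.isEmpty()) {
            throw new IllegalArgumentException("at least one shard is required");
        }
        this.shardNames = new ArrayList<>(shardNames);
        for (String name : shardNames) {
            shards.put(name, new ArrayDeque<>());
        }
    }

    // Pushes are spread over all shards in rotation.
    void push(String message) {
        String shard = shardNames.get(nextShard);
        nextShard = (nextShard + 1) % shardNames.size();
        shards.get(shard).addLast(message);
    }

    // A consumer only ever reads the shard of its own availability zone.
    List<String> pop(String currentShard, int count) {
        Deque<String> local = shards.get(currentShard);
        if (local == null) {
            throw new IllegalArgumentException("unknown shard: " + currentShard);
        }
        List<String> out = new ArrayList<>();
        while (out.size() < count && !local.isEmpty()) {
            out.add(local.pollFirst());
        }
        return out;
    }

    // Number of messages still waiting in the given shard.
    int pending(String shard) {
        return shards.get(shard).size();
    }
}
```

With shards "c" and "d" and four pushes, a consumer in zone "d" receives only the second and fourth messages; the other two stay in shard "c" until a server in that zone polls for them.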
Javadoc for DynoQueue: https://netflix.github.io/dyno-queues/javadoc/com/netflix/dyno/queues/DynoQueue.html
//Imports
import com.netflix.dyno.jedis.DynoJedisClient;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
//Initialize the connection to dynomite
DynoJedisClient dynoClient = new DynoJedisClient.Builder()
        .withApplicationName(appName)
        .withDynomiteClusterName(cluster)
        .withDiscoveryClient(dc)
        .build();
String region = "us-east-1"; //System.getProperty("EC2_REGION")
String localDC = "us-east-1d"; //System.getProperty("EC2_AVAILABILITY_ZONE")
localDC = localDC.replace(region, ""); //strip the region prefix, leaving the zone suffix "d"
DynoShardSupplier ss = new DynoShardSupplier(dynoClient.getConnPool().getConfiguration().getHostSupplier(), region, localDC);
//Create the RedisQueues
RedisQueues queues = new RedisQueues(dynoClient, dynoClient, prefix, ss, 60_000, 60_000, dynoThreadCount);
//Get the queue instance
String queueName = "my_test_queue";
DynoQueue queue = queues.get(queueName);
//Use the queue
Message msg = new Message("id1", "message payload");
queue.push(Arrays.asList(msg));
//poll for the message
int count = 10;
//queue.pop supports long polling: if the queue is empty, the call waits up to the given timeout (1 second in the example below) for messages to arrive
//If there are messages in the queue already, the call returns immediately
List<Message> polled = queue.pop(count, 1, TimeUnit.SECONDS);
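The long-polling contract described in the comments above can be modelled with a standard `BlockingQueue`. This is a minimal sketch under stated assumptions, not how dyno-queues implements `pop`: `LongPollSketch` is a hypothetical class, and it holds plain strings in memory instead of messages in Dynomite.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory model of long polling (not part of dyno-queues).
final class LongPollSketch {
    private final BlockingQueue<String> messages = new LinkedBlockingQueue<>();

    void push(String message) {
        messages.add(message);
    }

    // Returns up to `count` messages. If the queue is empty, waits up to the
    // given timeout for the first message; otherwise returns without waiting.
    List<String> pop(int count, long wait, TimeUnit unit) {
        List<String> out = new ArrayList<>();
        if (count <= 0) {
            return out;
        }
        try {
            String first = messages.poll(wait, unit); // blocks only while empty
            if (first == null) {
                return out; // timed out with nothing to deliver
            }
            out.add(first);
            messages.drainTo(out, count - 1); // take whatever else is ready
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
        return out;
    }
}
```

A caller that loops on `pop` therefore avoids busy-waiting on an empty queue while still receiving messages as soon as they are available.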