diff --git a/src/main/java/sirius/biz/cluster/work/DistributedTasks.java b/src/main/java/sirius/biz/cluster/work/DistributedTasks.java index dfc0ef00d..d841140e2 100644 --- a/src/main/java/sirius/biz/cluster/work/DistributedTasks.java +++ b/src/main/java/sirius/biz/cluster/work/DistributedTasks.java @@ -69,12 +69,17 @@ public class DistributedTasks implements MetricProvider { private static final String KEY_EXECUTOR = "_executor"; private static final String KEY_PENALTY_TOKEN = "_penalty_token"; + private static final String CONFIG_KEY_PRIORITIZED = "prioritized"; + private static final String CONFIG_KEY_PENALTY_TIME = "penaltyTime"; + private static final String CONFIG_KEY_CONCURRENCY_TOKEN = "concurrencyToken"; + private static final String CONFIG_BLOCK_ASYNC_DISTRIBUTED_QUEUES = "async.distributed.queues"; /** * Contains the logger used to report everything related to distributed tasks. */ public static final Log LOG = Log.get("distributed-tasks"); + @Part private NeighborhoodWatch orchestration; @@ -230,17 +235,25 @@ public List getQueues() { * @return the configuration for the given queue */ private DistributedQueueInfo loadQueueInfo(String queueName) { - Extension config = Sirius.getSettings().getExtension("async.distributed.queues", queueName); + Extension config = Sirius.getSettings().getExtension(CONFIG_BLOCK_ASYNC_DISTRIBUTED_QUEUES, queueName); if (config == null || config.isDefault()) { LOG.WARN("Missing configuration for queue: %s", queueName); return new DistributedQueueInfo(queueName, null, null); } - return new DistributedQueueInfo(queueName, - config.get("concurrencyToken").asString(), - config.get("prioritized").asBoolean() ? - Duration.ofMillis(config.getMilliseconds("penaltyTime")) : - null); + Duration penaltyTime = null; + if (config.get(CONFIG_KEY_PRIORITIZED).asBoolean()) { + long penaltyTimeMillis = config.getMilliseconds(CONFIG_KEY_PENALTY_TIME); + if (penaltyTimeMillis == 0) { + LOG.WARN("A prioritized queue (%s) needs a penaltyTime!", queueName); + } else { + penaltyTime = Duration.ofMillis(penaltyTimeMillis); + } + } else if (config.getMilliseconds(CONFIG_KEY_PENALTY_TIME) > 0) { + LOG.WARN("A FIFO queue (%s) must not have a penaltyTime!", queueName); + } + + return new DistributedQueueInfo(queueName, config.get(CONFIG_KEY_CONCURRENCY_TOKEN).asString(), penaltyTime); } /** @@ -268,7 +281,7 @@ public void submitFIFOTask(Class executor, Ob getFifo(queueName).offer(data); } - protected FifoQueue getFifo(String queueName) { + private FifoQueue getFifo(String queueName) { return fifos.computeIfAbsent(queueName, this::createFifo); } @@ -377,7 +390,7 @@ public void submitPrioritizedTask(Class execu getPrioritizedQueue(queueName).offer(priority, data); } - protected PrioritizedQueue getPrioritizedQueue(String queueName) { + private PrioritizedQueue getPrioritizedQueue(String queueName) { return prioritizedQueues.computeIfAbsent(queueName, this::createPrioritizedQueue); }