Skip to content

Commit

Permalink
Merge pull request #2047 from scireum/feature/aha/better-report-confi…
Browse files Browse the repository at this point in the history
…g-errors

Properly reports misconfigurations of FIFO / prioritized queues
  • Loading branch information
andyHa authored Nov 25, 2024
2 parents 3022b26 + 9cdafda commit 752ce97
Showing 1 changed file with 21 additions and 8 deletions.
29 changes: 21 additions & 8 deletions src/main/java/sirius/biz/cluster/work/DistributedTasks.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,17 @@ public class DistributedTasks implements MetricProvider {

private static final String KEY_EXECUTOR = "_executor";
private static final String KEY_PENALTY_TOKEN = "_penalty_token";
private static final String CONFIG_KEY_PRIORITIZED = "prioritized";
private static final String CONFIG_KEY_PENALTY_TIME = "penaltyTime";
private static final String CONFIG_KEY_CONCURRENCY_TOKEN = "concurrencyToken";
private static final String CONFIG_BLOCK_ASYNC_DISTRIBUTED_QUEUES = "async.distributed.queues";

/**
* Contains the logger used to report everything related to distributed tasks.
*/
public static final Log LOG = Log.get("distributed-tasks");


@Part
private NeighborhoodWatch orchestration;

Expand Down Expand Up @@ -230,17 +235,25 @@ public List<DistributedQueueInfo> getQueues() {
* @return the configuration for the given queue
*/
private DistributedQueueInfo loadQueueInfo(String queueName) {
Extension config = Sirius.getSettings().getExtension("async.distributed.queues", queueName);
Extension config = Sirius.getSettings().getExtension(CONFIG_BLOCK_ASYNC_DISTRIBUTED_QUEUES, queueName);
if (config == null || config.isDefault()) {
LOG.WARN("Missing configuration for queue: %s", queueName);
return new DistributedQueueInfo(queueName, null, null);
}

return new DistributedQueueInfo(queueName,
config.get("concurrencyToken").asString(),
config.get("prioritized").asBoolean() ?
Duration.ofMillis(config.getMilliseconds("penaltyTime")) :
null);
Duration penaltyTime = null;
if (config.get(CONFIG_KEY_PRIORITIZED).asBoolean()) {
long penaltyTimeMillis = config.getMilliseconds(CONFIG_KEY_PENALTY_TIME);
if (penaltyTimeMillis == 0) {
LOG.WARN("A prioritized queue (%s) needs a penaltyTime!", queueName);
} else {
penaltyTime = Duration.ofMillis(penaltyTimeMillis);
}
} else if (config.getMilliseconds(CONFIG_KEY_PENALTY_TIME) > 0) {
LOG.WARN("A FIFO queue (%s) must not have a penaltyTime!", queueName);
}

return new DistributedQueueInfo(queueName, config.get(CONFIG_KEY_CONCURRENCY_TOKEN).asString(), penaltyTime);
}

/**
Expand Down Expand Up @@ -268,7 +281,7 @@ public void submitFIFOTask(Class<? extends DistributedTaskExecutor> executor, Ob
getFifo(queueName).offer(data);
}

protected FifoQueue getFifo(String queueName) {
private FifoQueue getFifo(String queueName) {
return fifos.computeIfAbsent(queueName, this::createFifo);
}

Expand Down Expand Up @@ -377,7 +390,7 @@ public void submitPrioritizedTask(Class<? extends DistributedTaskExecutor> execu
getPrioritizedQueue(queueName).offer(priority, data);
}

protected PrioritizedQueue getPrioritizedQueue(String queueName) {
private PrioritizedQueue getPrioritizedQueue(String queueName) {
return prioritizedQueues.computeIfAbsent(queueName, this::createPrioritizedQueue);
}

Expand Down

0 comments on commit 752ce97

Please sign in to comment.