Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly reports misconfigurations of FIFO / prioritized queues #2047

Merged
merged 3 commits into from
Nov 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions src/main/java/sirius/biz/cluster/work/DistributedTasks.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,17 @@ public class DistributedTasks implements MetricProvider {

private static final String KEY_EXECUTOR = "_executor";
private static final String KEY_PENALTY_TOKEN = "_penalty_token";
private static final String CONFIG_KEY_PRIORITIZED = "prioritized";
private static final String CONFIG_KEY_PENALTY_TIME = "penaltyTime";
private static final String CONFIG_KEY_CONCURRENCY_TOKEN = "concurrencyToken";
private static final String CONFIG_BLOCK_ASYNC_DISTRIBUTED_QUEUES = "async.distributed.queues";

/**
* Contains the logger used to report everything related to distributed tasks.
*/
public static final Log LOG = Log.get("distributed-tasks");


@Part
private NeighborhoodWatch orchestration;

Expand Down Expand Up @@ -230,17 +235,25 @@ public List<DistributedQueueInfo> getQueues() {
* @return the configuration for the given queue
*/
private DistributedQueueInfo loadQueueInfo(String queueName) {
Extension config = Sirius.getSettings().getExtension("async.distributed.queues", queueName);
Extension config = Sirius.getSettings().getExtension(CONFIG_BLOCK_ASYNC_DISTRIBUTED_QUEUES, queueName);
if (config == null || config.isDefault()) {
LOG.WARN("Missing configuration for queue: %s", queueName);
return new DistributedQueueInfo(queueName, null, null);
}

return new DistributedQueueInfo(queueName,
config.get("concurrencyToken").asString(),
config.get("prioritized").asBoolean() ?
Duration.ofMillis(config.getMilliseconds("penaltyTime")) :
null);
Duration penaltyTime = null;
if (config.get(CONFIG_KEY_PRIORITIZED).asBoolean()) {
long penaltyTimeMillis = config.getMilliseconds(CONFIG_KEY_PENALTY_TIME);
if (penaltyTimeMillis == 0) {
LOG.WARN("A prioritized queue (%s) needs a penaltyTime!", queueName);
} else {
penaltyTime = Duration.ofMillis(penaltyTimeMillis);
}
} else if (config.getMilliseconds(CONFIG_KEY_PENALTY_TIME) > 0) {
LOG.WARN("A FIFO queue (%s) must not have a penaltyTime!", queueName);
}

return new DistributedQueueInfo(queueName, config.get(CONFIG_KEY_CONCURRENCY_TOKEN).asString(), penaltyTime);
}

/**
Expand Down Expand Up @@ -268,7 +281,7 @@ public void submitFIFOTask(Class<? extends DistributedTaskExecutor> executor, Ob
getFifo(queueName).offer(data);
}

protected FifoQueue getFifo(String queueName) {
private FifoQueue getFifo(String queueName) {
return fifos.computeIfAbsent(queueName, this::createFifo);
}

Expand Down Expand Up @@ -377,7 +390,7 @@ public void submitPrioritizedTask(Class<? extends DistributedTaskExecutor> execu
getPrioritizedQueue(queueName).offer(priority, data);
}

protected PrioritizedQueue getPrioritizedQueue(String queueName) {
private PrioritizedQueue getPrioritizedQueue(String queueName) {
return prioritizedQueues.computeIfAbsent(queueName, this::createPrioritizedQueue);
}

Expand Down