-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Queues max worker concurrency #177
Conversation
@@ -1132,12 +1132,13 @@ def start_queued_workflows(self, queue: "Queue", executor_id: str) -> List[str]: | |||
|
|||
# Select not-yet-completed functions in the queue ordered by the | |||
# time at which they were enqueued. | |||
# If there is a concurrency limit N, select only the N most recent |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
most recent are the tail of the queue, not its head. (We dequeue the oldest, i.e., the head.)
…-- can we split the logic instead of doing this try/catch?
9432f85
to
ceb40a6
Compare
.where( | ||
# Only select functions that have not been started yet or have been started by this worker | ||
or_( | ||
SystemSchema.workflow_queue.c.executor_id == None, | ||
SystemSchema.workflow_queue.c.executor_id == executor_id, | ||
) | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could remove the completed_at_epoch_ms == None
filter, redundant with the necessary executor_id == None
, but I think its good for clarity and shouldn't affect performance.
- Implement worker concurrency limits for workflow queues. Same logic as in dbos-inc/dbos-transact-py#177 - `worker_concurrency` is only available in a new constructor using an object as input -- much easier to use as we add arguments to the class. Sanity check on the distributed test: data:image/s3,"s3://crabby-images/b2ae4/b2ae46a480858195385015e684eaf876a6d62740" alt="Screenshot 2025-01-23 at 18 03 15"
This PR allows users to control the maximum numbers of tasks, in a queue, a single DBOS Transact instance can execute concurrently. This knob is exposed by the new
worker_concurrency
Queue initialization parameter.This is implemented by modifying the queue DB query to only retrieve uncompleted tasks eligible for this worker and limiting the query to either
worker_concurrency
orconcurrency
, with this priority, if they are set.