Skip to content

Commit

Permalink
Fix Queues (#171)
Browse files Browse the repository at this point in the history
Fix an issue where the executor ID of a dequeued workflow was that of
the enqueuing executor, not the dequeuing executor.
  • Loading branch information
kraftp authored Jan 8, 2025
1 parent 6167422 commit 14e7ec1
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 7 deletions.
5 changes: 3 additions & 2 deletions dbos/_dbos.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
start_workflow,
workflow_wrapper,
)
from ._queue import Queue, _queue_thread
from ._queue import Queue, queue_thread
from ._recovery import recover_pending_workflows, startup_recovery_thread
from ._registrations import (
DEFAULT_MAX_RECOVERY_ATTEMPTS,
Expand Down Expand Up @@ -283,6 +283,7 @@ def __init__(
self.flask: Optional["Flask"] = flask
self._executor_field: Optional[ThreadPoolExecutor] = None
self._background_threads: List[threading.Thread] = []
self._executor_id: str = os.environ.get("DBOS__VMID", "local")

# If using FastAPI, set up middleware and lifecycle events
if self.fastapi is not None:
Expand Down Expand Up @@ -383,7 +384,7 @@ def _launch(self) -> None:
evt = threading.Event()
self.stop_events.append(evt)
bg_queue_thread = threading.Thread(
target=_queue_thread, args=(evt, self), daemon=True
target=queue_thread, args=(evt, self), daemon=True
)
bg_queue_thread.start()
self._background_threads.append(bg_queue_thread)
Expand Down
4 changes: 2 additions & 2 deletions dbos/_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ def enqueue(
return start_workflow(dbos, func, self.name, False, *args, **kwargs)


def _queue_thread(stop_event: threading.Event, dbos: "DBOS") -> None:
def queue_thread(stop_event: threading.Event, dbos: "DBOS") -> None:
while not stop_event.is_set():
if stop_event.wait(timeout=1):
return
for _, queue in dbos._registry.queue_info_map.items():
try:
wf_ids = dbos._sys_db.start_queued_workflows(queue)
wf_ids = dbos._sys_db.start_queued_workflows(queue, dbos._executor_id)
for id in wf_ids:
execute_workflow_by_id(dbos, id)
except Exception:
Expand Down
9 changes: 6 additions & 3 deletions dbos/_sys_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -1104,7 +1104,7 @@ def enqueue(self, workflow_id: str, queue_name: str) -> None:
.on_conflict_do_nothing()
)

def start_queued_workflows(self, queue: "Queue") -> List[str]:
def start_queued_workflows(self, queue: "Queue", executor_id: str) -> List[str]:
start_time_ms = int(time.time() * 1000)
if queue.limiter is not None:
limiter_period_ms = int(queue.limiter["period"] * 1000)
Expand Down Expand Up @@ -1159,15 +1159,18 @@ def start_queued_workflows(self, queue: "Queue") -> List[str]:
if len(ret_ids) + num_recent_queries >= queue.limiter["limit"]:
break

# To start a function, first set its status to PENDING
# To start a function, first set its status to PENDING and update its executor ID
c.execute(
SystemSchema.workflow_status.update()
.where(SystemSchema.workflow_status.c.workflow_uuid == id)
.where(
SystemSchema.workflow_status.c.status
== WorkflowStatusString.ENQUEUED.value
)
.values(status=WorkflowStatusString.PENDING.value)
.values(
status=WorkflowStatusString.PENDING.value,
executor_id=executor_id,
)
)

# Then give it a start time
Expand Down

0 comments on commit 14e7ec1

Please sign in to comment.