Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove RETRIES_EXCEEDED Workflows From Queues Table #158

Merged
merged 1 commit into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
7 changes: 7 additions & 0 deletions dbos/_sys_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,12 @@ def update_workflow_status(
recovery_attempts: int = row[0]
if recovery_attempts > max_recovery_attempts:
with self.engine.begin() as c:
c.execute(
sa.delete(SystemSchema.workflow_queue).where(
SystemSchema.workflow_queue.c.workflow_uuid
== status["workflow_uuid"]
)
)
c.execute(
sa.update(SystemSchema.workflow_status)
.where(
Expand All @@ -311,6 +317,7 @@ def update_workflow_status(
)
.values(
status=WorkflowStatusString.RETRIES_EXCEEDED.value,
queue_name=None,
)
)
raise DBOSDeadLetterQueueError(
Expand Down
45 changes: 43 additions & 2 deletions tests/test_failures.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from sqlalchemy.exc import OperationalError

# Public API
from dbos import DBOS, GetWorkflowsInput, SetWorkflowID
from dbos._error import DBOSDeadLetterQueueError, DBOSErrorCode, DBOSException
from dbos import DBOS, GetWorkflowsInput, Queue, SetWorkflowID
from dbos._error import DBOSDeadLetterQueueError
from dbos._sys_db import WorkflowStatusString


Expand Down Expand Up @@ -150,3 +150,44 @@ def dead_letter_workflow() -> None:
assert handle.get_result() == None
dbos._sys_db.wait_for_buffer_flush()
assert handle.get_status().status == WorkflowStatusString.SUCCESS.value


def test_enqueued_dead_letter_queue(dbos: DBOS) -> None:
    """Verify dead-letter handling for an enqueued workflow.

    A workflow that exceeds ``max_recovery_attempts`` must be moved to
    RETRIES_EXCEEDED and removed from the queues table, freeing its
    concurrency slot so other queued workflows can run.
    """
    function_started_event = threading.Event()
    event = threading.Event()
    max_concurrency = 1
    max_recovery_attempts = 10
    recovery_count = 0

    # Blocks on `event` so it stays PENDING; counts every (re)start.
    @DBOS.workflow(max_recovery_attempts=max_recovery_attempts)
    def dead_letter_workflow() -> None:
        function_started_event.set()
        nonlocal recovery_count
        recovery_count += 1
        event.wait()

    @DBOS.workflow()
    def regular_workflow() -> None:
        return

    queue = Queue("test_queue", concurrency=max_concurrency)
    handle = queue.enqueue(dead_letter_workflow)
    function_started_event.wait()

    # Each recovery restarts the blocked workflow: after the i-th recovery
    # the count is the initial run plus i+1 restarts.
    for i in range(max_recovery_attempts):
        DBOS.recover_pending_workflows()
        assert recovery_count == i + 2

    # Enqueue a second workflow while the first still occupies the queue's
    # single concurrency slot.
    regular_handle = queue.enqueue(regular_workflow)

    # One more recovery pushes the workflow past its retry limit.
    # NOTE(review): recover_pending_workflows raising here means no other
    # workflows can be recovered in the same pass — to be resolved in a
    # separate PR.
    with pytest.raises(Exception) as exc_info:
        DBOS.recover_pending_workflows()
    assert exc_info.errisinstance(DBOSDeadLetterQueueError)
    assert handle.get_status().status == WorkflowStatusString.RETRIES_EXCEEDED.value

    # The dead-lettered workflow no longer holds the queue slot, so the
    # regular workflow can run to completion.
    assert regular_handle.get_result() is None

    # Unblocking and resuming the dead-lettered workflow lets it finish.
    event.set()
    assert handle.get_result() is None
    dbos._sys_db.wait_for_buffer_flush()
    assert handle.get_status().status == WorkflowStatusString.SUCCESS.value