Skip to content

Commit

Permalink
Move database retries to top-level history methods
Browse files Browse the repository at this point in the history
  • Loading branch information
adamnovak committed Jan 21, 2025
1 parent 8b711f1 commit 2b91421
Showing 1 changed file with 34 additions and 20 deletions.
54 changes: 34 additions & 20 deletions src/toil/lib/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import threading
import uuid
from dataclasses import dataclass
from typing import Iterable, Optional
from typing import Any, Iterable, Optional, TypeVar, Callable

from toil.lib.io import get_toil_home
from toil.lib.retry import ErrorCondition, retry
Expand Down Expand Up @@ -103,6 +103,22 @@ class JobAttemptSummary:
memory_bytes: Optional[int]
disk_bytes: Optional[int]


RT = TypeVar("RT")

def db_retry(function: Callable[..., RT]) -> Callable[..., RT]:
"""
Decorate a function with the appropriate retries for accessing the database.
"""
return retry(
infinite_retries=True,
errors=[
ErrorCondition(
error=sqlite3.OperationalError, error_message_must_include="is locked"
)
],
)(function)

class HistoryManager:
"""
Class responsible for managing the history of Toil runs.
Expand All @@ -116,20 +132,14 @@ def database_path(cls) -> str:
return os.path.join(get_toil_home(), "history.sqlite")

@classmethod
@retry(
infinite_retries=True,
errors=[
ErrorCondition(
error=sqlite3.OperationalError, error_message_must_include="is locked"
)
],
)
def connection(cls) -> sqlite3.Connection:
"""
Connect to the history database.
Caller must not actually use the connection without using
ensure_tables() to protect reads and updates.
Must be called from inside a top-level methodf marked @db_retry.
"""
if not os.path.exists(cls.database_path()):
# Make the database and protect it from snoopers and busybodies
Expand All @@ -141,7 +151,7 @@ def connection(cls) -> sqlite3.Connection:
cls.database_path(),
isolation_level="DEFERRED"
)

if hasattr(con, 'autocommit'):
# This doesn't much matter given the isolation level setting,
# but is recommended on Python versions that have it (3.12+)
Expand All @@ -159,6 +169,8 @@ def ensure_tables(cls, con: sqlite3.Connection, cur: sqlite3.Cursor) -> None:
Leaves the cursor in a transaction where the schema version is known to be correct.
Must be called from inside a top-level methodf marked @db_retry.
:raises HistoryDatabaseSchemaTooNewError: If the schema is newer than the current version.
"""

Expand Down Expand Up @@ -266,6 +278,7 @@ def ensure_tables(cls, con: sqlite3.Connection, cur: sqlite3.Cursor) -> None:
##

@classmethod
@db_retry
def record_workflow_creation(cls, workflow_id: str, job_store_spec: str) -> None:
"""
Record that a workflow is being run.
Expand All @@ -277,7 +290,7 @@ def record_workflow_creation(cls, workflow_id: str, job_store_spec: str) -> None
A workflow may have multiple attempts to run it, some of which succeed
and others of which fail. Probably only the last one should succeed.
:param job_store_spec: The job store specifier for the workflow. Should
be canonical and always start with the type and a colon. If the
job store is later moved by the user, the location will not be
Expand All @@ -303,6 +316,7 @@ def record_workflow_creation(cls, workflow_id: str, job_store_spec: str) -> None


@classmethod
@db_retry
def record_workflow_metadata(cls, workflow_id: str, workflow_name: str, trs_spec: Optional[str] = None) -> None:
"""
Associate a name and optionally a TRS ID and version with a workflow run.
Expand Down Expand Up @@ -330,6 +344,7 @@ def record_workflow_metadata(cls, workflow_id: str, workflow_name: str, trs_spec
con.close()

@classmethod
@db_retry
def record_job_attempt(
cls,
workflow_id: str,
Expand Down Expand Up @@ -408,6 +423,7 @@ def record_job_attempt(
con.close()

@classmethod
@db_retry
def record_workflow_attempt(
cls,
workflow_id: str,
Expand Down Expand Up @@ -495,12 +511,7 @@ def record_workflow_attempt(
# but it also means we can't deadlock.

@classmethod
def get_workflow_trs_spec(cls, workflow_id: str) -> Optional[str]:
"""
Get the TRS spec for a workflow, or None if it does not have one.
"""

@classmethod
@db_retry
def summarize_workflows(cls) -> list[WorkflowSummary]:
"""
List all known workflows and their summary statistics.
Expand Down Expand Up @@ -552,10 +563,8 @@ def summarize_workflows(cls) -> list[WorkflowSummary]:

return workflows




@classmethod
@db_retry
def get_submittable_workflow_attempts(cls, limit: int = sys.maxsize) -> list[WorkflowAttemptSummary]:
"""
List all workflow attempts not yet submitted to Dockstore.
Expand Down Expand Up @@ -625,6 +634,7 @@ def get_submittable_workflow_attempts(cls, limit: int = sys.maxsize) -> list[Wor
return attempts

@classmethod
@db_retry
def get_workflow_attempts_with_submittable_job_attempts(cls, limit: int = sys.maxsize) -> list[WorkflowAttemptSummary]:
"""
Get all workflow attempts that have job attempts not yet submitted to
Expand Down Expand Up @@ -705,6 +715,7 @@ def get_workflow_attempts_with_submittable_job_attempts(cls, limit: int = sys.ma
return attempts

@classmethod
@db_retry
def get_workflow_attempt(cls, workflow_id: str, attempt_number: int) -> Optional[WorkflowAttemptSummary]:
"""
Get a single (not necessarily unsubmitted, not necessarily TRS-ID-having) workflow attempt summary, if present.
Expand Down Expand Up @@ -778,6 +789,7 @@ def get_workflow_attempt(cls, workflow_id: str, attempt_number: int) -> Optional
return attempts[0]

@classmethod
@db_retry
def get_unsubmitted_job_attempts(cls, workflow_id: str, attempt_number: int) -> list[JobAttemptSummary]:
"""
List all job attempts in the given workflow attempt not yet submitted to Dockstore.
Expand Down Expand Up @@ -842,6 +854,7 @@ def get_unsubmitted_job_attempts(cls, workflow_id: str, attempt_number: int) ->
###

@classmethod
@db_retry
def mark_workflow_attempt_submitted(cls, workflow_id: str, attempt_number: int) -> None:
"""
Mark a workflow attempt as having been successfully submitted to Dockstore.
Expand All @@ -866,6 +879,7 @@ def mark_workflow_attempt_submitted(cls, workflow_id: str, attempt_number: int)
con.close()

@classmethod
@db_retry
def mark_job_attempts_submitted(cls, job_attempt_ids: list[str]) -> None:
"""
Mark a collection of job attempts as submitted to Dockstore in a single transaction.
Expand Down

0 comments on commit 2b91421

Please sign in to comment.