diff --git a/src/toil/lib/history.py b/src/toil/lib/history.py
index e657b6d89c..ab98f4327c 100644
--- a/src/toil/lib/history.py
+++ b/src/toil/lib/history.py
@@ -23,7 +23,7 @@
 import threading
 import uuid
 from dataclasses import dataclass
-from typing import Iterable, Optional
+from typing import Any, Iterable, Optional, TypeVar, Callable
 
 from toil.lib.io import get_toil_home
 from toil.lib.retry import ErrorCondition, retry
@@ -103,6 +103,22 @@ class JobAttemptSummary:
     memory_bytes: Optional[int]
     disk_bytes: Optional[int]
 
+
+RT = TypeVar("RT")
+
+def db_retry(function: Callable[..., RT]) -> Callable[..., RT]:
+    """
+    Decorate a function with the appropriate retries for accessing the database.
+    """
+    return retry(
+        infinite_retries=True,
+        errors=[
+            ErrorCondition(
+                error=sqlite3.OperationalError, error_message_must_include="is locked"
+            )
+        ],
+    )(function)
+
 class HistoryManager:
     """
     Class responsible for managing the history of Toil runs.
@@ -116,20 +132,14 @@ def database_path(cls) -> str:
         return os.path.join(get_toil_home(), "history.sqlite")
 
     @classmethod
-    @retry(
-        infinite_retries=True,
-        errors=[
-            ErrorCondition(
-                error=sqlite3.OperationalError, error_message_must_include="is locked"
-            )
-        ],
-    )
     def connection(cls) -> sqlite3.Connection:
         """
         Connect to the history database.
 
         Caller must not actually use the connection without using
         ensure_tables() to protect reads and updates.
+
+        Must be called from inside a top-level method marked @db_retry.
         """
         if not os.path.exists(cls.database_path()):
             # Make the database and protect it from snoopers and busybodies
@@ -141,7 +151,7 @@ def connection(cls) -> sqlite3.Connection:
             cls.database_path(), isolation_level="DEFERRED"
         )
-        
+
         if hasattr(con, 'autocommit'):
             # This doesn't much matter given the isolation level setting,
             # but is recommended on Python versions that have it (3.12+)
@@ -159,6 +169,8 @@ def ensure_tables(cls, con: sqlite3.Connection, cur: sqlite3.Cursor) -> None:
         Leaves the cursor in a transaction where the schema version is known
         to be correct.
 
+        Must be called from inside a top-level method marked @db_retry.
+
         :raises HistoryDatabaseSchemaTooNewError: If the schema is newer than
             the current version.
         """
@@ -266,6 +278,7 @@ def ensure_tables(cls, con: sqlite3.Connection, cur: sqlite3.Cursor) -> None:
     ##
 
     @classmethod
+    @db_retry
     def record_workflow_creation(cls, workflow_id: str, job_store_spec: str) -> None:
         """
         Record that a workflow is being run.
@@ -277,7 +290,7 @@ def record_workflow_creation(cls, workflow_id: str, job_store_spec: str) -> None
 
         A workflow may have multiple attempts to run it, some of which succeed
         and others of which fail. Probably only the last one should succeed.
-        
+
         :param job_store_spec: The job store specifier for the workflow.
             Should be canonical and always start with the type and a colon.
             If the job store is later moved by the user, the location will not be
@@ -303,6 +316,7 @@ def record_workflow_creation(cls, workflow_id: str, job_store_spec: str) -> None
 
     @classmethod
+    @db_retry
     def record_workflow_metadata(cls, workflow_id: str, workflow_name: str, trs_spec: Optional[str] = None) -> None:
         """
         Associate a name and optionally a TRS ID and version with a workflow run.
@@ -330,6 +344,7 @@ def record_workflow_metadata(cls, workflow_id: str, workflow_name: str, trs_spec
         con.close()
 
     @classmethod
+    @db_retry
     def record_job_attempt(
         cls,
         workflow_id: str,
@@ -408,6 +423,7 @@ def record_job_attempt(
         con.close()
 
     @classmethod
+    @db_retry
     def record_workflow_attempt(
         cls,
         workflow_id: str,
@@ -495,12 +511,7 @@ def record_workflow_attempt(
     # but it also means we can't deadlock.
 
     @classmethod
-    def get_workflow_trs_spec(cls, workflow_id: str) -> Optional[str]:
-        """
-        Get the TRS spec for a workflow, or None if it does not have one.
-        """
-
-    @classmethod
+    @db_retry
     def summarize_workflows(cls) -> list[WorkflowSummary]:
         """
         List all known workflows and their summary statistics.
@@ -552,10 +563,8 @@ def summarize_workflows(cls) -> list[WorkflowSummary]:
         return workflows
 
-
-
-
     @classmethod
+    @db_retry
     def get_submittable_workflow_attempts(cls, limit: int = sys.maxsize) -> list[WorkflowAttemptSummary]:
         """
         List all workflow attempts not yet submitted to Dockstore.
@@ -625,6 +634,7 @@ def get_submittable_workflow_attempts(cls, limit: int = sys.maxsize) -> list[Wor
         return attempts
 
     @classmethod
+    @db_retry
     def get_workflow_attempts_with_submittable_job_attempts(cls, limit: int = sys.maxsize) -> list[WorkflowAttemptSummary]:
         """
         Get all workflow attempts that have job attempts not yet submitted to
@@ -705,6 +715,7 @@ def get_workflow_attempts_with_submittable_job_attempts(cls, limit: int = sys.ma
         return attempts
 
     @classmethod
+    @db_retry
     def get_workflow_attempt(cls, workflow_id: str, attempt_number: int) -> Optional[WorkflowAttemptSummary]:
         """
         Get a single (not necessarily unsubmitted, not necessarily TRS-ID-having) workflow attempt summary, if present.
@@ -778,6 +789,7 @@ def get_workflow_attempt(cls, workflow_id: str, attempt_number: int) -> Optional
         return attempts[0]
 
     @classmethod
+    @db_retry
     def get_unsubmitted_job_attempts(cls, workflow_id: str, attempt_number: int) -> list[JobAttemptSummary]:
         """
         List all job attempts in the given workflow attempt not yet submitted to Dockstore.
@@ -842,6 +854,7 @@ def get_unsubmitted_job_attempts(cls, workflow_id: str, attempt_number: int) ->
     ###
 
     @classmethod
+    @db_retry
     def mark_workflow_attempt_submitted(cls, workflow_id: str, attempt_number: int) -> None:
         """
         Mark a workflow attempt as having been successfully submitted to Dockstore.
@@ -866,6 +879,7 @@ def mark_workflow_attempt_submitted(cls, workflow_id: str, attempt_number: int)
         con.close()
 
     @classmethod
+    @db_retry
     def mark_job_attempts_submitted(cls, job_attempt_ids: list[str]) -> None:
         """
         Mark a collection of job attempts as submitted to Dockstore in a single transaction.
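
Reviewer note (not part of the patch): below is a minimal, self-contained sketch of the retry pattern this change introduces, for anyone evaluating the decorator approach. The RecordingManager class, record_note method, and the example database file name are hypothetical illustrations; only the shape of db_retry and the toil.lib.retry / sqlite3 calls mirror the patch.

# Minimal usage sketch of the db_retry pattern introduced in this patch.
# RecordingManager and record_note are hypothetical stand-ins, not Toil APIs.
import sqlite3
from typing import Callable, TypeVar

from toil.lib.retry import ErrorCondition, retry

RT = TypeVar("RT")


def db_retry(function: Callable[..., RT]) -> Callable[..., RT]:
    # Same shape as the helper added in the patch: retry indefinitely, but only
    # on SQLite "database is locked" errors, so real failures surface at once.
    return retry(
        infinite_retries=True,
        errors=[
            ErrorCondition(
                error=sqlite3.OperationalError, error_message_must_include="is locked"
            )
        ],
    )(function)


class RecordingManager:
    # Hypothetical example of a HistoryManager-style classmethod.

    @classmethod
    @db_retry
    def record_note(cls, value: str) -> None:
        # The whole method is the retried unit: connect, write, and commit all
        # rerun together if another process holds the database lock, so a lock
        # conflict never leaves a half-finished transaction behind.
        con = sqlite3.connect("example-history.sqlite", isolation_level="DEFERRED")
        try:
            con.execute("CREATE TABLE IF NOT EXISTS notes (value TEXT)")
            con.execute("INSERT INTO notes (value) VALUES (?)", (value,))
            con.commit()
        finally:
            con.close()


RecordingManager.record_note("hello")

Retrying the whole public classmethod, rather than decorating connection() directly as before, is why connection() and ensure_tables() now only document that their callers must be marked @db_retry: a lock conflict restarts the entire transaction instead of retrying a single connection attempt mid-operation.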