Skip to content

Commit

Permalink
Use SQLAlchemy backbone so we can use ScopedSession for all single thread listeners.
Browse files Browse the repository at this point in the history
  • Loading branch information
TorecLuik committed Aug 21, 2024
1 parent c7769f3 commit 0e77745
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 97 deletions.
94 changes: 94 additions & 0 deletions biomero/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from eventsourcing.utils import get_topic
import logging
from sqlalchemy import create_engine, text, Column, Integer, String, URL, DateTime, Float
from sqlalchemy.orm import sessionmaker, declarative_base, scoped_session
from sqlalchemy.dialects.postgresql import UUID as PGUUID
import os

logger = logging.getLogger(__name__)

# --------------------- VIEWS DB tables/classes ---------------------------- #

# Base class for declarative class definitions
Base = declarative_base()


class JobView(Base):
    """Read-model (view) row mapping a Slurm job to the user/group that owns it."""
    __tablename__ = 'biomero_job_view'

    # Slurm job id; primary key of the view table
    slurm_job_id = Column(Integer, primary_key=True)
    # Owning user id — presumably an OMERO user id; TODO confirm with writer
    user = Column(Integer, nullable=False)
    # Owning group id — presumably an OMERO group id; TODO confirm with writer
    group = Column(Integer, nullable=False)


class JobProgressView(Base):
    """Read-model (view) row tracking the latest status/progress of a Slurm job."""
    __tablename__ = 'biomero_job_progress_view'

    # Slurm job id; primary key of the view table
    slurm_job_id = Column(Integer, primary_key=True)
    # Latest known job status string (required)
    status = Column(String, nullable=False)
    # Free-form progress text; NULL when no progress has been reported yet
    progress = Column(String, nullable=True)


class TaskExecution(Base):
    """Read-model row recording a single task execution and its outcome."""
    __tablename__ = 'biomero_task_execution'

    # UUID of the task aggregate; primary key
    task_id = Column(PGUUID(as_uuid=True), primary_key=True)
    task_name = Column(String, nullable=False)
    # Version string of the task/workflow; nullable by omission
    task_version = Column(String)
    # User/group ids are optional — presumably OMERO ids; TODO confirm
    user_id = Column(Integer, nullable=True)
    group_id = Column(Integer, nullable=True)
    # Latest known status string (required)
    status = Column(String, nullable=False)
    start_time = Column(DateTime, nullable=False)
    # NULL while the task is still running
    end_time = Column(DateTime, nullable=True)
    # Error information recorded on failure, NULL otherwise — TODO confirm content
    error_type = Column(String, nullable=True)


class EngineManager:
    """Process-wide holder for the SQLAlchemy engine and scoped session.

    The eventsourcing library is handed the scoped session via a
    "topic" (an importable dotted path to an adapter class), so all
    single-threaded listeners share one session registry. All state is
    class-level: this class acts as a singleton.
    """
    _engine = None
    _scoped_session_topic = None
    _session = None

    @classmethod
    def create_scoped_session(cls, sqlalchemy_url=None):
        """Create the engine and scoped session once; return the adapter topic.

        Idempotent: after the first successful call, later calls simply
        return the previously created topic.

        Args:
            sqlalchemy_url: optional database URL. Falls back to the
                SQLALCHEMY_URL environment variable, so the original
                zero-argument call keeps working.

        Returns:
            str: the eventsourcing topic of the scoped-session adapter.

        Raises:
            NotImplementedError: if PERSISTENCE_MODULE is unset or does
                not mention sqlalchemy.
            ValueError: if no database URL is available.
        """
        if cls._engine is None:
            persistence_mod = os.getenv('PERSISTENCE_MODULE')
            # Guard against an unset env var: `'x' in None` would raise
            # an opaque TypeError instead of this explicit error.
            if not persistence_mod or 'sqlalchemy' not in persistence_mod:
                raise NotImplementedError(f"Can't handle {persistence_mod}")
            logger.info("Using sqlalchemy database")
            database_url = sqlalchemy_url or os.getenv('SQLALCHEMY_URL')
            if not database_url:
                # create_engine(None) fails with a confusing error; fail fast.
                raise ValueError(
                    "No database URL configured: set SQLALCHEMY_URL or "
                    "pass sqlalchemy_url")
            cls._engine = create_engine(database_url)

            # Set up the view tables if they don't exist yet.
            Base.metadata.create_all(cls._engine)

            # One scoped_session registry shared by all listeners.
            cls._session = scoped_session(
                sessionmaker(autocommit=False, autoflush=False,
                             bind=cls._engine)
            )

            class MyScopedSessionAdapter:
                # Transparent proxy: forwards every attribute access to
                # the class-level scoped session, so eventsourcing can
                # instantiate this adapter from its topic. (Returns the
                # looked-up attribute, hence no `-> None` annotation.)
                def __getattribute__(self, item: str):
                    return getattr(cls._session, item)

            # Produce the topic (importable dotted path) of the adapter.
            cls._scoped_session_topic = get_topic(MyScopedSessionAdapter)

        return cls._scoped_session_topic

    @classmethod
    def get_session(cls):
        """Return the Session bound to the current scope (thread)."""
        return cls._session()

    @classmethod
    def commit(cls):
        """Commit the current scoped session's transaction."""
        cls._session.commit()

    @classmethod
    def close_engine(cls):
        """Dispose of the engine and reset all class-level state."""
        if cls._engine is not None:
            cls._session.remove()
            cls._engine.dispose()
            cls._engine = None
            cls._session = None
            cls._scoped_session_topic = None
15 changes: 14 additions & 1 deletion biomero/eventsourcing.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from eventsourcing.domain import Aggregate, event
from eventsourcing.application import Application
from uuid import NAMESPACE_URL, UUID, uuid5
from typing import Any, Dict, List
from fabric import Result
import logging
from biomero.database import EngineManager


# Create a logger for this module
Expand Down Expand Up @@ -200,6 +200,7 @@ def initiate_workflow(self,
logger.debug(f"[WFT] Initiating workflow: name={name}, description={description}, user={user}, group={group}")
workflow = WorkflowRun(name, description, user, group)
self.save(workflow)
EngineManager.commit()
return workflow.id

def add_task_to_workflow(self,
Expand All @@ -217,9 +218,11 @@ def add_task_to_workflow(self,
input_data,
kwargs)
self.save(task)
EngineManager.commit()
workflow = self.repository.get(workflow_id)
workflow.add_task(task.id)
self.save(workflow)
EngineManager.commit()
return task.id

def start_workflow(self, workflow_id: UUID):
Expand All @@ -228,69 +231,79 @@ def start_workflow(self, workflow_id: UUID):
workflow = self.repository.get(workflow_id)
workflow.start_workflow()
self.save(workflow)
EngineManager.commit()

def complete_workflow(self, workflow_id: UUID):
    """Load the workflow aggregate, mark it completed, and persist the event."""
    logger.debug(f"[WFT] Completing workflow: workflow_id={workflow_id}")

    wf = self.repository.get(workflow_id)
    wf.complete_workflow()
    self.save(wf)
    EngineManager.commit()

def fail_workflow(self, workflow_id: UUID, error_message: str):
    """Record a workflow failure with the given error message and persist it."""
    logger.debug(f"[WFT] Failing workflow: workflow_id={workflow_id}, error_message={error_message}")

    wf = self.repository.get(workflow_id)
    wf.fail_workflow(error_message)
    self.save(wf)
    EngineManager.commit()

def start_task(self, task_id: UUID):
    """Mark the task aggregate as started and persist the event."""
    logger.debug(f"[WFT] Starting task: task_id={task_id}")

    aggregate = self.repository.get(task_id)
    aggregate.start_task()
    self.save(aggregate)
    EngineManager.commit()

def complete_task(self, task_id: UUID, message: str):
    """Mark the task aggregate as completed with a result message and persist."""
    logger.debug(f"[WFT] Completing task: task_id={task_id}, message={message}")

    aggregate = self.repository.get(task_id)
    aggregate.complete_task(message)
    self.save(aggregate)
    EngineManager.commit()

def fail_task(self, task_id: UUID, error_message: str):
    """Record a task failure with the given error message and persist it."""
    logger.debug(f"[WFT] Failing task: task_id={task_id}, error_message={error_message}")

    aggregate = self.repository.get(task_id)
    aggregate.fail_task(error_message)
    self.save(aggregate)
    EngineManager.commit()

def add_job_id(self, task_id, slurm_job_id):
    """Attach a Slurm job id to the task aggregate and persist the event."""
    logger.debug(f"[WFT] Adding job_id to task: task_id={task_id}, slurm_job_id={slurm_job_id}")

    aggregate = self.repository.get(task_id)
    aggregate.add_job_id(slurm_job_id)
    self.save(aggregate)
    EngineManager.commit()

def add_result(self, task_id, result):
    """Attach a result to the task aggregate and persist the event."""
    logger.debug(f"[WFT] Adding result to task: task_id={task_id}, result={result}")

    aggregate = self.repository.get(task_id)
    aggregate.add_result(result)
    self.save(aggregate)
    EngineManager.commit()

def update_task_status(self, task_id, status):
    """Update the task aggregate's status and persist the event."""
    logger.debug(f"[WFT] Adding status to task: task_id={task_id}, status={status}")

    aggregate = self.repository.get(task_id)
    aggregate.update_task_status(status)
    self.save(aggregate)
    EngineManager.commit()

def update_task_progress(self, task_id, progress):
    """Update the task aggregate's progress and persist the event."""
    logger.debug(f"[WFT] Adding progress to task: task_id={task_id}, progress={progress}")

    aggregate = self.repository.get(task_id)
    aggregate.update_task_progress(progress)
    self.save(aggregate)
    EngineManager.commit()



6 changes: 4 additions & 2 deletions biomero/slurm_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
import io
import os
from biomero.eventsourcing import WorkflowTracker
from biomero.views import JobAccounting, JobProgress, WorkflowAnalytics, EngineManager
from biomero.views import JobAccounting, JobProgress, WorkflowAnalytics
from biomero.database import EngineManager
from eventsourcing.system import System, SingleThreadedRunner

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -1407,14 +1408,15 @@ def run_conversion_workflow_job(self,
f"echo \"Number of .{source_format} files: $N\"",
conversion_cmd
]

logger.debug(f"wf_id: {wf_id}")
if not wf_id:
wf_id = self.workflowTracker.initiate_workflow(
"conversion",
-1,
-1,
-1
)
logger.debug(f"wf_id: {wf_id}")
task_id = self.workflowTracker.add_task_to_workflow(
wf_id,
chosen_converter,
Expand Down
Loading

0 comments on commit 0e77745

Please sign in to comment.