From fa282e9f801d7381eb806e39e984e4304039b668 Mon Sep 17 00:00:00 2001 From: Blaine Jester Date: Wed, 6 Dec 2023 12:46:05 -0800 Subject: [PATCH] Add fields for tracking where jobs are executed --- kolibri/core/tasks/job.py | 9 ++++++++ kolibri/core/tasks/storage.py | 39 +++++++++++++++++++++++++++++++++++ kolibri/core/tasks/worker.py | 24 +++++++++++++++++++-- 3 files changed, 70 insertions(+), 2 deletions(-) diff --git a/kolibri/core/tasks/job.py b/kolibri/core/tasks/job.py index 75c9053070d..43c7faa1e95 100644 --- a/kolibri/core/tasks/job.py +++ b/kolibri/core/tasks/job.py @@ -262,6 +262,15 @@ def update_metadata(self, **kwargs): self.extra_metadata[key] = value self.save_meta() + def update_worker_info(self, host=None, process=None, thread=None, extra=None): + self.storage.save_worker_info( + self.job_id, + host=host, + process=process, + thread=thread, + extra=extra, + ) + def check_for_cancel(self): if self.cancellable: if self.storage.check_job_canceled(self.job_id): diff --git a/kolibri/core/tasks/storage.py b/kolibri/core/tasks/storage.py index 3e64bbaaa4e..2aa862f504a 100644 --- a/kolibri/core/tasks/storage.py +++ b/kolibri/core/tasks/storage.py @@ -79,6 +79,13 @@ class ORMJob(Base): scheduled_time = Column(DateTime()) + # Optional references to the worker host, process and thread that are running this job, + # and any extra metadata that can be used by specific worker implementations. + worker_host = Column(String, nullable=True) + worker_process = Column(String, nullable=True) + worker_thread = Column(String, nullable=True) + worker_extra = Column(String, nullable=True) + __table_args__ = (Index("queue__scheduled_time", "queue", "scheduled_time"),) @@ -491,6 +498,38 @@ def save_job_meta(self, job): def save_job_as_cancellable(self, job_id, cancellable=True): self._update_job(job_id, cancellable=cancellable) + def save_worker_info( + self, job_id, host=None, process=None, thread=None, extra=None + ): + """ + Generally we only want to capture/update, not erase, any of this information so we only + update the fields that are non-None. + """ + if not any([host, process, thread, extra]): + # nothing to do + return + + with self.session_scope() as session: + try: + _, orm_job = self._get_job_and_orm_job(job_id, session) + if host is not None: + orm_job.worker_host = host + if process is not None: + orm_job.worker_process = process + if thread is not None: + orm_job.worker_thread = thread + if extra is not None: + orm_job.worker_extra = extra + session.add(orm_job) + try: + session.commit() + except Exception as e: + logger.error("Got an error running session.commit(): {}".format(e)) + except JobNotFound: + logger.error( + "Tried to update job with id {} but it was not found".format(job_id) + ) + # Turning off the complexity warning for this function as moving the conditional validation checks # inline would be the simplest way to 'reduce' the complexity, but would make it less readable. def reschedule_finished_job_if_needed( # noqa: C901 diff --git a/kolibri/core/tasks/worker.py b/kolibri/core/tasks/worker.py index 3d52d5fef8a..778f0c805d7 100644 --- a/kolibri/core/tasks/worker.py +++ b/kolibri/core/tasks/worker.py @@ -12,7 +12,9 @@ logger = logging.getLogger(__name__) -def execute_job(job_id): +def execute_job( + job_id, worker_host=None, worker_process=None, worker_thread=None, worker_extra=None +): """ Call the function stored in the job.func. :return: None @@ -24,6 +26,8 @@ def execute_job(job_id): job = storage.get_job(job_id) + job.update_worker_info(worker_host, worker_process, worker_thread, worker_extra) + job.execute() connection.dispose() @@ -32,6 +36,22 @@ def execute_job(job_id): django_connection.close() +def execute_job_with_python_worker(job_id): + """ + Call execute_job but additionally with the current host, process and thread information taken + directly from python internals. + """ + import os + import threading + + execute_job( + job_id, + worker_host=os.uname()[1], + worker_process=str(os.getpid()), + worker_thread=str(threading.get_ident()), + ) + + class Worker(object): def __init__(self, connection, regular_workers=2, high_workers=1): # Internally, we use concurrent.future.Future to run and track @@ -166,7 +186,7 @@ def start_next_job(self, job): :return future: """ future = self.workers.submit( - execute_job, + execute_job_with_python_worker, job_id=job.job_id, )