Skip to content

Commit

Permalink
Add fields for tracking where jobs are executed
Browse files Browse the repository at this point in the history
  • Loading branch information
bjester committed Dec 6, 2023
1 parent fb70ecd commit fa282e9
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 2 deletions.
9 changes: 9 additions & 0 deletions kolibri/core/tasks/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,15 @@ def update_metadata(self, **kwargs):
self.extra_metadata[key] = value
self.save_meta()

def update_worker_info(self, host=None, process=None, thread=None, extra=None):
self.storage.save_worker_info(
self.job_id,
host=host,
process=process,
thread=thread,
extra=extra,
)

def check_for_cancel(self):
if self.cancellable:
if self.storage.check_job_canceled(self.job_id):
Expand Down
39 changes: 39 additions & 0 deletions kolibri/core/tasks/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ class ORMJob(Base):

scheduled_time = Column(DateTime())

# Optional references to the worker host, process and thread that are running this job,
# and any extra metadata that can be used by specific worker implementations.
worker_host = Column(String, nullable=True)
worker_process = Column(String, nullable=True)
worker_thread = Column(String, nullable=True)
worker_extra = Column(String, nullable=True)

__table_args__ = (Index("queue__scheduled_time", "queue", "scheduled_time"),)


Expand Down Expand Up @@ -491,6 +498,38 @@ def save_job_meta(self, job):
def save_job_as_cancellable(self, job_id, cancellable=True):
self._update_job(job_id, cancellable=cancellable)

def save_worker_info(
self, job_id, host=None, process=None, thread=None, extra=None
):
"""
Generally we only want to capture/update, not erase, any of this information so we only
update the fields that are non-None.
"""
if not any([host, process, thread, extra]):
# nothing to do
return

with self.session_scope() as session:
try:
_, orm_job = self._get_job_and_orm_job(job_id, session)
if host is not None:
orm_job.worker_host = host
if process is not None:
orm_job.worker_process = process
if thread is not None:
orm_job.worker_thread = thread
if extra is not None:
orm_job.worker_extra = extra
session.add(orm_job)
try:
session.commit()
except Exception as e:
logger.error("Got an error running session.commit(): {}".format(e))
except JobNotFound:
logger.error(
"Tried to update job with id {} but it was not found".format(job_id)
)

# Turning off the complexity warning for this function as moving the conditional validation checks
# inline would be the simplest way to 'reduce' the complexity, but would make it less readable.
def reschedule_finished_job_if_needed( # noqa: C901
Expand Down
24 changes: 22 additions & 2 deletions kolibri/core/tasks/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
logger = logging.getLogger(__name__)


def execute_job(job_id):
def execute_job(
job_id, worker_host=None, worker_process=None, worker_thread=None, worker_extra=None
):
"""
Call the function stored in the job.func.
:return: None
Expand All @@ -24,6 +26,8 @@ def execute_job(job_id):

job = storage.get_job(job_id)

job.update_worker_info(worker_host, worker_process, worker_thread, worker_extra)

job.execute()

connection.dispose()
Expand All @@ -32,6 +36,22 @@ def execute_job(job_id):
django_connection.close()


def execute_job_with_python_worker(job_id):
"""
Call execute_job but additionally with the current host, process and thread information taken
directly from python internals.
"""
import os
import threading

execute_job(
job_id,
worker_host=os.uname()[1],
worker_process=str(os.getpid()),
worker_thread=str(threading.get_ident()),
)


class Worker(object):
def __init__(self, connection, regular_workers=2, high_workers=1):
# Internally, we use concurrent.future.Future to run and track
Expand Down Expand Up @@ -166,7 +186,7 @@ def start_next_job(self, job):
:return future:
"""
future = self.workers.submit(
execute_job,
execute_job_with_python_worker,
job_id=job.job_id,
)

Expand Down

0 comments on commit fa282e9

Please sign in to comment.