From 22dfff4f3b9740af70cc3b5be8ca57e2d0d21ac4 Mon Sep 17 00:00:00 2001
From: Dave Gaeddert
Date: Fri, 2 Feb 2024 13:50:47 -0600
Subject: [PATCH] Worker middleware

---
 bolt-worker/bolt/worker/admin.py            |  4 ++--
 bolt-worker/bolt/worker/cli.py              |  2 +-
 bolt-worker/bolt/worker/default_settings.py |  7 +++++--
 bolt-worker/bolt/worker/middleware.py       | 17 +++++++++++++++++
 bolt-worker/bolt/worker/models.py           | 18 +++++++++++++++++-
 bolt-worker/bolt/worker/workers.py          | 16 +++++++++-------
 6 files changed, 51 insertions(+), 13 deletions(-)
 create mode 100644 bolt-worker/bolt/worker/middleware.py

diff --git a/bolt-worker/bolt/worker/admin.py b/bolt-worker/bolt/worker/admin.py
index 750cc4a9bb..dddd7f2d13 100644
--- a/bolt-worker/bolt/worker/admin.py
+++ b/bolt-worker/bolt/worker/admin.py
@@ -71,7 +71,7 @@ class LostJobsCard(Card):
     text = "View"  # TODO make not required - just an icon?
 
     def get_description(self):
-        delta = timedelta(seconds=settings.JOBS_LOST_AFTER)
+        delta = timedelta(seconds=settings.WORKER_JOBS_LOST_AFTER)
         return f"Jobs are considered lost after {_td_format(delta)}"
 
     def get_number(self):
@@ -179,7 +179,7 @@ class ListView(AdminModelListView):
         default_datetime_range = DatetimeRangeAliases.LAST_30_DAYS
 
         def get_description(self):
-            delta = timedelta(seconds=settings.JOBS_CLEARABLE_AFTER)
+            delta = timedelta(seconds=settings.WORKER_JOBS_CLEARABLE_AFTER)
             return f"Jobs are cleared after {_td_format(delta)}"
 
         def get_initial_queryset(self):
diff --git a/bolt-worker/bolt/worker/cli.py b/bolt-worker/bolt/worker/cli.py
index db03bfb552..4c29422770 100644
--- a/bolt-worker/bolt/worker/cli.py
+++ b/bolt-worker/bolt/worker/cli.py
@@ -61,7 +61,7 @@ def _shutdown(signalnum, _):
 
 @cli.command()
 def clear_completed():
-    cutoff = timezone.now() - datetime.timedelta(seconds=settings.JOBS_CLEARABLE_AFTER)
+    cutoff = timezone.now() - datetime.timedelta(seconds=settings.WORKER_JOBS_CLEARABLE_AFTER)
     click.echo(f"Clearing jobs finished before {cutoff}")
     results = (
JobResult.objects.exclude(ended_at__isnull=True) diff --git a/bolt-worker/bolt/worker/default_settings.py b/bolt-worker/bolt/worker/default_settings.py index 5c28e3bc71..87dc7d77de 100644 --- a/bolt-worker/bolt/worker/default_settings.py +++ b/bolt-worker/bolt/worker/default_settings.py @@ -1,2 +1,5 @@ -JOBS_CLEARABLE_AFTER: int = 60 * 60 * 24 * 7 # One week -JOBS_LOST_AFTER: int = 60 * 60 * 24 # One day +WORKER_JOBS_CLEARABLE_AFTER: int = 60 * 60 * 24 * 7 # One week +WORKER_JOBS_LOST_AFTER: int = 60 * 60 * 24 # One day +WORKER_MIDDLEWARE: list[str] = [ + "bolt.worker.middleware.AppLoggerMiddleware", +] diff --git a/bolt-worker/bolt/worker/middleware.py b/bolt-worker/bolt/worker/middleware.py new file mode 100644 index 0000000000..a8a9a2e5a8 --- /dev/null +++ b/bolt-worker/bolt/worker/middleware.py @@ -0,0 +1,17 @@ +from bolt.logs import app_logger + + +class AppLoggerMiddleware: + def __init__(self, run_job): + self.run_job = run_job + + def __call__(self, job): + app_logger.kv.context["job_request_uuid"] = str(job.job_request_uuid) + app_logger.kv.context["job_uuid"] = str(job.uuid) + + job_result = self.run_job(job) + + app_logger.kv.context.pop("job_request_uuid", None) + app_logger.kv.context.pop("job_uuid", None) + + return job_result diff --git a/bolt-worker/bolt/worker/models.py b/bolt-worker/bolt/worker/models.py index ff90151346..86293c2178 100644 --- a/bolt-worker/bolt/worker/models.py +++ b/bolt-worker/bolt/worker/models.py @@ -97,7 +97,7 @@ def mark_lost_jobs(self): # In theory we could save a timeout per-job and mark them timed-out more quickly, # but if they're still running, we can't actually send a signal to cancel it... now = timezone.now() - cutoff = now - datetime.timedelta(seconds=settings.JOBS_LOST_AFTER) + cutoff = now - datetime.timedelta(seconds=settings.WORKER_JOBS_LOST_AFTER) lost_jobs = self.filter( created_at__lt=cutoff ) # Doesn't matter whether it started or not -- it shouldn't take this long. 
@@ -183,6 +183,22 @@ def convert_to_result(self, *, status, error=""):
 
         return result
 
+    def as_json(self):
+        """A JSON-compatible representation to make it easier to reference in Sentry or logging"""
+        return {
+            "uuid": str(self.uuid),
+            "created_at": self.created_at.isoformat(),
+            "started_at": self.started_at.isoformat() if self.started_at else None,
+            "job_request_uuid": str(self.job_request_uuid),
+            "job_class": self.job_class,
+            "parameters": self.parameters,
+            "priority": self.priority,
+            "source": self.source,
+            "retries": self.retries,
+            "retry_attempt": self.retry_attempt,
+            "unique_key": self.unique_key,
+        }
+
 
 class JobResultQuerySet(models.QuerySet):
     def successful(self):
diff --git a/bolt-worker/bolt/worker/workers.py b/bolt-worker/bolt/worker/workers.py
index 62ffc54d59..0f7db05ae7 100644
--- a/bolt-worker/bolt/worker/workers.py
+++ b/bolt-worker/bolt/worker/workers.py
@@ -7,8 +7,9 @@
 from functools import partial
 
 from bolt.db import transaction
-from bolt.logs import app_logger
+from bolt.runtime import settings
 from bolt.signals import request_finished, request_started
+from bolt.utils.module_loading import import_string
 
 from .models import Job, JobRequest, JobResult, JobResultStatuses
 
@@ -166,17 +167,18 @@ def process_job(job_uuid):
             job.source,
         )
 
-        app_logger.kv.context["job_request_uuid"] = str(job.job_request_uuid)
-        app_logger.kv.context["job_uuid"] = str(job.uuid)
+        middleware_chain = lambda job: job.run()
 
-        job_result = job.run()
+        for middleware_path in reversed(settings.WORKER_MIDDLEWARE):
+            middleware_class = import_string(middleware_path)
+            middleware_instance = middleware_class(middleware_chain)
+            middleware_chain = middleware_instance
+
+        job_result = middleware_chain(job)
 
         # Release it now
         del job
 
-        app_logger.kv.context.pop("job_request_uuid", None)
-        app_logger.kv.context.pop("job_uuid", None)
-
         duration = job_result.ended_at - job_result.started_at
         duration = duration.total_seconds()
 