Skip to content

Commit

Permalink
Worker middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
davegaeddert committed Feb 2, 2024
1 parent a73918b commit 22dfff4
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 13 deletions.
4 changes: 2 additions & 2 deletions bolt-worker/bolt/worker/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class LostJobsCard(Card):
text = "View" # TODO make not required - just an icon?

def get_description(self):
delta = timedelta(seconds=settings.JOBS_LOST_AFTER)
delta = timedelta(seconds=settings.WORKER_JOBS_LOST_AFTER)
return f"Jobs are considered lost after {_td_format(delta)}"

def get_number(self):
Expand Down Expand Up @@ -179,7 +179,7 @@ class ListView(AdminModelListView):
default_datetime_range = DatetimeRangeAliases.LAST_30_DAYS

def get_description(self):
delta = timedelta(seconds=settings.JOBS_CLEARABLE_AFTER)
delta = timedelta(seconds=settings.WORKER_JOBS_CLEARABLE_AFTER)
return f"Jobs are cleared after {_td_format(delta)}"

def get_initial_queryset(self):
Expand Down
2 changes: 1 addition & 1 deletion bolt-worker/bolt/worker/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def _shutdown(signalnum, _):

@cli.command()
def clear_completed():
cutoff = timezone.now() - datetime.timedelta(seconds=settings.JOBS_CLEARABLE_AFTER)
cutoff = timezone.now() - datetime.timedelta(seconds=settings.WORKER_JOBS_CLEARABLE_AFTER)
click.echo(f"Clearing jobs finished before {cutoff}")
results = (
JobResult.objects.exclude(ended_at__isnull=True)
Expand Down
7 changes: 5 additions & 2 deletions bolt-worker/bolt/worker/default_settings.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
JOBS_CLEARABLE_AFTER: int = 60 * 60 * 24 * 7 # One week
JOBS_LOST_AFTER: int = 60 * 60 * 24 # One day
WORKER_JOBS_CLEARABLE_AFTER: int = 60 * 60 * 24 * 7 # One week
WORKER_JOBS_LOST_AFTER: int = 60 * 60 * 24 # One day
WORKER_MIDDLEWARE: list[str] = [
"bolt.worker.middleware.AppLoggerMiddleware",
]
17 changes: 17 additions & 0 deletions bolt-worker/bolt/worker/middleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from bolt.logs import app_logger


class AppLoggerMiddleware:
def __init__(self, run_job):
self.run_job = run_job

def __call__(self, job):
app_logger.kv.context["job_request_uuid"] = str(job.job_request_uuid)
app_logger.kv.context["job_uuid"] = str(job.uuid)

job_result = self.run_job(job)

app_logger.kv.context.pop("job_request_uuid", None)
app_logger.kv.context.pop("job_uuid", None)

return job_result
18 changes: 17 additions & 1 deletion bolt-worker/bolt/worker/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def mark_lost_jobs(self):
# In theory we could save a timeout per-job and mark them timed-out more quickly,
# but if they're still running, we can't actually send a signal to cancel it...
now = timezone.now()
cutoff = now - datetime.timedelta(seconds=settings.JOBS_LOST_AFTER)
cutoff = now - datetime.timedelta(seconds=settings.WORKER_JOBS_LOST_AFTER)
lost_jobs = self.filter(
created_at__lt=cutoff
) # Doesn't matter whether it started or not -- it shouldn't take this long.
Expand Down Expand Up @@ -183,6 +183,22 @@ def convert_to_result(self, *, status, error=""):

return result

def as_json(self):
"""A JSON-compatible representation to make it easier to reference in Sentry or logging"""
return {
"uuid": str(self.uuid),
"created_at": self.created_at.isoformat(),
"started_at": self.started_at.isoformat() if self.started_at else None,
"job_request_uuid": str(self.job_request_uuid),
"job_class": self.job_class,
"parameters": self.parameters,
"priority": self.priority,
"source": self.source,
"retries": self.retries,
"retry_attempt": self.retry_attempt,
"unique_key": self.unique_key,
}


class JobResultQuerySet(models.QuerySet):
def successful(self):
Expand Down
16 changes: 9 additions & 7 deletions bolt-worker/bolt/worker/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
from functools import partial

from bolt.db import transaction
from bolt.logs import app_logger
from bolt.runtime import settings
from bolt.signals import request_finished, request_started
from bolt.utils.module_loading import import_string

from .models import Job, JobRequest, JobResult, JobResultStatuses

Expand Down Expand Up @@ -166,17 +167,18 @@ def process_job(job_uuid):
job.source,
)

app_logger.kv.context["job_request_uuid"] = str(job.job_request_uuid)
app_logger.kv.context["job_uuid"] = str(job.uuid)
middleware_chain = lambda job: job.run()

job_result = job.run()
for middleware_path in reversed(settings.WORKER_MIDDLEWARE):
middleware_class = import_string(middleware_path)
middleware_instance = middleware_class(middleware_chain)
middleware_chain = middleware_instance

job_result = middleware_chain(job)

# Release it now
del job

app_logger.kv.context.pop("job_request_uuid", None)
app_logger.kv.context.pop("job_uuid", None)

duration = job_result.ended_at - job_result.started_at
duration = duration.total_seconds()

Expand Down

0 comments on commit 22dfff4

Please sign in to comment.