diff --git a/changelog.d/20240605_093600_boris_kill_worker.md b/changelog.d/20240605_093600_boris_kill_worker.md new file mode 100644 index 00000000000..b6a5acfe9ce --- /dev/null +++ b/changelog.d/20240605_093600_boris_kill_worker.md @@ -0,0 +1,4 @@ +### Fixed + +- Queued jobs are not considered in deferring logic + () diff --git a/cvat/apps/engine/utils.py b/cvat/apps/engine/utils.py index 85b00ae75c6..daa7e617a2f 100644 --- a/cvat/apps/engine/utils.py +++ b/cvat/apps/engine/utils.py @@ -165,25 +165,18 @@ def define_dependent_job( if not should_be_dependent: return None - started_user_jobs = [ - job - for job in queue.job_class.fetch_many( - queue.started_job_registry.get_job_ids(), queue.connection - ) - if job and job.meta.get("user", {}).get("id") == user_id - ] - deferred_user_jobs = [ - job - for job in queue.job_class.fetch_many( - queue.deferred_job_registry.get_job_ids(), queue.connection - ) - # Since there is no cleanup implementation in DeferredJobRegistry, - # this registry can contain "outdated" jobs that weren't deleted from it - # but were added to another registry. Probably such situations can occur - # if there are active or deferred jobs when restarting the worker container. - if job and job.meta.get("user", {}).get("id") == user_id and job.is_deferred - ] - all_user_jobs = started_user_jobs + deferred_user_jobs + queues = [queue.deferred_job_registry, queue, queue.started_job_registry] + # Since there is no cleanup implementation in DeferredJobRegistry, + # this registry can contain "outdated" jobs that weren't deleted from it + # but were added to another registry. Probably such situations can occur + # if there are active or deferred jobs when restarting the worker container. + filters = [lambda job: job.is_deferred, lambda _: True, lambda _: True] + all_user_jobs = [] + for q, f in zip(queues, filters): + job_ids = q.get_job_ids() + jobs = q.job_class.fetch_many(job_ids, q.connection) + jobs = filter(lambda job: job and job.meta.get("user", {}).get("id") == user_id and f(job), jobs) + all_user_jobs.extend(jobs) # prevent possible cyclic dependencies if rq_id: