From bc920a883ea46e8f606735268a7bb335f320befc Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Fri, 17 May 2024 13:39:17 +0300 Subject: [PATCH 01/28] Kill workhorse process when cancelling export job --- .vscode/settings.json | 5 ----- cvat/apps/engine/views.py | 11 ++++++++--- cvat/apps/events/handlers.py | 5 +++++ cvat/rqworker.py | 11 +++++++++-- 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 75ca0eb512a..21b46cc608a 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -40,9 +40,4 @@ "cSpell.words": [ "crowdsourcing" ], - "isort.args":["--profile", "black"], - "[python]": { - "editor.defaultFormatter": "ms-python.black-formatter", - "editor.formatOnSave": true - } } diff --git a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py index 28bac7295fd..a01303bcfc7 100644 --- a/cvat/apps/engine/views.py +++ b/cvat/apps/engine/views.py @@ -2972,9 +2972,14 @@ def _export_annotations( rq_request = rq_job.meta.get('request', None) request_time = rq_request.get('timestamp', None) if rq_request else None if request_time is None or request_time < last_instance_update_time: - # in case the server is configured with ONE_RUNNING_JOB_IN_QUEUE_PER_USER - # we have to enqueue dependent jobs after canceling one - rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER) + if rq_job.worker_name: + send_stop_job_command(rq_job.connection, rq_job.id) + else: + # in case the server is configured with ONE_RUNNING_JOB_IN_QUEUE_PER_USER + # we have to enqueue dependent jobs after canceling one + rq_job.cancel( + enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER + ) rq_job.delete() else: if rq_job.is_finished: diff --git a/cvat/apps/events/handlers.py b/cvat/apps/events/handlers.py index 5591817dce3..dabaace950b 100644 --- a/cvat/apps/events/handlers.py +++ b/cvat/apps/events/handlers.py @@ -521,6 +521,11 @@ def handle_dataset_import( handle_dataset_io(instance, "import", format_name=format_name, cloud_storage=cloud_storage) def handle_rq_exception(rq_job, exc_type, exc_value, tb): + if isinstance(exc_type, SystemExit): + # rq process was killed intentionally by SIGTERM + # we do not need to log it + return False + oid = rq_job.meta.get("org_id", None) oslug = rq_job.meta.get("org_slug", None) pid = rq_job.meta.get("project_id", None) diff --git a/cvat/rqworker.py b/cvat/rqworker.py index d368a1ef262..0bce612ce9e 100644 --- a/cvat/rqworker.py +++ b/cvat/rqworker.py @@ -4,7 +4,7 @@ # SPDX-License-Identifier: MIT import os - +import signal from rq import Worker import cvat.utils.remote_debugger as debug @@ -42,12 +42,18 @@ def execute_job(self, *args, **kwargs): # errors during debugging # https://stackoverflow.com/questions/8242837/django-multiprocessing-and-database-connections/10684672#10684672 from django import db - db.connections.close_all() + db.connections.close_all() return self.perform_job(*args, **kwargs) + def kill_horse(self, sig: signal.Signals = signal.SIGTERM): + # Send SIGTERM instead of default SIGKILL in debug mode as SIGKILL can't be handled + # to prevent killing rq worker process as rq code handles SIGTERM properly + super().kill_horse(sig) + if debug.is_debugging_enabled(): + class RemoteDebugWorker(SimpleWorker): """ Support for VS code debugger @@ -68,6 +74,7 @@ def execute_job(self, *args, **kwargs): if os.environ.get("COVERAGE_PROCESS_START"): import coverage + default_exit = os._exit def coverage_exit(*args, **kwargs): From 683420a96ddf9a3f933821fc070c64f49dbe9bef Mon Sep 
17 00:00:00 2001 From: Boris Sekachev Date: Fri, 17 May 2024 13:42:27 +0300 Subject: [PATCH 02/28] Minor improvements --- cvat/rqworker.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cvat/rqworker.py b/cvat/rqworker.py index 0bce612ce9e..5111232fd66 100644 --- a/cvat/rqworker.py +++ b/cvat/rqworker.py @@ -1,5 +1,5 @@ # Copyright (C) 2018-2022 Intel Corporation -# Copyright (C) 2022-2023 CVAT.ai Corporation +# Copyright (C) 2022-2024 CVAT.ai Corporation # # SPDX-License-Identifier: MIT @@ -48,12 +48,12 @@ def execute_job(self, *args, **kwargs): def kill_horse(self, sig: signal.Signals = signal.SIGTERM): # Send SIGTERM instead of default SIGKILL in debug mode as SIGKILL can't be handled - # to prevent killing rq worker process as rq code handles SIGTERM properly + # to prevent killing debug process (rq code handles SIGTERM properly) + # and just starts a new rq job super().kill_horse(sig) if debug.is_debugging_enabled(): - class RemoteDebugWorker(SimpleWorker): """ Support for VS code debugger @@ -74,7 +74,6 @@ def execute_job(self, *args, **kwargs): if os.environ.get("COVERAGE_PROCESS_START"): import coverage - default_exit = os._exit def coverage_exit(*args, **kwargs): From 7d095f468b4191ef028484581708e681a0e63869 Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Fri, 17 May 2024 13:43:03 +0300 Subject: [PATCH 03/28] Minor improvements --- cvat/rqworker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cvat/rqworker.py b/cvat/rqworker.py index 5111232fd66..9e41de7209d 100644 --- a/cvat/rqworker.py +++ b/cvat/rqworker.py @@ -42,8 +42,8 @@ def execute_job(self, *args, **kwargs): # errors during debugging # https://stackoverflow.com/questions/8242837/django-multiprocessing-and-database-connections/10684672#10684672 from django import db - db.connections.close_all() + return self.perform_job(*args, **kwargs) def kill_horse(self, sig: signal.Signals = signal.SIGTERM): From e7fc3673f3506e73e6137b40edb5aec5ec5bae60 Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Fri, 17 May 2024 18:45:44 +0300 Subject: [PATCH 04/28] Fixed debug worker --- cvat/apps/engine/views.py | 4 +++- cvat/rqworker.py | 20 ++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py index a01303bcfc7..40f1bd4c845 100644 --- a/cvat/apps/engine/views.py +++ b/cvat/apps/engine/views.py @@ -16,6 +16,7 @@ from textwrap import dedent import django_rq +from rq.command import send_stop_job_command from attr.converters import to_bool from django.conf import settings from django.contrib.auth.models import User @@ -2965,6 +2966,7 @@ def _export_annotations( tasks_update = list(map(lambda db_task: timezone.localtime(db_task.updated_date), db_instance.tasks.all())) last_instance_update_time = max(tasks_update + [last_instance_update_time]) + timestamp = datetime.strftime(last_instance_update_time, "%Y_%m_%d_%H_%M_%S") is_annotation_file = rq_id.startswith('export:annotations') @@ -2972,7 +2974,7 @@ def _export_annotations( rq_request = rq_job.meta.get('request', None) request_time = rq_request.get('timestamp', None) if rq_request else None if request_time is None or request_time < last_instance_update_time: - if rq_job.worker_name: + if rq_job.get_status() == "started": send_stop_job_command(rq_job.connection, rq_job.id) else: # in case the server is configured with ONE_RUNNING_JOB_IN_QUEUE_PER_USER diff --git a/cvat/rqworker.py b/cvat/rqworker.py index 9e41de7209d..c42c3c9a2cb 100644 --- a/cvat/rqworker.py 
+++ b/cvat/rqworker.py @@ -6,6 +6,7 @@ import os import signal from rq import Worker +from rq.worker import StopRequested import cvat.utils.remote_debugger as debug @@ -52,6 +53,25 @@ def kill_horse(self, sig: signal.Signals = signal.SIGTERM): # Send SIGTERM instead of default SIGKILL in debug mode as SIGKILL can't be handled # to prevent killing debug process (rq code handles SIGTERM properly) # and just starts a new rq job super().kill_horse(sig) + def handle_job_failure(self, *args, **kwargs): + # export job with the same ID was re-queued in the main process + # we do not need to handle failure + is_stopped_export_job = kwargs['queue'].name == 'export' and kwargs['exc_string'].strip().split('\n')[-1] == 'rq.worker.StopRequested' + signal.signal(signal.SIGTERM, self.request_stop) + if not is_stopped_export_job: + super().handle_job_failure(*args, **kwargs) + + # after the first warm stop (StopRequested), default code reassigns SIGTERM signal to cold stop (SystemExit) + # we still want to use warm stops in debug process + signal.signal(signal.SIGTERM, self.request_stop) + + def handle_exception(self, *args, **kwargs): + is_stopped_export_job = args[1] == StopRequested + if not is_stopped_export_job: + # we do not need to write exception here because the process was stopped intentionally + # moreover default code saves meta and rewrites request datetime in meta with old value + super().handle_job_failure(*args, **kwargs) + if debug.is_debugging_enabled(): class RemoteDebugWorker(SimpleWorker): """ Support for VS code debugger From e272452732dc07b957fbd277e5d9e692b5b716bb Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Fri, 17 May 2024 19:05:47 +0300 Subject: [PATCH 05/28] Removed extra code --- cvat/apps/events/handlers.py | 5 ----- cvat/rqworker.py | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/cvat/apps/events/handlers.py b/cvat/apps/events/handlers.py index dabaace950b..5591817dce3 100644 --- a/cvat/apps/events/handlers.py +++ b/cvat/apps/events/handlers.py @@ -521,11 +521,6 @@ def handle_dataset_import( handle_dataset_io(instance, "import", format_name=format_name, cloud_storage=cloud_storage) def handle_rq_exception(rq_job, exc_type, exc_value, tb): - if isinstance(exc_type, SystemExit): - # rq process was killed intentionally by SIGTERM - # we do not need to log it - return False - oid = rq_job.meta.get("org_id", None) oslug = rq_job.meta.get("org_slug", None) pid = rq_job.meta.get("project_id", None) diff --git a/cvat/rqworker.py b/cvat/rqworker.py index c42c3c9a2cb..e85f0b87d39 100644 --- a/cvat/rqworker.py +++ b/cvat/rqworker.py @@ -70,7 +70,7 @@ def handle_exception(self, *args, **kwargs): if not is_stopped_export_job: # we do not need to write exception here because the process was stopped intentionally # moreover default code saves meta and rewrites request datetime in meta with old value - super().handle_job_failure(*args, **kwargs) + super().handle_exception(*args, **kwargs)
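Note: the two patches above hinge on how RQ reports an intentional stop: a warm stop raises StopRequested inside the work horse, while a killed horse surfaces as SystemExit. A minimal, self-contained sketch of that classification, assuming RQ 1.16 (the helper name below is illustrative, not CVAT code):

    from rq.worker import StopRequested

    def is_intentional_stop(exc_type) -> bool:
        # Both exception types mean "the job was stopped on purpose":
        # StopRequested comes from a warm stop (first SIGTERM), SystemExit
        # from a cold stop of the work horse. The check must use issubclass
        # on the type (or isinstance on the value); comparing the type with
        # == misses subclasses.
        return exc_type is not None and issubclass(exc_type, (StopRequested, SystemExit))

This is the same distinction the later patches converge on with isinstance(args[2], (StopRequested, SystemExit)).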
From 79d2c8743d6ec12bd0575183e94d042a46b617d0 Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Sun, 19 May 2024 11:03:59 +0300 Subject: [PATCH 06/28] Production improvements --- cvat/rqworker.py | 45 ++++++++++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/cvat/rqworker.py b/cvat/rqworker.py index e85f0b87d39..1e19979240c 100644 --- a/cvat/rqworker.py +++ b/cvat/rqworker.py @@ -6,12 +6,28 @@ import os import signal from rq import Worker -from rq.worker import StopRequested import cvat.utils.remote_debugger as debug +class CVATWorker(Worker): + def _install_signal_handlers(self): + super()._install_signal_handlers() + # by default first SIGTERM request warm shutdown used, then switched to cold + # we want always use cold shutdown + signal.signal(signal.SIGTERM, self.request_force_stop) + def handle_job_failure(self, job, queue, **kwargs): + if self._stopped_job_id == job.id: + self._stopped_job_id = None + self.set_current_job_id(None) + else: + # the job was stopped intentionally, we do not need to update its status or put it into failed registry + # in our case the job is immediately removed after stop request + super().handle_job_failure(job, queue, **kwargs) + + -DefaultWorker = Worker +DefaultWorker = CVATWorker class BaseDeathPenalty: @@ -25,7 +41,7 @@ def __exit__(self, exc_type, exc_value, traceback): pass -class SimpleWorker(Worker): +class SimpleWorker(CVATWorker): """ Allows to work with at most 1 worker thread. Useful for debugging. """ @@ -50,26 +66,17 @@ def execute_job(self, *args, **kwargs): def kill_horse(self, sig: signal.Signals = signal.SIGTERM): # Send SIGTERM instead of default SIGKILL in debug mode as SIGKILL can't be handled # to prevent killing debug process (rq code handles SIGTERM properly) - # and just starts a new rq job super().kill_horse(sig) - def handle_job_failure(self, *args, **kwargs): - # export job with the same ID was re-queued in the main process - # we do not need to handle failure - is_stopped_export_job = kwargs['queue'].name == 'export' and kwargs['exc_string'].strip().split('\n')[-1] == 'rq.worker.StopRequested' - signal.signal(signal.SIGTERM, self.request_stop) - if not is_stopped_export_job: - super().handle_job_failure(*args, **kwargs) - - # after the first warm stop (StopRequested), default code reassigns SIGTERM signal to cold stop (SystemExit) - # we still want to use warm stops in debug process - signal.signal(signal.SIGTERM, self.request_stop) - def handle_exception(self, *args, **kwargs): - is_stopped_export_job = args[1] == StopRequested + def handle_exception(self, *args, **kwargs): + # on production it sends SIGKILL to work horse process + # but for development we overrided it and it sends SIGTERM to the process + # we need to prevent exception handling as the process was killed intentionally + + # moreover default code saves meta with exception + # and rewrites request datetime in meta with old value, as new job with the same ID may already have been created in a new process + is_stopped_export_job = args[1] == SystemExit if not is_stopped_export_job: - # we do not need to write exception here because the process was stopped intentionally - # moreover default code saves meta and rewrites request datetime in meta with old value super().handle_exception(*args, **kwargs) From 566b908270a84b2c601a6dfaa03765aad25357a1 Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Sun, 19 May 2024 11:06:28 +0300 Subject: [PATCH 07/28] Moved method --- cvat/rqworker.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cvat/rqworker.py b/cvat/rqworker.py index 1e19979240c..6deaca8f292 100644 --- a/cvat/rqworker.py +++ b/cvat/rqworker.py @@ -10,12 +10,6 @@ import cvat.utils.remote_debugger as debug class CVATWorker(Worker): - def _install_signal_handlers(self): - super()._install_signal_handlers() - # by default first SIGTERM request warm shutdown used, then switched to cold - # we want always use cold shutdown - signal.signal(signal.SIGTERM, self.request_force_stop) - def handle_job_failure(self, job, queue, **kwargs): @@ -48,6 +42,12 @@ class SimpleWorker(CVATWorker): death_penalty_class = BaseDeathPenalty + def _install_signal_handlers(self): + super()._install_signal_handlers() + # by 
default first SIGTERM request warm shutdown used, then switched to cold + # we want always use cold shutdown + signal.signal(signal.SIGTERM, self.request_force_stop) + def main_work_horse(self, *args, **kwargs): raise NotImplementedError("Test worker does not implement this method") From 0ac46f842bee9f80d749c2aebe0cf5a0babe3927 Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Sun, 19 May 2024 11:07:07 +0300 Subject: [PATCH 08/28] Fixed comment --- cvat/rqworker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cvat/rqworker.py b/cvat/rqworker.py index 6deaca8f292..34ff5752212 100644 --- a/cvat/rqworker.py +++ b/cvat/rqworker.py @@ -44,7 +44,7 @@ class SimpleWorker(CVATWorker): def _install_signal_handlers(self): super()._install_signal_handlers() - # by default first SIGTERM request warm shutdown used, then switched to cold + # by default first SIGTERM request uses warm shutdown, then switched to cold # we want always use cold shutdown signal.signal(signal.SIGTERM, self.request_force_stop) From 7ac72ddfddb9bfb71dc51af546fc3c55662febc6 Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Sun, 19 May 2024 11:11:44 +0300 Subject: [PATCH 09/28] Disabled pylint false alarm --- cvat/rqworker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cvat/rqworker.py b/cvat/rqworker.py index 34ff5752212..8194f6f0433 100644 --- a/cvat/rqworker.py +++ b/cvat/rqworker.py @@ -11,6 +11,7 @@ class CVATWorker(Worker): def handle_job_failure(self, job, queue, **kwargs): + # pylint: disable=access-member-before-definition if self._stopped_job_id == job.id: self._stopped_job_id = None self.set_current_job_id(None) From d1cb3170590b5bfaf4bb60b8c51937c3b81467c1 Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Sun, 19 May 2024 13:37:04 +0300 Subject: [PATCH 10/28] Minor fix --- cvat/rqworker.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/cvat/rqworker.py b/cvat/rqworker.py index 8194f6f0433..ea5dc768956 100644 --- a/cvat/rqworker.py +++ b/cvat/rqworker.py @@ -6,6 +6,7 @@ import os import signal from rq import Worker +from rq.worker import StopRequested import cvat.utils.remote_debugger as debug @@ -43,12 +44,6 @@ class SimpleWorker(CVATWorker): death_penalty_class = BaseDeathPenalty - def _install_signal_handlers(self): - super()._install_signal_handlers() - # by default first SIGTERM request uses warm shutdown, then switched to cold - # we want always use cold shutdown - signal.signal(signal.SIGTERM, self.request_force_stop) - def main_work_horse(self, *args, **kwargs): raise NotImplementedError("Test worker does not implement this method") @@ -76,7 +71,7 @@ def handle_exception(self, *args, **kwargs): # moreover default code saves meta with exception # and rewrites request datetime in meta with old value, as new job with the same ID may aldeady been created in a new process - is_stopped_export_job = args[1] == SystemExit + is_stopped_export_job = isinstance(args[2], (StopRequested, SystemExit)) if not is_stopped_export_job: super().handle_exception(*args, **kwargs) From 29bbb44ae0ff957d531bcb61e52ea3658699a028 Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Sun, 19 May 2024 13:51:59 +0300 Subject: [PATCH 11/28] Updated comments --- cvat/rqworker.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/cvat/rqworker.py b/cvat/rqworker.py index ea5dc768956..00417a0a9fd 100644 --- a/cvat/rqworker.py +++ b/cvat/rqworker.py @@ -18,7 +18,7 @@ def handle_job_failure(self, job, queue, **kwargs): self.set_current_job_id(None) 
else: # the job was stopped intentionally, we do not need to update its status or put it into failed registry - # in our case the job is immediately removed after stop request + # in our case we remove the job from the main process immediately, after stop request super().handle_job_failure(job, queue, **kwargs) @@ -60,17 +60,21 @@ def execute_job(self, *args, **kwargs): return self.perform_job(*args, **kwargs) def kill_horse(self, sig: signal.Signals = signal.SIGTERM): - # Send SIGTERM instead of default SIGKILL in debug mode as SIGKILL can't be handled - # to prevent killing debug process (rq code handles SIGTERM properly) + # In debug mode we send SIGTERM instead of default SIGKILL + # Because SIGKILL is not handled (and can't be handled) by RQ code and it kills debug process from VSCode + # All three signals (SIGKILL, SIGTERM, SIGINT) are regularly used at RQ code super().kill_horse(sig) def handle_exception(self, *args, **kwargs): - # on production it sends SIGKILL to work horse process - # but for development we overrided it and it sends SIGTERM to the process - # we need to prevent exception handling as the process was killed intentionally - - # moreover default code saves meta with exception - # and rewrites request datetime in meta with old value, as new job with the same ID may already have been created in a new process + # In production environment it sends SIGKILL to work horse process + # But for development we overrided it and it sends SIGTERM to the process + # So, we additionally need to prevent exception handling as the process was killed intentionally + + # Moreover default "handle_exception" code saves meta with exception + # It leads to extra bugs: + # - we create another job with the same ID in the server process + # - when exception message is saved in worker code, it also saves outdated datetime value as part of meta information + # - this outdated value is then used in server code is_stopped_export_job = isinstance(args[2], (StopRequested, SystemExit)) if not is_stopped_export_job: super().handle_exception(*args, **kwargs) From 74393e6652a3170825035230353655332edc88ca Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Sun, 19 May 2024 14:08:13 +0300 Subject: [PATCH 12/28] Implemented for backup --- cvat/apps/engine/backup.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/cvat/apps/engine/backup.py b/cvat/apps/engine/backup.py index a3a63c082b7..3c0e418bda7 100644 --- a/cvat/apps/engine/backup.py +++ b/cvat/apps/engine/backup.py @@ -17,6 +17,7 @@ from tempfile import NamedTemporaryFile import django_rq +from rq.command import send_stop_job_command from attr.converters import to_bool from django.conf import settings from django.db import transaction @@ -964,9 +965,12 @@ def export(db_instance, request, queue_name): rq_request = rq_job.meta.get('request', None) request_time = rq_request.get("timestamp", None) if rq_request else None if request_time is None or request_time < last_instance_update_time: - # in case the server is configured with ONE_RUNNING_JOB_IN_QUEUE_PER_USER - # we have to enqueue dependent jobs after canceling one - rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER) + if rq_job.get_status() == "started": + send_stop_job_command(rq_job.connection, rq_job.id) + else: + # in case the server is configured with ONE_RUNNING_JOB_IN_QUEUE_PER_USER + # we have to enqueue dependent jobs after canceling one + rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER) rq_job.delete() else: if rq_job.is_finished:
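Note: both export entry points (views.py above, backup.py here) decide whether an existing RQ job is stale by comparing the request timestamp stored in its meta with the instance's last update time. A condensed sketch of that guard, extracted from the hunks above (the helper name is hypothetical):

    from datetime import datetime

    def export_is_outdated(rq_job, last_instance_update_time: datetime) -> bool:
        rq_request = rq_job.meta.get('request', None)
        request_time = rq_request.get('timestamp', None) if rq_request else None
        # No recorded request time, or the data changed after the export was
        # requested: the job (and any file it produced) must be recreated.
        return request_time is None or request_time < last_instance_update_time

When this returns True, the code above stops or cancels the old job and deletes it before a fresh one is enqueued.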
From 8ed8c495ae12b345da0acedbf0290a09c83f8c6c Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Tue, 21 May 2024 11:45:49 +0300 Subject: [PATCH 13/28] Patched rq --- cvat/rq_patching.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/cvat/rq_patching.py b/cvat/rq_patching.py index cd8c1ac7422..d7c5540d6de 100644 --- a/cvat/rq_patching.py +++ b/cvat/rq_patching.py @@ -3,14 +3,19 @@ # SPDX-License-Identifier: MIT import traceback +import sys from datetime import datetime from typing import Optional import rq.registry +from redis.client import Pipeline from rq.exceptions import AbandonedJobError, NoSuchJobError from rq.job import JobStatus -from rq.utils import current_timestamp +from rq.utils import current_timestamp, utcformat from rq.version import VERSION +from rq.worker import StopRequested +import cvat.utils.remote_debugger as debug # NOTE: we should patch implementation of original method because @@ -69,7 +74,22 @@ def custom_started_job_registry_cleanup(self, timestamp: Optional[float] = None) return job_ids +def custom_hearbeat(self, timestamp: datetime, ttl: int, pipeline: Optional['Pipeline'] = None, xx: bool = False): + self.last_heartbeat = timestamp + connection = pipeline if pipeline is not None else self.connection + connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat)) + + # the only difference with the default implementation + # is to avoid adding job to started job registry + # it is not clear why they add a job to started_job_registry when the job has failed + # also, as far as I see they also commented this line in unreleased (2.0) version + exc_info = sys.exc_info() + is_stopped_export_job = isinstance(exc_info[1], (StopRequested, SystemExit)) + if not is_stopped_export_job: + self.started_job_registry.add(self, ttl, pipeline=pipeline, xx=xx) + def update_started_job_registry_cleanup() -> None: # don't forget to check if the issue https://github.com/rq/rq/issues/2006 has been resolved in upstream assert VERSION == '1.16.0' rq.registry.StartedJobRegistry.cleanup = custom_started_job_registry_cleanup + rq.job.Job.heartbeat = custom_hearbeat From 3bbc2c9c1dc72141ba3f06093c9625f890d3ac96 Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Tue, 21 May 2024 11:52:45 +0300 Subject: [PATCH 14/28] Fixed pylint --- cvat/rq_patching.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/cvat/rq_patching.py b/cvat/rq_patching.py index d7c5540d6de..851f97892d8 100644 --- a/cvat/rq_patching.py +++ b/cvat/rq_patching.py @@ -1,4 +1,4 @@ -# Copyright (C) 2023 CVAT.ai Corporation +# Copyright (C) 2023-2024 CVAT.ai Corporation # # SPDX-License-Identifier: MIT @@ -8,14 +8,11 @@ from typing import Optional import rq.registry -from redis.client import Pipeline from rq.exceptions import AbandonedJobError, NoSuchJobError from rq.job import JobStatus from rq.utils import current_timestamp, utcformat from rq.version import VERSION from rq.worker import StopRequested -import cvat.utils.remote_debugger as debug
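Note: the custom_hearbeat patch above works because RQ calls Job.heartbeat() from its failure-handling path while the exception is still being handled, so sys.exc_info() inside the heartbeat can see it. A standalone illustration of that property of the Python runtime:

    import sys

    try:
        raise SystemExit('work horse stopped intentionally')
    except SystemExit:
        # Inside an except block (or any function called from it),
        # sys.exc_info() returns (type, value, traceback) of the
        # active exception ...
        assert isinstance(sys.exc_info()[1], SystemExit)

    # ... and it is cleared again once the handler has finished.
    assert sys.exc_info() == (None, None, None)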
From 98a00b43ceff19cf2b361364b29e85ec12e41696 Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Tue, 21 May 2024 11:53:05 +0300 Subject: [PATCH 15/28] Updated license headers --- cvat/apps/engine/backup.py | 2 +- cvat/apps/engine/views.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cvat/apps/engine/backup.py b/cvat/apps/engine/backup.py index 3c0e418bda7..835e7ebcfb0 100644 --- a/cvat/apps/engine/backup.py +++ b/cvat/apps/engine/backup.py @@ -1,5 +1,5 @@ # Copyright (C) 2021-2022 Intel Corporation -# Copyright (C) 2022-2023 CVAT.ai Corporation +# Copyright (C) 2022-2024 CVAT.ai Corporation # # SPDX-License-Identifier: MIT diff --git a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py index 40f1bd4c845..db41f608005 100644 --- a/cvat/apps/engine/views.py +++ b/cvat/apps/engine/views.py @@ -1,5 +1,5 @@ # Copyright (C) 2018-2022 Intel Corporation -# Copyright (C) 2022-2023 CVAT.ai Corporation +# Copyright (C) 2022-2024 CVAT.ai Corporation # # SPDX-License-Identifier: MIT
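Note: the next patch extends the user-job scan because ONE_RUNNING_JOB_IN_QUEUE_PER_USER is implemented through RQ dependencies: a new export/backup job is made to depend on the user's newest unfinished job, so it stays deferred until that job ends. A minimal sketch of the chaining, assuming a local Redis (everything except the rq API itself is illustrative):

    from redis import Redis
    from rq import Queue

    queue = Queue('export', connection=Redis())

    def enqueue_after_user_jobs(func, user_jobs, *args, **kwargs):
        # The user's newest unfinished job, if any, gates the new one: RQ keeps
        # the new job deferred until its dependency finishes or is cancelled
        # with enqueue_dependents=True.
        dependency = user_jobs[-1] if user_jobs else None
        return queue.enqueue(func, *args, depends_on=dependency, **kwargs)

This is also why the cancel paths above pass enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER: cancelling one link of the chain must release the jobs deferred behind it.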
From 0536e0da4f3c14226ee9240dc446f5000fc459d7 Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Tue, 21 May 2024 13:00:23 +0300 Subject: [PATCH 16/28] Consider queued jobs when looking for a dependent job --- cvat/apps/engine/backup.py | 13 ++++++++----- cvat/apps/engine/utils.py | 9 ++++++++- cvat/apps/engine/views.py | 15 ++++++++------- 3 files changed, 24 insertions(+), 13 deletions(-) diff --git a/cvat/apps/engine/backup.py b/cvat/apps/engine/backup.py index 835e7ebcfb0..ead35d4cf0b 100644 --- a/cvat/apps/engine/backup.py +++ b/cvat/apps/engine/backup.py @@ -18,6 +18,7 @@ import django_rq from rq.command import send_stop_job_command +from rq.exceptions import InvalidJobOperation from attr.converters import to_bool from django.conf import settings from django.db import transaction @@ -965,12 +966,14 @@ def export(db_instance, request, queue_name): rq_request = rq_job.meta.get('request', None) request_time = rq_request.get("timestamp", None) if rq_request else None if request_time is None or request_time < last_instance_update_time: - if rq_job.get_status() == "started": + # in case the server is configured with ONE_RUNNING_JOB_IN_QUEUE_PER_USER + # we have to enqueue dependent jobs after canceling one + rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER) + try: send_stop_job_command(rq_job.connection, rq_job.id) - else: - # in case the server is configured with ONE_RUNNING_JOB_IN_QUEUE_PER_USER - # we have to enqueue dependent jobs after canceling one - rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER) + except InvalidJobOperation: + pass + rq_job.delete() else: if rq_job.is_finished: diff --git a/cvat/apps/engine/utils.py b/cvat/apps/engine/utils.py index 85b00ae75c6..4e0684645ea 100644 --- a/cvat/apps/engine/utils.py +++ b/cvat/apps/engine/utils.py @@ -183,7 +183,14 @@ def define_dependent_job( # if there are active or deferred jobs when restarting the worker container. if job and job.meta.get("user", {}).get("id") == user_id and job.is_deferred ] - all_user_jobs = started_user_jobs + deferred_user_jobs + queued_user_jobs = [ + job + for job in queue.job_class.fetch_many( + queue.get_job_ids(), queue.connection + ) + if job and job.meta.get("user", {}).get("id") == user_id and job.is_deferred + ] + all_user_jobs = started_user_jobs + deferred_user_jobs + queued_user_jobs # prevent possible cyclic dependencies if rq_id: diff --git a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py index db41f608005..197d92fedcf 100644 --- a/cvat/apps/engine/views.py +++ b/cvat/apps/engine/views.py @@ -17,6 +17,7 @@ import django_rq from rq.command import send_stop_job_command +from rq.exceptions import InvalidJobOperation from attr.converters import to_bool from django.conf import settings from django.contrib.auth.models import User @@ -2974,14 +2975,14 @@ def _export_annotations( rq_request = rq_job.meta.get('request', None) request_time = rq_request.get('timestamp', None) if rq_request else None if request_time is None or request_time < last_instance_update_time: - if rq_job.get_status() == "started": + # in case the server is configured with ONE_RUNNING_JOB_IN_QUEUE_PER_USER + # we have to enqueue dependent jobs after canceling one + rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER) + try: send_stop_job_command(rq_job.connection, rq_job.id) - else: - # in case the server is configured with ONE_RUNNING_JOB_IN_QUEUE_PER_USER - # we have to enqueue dependent jobs after canceling one - rq_job.cancel( - enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER - ) + except InvalidJobOperation: + pass + rq_job.delete() else: if rq_job.is_finished: From fda8d8923c3e8189413e7d1625f254234f18fdd9 Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Tue, 21 May 2024 13:08:11 +0300 Subject: [PATCH 17/28] added comment --- cvat/rqworker.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cvat/rqworker.py b/cvat/rqworker.py index 00417a0a9fd..034e0236f43 100644 --- a/cvat/rqworker.py +++ b/cvat/rqworker.py @@ -11,6 +11,9 @@ import cvat.utils.remote_debugger as debug class CVATWorker(Worker): + # this method may be called not only from work-horse process + # but also from parent process in Worker::monitor_work_horse_process + # if parent process sees that work-horse is dead def handle_job_failure(self, job, queue, **kwargs): # pylint: disable=access-member-before-definition if self._stopped_job_id == job.id: From 9a35fa9c0843371a79dd6c466ed44e4659e430f9 Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Tue, 21 May 2024 14:48:07 +0300 Subject: [PATCH 18/28] Fixed define_dependent_job --- cvat/apps/engine/utils.py | 32 +++++++++++--------------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/cvat/apps/engine/utils.py b/cvat/apps/engine/utils.py index 4e0684645ea..c81b433bf91 100644 --- a/cvat/apps/engine/utils.py +++ b/cvat/apps/engine/utils.py @@ -165,32 +165,22 @@ def define_dependent_job( if not should_be_dependent: return None - started_user_jobs = [ - job - for job in queue.job_class.fetch_many( - queue.started_job_registry.get_job_ids(), queue.connection - ) - if job and job.meta.get("user", {}).get("id") == user_id - ] - deferred_user_jobs = [ - job - for job in queue.job_class.fetch_many( - queue.deferred_job_registry.get_job_ids(), queue.connection - ) + keys = [] + with queue.connection.pipeline() as pipeline: # Since there is no cleanup implementation in DeferredJobRegistry, # this registry can contain "outdated" 
jobs that weren't deleted from it # but were added to another registry. Probably such situations can occur # if there are active or deferred jobs when restarting the worker container. - if job and job.meta.get("user", {}).get("id") == user_id and job.is_deferred - ] - queued_user_jobs = [ - job - for job in queue.job_class.fetch_many( - queue.get_job_ids(), queue.connection - ) - if job and job.meta.get("user", {}).get("id") == user_id and job.is_deferred + for registry in [queue.started_job_registry, queue.deferred_job_registry]: + pipeline.zrange(registry.key, 0, -1) + pipeline.lrange(queue.key, 0, -1) + results = pipeline.execute() + keys = [bytes_key.decode('utf-8') for result in results for bytes_key in result] + + all_user_jobs = [ + job for job in queue.job_class.fetch_many(keys, queue.connection) + if job and job.meta.get("user", {}).get("id") == user_id ] - all_user_jobs = started_user_jobs + deferred_user_jobs + queued_user_jobs # prevent possible cyclic dependencies if rq_id:
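Note: the rewrite above replaces three separate fetch_many() scans with one pipelined Redis round trip: the started/deferred registries are sorted sets (read with ZRANGE) while the queue itself is a list (read with LRANGE). A standalone sketch of the same batching with redis-py (the key names below are illustrative; the real keys come from registry.key and queue.key):

    from redis import Redis

    connection = Redis()

    with connection.pipeline() as pipeline:
        pipeline.zrange('rq:wip:export', 0, -1)       # started job registry
        pipeline.zrange('rq:deferred:export', 0, -1)  # deferred job registry
        pipeline.lrange('rq:queue:export', 0, -1)     # queued job ids
        started, deferred, queued = pipeline.execute()

    # Every reply is a list of bytes; decode before passing to Job.fetch_many().
    job_ids = [raw.decode('utf-8') for batch in (started, deferred, queued) for raw in batch]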
- # Moreover default "handle_exception" code saves meta with exception - # It leads to extra bugs: + # PROBLEM: default "handle_exception" code saves meta with exception + # It leads to bugs: # - we create another job with the same ID in the server process # - when exception message is saved in worker code, it also saves outdated datetime value as part of meta information # - this outdated value then used in server code From 57616e9fefd139ca78cefd767c734c6a92115235 Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Wed, 22 May 2024 16:03:55 +0300 Subject: [PATCH 20/28] Using cupress --- cvat/apps/engine/backup.py | 6 ++---- cvat/apps/engine/views.py | 6 ++---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/cvat/apps/engine/backup.py b/cvat/apps/engine/backup.py index ead35d4cf0b..dbe7c082484 100644 --- a/cvat/apps/engine/backup.py +++ b/cvat/apps/engine/backup.py @@ -15,6 +15,7 @@ from zipfile import ZipFile from datetime import datetime from tempfile import NamedTemporaryFile +from contextlib import suppress import django_rq from rq.command import send_stop_job_command @@ -969,11 +970,8 @@ def export(db_instance, request, queue_name): # in case the server is configured with ONE_RUNNING_JOB_IN_QUEUE_PER_USER # we have to enqueue dependent jobs after canceling one rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER) - try: + with suppress(InvalidJobOperation): send_stop_job_command(rq_job.connection, rq_job.id) - except InvalidJobOperation: - pass - rq_job.delete() else: if rq_job.is_finished: diff --git a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py index 197d92fedcf..7ff55e552b9 100644 --- a/cvat/apps/engine/views.py +++ b/cvat/apps/engine/views.py @@ -14,6 +14,7 @@ from datetime import datetime from tempfile import NamedTemporaryFile from textwrap import dedent +from contextlib import suppress import django_rq from rq.command import send_stop_job_command @@ -2978,11 +2979,8 @@ def _export_annotations( # in case the server is configured with ONE_RUNNING_JOB_IN_QUEUE_PER_USER # we have to enqueue dependent jobs after canceling one rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER) - try: + with suppress(InvalidJobOperation): send_stop_job_command(rq_job.connection, rq_job.id) - except InvalidJobOperation: - pass - rq_job.delete() else: if rq_job.is_finished: From 98fa00b3dc89cad42f3f543eeebe0d39bc27f508 Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Wed, 22 May 2024 16:18:27 +0300 Subject: [PATCH 21/28] Returned status check --- cvat/apps/engine/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cvat/apps/engine/utils.py b/cvat/apps/engine/utils.py index c81b433bf91..12b9dfb0029 100644 --- a/cvat/apps/engine/utils.py +++ b/cvat/apps/engine/utils.py @@ -179,7 +179,7 @@ def define_dependent_job( all_user_jobs = [ job for job in queue.job_class.fetch_many(keys, queue.connection) - if job and job.meta.get("user", {}).get("id") == user_id + if job and job.meta.get("user", {}).get("id") == user_id and job.get_status() in ["started", "queued", "deferred"] ] # prevent possible cyclic dependencies From c2de3e3fcc2e2d0a5382d7d4217a7efaea297c0b Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Thu, 23 May 2024 11:00:29 +0300 Subject: [PATCH 22/28] Fixed circular dependency issues --- cvat/apps/engine/utils.py | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/cvat/apps/engine/utils.py b/cvat/apps/engine/utils.py index 12b9dfb0029..11bc75190c0 100644 --- 
a/cvat/apps/engine/utils.py +++ b/cvat/apps/engine/utils.py @@ -165,22 +165,16 @@ def define_dependent_job( if not should_be_dependent: return None - keys = [] - with queue.connection.pipeline() as pipeline: - # Since there is no cleanup implementation in DeferredJobRegistry, - # this registry can contain "outdated" jobs that weren't deleted from it - # but were added to another registry. Probably such situations can occur - # if there are active or deferred jobs when restarting the worker container. - for registry in [queue.started_job_registry, queue.deferred_job_registry]: - pipeline.zrange(registry.key, 0, -1) - pipeline.lrange(queue.key, 0, -1) - results = pipeline.execute() - keys = [bytes_key.decode('utf-8') for result in results for bytes_key in result] - - all_user_jobs = [ - job for job in queue.job_class.fetch_many(keys, queue.connection) - if job and job.meta.get("user", {}).get("id") == user_id and job.get_status() in ["started", "queued", "deferred"] - ] + queues = [queue.deferred_job_registry, queue, queue.started_job_registry] + filters = [lambda job: job.is_deferred, lambda _: True, lambda _: True] + all_user_jobs = [] + for q, f in zip(queues, filters): + job_ids = q.get_job_ids() + jobs = queue.job_class.fetch_many(job_ids, queue.connection) + jobs = filter(lambda job: job.meta.get("user", {}).get("id") == user_id and f(job), jobs) + all_user_jobs.extend(jobs) + + print(all_user_jobs) # prevent possible cyclic dependencies if rq_id: From d5a57cfd109a8e141d9a2442182a28593b8c42ef Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Thu, 23 May 2024 11:05:09 +0300 Subject: [PATCH 23/28] Update cvat/apps/engine/utils.py --- cvat/apps/engine/utils.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/cvat/apps/engine/utils.py b/cvat/apps/engine/utils.py index 11bc75190c0..d1b2eef869e 100644 --- a/cvat/apps/engine/utils.py +++ b/cvat/apps/engine/utils.py @@ -174,8 +174,6 @@ def define_dependent_job( jobs = filter(lambda job: job.meta.get("user", {}).get("id") == user_id and f(job), jobs) all_user_jobs.extend(jobs) - print(all_user_jobs) - # prevent possible cyclic dependencies if rq_id: all_job_dependency_ids = { From fbff39454c201fdaa966973757bbac58511ff912 Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Thu, 23 May 2024 11:08:03 +0300 Subject: [PATCH 24/28] Restored comment --- cvat/apps/engine/utils.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cvat/apps/engine/utils.py b/cvat/apps/engine/utils.py index d1b2eef869e..04ba4041ede 100644 --- a/cvat/apps/engine/utils.py +++ b/cvat/apps/engine/utils.py @@ -166,6 +166,10 @@ def define_dependent_job( return None queues = [queue.deferred_job_registry, queue, queue.started_job_registry] + # Since there is no cleanup implementation in DeferredJobRegistry, + # this registry can contain "outdated" jobs that weren't deleted from it + # but were added to another registry. Probably such situations can occur + # if there are active or deferred jobs when restarting the worker container. 
filters = [lambda job: job.is_deferred, lambda _: True, lambda _: True] all_user_jobs = [] for q, f in zip(queues, filters): job_ids = q.get_job_ids() jobs = queue.job_class.fetch_many(job_ids, queue.connection) jobs = filter(lambda job: job.meta.get("user", {}).get("id") == user_id and f(job), jobs) all_user_jobs.extend(jobs) From bf054cee20e6ccca490180a33c815c80d766a2aa Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Thu, 23 May 2024 15:04:52 +0300 Subject: [PATCH 25/28] Update cvat/apps/engine/utils.py Co-authored-by: Maria Khrustaleva --- cvat/apps/engine/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cvat/apps/engine/utils.py b/cvat/apps/engine/utils.py index 04ba4041ede..aa62485981a 100644 --- a/cvat/apps/engine/utils.py +++ b/cvat/apps/engine/utils.py @@ -174,7 +174,7 @@ def define_dependent_job( all_user_jobs = [] for q, f in zip(queues, filters): job_ids = q.get_job_ids() - jobs = queue.job_class.fetch_many(job_ids, queue.connection) + jobs = q.job_class.fetch_many(job_ids, q.connection) jobs = filter(lambda job: job.meta.get("user", {}).get("id") == user_id and f(job), jobs) all_user_jobs.extend(jobs) From c7955ca0f0b4d93f204c0f6ad459f13645600ea3 Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Mon, 27 May 2024 14:55:22 +0300 Subject: [PATCH 26/28] Renamed identifiers --- cvat/rq_patching.py | 8 ++++---- cvat/rqworker.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cvat/rq_patching.py b/cvat/rq_patching.py index 851f97892d8..13c39658db4 100644 --- a/cvat/rq_patching.py +++ b/cvat/rq_patching.py @@ -71,7 +71,7 @@ def custom_started_job_registry_cleanup(self, timestamp: Optional[float] = None) return job_ids -def custom_hearbeat(self, timestamp: datetime, ttl: int, pipeline: Optional['Pipeline'] = None, xx: bool = False): +def custom_heartbeat(self, timestamp: datetime, ttl: int, pipeline: Optional['Pipeline'] = None, xx: bool = False): self.last_heartbeat = timestamp connection = pipeline if pipeline is not None else self.connection connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat)) @@ -81,12 +81,12 @@ def custom_hearbeat(self, timestamp: datetime, ttl: int, pipeline: Optional['Pip # it is not clear why they add a job to started_job_registry when the job has failed # also, as far as I see they also commented this line in unreleased (2.0) version exc_info = sys.exc_info() - is_stopped_export_job = isinstance(exc_info[1], (StopRequested, SystemExit)) - if not is_stopped_export_job: + is_stopped_job = isinstance(exc_info[1], (StopRequested, SystemExit)) + if not is_stopped_job: self.started_job_registry.add(self, ttl, pipeline=pipeline, xx=xx) def update_started_job_registry_cleanup() -> None: # don't forget to check if the issue https://github.com/rq/rq/issues/2006 has been resolved in upstream assert VERSION == '1.16.0' rq.registry.StartedJobRegistry.cleanup = custom_started_job_registry_cleanup - rq.job.Job.heartbeat = custom_hearbeat + rq.job.Job.heartbeat = custom_heartbeat diff --git a/cvat/rqworker.py b/cvat/rqworker.py index c6a996dfc34..b39cf47c66c 100644 --- a/cvat/rqworker.py +++ b/cvat/rqworker.py @@ -81,8 +81,8 @@ def handle_exception(self, *args, **kwargs): # - we create another job with the same ID in the server process # - when exception message is saved in worker code, it also saves outdated datetime value as part of meta information # - this outdated value is then used in server code - is_stopped_export_job = isinstance(args[2], (StopRequested, SystemExit)) - if not is_stopped_export_job: + is_stopped_job = isinstance(args[2], (StopRequested, SystemExit)) + if not is_stopped_job: super().handle_exception(*args, **kwargs) From 8f4350351a3c73fc391b9bc22c4a974360678d13 Mon Sep 17 
00:00:00 2001 From: Boris Sekachev Date: Wed, 5 Jun 2024 09:36:44 +0300 Subject: [PATCH 27/28] Added changelog --- changelog.d/20240605_093600_boris_kill_worker.md | 4 ++++ cvat/apps/engine/views.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) create mode 100644 changelog.d/20240605_093600_boris_kill_worker.md diff --git a/changelog.d/20240605_093600_boris_kill_worker.md b/changelog.d/20240605_093600_boris_kill_worker.md new file mode 100644 index 00000000000..b6a5acfe9ce --- /dev/null +++ b/changelog.d/20240605_093600_boris_kill_worker.md @@ -0,0 +1,4 @@ +### Fixed + +- Queued jobs are not considered in deferring logic + () diff --git a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py index 408e379d102..2ef73e29c6b 100644 --- a/cvat/apps/engine/views.py +++ b/cvat/apps/engine/views.py @@ -1,5 +1,5 @@ # Copyright (C) 2018-2022 Intel Corporation -# Copyright (C) 2022-2023 CVAT.ai Corporation +# Copyright (C) 2022-2024 CVAT.ai Corporation # # SPDX-License-Identifier: MIT From 959d340b842035d390c88ade8abbe57000077dfe Mon Sep 17 00:00:00 2001 From: Boris Sekachev Date: Wed, 5 Jun 2024 10:51:16 +0300 Subject: [PATCH 28/28] Added none check --- cvat/apps/engine/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cvat/apps/engine/utils.py b/cvat/apps/engine/utils.py index aa62485981a..daa7e617a2f 100644 --- a/cvat/apps/engine/utils.py +++ b/cvat/apps/engine/utils.py @@ -175,7 +175,7 @@ def define_dependent_job( for q, f in zip(queues, filters): job_ids = q.get_job_ids() jobs = q.job_class.fetch_many(job_ids, q.connection) - jobs = filter(lambda job: job.meta.get("user", {}).get("id") == user_id and f(job), jobs) + jobs = filter(lambda job: job and job.meta.get("user", {}).get("id") == user_id and f(job), jobs) all_user_jobs.extend(jobs) # prevent possible cyclic dependencies
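Note: after all 28 patches, both export entry points share a single cancellation sequence; a condensed recap assembled from the final hunks above, assuming RQ 1.16 (the wrapper function itself is illustrative):

    from contextlib import suppress

    from rq.command import send_stop_job_command
    from rq.exceptions import InvalidJobOperation

    def cancel_stale_job(rq_job, one_running_job_per_user: bool) -> None:
        # Cancel first, re-enqueueing dependents when the server runs with
        # ONE_RUNNING_JOB_IN_QUEUE_PER_USER, so deferred user jobs are released.
        rq_job.cancel(enqueue_dependents=one_running_job_per_user)
        # Stopping only applies to a started job; for queued/deferred/finished
        # jobs RQ raises InvalidJobOperation, which is safe to ignore here.
        with suppress(InvalidJobOperation):
            send_stop_job_command(rq_job.connection, rq_job.id)
        rq_job.delete()

On the worker side, CVATWorker.handle_job_failure() and the patched Job.heartbeat() keep such an intentionally stopped job out of the failed and started registries, so the deletion above is not raced by the worker's own cleanup.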