Fixed: Queued jobs are not considered in deferring logic #7907

Merged: 35 commits, Jun 5, 2024
Changes shown are from 10 of the 35 commits.

Commits
bc920a8
Kill workhorse process when cancelling export job
bsekachev May 17, 2024
683420a
Minor improvements
bsekachev May 17, 2024
7d095f4
Minor improvements
bsekachev May 17, 2024
e7fc367
Fixed debug worker
bsekachev May 17, 2024
e272452
Removed extra code
bsekachev May 17, 2024
79d2c87
Production improvements
bsekachev May 19, 2024
566b908
Moved method
bsekachev May 19, 2024
0ac46f8
Fixed comment
bsekachev May 19, 2024
7ac72dd
Disabled pylint false alarm
bsekachev May 19, 2024
d1cb317
Minor fix
bsekachev May 19, 2024
29bbb44
Updated comments
bsekachev May 19, 2024
74393e6
Implemented for backup
bsekachev May 19, 2024
43bbc97
Merge branch 'develop' into bs/kill_worker
bsekachev May 20, 2024
8ed8c49
Patched rq
bsekachev May 21, 2024
3bbc2c9
Fixed pylint
bsekachev May 21, 2024
98a00b4
Updated license headers
bsekachev May 21, 2024
0536e0d
Consider queued jobs when looking for a dependent job
bsekachev May 21, 2024
fda8d89
Added comment
bsekachev May 21, 2024
ed0cc01
Merge branch 'develop' into bs/kill_worker
bsekachev May 21, 2024
9a35fa9
Fixed define_dependent_job
bsekachev May 21, 2024
c082802
Improved comments
bsekachev May 21, 2024
9e7f6ae
Merge branch 'develop' into bs/kill_worker
bsekachev May 21, 2024
57616e9
Using cypress
bsekachev May 22, 2024
98fa00b
Restored status check
bsekachev May 22, 2024
c2de3e3
Fixed circular dependency issues
bsekachev May 23, 2024
d5a57cf
Update cvat/apps/engine/utils.py
bsekachev May 23, 2024
fbff394
Restored comment
bsekachev May 23, 2024
bf054ce
Update cvat/apps/engine/utils.py
bsekachev May 23, 2024
89a8b6c
Merge branch 'develop' into bs/kill_worker
bsekachev May 23, 2024
b332d9c
Merge branch 'develop' into bs/kill_worker
bsekachev May 23, 2024
6af94a7
Merge branch 'develop' into bs/kill_worker
bsekachev May 23, 2024
c7955ca
Renamed identifiers
bsekachev May 27, 2024
8a86065
Updated
bsekachev Jun 5, 2024
8f43503
Added changelog
bsekachev Jun 5, 2024
959d340
Added none check
bsekachev Jun 5, 2024
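
The deferring fix itself (commit 0536e0d) boils down to also scanning the queued jobs, not just the started and deferred registries, when picking the job a new request should wait on. The sketch below illustrates the idea only; it is not CVAT's actual define_dependent_job, and the helper name and the user_<id> job-ID convention are assumptions made for the example.

    from typing import Optional

    from rq import Queue
    from rq.job import Dependency


    def find_user_dependency(queue: Queue, user_id: int) -> Optional[Dependency]:
        # Collect the user's jobs that are started, deferred, AND still queued.
        # Before this fix, jobs that were enqueued but not yet picked up by a
        # worker were overlooked, so a second job for the same user could run
        # concurrently despite ONE_RUNNING_JOB_IN_QUEUE_PER_USER.
        job_ids = (
            queue.started_job_registry.get_job_ids()
            + queue.deferred_job_registry.get_job_ids()
            + queue.job_ids  # queued but not yet started: the missing piece
        )
        user_job_ids = [jid for jid in job_ids if f"user_{user_id}" in jid]
        if not user_job_ids:
            return None
        # Defer the new job until the user's most recent job finishes
        return Dependency(jobs=[user_job_ids[-1]], allow_failure=True)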
5 changes: 0 additions & 5 deletions .vscode/settings.json
@@ -40,9 +40,4 @@
     "cSpell.words": [
         "crowdsourcing"
     ],
-    "isort.args":["--profile", "black"],
-    "[python]": {
-        "editor.defaultFormatter": "ms-python.black-formatter",
-        "editor.formatOnSave": true
-    }
bsekachev (Member, Author):
I added these settings previously, but now it looks like they cause more problems than benefit.

}
13 changes: 10 additions & 3 deletions cvat/apps/engine/views.py
@@ -16,6 +16,7 @@
 from textwrap import dedent

 import django_rq
+from rq.command import send_stop_job_command
 from attr.converters import to_bool
 from django.conf import settings
 from django.contrib.auth.models import User
@@ -2965,16 +2966,22 @@ def _export_annotations(
         tasks_update = list(map(lambda db_task: timezone.localtime(db_task.updated_date), db_instance.tasks.all()))
         last_instance_update_time = max(tasks_update + [last_instance_update_time])

+
     timestamp = datetime.strftime(last_instance_update_time, "%Y_%m_%d_%H_%M_%S")
     is_annotation_file = rq_id.startswith('export:annotations')

     if rq_job:
         rq_request = rq_job.meta.get('request', None)
         request_time = rq_request.get('timestamp', None) if rq_request else None
         if request_time is None or request_time < last_instance_update_time:
-            # in case the server is configured with ONE_RUNNING_JOB_IN_QUEUE_PER_USER
-            # we have to enqueue dependent jobs after canceling one
-            rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER)
+            if rq_job.get_status() == "started":
+                send_stop_job_command(rq_job.connection, rq_job.id)
+            else:
+                # in case the server is configured with ONE_RUNNING_JOB_IN_QUEUE_PER_USER
+                # we have to enqueue dependent jobs after canceling one
+                rq_job.cancel(
+                    enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER
+                )
             rq_job.delete()
         else:
             if rq_job.is_finished:
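
For context, the stop-versus-cancel pattern the hunk above implements can be shown standalone. This is a minimal sketch of the RQ API usage with an illustrative job ID, not code from the PR:

    from redis import Redis
    from rq.command import send_stop_job_command
    from rq.job import Job

    redis = Redis()
    job = Job.fetch("export:annotations-example", connection=redis)  # illustrative ID

    if job.get_status() == "started":
        # A job already running on a worker cannot simply be cancelled;
        # the worker has to be told to stop its work horse process.
        send_stop_job_command(redis, job.id)
    else:
        # A queued or deferred job can be cancelled directly; enqueue_dependents
        # re-enqueues jobs that were deferred behind the cancelled one.
        job.cancel(enqueue_dependents=True)

    job.delete()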
37 changes: 33 additions & 4 deletions cvat/rqworker.py
@@ -1,16 +1,29 @@
 # Copyright (C) 2018-2022 Intel Corporation
-# Copyright (C) 2022-2023 CVAT.ai Corporation
+# Copyright (C) 2022-2024 CVAT.ai Corporation
 #
 # SPDX-License-Identifier: MIT

 import os

+import signal
 from rq import Worker
+from rq.worker import StopRequested

 import cvat.utils.remote_debugger as debug

+
+class CVATWorker(Worker):
+    def handle_job_failure(self, job, queue, **kwargs):
+        # pylint: disable=access-member-before-definition
+        if self._stopped_job_id == job.id:
+            # the job was stopped intentionally; we do not need to update its
+            # status or put it into the failed registry, since in our case the
+            # job is removed immediately after the stop request
+            self._stopped_job_id = None
+            self.set_current_job_id(None)
+        else:
+            super().handle_job_failure(job, queue, **kwargs)
Contributor:

Ensure clarity by adding a comment explaining the check for _stopped_job_id.

Suggested change:

+        # Check if the job was stopped intentionally and handle accordingly



-DefaultWorker = Worker
+DefaultWorker = CVATWorker


 class BaseDeathPenalty:
@@ -24,7 +37,7 @@ def __exit__(self, exc_type, exc_value, traceback):
         pass


-class SimpleWorker(Worker):
+class SimpleWorker(CVATWorker):
     """
     Allows to work with at most 1 worker thread. Useful for debugging.
     """
@@ -46,6 +59,22 @@ def execute_job(self, *args, **kwargs):

         return self.perform_job(*args, **kwargs)

+    def kill_horse(self, sig: signal.Signals = signal.SIGTERM):
+        # In debug mode, send SIGTERM instead of the default SIGKILL, because
+        # SIGKILL cannot be handled and would kill the debugged process
+        # (rq handles SIGTERM properly)
+        super().kill_horse(sig)
+
+    def handle_exception(self, *args, **kwargs):
+        # In production the worker sends SIGKILL to the work horse process,
+        # but for development we override kill_horse to send SIGTERM instead.
+        # We need to skip exception handling here, as the process was killed
+        # intentionally.
+
+        # Moreover, the default code saves meta with the exception and
+        # overwrites the request datetime in meta with an old value, as a new
+        # job with the same ID may already have been created in a new process.
+        is_stopped_export_job = isinstance(args[2], (StopRequested, SystemExit))
+        if not is_stopped_export_job:
+            super().handle_exception(*args, **kwargs)
Contributor:

Clarify the exception handling logic with a detailed comment.

Suggested change:

+        # Check if the exception is due to a stop request or system exit and handle accordingly



 if debug.is_debugging_enabled():
     class RemoteDebugWorker(SimpleWorker):
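
The SIGTERM/SIGKILL distinction behind kill_horse can be demonstrated in isolation. A minimal, POSIX-only sketch (not CVAT code): SIGTERM reaches a Python handler and allows cleanup, while SIGKILL never would.

    import os
    import signal
    import sys
    import time


    def on_sigterm(signum, frame):
        # SIGTERM can be intercepted, so a debugger or rq's own handler gets a
        # chance to shut down cleanly; SIGKILL can never be caught, which is
        # why the debug worker above swaps it out.
        print("work horse: received SIGTERM, cleaning up")
        sys.exit(0)


    if __name__ == "__main__":
        signal.signal(signal.SIGTERM, on_sigterm)
        pid = os.fork()
        if pid == 0:
            time.sleep(60)  # child: stand-in for a long-running work horse
        else:
            time.sleep(0.1)
            os.kill(pid, signal.SIGTERM)  # handled by the child, unlike SIGKILL
            os.waitpid(pid, 0)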