Fixed: Queued jobs are not considered in deferring logic #7907

Merged: 35 commits, Jun 5, 2024
bc920a8
Kill workhorse process when cancelling export job
bsekachev May 17, 2024
683420a
Minor improvements
bsekachev May 17, 2024
7d095f4
Minor improvements
bsekachev May 17, 2024
e7fc367
Fixed debug worker
bsekachev May 17, 2024
e272452
Removed extra code
bsekachev May 17, 2024
79d2c87
Production improvements
bsekachev May 19, 2024
566b908
Moved method
bsekachev May 19, 2024
0ac46f8
Fixed comment
bsekachev May 19, 2024
7ac72dd
Disabled pylint false alarm
bsekachev May 19, 2024
d1cb317
Minor fix
bsekachev May 19, 2024
29bbb44
Updated comments
bsekachev May 19, 2024
74393e6
Implemented for backup
bsekachev May 19, 2024
43bbc97
Merge branch 'develop' into bs/kill_worker
bsekachev May 20, 2024
8ed8c49
Patched rq
bsekachev May 21, 2024
3bbc2c9
Fixed pylint
bsekachev May 21, 2024
98a00b4
Updated license headers
bsekachev May 21, 2024
0536e0d
Consider queued jobs when looking for a dependends job
bsekachev May 21, 2024
fda8d89
added comment
bsekachev May 21, 2024
ed0cc01
Merge branch 'develop' into bs/kill_worker
bsekachev May 21, 2024
9a35fa9
Fixed define_dependent_job
bsekachev May 21, 2024
c082802
Improved comments
bsekachev May 21, 2024
9e7f6ae
Merge branch 'develop' into bs/kill_worker
bsekachev May 21, 2024
57616e9
Using cupress
bsekachev May 22, 2024
98fa00b
Returned status check
bsekachev May 22, 2024
c2de3e3
Fixed circular dependency issues
bsekachev May 23, 2024
d5a57cf
Update cvat/apps/engine/utils.py
bsekachev May 23, 2024
fbff394
Restored comment
bsekachev May 23, 2024
bf054ce
Update cvat/apps/engine/utils.py
bsekachev May 23, 2024
89a8b6c
Merge branch 'develop' into bs/kill_worker
bsekachev May 23, 2024
b332d9c
Merge branch 'develop' into bs/kill_worker
bsekachev May 23, 2024
6af94a7
Merge branch 'develop' into bs/kill_worker
bsekachev May 23, 2024
c7955ca
Renamed identificators
bsekachev May 27, 2024
8a86065
Updated
bsekachev Jun 5, 2024
8f43503
Added changelog
bsekachev Jun 5, 2024
959d340
Added none check
bsekachev Jun 5, 2024
5 changes: 0 additions & 5 deletions .vscode/settings.json
@@ -40,9 +40,4 @@
"cSpell.words": [
"crowdsourcing"
],
"isort.args":["--profile", "black"],
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter",
"editor.formatOnSave": true
}
bsekachev (Member, Author):
I added these settings previously, but now it looks like they bring more problems than benefit

}
7 changes: 6 additions & 1 deletion cvat/apps/engine/backup.py
@@ -1,5 +1,5 @@
# Copyright (C) 2021-2022 Intel Corporation
# Copyright (C) 2022-2023 CVAT.ai Corporation
# Copyright (C) 2022-2024 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

@@ -15,8 +15,11 @@
from zipfile import ZipFile
from datetime import datetime
from tempfile import NamedTemporaryFile
from contextlib import suppress

import django_rq
from rq.command import send_stop_job_command
from rq.exceptions import InvalidJobOperation
from attr.converters import to_bool
from django.conf import settings
from django.db import transaction
@@ -967,6 +970,8 @@ def export(db_instance, request, queue_name):
# in case the server is configured with ONE_RUNNING_JOB_IN_QUEUE_PER_USER
# we have to enqueue dependent jobs after canceling one
rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER)
with suppress(InvalidJobOperation):
send_stop_job_command(rq_job.connection, rq_job.id)
rq_job.delete()
else:
if rq_job.is_finished:
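The cancel-then-stop-then-delete sequence in this hunk can be sketched in isolation. Everything below is a hypothetical stand-in for illustration only — no real rq, Redis connection, or job objects are involved; `send_stop_command` and `cancel_job` are invented names that mimic `send_stop_job_command` and the hunk's control flow:

```python
from contextlib import suppress

class InvalidJobOperation(Exception):
    """Stand-in for rq.exceptions.InvalidJobOperation."""

def send_stop_command(job_is_executing: bool) -> str:
    # Stand-in for rq.command.send_stop_job_command: asking to stop a job
    # that is not currently executing raises InvalidJobOperation.
    if not job_is_executing:
        raise InvalidJobOperation("job is not executing")
    return "stopped"

def cancel_job(job_is_executing: bool) -> str:
    # Mirror of the pattern in the diff: the stop command may legitimately
    # fail when the job never started running, so the error is suppressed
    # and the job is simply left for deletion.
    with suppress(InvalidJobOperation):
        return send_stop_command(job_is_executing)
    return "not-running"
```

The `suppress` context manager makes the intent explicit: a stop command sent to a queued (not yet running) job is expected to fail, and that failure is not an error here.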
31 changes: 12 additions & 19 deletions cvat/apps/engine/utils.py
@@ -165,25 +165,18 @@ def define_dependent_job(
if not should_be_dependent:
return None

started_user_jobs = [
job
for job in queue.job_class.fetch_many(
queue.started_job_registry.get_job_ids(), queue.connection
)
if job and job.meta.get("user", {}).get("id") == user_id
]
deferred_user_jobs = [
job
for job in queue.job_class.fetch_many(
queue.deferred_job_registry.get_job_ids(), queue.connection
)
# Since there is no cleanup implementation in DeferredJobRegistry,
# this registry can contain "outdated" jobs that weren't deleted from it
# but were added to another registry. Probably such situations can occur
# if there are active or deferred jobs when restarting the worker container.
if job and job.meta.get("user", {}).get("id") == user_id and job.is_deferred
]
all_user_jobs = started_user_jobs + deferred_user_jobs
queues = [queue.deferred_job_registry, queue, queue.started_job_registry]
# Since there is no cleanup implementation in DeferredJobRegistry,
# this registry can contain "outdated" jobs that weren't deleted from it
# but were added to another registry. Probably such situations can occur
# if there are active or deferred jobs when restarting the worker container.
filters = [lambda job: job.is_deferred, lambda _: True, lambda _: True]
all_user_jobs = []
for q, f in zip(queues, filters):
job_ids = q.get_job_ids()
jobs = q.job_class.fetch_many(job_ids, q.connection)
jobs = filter(lambda job: job.meta.get("user", {}).get("id") == user_id and f(job), jobs)
all_user_jobs.extend(jobs)

# prevent possible cyclic dependencies
if rq_id:
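The rewritten registry scan can be mirrored with plain stub objects to make the filtering visible. All class and function names below are hypothetical stand-ins (the real rq `fetch_many` also takes a connection argument and is called on `job_class`); only the zip-over-registries-and-filters shape comes from the diff:

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class StubJob:
    id: str
    user_id: int
    is_deferred: bool = False

@dataclass
class StubRegistry:
    jobs: List[StubJob] = field(default_factory=list)

    def get_job_ids(self):
        return [job.id for job in self.jobs]

    def fetch_many(self, job_ids):
        by_id = {job.id: job for job in self.jobs}
        return [by_id[job_id] for job_id in job_ids]

def collect_user_jobs(deferred, queued, started, user_id):
    # Scan the deferred registry, the queue itself, and the started registry;
    # keep deferred-registry entries only when the job is still actually
    # deferred, because that registry may hold stale entries (see the
    # comment in the diff about missing DeferredJobRegistry cleanup).
    registries = [deferred, queued, started]
    filters = [lambda job: job.is_deferred, lambda _: True, lambda _: True]
    user_jobs = []
    for registry, keep in zip(registries, filters):
        jobs = registry.fetch_many(registry.get_job_ids())
        user_jobs.extend(j for j in jobs if j.user_id == user_id and keep(j))
    return user_jobs
```

Including the queue itself in the scan is the actual fix of this PR: previously only started and deferred jobs were considered, so queued jobs were invisible to the deferring logic.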
8 changes: 7 additions & 1 deletion cvat/apps/engine/views.py
@@ -1,5 +1,5 @@
# Copyright (C) 2018-2022 Intel Corporation
# Copyright (C) 2022-2023 CVAT.ai Corporation
# Copyright (C) 2022-2024 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

@@ -14,8 +14,11 @@
from datetime import datetime
from tempfile import NamedTemporaryFile
from textwrap import dedent
from contextlib import suppress

import django_rq
from rq.command import send_stop_job_command
from rq.exceptions import InvalidJobOperation
from attr.converters import to_bool
from django.conf import settings
from django.contrib.auth.models import User
@@ -2965,6 +2968,7 @@ def _export_annotations(
tasks_update = list(map(lambda db_task: timezone.localtime(db_task.updated_date), db_instance.tasks.all()))
last_instance_update_time = max(tasks_update + [last_instance_update_time])


timestamp = datetime.strftime(last_instance_update_time, "%Y_%m_%d_%H_%M_%S")
is_annotation_file = rq_id.startswith('export:annotations')

@@ -2975,6 +2979,8 @@
# in case the server is configured with ONE_RUNNING_JOB_IN_QUEUE_PER_USER
# we have to enqueue dependent jobs after canceling one
rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER)
with suppress(InvalidJobOperation):
send_stop_job_command(rq_job.connection, rq_job.id)
coderabbitai bot (Contributor), May 22, 2024:
Ensure proper handling of send_stop_job_command function.

The function send_stop_job_command is used here, but there is no import statement for it in the provided code. Ensure that this function is defined or imported correctly to avoid runtime errors.

bsekachev (Member, Author):

Really? Check line number 20

coderabbitai bot (Contributor):

@bsekachev, you are correct. The send_stop_job_command function is indeed imported at line 20 in the file cvat/apps/engine/views.py. My initial comment was incorrect. Thank you for pointing that out.

The following script was executed for the analysis:

#!/bin/bash
# Search for the definition or import of 'send_stop_job_command' in the file 'cvat/apps/engine/views.py'
rg "send_stop_job_command" cvat/apps/engine/views.py

Length of output: 165


New Learnings Added
User: bsekachev
PR: cvat-ai/cvat#7907
File: cvat/apps/engine/views.py:2982-2983
Timestamp: 2024-05-22T13:55:59.926Z
Learning: The `send_stop_job_command` function is imported from `rq.command` at line 20 in the file `cvat/apps/engine/views.py`.


rq_job.delete()
else:
if rq_job.is_finished:
21 changes: 19 additions & 2 deletions cvat/rq_patching.py
@@ -1,16 +1,18 @@
# Copyright (C) 2023 CVAT.ai Corporation
# Copyright (C) 2023-2024 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

import traceback
import sys
from datetime import datetime
from typing import Optional

import rq.registry
from rq.exceptions import AbandonedJobError, NoSuchJobError
from rq.job import JobStatus
from rq.utils import current_timestamp
from rq.utils import current_timestamp, utcformat
from rq.version import VERSION
from rq.worker import StopRequested


# NOTE: we should patch implementation of original method because
@@ -69,7 +71,22 @@ def custom_started_job_registry_cleanup(self, timestamp: Optional[float] = None)

return job_ids

def custom_hearbeat(self, timestamp: datetime, ttl: int, pipeline: Optional['Pipeline'] = None, xx: bool = False):
self.last_heartbeat = timestamp
connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat))

# the only difference from the default implementation
# is that we avoid adding the job to the started job registry;
# it is not clear why upstream adds a job to started_job_registry when the job has failed.
# Also, as far as I can see, they commented out this line in the unreleased (2.0) version
exc_info = sys.exc_info()
is_stopped_export_job = isinstance(exc_info[1], (StopRequested, SystemExit))
if not is_stopped_export_job:
self.started_job_registry.add(self, ttl, pipeline=pipeline, xx=xx)

def update_started_job_registry_cleanup() -> None:
# don't forget to check if the issue https://github.com/rq/rq/issues/2006 has been resolved in upstream
assert VERSION == '1.16.0'
rq.registry.StartedJobRegistry.cleanup = custom_started_job_registry_cleanup
rq.job.Job.heartbeat = custom_hearbeat
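The patching approach used in this file — pin the upstream version with an assert, then swap a method by attribute assignment — can be shown on a stand-in class. `Library` and its members below are invented for illustration; only the pattern (version assert plus attribute assignment) mirrors `update_started_job_registry_cleanup`:

```python
class Library:
    """Stand-in for an external package such as rq."""
    VERSION = "1.16.0"

    class Job:
        def heartbeat(self):
            return "original behaviour"

def patched_heartbeat(self):
    # Replacement behaviour; in the PR, the patched Job.heartbeat avoids
    # re-adding an intentionally stopped job to the started job registry.
    return "patched behaviour"

def apply_patch():
    # Pin the patch to the exact upstream version it was written against,
    # so that a library upgrade fails loudly at startup instead of silently
    # running a patch that no longer matches upstream internals.
    assert Library.VERSION == "1.16.0"
    Library.Job.heartbeat = patched_heartbeat
```

The version assert is the load-bearing detail: the patch copies upstream code with one behavioural change, so it is only valid against the exact release it was derived from.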
47 changes: 43 additions & 4 deletions cvat/rqworker.py
@@ -1,16 +1,34 @@
# Copyright (C) 2018-2022 Intel Corporation
# Copyright (C) 2022-2023 CVAT.ai Corporation
# Copyright (C) 2022-2024 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

import os

import signal
from rq import Worker
from rq.worker import StopRequested

import cvat.utils.remote_debugger as debug

class CVATWorker(Worker):
# may be called from the work horse's perform_job::except block
# or from the parent's Worker::monitor_work_horse_process
# when the parent process sees that the work horse is dead

# This modification ensures that jobs stopped intentionally
# do not get their status updated or placed in the failed registry,
# since the main server code must delete them anyway
def handle_job_failure(self, job, queue, **kwargs):
# pylint: disable=access-member-before-definition
if self._stopped_job_id == job.id:
self._stopped_job_id = None
self.set_current_job_id(None)
else:
super().handle_job_failure(job, queue, **kwargs)
A contributor commented:
I see one problem here: a started rq job can also be stopped from the Django admin panel, and with these changes such a job won't be handled correctly.



DefaultWorker = Worker

DefaultWorker = CVATWorker
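The `handle_job_failure` override can be isolated into a small stand-in to show its two paths. `StubWorker` below is hypothetical (the real method receives `job`, `queue`, and keyword arguments and delegates to `rq.Worker`); only the stopped-job bookkeeping mirrors `CVATWorker`:

```python
class StubWorker:
    """Minimal stand-in for CVATWorker's failure handling."""

    def __init__(self):
        self._stopped_job_id = None
        self.current_job_id = None
        self.failed = []  # stands in for rq's failed job registry

    def set_current_job_id(self, job_id):
        self.current_job_id = job_id

    def handle_job_failure(self, job_id):
        if self._stopped_job_id == job_id:
            # The job was stopped intentionally: clear the bookkeeping and
            # skip default failure handling, since the server is expected
            # to delete the job itself.
            self._stopped_job_id = None
            self.set_current_job_id(None)
        else:
            # Default path: the failure is recorded as usual.
            self.failed.append(job_id)
```

The branch on `_stopped_job_id` is exactly what the reviewer's concern above targets: a job stopped through some other channel (e.g. the admin panel) would take the first branch too, even though it was not stopped by the export-cancellation code.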


class BaseDeathPenalty:
@@ -24,7 +42,7 @@ def __exit__(self, exc_type, exc_value, traceback):
pass


class SimpleWorker(Worker):
class SimpleWorker(CVATWorker):
"""
Allows to work with at most 1 worker thread. Useful for debugging.
"""
@@ -46,6 +64,27 @@ def execute_job(self, *args, **kwargs):

return self.perform_job(*args, **kwargs)

def kill_horse(self, sig: signal.Signals = signal.SIGTERM):
# In debug mode we send SIGTERM instead of the default SIGKILL,
# because SIGKILL is not (and cannot be) handled by RQ code and it kills the debug process launched from VSCode.
# All three signals (SIGKILL, SIGTERM, SIGINT) are regularly used in RQ code
super().kill_horse(sig)

def handle_exception(self, *args, **kwargs):
# In a production environment SIGKILL is sent to the work horse process and this method is never called,
# but for development we overrode the signal handling and SIGTERM is sent to the process instead.
# This modification ensures that exceptions are handled differently
# when the job is stopped intentionally, avoiding incorrect exception handling.

# PROBLEM: the default "handle_exception" code saves meta with the exception.
# This leads to bugs:
# - we create another job with the same ID in the server process
# - when the exception message is saved in worker code, an outdated datetime value is also saved as part of the meta information
# - this outdated value is then used in server code
is_stopped_export_job = isinstance(args[2], (StopRequested, SystemExit))
if not is_stopped_export_job:
super().handle_exception(*args, **kwargs)
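The guard at the end of `handle_exception` reduces to a predicate over the exception type. `StopRequested` below is a local stand-in for `rq.worker.StopRequested`, and `should_record_exception` is an invented helper name used only for this sketch:

```python
class StopRequested(Exception):
    """Stand-in for rq.worker.StopRequested."""

def should_record_exception(exc: BaseException) -> bool:
    # Intentional stops (StopRequested, SystemExit) must not go through the
    # default exception handling; otherwise stale meta and datetime values
    # would leak back into the server-side job state.
    return not isinstance(exc, (StopRequested, SystemExit))
```

Note that `SystemExit` derives from `BaseException`, not `Exception`, which is why the predicate is typed over `BaseException`.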


if debug.is_debugging_enabled():
class RemoteDebugWorker(SimpleWorker):