Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed: Queued jobs are not considered in deferring logic #7907

Merged
merged 35 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
bc920a8
Kill workhorse process when cancelling export job
bsekachev May 17, 2024
683420a
Minor improvements
bsekachev May 17, 2024
7d095f4
Minor improvements
bsekachev May 17, 2024
e7fc367
Fixed debug worker
bsekachev May 17, 2024
e272452
Removed extra code
bsekachev May 17, 2024
79d2c87
Production improvements
bsekachev May 19, 2024
566b908
Moved method
bsekachev May 19, 2024
0ac46f8
Fixed comment
bsekachev May 19, 2024
7ac72dd
Disabled pylint false alarm
bsekachev May 19, 2024
d1cb317
Minor fix
bsekachev May 19, 2024
29bbb44
Updated comments
bsekachev May 19, 2024
74393e6
Implemented for backup
bsekachev May 19, 2024
43bbc97
Merge branch 'develop' into bs/kill_worker
bsekachev May 20, 2024
8ed8c49
Patched rq
bsekachev May 21, 2024
3bbc2c9
Fixed pylint
bsekachev May 21, 2024
98a00b4
Updated license headers
bsekachev May 21, 2024
0536e0d
Consider queued jobs when looking for a dependends job
bsekachev May 21, 2024
fda8d89
added comment
bsekachev May 21, 2024
ed0cc01
Merge branch 'develop' into bs/kill_worker
bsekachev May 21, 2024
9a35fa9
Fixed define_dependent_job
bsekachev May 21, 2024
c082802
Improved comments
bsekachev May 21, 2024
9e7f6ae
Merge branch 'develop' into bs/kill_worker
bsekachev May 21, 2024
57616e9
Using cupress
bsekachev May 22, 2024
98fa00b
Returned status check
bsekachev May 22, 2024
c2de3e3
Fixed circular dependency issues
bsekachev May 23, 2024
d5a57cf
Update cvat/apps/engine/utils.py
bsekachev May 23, 2024
fbff394
Restored comment
bsekachev May 23, 2024
bf054ce
Update cvat/apps/engine/utils.py
bsekachev May 23, 2024
89a8b6c
Merge branch 'develop' into bs/kill_worker
bsekachev May 23, 2024
b332d9c
Merge branch 'develop' into bs/kill_worker
bsekachev May 23, 2024
6af94a7
Merge branch 'develop' into bs/kill_worker
bsekachev May 23, 2024
c7955ca
Renamed identificators
bsekachev May 27, 2024
8a86065
Updated
bsekachev Jun 5, 2024
8f43503
Added changelog
bsekachev Jun 5, 2024
959d340
Added none check
bsekachev Jun 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog.d/20240605_093600_boris_kill_worker.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
### Fixed

- Queued jobs are not considered in deferring logic
(<https://github.com/cvat-ai/cvat/pull/7907>)
31 changes: 12 additions & 19 deletions cvat/apps/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,25 +165,18 @@ def define_dependent_job(
if not should_be_dependent:
return None

started_user_jobs = [
job
for job in queue.job_class.fetch_many(
queue.started_job_registry.get_job_ids(), queue.connection
)
if job and job.meta.get("user", {}).get("id") == user_id
]
deferred_user_jobs = [
job
for job in queue.job_class.fetch_many(
queue.deferred_job_registry.get_job_ids(), queue.connection
)
# Since there is no cleanup implementation in DeferredJobRegistry,
# this registry can contain "outdated" jobs that weren't deleted from it
# but were added to another registry. Probably such situations can occur
# if there are active or deferred jobs when restarting the worker container.
if job and job.meta.get("user", {}).get("id") == user_id and job.is_deferred
]
all_user_jobs = started_user_jobs + deferred_user_jobs
queues = [queue.deferred_job_registry, queue, queue.started_job_registry]
# Since there is no cleanup implementation in DeferredJobRegistry,
# this registry can contain "outdated" jobs that weren't deleted from it
# but were added to another registry. Probably such situations can occur
# if there are active or deferred jobs when restarting the worker container.
filters = [lambda job: job.is_deferred, lambda _: True, lambda _: True]
all_user_jobs = []
for q, f in zip(queues, filters):
job_ids = q.get_job_ids()
jobs = q.job_class.fetch_many(job_ids, q.connection)
jobs = filter(lambda job: job and job.meta.get("user", {}).get("id") == user_id and f(job), jobs)
all_user_jobs.extend(jobs)

# prevent possible cyclic dependencies
if rq_id:
Expand Down
Loading