Skip to content

Commit

Permalink
Fix IndepQueueManager test (#5832)
Browse files Browse the repository at this point in the history
Fix test
  • Loading branch information
MetRonnie authored Nov 21, 2023
1 parent 88b2798 commit e692a06
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 28 deletions.
30 changes: 16 additions & 14 deletions cylc/flow/task_queues/independent.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

from collections import deque
from contextlib import suppress
from typing import List, Set, Dict, Counter, Any
from typing import TYPE_CHECKING, List, Set, Dict, Counter, Any

from cylc.flow.task_proxy import TaskProxy
from cylc.flow.task_queues import TaskQueueManagerBase

if TYPE_CHECKING:
from cylc.flow.task_proxy import TaskProxy


class LimitedTaskQueue:
"""One task queue with group members and active limit."""
Expand All @@ -33,16 +35,16 @@ def __init__(self, limit: int, members: Set[str]) -> None:
self.members = members # member task names
self.deque: deque = deque()

def push_task(self, itask: TaskProxy) -> None:
def push_task(self, itask: 'TaskProxy') -> None:
"""Queue task if in my membership list."""
if itask.tdef.name in self.members:
self.deque.appendleft(itask)

def release(self, active: Counter[str]) -> List[TaskProxy]:
def release(self, active: Counter[str]) -> List['TaskProxy']:
"""Release tasks if below the active limit."""
# The "active" argument counts active tasks by name.
released: List[TaskProxy] = []
held: List[TaskProxy] = []
released: List['TaskProxy'] = []
held: List['TaskProxy'] = []
n_active: int = 0
for mem in self.members:
n_active += active[mem]
Expand All @@ -62,7 +64,7 @@ def release(self, active: Counter[str]) -> List[TaskProxy]:
self.deque.appendleft(itask)
return released

def remove(self, itask: TaskProxy) -> bool:
def remove(self, itask: 'TaskProxy') -> bool:
"""Remove a single task from queue, return True if removed."""
try:
self.deque.remove(itask)
Expand Down Expand Up @@ -111,30 +113,30 @@ def __init__(self,
config["limit"], config["members"]
)

self.force_released: Set[TaskProxy] = set()
self.force_released: Set['TaskProxy'] = set()

def push_task(self, itask: TaskProxy) -> None:
def push_task(self, itask: 'TaskProxy') -> None:
"""Push a task to the appropriate queue."""
for queue in self.queues.values():
queue.push_task(itask)

def release_tasks(self, active: Counter[str]) -> List[TaskProxy]:
def release_tasks(self, active: Counter[str]) -> List['TaskProxy']:
"""Release tasks up to the queue limits."""
released: List[TaskProxy] = []
released: List['TaskProxy'] = []
for queue in self.queues.values():
released += queue.release(active)
if self.force_released:
released += list(self.force_released)
released.extend(self.force_released)
self.force_released = set()
return released

def remove_task(self, itask: TaskProxy) -> None:
def remove_task(self, itask: 'TaskProxy') -> None:
"""Remove a task from whichever queue it belongs to."""
for queue in self.queues.values():
if queue.remove(itask):
break

def force_release_task(self, itask: TaskProxy) -> None:
def force_release_task(self, itask: 'TaskProxy') -> None:
"""Remove a task from whichever queue it belongs to.
To be returned when release_tasks() is next called.
Expand Down
22 changes: 8 additions & 14 deletions tests/unit/test_indep_task_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

import pytest

from cylc.flow.task_proxy import TaskProxy
from cylc.flow.task_queues.independent import IndepQueueManager
from cylc.flow.task_state import TASK_STATUS_PREPARING


MEMBERS = {"a", "b", "c", "d", "e", "f"}
Expand Down Expand Up @@ -61,9 +61,7 @@


@pytest.mark.parametrize(
"active,"
"expected_released,"
"expected_foo_groups",
"active, expected_released, expected_foo_groups",
[
(
Counter(["b1", "b2", "s1", "o1"]),
Expand All @@ -73,28 +71,24 @@
]
)
def test_queue_and_release(
active,
expected_released,
expected_foo_groups):
active,
expected_released,
expected_foo_groups
):
"""Test task queue and release."""
# configure the queue
queue_mgr = IndepQueueManager(QCONFIG, ALL_TASK_NAMES, DESCENDANTS)

# add newly ready tasks to the queue
for name in READY_TASK_NAMES:
itask = Mock()
itask = Mock(spec=TaskProxy)
itask.tdef.name = name
itask.state.is_held = False
queue_mgr.push_task(itask)

# release tasks, given current active task counter
released = queue_mgr.release_tasks(active)
assert sorted([r.tdef.name for r in released]) == sorted(expected_released)

# check released tasks change state to "preparing", and not is_queued
for r in released:
assert r.state.reset.called_with(TASK_STATUS_PREPARING)
assert r.state.reset.called_with(is_queued=False)
assert sorted(r.tdef.name for r in released) == sorted(expected_released)

# check that adopted orphans end up in the default queue
orphans = ["orphan1", "orphan2"]
Expand Down

0 comments on commit e692a06

Please sign in to comment.