diff --git a/cylc/flow/task_queues/independent.py b/cylc/flow/task_queues/independent.py index d423a5f1702..185edee4f2b 100644 --- a/cylc/flow/task_queues/independent.py +++ b/cylc/flow/task_queues/independent.py @@ -18,11 +18,13 @@ from collections import deque from contextlib import suppress -from typing import List, Set, Dict, Counter, Any +from typing import TYPE_CHECKING, List, Set, Dict, Counter, Any -from cylc.flow.task_proxy import TaskProxy from cylc.flow.task_queues import TaskQueueManagerBase +if TYPE_CHECKING: + from cylc.flow.task_proxy import TaskProxy + class LimitedTaskQueue: """One task queue with group members and active limit.""" @@ -33,16 +35,16 @@ def __init__(self, limit: int, members: Set[str]) -> None: self.members = members # member task names self.deque: deque = deque() - def push_task(self, itask: TaskProxy) -> None: + def push_task(self, itask: 'TaskProxy') -> None: """Queue task if in my membership list.""" if itask.tdef.name in self.members: self.deque.appendleft(itask) - def release(self, active: Counter[str]) -> List[TaskProxy]: + def release(self, active: Counter[str]) -> List['TaskProxy']: """Release tasks if below the active limit.""" # The "active" argument counts active tasks by name. - released: List[TaskProxy] = [] - held: List[TaskProxy] = [] + released: List['TaskProxy'] = [] + held: List['TaskProxy'] = [] n_active: int = 0 for mem in self.members: n_active += active[mem] @@ -62,7 +64,7 @@ def release(self, active: Counter[str]) -> List[TaskProxy]: self.deque.appendleft(itask) return released - def remove(self, itask: TaskProxy) -> bool: + def remove(self, itask: 'TaskProxy') -> bool: """Remove a single task from queue, return True if removed.""" try: self.deque.remove(itask) @@ -111,30 +113,30 @@ def __init__(self, config["limit"], config["members"] ) - self.force_released: Set[TaskProxy] = set() + self.force_released: Set['TaskProxy'] = set() - def push_task(self, itask: TaskProxy) -> None: + def push_task(self, itask: 'TaskProxy') -> None: """Push a task to the appropriate queue.""" for queue in self.queues.values(): queue.push_task(itask) - def release_tasks(self, active: Counter[str]) -> List[TaskProxy]: + def release_tasks(self, active: Counter[str]) -> List['TaskProxy']: """Release tasks up to the queue limits.""" - released: List[TaskProxy] = [] + released: List['TaskProxy'] = [] for queue in self.queues.values(): released += queue.release(active) if self.force_released: - released += list(self.force_released) + released.extend(self.force_released) self.force_released = set() return released - def remove_task(self, itask: TaskProxy) -> None: + def remove_task(self, itask: 'TaskProxy') -> None: """Remove a task from whichever queue it belongs to.""" for queue in self.queues.values(): if queue.remove(itask): break - def force_release_task(self, itask: TaskProxy) -> None: + def force_release_task(self, itask: 'TaskProxy') -> None: """Remove a task from whichever queue it belongs to. To be returned when release_tasks() is next called. diff --git a/tests/unit/test_indep_task_queues.py b/tests/unit/test_indep_task_queues.py index a0a1894cece..d144f58eecf 100644 --- a/tests/unit/test_indep_task_queues.py +++ b/tests/unit/test_indep_task_queues.py @@ -21,8 +21,8 @@ import pytest +from cylc.flow.task_proxy import TaskProxy from cylc.flow.task_queues.independent import IndepQueueManager -from cylc.flow.task_state import TASK_STATUS_PREPARING MEMBERS = {"a", "b", "c", "d", "e", "f"} @@ -61,9 +61,7 @@ @pytest.mark.parametrize( - "active," - "expected_released," - "expected_foo_groups", + "active, expected_released, expected_foo_groups", [ ( Counter(["b1", "b2", "s1", "o1"]), @@ -73,28 +71,24 @@ ] ) def test_queue_and_release( - active, - expected_released, - expected_foo_groups): + active, + expected_released, + expected_foo_groups +): """Test task queue and release.""" # configure the queue queue_mgr = IndepQueueManager(QCONFIG, ALL_TASK_NAMES, DESCENDANTS) # add newly ready tasks to the queue for name in READY_TASK_NAMES: - itask = Mock() + itask = Mock(spec=TaskProxy) itask.tdef.name = name itask.state.is_held = False queue_mgr.push_task(itask) # release tasks, given current active task counter released = queue_mgr.release_tasks(active) - assert sorted([r.tdef.name for r in released]) == sorted(expected_released) - - # check released tasks change state to "preparing", and not is_queued - for r in released: - assert r.state.reset.called_with(TASK_STATUS_PREPARING) - assert r.state.reset.called_with(is_queued=False) + assert sorted(r.tdef.name for r in released) == sorted(expected_released) # check that adopted orphans end up in the default queue orphans = ["orphan1", "orphan2"]