Skip to content

Commit

Permalink
Fix duplicate job submissions. (#6337)
Browse files Browse the repository at this point in the history
Fix duplicate job submissions.
  • Loading branch information
hjoliver authored Aug 29, 2024
1 parent 6926aac commit 1c5db24
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 10 deletions.
1 change: 1 addition & 0 deletions changes.d/6337.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix potential duplicate job submissions when manually triggering unqueued active tasks.
4 changes: 2 additions & 2 deletions cylc/flow/task_queues/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ def release_tasks(self, active: Counter[str]) -> 'List[TaskProxy]':
pass

@abstractmethod
def remove_task(self, itask: 'TaskProxy') -> None:
"""Remove a task from the queueing system."""
def remove_task(self, itask: 'TaskProxy') -> bool:
"""Try to remove a task from the queues. Return True if done."""
pass

@abstractmethod
Expand Down
12 changes: 5 additions & 7 deletions cylc/flow/task_queues/independent.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,17 @@ def release_tasks(self, active: Counter[str]) -> List['TaskProxy']:
self.force_released = set()
return released

def remove_task(self, itask: 'TaskProxy') -> None:
"""Remove a task from whichever queue it belongs to."""
for queue in self.queues.values():
if queue.remove(itask):
break
def remove_task(self, itask: 'TaskProxy') -> bool:
"""Try to remove a task from the queues. Return True if done."""
return any(queue.remove(itask) for queue in self.queues.values())

def force_release_task(self, itask: 'TaskProxy') -> None:
"""Remove a task from whichever queue it belongs to.
To be returned when release_tasks() is next called.
"""
self.remove_task(itask)
self.force_released.add(itask)
if self.remove_task(itask):
self.force_released.add(itask)

def adopt_tasks(self, orphans: List[str]) -> None:
"""Adopt orphaned tasks to the default group."""
Expand Down
43 changes: 42 additions & 1 deletion tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2085,7 +2085,7 @@ async def test_set_future_flow(flow, scheduler, start, log_filter):
# set b:succeeded in flow 2 and check downstream spawning
schd.pool.set_prereqs_and_outputs(['1/b'], prereqs=[], outputs=[], flow=[2])
assert schd.pool.get_task(IntegerPoint("1"), "c1") is None, '1/c1 (flow 2) should not be spawned after 1/b:succeeded'
assert schd.pool.get_task(IntegerPoint("1"), "c2") is not None, '1/c2 (flow 2) should be spawned after 1/b:succeeded'
assert schd.pool.get_task(IntegerPoint("1"), "c2") is not None, '1/c2 (flow 2) should be spawned after 1/b:succeeded'


async def test_trigger_queue(one, run, db_select, complete):
Expand All @@ -2109,3 +2109,44 @@ async def test_trigger_queue(one, run, db_select, complete):
one.resume_workflow()
await complete(one, timeout=2)
assert db_select(one, False, 'task_outputs', 'flow_nums') == [('[1, 2]',), ('[1]',)]


async def test_trigger_unqueued(flow, scheduler, start):
"""Test triggering an unqueued active task.
It should not add to the force_released list.
See https://github.com/cylc/cylc-flow/pull/6337
"""
conf = {
'scheduler': {'allow implicit tasks': 'True'},
'scheduling': {
'graph': {
'R1': 'a & b => c'
}
}
}
schd = scheduler(
flow(conf),
run_mode='simulation',
paused_start=False
)

async with start(schd):
# Release tasks 1/a and 1/b
schd.pool.release_runahead_tasks()
schd.release_queued_tasks()
assert pool_get_task_ids(schd.pool) == ['1/a', '1/b']

# Mark 1/a as succeeded and spawn 1/c
task_a = schd.pool.get_task(IntegerPoint("1"), "a")
schd.pool.task_events_mgr.process_message(task_a, 1, 'succeeded')
assert pool_get_task_ids(schd.pool) == ['1/b', '1/c']

# Trigger the partially satisified (and not queued) task 1/c
schd.pool.force_trigger_tasks(['1/c'], [FLOW_ALL])

# It should not add to the queue managers force_released list.
assert not schd.pool.task_queue_mgr.force_released, (
"Triggering an unqueued task should not affect the force_released list"
)

0 comments on commit 1c5db24

Please sign in to comment.