Skip to content

Commit

Permalink
Merge pull request #5604 from oliver-sanders/5603
Browse files Browse the repository at this point in the history
pool: fix infinite loop with --start-cycle-point
  • Loading branch information
wxtim authored Jun 29, 2023
2 parents b325d2d + 0aadc90 commit 8acb8e7
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 8 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ enhancements to `cylc lint`:

### Fixes

[#5604](https://github.com/cylc/cylc-flow/pull/5604) -
Fix a possible issue where workflows started using
`cylc play --start-cycle-point` could hang during startup.

[#5524](https://github.com/cylc/cylc-flow/pull/5524) - Logging includes timestamps
for `cylc play` when called by `cylc vip` or `cylc vr`.

Expand Down
5 changes: 4 additions & 1 deletion cylc/flow/cycling/iso8601.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,10 @@ def get_next_point_on_sequence(
return self.get_next_point_on_sequence(result)
return result

def get_first_point(self, point):
def get_first_point(
self,
point: ISO8601Point
) -> Optional[ISO8601Point]:
"""Return the first point >= to point, or None if out of bounds."""
with contextlib.suppress(KeyError):
return ISO8601Point(self._cached_first_point_values[point.value])
Expand Down
8 changes: 6 additions & 2 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,8 +699,8 @@ async def start(self):
self.server.thread.start()
barrier.wait()

await self.configure()
self._configure_contact()
await self.configure()
except (KeyboardInterrupt, asyncio.CancelledError, Exception) as exc:
await self.handle_exception(exc)

Expand Down Expand Up @@ -1807,7 +1807,11 @@ async def _shutdown(self, reason: BaseException) -> None:
sys.stdout.flush()
sys.stderr.flush()

if self.contact_data and self.task_job_mgr:
if (
self.workflow_db_mgr.pri_path
and Path(self.workflow_db_mgr.pri_path).exists()
):
# only attempt remote tidy if the workflow has been started
self.task_job_mgr.task_remote_mgr.remote_tidy()

try:
Expand Down
15 changes: 10 additions & 5 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,17 @@ def compute_runahead(self, force=False) -> bool:
changed (needed if max_future_offset changed, or on reload).
"""
points: List['PointBase'] = []
sequence_points: Set['PointBase']
if not self.main_pool:
# Start at first point in each sequence, after the initial point.
points = list({
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
})
points = [
point
for point in {
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
}
if point is not None
]
else:
# Find the earliest point with unfinished tasks.
for point, itasks in sorted(self.get_tasks_by_point().items()):
Expand Down Expand Up @@ -793,7 +798,7 @@ def get_hidden_tasks(self) -> List[TaskProxy]:
self._hidden_pool_list.extend(list(itask_id_maps.values()))
return self._hidden_pool_list

def get_tasks_by_point(self):
def get_tasks_by_point(self) -> 'Dict[PointBase, List[TaskProxy]]':
"""Return a map of task proxies by cycle point."""
point_itasks = {}
for point, itask_id_map in self.main_pool.items():
Expand Down
27 changes: 27 additions & 0 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -962,3 +962,30 @@ async def test_graph_change_prereq_satisfaction(
schd.pool.reload_taskdefs()

await test.asend(schd)


async def test_runahead_limit_for_sequence_before_start_cycle(
flow,
scheduler,
start,
):
"""It should obey the runahead limit.
Ensure the runahead limit is computed correctly for sequences before the start cycle
See https://github.com/cylc/cylc-flow/issues/5603
"""
id_ = flow({
'scheduler': {'allow implicit tasks': 'True'},
'scheduling': {
'initial cycle point': '2000',
'runahead limit': 'P2Y',
'graph': {
'R1/2000': 'a',
'P1Y': 'b[-P1Y] => b',
},
}
})
schd = scheduler(id_, startcp='2005')
async with start(schd):
assert str(schd.pool.runahead_limit_point) == '20070101T0000Z'

0 comments on commit 8acb8e7

Please sign in to comment.