reload: simplify execution pathway
* The reload code was spread across four places:
  * Scheduler.command_reload_workflow
  * Scheduler.main_loop
  * Pool.set_do_reload
  * Pool.reload_taskdefs
* This commit co-locates the Scheduler/Pool parts and turns them into a
  single synchronous operation (no main-loop moving parts) to simplify
  the pathway.
* This removes the need for the `do_reload` pool flag (see the sketch
  below).
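
In outline, the change replaces a flag that was set in one place and polled in another with a direct call. The toy sketch below illustrates the two pathways; all names here are simplified stand-ins, not the real cylc-flow API.

# Toy sketch of the control-flow change; hypothetical names only.

class Pool:
    def __init__(self):
        self.do_reload = False  # the flag this commit removes
        self.config = None

    def set_do_reload(self, config):
        # OLD: stash the new config and ask the main loop to act later
        self.config = config
        self.do_reload = True

    def reload_taskdefs(self, config):
        # NEW: the config is passed in and applied immediately
        self.config = config


def old_command_reload(pool, config):
    pool.set_do_reload(config)  # reload deferred to the main loop


def old_main_loop_iteration(pool):
    if pool.do_reload:  # polled once per main-loop cycle
        pool.reload_taskdefs(pool.config)
        pool.do_reload = False


def new_command_reload(pool, config):
    pool.reload_taskdefs(config)  # one synchronous operation, no flag


new_command_reload(Pool(), {"graph": "a => b"})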
oliver-sanders committed Jun 19, 2023
1 parent b368e7c commit ebe5dd7
Showing 3 changed files with 49 additions and 48 deletions.
46 changes: 28 additions & 18 deletions cylc/flow/scheduler.py
@@ -1071,7 +1071,7 @@ def command_reload_workflow(self) -> None:
         # reload the workflow definition
         LOG.info("Reloading the workflow definition.")
         try:
-            cfg = self.load_flow_file(is_reload=True)
+            config = self.load_flow_file(is_reload=True)
         except (ParsecError, CylcConfigError) as exc:
             if cylc.flow.flags.verbosity > 1:
                 # log full traceback in debug mode
@@ -1092,10 +1092,13 @@ def command_reload_workflow(self) -> None:
         self.workflow_db_mgr.pri_dao.select_workflow_params(
             self._load_workflow_params
         )
-        self.apply_new_config(cfg, is_reload=True)
+        self.apply_new_config(config, is_reload=True)
         self.broadcast_mgr.linearized_ancestors = (
             self.config.get_linearized_ancestors())
-        self.pool.set_do_reload(self.config)
+
+        # OLD
+        # self.pool.set_do_reload(self.config)
+
         self.task_events_mgr.mail_interval = self.cylc_config['mail'][
             'task event batch interval']
         self.task_events_mgr.mail_smtp = self._get_events_conf("smtp")
@@ -1113,6 +1116,28 @@ def command_reload_workflow(self) -> None:
         self.is_updated = True
         self.is_reloaded = True
 
+        # NEW
+
+        # Re-initialise data model on reload
+        self.data_store_mgr.initiate_data_model(reloaded=True)
+
+        # Reset the remote init map to trigger fresh file installation
+        self.task_job_mgr.task_remote_mgr.remote_init_map.clear()
+        self.task_job_mgr.task_remote_mgr.is_reload = True
+        self.pool.reload_taskdefs(config)
+        # Load jobs from DB
+        self.workflow_db_mgr.pri_dao.select_jobs_for_restart(
+            self.data_store_mgr.insert_db_job
+        )
+        if self.pool.compute_runahead(force=True):
+            self.pool.release_runahead_tasks()
+        self.is_reloaded = True
+        self.is_updated = True
+
+
+
+        LOG.info("Reload completed.")
+
         # resume the workflow if previously paused
         self.reload_pending = False
         if not was_paused:
@@ -1618,21 +1643,6 @@ async def main_loop(self) -> None:
             # self.pool.log_task_pool(logging.CRITICAL)
             if self.incomplete_ri_map:
                 self.manage_remote_init()
-            if self.pool.do_reload:
-                # Re-initialise data model on reload
-                self.data_store_mgr.initiate_data_model(reloaded=True)
-                # Reset the remote init map to trigger fresh file installation
-                self.task_job_mgr.task_remote_mgr.remote_init_map.clear()
-                self.task_job_mgr.task_remote_mgr.is_reload = True
-                self.pool.reload_taskdefs()
-                # Load jobs from DB
-                self.workflow_db_mgr.pri_dao.select_jobs_for_restart(
-                    self.data_store_mgr.insert_db_job)
-                LOG.info("Reload completed.")
-                if self.pool.compute_runahead(force=True):
-                    self.pool.release_runahead_tasks()
-                self.is_reloaded = True
-                self.is_updated = True
 
             self.process_command_queue()
             self.proc_pool.process()
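
The deleted main_loop branch above was the only consumer of do_reload. One way to see what the synchronous form buys: under the flag-based pathway, anything that ran between command_reload_workflow() and the next main-loop iteration saw a scheduler whose config had been swapped but whose task pool had not yet been reloaded. A toy illustration of that window (not cylc-flow code):

# Toy illustration of the inconsistent window the flag-based pathway
# left open between main-loop iterations; not cylc-flow code.

class ToyScheduler:
    def __init__(self):
        self.config_version = 1
        self.pool_version = 1
        self.do_reload = False

    def command_reload_flagged(self):
        # OLD: the config is replaced now, the pool catches up later
        self.config_version += 1
        self.do_reload = True

    def main_loop_iteration(self):
        if self.do_reload:
            self.pool_version = self.config_version
            self.do_reload = False

    def command_reload_sync(self):
        # NEW: config and pool are updated in one step
        self.config_version += 1
        self.pool_version = self.config_version


schd = ToyScheduler()
schd.command_reload_flagged()
assert schd.config_version != schd.pool_version  # stale until the loop runs
schd.main_loop_iteration()
assert schd.config_version == schd.pool_version

schd.command_reload_sync()
assert schd.config_version == schd.pool_version  # never inconsistent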
49 changes: 21 additions & 28 deletions cylc/flow/task_pool.py
@@ -111,7 +111,6 @@ def __init__(
         self.data_store_mgr: 'DataStoreMgr' = data_store_mgr
         self.flow_mgr: 'FlowMgr' = flow_mgr
 
-        self.do_reload = False
         self.max_future_offset: Optional['IntervalBase'] = None
         self._prev_runahead_base_point: Optional['PointBase'] = None
         self._prev_runahead_sequence_points: Optional[Set['PointBase']] = None
@@ -132,7 +131,6 @@ def __init__(
         self.abort_task_failed = False
         self.expected_failed_tasks = self.config.get_expected_failed_tasks()
 
-        self.orphans: List[str] = []
         self.task_name_list = self.config.get_task_name_list()
         self.task_queue_mgr = IndepQueueManager(
             self.config.cfg['scheduling']['queues'],
@@ -913,25 +911,7 @@ def set_max_future_offset(self):
         if max_offset != orig and self.compute_runahead(force=True):
             self.release_runahead_tasks()
 
-    def set_do_reload(self, config: 'WorkflowConfig') -> None:
-        """Set the task pool to reload mode."""
-        self.config = config
-        self.stop_point = config.stop_point or config.final_point
-        self.do_reload = True
-
-        # find any old tasks that have been removed from the workflow
-        old_task_name_list = self.task_name_list
-        self.task_name_list = self.config.get_task_name_list()
-        for name in old_task_name_list:
-            if name not in self.task_name_list:
-                self.orphans.append(name)
-        for name in self.task_name_list:
-            if name in self.orphans:
-                self.orphans.remove(name)
-        # adjust the new workflow config to handle the orphans
-        self.config.adopt_orphans(self.orphans)
-
-    def reload_taskdefs(self) -> None:
+    def reload_taskdefs(self, config: 'WorkflowConfig') -> None:
         """Reload the definitions of task proxies in the pool.
 
         Orphaned tasks (whose definitions were removed from the workflow):
@@ -941,18 +921,33 @@ def reload_taskdefs(self) -> None:
 
         Otherwise: replace task definitions but copy over existing outputs etc.
         """
+        self.config = config
+        self.stop_point = config.stop_point or config.final_point
+
+        # find any old tasks that have been removed from the workflow
+        old_task_name_list = self.task_name_list
+        self.task_name_list = self.config.get_task_name_list()
+        orphans = [
+            task
+            for task in old_task_name_list
+            if task not in self.task_name_list
+        ]
+
+        # adjust the new workflow config to handle the orphans
+        self.config.adopt_orphans(orphans)
+
         LOG.info("Reloading task definitions.")
         tasks = self.get_all_tasks()
         # Log tasks orphaned by a reload but not currently in the task pool.
-        for name in self.orphans:
+        for name in orphans:
             if name not in (itask.tdef.name for itask in tasks):
                 LOG.warning("Removed task: '%s'", name)
         for itask in tasks:
-            if itask.tdef.name in self.orphans:
+            if itask.tdef.name in orphans:
                 if (
-                        itask.state(TASK_STATUS_WAITING)
-                        or itask.state.is_held
-                        or itask.state.is_queued
+                    itask.state(TASK_STATUS_WAITING)
+                    or itask.state.is_held
+                    or itask.state.is_queued
                 ):
                     # Remove orphaned task if it hasn't started running yet.
                     self.remove(itask, 'task definition removed')
@@ -1007,8 +1002,6 @@ def reload_taskdefs(self) -> None:
             if all(ready_check_items) and not itask.state.is_runahead:
                 self.queue_task(itask)
 
-        self.do_reload = False
-
     def set_stop_point(self, stop_point: 'PointBase') -> bool:
         """Set the workflow stop cycle point.
 
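
Note the orphan bookkeeping also changed shape: the old set_do_reload appended to a persistent self.orphans list (pruning names that were re-added), while the new reload_taskdefs recomputes a local list on every call. For a single reload the two agree; a toy check with made-up task names:

# Toy check that the two orphan-detection styles agree for one reload;
# the task names here are invented for illustration.

old_names = ['a', 'b', 'c']  # tasks before the reload
new_names = ['a', 'c', 'd']  # tasks after the reload

# OLD style: mutate a persistent list (self.orphans in the old code)
orphans_persistent = []
for name in old_names:
    if name not in new_names:
        orphans_persistent.append(name)
for name in new_names:
    if name in orphans_persistent:
        orphans_persistent.remove(name)

# NEW style: a local list comprehension, recomputed per reload
orphans = [name for name in old_names if name not in new_names]

assert orphans == orphans_persistent == ['b']

Because the list is now local, stale entries can no longer leak from one reload into the next.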
2 changes: 0 additions & 2 deletions tests/integration/test_task_pool.py
@@ -828,7 +828,6 @@ async def test_reload_prereqs(
 
     # Reload the workflow config
     schd.command_reload_workflow()
-    schd.pool.reload_taskdefs()
     assert list_tasks(schd) == expected_3
 
     # Check resulting dependencies of task z
@@ -959,6 +958,5 @@ async def test_graph_change_prereq_satisfaction(
 
     # Reload the workflow config
    schd.command_reload_workflow()
-    schd.pool.reload_taskdefs()
 
     await test.asend(schd)
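
With the reload folded into the command itself, tests no longer need the second call. Roughly, assuming the flow/scheduler/start fixtures the cylc-flow integration suite uses elsewhere (a sketch, not one of the changed tests above):

# Sketch only: fixture names and the workflow config are assumptions.

async def test_reload_single_call(flow, scheduler, start):
    id_ = flow({'scheduling': {'graph': {'R1': 'a'}}})
    schd = scheduler(id_)
    async with start(schd):
        # reload_taskdefs now runs inside command_reload_workflow
        schd.command_reload_workflow()
        assert not schd.reload_pending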
