From c292114d260de0142aad57629aada3d0ce7a2146 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Thu, 27 May 2021 16:28:54 +0100 Subject: [PATCH] task iteration improvements --- cylc/flow/broadcast_mgr.py | 10 +++---- cylc/flow/scheduler.py | 20 ++++++------- cylc/flow/task_pool.py | 60 +++++++++++++++++--------------------- cylc/flow/xtrigger_mgr.py | 10 +++---- 4 files changed, 45 insertions(+), 55 deletions(-) diff --git a/cylc/flow/broadcast_mgr.py b/cylc/flow/broadcast_mgr.py index 0817d629aee..657195b9a53 100644 --- a/cylc/flow/broadcast_mgr.py +++ b/cylc/flow/broadcast_mgr.py @@ -70,12 +70,10 @@ def check_ext_triggers(self, itasks, ext_trigger_queue): ext_trigger = ext_trigger_queue.get_nowait() self.ext_triggers.setdefault(ext_trigger, 0) self.ext_triggers[ext_trigger] += 1 - return set( - [ - itask for itask in itasks - if self._match_ext_trigger(itask) - ] - ) + return { + itask for itask in itasks + if self._match_ext_trigger(itask) + } def clear_broadcast( self, point_strings=None, namespaces=None, cancel_settings=None): diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 542c4db88f6..23ffd38ee6e 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -1753,16 +1753,16 @@ def check_auto_shutdown(self): # Don't if stalled, unless "abort on stalled" is set. return False - if [ - itask for itask in self.pool.get_tasks() - if itask.state( - TASK_STATUS_PREPARING, - TASK_STATUS_SUBMITTED, - TASK_STATUS_RUNNING - ) - or (itask.state(TASK_STATUS_WAITING) - and not itask.state.is_runahead) - ]: + if any( + itask for itask in self.pool.get_tasks() + if itask.state( + TASK_STATUS_PREPARING, + TASK_STATUS_SUBMITTED, + TASK_STATUS_RUNNING + ) + or (itask.state(TASK_STATUS_WAITING) + and not itask.state.is_runahead) + ): # Don't if there are more tasks to run (if waiting and not # runahead, then held, queued, or xtriggered). return False diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 002a4b1933e..09d9975d931 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -303,39 +303,33 @@ def release_runahead_tasks(self): # And any manually-triggered task. # TODO: should runahead should not consider unsatisfied hidden tasks? - release = [] - for itask_id_maps in ( - list(self.main_pool.values()) - + list(self.hidden_pool.values())): - for itask in itask_id_maps.values(): - if not itask.state.is_runahead: - continue - if ( - itask.state(TASK_STATUS_FAILED, - TASK_STATUS_SUCCEEDED, - TASK_STATUS_EXPIRED) - or itask.is_manual_submit - ): - release.append(itask) - for itask in release: + for itask in ( + itask + for pool in (self.main_pool, self.hidden_pool) + for cycle in pool.values() + for itask in cycle.values() + if itask.state.is_runahead + if itask.state( + TASK_STATUS_FAILED, + TASK_STATUS_SUCCEEDED, + TASK_STATUS_EXPIRED + ) + or itask.is_manual_submit + ): self.release_runahead_task(itask) released = True points = [] for point, itasks in sorted(self.get_tasks_by_point().items()): - has_unfinished_itasks = False - for itask in itasks: - if not itask.state( + if points or all( + not itask.state( TASK_STATUS_FAILED, TASK_STATUS_SUCCEEDED, TASK_STATUS_EXPIRED - ): - has_unfinished_itasks = True - break - if not points and not has_unfinished_itasks: - # We need to begin with an unfinished cycle point. - continue - points.append(point) + ) + for itask in itasks + ): + points.append(point) if not points: return False @@ -400,16 +394,16 @@ def release_runahead_tasks(self): if self.stop_point and latest_allowed_point > self.stop_point: latest_allowed_point = self.stop_point - release = [] - for point, itask_id_map in self.main_pool.items(): - if point <= latest_allowed_point: - for itask in itask_id_map.values(): - if not itask.state.is_runahead: - continue - release.append(itask) - for itask in release: + for itask in ( + itask + for point, itask_id_map in self.main_pool.items() + for itask in itask_id_map.values() + if point <= latest_allowed_point + if itask.state.is_runahead + ): self.release_runahead_task(itask) released = True + return released def load_abs_outputs_for_restart(self, row_idx, row): diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index f5173b2f19e..f07aa2e772d 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -371,12 +371,10 @@ def check_xtriggers( db_update_func: method to update xtriggers in the DB """ - satisfied = set( - [ - itask for itask in itasks - if itask.state.xtriggers_all_satisfied() - ] - ) + satisfied = { + itask for itask in itasks + if itask.state.xtriggers_all_satisfied() + } if satisfied: self._housekeep(itasks) db_update_func(self.sat_xtrig)