Skip to content

Commit

Permalink
task iteration improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
oliver-sanders committed May 27, 2021
1 parent 5a9ddfe commit c292114
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 55 deletions.
10 changes: 4 additions & 6 deletions cylc/flow/broadcast_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,10 @@ def check_ext_triggers(self, itasks, ext_trigger_queue):
ext_trigger = ext_trigger_queue.get_nowait()
self.ext_triggers.setdefault(ext_trigger, 0)
self.ext_triggers[ext_trigger] += 1
return set(
[
itask for itask in itasks
if self._match_ext_trigger(itask)
]
)
return {
itask for itask in itasks
if self._match_ext_trigger(itask)
}

def clear_broadcast(
self, point_strings=None, namespaces=None, cancel_settings=None):
Expand Down
20 changes: 10 additions & 10 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1753,16 +1753,16 @@ def check_auto_shutdown(self):
# Don't if stalled, unless "abort on stalled" is set.
return False

if [
itask for itask in self.pool.get_tasks()
if itask.state(
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING
)
or (itask.state(TASK_STATUS_WAITING)
and not itask.state.is_runahead)
]:
if any(
itask for itask in self.pool.get_tasks()
if itask.state(
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING
)
or (itask.state(TASK_STATUS_WAITING)
and not itask.state.is_runahead)
):
# Don't if there are more tasks to run (if waiting and not
# runahead, then held, queued, or xtriggered).
return False
Expand Down
60 changes: 27 additions & 33 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,39 +303,33 @@ def release_runahead_tasks(self):
# And any manually-triggered task.

# TODO: should runahead should not consider unsatisfied hidden tasks?
release = []
for itask_id_maps in (
list(self.main_pool.values())
+ list(self.hidden_pool.values())):
for itask in itask_id_maps.values():
if not itask.state.is_runahead:
continue
if (
itask.state(TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_EXPIRED)
or itask.is_manual_submit
):
release.append(itask)
for itask in release:
for itask in (
itask
for pool in (self.main_pool, self.hidden_pool)
for cycle in pool.values()
for itask in cycle.values()
if itask.state.is_runahead
if itask.state(
TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_EXPIRED
)
or itask.is_manual_submit
):
self.release_runahead_task(itask)
released = True

points = []
for point, itasks in sorted(self.get_tasks_by_point().items()):
has_unfinished_itasks = False
for itask in itasks:
if not itask.state(
if points or all(
not itask.state(
TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_EXPIRED
):
has_unfinished_itasks = True
break
if not points and not has_unfinished_itasks:
# We need to begin with an unfinished cycle point.
continue
points.append(point)
)
for itask in itasks
):
points.append(point)

if not points:
return False
Expand Down Expand Up @@ -400,16 +394,16 @@ def release_runahead_tasks(self):
if self.stop_point and latest_allowed_point > self.stop_point:
latest_allowed_point = self.stop_point

release = []
for point, itask_id_map in self.main_pool.items():
if point <= latest_allowed_point:
for itask in itask_id_map.values():
if not itask.state.is_runahead:
continue
release.append(itask)
for itask in release:
for itask in (
itask
for point, itask_id_map in self.main_pool.items()
for itask in itask_id_map.values()
if point <= latest_allowed_point
if itask.state.is_runahead
):
self.release_runahead_task(itask)
released = True

return released

def load_abs_outputs_for_restart(self, row_idx, row):
Expand Down
10 changes: 4 additions & 6 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,12 +371,10 @@ def check_xtriggers(
db_update_func: method to update xtriggers in the DB
"""
satisfied = set(
[
itask for itask in itasks
if itask.state.xtriggers_all_satisfied()
]
)
satisfied = {
itask for itask in itasks
if itask.state.xtriggers_all_satisfied()
}
if satisfied:
self._housekeep(itasks)
db_update_func(self.sat_xtrig)
Expand Down

0 comments on commit c292114

Please sign in to comment.