diff --git a/cylc/flow/config.py b/cylc/flow/config.py
index 40a6aa0dee6..b5e6b93625e 100644
--- a/cylc/flow/config.py
+++ b/cylc/flow/config.py
@@ -1742,7 +1742,6 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
             # (name is left name)
             self.taskdefs[name].add_graph_child(task_trigger, right, seq)
-            # graph_parents not currently used but might be needed soon:
-            # self.taskdefs[right].add_graph_parent(task_trigger, name, seq)
+            self.taskdefs[right].add_graph_parent(task_trigger, name, seq)
 
        # Walk down "expr_list" depth first, and replace any items matching a
        # key in "triggers" ("left" values) with the trigger.
diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py
index f9f2367f9de..93d69121f68 100644
--- a/cylc/flow/data_store_mgr.py
+++ b/cylc/flow/data_store_mgr.py
@@ -51,7 +51,7 @@
 """
 
 from collections import Counter
-from copy import deepcopy
+from copy import copy, deepcopy
 import json
 from time import time
 import zlib
@@ -65,7 +65,9 @@
 from cylc.flow.suite_status import get_suite_status
 from cylc.flow.task_id import TaskID
 from cylc.flow.task_job_logs import JOB_LOG_OPTS
-from cylc.flow.task_proxy import generate_graph_children
+from cylc.flow.task_proxy import (
+    generate_graph_children, generate_graph_parents
+)
 from cylc.flow.task_state import TASK_STATUS_WAITING
 from cylc.flow.task_state_prop import extract_group_state
 from cylc.flow.wallclock import (
@@ -288,7 +290,7 @@ class DataStoreMgr:
             Message containing the global information of the workflow.
         .descendants (dict):
             Local store of config.get_first_parent_descendants()
-        .n_max (int):
+        .n_edge_distance (int):
             Maximum distance of the data-store graph from the active pool.
         .parents (dict):
             Local store of config.get_parent_lists()
@@ -312,7 +314,8 @@ def __init__(self, schd):
         self.parents = {}
         self.state_update_families = set()
         self.updated_state_families = set()
-        self.n_max = 1
+        self.n_edge_distance = 1
+        self.next_n_edge_distance = None
         # Managed data types
         self.data = {
             self.workflow_id: deepcopy(DATA_TEMPLATE)
@@ -331,10 +334,13 @@ def __init__(self, schd):
         self.updates_pending = False
         self.delta_queues = {self.workflow_id: {}}
         self.publish_deltas = []
-        self.in_window_edges = {}
-        self.out_window_edges = {}
-        self.in_window_nodes = {}
-        self.out_window_nodes = {}
+        self.all_task_pool = set()
+        self.active_descendant_nodes = set()
+        self.n_window_nodes = {}
+        self.n_window_edges = {}
+        self.n_window_boundary_nodes = {}
+        self.prune_trigger_nodes = {}
+        self.prune_flagged_nodes = set()
         self.prune_pending = False
 
     def initiate_data_model(self, reloaded=False):
@@ -530,13 +536,23 @@ def generate_definition_elements(self):
         self.parents = parents
 
     def increment_graph_window(
-            self, name, point, flow_label, n_distance=0, active_id=None):
-        """Generate graph window about given task proxy to n_max
+            self, name, point, flow_label,
+            edge_distance=0, active_id=None, descendant=False):
+        """Generate graph window about given origin to n-edge-distance.
 
         Args:
-            source_itask (cylc.flow.TaskProxy):
-                Edge source task proxy object.
-            n_distance (int): Graph distance from active parent.
+            name (str):
+                Task name.
+            point (cylc.flow.cycling.PointBase):
+                PointBase derived object.
+            flow_label (str):
+                Flow label used to distinguish multiple runs.
+            edge_distance (int):
+                Graph distance from the active/origin node.
+            active_id (str):
+                Active/origin node id.
+            descendant (bool):
+                Is the current node a direct descendant of the active/origin?
 
         Returns:
@@ -550,64 +566,114 @@ def increment_graph_window(
             active_id = s_id
 
         # Generate task node
+        if edge_distance == 0:
+            self.n_window_edges[active_id] = set()
+            self.n_window_boundary_nodes[active_id] = {}
+            self.n_window_nodes[active_id] = set()
+            if active_id in self.prune_trigger_nodes:
+                self.prune_flagged_nodes.update(
+                    self.prune_trigger_nodes[active_id])
+                del self.prune_trigger_nodes[active_id]
+                self.prune_pending = True
+
+        graph_children = generate_graph_children(
+            self.schd.config.get_taskdef(name), point)
+
+        if edge_distance > self.n_edge_distance or not graph_children:
+            if descendant or edge_distance == 0:
+                self.n_window_boundary_nodes[
+                    active_id].setdefault(edge_distance, set()).add(s_id)
+
+            if edge_distance > self.n_edge_distance:
+                return
+
+        self.n_window_nodes[active_id].add(s_id)
         self.generate_ghost_task(s_id, name, point, flow_label)
-        if n_distance == 0:
-            self.in_window_nodes[active_id] = set()
-            self.in_window_edges[active_id] = set()
-        self.in_window_nodes[active_id].add(s_id)
-        n_distance += 1
-        if n_distance > self.n_max:
-            return
+        edge_distance += 1
 
         # TODO: xtrigger is suite_state edges too
         # Reference set for workflow relations
-        new_edges = set()
-        for output, items in generate_graph_children(
+        for _, items in graph_children.items():
+            if edge_distance == 1:
+                descendant = True
+            self._expand_graph_window(
+                s_id, s_node, items, active_id, flow_label, edge_distance,
+                descendant, False)
+
+        for _, items in generate_graph_parents(
             self.schd.config.get_taskdef(name), point).items():
-            for t_name, t_point, is_abs in items:
-                t_node = TaskID.get(t_name, t_point)
-                t_id = (
-                    f'{self.workflow_id}{ID_DELIM}{t_point}{ID_DELIM}{t_name}')
-                self.increment_graph_window(
-                    t_name, t_point, flow_label, n_distance, active_id)
-
-                # Initiate edge element.
+            self._expand_graph_window(
+                s_id, s_node, items, active_id, flow_label, edge_distance,
+                False, True)
+
+        if edge_distance == 1:
+            for tp_id in self.n_window_boundary_nodes[active_id][
+                    max(self.n_window_boundary_nodes[active_id].keys())]:
+                self.prune_trigger_nodes.setdefault(
+                    tp_id, set()).add(active_id)
+            if self.n_window_edges[active_id]:
+                getattr(self.updated[WORKFLOW], EDGES).edges.extend(
+                    self.n_window_edges[active_id])
+
+    def _expand_graph_window(
+            self, s_id, s_node, items, active_id, flow_label, edge_distance,
+            descendant=False, is_parent=False):
+        """Generate nodes/edges for children/parents of source node."""
+        for t_name, t_point, is_abs in items:
+            t_node = TaskID.get(t_name, t_point)
+            t_id = (
+                f'{self.workflow_id}{ID_DELIM}{t_point}{ID_DELIM}{t_name}')
+            # Initiate edge element.
+            if is_parent:
+                e_id = (
+                    f'{self.workflow_id}{ID_DELIM}{t_node}{ID_DELIM}{s_node}')
+            else:
                 e_id = (
                     f'{self.workflow_id}{ID_DELIM}{s_node}{ID_DELIM}{t_node}')
-                if (
-                    e_id not in self.data[self.workflow_id][EDGES] and
-                    e_id not in self.added[EDGES]
-                ):
-                    self.added[EDGES][e_id] = PbEdge(
-                        id=e_id,
-                        source=s_id,
-                        target=t_id
-                    )
-                    # Add edge id to node field for resolver reference
-                    self.updated[TASK_PROXIES].setdefault(
-                        t_id,
-                        PbTaskProxy(id=t_id)).edges.append(e_id)
-                    self.updated[TASK_PROXIES].setdefault(
-                        s_id,
-                        PbTaskProxy(id=s_id)).edges.append(e_id)
-                    new_edges.add(e_id)
-        if new_edges:
-            self.updates_pending = True
-            self.in_window_edges[active_id] |= new_edges
-            getattr(self.updated[WORKFLOW], EDGES).edges.extend(new_edges)
+            if e_id in self.n_window_edges[active_id]:
+                continue
+            if (
+                e_id not in self.data[self.workflow_id][EDGES] and
+                e_id not in self.added[EDGES] and
+                edge_distance <= self.n_edge_distance
+            ):
+                self.added[EDGES][e_id] = PbEdge(
+                    id=e_id,
+                    source=s_id,
+                    target=t_id
+                )
+                # Add edge id to node field for resolver reference
+                self.updated[TASK_PROXIES].setdefault(
+                    t_id,
+                    PbTaskProxy(id=t_id)).edges.append(e_id)
+                self.updated[TASK_PROXIES].setdefault(
+                    s_id,
+                    PbTaskProxy(id=s_id)).edges.append(e_id)
+                self.n_window_edges[active_id].add(e_id)
+            if t_id in self.n_window_nodes[active_id]:
+                continue
+            self.increment_graph_window(
+                t_name, t_point, flow_label,
+                copy(edge_distance), active_id, descendant)
 
-    def decrement_graph_window(self, name, point):
+    def remove_active_node(self, name, point):
         """Flag removal of this active graph window."""
+        tp_id = f'{self.workflow_id}{ID_DELIM}{point}{ID_DELIM}{name}'
+        self.all_task_pool.remove(tp_id)
+        # Flag isolate/end-of-branch nodes for pruning on removal.
+        if (
+            tp_id in self.prune_trigger_nodes and
+            tp_id in self.prune_trigger_nodes[tp_id]
+        ):
+            self.prune_flagged_nodes.update(self.prune_trigger_nodes[tp_id])
+            del self.prune_trigger_nodes[tp_id]
+            self.prune_pending = True
+
+    def add_runahead_node(self, name, point):
+        """Register a new runahead pool node, in external ID format."""
         tp_id = f'{self.workflow_id}{ID_DELIM}{point}{ID_DELIM}{name}'
-        if tp_id in self.in_window_nodes:
-            self.out_window_nodes[tp_id] = self.in_window_nodes[tp_id]
-            del self.in_window_nodes[tp_id]
-        if tp_id in self.in_window_edges:
-            self.out_window_edges[tp_id] = self.in_window_edges[tp_id]
-            del self.in_window_edges[tp_id]
-        self.prune_pending = True
+        self.all_task_pool.add(tp_id)
 
     def generate_ghost_task(self, tp_id, name, point, flow_label):
         """Create task-point element populated with static data.
@@ -737,6 +808,11 @@ def update_data_structure(self, updated_nodes=None):
         self.update_dynamic_elements(updated_nodes)
         self.update_family_proxies()
 
+        # Avoid changing the window edge distance during edge/node creation
+        if self.next_n_edge_distance is not None:
+            self.n_edge_distance = self.next_n_edge_distance
+            self.next_n_edge_distance = None
+
         # Update workflow statuses and totals if needed
         if self.prune_pending:
             self.prune_data_store()
@@ -765,41 +841,62 @@ def prune_data_store(self):
 
         """
         self.prune_pending = False
-        if not self.out_window_nodes and not self.out_window_edges:
+        if not self.prune_flagged_nodes:
             return
 
-        data = self.data[self.workflow_id]
-
-        # Gather nodes and edges to prune
-        node_ids = set().union(*self.out_window_nodes.values()).difference(
-            *self.in_window_nodes.values())
-        self.out_window_nodes.clear()
-        edge_ids = set().union(*self.out_window_edges.values()).difference(
-            *self.in_window_edges.values())
-        self.out_window_edges.clear()
-
-        for e_id in edge_ids:
-            self.deltas[EDGES].pruned.append(e_id)
+        # Gather nodes to prune via a diff of nodes in the dependency paths
+        # of flagged boundary nodes against all other paths.
+        in_paths = [
+            v
+            for k, v in self.n_window_nodes.items()
+            if k in self.all_task_pool
+        ]
+        out_paths = [
+            v
+            for k, v in self.n_window_nodes.items()
+            if k in self.prune_flagged_nodes
+        ]
+        node_ids = set().union(*out_paths).difference(*in_paths)
+        self.prune_flagged_nodes.clear()
 
-        flagged_ids = set()
+        tp_data = self.data[self.workflow_id][TASK_PROXIES]
+        tp_added = self.added[TASK_PROXIES]
+        parent_ids = set()
         for tp_id in node_ids:
+            if tp_id in self.n_window_nodes:
+                del self.n_window_nodes[tp_id]
+            if tp_id in self.n_window_edges:
+                del self.n_window_edges[tp_id]
+            if tp_id in tp_data:
+                node = tp_data[tp_id]
+            elif tp_id in tp_added:
+                node = tp_added[tp_id]
+            else:
+                continue
             self.deltas[TASK_PROXIES].pruned.append(tp_id)
             self.schd.job_pool.remove_task_jobs(tp_id)
-            flagged_ids.add(data[TASK_PROXIES][tp_id].first_parent)
+            self.deltas[EDGES].pruned.extend(node.edges)
+            parent_ids.add(node.first_parent)
 
         prune_ids = set()
         checked_ids = set()
-        while flagged_ids:
-            for fp_id in flagged_ids:
+        while parent_ids:
+            for fp_id in parent_ids:
                 self._family_ascent_point_prune(
-                    fp_id, node_ids, flagged_ids, checked_ids, prune_ids)
+                    fp_id, node_ids, parent_ids, checked_ids, prune_ids)
                 break
         if prune_ids:
             self.deltas[FAMILY_PROXIES].pruned.extend(prune_ids)
             self.updates_pending = True
 
     def _family_ascent_point_prune(
-            self, fp_id, node_ids, flagged_ids, checked_ids, prune_ids):
+            self, fp_id, node_ids, parent_ids, checked_ids, prune_ids):
+        """Find and prune families, recursively checking child families.
+
+        Recursively map out child families to the bottom, from the origin
+        family, then work back up checking whether they are still active.
+
+        """
         fp_data = self.data[self.workflow_id][FAMILY_PROXIES]
         if fp_id in fp_data and fp_id not in self.updated[FAMILY_PROXIES]:
             fam_node = fp_data[fp_id]
@@ -811,7 +916,7 @@ def _family_ascent_point_prune(
             ]
             for child_id in child_fam_nodes:
                 self._family_ascent_point_prune(
-                    child_id, node_ids, flagged_ids, checked_ids, prune_ids)
+                    child_id, node_ids, parent_ids, checked_ids, prune_ids)
             if [
                 child_id
                 for child_id in fam_node.child_tasks
@@ -824,11 +929,11 @@ def _family_ascent_point_prune(
                 self.state_update_families.add(fp_id)
             else:
                 if fam_node.first_parent:
-                    flagged_ids.add(fam_node.first_parent)
+                    parent_ids.add(fam_node.first_parent)
                 prune_ids.add(fp_id)
         checked_ids.add(fp_id)
-        if fp_id in flagged_ids:
-            flagged_ids.remove(fp_id)
+        if fp_id in parent_ids:
+            parent_ids.remove(fp_id)
 
     def update_dynamic_elements(self, updated_nodes=None):
         """Update data elements containing dynamic/live fields."""
@@ -1041,6 +1146,11 @@ def hold_release_tasks(self, hold=True):
             tp_delta.state = tp_node.state
             self.state_update_families.add(tp_node.first_parent)
 
+    def set_window_size(self, edge_distance):
+        """Set the maximum edge distance to apply at the next update."""
+        self.next_n_edge_distance = edge_distance
+        self.updates_pending = True
+
     def update_workflow(self):
         """Update workflow element status and state totals."""
         # Create new message and copy existing message content
diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py
index 1a072f19b9f..4ce1199bbc6 100644
--- a/cylc/flow/network/resolvers.py
+++ b/cylc/flow/network/resolvers.py
@@ -711,6 +711,27 @@ def set_verbosity(self, level):
         self.schd.command_queue.put(("set_verbosity", (level,), {}))
         return (True, 'Command queued')
 
+    def set_window_size(self, edge_distance):
+        """Set data-store graph window to new max edge distance.
+
+        Args:
+            edge_distance (int): Edge distance 0..N
+
+        Returns:
+            tuple: (outcome, message)
+
+            outcome (bool)
+                True if the new edge distance was accepted.
+            message (str)
+                Information about outcome.
+
+        """
+        if edge_distance >= 0:
+            self.schd.data_store_mgr.set_window_size(edge_distance)
+            return (True, f'Maximum edge distance set to {edge_distance}')
+        else:
+            return (False, 'Edge distance cannot be negative')
+
     def force_spawn_children(self, tasks, outputs):
         """Spawn children of given task outputs.
 
diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py
index b5aafb2b708..436beb47fec 100644
--- a/cylc/flow/network/schema.py
+++ b/cylc/flow/network/schema.py
@@ -1535,6 +1535,21 @@ class Arguments:
     result = GenericScalar()
 
 
+class SetWindowSize(Mutation):
+    class Meta:
+        description = sstrip('''
+            Change the edge distance (N) of the data-store graph window.
+
+        ''')
+        resolver = partial(mutator, command='set_window_size')
+
+    class Arguments:
+        workflows = List(WorkflowID, required=True)
+        edge_distance = Int(required=True)
+
+    result = GenericScalar()
+
+
 class Stop(Mutation):
     class Meta:
         description = sstrip('''
@@ -1714,6 +1729,8 @@ class Mutations(ObjectType):
     reload = Reload.Field(description=Reload._meta.description)
     set_verbosity = SetVerbosity.Field(
         description=SetVerbosity._meta.description)
+    set_window_size = SetWindowSize.Field(
+        description=SetWindowSize._meta.description)
     stop = Stop.Field(description=Stop._meta.description)
     checkpoint = Checkpoint.Field(
         description=Checkpoint._meta.description)
diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py
index ab247fb41a5..eb47ca84171 100644
--- a/cylc/flow/task_pool.py
+++ b/cylc/flow/task_pool.py
@@ -233,9 +233,8 @@ def add_to_runahead_pool(self, itask, is_new=True):
         self.runahead_pool[itask.point][itask.identity] = itask
         self.rhpool_changed = True
 
-        # Create new data-store n-distance graph window about this task
-        self.data_store_mgr.increment_graph_window(
-            itask.tdef.name, itask.point, itask.flow_label)
+        # Register the pool node in the data-store, in external ID format
+        self.data_store_mgr.add_runahead_node(itask.tdef.name, itask.point)
 
         # add row to "task_states" table & data-store
         if is_new:
@@ -544,6 +543,9 @@ def release_runahead_task(self, itask):
         self.pool_changed = True
         self.pool_changes.append(itask)
         LOG.debug("[%s] -released to the task pool", itask)
+        # Create new data-store n-distance graph window about this task
+        self.data_store_mgr.increment_graph_window(
+            itask.tdef.name, itask.point, itask.flow_label)
         del self.runahead_pool[itask.point][itask.identity]
         if not self.runahead_pool[itask.point]:
             del self.runahead_pool[itask.point]
@@ -603,10 +605,11 @@ def remove(self, itask, reason=""):
                 del self.runahead_pool[itask.point]
             self.rhpool_changed = True
 
+        # Notify the data-store manager of its removal
+        # (the manager uses window boundary tracking for pruning).
+        self.data_store_mgr.remove_active_node(itask.tdef.name, itask.point)
         # Event-driven final update of task_states table.
-        # TODO: same for datastore (still updated by iterating the task pool)
-        self.data_store_mgr.decrement_graph_window(
-            itask.tdef.name, itask.point)
+        # TODO: same for datastore (still updated by scheduler loop)
         self.suite_db_mgr.put_update_task_state(itask)
         LOG.debug("[%s] -%s", itask, msg)
         del itask
diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py
index 75e3d901b9f..926e3b5c774 100644
--- a/cylc/flow/task_proxy.py
+++ b/cylc/flow/task_proxy.py
@@ -70,6 +70,41 @@ def generate_graph_children(tdef, point):
     return graph_children
 
 
+def generate_graph_parents(tdef, point):
+    """Determine graph parents of this task at the given cycle point."""
+    graph_parents = {}
+    for seq, ups in tdef.graph_parents.items():
+        graph_parents[seq] = []
+        for name, trigger in ups:
+            parent_point = trigger.get_parent_point(point)
+            is_abs = (trigger.offset_is_absolute or
+                      trigger.offset_is_from_icp)
+            if is_abs:
+                if parent_point != point:
+                    # If 'foo[^] => bar' only spawn off of '^'.
+                    continue
+            if seq.is_on_sequence(parent_point):
+                # E.g.: foo should trigger only on T06:
+                #     PT6H = "waz"
+                #     T06 = "waz[-PT6H] => foo"
+                graph_parents[seq].append((name, parent_point, is_abs))
+
+    if tdef.sequential:
+        # Add prev-instance parent.
+        prevs = []
+        for seq in tdef.sequences:
+            prev = seq.get_prev_point(point)
+            if prev is not None:
+                # Within sequence bounds.
+                prevs.append(prev)
+        if prevs:
+            if seq not in graph_parents:
+                graph_parents[seq] = []
+            graph_parents[seq].append((tdef.name, min(prevs), False))
+
+    return graph_parents
+
+
 class TaskProxy:
     """Represent an instance of a cycling task in a running suite.
 
diff --git a/cylc/flow/taskdef.py b/cylc/flow/taskdef.py
index 1b7840e0484..f5e993af914 100644
--- a/cylc/flow/taskdef.py
+++ b/cylc/flow/taskdef.py
@@ -33,7 +33,7 @@ class TaskDef:
         "sequential", "is_coldstart", "suite_polling_cfg",
         "clocktrigger_offset", "expiration_offset",
         "namespace_hierarchy", "dependencies", "outputs", "param_var",
-        "graph_children",
+        "graph_children", "graph_parents",
         "external_triggers", "xtrig_labels", "name", "elapsed_times"]
 
     # Store the elapsed times for a maximum of 10 cycles
@@ -63,7 +63,6 @@ def __init__(self, name, rtcfg, run_mode, start_point):
         self.outputs = set()
         self.graph_children = {}
-        # graph_parents not currently used, but might be soon:
-        # self.graph_parents = {}
+        self.graph_parents = {}
         self.param_var = {}
         self.external_triggers = []
         self.xtrig_labels = {}  # {sequence: [labels]}
@@ -84,15 +84,14 @@ def add_graph_child(self, trigger, taskname, sequence):
             trigger.output, []).append((taskname, trigger))
 
-    # graph_parents not currently used, but might be soon:
-    # def add_graph_parent(self, trigger, parent, sequence):
-    #     """Record task instances that I depend on.
-    #     {
-    #         sequence: set([(a,t1), (b,t2), ...])  # (task-name, trigger)
-    #     }
-    #     """
-    #     if sequence not in self.graph_parents:
-    #         self.graph_parents[sequence] = set()
-    #     self.graph_parents[sequence].add((parent, trigger))
+    def add_graph_parent(self, trigger, parent, sequence):
+        """Record task instances that I depend on.
+        {
+            sequence: set([(a,t1), (b,t2), ...])  # (task-name, trigger)
+        }
+        """
+        if sequence not in self.graph_parents:
+            self.graph_parents[sequence] = set()
+        self.graph_parents[sequence].add((parent, trigger))
 
     def add_dependency(self, dependency, sequence):
         """Add a dependency to a named sequence.
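
The change above replaces the old in/out window bookkeeping with an n-edge-distance walk: increment_graph_window() expands over both graph children and graph parents out to n_edge_distance, records the most distant nodes reached as boundary nodes, and prune_data_store() later removes nodes that appear only in the windows of flagged boundary/finished tasks and not in the window of any task still in the pool. The sketch below is a minimal, self-contained model of that walk, not the DataStoreMgr API; the names (expand_window, CHILDREN, PARENTS, the toy node ids) are hypothetical and exist only to illustrate the breadth-first expansion and boundary bookkeeping under those assumptions.

    from collections import deque

    # Toy dependency graph: node -> children; parents are derived below.
    CHILDREN = {
        'a.1': {'b.1', 'c.1'},
        'b.1': {'d.1'},
        'c.1': {'d.1'},
        'd.1': {'a.2'},
        'a.2': set(),
    }
    PARENTS = {node: set() for node in CHILDREN}
    for parent, kids in CHILDREN.items():
        for kid in kids:
            PARENTS[kid].add(parent)


    def expand_window(active_node, n_edge_distance):
        """Return (window_nodes, boundary_nodes) about one active node.

        Breadth-first walk over children and parents, in the spirit of
        increment_graph_window/_expand_graph_window, stopping at
        n_edge_distance; the most distant nodes reached form the boundary
        that would later trigger pruning.
        """
        window = {active_node}
        boundary = {}
        queue = deque([(active_node, 0)])
        while queue:
            node, distance = queue.popleft()
            if distance >= n_edge_distance:
                # Window edge reached: record as boundary, do not expand.
                boundary.setdefault(distance, set()).add(node)
                continue
            for other in CHILDREN[node] | PARENTS[node]:
                if other not in window:
                    window.add(other)
                    queue.append((other, distance + 1))
        return window, boundary


    if __name__ == '__main__':
        # Window about 'b.1' with the default n=1, then a wider n=2 window.
        for n_max in (1, 2):
            nodes, boundary = expand_window('b.1', n_max)
            print(
                f'n={n_max} window={sorted(nodes)} '
                f'boundary={sorted(boundary.get(n_max, ()))}'
            )

In the real change the boundary nodes feed prune_trigger_nodes, so that when a boundary task later leaves the pool the now-unreachable part of its window can be flagged for pruning, and the SetWindowSize mutation added in schema.py simply stores next_n_edge_distance to be applied at the next data-store update.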