Skip to content

Commit

Permalink
new prune/gen scheme & mutation added
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Oct 14, 2020
1 parent 9f639f5 commit 851015a
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 100 deletions.
2 changes: 1 addition & 1 deletion cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1742,7 +1742,7 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
# (name is left name)
self.taskdefs[name].add_graph_child(task_trigger, right, seq)
# graph_parents not currently used but might be needed soon:
# self.taskdefs[right].add_graph_parent(task_trigger, name, seq)
self.taskdefs[right].add_graph_parent(task_trigger, name, seq)

# Walk down "expr_list" depth first, and replace any items matching a
# key in "triggers" ("left" values) with the trigger.
Expand Down
274 changes: 192 additions & 82 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
"""

from collections import Counter
from copy import deepcopy
from copy import copy, deepcopy
import json
from time import time
import zlib
Expand All @@ -65,7 +65,9 @@
from cylc.flow.suite_status import get_suite_status
from cylc.flow.task_id import TaskID
from cylc.flow.task_job_logs import JOB_LOG_OPTS
from cylc.flow.task_proxy import generate_graph_children
from cylc.flow.task_proxy import (
generate_graph_children, generate_graph_parents
)
from cylc.flow.task_state import TASK_STATUS_WAITING
from cylc.flow.task_state_prop import extract_group_state
from cylc.flow.wallclock import (
Expand Down Expand Up @@ -288,7 +290,7 @@ class DataStoreMgr:
Message containing the global information of the workflow.
.descendants (dict):
Local store of config.get_first_parent_descendants()
.n_max (int):
.n_edge_distance (int):
Maximum distance of the data-store graph from the active pool.
.parents (dict):
Local store of config.get_parent_lists()
Expand All @@ -312,7 +314,8 @@ def __init__(self, schd):
self.parents = {}
self.state_update_families = set()
self.updated_state_families = set()
self.n_max = 1
self.n_edge_distance = 1
self.next_n_edge_distance = None
# Managed data types
self.data = {
self.workflow_id: deepcopy(DATA_TEMPLATE)
Expand All @@ -331,10 +334,13 @@ def __init__(self, schd):
self.updates_pending = False
self.delta_queues = {self.workflow_id: {}}
self.publish_deltas = []
self.in_window_edges = {}
self.out_window_edges = {}
self.in_window_nodes = {}
self.out_window_nodes = {}
self.all_task_pool = set()
self.active_descendant_nodes = set()
self.n_window_nodes = {}
self.n_window_edges = {}
self.n_window_boundary_nodes = {}
self.prune_trigger_nodes = {}
self.prune_flagged_nodes = set()
self.prune_pending = False

def initiate_data_model(self, reloaded=False):
Expand Down Expand Up @@ -530,13 +536,23 @@ def generate_definition_elements(self):
self.parents = parents

def increment_graph_window(
self, name, point, flow_label, n_distance=0, active_id=None):
"""Generate graph window about given task proxy to n_max
self, name, point, flow_label,
edge_distance=0, active_id=None, descendant=False):
"""Generate graph window about given origin to n-edge-distance.
Args:
source_itask (cylc.flow.TaskProxy):
Edge source task proxy object.
n_distance (int): Graph distance from active parent.
name (str):
Task name.
point (cylc.flow.cycling.PointBase):
PointBase derived object.
flow_label (str):
Flow label used to distinguish multiple runs.
edge_distance (int):
Graph distance from active/origin node.
active_id (str):
Active/origin node id.
descendent (bool):
Is the current node a direct descendent of the active/origin.
Returns:
Expand All @@ -550,64 +566,119 @@ def increment_graph_window(
active_id = s_id

# Generate task node
if edge_distance == 0:
self.n_window_edges[active_id] = set()
self.n_window_boundary_nodes[active_id] = {}
self.n_window_nodes[active_id] = set()
if active_id in self.prune_trigger_nodes:
self.prune_flagged_nodes.update(
self.prune_trigger_nodes[active_id])
del self.prune_trigger_nodes[active_id]
self.prune_pending = True

graph_children = generate_graph_children(
self.schd.config.get_taskdef(name), point)

if edge_distance > self.n_edge_distance or not graph_children:
if descendant or edge_distance == 0:
self.n_window_boundary_nodes[
active_id].setdefault(edge_distance, set()).add(s_id)

if edge_distance > self.n_edge_distance:
return

self.n_window_nodes[active_id].add(s_id)
self.generate_ghost_task(s_id, name, point, flow_label)
if n_distance == 0:
self.in_window_nodes[active_id] = set()
self.in_window_edges[active_id] = set()
self.in_window_nodes[active_id].add(s_id)

n_distance += 1
if n_distance > self.n_max:
return
edge_distance += 1

# TODO: xtrigger is suite_state edges too
# Reference set for workflow relations
new_edges = set()
for output, items in generate_graph_children(
for _, items in graph_children.items():
if edge_distance == 1:
descendant = True
self._expand_graph_window(
s_id, s_node, items, active_id, flow_label, edge_distance,
descendant, False)

for _, items in generate_graph_parents(
self.schd.config.get_taskdef(name), point).items():
for t_name, t_point, is_abs in items:
t_node = TaskID.get(t_name, t_point)
t_id = (
f'{self.workflow_id}{ID_DELIM}{t_point}{ID_DELIM}{t_name}')
self.increment_graph_window(
t_name, t_point, flow_label, n_distance, active_id)

# Initiate edge element.
self._expand_graph_window(
s_id, s_node, items, active_id, flow_label, edge_distance,
False, True)

if edge_distance == 1:
# print('ACTIVE')
# print(active_id)
# print(self.n_window_nodes[active_id])
# print('BOUNDARY')
# print(self.n_window_boundary_nodes[active_id])
for tp_id in self.n_window_boundary_nodes[active_id][
max(self.n_window_boundary_nodes[active_id].keys())]:
self.prune_trigger_nodes.setdefault(
tp_id, set()).add(active_id)
if self.n_window_edges[active_id]:
getattr(self.updated[WORKFLOW], EDGES).edges.extend(
self.n_window_edges[active_id])

def _expand_graph_window(
self, s_id, s_node, items, active_id, flow_label, edge_distance,
descendant=False, is_parent=False):
"""Generate nodes/edges for children/parents of source node."""
for t_name, t_point, is_abs in items:
t_node = TaskID.get(t_name, t_point)
t_id = (
f'{self.workflow_id}{ID_DELIM}{t_point}{ID_DELIM}{t_name}')
# Initiate edge element.
if is_parent:
e_id = (
f'{self.workflow_id}{ID_DELIM}{t_node}{ID_DELIM}{s_node}')
else:
e_id = (
f'{self.workflow_id}{ID_DELIM}{s_node}{ID_DELIM}{t_node}')
if (
e_id not in self.data[self.workflow_id][EDGES] and
e_id not in self.added[EDGES]
):
self.added[EDGES][e_id] = PbEdge(
id=e_id,
source=s_id,
target=t_id
)
# Add edge id to node field for resolver reference
self.updated[TASK_PROXIES].setdefault(
t_id,
PbTaskProxy(id=t_id)).edges.append(e_id)
self.updated[TASK_PROXIES].setdefault(
s_id,
PbTaskProxy(id=s_id)).edges.append(e_id)
new_edges.add(e_id)
if new_edges:
self.updates_pending = True
self.in_window_edges[active_id] |= new_edges
getattr(self.updated[WORKFLOW], EDGES).edges.extend(new_edges)
if e_id in self.n_window_edges[active_id]:
continue
if (
e_id not in self.data[self.workflow_id][EDGES] and
e_id not in self.added[EDGES] and
edge_distance <= self.n_edge_distance
):
self.added[EDGES][e_id] = PbEdge(
id=e_id,
source=s_id,
target=t_id
)
# Add edge id to node field for resolver reference
self.updated[TASK_PROXIES].setdefault(
t_id,
PbTaskProxy(id=t_id)).edges.append(e_id)
self.updated[TASK_PROXIES].setdefault(
s_id,
PbTaskProxy(id=s_id)).edges.append(e_id)
self.n_window_edges[active_id].add(e_id)
if t_id in self.n_window_nodes[active_id]:
continue
self.increment_graph_window(
t_name, t_point, flow_label,
copy(edge_distance), active_id, descendant)

def decrement_graph_window(self, name, point):
def remove_active_node(self, name, point):
"""Flag removal of this active graph window."""
tp_id = f'{self.workflow_id}{ID_DELIM}{point}{ID_DELIM}{name}'
self.all_task_pool.remove(tp_id)
# flagged isolates/end-of-branch nodes for pruning on removal
if (
tp_id in self.prune_trigger_nodes and
tp_id in self.prune_trigger_nodes[tp_id]
):
self.prune_flagged_nodes.update(self.prune_trigger_nodes[tp_id])
del self.prune_trigger_nodes[tp_id]
self.prune_pending = True

def add_runahead_node(self, name, point):
"""Flag removal of this active graph window."""
tp_id = f'{self.workflow_id}{ID_DELIM}{point}{ID_DELIM}{name}'
if tp_id in self.in_window_nodes:
self.out_window_nodes[tp_id] = self.in_window_nodes[tp_id]
del self.in_window_nodes[tp_id]
if tp_id in self.in_window_edges:
self.out_window_edges[tp_id] = self.in_window_edges[tp_id]
del self.in_window_edges[tp_id]
self.prune_pending = True
self.all_task_pool.add(tp_id)

def generate_ghost_task(self, tp_id, name, point, flow_label):
"""Create task-point element populated with static data.
Expand Down Expand Up @@ -737,6 +808,11 @@ def update_data_structure(self, updated_nodes=None):
self.update_dynamic_elements(updated_nodes)
self.update_family_proxies()

# Avoids changing window edge distance during edge/node creation
if self.next_n_edge_distance is not None:
self.n_edge_distance = self.next_n_edge_distance
self.next_n_edge_distance = None

# Update workflow statuses and totals if needed
if self.prune_pending:
self.prune_data_store()
Expand Down Expand Up @@ -765,41 +841,70 @@ def prune_data_store(self):
"""
self.prune_pending = False
if not self.out_window_nodes and not self.out_window_edges:
if not self.prune_flagged_nodes:
return

data = self.data[self.workflow_id]

# Gather nodes and edges to prune
node_ids = set().union(*self.out_window_nodes.values()).difference(
*self.in_window_nodes.values())
self.out_window_nodes.clear()
edge_ids = set().union(*self.out_window_edges.values()).difference(
*self.in_window_edges.values())
self.out_window_edges.clear()

for e_id in edge_ids:
self.deltas[EDGES].pruned.append(e_id)
# Gather nodes to prune via a diff of nodes in the dependency paths
# of flagged boundary nodes against all other paths.
in_paths = [
v
for k, v in self.n_window_nodes.items()
if k in self.all_task_pool
]
# if k in self.all_task_pool
# print('IN PATHS')
# print(in_paths)
out_paths = [
v
for k, v in self.n_window_nodes.items()
if k in self.prune_flagged_nodes
]
# print('OUT PATHS')
# print(out_paths)
node_ids = set().union(*out_paths).difference(*in_paths)
# print('TO PRUNE')
# print(node_ids)
# self.prune_flagged_nodes.difference_update(node_ids)
self.prune_flagged_nodes.clear()

flagged_ids = set()
tp_data = self.data[self.workflow_id][TASK_PROXIES]
tp_added = self.added[TASK_PROXIES]
parent_ids = set()
for tp_id in node_ids:
if tp_id in self.n_window_nodes:
del self.n_window_nodes[tp_id]
if tp_id in self.n_window_edges:
del self.n_window_edges[tp_id]
if tp_id in tp_data:
node = tp_data[tp_id]
elif tp_id in tp_added:
node = tp_added[tp_id]
else:
continue
self.deltas[TASK_PROXIES].pruned.append(tp_id)
self.schd.job_pool.remove_task_jobs(tp_id)
flagged_ids.add(data[TASK_PROXIES][tp_id].first_parent)
self.deltas[EDGES].pruned.extend(node.edges)
parent_ids.add(node.first_parent)

prune_ids = set()
checked_ids = set()
while flagged_ids:
for fp_id in flagged_ids:
while parent_ids:
for fp_id in parent_ids:
self._family_ascent_point_prune(
fp_id, node_ids, flagged_ids, checked_ids, prune_ids)
fp_id, node_ids, parent_ids, checked_ids, prune_ids)
break
if prune_ids:
self.deltas[FAMILY_PROXIES].pruned.extend(prune_ids)
self.updates_pending = True

def _family_ascent_point_prune(
self, fp_id, node_ids, flagged_ids, checked_ids, prune_ids):
self, fp_id, node_ids, parent_ids, checked_ids, prune_ids):
"""Find and prune family recursively checking child families.
Recursively map out child families to the bottom from the origin
family. The work back up to origin checking these families are active.
"""
fp_data = self.data[self.workflow_id][FAMILY_PROXIES]
if fp_id in fp_data and fp_id not in self.updated[FAMILY_PROXIES]:
fam_node = fp_data[fp_id]
Expand All @@ -811,7 +916,7 @@ def _family_ascent_point_prune(
]
for child_id in child_fam_nodes:
self._family_ascent_point_prune(
child_id, node_ids, flagged_ids, checked_ids, prune_ids)
child_id, node_ids, parent_ids, checked_ids, prune_ids)
if [
child_id
for child_id in fam_node.child_tasks
Expand All @@ -824,11 +929,11 @@ def _family_ascent_point_prune(
self.state_update_families.add(fp_id)
else:
if fam_node.first_parent:
flagged_ids.add(fam_node.first_parent)
parent_ids.add(fam_node.first_parent)
prune_ids.add(fp_id)
checked_ids.add(fp_id)
if fp_id in flagged_ids:
flagged_ids.remove(fp_id)
if fp_id in parent_ids:
parent_ids.remove(fp_id)

def update_dynamic_elements(self, updated_nodes=None):
"""Update data elements containing dynamic/live fields."""
Expand Down Expand Up @@ -1041,6 +1146,11 @@ def hold_release_tasks(self, hold=True):
tp_delta.state = tp_node.state
self.state_update_families.add(tp_node.first_parent)

def set_window_size(self, edge_distance):
"""Set the what the edge distance will change to."""
self.next_n_edge_distance = edge_distance
self.updates_pending = True

def update_workflow(self):
"""Update workflow element status and state totals."""
# Create new message and copy existing message content
Expand Down
Loading

0 comments on commit 851015a

Please sign in to comment.