data-store n > 0 window DB loads #4581

Merged: 9 commits into cylc/cylc-flow, Feb 10, 2022.
CHANGES.md (16 changes: 10 additions & 6 deletions)
@@ -56,19 +56,23 @@ First Release Candidate for Cylc 8.

 ### Enhancements
 
-[#3931](https://github.com/cylc/cylc-flow/pull/3931) -
-Convert Cylc to use the new "Universal Identifier".
+[#4581](https://github.com/cylc/cylc-flow/pull/4581) - Job and task history
+is now loaded into the window about active tasks. Reflowed future tasks are
+now set to waiting.
 
-[#4506](https://github.com/cylc/cylc-flow/pull/4506) -
-Cylc no longer creates a `flow.cylc` symlink to a `suite.rc` file.
+[#3931](https://github.com/cylc/cylc-flow/pull/3931) - Convert Cylc to
+use the new "Universal Identifier".
+
+[#4506](https://github.com/cylc/cylc-flow/pull/4506) - Cylc no longer
+creates a `flow.cylc` symlink to a `suite.rc` file.
 This only affects you if you have used a prior Cylc 8 pre-release.
 
 [#4547](https://github.com/cylc/cylc-flow/pull/4547) - The max scan depth is
 now configurable in `global.cylc[install]max depth`, and `cylc install` will
 fail if the workflow ID would exceed this depth.
 
-[#4534](https://github.com/cylc/cylc-flow/pull/4534)
-- Permit jobs to be run on platforms with no $HOME directory.
+[#4534](https://github.com/cylc/cylc-flow/pull/4534) - Permit jobs
+to be run on platforms with no $HOME directory.
 
 [#4536](https://github.com/cylc/cylc-flow/pull/4536) - `cylc extract-resources`
 renamed `cylc get-resources` and small changes made:
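
For context on the #4547 entry above: the setting lives under the `[install]` section of the global configuration. A minimal `global.cylc` sketch; the depth value here is illustrative, not the shipped default:

```ini
# global.cylc
[install]
    # Maximum number of directory levels a workflow ID may span.
    max depth = 4
```
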
cylc/flow/data_store_mgr.py (231 changes: 176 additions & 55 deletions)
Expand Up @@ -82,7 +82,6 @@
TASK_STATUS_RUNNING,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_FAILED,
TASK_STATUS_EXPIRED,
TASK_STATUSES_ORDERED
)
from cylc.flow.task_state_prop import extract_group_state
@@ -398,6 +397,7 @@ def __init__(self, schd):
         self.n_window_nodes = {}
         self.n_window_edges = {}
         self.n_window_boundary_nodes = {}
+        self.db_load_task_proxies = {}
         self.family_pruned_ids = set()
         self.prune_trigger_nodes = {}
         self.prune_flagged_nodes = set()
@@ -592,10 +592,8 @@ def increment_graph_window(
         """Generate graph window about given origin to n-edge-distance.
 
         Args:
-            name (str):
-                Task name.
-            point (cylc.flow.cycling.PointBase):
-                PointBase derived object.
+            itask (cylc.flow.task_proxy.TaskProxy):
+                Update task-node from corresponding task proxy object.
             edge_distance (int):
                 Graph distance from active/origin node.
             active_id (str):
@@ -613,6 +611,12 @@
         if active_id is None:
             active_id = s_tokens.id
 
+        # flag manual triggers for pruning on deletion.
+        if itask.is_manual_submit:
+            self.prune_trigger_nodes.setdefault(active_id, set()).add(
+                s_tokens.id
+            )
+
         # Setup and check if active node is another's boundary node
         # to flag its paths for pruning.
         if edge_distance == 0:
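
`increment_graph_window` walks the task graph out to `n_edge_distance` edges from each active task to assemble the n-window. A minimal, self-contained sketch of that walk (illustrative names only; the real method also generates edges, families and prune triggers as it goes):

```python
# Sketch of an n-window walk over a task graph (illustrative only).
def collect_window(node, neighbours, n_max, distance=0, window=None):
    """Return all nodes within n_max edges of `node`.

    neighbours: dict mapping node id -> iterable of adjacent node ids
    (children and parents combined).
    """
    if window is None:
        window = {node}
    if distance == n_max:
        return window
    for nxt in neighbours.get(node, ()):
        if nxt not in window:
            window.add(nxt)
        # Re-walk even visited nodes: a shorter path may extend further.
        collect_window(nxt, neighbours, n_max, distance + 1, window)
    return window

# Example: a linear graph a -> b -> c -> d, window of n=2 about "b".
graph = {'a': {'b'}, 'b': {'a', 'c'}, 'c': {'b', 'd'}, 'd': {'c'}}
assert collect_window('b', graph, 2) == {'a', 'b', 'c', 'd'}
```
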
Expand Down Expand Up @@ -648,7 +652,7 @@ def increment_graph_window(
edge_distance += 1

# Don't expand window about orphan task.
if is_orphan is not True:
if not is_orphan:
# TODO: xtrigger is workflow_state edges too
# Reference set for workflow relations
for items in itask.graph_children.values():
@@ -677,6 +681,8 @@
                         True,
                     )
 
+        # If this is the active task (edge_distance has been incremented),
+        # then add the most distant child as a trigger to prune it.
         if edge_distance == 1:
             levels = self.n_window_boundary_nodes[active_id].keys()
             # Could be self-reference node foo:failed => foo
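
The comment added above describes the prune-trigger mechanism: while the window is built, the most distant descendant of the active task is recorded as a trigger, and when that node later leaves the task pool every path only reachable through it is flagged for pruning (see `remove_pool_node` in the next hunk). A hedged sketch of that register-and-fire pattern, with illustrative names:

```python
# Sketch: register-and-fire pruning triggers (names are illustrative;
# the real structures live on the data-store manager).
prune_trigger_nodes = {}   # boundary node id -> set of node ids to prune
prune_flagged_nodes = set()

def register_prune_trigger(boundary_id, path_node_ids):
    # Called while expanding the window: remember which nodes should be
    # pruned once the boundary node itself is removed from the pool.
    prune_trigger_nodes.setdefault(boundary_id, set()).update(path_node_ids)

def on_pool_node_removed(node_id):
    # Mirrors remove_pool_node below: fire the trigger on removal.
    if node_id in prune_trigger_nodes:
        prune_flagged_nodes.update(prune_trigger_nodes.pop(node_id))

register_prune_trigger('1/foo', {'1/foo', '1/bar'})
on_pool_node_removed('1/foo')
assert prune_flagged_nodes == {'1/foo', '1/bar'}
```
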
Expand Down Expand Up @@ -753,13 +759,15 @@ def remove_pool_node(self, name, point):
).id
if tp_id in self.all_task_pool:
self.all_task_pool.remove(tp_id)
self.updates_pending = True
# flagged isolates/end-of-branch nodes for pruning on removal
if (
tp_id in self.prune_trigger_nodes and
tp_id in self.prune_trigger_nodes[tp_id]
):
self.prune_flagged_nodes.update(self.prune_trigger_nodes[tp_id])
del self.prune_trigger_nodes[tp_id]
self.updates_pending = True

def add_pool_node(self, name, point):
"""Add external ID reference for internal task pool node."""
@@ -782,7 +790,7 @@ def generate_ghost_task(self, tp_id, itask, is_parent=False):
 
         Returns:
 
-            None
+            True/False
 
         """
         name = itask.tdef.name
@@ -791,11 +799,13 @@ def generate_ghost_task(self, tp_id, itask, is_parent=False):
         task_proxies = self.data[self.workflow_id][TASK_PROXIES]
 
         is_orphan = False
-        if name not in self.schd.config.taskdefs:
-            is_orphan = True
 
         if tp_id in task_proxies or tp_id in self.added[TASK_PROXIES]:
             return is_orphan
 
+        if name not in self.schd.config.taskdefs:
+            is_orphan = True
         if is_orphan:
             self.generate_orphan_task(itask)
 
         # Most of the time the definition node will be in the store,
@@ -818,14 +828,7 @@ def generate_ghost_task(self, tp_id, itask, is_parent=False):
             ),
             depth=task_def.depth,
             name=name,
-            state=TASK_STATUS_WAITING,
-            flow_nums=json.dumps(list(itask.flow_nums))
         )
-        if is_parent and tp_id not in self.n_window_nodes:
-            # TODO: Load task info from DB, including itask prerequisites
-            tproxy.state = TASK_STATUS_EXPIRED
-        else:
-            tproxy.state = TASK_STATUS_WAITING
 
         tproxy.namespace[:] = task_def.namespace
         if is_orphan:
@@ -846,39 +849,6 @@ def generate_ghost_task(self, tp_id, itask, is_parent=False):
             ]
         tproxy.first_parent = tproxy.ancestors[0]
 
-        for prereq in itask.state.prerequisites:
-            # Protobuf messages populated within
-            prereq_obj = prereq.api_dump()
-            if prereq_obj:
-                tproxy.prerequisites.append(prereq_obj)
-
-        for label, message, satisfied in itask.state.outputs.get_all():
-            output = tproxy.outputs[label]
-            output.label = label
-            output.message = message
-            output.satisfied = satisfied
-            output.time = update_time
-
-        if itask.tdef.clocktrigger_offset is not None:
-            tproxy.clock_trigger.satisfied = itask.is_waiting_clock_done()
-            tproxy.clock_trigger.time = itask.clock_trigger_time
-            tproxy.clock_trigger.time_string = time2str(
-                itask.clock_trigger_time)
-
-        for trig, satisfied in itask.state.external_triggers.items():
-            ext_trig = tproxy.external_triggers[trig]
-            ext_trig.id = trig
-            ext_trig.satisfied = satisfied
-
-        for label, satisfied in itask.state.xtriggers.items():
-            sig = self.schd.xtrigger_mgr.get_xtrig_ctx(
-                itask, label).get_signature()
-            xtrig = tproxy.xtriggers[sig]
-            xtrig.id = sig
-            xtrig.label = label
-            xtrig.satisfied = satisfied
-            self.xtrigger_tasks.setdefault(sig, set()).add(tp_id)
-
         self.added[TASK_PROXIES][tp_id] = tproxy
         getattr(self.updated[WORKFLOW], TASK_PROXIES).append(tp_id)
         self.updated[TASKS].setdefault(
@@ -890,12 +860,25 @@ def generate_ghost_task(self, tp_id, itask, is_parent=False):
         ).proxies.append(tp_id)
         self.generate_ghost_family(tproxy.first_parent, child_task=tp_id)
         self.state_update_families.add(tproxy.first_parent)
-        if tproxy.state in self.latest_state_tasks:
-            tp_ref = Tokens(tproxy.id).relative_id
-            tp_queue = self.latest_state_tasks[tproxy.state]
-            if tp_ref in tp_queue:
-                tp_queue.remove(tp_ref)
-            self.latest_state_tasks[tproxy.state].appendleft(tp_ref)
+
+        # Active, but not in the data-store yet (new).
+        if tp_id in self.n_window_nodes:
+            self.process_internal_task_proxy(itask, tproxy)
+            # Has run before, so get history.
+            # Cannot batch as task is active (all jobs retrieved at once).
+            if itask.submit_num > 0:
+                flow_db = self.schd.workflow_db_mgr.pri_dao
+                for row in flow_db.select_jobs_for_datastore(
+                    {itask.tokens.relative_id}
+                ):
+                    self.insert_db_job(1, row)
+        else:
+            # Batch non-active node for load of DB history.
+            self.db_load_task_proxies[itask.tokens.relative_id] = (
+                itask,
+                is_parent,
+            )
+
         self.updates_pending = True
 
         return is_orphan
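
The branch above encodes the PR's core strategy: an active task needs its history immediately (all of its jobs are fetched in one query), while non-active window nodes are queued in `db_load_task_proxies` so that a single batched query per update cycle can serve them all. A hedged sketch of that defer-and-batch pattern (illustrative class and names; the real loader is the workflow database manager):

```python
# Sketch of the defer-and-batch pattern used for window DB loads.
class HistoryLoader:
    def __init__(self, fetch_many):
        self.pending = {}              # relative_id -> context object
        self.fetch_many = fetch_many   # batched query callable

    def request(self, rel_id, ctx):
        # Non-active nodes: just record the request for later.
        self.pending[rel_id] = ctx

    def flush(self):
        # One batched query serves every pending node, then clear the queue.
        if not self.pending:
            return {}
        rows = self.fetch_many(set(self.pending))
        self.pending.clear()
        return rows

loader = HistoryLoader(lambda ids: {i: f'history-for-{i}' for i in ids})
loader.request('1/foo', object())
loader.request('2/bar', object())
print(loader.flush())  # one query, both requests answered
```
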
@@ -1002,6 +985,139 @@ def generate_ghost_family(self, fp_id, child_fam=None, child_task=None):
         elif child_fam not in fp_parent.child_families:
             fp_parent.child_families.append(child_fam)
 
+    def process_internal_task_proxy(self, itask, tproxy):
+        """Extract information from internal task proxy object."""
+
+        update_time = time()
+
+        tproxy.state = itask.state.status
+        tproxy.flow_nums = json.dumps(list(itask.flow_nums))
+
+        prereq_list = []
+        for prereq in itask.state.prerequisites:
+            # Protobuf messages populated within
+            prereq_obj = prereq.api_dump()
+            if prereq_obj:
+                prereq_list.append(prereq_obj)
+        del tproxy.prerequisites[:]
+        tproxy.prerequisites.extend(prereq_list)
+
+        for label, message, satisfied in itask.state.outputs.get_all():
+            output = tproxy.outputs[label]
+            output.label = label
+            output.message = message
+            output.satisfied = satisfied
+            output.time = update_time
+
+        if itask.tdef.clocktrigger_offset is not None:
+            tproxy.clock_trigger.satisfied = itask.is_waiting_clock_done()
+            tproxy.clock_trigger.time = itask.clock_trigger_time
+            tproxy.clock_trigger.time_string = time2str(
+                itask.clock_trigger_time)
+
+        for trig, satisfied in itask.state.external_triggers.items():
+            ext_trig = tproxy.external_triggers[trig]
+            ext_trig.id = trig
+            ext_trig.satisfied = satisfied
+
+        for label, satisfied in itask.state.xtriggers.items():
+            sig = self.schd.xtrigger_mgr.get_xtrig_ctx(
+                itask, label).get_signature()
+            xtrig = tproxy.xtriggers[sig]
+            xtrig.id = sig
+            xtrig.label = label
+            xtrig.satisfied = satisfied
+            self.xtrigger_tasks.setdefault(sig, set()).add(tproxy.id)
+
+        if tproxy.state in self.latest_state_tasks:
+            tp_ref = Tokens(tproxy.id).relative_id
+            tp_queue = self.latest_state_tasks[tproxy.state]
+            if tp_ref in tp_queue:
+                tp_queue.remove(tp_ref)
+            self.latest_state_tasks[tproxy.state].appendleft(tp_ref)
+
+    def apply_task_proxy_db_history(self):
+        """Extract and apply DB history on given task proxies."""
+
+        if not self.db_load_task_proxies:
+            return
+
+        flow_db = self.schd.workflow_db_mgr.pri_dao
+
+        task_ids = set(self.db_load_task_proxies.keys())
+
+        # Batch load rows with matching cycle & name column pairs.
+        prereq_ids = set()
+        for (
+                cycle, name, flow_nums_str, status, submit_num, outputs_str
+        ) in flow_db.select_tasks_for_datastore(task_ids):
+            tokens = self.id_.duplicate(
+                cycle=cycle,
+                task=name,
+            )
+            itask, is_parent = self.db_load_task_proxies[tokens.relative_id]
+            itask.submit_num = submit_num
+            flow_nums = set(json.loads(flow_nums_str))
+            # Do not set states and outputs for future tasks in flow.
+            if (
+                    itask.flow_nums and
+                    flow_nums != itask.flow_nums and
+                    not is_parent
+            ):
+                itask.state_reset(TASK_STATUS_WAITING)
+                continue
+            else:
+                itask.flow_nums = flow_nums
+                itask.state_reset(status)
+            if (
+                    outputs_str is not None
+                    and itask.state(
+                        TASK_STATUS_RUNNING,
+                        TASK_STATUS_FAILED,
+                        TASK_STATUS_SUCCEEDED
+                    )
+            ):
+                for message in json.loads(outputs_str).values():
+                    itask.state.outputs.set_completion(message, True)
+            # Gather tasks with flow id.
+            prereq_ids.add(f'{tokens.relative_id}/{flow_nums_str}')

> **Review comment (Member):** Because `flow_nums` are defined as a set on
> the TaskProxy there is no guarantee of order when they are written to the
> DB, which could mess with this comparison logic. We should push
> serialisation/deserialisation through a sort to make it safe. Seems to
> work now, so we can do this later - #4672.

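The concern above: `json.dumps(list(...))` on a set gives no guaranteed element order, so string-level comparisons of serialized `flow_nums` are unsafe. The fix the comment proposes (deferred to #4672) is to sort on serialisation. A minimal sketch of the safe round-trip:

```python
import json

def serialise_flow_nums(flow_nums):
    # Sort so that equal sets always produce identical DB strings.
    return json.dumps(sorted(flow_nums))

def deserialise_flow_nums(flow_nums_str):
    return set(json.loads(flow_nums_str))

a = serialise_flow_nums({2, 1})
b = serialise_flow_nums({1, 2})
assert a == b == '[1, 2]'          # stable string comparison
assert deserialise_flow_nums(a) == {1, 2}
```
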

+        # Batch load prerequisites of tasks according to flow.
+        prereqs_map = {}
+        for (
+                cycle, name, prereq_name,
+                prereq_cycle, prereq_output, satisfied
+        ) in flow_db.select_prereqs_for_datastore(prereq_ids):

> **Review comment (Member), on lines +1087 to +1090:** FYI: we have ditched
> pylint, which is the tool that used to tell us to double-indent these
> statements.
>
> **Reply (Member, author):** Oh right, what's your preference? This:
>
>         for (
>             cycle, name,
>             prereq_name,
>             prereq_cycle,
>             prereq_output,
>             satisfied
>         ) in flow_db.select_prereqs_for_datastore(prereq_ids):
>
> **Reply (Member):** Yep. (Indent-wise; not necessarily each variable on a
> separate line.)

+            tokens = self.id_.duplicate(
+                cycle=cycle,
+                task=name,
+            )
+            prereqs_map.setdefault(tokens.relative_id, {})[
+                (prereq_cycle, prereq_name, prereq_output)
+            ] = satisfied if satisfied != '0' else False
+
+        for ikey, prereqs in prereqs_map.items():
+            for itask_prereq in (
+                    self.db_load_task_proxies[ikey][0].state.prerequisites
+            ):
+                for key in itask_prereq.satisfied.keys():
+                    itask_prereq.satisfied[key] = prereqs[key]
+
+        # Extract info from itasks to data-store.
+        for task_info in self.db_load_task_proxies.values():
+            self.process_internal_task_proxy(
+                task_info[0],
+                self.added[TASK_PROXIES][
+                    self.id_.duplicate(task_info[0].tokens).id
+                ]
+            )
+
+        # Batch load jobs from DB.
+        for row in flow_db.select_jobs_for_datastore(task_ids):
+            self.insert_db_job(1, row)
+
+        self.db_load_task_proxies.clear()

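The prerequisite pass above rebuilds satisfaction state from DB rows keyed by `(prereq_cycle, prereq_name, prereq_output)`, the same key layout as the in-memory `itask_prereq.satisfied` map, with the DB's `'0'` normalised to `False`. A hedged sketch of that reconciliation step with illustrative data:

```python
# Sketch: apply DB prerequisite rows onto in-memory prerequisite keys
# (illustrative rows; real ones come from select_prereqs_for_datastore).
db_rows = [
    # (prereq_cycle, prereq_name, prereq_output, satisfied)
    ('1', 'foo', 'succeeded', 'satisfied naturally'),
    ('1', 'bar', 'succeeded', '0'),
]

prereqs = {
    (cycle, name, output): (satisfied if satisfied != '0' else False)
    for cycle, name, output, satisfied in db_rows
}

# In-memory prerequisite map, keyed the same way.
satisfied = {
    ('1', 'foo', 'succeeded'): False,
    ('1', 'bar', 'succeeded'): False,
}
for key in satisfied:
    satisfied[key] = prereqs[key]

assert satisfied[('1', 'foo', 'succeeded')] == 'satisfied naturally'
assert satisfied[('1', 'bar', 'succeeded')] is False
```
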
     def insert_job(self, name, point_string, status, job_conf):
         """Insert job into data-store.
 
@@ -1126,6 +1242,9 @@ def insert_db_job(self, row_idx, row):
     def update_data_structure(self, reloaded=False):
         """Workflow batch updates in the data structure."""
 
+        # load database history for flagged nodes
+        self.apply_task_proxy_db_history()
+
         # Avoids changing window edge distance during edge/node creation
         if self.next_n_edge_distance is not None:
             self.n_edge_distance = self.next_n_edge_distance
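
Ordering matters here: the batched history load must run before the rest of the update so that freshly windowed nodes carry their DB state within the same update cycle. A sketch of the call order this hunk establishes (method names from this diff; the surrounding driver loop is assumed):

```python
# Sketch of one data-store update cycle after this PR (driver loop assumed).
def update_cycle(store):
    # 1. Drain db_load_task_proxies so new window nodes are populated.
    store.apply_task_proxy_db_history()
    # 2. Apply any deferred n-window resize.
    if store.next_n_edge_distance is not None:
        store.n_edge_distance = store.next_n_edge_distance
        store.next_n_edge_distance = None
    # 3. Node/edge updates, family states and pruning follow as before.
```
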
@@ -1166,11 +1285,13 @@ def prune_data_store(self):
         if not self.prune_flagged_nodes:
             return
 
+        # Keep all nodes in the path of active tasks.
         in_paths_nodes = set().union(*[
             v
             for k, v in self.n_window_nodes.items()
             if k in self.all_task_pool
         ])
+        # Gather all nodes in the paths of tasks flagged for pruning.
         out_paths_nodes = self.prune_flagged_nodes.union(*[
             v
             for k, v in self.n_window_nodes.items()