Skip to content

Commit

Permalink
wdlwiz: render called subworkflows
Browse files Browse the repository at this point in the history
  • Loading branch information
mlin committed Mar 3, 2024
1 parent 6dfe837 commit f7cb020
Showing 1 changed file with 75 additions and 44 deletions.
119 changes: 75 additions & 44 deletions examples/wdlviz.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"""
Visualize a WDL workflow using miniwdl and graphviz
"""

# black -l 100 wdlviz.py && pylint wdlviz.py
import os
import sys
Expand Down Expand Up @@ -62,82 +63,112 @@ def wdlviz(workflow: WDL.Workflow, inputs=False, outputs=False):
# 2. graphviz API -- https://graphviz.readthedocs.io/en/stable/manual.html

# initialiaze Digraph
top = graphviz.Digraph(comment=workflow.name)
top.attr(compound="true", rankdir="LR")
fontname = "Roboto"
top = graphviz.Digraph()
top.attr(label=workflow.name, labelloc="t", fontname=fontname, compound="true", rankdir="LR")
top.attr("node", fontname=fontname)
top.attr("edge", color="#00000080")
node_ids = set()

# recursively add graphviz nodes for each decl/call/scatter/conditional workflow node.
# recursively add graphviz nodes for each workflow node.
nodes_visited = set()
subworkflows_visited = set()

def add_node(graph: graphviz.Digraph, node: WDL.WorkflowNode):
nonlocal node_ids
nonlocal nodes_visited, subworkflows_visited
if isinstance(node, WDL.WorkflowSection):
# scatter/conditional section: add a cluster subgraph to contain its body
with graph.subgraph(name="cluster-" + node.workflow_node_id) as sg:
with graph.subgraph(name=f"cluster-{id(node)}") as sg:
label = "scatter" if isinstance(node, WDL.Scatter) else "if"
sg.attr(label=label + f"({str(node.expr)})", fontname=fontname, rank="same")
sg.attr(label=f"{label}({node.expr})", fontname=fontname, rank="same")
for child in node.body:
add_node(sg, child)
# Add an invisible node inside the subgraph, which provides a sink for dependencies
# of the scatter/conditional expression itself
sg.node(node.workflow_node_id, "", style="invis", height="0", width="0", margin="0")
node_ids.add(node.workflow_node_id)
node_ids |= set(g.workflow_node_id for g in node.gathers.values())
sg.node(str(id(node)), "", style="invis", height="0", width="0", margin="0")
nodes_visited.add(node.workflow_node_id)
nodes_visited |= set(g.workflow_node_id for g in node.gathers.values())
elif isinstance(node, WDL.Call) or (
isinstance(node, WDL.Decl)
and (inputs or node_ids.intersection(node.workflow_node_dependencies))
and (inputs or nodes_visited.intersection(node.workflow_node_dependencies))
):
name = node.name
if isinstance(node, WDL.Call) and isinstance(node.callee, WDL.Workflow):
# subworkflow call: add a cluster subgraph for the called workflow; only once, if
# the subworkflow is called in multiple places.
if id(node.callee) not in subworkflows_visited:
subworkflows_visited.add(id(node.callee))
with top.subgraph(name=f"cluster-{id(node.callee)}") as sg:
sg.attr(label=node.callee.name, fontname=fontname, rank="max")
sg.node( # sink
str(id(node.callee)),
"",
style="invis",
height="0",
width="0",
margin="0",
)
add_workflow(sg, node.callee)
# dotted connection from call to subworkflow
top.edge(
f"{id(node)}:s",
f"{id(node.callee)}:n",
lhead=f"cluster-{id(node.callee)}",
style="dotted",
arrowhead="none",
constraint="false",
)
name = f"{node.callee.name} as {name}"
# node for call or decl
graph.node(
node.workflow_node_id,
node.name,
str(id(node)),
name,
shape=("cds" if isinstance(node, WDL.Call) else "plaintext"),
)
node_ids.add(node.workflow_node_id)

for node in workflow.body:
add_node(top, node)

# cluster of the input decls
if inputs:
with top.subgraph(name="cluster-inputs") as sg:
for inp in workflow.inputs or []:
assert inp.workflow_node_id.startswith("decl-")
sg.node(inp.workflow_node_id, inp.workflow_node_id[5:], shape="plaintext")
node_ids.add(inp.workflow_node_id)
sg.attr(label="inputs", fontname=fontname)

# cluster of the output decls
if outputs:
with top.subgraph(name="cluster-outputs") as sg:
for outp in workflow.outputs or []:
assert outp.workflow_node_id.startswith("output-")
sg.node(outp.workflow_node_id, outp.workflow_node_id[7:], shape="plaintext")
node_ids.add(outp.workflow_node_id)
sg.attr(label="outputs", fontname=fontname)
nodes_visited.add(node.workflow_node_id)

# add edge for each dependency between workflow nodes
def add_edges(node):
def add_edges(graph, workflow, node):
for dep_id in node.workflow_node_dependencies:
dep = workflow.get_node(dep_id)
# leave Gather nodes invisible by replacing any dependencies on them with their
# final_referee
if isinstance(dep, WDL.Tree.Gather):
dep = dep.final_referee
dep_id = dep.workflow_node_id
if dep_id in node_ids and node.workflow_node_id in node_ids:
if dep.workflow_node_id in nodes_visited and node.workflow_node_id in nodes_visited:
lhead = None
if isinstance(node, WDL.WorkflowSection):
lhead = "cluster-" + node.workflow_node_id
top.edge(dep_id, node.workflow_node_id, lhead=lhead)
lhead = f"cluster-{id(node)}"
graph.edge(str(id(dep)), str(id(node)), lhead=lhead)
if isinstance(node, WDL.WorkflowSection):
for child in node.body:
add_edges(child)

for node in (workflow.inputs or []) + workflow.body + (workflow.outputs or []):
add_edges(node)
add_edges(graph, workflow, child)

def add_workflow(graph, workflow):
for node in workflow.body:
add_node(graph, node)

# cluster of the input decls
if inputs:
with graph.subgraph(name=f"cluster-inputs-{id(workflow)}") as sg:
for inp in workflow.inputs or []:
assert inp.workflow_node_id.startswith("decl-")
sg.node(str(id(inp)), inp.workflow_node_id[5:], shape="plaintext")
nodes_visited.add(inp.workflow_node_id)
sg.attr(label="inputs", fontname=fontname)

# cluster of the output decls
if outputs:
with graph.subgraph(name=f"cluster-outputs-{id(workflow)}") as sg:
for outp in workflow.outputs or []:
assert outp.workflow_node_id.startswith("output-")
sg.node(str(id(outp)), outp.workflow_node_id[7:], shape="plaintext")
nodes_visited.add(outp.workflow_node_id)
sg.attr(label="outputs", fontname=fontname)

for node in (workflow.inputs or []) + workflow.body + (workflow.outputs or []):
add_edges(graph, workflow, node)

add_workflow(top, workflow)

return top

Expand Down

0 comments on commit f7cb020

Please sign in to comment.