diff --git a/examples/wdlviz.py b/examples/wdlviz.py index 2b7ed112..01ab446e 100755 --- a/examples/wdlviz.py +++ b/examples/wdlviz.py @@ -2,6 +2,7 @@ """ Visualize a WDL workflow using miniwdl and graphviz """ + # black -l 100 wdlviz.py && pylint wdlviz.py import os import sys @@ -62,82 +63,112 @@ def wdlviz(workflow: WDL.Workflow, inputs=False, outputs=False): # 2. graphviz API -- https://graphviz.readthedocs.io/en/stable/manual.html # initialiaze Digraph - top = graphviz.Digraph(comment=workflow.name) - top.attr(compound="true", rankdir="LR") fontname = "Roboto" + top = graphviz.Digraph() + top.attr(label=workflow.name, labelloc="t", fontname=fontname, compound="true", rankdir="LR") top.attr("node", fontname=fontname) top.attr("edge", color="#00000080") - node_ids = set() - # recursively add graphviz nodes for each decl/call/scatter/conditional workflow node. + # recursively add graphviz nodes for each workflow node. + nodes_visited = set() + subworkflows_visited = set() def add_node(graph: graphviz.Digraph, node: WDL.WorkflowNode): - nonlocal node_ids + nonlocal nodes_visited, subworkflows_visited if isinstance(node, WDL.WorkflowSection): # scatter/conditional section: add a cluster subgraph to contain its body - with graph.subgraph(name="cluster-" + node.workflow_node_id) as sg: + with graph.subgraph(name=f"cluster-{id(node)}") as sg: label = "scatter" if isinstance(node, WDL.Scatter) else "if" - sg.attr(label=label + f"({str(node.expr)})", fontname=fontname, rank="same") + sg.attr(label=f"{label}({node.expr})", fontname=fontname, rank="same") for child in node.body: add_node(sg, child) # Add an invisible node inside the subgraph, which provides a sink for dependencies # of the scatter/conditional expression itself - sg.node(node.workflow_node_id, "", style="invis", height="0", width="0", margin="0") - node_ids.add(node.workflow_node_id) - node_ids |= set(g.workflow_node_id for g in node.gathers.values()) + sg.node(str(id(node)), "", style="invis", height="0", width="0", margin="0") + nodes_visited.add(node.workflow_node_id) + nodes_visited |= set(g.workflow_node_id for g in node.gathers.values()) elif isinstance(node, WDL.Call) or ( isinstance(node, WDL.Decl) - and (inputs or node_ids.intersection(node.workflow_node_dependencies)) + and (inputs or nodes_visited.intersection(node.workflow_node_dependencies)) ): + name = node.name + if isinstance(node, WDL.Call) and isinstance(node.callee, WDL.Workflow): + # subworkflow call: add a cluster subgraph for the called workflow; only once, if + # the subworkflow is called in multiple places. + if id(node.callee) not in subworkflows_visited: + subworkflows_visited.add(id(node.callee)) + with top.subgraph(name=f"cluster-{id(node.callee)}") as sg: + sg.attr(label=node.callee.name, fontname=fontname, rank="max") + sg.node( # sink + str(id(node.callee)), + "", + style="invis", + height="0", + width="0", + margin="0", + ) + add_workflow(sg, node.callee) + # dotted connection from call to subworkflow + top.edge( + f"{id(node)}:s", + f"{id(node.callee)}:n", + lhead=f"cluster-{id(node.callee)}", + style="dotted", + arrowhead="none", + constraint="false", + ) + name = f"{node.callee.name} as {name}" # node for call or decl graph.node( - node.workflow_node_id, - node.name, + str(id(node)), + name, shape=("cds" if isinstance(node, WDL.Call) else "plaintext"), ) - node_ids.add(node.workflow_node_id) - - for node in workflow.body: - add_node(top, node) - - # cluster of the input decls - if inputs: - with top.subgraph(name="cluster-inputs") as sg: - for inp in workflow.inputs or []: - assert inp.workflow_node_id.startswith("decl-") - sg.node(inp.workflow_node_id, inp.workflow_node_id[5:], shape="plaintext") - node_ids.add(inp.workflow_node_id) - sg.attr(label="inputs", fontname=fontname) - - # cluster of the output decls - if outputs: - with top.subgraph(name="cluster-outputs") as sg: - for outp in workflow.outputs or []: - assert outp.workflow_node_id.startswith("output-") - sg.node(outp.workflow_node_id, outp.workflow_node_id[7:], shape="plaintext") - node_ids.add(outp.workflow_node_id) - sg.attr(label="outputs", fontname=fontname) + nodes_visited.add(node.workflow_node_id) # add edge for each dependency between workflow nodes - def add_edges(node): + def add_edges(graph, workflow, node): for dep_id in node.workflow_node_dependencies: dep = workflow.get_node(dep_id) # leave Gather nodes invisible by replacing any dependencies on them with their # final_referee if isinstance(dep, WDL.Tree.Gather): dep = dep.final_referee - dep_id = dep.workflow_node_id - if dep_id in node_ids and node.workflow_node_id in node_ids: + if dep.workflow_node_id in nodes_visited and node.workflow_node_id in nodes_visited: lhead = None if isinstance(node, WDL.WorkflowSection): - lhead = "cluster-" + node.workflow_node_id - top.edge(dep_id, node.workflow_node_id, lhead=lhead) + lhead = f"cluster-{id(node)}" + graph.edge(str(id(dep)), str(id(node)), lhead=lhead) if isinstance(node, WDL.WorkflowSection): for child in node.body: - add_edges(child) - - for node in (workflow.inputs or []) + workflow.body + (workflow.outputs or []): - add_edges(node) + add_edges(graph, workflow, child) + + def add_workflow(graph, workflow): + for node in workflow.body: + add_node(graph, node) + + # cluster of the input decls + if inputs: + with graph.subgraph(name=f"cluster-inputs-{id(workflow)}") as sg: + for inp in workflow.inputs or []: + assert inp.workflow_node_id.startswith("decl-") + sg.node(str(id(inp)), inp.workflow_node_id[5:], shape="plaintext") + nodes_visited.add(inp.workflow_node_id) + sg.attr(label="inputs", fontname=fontname) + + # cluster of the output decls + if outputs: + with graph.subgraph(name=f"cluster-outputs-{id(workflow)}") as sg: + for outp in workflow.outputs or []: + assert outp.workflow_node_id.startswith("output-") + sg.node(str(id(outp)), outp.workflow_node_id[7:], shape="plaintext") + nodes_visited.add(outp.workflow_node_id) + sg.attr(label="outputs", fontname=fontname) + + for node in (workflow.inputs or []) + workflow.body + (workflow.outputs or []): + add_edges(graph, workflow, node) + + add_workflow(top, workflow) return top