diff --git a/ansibleplaybookgrapher/graph.py b/ansibleplaybookgrapher/graph.py
index a652546f..dc76f810 100644
--- a/ansibleplaybookgrapher/graph.py
+++ b/ansibleplaybookgrapher/graph.py
@@ -14,8 +14,7 @@
# along with this program. If not, see .
import os
from collections import defaultdict
-from typing import Dict, List, ItemsView, Set
-from collections import defaultdict
+from typing import Dict, List, ItemsView, Set, Type
from ansibleplaybookgrapher.utils import generate_id
@@ -25,7 +24,14 @@ class Node:
A node in the graph. Everything of the final graph is a node: playbook, plays, edges, tasks and roles.
"""
- def __init__(self, node_name: str, node_id: str, when: str = "", raw_object=None):
+ def __init__(
+ self,
+ node_name: str,
+ node_id: str,
+ when: str = "",
+ raw_object=None,
+ parent: "Node" = None,
+ ):
"""
:param node_name: The name of the node
@@ -33,8 +39,10 @@ def __init__(self, node_name: str, node_id: str, when: str = "", raw_object=None
:param when: The conditional attached to the node
:param raw_object: The raw ansible object matching this node in the graph. Will be None if there is no match on
Ansible side
+ :param parent: The parent of this node
"""
self.name = node_name
+ self.parent = parent
self.id = node_id
self.when = when
self.raw_object = raw_object
@@ -50,8 +58,22 @@ def retrieve_position(self):
if self.raw_object and self.raw_object.get_ds():
self.path, self.line, self.column = self.raw_object.get_ds().ansible_pos
+ def get_first_parent_matching_type(self, node_type: Type) -> "Type":
+ """
+ Get the first parent of this node matching the given type
+ :return:
+ """
+ current_parent = self.parent
+
+ while current_parent is not None:
+ if isinstance(current_parent, node_type):
+ return current_parent
+ current_parent = current_parent.parent
+
+ raise ValueError(f"No parent of type {node_type} found for {self}")
+
def __repr__(self):
- return f"{type(self).__name__}(name='{self.name}',id='{self.id}')"
+ return f"{type(self).__name__}(name='{self.name}', id='{self.id}')"
def __eq__(self, other):
return self.id == other.id
@@ -74,6 +96,7 @@ def __init__(
node_id: str,
when: str = "",
raw_object=None,
+ parent: "Node" = None,
supported_compositions: List[str] = None,
):
"""
@@ -84,7 +107,7 @@ def __init__(
Ansible side
:param supported_compositions:
"""
- super().__init__(node_name, node_id, when, raw_object)
+ super().__init__(node_name, node_id, when, raw_object, parent)
self._supported_compositions = supported_compositions or []
# The dict will contain the different types of composition.
self._compositions = defaultdict(list) # type: Dict[str, List]
@@ -133,17 +156,17 @@ def _get_all_tasks_nodes(self, task_acc: List["Node"]):
elif isinstance(node, CompositeNode):
node._get_all_tasks_nodes(task_acc)
- def links_structure(self) -> Dict[str, List[Node]]:
+ def links_structure(self) -> Dict[Node, List[Node]]:
"""
Return a representation of the composite node where each key of the dictionary is the node id and the
value is the list of the linked nodes
:return:
"""
- links: Dict[str, List[Node]] = defaultdict(list)
+ links: Dict[Node, List[Node]] = defaultdict(list)
self._get_all_links(links)
return links
- def _get_all_links(self, links: Dict[str, List[Node]]):
+ def _get_all_links(self, links: Dict[Node, List[Node]]):
"""
Recursively get the node links
:return:
@@ -152,7 +175,7 @@ def _get_all_links(self, links: Dict[str, List[Node]]):
for node in nodes:
if isinstance(node, CompositeNode):
node._get_all_links(links)
- links[self.id].append(node)
+ links[self].append(node)
class CompositeTasksNode(CompositeNode):
@@ -160,8 +183,17 @@ class CompositeTasksNode(CompositeNode):
A special composite node which only support adding "tasks". Useful for block and role
"""
- def __init__(self, node_name: str, node_id: str, when: str = "", raw_object=None):
- super().__init__(node_name, node_id, when=when, raw_object=raw_object)
+ def __init__(
+ self,
+ node_name: str,
+ node_id: str,
+ when: str = "",
+ raw_object=None,
+ parent: "Node" = None,
+ ):
+ super().__init__(
+ node_name, node_id, when=when, raw_object=raw_object, parent=parent
+ )
self._supported_compositions = ["tasks"]
def add_node(self, target_composition: str, node: Node):
@@ -216,7 +248,7 @@ def plays(self) -> List["PlayNode"]:
"""
return self._compositions["plays"]
- def roles_usage(self) -> Dict["RoleNode", List[str]]:
+ def roles_usage(self) -> Dict["RoleNode", List[Node]]:
"""
For each role in the graph, return the plays that reference the role
FIXME: Review this implementation. It may not be the most efficient way, but it's ok for the moment
@@ -226,18 +258,15 @@ def roles_usage(self) -> Dict["RoleNode", List[str]]:
usages = defaultdict(list)
links = self.links_structure()
- for node_id, linked_nodes in links.items():
+ for node, linked_nodes in links.items():
for linked_node in linked_nodes:
if isinstance(linked_node, RoleNode):
- usages[linked_node].append(node_id)
-
- # In case a role is used by another role, replace it by the play associated with using role (transitivity)
- for usages_set in usages.values():
- for node_id in usages_set.copy():
- for r in usages:
- if node_id == r.id:
- usages_set.remove(node_id)
- usages_set.extend(usages[r])
+ if isinstance(node, PlayNode):
+ usages[linked_node].append(node)
+ else:
+ usages[linked_node].append(
+ node.get_first_parent_matching_type(PlayNode)
+ )
return usages
@@ -257,6 +286,7 @@ def __init__(
node_id: str = None,
when: str = "",
raw_object=None,
+ parent: "Node" = None,
hosts: List[str] = None,
):
"""
@@ -269,6 +299,7 @@ def __init__(
node_id or generate_id("play_"),
when=when,
raw_object=raw_object,
+ parent=parent,
supported_compositions=["pre_tasks", "roles", "tasks", "post_tasks"],
)
self.hosts = hosts or []
@@ -296,13 +327,19 @@ class BlockNode(CompositeTasksNode):
"""
def __init__(
- self, node_name: str, node_id: str = None, when: str = "", raw_object=None
+ self,
+ node_name: str,
+ node_id: str = None,
+ when: str = "",
+ raw_object=None,
+ parent: "Node" = None,
):
super().__init__(
node_name,
node_id or generate_id("block_"),
when=when,
raw_object=raw_object,
+ parent=parent,
)
@@ -312,7 +349,12 @@ class TaskNode(Node):
"""
def __init__(
- self, node_name: str, node_id: str = None, when: str = "", raw_object=None
+ self,
+ node_name: str,
+ node_id: str = None,
+ when: str = "",
+ raw_object=None,
+ parent: "Node" = None,
):
"""
@@ -321,7 +363,11 @@ def __init__(
:param raw_object:
"""
super().__init__(
- node_name, node_id or generate_id("task_"), when=when, raw_object=raw_object
+ node_name,
+ node_id or generate_id("task_"),
+ when=when,
+ raw_object=raw_object,
+ parent=parent,
)
@@ -336,6 +382,7 @@ def __init__(
node_id: str = None,
when: str = "",
raw_object=None,
+ parent: "Node" = None,
include_role: bool = False,
):
"""
@@ -346,7 +393,11 @@ def __init__(
"""
self.include_role = include_role
super().__init__(
- node_name, node_id or generate_id("role_"), when=when, raw_object=raw_object
+ node_name,
+ node_id or generate_id("role_"),
+ when=when,
+ raw_object=raw_object,
+ parent=parent,
)
def retrieve_position(self):
diff --git a/ansibleplaybookgrapher/graphbuilder.py b/ansibleplaybookgrapher/graphbuilder.py
index fd5d60be..a5cf4e56 100644
--- a/ansibleplaybookgrapher/graphbuilder.py
+++ b/ansibleplaybookgrapher/graphbuilder.py
@@ -12,7 +12,7 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-from typing import Dict, Optional, Tuple, List
+from typing import Dict, Optional, Tuple, List, Set
from ansible.utils.display import Display
from graphviz import Digraph
@@ -23,6 +23,7 @@
RoleNode,
BlockNode,
Node,
+ PlayNode,
)
from ansibleplaybookgrapher.utils import get_play_colors, merge_dicts
@@ -86,7 +87,7 @@ def parse(
for play in playbook_node.plays:
# TODO: find a way to create visual distance between the generated colors
# https://stackoverflow.com/questions/9018016/how-to-compare-two-colors-for-similarity-difference
- self.plays_color[play.id] = get_play_colors(play.id)
+ self.plays_color[play] = get_play_colors(play.id)
# Update the usage of the roles
self.roles_usage = merge_dicts(
@@ -142,11 +143,11 @@ class GraphvizGraphBuilder:
def __init__(
self,
- playbook_node: "PlaybookNode",
+ playbook_node: PlaybookNode,
open_protocol_handler: str,
digraph: Digraph,
- play_colors: Dict[str, Tuple[str, str]],
- roles_usage: Dict["RoleNode", List[str]] = None,
+ play_colors: Dict[PlayNode, Tuple[str, str]],
+ roles_usage: Dict[RoleNode, List[Node]] = None,
roles_built: Dict = None,
open_protocol_custom_formats: Dict[str, str] = None,
):
@@ -337,9 +338,13 @@ def build_role(
if role_to_render is None:
# Merge the colors for each play where this role is used
role_plays = self.roles_usage[destination]
- colors = list(map(self.play_colors.get, role_plays))
# Graphviz support providing multiple colors separated by :
- role_color = ":".join([c[0] for c in colors])
+ if len(role_plays) > 1:
+ # If the role is used in multiple plays, we take black as the default color
+ role_color = "black"
+ else:
+ colors = list(map(self.play_colors.get, role_plays))[0]
+ role_color = colors[0]
self.roles_built[destination.name] = destination
@@ -361,8 +366,6 @@ def build_role(
counter=role_task_counter,
color=role_color,
)
- else:
- print("here")
def build_graphviz_graph(self):
"""
@@ -380,7 +383,7 @@ def build_graphviz_graph(self):
for play_counter, play in enumerate(self.playbook_node.plays, 1):
with self.digraph.subgraph(name=play.name) as play_subgraph:
- color, play_font_color = self.play_colors[play.id]
+ color, play_font_color = self.play_colors[play]
play_tooltip = (
",".join(play.hosts) if len(play.hosts) > 0 else play.name
)
diff --git a/ansibleplaybookgrapher/parser.py b/ansibleplaybookgrapher/parser.py
index 6aeaf897..520ee023 100644
--- a/ansibleplaybookgrapher/parser.py
+++ b/ansibleplaybookgrapher/parser.py
@@ -118,6 +118,7 @@ def _add_task(
generate_id(f"{node_type}_"),
when=convert_when_to_str(task.when),
raw_object=task,
+ parent=parent_node,
),
)
@@ -229,6 +230,7 @@ def parse(self, *args, **kwargs) -> PlaybookNode:
clean_name(role.get_name()),
node_id="role_" + hash_value(role.get_name()),
raw_object=role,
+ parent=play_node,
)
# edge from play to role
play_node.add_node("roles", role_node)
@@ -297,7 +299,10 @@ def _include_tasks_in_blocks(
if not block._implicit and block._role is None:
# Here we have an explicit block. Ansible internally converts all normal tasks to Block
block_node = BlockNode(
- str(block.name), when=convert_when_to_str(block.when), raw_object=block
+ str(block.name),
+ when=convert_when_to_str(block.when),
+ raw_object=block,
+ parent=parent_nodes[-1],
)
parent_nodes[-1].add_node(f"{node_type}s", block_node)
parent_nodes.append(block_node)
@@ -341,6 +346,7 @@ def _include_tasks_in_blocks(
node_id="role_" + hash_value(task_or_block._role_name),
when=convert_when_to_str(task_or_block.when),
raw_object=task_or_block,
+ parent=parent_nodes[-1],
include_role=True,
)
parent_nodes[-1].add_node(
diff --git a/ansibleplaybookgrapher/postprocessor.py b/ansibleplaybookgrapher/postprocessor.py
index 1acfabd3..8dfe6dac 100644
--- a/ansibleplaybookgrapher/postprocessor.py
+++ b/ansibleplaybookgrapher/postprocessor.py
@@ -137,10 +137,10 @@ def _insert_links(self, playbook_node: PlaybookNode):
display.vv(f"Inserting links structure for the playbook '{playbook_node.name}'")
links_structure = playbook_node.links_structure()
- for node_id, node_links in links_structure.items():
+ for node, node_links in links_structure.items():
# Find the group g with the specified id
xpath_result = self.root.xpath(
- f"ns:g/*[@id='{node_id}']", namespaces={"ns": SVG_NAMESPACE}
+ f"ns:g/*[@id='{node.id}']", namespaces={"ns": SVG_NAMESPACE}
)
if xpath_result:
element = xpath_result[0]
@@ -151,7 +151,7 @@ def _insert_links(self, playbook_node: PlaybookNode):
"link",
attrib={
"target": link.id,
- "edge": f"edge_{counter}_{node_id}_{link.id}",
+ "edge": f"edge_{counter}_{node.id}_{link.id}",
},
)
)
diff --git a/ansibleplaybookgrapher/utils.py b/ansibleplaybookgrapher/utils.py
index 10c78c37..4b3e7f17 100644
--- a/ansibleplaybookgrapher/utils.py
+++ b/ansibleplaybookgrapher/utils.py
@@ -18,7 +18,7 @@
from collections import defaultdict
from itertools import chain
from operator import methodcaller
-from typing import Tuple, List, Dict, Any
+from typing import Tuple, List, Dict, Any, Set
from ansible.errors import AnsibleError
from ansible.module_utils._text import to_text
diff --git a/tests/fixtures/include_role.yml b/tests/fixtures/include_role.yml
index c745c914..04c12b6d 100644
--- a/tests/fixtures/include_role.yml
+++ b/tests/fixtures/include_role.yml
@@ -6,12 +6,13 @@
- fake_role
- display_some_facts
tasks:
- - name: (0) First Include role (with loop)
- include_role:
- name: '{{ item }}'
- loop:
- - fake_role
- - display_some_facts
+ - block:
+ - name: (0) First Include role (with loop)
+ include_role:
+ name: '{{ item }}'
+ loop:
+ - fake_role
+ - display_some_facts
- name: (1) Debug
debug:
diff --git a/tests/fixtures/roles/nested_include_role/tasks/main.yml b/tests/fixtures/roles/nested_include_role/tasks/main.yml
index 58fde0bd..6ab450e6 100644
--- a/tests/fixtures/roles/nested_include_role/tasks/main.yml
+++ b/tests/fixtures/roles/nested_include_role/tasks/main.yml
@@ -9,7 +9,7 @@
name: postgresql
state: started
-- name: Include role
+- name: Include role block
when: x is not defined
include_role:
name: display_some_facts
diff --git a/tests/test_graph.py b/tests/test_graph.py
index 817b6ca9..52cb0ff6 100644
--- a/tests/test_graph.py
+++ b/tests/test_graph.py
@@ -26,17 +26,15 @@ def test_links_structure():
play.add_node("tasks", task_3)
all_links = play.links_structure()
- assert len(all_links) == 2, "The links should contains only 6 elements"
+ assert len(all_links) == 2, "The links should contains only 2 elements"
- assert len(all_links[play.id]) == 2, "The play should be linked to 2 nodes"
+ assert len(all_links[play]) == 2, "The play should be linked to 2 nodes"
for e in [role, task_3]:
- assert (
- e in all_links[play.id]
- ), f"The play should be linked to the task {task_1}"
+ assert e in all_links[play], f"The play should be linked to the task {task_1}"
- assert len(all_links[role.id]) == 2, "The role should be linked to two nodes"
+ assert len(all_links[role]) == 2, "The role should be linked to two nodes"
for e in [task_1, task_2]:
- assert e in all_links[role.id], f"The role should be linked to the edge {e}"
+ assert e in all_links[role], f"The role should be linked to the edge {e}"
def test_get_all_tasks_nodes():
diff --git a/tests/test_parser.py b/tests/test_parser.py
index b9d0b408..e2d9049f 100644
--- a/tests/test_parser.py
+++ b/tests/test_parser.py
@@ -108,12 +108,14 @@ def test_include_role_parsing(grapher_cli: PlaybookGrapherCLI, capsys):
in capsys.readouterr().err
), "A warning should be displayed regarding loop being not supported"
- # first include_role
- include_role_1 = tasks[0]
+ # first include_role using a block
+ block_include_role = tasks[0]
+ assert isinstance(block_include_role, BlockNode)
+ include_role_1 = block_include_role.tasks[0]
assert isinstance(include_role_1, RoleNode)
assert include_role_1.include_role
assert include_role_1.path == os.path.join(FIXTURES_PATH, "include_role.yml")
- assert include_role_1.line == 9, "The first include role should be at line 9"
+ assert include_role_1.line == 10, "The first include role should be at line 9"
assert (
len(include_role_1.tasks) == 0
), "We don't support adding tasks from include_role with loop"
@@ -226,15 +228,20 @@ def test_roles_usage(grapher_cli: PlaybookGrapherCLI):
for role, plays in roles_usage.items():
assert all(
- map(lambda node_id: node_id.startswith("play_"), plays)
+ map(lambda node: node.id.startswith("play_"), plays)
), "All nodes IDs should be play"
- nb_plays = len(plays)
+
+ nb_plays_for_the_role = len(plays)
if role.name == "fake_role":
- assert nb_plays == 3, "The role fake_role is used 3 times in the plays"
+ assert (
+ nb_plays_for_the_role == 3
+ ), "The role fake_role is used 3 times in the plays"
elif role.name == "display_some_facts":
assert (
- nb_plays == 4
+ nb_plays_for_the_role == 4
), "The role display_some_facts is used 4 times in the plays"
elif role.name == "nested_included_role":
- assert nb_plays == 1, "The role nested_included_role is used in 1 play"
+ assert (
+ nb_plays_for_the_role == 1
+ ), "The role nested_included_role is used in 1 play"
diff --git a/tests/test_playbook_grapher.py b/tests/test_playbook_grapher.py
index 9449ac95..8de9dcb6 100644
--- a/tests/test_playbook_grapher.py
+++ b/tests/test_playbook_grapher.py
@@ -241,6 +241,7 @@ def test_include_role(request, include_role_tasks_option, expected_tasks_number)
svg_path=svg_path,
playbook_paths=playbook_paths,
plays_number=1,
+ blocks_number=1,
tasks_number=expected_tasks_number,
roles_number=3,
)