Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored the methods, improved naming, removed unnecessary variables, added typing. #1

Open
wants to merge 2 commits into
base: dbt-build-performance-fix
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 33 additions & 28 deletions core/dbt/graph/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,26 +65,29 @@ def select_successors(self, selected: Set[UniqueId]) -> Set[UniqueId]:
successors.update(self.graph.successors(node))
return successors

def trim_graph(self, trimgraph: nx.DiGraph, lookup_node, all_nodes: set, selected_nodes: set):
"""introduced to boost performance in selecting nodes for dbt build,
this process untangles unnecessary nodes from lookup node without effecting selected nodes
"""
ancestors_set = set(nx.ancestors(trimgraph,lookup_node))
predecessors_set = set(nx.predecessor(trimgraph,lookup_node)).difference({lookup_node}) # make sure to remove self node

upflow_common_list = selected_nodes.intersection(ancestors_set)
downflow_common_list = selected_nodes.intersection(predecessors_set)
def trim_unvisited_nodes(
    self, target_graph: nx.DiGraph, selected_nodes: Set[UniqueId]
) -> None:
    """Remove, in place, the nodes of ``target_graph`` that cannot affect selection.

    A non-selected node only matters to the subset graph when it lies on a
    path between two selected nodes, i.e. it has at least one selected
    ancestor AND at least one selected descendant. Every other non-selected
    node is removed here so that the caller has fewer nodes to process.

    The keep/remove decision is made once, over the whole selection. Pruning
    per selected node (dropping everything unrelated to *that* node) would
    delete nodes that sit between two other selected nodes and silently lose
    the transitive edge between them.

    Args:
        target_graph: Graph to prune; it is mutated in place.
        selected_nodes: Nodes that must be kept. Each is expected to be
            present in ``target_graph``; networkx raises for unknown nodes.

    Returns:
        None. The result is the mutated ``target_graph``.
    """
    # Nodes reachable from some selected node (they have a selected ancestor).
    below_selection: Set[UniqueId] = set()
    # Nodes from which some selected node is reachable (selected descendant).
    above_selection: Set[UniqueId] = set()

    for node in selected_nodes:
        below_selection.update(nx.descendants(target_graph, node))
        above_selection.update(nx.ancestors(target_graph, node))

    # Only nodes in both sets can carry a selected -> selected connection.
    connecting_nodes = below_selection.intersection(above_selection)
    needed_nodes = connecting_nodes.union(selected_nodes)

    removable_nodes = set(target_graph.nodes).difference(needed_nodes)
    target_graph.remove_nodes_from(removable_nodes)


if not upflow_common_list:
trimgraph.remove_nodes_from(ancestors_set)
if not downflow_common_list:
trimgraph.remove_nodes_from(predecessors_set)

touched_nodes = ancestors_set.union(predecessors_set).union(selected_nodes)
untouched_nodes = all_nodes.difference(touched_nodes)
if untouched_nodes:
trimgraph.remove_nodes_from(untouched_nodes)

def get_subset_graph(self, selected: Iterable[UniqueId]) -> "Graph":
"""Create and return a new graph that is a shallow copy of the graph,
but with only the nodes in include_nodes. Transitive edges across
Expand All @@ -110,18 +113,20 @@ def get_subset_graph(self, selected: Iterable[UniqueId]) -> "Graph":
if non_cyclic_new_edges:
new_graph.add_edges_from(non_cyclic_new_edges)

# for node in self:
# if node not in include_nodes:
# source_nodes = [x for x, _ in new_graph.in_edges(node)]
# target_nodes = [x for _, x in new_graph.out_edges(node)]

# new_edges = product(source_nodes, target_nodes)
# non_cyclic_new_edges = [
# (source, target) for source, target in new_edges if source != target
# ] # removes cyclic refs
self.trim_unvisited_nodes(new_graph, include_nodes)

all_nodes = set(new_graph.nodes)
nodes_to_remove = all_nodes - include_nodes

for node in nodes_to_remove:
possible_edges = product(new_graph.predecessors(node), new_graph.successors(node))
non_cyclic_edges = [
(source, target) for source, target in possible_edges if source != target
]
new_graph.remove_node(node)
new_graph.add_edges_from(non_cyclic_edges)

# new_graph.add_edges_from(non_cyclic_new_edges)
# new_graph.remove_node(node)

for node in include_nodes:
if node not in new_graph:
Expand Down