Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CT-2414: Add graph summaries to target directory output #7358

Merged
merged 8 commits into from
Apr 28, 2023
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230425-142522.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add graph structure summaries to target path output
time: 2023-04-25T14:25:22.269051-04:00
custom:
Author: peterallenwebb
Issue: "7357"
57 changes: 49 additions & 8 deletions core/dbt/compilation.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import argparse
import json

import networkx as nx # type: ignore
import os
import pickle
Expand Down Expand Up @@ -27,8 +29,8 @@
DbtRuntimeError,
)
from dbt.graph import Graph
from dbt.events.functions import fire_event
from dbt.events.types import FoundStats, WritingInjectedSQLForNode
from dbt.events.functions import fire_event, get_invocation_id
from dbt.events.types import FoundStats, Note, WritingInjectedSQLForNode
from dbt.events.contextvars import get_node_info
from dbt.node_types import NodeType, ModelLanguage
from dbt.events.format import pluralize
Expand Down Expand Up @@ -161,6 +163,25 @@ def write_graph(self, outfile: str, manifest: Manifest):
with open(outfile, "wb") as outfh:
pickle.dump(out_graph, outfh, protocol=pickle.HIGHEST_PROTOCOL)

def get_graph_summary(self, manifest: Manifest) -> Dict[int, Dict[str, Any]]:
"""Create a smaller summary of the graph, suitable for basic diagnostics
and performance tuning. The summary includes only the edge structure,
node types, and node names. Each of the n nodes is assigned an integer
index 0, 1, 2,..., n-1 for compactness"""
graph_nodes = dict()
index_dict = dict()
for node_index, node_name in enumerate(self.graph):
index_dict[node_name] = node_index
data = manifest.expect(node_name).to_dict(omit_none=True)
graph_nodes[node_index] = {"name": node_name, "type": data["resource_type"]}

for node_index, node in graph_nodes.items():
successors = [index_dict[n] for n in self.graph.successors(node["name"])]
if successors:
node["succ"] = [index_dict[n] for n in self.graph.successors(node["name"])]

return graph_nodes


class Compiler:
def __init__(self, config):
Expand Down Expand Up @@ -405,7 +426,7 @@ def link_node(self, linker: Linker, node: GraphMemberNode, manifest: Manifest):
else:
raise GraphDependencyNotFoundError(node, dependency)

def link_graph(self, linker: Linker, manifest: Manifest, add_test_edges: bool = False):
def link_graph(self, linker: Linker, manifest: Manifest):
for source in manifest.sources.values():
linker.add_node(source.unique_id)
for node in manifest.nodes.values():
Expand All @@ -420,10 +441,6 @@ def link_graph(self, linker: Linker, manifest: Manifest, add_test_edges: bool =
if cycle:
raise RuntimeError("Found a cycle: {}".format(cycle))

if add_test_edges:
manifest.build_parent_and_child_maps()
self.add_test_edges(linker, manifest)

def add_test_edges(self, linker: Linker, manifest: Manifest) -> None:
"""This method adds additional edges to the DAG. For a given non-test
executable node, add an edge from an upstream test to the given node if
Expand Down Expand Up @@ -480,7 +497,31 @@ def compile(self, manifest: Manifest, write=True, add_test_edges=False) -> Graph
self.initialize()
linker = Linker()

self.link_graph(linker, manifest, add_test_edges)
self.link_graph(linker, manifest)

# Create a file containing basic information about graph structure,
# supporting diagnostics and performance analysis.
summaries: Dict = dict()
summaries["_invocation_id"] = get_invocation_id()
summaries["linked"] = linker.get_graph_summary(manifest)

if add_test_edges:
manifest.build_parent_and_child_maps()
self.add_test_edges(linker, manifest)

# Create another diagnostic summary, just as above, but this time
# including the test edges.
summaries["with_test_edges"] = linker.get_graph_summary(manifest)

with open(os.path.join(self.config.target_path, "graph_summary.json"), "w") as out_stream:
try:
out_stream.write(json.dumps(summaries))
except Exception as e: # This is non-essential information, so merely note failures.
fire_event(
Note(
msg=f"An error was encountered writing the graph summary information: {e}"
)
)

stats = _generate_stats(manifest)

Expand Down
13 changes: 13 additions & 0 deletions tests/functional/compile/test_compile.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import json
import pathlib
import pytest

from dbt.cli.main import dbtRunner
Expand Down Expand Up @@ -174,3 +176,14 @@ def test_compile_inline_not_add_node(self, project):
populate_cache=False,
)
assert len(manifest.nodes) == 4

def test_graph_summary_output(self, project):
"""Ensure that the compile command generates a file named graph_summary.json
in the target directory, that the file contains valid json, and that the
json has the high level structure it should."""
dbtRunner().invoke(["compile"])
summary_path = pathlib.Path(project.project_root, "target/graph_summary.json")
with open(summary_path, "r") as summary_file:
summary = json.load(summary_file)
assert "_invocation_id" in summary
assert "linked" in summary