Skip to content

Commit

Permalink
[#4071] Add metrics feature (#4235)
Browse files Browse the repository at this point in the history
* first cut at supporting metrics definitions

* teach dbt about metrics

* wip

* support partial parsing for metrics

* working on tests

* Fix some tests

* Add partial parsing metrics test

* Fix some more tests

* Update CHANGELOG.md

* Fix partial parsing yaml file to correct model syntax

Co-authored-by: Drew Banin <drew@fishtownanalytics.com>

automatic commit by git-black, original commits:
  855419d
  • Loading branch information
gshank authored and iknox-fa committed Feb 8, 2022
1 parent 5f6a9b0 commit 674f354
Show file tree
Hide file tree
Showing 12 changed files with 104 additions and 89 deletions.
2 changes: 1 addition & 1 deletion core/dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def print_compile_stats(stats):
NodeType.Seed: "seed file",
NodeType.Source: "source",
NodeType.Exposure: "exposure",
NodeType.Metric: 'metric'
NodeType.Metric: "metric",
}

results = {k: 0 for k in names.keys()}
Expand Down
4 changes: 2 additions & 2 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1366,7 +1366,7 @@ def __call__(self, *args) -> str:
ref_invalid_args(self.model, args)
self.validate_args(name, package)
self.model.refs.append(list(args))
return ''
return ""

def validate_args(self, name, package):
if not isinstance(name, str):
Expand All @@ -1384,7 +1384,7 @@ def generate_parse_metrics(
) -> Dict[str, Any]:
project = config.load_dependencies()[package_name]
return {
'ref': MetricRefResolver(
"ref": MetricRefResolver(
None,
metric,
project,
Expand Down
17 changes: 10 additions & 7 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@
)
from dbt.contracts.graph.parsed import (
ParsedMacro, ParsedDocumentation,
ParsedSourceDefinition, ParsedExposure, ParsedMetric,
HasUniqueID, UnpatchedSourceDefinition, ManifestNodes
ParsedDocumentation,
ParsedSourceDefinition,
ParsedExposure,
ParsedMetric,
HasUniqueID,
UnpatchedSourceDefinition,
ManifestNodes,
)
from dbt.contracts.graph.unparsed import SourcePatch
from dbt.contracts.files import SourceFile, SchemaSourceFile, FileHash, AnySourceFile
Expand Down Expand Up @@ -714,7 +719,7 @@ def get_resource_fqns(self) -> Mapping[str, PathSet]:
self.exposures.values(),
self.nodes.values(),
self.sources.values(),
self.metrics.values()
self.metrics.values(),
)
for resource in all_resources:
resource_type_plural = resource.resource_type.pluralize()
Expand Down Expand Up @@ -756,7 +761,7 @@ def build_parent_and_child_maps(self):
self.nodes.values(),
self.sources.values(),
self.exposures.values(),
self.metrics.values(),
self.exposures.values(),
))
forward_edges, backward_edges = build_node_edges(edge_members)
self.child_map = forward_edges
Expand Down Expand Up @@ -1126,9 +1131,7 @@ class WritableManifest(ArtifactMixin):
))
)
metrics: Mapping[UniqueID, ParsedMetric] = field(
metadata=dict(description=(
'The metrics defined in the dbt project and its dependencies'
))
metadata=dict(description=("The metrics defined in the dbt project and its dependencies"))
)
selectors: Mapping[UniqueID, Any] = field(
metadata=dict(description=(
Expand Down
54 changes: 33 additions & 21 deletions core/dbt/contracts/graph/parsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,19 @@
UnparsedBaseNode, FreshnessThreshold, ExternalTable,
HasYamlMetadata, MacroArgument, UnparsedSourceDefinition,
UnparsedSourceTableDefinition, UnparsedColumn, TestDef,
ExposureOwner, ExposureType, MaturityType, MetricFilter
UnparsedBaseNode,
FreshnessThreshold,
ExternalTable,
HasYamlMetadata,
MacroArgument,
UnparsedSourceDefinition,
UnparsedSourceTableDefinition,
UnparsedColumn,
TestDef,
ExposureOwner,
ExposureType,
MaturityType,
MetricFilter,
)
from dbt.contracts.util import Replaceable, AdditionalPropertiesMixin
from dbt.exceptions import warn_or_error
Expand Down Expand Up @@ -848,50 +860,50 @@ def depends_on_nodes(self):
def search_name(self):
return self.name

def same_model(self, old: 'ParsedMetric') -> bool:
def same_model(self, old: "ParsedMetric") -> bool:
return self.model == old.model

def same_dimensions(self, old: 'ParsedMetric') -> bool:
def same_dimensions(self, old: "ParsedMetric") -> bool:
return self.dimensions == old.dimensions

def same_filters(self, old: 'ParsedMetric') -> bool:
def same_filters(self, old: "ParsedMetric") -> bool:
return self.filters == old.filters

def same_description(self, old: 'ParsedMetric') -> bool:
def same_description(self, old: "ParsedMetric") -> bool:
return self.description == old.description

def same_label(self, old: 'ParsedMetric') -> bool:
def same_label(self, old: "ParsedMetric") -> bool:
return self.label == old.label

def same_type(self, old: 'ParsedMetric') -> bool:
def same_type(self, old: "ParsedMetric") -> bool:
return self.type == old.type

def same_sql(self, old: 'ParsedMetric') -> bool:
def same_sql(self, old: "ParsedMetric") -> bool:
return self.sql == old.sql

def same_timestamp(self, old: 'ParsedMetric') -> bool:
def same_timestamp(self, old: "ParsedMetric") -> bool:
return self.timestamp == old.timestamp

def same_time_grains(self, old: 'ParsedMetric') -> bool:
def same_time_grains(self, old: "ParsedMetric") -> bool:
return self.time_grains == old.time_grains

def same_contents(self, old: Optional['ParsedMetric']) -> bool:
def same_contents(self, old: Optional["ParsedMetric"]) -> bool:
# existing when it didn't before is a change!
# metadata/tags changes are not "changes"
if old is None:
return True

return (
self.same_model(old) and
self.same_dimensions(old) and
self.same_filters(old) and
self.same_description(old) and
self.same_label(old) and
self.same_type(old) and
self.same_sql(old) and
self.same_timestamp(old) and
self.same_time_grains(old) and
True
self.same_model(old)
and self.same_dimensions(old)
and self.same_filters(old)
and self.same_description(old)
and self.same_label(old)
and self.same_type(old)
and self.same_sql(old)
and self.same_timestamp(old)
and self.same_time_grains(old)
and True
)


Expand Down
2 changes: 1 addition & 1 deletion core/dbt/contracts/graph/unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ class UnparsedMetric(dbtClassMixin, Replaceable):
name: str
label: str
type: str
description: str = ''
description: str = ""
sql: Optional[str] = None
timestamp: Optional[str] = None
time_grains: List[str] = field(default_factory=list)
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/graph/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

INTERSECTION_DELIMITER = ","

DEFAULT_INCLUDES: List[str] = ['fqn:*', 'source:*', 'exposure:*', 'metric:*']
DEFAULT_INCLUDES: List[str] = ["fqn:*", "source:*", "exposure:*", "metric:*"]
DEFAULT_EXCLUDES: List[str] = []


Expand Down
27 changes: 14 additions & 13 deletions core/dbt/graph/selector_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class MethodName(StrEnum):
ResourceType = 'resource_type'
State = 'state'
Exposure = 'exposure'
Metric = 'metric'
Metric = "metric"
Result = 'result'


Expand Down Expand Up @@ -122,8 +122,7 @@ def exposure_nodes(
yield unique_id, exposure

def metric_nodes(
self,
included_nodes: Set[UniqueId]
self, included_nodes: Set[UniqueId]
) -> Iterator[Tuple[UniqueId, ParsedMetric]]:

for key, metric in self.manifest.metrics.items():
Expand All @@ -138,8 +137,10 @@ def all_nodes(
) -> Iterator[Tuple[UniqueId, SelectorTarget]]:
yield from chain(self.parsed_nodes(included_nodes),
self.source_nodes(included_nodes),
self.exposure_nodes(included_nodes),
self.metric_nodes(included_nodes))
self.source_nodes(included_nodes),
self.exposure_nodes(included_nodes),
self.metric_nodes(included_nodes),
)

def configurable_nodes(
self,
Expand All @@ -153,8 +154,10 @@ def non_source_nodes(
included_nodes: Set[UniqueId],
) -> Iterator[Tuple[UniqueId, Union[ParsedExposure, ManifestNode, ParsedMetric]]]:
yield from chain(self.parsed_nodes(included_nodes),
self.exposure_nodes(included_nodes),
self.metric_nodes(included_nodes))
self.parsed_nodes(included_nodes),
self.exposure_nodes(included_nodes),
self.metric_nodes(included_nodes),
)

@abc.abstractmethod
def search(
Expand Down Expand Up @@ -267,10 +270,8 @@ def search(


class MetricSelectorMethod(SelectorMethod):
def search(
self, included_nodes: Set[UniqueId], selector: str
) -> Iterator[UniqueId]:
parts = selector.split('.')
def search(self, included_nodes: Set[UniqueId], selector: str) -> Iterator[UniqueId]:
parts = selector.split(".")
target_package = SELECTOR_GLOB
if len(parts) == 1:
target_name = parts[0]
Expand All @@ -279,8 +280,8 @@ def search(
else:
msg = (
'Invalid metric selector value "{}". Metrics must be of '
'the form ${{metric_name}} or '
'${{metric_package.metric_name}}'
"the form ${{metric_name}} or "
"${{metric_package.metric_name}}"
).format(selector)
raise RuntimeException(msg)

Expand Down
4 changes: 2 additions & 2 deletions core/dbt/node_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class NodeType(StrEnum):
Source = 'source'
Macro = "macro"
Exposure = 'exposure'
Metric = 'metric'
Metric = "metric"

@classmethod
def executable(cls) -> List["NodeType"]:
Expand Down Expand Up @@ -51,7 +51,7 @@ def documentable(cls) -> List['NodeType']:
cls.Macro,
cls.Analysis,
cls.Exposure,
cls.Metric
cls.Metric,
]

def pluralize(self) -> str:
Expand Down
32 changes: 15 additions & 17 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@
Manifest, Disabled, MacroManifest, ManifestStateCheck, ParsingInfo
)
from dbt.contracts.graph.parsed import (
ParsedSourceDefinition, ParsedNode, ParsedMacro, ColumnInfo, ParsedExposure, ParsedMetric
ParsedSourceDefinition,
ParsedNode,
ParsedMacro,
ColumnInfo,
ParsedExposure,
ParsedMetric,
)
from dbt.contracts.util import Writable
from dbt.exceptions import (
Expand Down Expand Up @@ -1026,9 +1031,7 @@ def _process_docs_for_exposure(
exposure.description = get_rendered(exposure.description, context)


def _process_docs_for_metrics(
context: Dict[str, Any], metric: ParsedMetric
) -> None:
def _process_docs_for_metrics(context: Dict[str, Any], metric: ParsedMetric) -> None:
metric.description = get_rendered(metric.description, context)


Expand Down Expand Up @@ -1073,9 +1076,7 @@ def _process_refs_for_exposure(
manifest.update_exposure(exposure)


def _process_refs_for_metric(
manifest: Manifest, current_project: str, metric: ParsedMetric
):
def _process_refs_for_metric(manifest: Manifest, current_project: str, metric: ParsedMetric):
"""Given a manifest and a metric in that manifest, process its refs"""
for ref in metric.refs:
target_model: Optional[Union[Disabled, ManifestNode]] = None
Expand All @@ -1088,7 +1089,7 @@ def _process_refs_for_metric(
target_model_package, target_model_name = ref
else:
raise dbt.exceptions.InternalException(
f'Refs should always be 1 or 2 arguments - got {len(ref)}'
f"Refs should always be 1 or 2 arguments - got {len(ref)}"
)

target_model = manifest.resolve_ref(
Expand All @@ -1102,8 +1103,10 @@ def _process_refs_for_metric(
# This may raise. Even if it doesn't, we don't want to add
# this exposure to the graph b/c there is no destination exposure
invalid_ref_fail_unless_test(
metric, target_model_name, target_model_package,
disabled=(isinstance(target_model, Disabled))
metric,
target_model_name,
target_model_package,
disabled=(isinstance(target_model, Disabled)),
)

continue
Expand Down Expand Up @@ -1186,9 +1189,7 @@ def _process_sources_for_exposure(
manifest.update_exposure(exposure)


def _process_sources_for_metric(
manifest: Manifest, current_project: str, metric: ParsedMetric
):
def _process_sources_for_metric(manifest: Manifest, current_project: str, metric: ParsedMetric):
target_source: Optional[Union[Disabled, ParsedSourceDefinition]] = None
for source_name, table_name in metric.sources:
target_source = manifest.resolve_source(
Expand All @@ -1199,10 +1200,7 @@ def _process_sources_for_metric(
)
if target_source is None or isinstance(target_source, Disabled):
invalid_source_fail_unless_test(
metric,
source_name,
table_name,
disabled=(isinstance(target_source, Disabled))
metric, source_name, table_name, disabled=(isinstance(target_source, Disabled))
)
continue
target_source_id = target_source.unique_id
Expand Down
Loading

0 comments on commit 674f354

Please sign in to comment.