Skip to content

Commit

Permalink
[#4071] Add metrics feature (#4235)
Browse files Browse the repository at this point in the history
* first cut at supporting metrics definitions

* teach dbt about metrics

* wip

* support partial parsing for metrics

* working on tests

* Fix some tests

* Add partial parsing metrics test

* Fix some more tests

* Update CHANGELOG.md

* Fix partial parsing yaml file to correct model syntax

Co-authored-by: Drew Banin <drew@fishtownanalytics.com>
  • Loading branch information
2 people authored and jtcohen6 committed Nov 9, 2021
1 parent 83ba864 commit 07af7eb
Show file tree
Hide file tree
Showing 28 changed files with 863 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- Allow nullable `error_after` in source freshness ([#3874](https://github.com/dbt-labs/dbt-core/issues/3874), [#3955](https://github.com/dbt-labs/dbt-core/pull/3955))
- Increase performance of graph subset selection ([#4135](https://github.com/dbt-labs/dbt-core/issues/4135),[#4155](https://github.com/dbt-labs/dbt-core/pull/4155))
- Speed up node selection by skipping indirect node incorporation when not needed ([#4213](https://github.com/dbt-labs/dbt-core/issues/4213),[#4214](https://github.com/dbt-labs/dbt-core/pull/4214))
- Add metrics nodes ([#4071](https://github.com/dbt-labs/dbt-core/issues/4071), [#4235](https://github.com/dbt-labs/dbt-core/pull/4235))

### Fixes
- Changes unit tests using `assertRaisesRegexp` to `assertRaisesRegex` ([#4136](https://github.com/dbt-labs/dbt-core/issues/4132), [#4136](https://github.com/dbt-labs/dbt-core/pull/4136))
Expand Down
3 changes: 3 additions & 0 deletions core/dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def print_compile_stats(stats):
NodeType.Seed: 'seed file',
NodeType.Source: 'source',
NodeType.Exposure: 'exposure',
NodeType.Metric: 'metric'
}

results = {k: 0 for k in names.keys()}
Expand Down Expand Up @@ -425,6 +426,8 @@ def link_graph(self, linker: Linker, manifest: Manifest, add_test_edges: bool =
self.link_node(linker, node, manifest)
for exposure in manifest.exposures.values():
self.link_node(linker, exposure, manifest)
for metric in manifest.metrics.values():
self.link_node(linker, metric, manifest)

cycle = linker.find_cycles()

Expand Down
26 changes: 26 additions & 0 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from dbt.contracts.graph.parsed import (
ParsedMacro,
ParsedExposure,
ParsedMetric,
ParsedSeedNode,
ParsedSourceDefinition,
)
Expand Down Expand Up @@ -1386,6 +1387,31 @@ def generate_parse_exposure(
}


class MetricRefResolver(BaseResolver):
def __call__(self, *args) -> str:
if len(args) not in (1, 2):
ref_invalid_args(self.model, args)
self.model.refs.append(list(args))
return ''


def generate_parse_metrics(
metric: ParsedMetric,
config: RuntimeConfig,
manifest: Manifest,
package_name: str,
) -> Dict[str, Any]:
project = config.load_dependencies()[package_name]
return {
'ref': MetricRefResolver(
None,
metric,
project,
manifest,
),
}


# This class is currently used by the schema parser in order
# to limit the number of macros in the context by using
# the TestMacroNamespace
Expand Down
1 change: 1 addition & 0 deletions core/dbt/contracts/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ class SchemaSourceFile(BaseSourceFile):
tests: Dict[str, Any] = field(default_factory=dict)
sources: List[str] = field(default_factory=list)
exposures: List[str] = field(default_factory=list)
metrics: List[str] = field(default_factory=list)
# node patches contain models, seeds, snapshots, analyses
ndp: List[str] = field(default_factory=list)
# any macro patches in this file by macro unique_id.
Expand Down
5 changes: 4 additions & 1 deletion core/dbt/contracts/graph/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
ParsedHookNode,
ParsedModelNode,
ParsedExposure,
ParsedMetric,
ParsedResource,
ParsedRPCNode,
ParsedSqlNode,
Expand Down Expand Up @@ -232,8 +233,10 @@ def parsed_instance_for(compiled: CompiledNode) -> ParsedResource:
ParsedSourceDefinition,
]

# anything that participates in the graph: sources, exposures, manifest nodes
# anything that participates in the graph: sources, exposures, metrics,
# or manifest nodes
GraphMemberNode = Union[
CompileResultNode,
ParsedExposure,
ParsedMetric,
]
35 changes: 32 additions & 3 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
)
from dbt.contracts.graph.parsed import (
ParsedMacro, ParsedDocumentation,
ParsedSourceDefinition, ParsedExposure, HasUniqueID,
UnpatchedSourceDefinition, ManifestNodes
ParsedSourceDefinition, ParsedExposure, ParsedMetric,
HasUniqueID, UnpatchedSourceDefinition, ManifestNodes
)
from dbt.contracts.graph.unparsed import SourcePatch
from dbt.contracts.files import SourceFile, SchemaSourceFile, FileHash, AnySourceFile
Expand Down Expand Up @@ -564,6 +564,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
macros: MutableMapping[str, ParsedMacro] = field(default_factory=dict)
docs: MutableMapping[str, ParsedDocumentation] = field(default_factory=dict)
exposures: MutableMapping[str, ParsedExposure] = field(default_factory=dict)
metrics: MutableMapping[str, ParsedMetric] = field(default_factory=dict)
selectors: MutableMapping[str, Any] = field(default_factory=dict)
files: MutableMapping[str, AnySourceFile] = field(default_factory=dict)
metadata: ManifestMetadata = field(default_factory=ManifestMetadata)
Expand Down Expand Up @@ -631,6 +632,9 @@ def sync_update_node(
def update_exposure(self, new_exposure: ParsedExposure):
_update_into(self.exposures, new_exposure)

def update_metric(self, new_metric: ParsedMetric):
_update_into(self.metrics, new_metric)

def update_node(self, new_node: ManifestNode):
_update_into(self.nodes, new_node)

Expand All @@ -648,6 +652,10 @@ def build_flat_graph(self):
k: v.to_dict(omit_none=False)
for k, v in self.exposures.items()
},
'metrics': {
k: v.to_dict(omit_none=False)
for k, v in self.metrics.items()
},
'nodes': {
k: v.to_dict(omit_none=False)
for k, v in self.nodes.items()
Expand Down Expand Up @@ -700,7 +708,12 @@ def find_materialization_macro_by_name(

def get_resource_fqns(self) -> Mapping[str, PathSet]:
resource_fqns: Dict[str, Set[Tuple[str, ...]]] = {}
all_resources = chain(self.exposures.values(), self.nodes.values(), self.sources.values())
all_resources = chain(
self.exposures.values(),
self.nodes.values(),
self.sources.values(),
self.metrics.values()
)
for resource in all_resources:
resource_type_plural = resource.resource_type.pluralize()
if resource_type_plural not in resource_fqns:
Expand Down Expand Up @@ -728,6 +741,7 @@ def deepcopy(self):
macros={k: _deepcopy(v) for k, v in self.macros.items()},
docs={k: _deepcopy(v) for k, v in self.docs.items()},
exposures={k: _deepcopy(v) for k, v in self.exposures.items()},
metrics={k: _deepcopy(v) for k, v in self.metrics.items()},
selectors={k: _deepcopy(v) for k, v in self.selectors.items()},
metadata=self.metadata,
disabled={k: _deepcopy(v) for k, v in self.disabled.items()},
Expand All @@ -740,6 +754,7 @@ def build_parent_and_child_maps(self):
self.nodes.values(),
self.sources.values(),
self.exposures.values(),
self.metrics.values(),
))
forward_edges, backward_edges = build_node_edges(edge_members)
self.child_map = forward_edges
Expand All @@ -761,6 +776,7 @@ def writable_manifest(self):
macros=self.macros,
docs=self.docs,
exposures=self.exposures,
metrics=self.metrics,
selectors=self.selectors,
metadata=self.metadata,
disabled=self.disabled,
Expand All @@ -780,6 +796,8 @@ def expect(self, unique_id: str) -> GraphMemberNode:
return self.sources[unique_id]
elif unique_id in self.exposures:
return self.exposures[unique_id]
elif unique_id in self.metrics:
return self.metrics[unique_id]
else:
# something terrible has happened
raise dbt.exceptions.InternalException(
Expand Down Expand Up @@ -1008,6 +1026,11 @@ def add_exposure(self, source_file: SchemaSourceFile, exposure: ParsedExposure):
self.exposures[exposure.unique_id] = exposure
source_file.exposures.append(exposure.unique_id)

def add_metric(self, source_file: SchemaSourceFile, metric: ParsedMetric):
_check_duplicates(metric, self.metrics)
self.metrics[metric.unique_id] = metric
source_file.metrics.append(metric.unique_id)

def add_disabled_nofile(self, node: CompileResultNode):
# There can be multiple disabled nodes for the same unique_id
if node.unique_id in self.disabled:
Expand Down Expand Up @@ -1044,6 +1067,7 @@ def __reduce_ex__(self, protocol):
self.macros,
self.docs,
self.exposures,
self.metrics,
self.selectors,
self.files,
self.metadata,
Expand Down Expand Up @@ -1101,6 +1125,11 @@ class WritableManifest(ArtifactMixin):
'The exposures defined in the dbt project and its dependencies'
))
)
metrics: Mapping[UniqueID, ParsedMetric] = field(
metadata=dict(description=(
'The metrics defined in the dbt project and its dependencies'
))
)
selectors: Mapping[UniqueID, Any] = field(
metadata=dict(description=(
'The selectors defined in selectors.yml'
Expand Down
78 changes: 77 additions & 1 deletion core/dbt/contracts/graph/parsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
UnparsedBaseNode, FreshnessThreshold, ExternalTable,
HasYamlMetadata, MacroArgument, UnparsedSourceDefinition,
UnparsedSourceTableDefinition, UnparsedColumn, TestDef,
ExposureOwner, ExposureType, MaturityType
ExposureOwner, ExposureType, MaturityType, MetricFilter
)
from dbt.contracts.util import Replaceable, AdditionalPropertiesMixin
from dbt.exceptions import warn_or_error
Expand Down Expand Up @@ -783,6 +783,81 @@ def same_contents(self, old: Optional['ParsedExposure']) -> bool:
)


@dataclass
class ParsedMetric(UnparsedBaseNode, HasUniqueID, HasFqn):
model: str
name: str
description: str
label: str
type: str
sql: Optional[str]
timestamp: Optional[str]
filters: List[MetricFilter]
time_grains: List[str]
dimensions: List[str]
resource_type: NodeType = NodeType.Metric
meta: Dict[str, Any] = field(default_factory=dict)
tags: List[str] = field(default_factory=list)
sources: List[List[str]] = field(default_factory=list)
depends_on: DependsOn = field(default_factory=DependsOn)
refs: List[List[str]] = field(default_factory=list)
created_at: int = field(default_factory=lambda: int(time.time()))

@property
def depends_on_nodes(self):
return self.depends_on.nodes

@property
def search_name(self):
return self.name

def same_model(self, old: 'ParsedMetric') -> bool:
return self.model == old.model

def same_dimensions(self, old: 'ParsedMetric') -> bool:
return self.dimensions == old.dimensions

def same_filters(self, old: 'ParsedMetric') -> bool:
return self.filters == old.filters

def same_description(self, old: 'ParsedMetric') -> bool:
return self.description == old.description

def same_label(self, old: 'ParsedMetric') -> bool:
return self.label == old.label

def same_type(self, old: 'ParsedMetric') -> bool:
return self.type == old.type

def same_sql(self, old: 'ParsedMetric') -> bool:
return self.sql == old.sql

def same_timestamp(self, old: 'ParsedMetric') -> bool:
return self.timestamp == old.timestamp

def same_time_grains(self, old: 'ParsedMetric') -> bool:
return self.time_grains == old.time_grains

def same_contents(self, old: Optional['ParsedMetric']) -> bool:
# existing when it didn't before is a change!
# metadata/tags changes are not "changes"
if old is None:
return True

return (
self.same_model(old) and
self.same_dimensions(old) and
self.same_filters(old) and
self.same_description(old) and
self.same_label(old) and
self.same_type(old) and
self.same_sql(old) and
self.same_timestamp(old) and
self.same_time_grains(old) and
True
)


ManifestNodes = Union[
ParsedAnalysisNode,
ParsedSingularTestNode,
Expand All @@ -801,5 +876,6 @@ def same_contents(self, old: Optional['ParsedExposure']) -> bool:
ParsedMacro,
ParsedNode,
ParsedExposure,
ParsedMetric,
ParsedSourceDefinition,
]
24 changes: 24 additions & 0 deletions core/dbt/contracts/graph/unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,3 +446,27 @@ class UnparsedExposure(dbtClassMixin, Replaceable):
tags: List[str] = field(default_factory=list)
url: Optional[str] = None
depends_on: List[str] = field(default_factory=list)


@dataclass
class MetricFilter(dbtClassMixin, Replaceable):
field: str
operator: str
# TODO : Can we make this Any?
value: str


@dataclass
class UnparsedMetric(dbtClassMixin, Replaceable):
model: str
name: str
label: str
type: str
description: str = ''
sql: Optional[str] = None
timestamp: Optional[str] = None
time_grains: List[str] = field(default_factory=list)
dimensions: List[str] = field(default_factory=list)
filters: List[MetricFilter] = field(default_factory=list)
meta: Dict[str, Any] = field(default_factory=dict)
tags: List[str] = field(default_factory=list)
2 changes: 1 addition & 1 deletion core/dbt/graph/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

INTERSECTION_DELIMITER = ','

DEFAULT_INCLUDES: List[str] = ['fqn:*', 'source:*', 'exposure:*']
DEFAULT_INCLUDES: List[str] = ['fqn:*', 'source:*', 'exposure:*', 'metric:*']
DEFAULT_EXCLUDES: List[str] = []


Expand Down
6 changes: 3 additions & 3 deletions core/dbt/graph/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Dict, Set, List, Generator, Optional

from .graph import UniqueId
from dbt.contracts.graph.parsed import ParsedSourceDefinition, ParsedExposure
from dbt.contracts.graph.parsed import ParsedSourceDefinition, ParsedExposure, ParsedMetric
from dbt.contracts.graph.compiled import GraphMemberNode
from dbt.contracts.graph.manifest import Manifest
from dbt.node_types import NodeType
Expand Down Expand Up @@ -47,8 +47,8 @@ def _include_in_cost(self, node_id: UniqueId) -> bool:
node = self.manifest.expect(node_id)
if node.resource_type != NodeType.Model:
return False
# must be a Model - tell mypy this won't be a Source or Exposure
assert not isinstance(node, (ParsedSourceDefinition, ParsedExposure))
# must be a Model - tell mypy this won't be a Source or Exposure or Metric
assert not isinstance(node, (ParsedSourceDefinition, ParsedExposure, ParsedMetric))
if node.is_ephemeral:
return False
return True
Expand Down
4 changes: 4 additions & 0 deletions core/dbt/graph/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ def _is_graph_member(self, unique_id: UniqueId) -> bool:
return source.config.enabled
elif unique_id in self.manifest.exposures:
return True
elif unique_id in self.manifest.metrics:
return True
node = self.manifest.nodes[unique_id]
return not node.empty and node.config.enabled

Expand All @@ -184,6 +186,8 @@ def _is_match(self, unique_id: UniqueId) -> bool:
node = self.manifest.sources[unique_id]
elif unique_id in self.manifest.exposures:
node = self.manifest.exposures[unique_id]
elif unique_id in self.manifest.metrics:
node = self.manifest.metrics[unique_id]
else:
raise InternalException(
f'Node {unique_id} not found in the manifest!'
Expand Down
Loading

0 comments on commit 07af7eb

Please sign in to comment.