diff --git a/.changes/unreleased/Under the Hood-20230516-094241.yaml b/.changes/unreleased/Under the Hood-20230516-094241.yaml new file mode 100644 index 00000000000..b2f3173f093 --- /dev/null +++ b/.changes/unreleased/Under the Hood-20230516-094241.yaml @@ -0,0 +1,7 @@ +kind: Under the Hood +body: Move node patch method to schema parser patch_node_properties and refactor schema + parsing +time: 2023-05-16T09:42:41.793503-04:00 +custom: + Author: gshank + Issue: "7430" diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py index f4caefbf6e4..0c4c2e72dcf 100644 --- a/core/dbt/contracts/graph/manifest.py +++ b/core/dbt/contracts/graph/manifest.py @@ -38,6 +38,7 @@ ResultNode, BaseNode, ManifestOrPublicNode, + ModelNode, ) from dbt.contracts.graph.unparsed import SourcePatch, NodeVersion, UnparsedVersion from dbt.contracts.graph.manifest_upgrade import upgrade_manifest_json @@ -188,7 +189,13 @@ def find( # If this is an unpinned ref (no 'version' arg was passed), # AND this is a versioned node, # AND this ref is being resolved at runtime -- get_node_info != {} - if version is None and node.is_versioned and get_node_info(): + # Only ModelNodes can be versioned. + if ( + isinstance(node, ModelNode) + and version is None + and node.is_versioned + and get_node_info() + ): # Check to see if newer versions are available, and log an "FYI" if so max_version: UnparsedVersion = max( [ diff --git a/core/dbt/contracts/graph/nodes.py b/core/dbt/contracts/graph/nodes.py index 26938b37554..94d30364a3d 100644 --- a/core/dbt/contracts/graph/nodes.py +++ b/core/dbt/contracts/graph/nodes.py @@ -40,13 +40,12 @@ ) from dbt.contracts.util import Replaceable, AdditionalPropertiesMixin from dbt.events.functions import warn_or_error -from dbt.exceptions import ParsingError, InvalidAccessTypeError, ContractBreakingChangeError +from dbt.exceptions import ParsingError, ContractBreakingChangeError from dbt.events.types import ( SeedIncreased, SeedExceedsLimitSamePath, SeedExceedsLimitAndPathChanged, SeedExceedsLimitChecksumChanged, - ValidationWarning, ) from dbt.events.contextvars import set_contextvars from dbt.flags import get_flags @@ -443,63 +442,6 @@ def same_contract(self, old, adapter_type=None) -> bool: # This would only apply to seeds return True - def patch(self, patch: "ParsedNodePatch"): - """Given a ParsedNodePatch, add the new information to the node.""" - # NOTE: Constraint patching is awkwardly done in the parse_patch function - # which calls this one. We need to combine the logic. 
- - # explicitly pick out the parts to update so we don't inadvertently - # step on the model name or anything - # Note: config should already be updated - self.patch_path: Optional[str] = patch.file_id - # update created_at so process_docs will run in partial parsing - self.created_at = time.time() - self.description = patch.description - self.columns = patch.columns - self.name = patch.name - - # TODO: version, latest_version, and access are specific to ModelNodes, consider splitting out to ModelNode - if self.resource_type != NodeType.Model: - if patch.version: - warn_or_error( - ValidationWarning( - field_name="version", - resource_type=self.resource_type.value, - node_name=patch.name, - ) - ) - if patch.latest_version: - warn_or_error( - ValidationWarning( - field_name="latest_version", - resource_type=self.resource_type.value, - node_name=patch.name, - ) - ) - self.version = patch.version - self.latest_version = patch.latest_version - - # This might not be the ideal place to validate the "access" field, - # but at this point we have the information we need to properly - # validate and we don't before this. - if patch.access: - if self.resource_type == NodeType.Model: - if AccessType.is_valid(patch.access): - self.access = AccessType(patch.access) - else: - raise InvalidAccessTypeError( - unique_id=self.unique_id, - field_value=patch.access, - ) - else: - warn_or_error( - ValidationWarning( - field_name="access", - resource_type=self.resource_type.value, - node_name=patch.name, - ) - ) - def same_contents(self, old, adapter_type) -> bool: if old is None: return False @@ -1015,14 +957,6 @@ class Macro(BaseNode): created_at: float = field(default_factory=lambda: time.time()) supported_languages: Optional[List[ModelLanguage]] = None - def patch(self, patch: "ParsedMacroPatch"): - self.patch_path: Optional[str] = patch.file_id - self.description = patch.description - self.created_at = time.time() - self.meta = patch.meta - self.docs = patch.docs - self.arguments = patch.arguments - def same_contents(self, other: Optional["Macro"]) -> bool: if other is None: return False @@ -1466,6 +1400,7 @@ class ParsedNodePatch(ParsedPatch): access: Optional[str] version: Optional[NodeVersion] latest_version: Optional[NodeVersion] + constraints: List[Dict[str, Any]] @dataclass diff --git a/core/dbt/parser/common.py b/core/dbt/parser/common.py new file mode 100644 index 00000000000..24a0810943b --- /dev/null +++ b/core/dbt/parser/common.py @@ -0,0 +1,222 @@ +from dbt.contracts.graph.unparsed import ( + HasColumnProps, + UnparsedColumn, + UnparsedNodeUpdate, + UnparsedMacroUpdate, + UnparsedAnalysisUpdate, + UnparsedExposure, + UnparsedModelUpdate, +) +from dbt.contracts.graph.unparsed import NodeVersion, HasColumnTests, HasColumnDocs +from dbt.contracts.graph.nodes import ( + UnpatchedSourceDefinition, + ColumnInfo, + ColumnLevelConstraint, + ConstraintType, +) +from dbt.parser.search import FileBlock +from typing import List, Dict, Any, TypeVar, Generic, Union, Optional +from dataclasses import dataclass +from dbt.exceptions import DbtInternalError, ParsingError + + +def trimmed(inp: str) -> str: + if len(inp) < 50: + return inp + return inp[:44] + "..." 
+ inp[-3:] + + +TestDef = Union[str, Dict[str, Any]] + + +Target = TypeVar( + "Target", + UnparsedNodeUpdate, + UnparsedMacroUpdate, + UnparsedAnalysisUpdate, + UnpatchedSourceDefinition, + UnparsedExposure, + UnparsedModelUpdate, +) + + +ColumnTarget = TypeVar( + "ColumnTarget", + UnparsedModelUpdate, + UnparsedNodeUpdate, + UnparsedAnalysisUpdate, + UnpatchedSourceDefinition, +) + +Versioned = TypeVar("Versioned", bound=UnparsedModelUpdate) + +Testable = TypeVar("Testable", UnparsedNodeUpdate, UnpatchedSourceDefinition, UnparsedModelUpdate) + + +@dataclass +class YamlBlock(FileBlock): + data: Dict[str, Any] + + @classmethod + def from_file_block(cls, src: FileBlock, data: Dict[str, Any]): + return cls( + file=src.file, + data=data, + ) + + +@dataclass +class TargetBlock(YamlBlock, Generic[Target]): + target: Target + + @property + def name(self): + return self.target.name + + @property + def columns(self): + return [] + + @property + def tests(self) -> List[TestDef]: + return [] + + @classmethod + def from_yaml_block(cls, src: YamlBlock, target: Target) -> "TargetBlock[Target]": + return cls( + file=src.file, + data=src.data, + target=target, + ) + + +@dataclass +class TargetColumnsBlock(TargetBlock[ColumnTarget], Generic[ColumnTarget]): + @property + def columns(self): + if self.target.columns is None: + return [] + else: + return self.target.columns + + +@dataclass +class TestBlock(TargetColumnsBlock[Testable], Generic[Testable]): + @property + def tests(self) -> List[TestDef]: + if self.target.tests is None: + return [] + else: + return self.target.tests + + @property + def quote_columns(self) -> Optional[bool]: + return self.target.quote_columns + + @classmethod + def from_yaml_block(cls, src: YamlBlock, target: Testable) -> "TestBlock[Testable]": + return cls( + file=src.file, + data=src.data, + target=target, + ) + + +@dataclass +class VersionedTestBlock(TestBlock, Generic[Versioned]): + @property + def columns(self): + if not self.target.versions: + return super().columns + else: + raise DbtInternalError(".columns for VersionedTestBlock with versions") + + @property + def tests(self) -> List[TestDef]: + if not self.target.versions: + return super().tests + else: + raise DbtInternalError(".tests for VersionedTestBlock with versions") + + @classmethod + def from_yaml_block(cls, src: YamlBlock, target: Versioned) -> "VersionedTestBlock[Versioned]": + return cls( + file=src.file, + data=src.data, + target=target, + ) + + +@dataclass +class GenericTestBlock(TestBlock[Testable], Generic[Testable]): + test: Dict[str, Any] + column_name: Optional[str] + tags: List[str] + version: Optional[NodeVersion] + + @classmethod + def from_test_block( + cls, + src: TestBlock, + test: Dict[str, Any], + column_name: Optional[str], + tags: List[str], + version: Optional[NodeVersion], + ) -> "GenericTestBlock": + return cls( + file=src.file, + data=src.data, + target=src.target, + test=test, + column_name=column_name, + tags=tags, + version=version, + ) + + +class ParserRef: + """A helper object to hold parse-time references.""" + + def __init__(self): + self.column_info: Dict[str, ColumnInfo] = {} + + def _add(self, column: HasColumnProps): + tags: List[str] = [] + tags.extend(getattr(column, "tags", ())) + quote: Optional[bool] + if isinstance(column, UnparsedColumn): + quote = column.quote + else: + quote = None + + if any( + c + for c in column.constraints + if "type" not in c or not ConstraintType.is_valid(c["type"]) + ): + raise ParsingError(f"Invalid constraint type on column {column.name}") + + 
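+        # For example, {"type": "not_null"} is a valid constraint entry here,
+        # while a dict missing the "type" key, or using an unknown type (say,
+        # {"type": "nonsense"}), fails the check above and raises ParsingError.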
self.column_info[column.name] = ColumnInfo( + name=column.name, + description=column.description, + data_type=column.data_type, + constraints=[ColumnLevelConstraint.from_dict(c) for c in column.constraints], + meta=column.meta, + tags=tags, + quote=quote, + _extra=column.extra, + ) + + @classmethod + def from_target(cls, target: Union[HasColumnDocs, HasColumnTests]) -> "ParserRef": + refs = cls() + for column in target.columns: + refs._add(column) + return refs + + @classmethod + def from_versioned_target(cls, target: Versioned, version: NodeVersion) -> "ParserRef": + refs = cls() + for base_column in target.get_columns_for_version(version): + refs._add(base_column) + return refs diff --git a/core/dbt/parser/generic_test.py b/core/dbt/parser/generic_test.py index fe146419538..88efc3c7dce 100644 --- a/core/dbt/parser/generic_test.py +++ b/core/dbt/parser/generic_test.py @@ -22,7 +22,7 @@ def resource_type(self) -> NodeType: def get_compiled_path(cls, block: FileBlock): return block.path.relative_path - def parse_generic_test( + def create_generic_test_macro( self, block: jinja.BlockTag, base_node: UnparsedMacro, name: str ) -> Macro: unique_id = self.generate_unique_id(name) @@ -76,7 +76,7 @@ def parse_unparsed_generic_test(self, base_node: UnparsedMacro) -> Iterable[Macr continue name: str = generic_test_name.replace(MACRO_PREFIX, "") - node = self.parse_generic_test(block, base_node, name) + node = self.create_generic_test_macro(block, base_node, name) yield node def parse_file(self, block: FileBlock): diff --git a/core/dbt/parser/generic_test_builders.py b/core/dbt/parser/generic_test_builders.py index 34afcaa5ba6..847f2b29f3c 100644 --- a/core/dbt/parser/generic_test_builders.py +++ b/core/dbt/parser/generic_test_builders.py @@ -1,9 +1,7 @@ import re from copy import deepcopy -from dataclasses import dataclass from typing import ( Generic, - TypeVar, Dict, Any, Tuple, @@ -14,12 +12,8 @@ from dbt.clients.jinja import get_rendered, GENERIC_TEST_KWARGS_NAME from dbt.contracts.graph.nodes import UnpatchedSourceDefinition from dbt.contracts.graph.unparsed import ( - TestDef, NodeVersion, - UnparsedAnalysisUpdate, - UnparsedMacroUpdate, UnparsedNodeUpdate, - UnparsedExposure, UnparsedModelUpdate, ) from dbt.exceptions import ( @@ -34,9 +28,8 @@ TestNameNotStringError, UnexpectedTestNamePatternError, UndefinedMacroError, - DbtInternalError, ) -from dbt.parser.search import FileBlock +from dbt.parser.common import Testable from dbt.utils import md5 @@ -83,150 +76,6 @@ def synthesize_generic_test_names( return short_name, full_name -@dataclass -class YamlBlock(FileBlock): - data: Dict[str, Any] - - @classmethod - def from_file_block(cls, src: FileBlock, data: Dict[str, Any]): - return cls( - file=src.file, - data=data, - ) - - -Versioned = TypeVar("Versioned", bound=UnparsedModelUpdate) - -Testable = TypeVar("Testable", UnparsedNodeUpdate, UnpatchedSourceDefinition, UnparsedModelUpdate) - -ColumnTarget = TypeVar( - "ColumnTarget", - UnparsedModelUpdate, - UnparsedNodeUpdate, - UnparsedAnalysisUpdate, - UnpatchedSourceDefinition, -) - -Target = TypeVar( - "Target", - UnparsedNodeUpdate, - UnparsedMacroUpdate, - UnparsedAnalysisUpdate, - UnpatchedSourceDefinition, - UnparsedExposure, - UnparsedModelUpdate, -) - - -@dataclass -class TargetBlock(YamlBlock, Generic[Target]): - target: Target - - @property - def name(self): - return self.target.name - - @property - def columns(self): - return [] - - @property - def tests(self) -> List[TestDef]: - return [] - - @classmethod - def from_yaml_block(cls, 
src: YamlBlock, target: Target) -> "TargetBlock[Target]": - return cls( - file=src.file, - data=src.data, - target=target, - ) - - -@dataclass -class TargetColumnsBlock(TargetBlock[ColumnTarget], Generic[ColumnTarget]): - @property - def columns(self): - if self.target.columns is None: - return [] - else: - return self.target.columns - - -@dataclass -class TestBlock(TargetColumnsBlock[Testable], Generic[Testable]): - @property - def tests(self) -> List[TestDef]: - if self.target.tests is None: - return [] - else: - return self.target.tests - - @property - def quote_columns(self) -> Optional[bool]: - return self.target.quote_columns - - @classmethod - def from_yaml_block(cls, src: YamlBlock, target: Testable) -> "TestBlock[Testable]": - return cls( - file=src.file, - data=src.data, - target=target, - ) - - -@dataclass -class VersionedTestBlock(TestBlock, Generic[Versioned]): - @property - def columns(self): - if not self.target.versions: - return super().columns - else: - raise DbtInternalError(".columns for VersionedTestBlock with versions") - - @property - def tests(self) -> List[TestDef]: - if not self.target.versions: - return super().tests - else: - raise DbtInternalError(".tests for VersionedTestBlock with versions") - - @classmethod - def from_yaml_block(cls, src: YamlBlock, target: Versioned) -> "VersionedTestBlock[Versioned]": - return cls( - file=src.file, - data=src.data, - target=target, - ) - - -@dataclass -class GenericTestBlock(TestBlock[Testable], Generic[Testable]): - test: Dict[str, Any] - column_name: Optional[str] - tags: List[str] - version: Optional[NodeVersion] - - @classmethod - def from_test_block( - cls, - src: TestBlock, - test: Dict[str, Any], - column_name: Optional[str], - tags: List[str], - version: Optional[NodeVersion], - ) -> "GenericTestBlock": - return cls( - file=src.file, - data=src.data, - target=src.target, - test=test, - column_name=column_name, - tags=tags, - version=version, - ) - - class TestBuilder(Generic[Testable]): """An object to hold assorted test settings and perform basic parsing diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index 7a2c55e6af5..3708c263bf1 100644 --- a/core/dbt/parser/manifest.py +++ b/core/dbt/parser/manifest.py @@ -93,6 +93,7 @@ SeedNode, ManifestNode, ResultNode, + ModelNode, ) from dbt.contracts.graph.unparsed import NodeVersion from dbt.contracts.util import Writable @@ -1403,10 +1404,7 @@ def _process_refs_for_exposure(manifest: Manifest, current_project: str, exposur ) continue - elif ( - target_model.resource_type == NodeType.Model - and target_model.access == AccessType.Private - ): + elif isinstance(target_model, ModelNode) and target_model.access == AccessType.Private: # Exposures do not have a group and so can never reference private models raise dbt.exceptions.DbtReferenceError( unique_id=exposure.unique_id, @@ -1455,10 +1453,7 @@ def _process_refs_for_metric(manifest: Manifest, current_project: str, metric: M should_warn_if_disabled=False, ) continue - elif ( - target_model.resource_type == NodeType.Model - and target_model.access == AccessType.Private - ): + elif isinstance(target_model, ModelNode) and target_model.access == AccessType.Private: if not metric.group or metric.group != target_model.group: raise dbt.exceptions.DbtReferenceError( unique_id=metric.unique_id, @@ -1562,10 +1557,7 @@ def _process_refs_for_node(manifest: Manifest, current_project: str, node: Manif continue # Handle references to models that are private - elif ( - target_model.resource_type == NodeType.Model - 
and target_model.access == AccessType.Private - ): + elif isinstance(target_model, ModelNode) and target_model.access == AccessType.Private: if not node.group or node.group != target_model.group: raise dbt.exceptions.DbtReferenceError( unique_id=node.unique_id, @@ -1698,7 +1690,7 @@ def write_publication_artifact(root_project: RuntimeConfig, manifest: Manifest): public_node_ids = { node.unique_id for node in manifest.nodes.values() - if node.resource_type == NodeType.Model and node.access == AccessType.Public + if isinstance(node, ModelNode) and node.access == AccessType.Public } # Get the Graph object from the Linker @@ -1710,6 +1702,7 @@ def write_publication_artifact(root_project: RuntimeConfig, manifest: Manifest): public_models = {} for unique_id in public_node_ids: model = manifest.nodes[unique_id] + assert isinstance(model, ModelNode) # public_node_dependencies is the intersection of all parent nodes plus public nodes parents: Set[UniqueId] = graph.select_parents({UniqueId(unique_id)}) public_node_dependencies: Set[UniqueId] = parents.intersection(public_node_ids) diff --git a/core/dbt/parser/schema_generic_tests.py b/core/dbt/parser/schema_generic_tests.py new file mode 100644 index 00000000000..590f946bbc7 --- /dev/null +++ b/core/dbt/parser/schema_generic_tests.py @@ -0,0 +1,407 @@ +import pathlib +import itertools +import os + +from typing import List, Dict, Optional, Union, Any +from dbt.parser.base import SimpleParser +from dbt.parser.generic_test_builders import TestBuilder +from dbt.parser.search import FileBlock +from dbt.context.providers import RefArgs, generate_test_context +from dbt.parser.common import ( + TestBlock, + Testable, + TestDef, + GenericTestBlock, + VersionedTestBlock, + trimmed, +) +from dbt.contracts.graph.unparsed import UnparsedNodeUpdate, NodeVersion, UnparsedColumn +from dbt.contracts.graph.nodes import ( + GenericTestNode, + UnpatchedSourceDefinition, + ManifestNode, + GraphMemberNode, +) +from dbt.context.context_config import ContextConfig +from dbt.context.configured import generate_schema_yml_context, SchemaYamlVars +from dbt.dataclass_schema import ValidationError +from dbt.exceptions import SchemaConfigError, CompilationError, ParsingError, TestConfigError +from dbt.contracts.files import FileHash +from dbt.utils import md5, get_pseudo_test_path +from dbt.clients.jinja import get_rendered, add_rendered_test_kwargs +from dbt.adapters.factory import get_adapter, get_adapter_package_names +from dbt.node_types import NodeType +from dbt.context.macro_resolver import MacroResolver + + +# This parser handles the tests that are defined in "schema" (yaml) files, on models, +# sources, etc. 
The base generic test is handled by the GenericTestParser +class SchemaGenericTestParser(SimpleParser): + def __init__( + self, + project, + manifest, + root_project, + ) -> None: + super().__init__(project, manifest, root_project) + self.schema_yaml_vars = SchemaYamlVars() + self.render_ctx = generate_schema_yml_context( + self.root_project, self.project.project_name, self.schema_yaml_vars + ) + internal_package_names = get_adapter_package_names(self.root_project.credentials.type) + self.macro_resolver = MacroResolver( + self.manifest.macros, self.root_project.project_name, internal_package_names + ) + + @property + def resource_type(self) -> NodeType: + return NodeType.Test + + @classmethod + def get_compiled_path(cls, block: FileBlock) -> str: + return block.path.relative_path + + def parse_file(self, block: FileBlock, dct: Dict = None) -> None: + pass + + def parse_from_dict(self, dct, validate=True) -> GenericTestNode: + if validate: + GenericTestNode.validate(dct) + return GenericTestNode.from_dict(dct) + + def parse_column_tests( + self, block: TestBlock, column: UnparsedColumn, version: Optional[NodeVersion] + ) -> None: + if not column.tests: + return + + for test in column.tests: + self.parse_test(block, test, column, version) + + def create_test_node( + self, + target: Union[UnpatchedSourceDefinition, UnparsedNodeUpdate], + path: str, + config: ContextConfig, + tags: List[str], + fqn: List[str], + name: str, + raw_code: str, + test_metadata: Dict[str, Any], + file_key_name: str, + column_name: Optional[str], + ) -> GenericTestNode: + + HASH_LENGTH = 10 + + # N.B: This function builds a hashable string from any given test_metadata dict. + # it's a bit fragile for general use (only supports str, int, float, List, Dict) + # but it gets the job done here without the overhead of complete ser(de). + def get_hashable_md(data: Union[str, int, float, List, Dict]) -> Union[str, List, Dict]: + if type(data) == dict: + return {k: get_hashable_md(data[k]) for k in sorted(data.keys())} # type: ignore + elif type(data) == list: + return [get_hashable_md(val) for val in data] # type: ignore + else: + return str(data) + + hashable_metadata = repr(get_hashable_md(test_metadata)) + hash_string = "".join([name, hashable_metadata]) + test_hash = md5(hash_string)[-HASH_LENGTH:] + + dct = { + "alias": name, + "schema": self.default_schema, + "database": self.default_database, + "fqn": fqn, + "name": name, + "resource_type": self.resource_type, + "tags": tags, + "path": path, + "original_file_path": target.original_file_path, + "package_name": self.project.project_name, + "raw_code": raw_code, + "language": "sql", + "unique_id": self.generate_unique_id(name, test_hash), + "config": self.config_dict(config), + "test_metadata": test_metadata, + "column_name": column_name, + "checksum": FileHash.empty().to_dict(omit_none=True), + "file_key_name": file_key_name, + } + try: + GenericTestNode.validate(dct) + return GenericTestNode.from_dict(dct) + except ValidationError as exc: + # this is a bit silly, but build an UnparsedNode just for error + # message reasons + node = self._create_error_node( + name=target.name, + path=path, + original_file_path=target.original_file_path, + raw_code=raw_code, + ) + raise TestConfigError(exc, node) + + # This is called directly in the SourcePatcher and by the "parse_node" + # command which is called by the SchemaParser. 
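+    # For example (values illustrative), a schema.yml entry such as
+    #   tests:
+    #     - unique
+    #     - accepted_values:
+    #         values: ["active", "inactive"]
+    # reaches this method as test={"unique": {}} or
+    # test={"accepted_values": {"values": ["active", "inactive"]}},
+    # after parse_test has normalized bare strings into dicts.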
+ def parse_generic_test( + self, + target: Testable, + test: Dict[str, Any], + tags: List[str], + column_name: Optional[str], + schema_file_id: str, + version: Optional[NodeVersion], + ) -> GenericTestNode: + try: + builder = TestBuilder( + test=test, + target=target, + column_name=column_name, + version=version, + package_name=target.package_name, + render_ctx=self.render_ctx, + ) + if self.schema_yaml_vars.env_vars: + self.store_env_vars(target, schema_file_id, self.schema_yaml_vars.env_vars) + self.schema_yaml_vars.env_vars = {} + + except ParsingError as exc: + context = trimmed(str(target)) + msg = "Invalid test config given in {}:\n\t{}\n\t@: {}".format( + target.original_file_path, exc.msg, context + ) + raise ParsingError(msg) from exc + + except CompilationError as exc: + context = trimmed(str(target)) + msg = ( + "Invalid generic test configuration given in " + f"{target.original_file_path}: \n{exc.msg}\n\t@: {context}" + ) + raise CompilationError(msg) from exc + + original_name = os.path.basename(target.original_file_path) + compiled_path = get_pseudo_test_path(builder.compiled_name, original_name) + + # fqn is the relative path of the yaml file where this generic test is defined, + # minus the project-level directory and the file name itself + # TODO pass a consistent path object from both UnparsedNode and UnpatchedSourceDefinition + path = pathlib.Path(target.original_file_path) + relative_path = str(path.relative_to(*path.parts[:1])) + fqn = self.get_fqn(relative_path, builder.fqn_name) + + # this is the ContextConfig that is used in render_update + config: ContextConfig = self.initial_config(fqn) + + # builder.args contains keyword args for the test macro, + # not configs which have been separated out in the builder. + # The keyword args are not completely rendered until compilation. + metadata = { + "namespace": builder.namespace, + "name": builder.name, + "kwargs": builder.args, + } + tags = sorted(set(itertools.chain(tags, builder.tags()))) + + if isinstance(target, UnpatchedSourceDefinition): + file_key_name = f"{target.source.yaml_key}.{target.source.name}" + else: + file_key_name = f"{target.yaml_key}.{target.name}" + + node = self.create_test_node( + target=target, + path=compiled_path, + config=config, + fqn=fqn, + tags=tags, + name=builder.fqn_name, + raw_code=builder.build_raw_code(), + column_name=column_name, + test_metadata=metadata, + file_key_name=file_key_name, + ) + self.render_test_update(node, config, builder, schema_file_id) + + return node + + def _lookup_attached_node( + self, target: Testable, version: Optional[NodeVersion] + ) -> Optional[Union[ManifestNode, GraphMemberNode]]: + """Look up attached node for Testable target nodes other than sources. 
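For model and seed targets, the name (plus optional version) is resolved through the manifest's ref_lookup, with a fallback to the disabled-node lookup.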
Can be None if generic test attached to SQL node with no corresponding .sql file.""" + attached_node = None # type: Optional[Union[ManifestNode, GraphMemberNode]] + if not isinstance(target, UnpatchedSourceDefinition): + attached_node_unique_id = self.manifest.ref_lookup.get_unique_id( + target.name, None, version + ) + if attached_node_unique_id: + attached_node = self.manifest.nodes[attached_node_unique_id] + else: + disabled_node = self.manifest.disabled_lookup.find( + target.name, None + ) or self.manifest.disabled_lookup.find(target.name.upper(), None) + if disabled_node: + attached_node = self.manifest.disabled[disabled_node[0].unique_id][0] + return attached_node + + def store_env_vars(self, target, schema_file_id, env_vars): + self.manifest.env_vars.update(env_vars) + if schema_file_id in self.manifest.files: + schema_file = self.manifest.files[schema_file_id] + if isinstance(target, UnpatchedSourceDefinition): + search_name = target.source.name + yaml_key = target.source.yaml_key + if "." in search_name: # source file definitions + (search_name, _) = search_name.split(".") + else: + search_name = target.name + yaml_key = target.yaml_key + for var in env_vars.keys(): + schema_file.add_env_var(var, yaml_key, search_name) + + # This does special shortcut processing for the two + # most common internal macros, not_null and unique, + # which avoids the jinja rendering to resolve config + # and variables, etc, which might be in the macro. + # In the future we will look at generalizing this + # more to handle additional macros or to use static + # parsing to avoid jinja overhead. + def render_test_update(self, node, config, builder, schema_file_id): + macro_unique_id = self.macro_resolver.get_macro_id( + node.package_name, "test_" + builder.name + ) + # Add the depends_on here so we can limit the macros added + # to the context in rendering processing + node.depends_on.add_macro(macro_unique_id) + if macro_unique_id in ["macro.dbt.test_not_null", "macro.dbt.test_unique"]: + config_call_dict = builder.get_static_config() + config._config_call_dict = config_call_dict + # This sets the config from dbt_project + self.update_parsed_node_config(node, config) + # source node tests are processed at patch_source time + if isinstance(builder.target, UnpatchedSourceDefinition): + sources = [builder.target.fqn[-2], builder.target.fqn[-1]] + node.sources.append(sources) + else: # all other nodes + node.refs.append(RefArgs(name=builder.target.name, version=builder.version)) + else: + try: + # make a base context that doesn't have the magic kwargs field + context = generate_test_context( + node, + self.root_project, + self.manifest, + config, + self.macro_resolver, + ) + # update with rendered test kwargs (which collects any refs) + # Note: This does not actually update the kwargs with the rendered + # values. That happens in compilation. + add_rendered_test_kwargs(context, node, capture_macros=True) + # the parsed node is not rendered in the native context. + get_rendered(node.raw_code, context, node, capture_macros=True) + self.update_parsed_node_config(node, config) + # env_vars should have been updated in the context env_var method + except ValidationError as exc: + # we got a ValidationError - probably bad types in config() + raise SchemaConfigError(exc, node=node) from exc + + # Set attached_node for generic test nodes, if available. + # Generic test node inherits attached node's group config value. 
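+        # For instance, if a model belongs to group "finance" (name
+        # illustrative), a generic test attached to it inherits that group,
+        # so group-based access checks treat the test and the model alike.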
+        attached_node = self._lookup_attached_node(builder.target, builder.version)
+        if attached_node:
+            node.attached_node = attached_node.unique_id
+            node.group = attached_node.group
+
+    def parse_node(self, block: GenericTestBlock) -> GenericTestNode:
+        """In schema parsing, we rewrite most of the part of parse_node that
+        builds the initial node to be parsed, but rendering is basically the
+        same
+        """
+        node = self.parse_generic_test(
+            target=block.target,
+            test=block.test,
+            tags=block.tags,
+            column_name=block.column_name,
+            schema_file_id=block.file.file_id,
+            version=block.version,
+        )
+        self.add_test_node(block, node)
+        return node
+
+    def add_test_node(self, block: GenericTestBlock, node: GenericTestNode):
+        test_from = {"key": block.target.yaml_key, "name": block.target.name}
+        if node.config.enabled:
+            self.manifest.add_node(block.file, node, test_from)
+        else:
+            self.manifest.add_disabled(block.file, node, test_from)
+
+    def render_with_context(
+        self,
+        node: GenericTestNode,
+        config: ContextConfig,
+    ) -> None:
+        """Given the parsed node and a ContextConfig to use during
+        parsing, collect all the refs that might be squirreled away in the test
+        arguments. This includes the implicit "model" argument.
+        """
+        # make a base context that doesn't have the magic kwargs field
+        context = self._context_for(node, config)
+        # update it with the rendered test kwargs (which collects any refs)
+        add_rendered_test_kwargs(context, node, capture_macros=True)
+
+        # the parsed node is not rendered in the native context.
+        get_rendered(node.raw_code, context, node, capture_macros=True)
+
+    def parse_test(
+        self,
+        target_block: TestBlock,
+        test: TestDef,
+        column: Optional[UnparsedColumn],
+        version: Optional[NodeVersion],
+    ) -> None:
+        if isinstance(test, str):
+            test = {test: {}}
+
+        if column is None:
+            column_name: Optional[str] = None
+            column_tags: List[str] = []
+        else:
+            column_name = column.name
+            should_quote = column.quote or (column.quote is None and target_block.quote_columns)
+            if should_quote:
+                column_name = get_adapter(self.root_project).quote(column_name)
+            column_tags = column.tags
+
+        block = GenericTestBlock.from_test_block(
+            src=target_block,
+            test=test,
+            column_name=column_name,
+            tags=column_tags,
+            version=version,
+        )
+        self.parse_node(block)
+
+    def parse_tests(self, block: TestBlock) -> None:
+        for column in block.columns:
+            self.parse_column_tests(block, column, None)
+
+        for test in block.tests:
+            self.parse_test(block, test, None, None)
+
+    def parse_versioned_tests(self, block: VersionedTestBlock) -> None:
+        if not block.target.versions:
+            self.parse_tests(block)
+        else:
+            for version in block.target.versions:
+                for column in block.target.get_columns_for_version(version.v):
+                    self.parse_column_tests(block, column, version.v)
+
+                for test in block.target.get_tests_for_version(version.v):
+                    self.parse_test(block, test, None, version.v)
+
+    def generate_unique_id(self, resource_name: str, hash: Optional[str] = None) -> str:
+        return ".".join(
+            filter(None, [self.resource_type, self.project.project_name, resource_name, hash])
+        )
diff --git a/core/dbt/parser/schema_yaml_readers.py b/core/dbt/parser/schema_yaml_readers.py
new file mode 100644
index 00000000000..4a5863102c1
--- /dev/null
+++ b/core/dbt/parser/schema_yaml_readers.py
@@ -0,0 +1,271 @@
+from dbt.parser.schemas import YamlReader, SchemaParser
+from dbt.parser.common import YamlBlock
+from dbt.node_types import NodeType
+from dbt.contracts.graph.unparsed import UnparsedExposure, UnparsedMetric, UnparsedGroup
+from dbt.contracts.graph.nodes import Exposure, Metric, Group
+from dbt.exceptions import DbtInternalError, YamlParseDictError, JSONValidationError
+from dbt.context.providers import generate_parse_exposure, generate_parse_metrics
+from dbt.contracts.graph.model_config import MetricConfig, ExposureConfig
+from dbt.context.context_config import (
+    BaseContextConfigGenerator,
+    ContextConfigGenerator,
+    UnrenderedConfigGenerator,
+)
+from dbt.clients.jinja import get_rendered
+from typing import List
+from dbt.dataclass_schema import ValidationError
+
+
+class ExposureParser(YamlReader):
+    def __init__(self, schema_parser: SchemaParser, yaml: YamlBlock):
+        super().__init__(schema_parser, yaml, NodeType.Exposure.pluralize())
+        self.schema_parser = schema_parser
+        self.yaml = yaml
+
+    def parse_exposure(self, unparsed: UnparsedExposure):
+        package_name = self.project.project_name
+        unique_id = f"{NodeType.Exposure}.{package_name}.{unparsed.name}"
+        path = self.yaml.path.relative_path
+
+        fqn = self.schema_parser.get_fqn_prefix(path)
+        fqn.append(unparsed.name)
+
+        config = self._generate_exposure_config(
+            target=unparsed,
+            fqn=fqn,
+            package_name=package_name,
+            rendered=True,
+        )
+
+        config = config.finalize_and_validate()
+
+        unrendered_config = self._generate_exposure_config(
+            target=unparsed,
+            fqn=fqn,
+            package_name=package_name,
+            rendered=False,
+        )
+
+        if not isinstance(config, ExposureConfig):
+            raise DbtInternalError(
+                f"Calculated a {type(config)} for an exposure, but expected an ExposureConfig"
+            )
+
+        parsed = Exposure(
+            resource_type=NodeType.Exposure,
+            package_name=package_name,
+            path=path,
+            original_file_path=self.yaml.path.original_file_path,
+            unique_id=unique_id,
+            fqn=fqn,
+            name=unparsed.name,
+            type=unparsed.type,
+            url=unparsed.url,
+            meta=unparsed.meta,
+            tags=unparsed.tags,
+            description=unparsed.description,
+            label=unparsed.label,
+            owner=unparsed.owner,
+            maturity=unparsed.maturity,
+            config=config,
+            unrendered_config=unrendered_config,
+        )
+        ctx = generate_parse_exposure(
+            parsed,
+            self.root_project,
+            self.schema_parser.manifest,
+            package_name,
+        )
+        depends_on_jinja = "\n".join("{{ " + line + "}}" for line in unparsed.depends_on)
+        get_rendered(depends_on_jinja, ctx, parsed, capture_macros=True)
+        # parsed now has a populated refs/sources/metrics
+
+        if parsed.config.enabled:
+            self.manifest.add_exposure(self.yaml.file, parsed)
+        else:
+            self.manifest.add_disabled(self.yaml.file, parsed)
+
+    def _generate_exposure_config(
+        self, target: UnparsedExposure, fqn: List[str], package_name: str, rendered: bool
+    ):
+        generator: BaseContextConfigGenerator
+        if rendered:
+            generator = ContextConfigGenerator(self.root_project)
+        else:
+            generator = UnrenderedConfigGenerator(self.root_project)
+
+        # configs with precedence set
+        precedence_configs = dict()
+        # apply exposure configs
+        precedence_configs.update(target.config)
+
+        return generator.calculate_node_config(
+            config_call_dict={},
+            fqn=fqn,
+            resource_type=NodeType.Exposure,
+            project_name=package_name,
+            base=False,
+            patch_config_dict=precedence_configs,
+        )
+
+    def parse(self):
+        for data in self.get_key_dicts():
+            try:
+                UnparsedExposure.validate(data)
+                unparsed = UnparsedExposure.from_dict(data)
+            except (ValidationError, JSONValidationError) as exc:
+                raise YamlParseDictError(self.yaml.path, self.key, data, exc)
+
+            self.parse_exposure(unparsed)
+
+
+class MetricParser(YamlReader):
+    def __init__(self, schema_parser: SchemaParser, yaml: YamlBlock):
+        super().__init__(schema_parser, yaml, NodeType.Metric.pluralize())
+        self.schema_parser = schema_parser
+        self.yaml = yaml
+
+    def parse_metric(self, unparsed: UnparsedMetric):
+        package_name = self.project.project_name
+        unique_id = f"{NodeType.Metric}.{package_name}.{unparsed.name}"
+        path = self.yaml.path.relative_path
+
+        fqn = self.schema_parser.get_fqn_prefix(path)
+        fqn.append(unparsed.name)
+
+        config = self._generate_metric_config(
+            target=unparsed,
+            fqn=fqn,
+            package_name=package_name,
+            rendered=True,
+        )
+
+        config = config.finalize_and_validate()
+
+        unrendered_config = self._generate_metric_config(
+            target=unparsed,
+            fqn=fqn,
+            package_name=package_name,
+            rendered=False,
+        )
+
+        if not isinstance(config, MetricConfig):
+            raise DbtInternalError(
+                f"Calculated a {type(config)} for a metric, but expected a MetricConfig"
+            )
+
+        parsed = Metric(
+            resource_type=NodeType.Metric,
+            package_name=package_name,
+            path=path,
+            original_file_path=self.yaml.path.original_file_path,
+            unique_id=unique_id,
+            fqn=fqn,
+            model=unparsed.model,
+            name=unparsed.name,
+            description=unparsed.description,
+            label=unparsed.label,
+            calculation_method=unparsed.calculation_method,
+            expression=str(unparsed.expression),
+            timestamp=unparsed.timestamp,
+            dimensions=unparsed.dimensions,
+            window=unparsed.window,
+            time_grains=unparsed.time_grains,
+            filters=unparsed.filters,
+            meta=unparsed.meta,
+            tags=unparsed.tags,
+            config=config,
+            unrendered_config=unrendered_config,
+            group=config.group,
+        )
+
+        ctx = generate_parse_metrics(
+            parsed,
+            self.root_project,
+            self.schema_parser.manifest,
+            package_name,
+        )
+
+        if parsed.model is not None:
+            model_ref = "{{ " + parsed.model + " }}"
+            get_rendered(model_ref, ctx, parsed)
+
+        parsed.expression = get_rendered(
+            parsed.expression,
+            ctx,
+            node=parsed,
+        )
+
+        # if the metric is disabled we do not want it included in the manifest, only in the disabled dict
+        if parsed.config.enabled:
+            self.manifest.add_metric(self.yaml.file, parsed)
+        else:
+            self.manifest.add_disabled(self.yaml.file, parsed)
+
+    def _generate_metric_config(
+        self, target: UnparsedMetric, fqn: List[str], package_name: str, rendered: bool
+    ):
+        generator: BaseContextConfigGenerator
+        if rendered:
+            generator = ContextConfigGenerator(self.root_project)
+        else:
+            generator = UnrenderedConfigGenerator(self.root_project)
+
+        # configs with precedence set
+        precedence_configs = dict()
+        # first apply metric configs
+        precedence_configs.update(target.config)
+
+        config = generator.calculate_node_config(
+            config_call_dict={},
+            fqn=fqn,
+            resource_type=NodeType.Metric,
+            project_name=package_name,
+            base=False,
+            patch_config_dict=precedence_configs,
+        )
+        return config
+
+    def parse(self):
+        for data in self.get_key_dicts():
+            try:
+                UnparsedMetric.validate(data)
+                unparsed = UnparsedMetric.from_dict(data)
+
+            except (ValidationError, JSONValidationError) as exc:
+                raise YamlParseDictError(self.yaml.path, self.key, data, exc)
+            self.parse_metric(unparsed)
+
+
+class GroupParser(YamlReader):
+    def __init__(self, schema_parser: SchemaParser, yaml: YamlBlock):
+        super().__init__(schema_parser, yaml, NodeType.Group.pluralize())
+        self.schema_parser = schema_parser
+        self.yaml = yaml
+
+    def parse_group(self, unparsed: UnparsedGroup):
+        package_name = self.project.project_name
+        unique_id = f"{NodeType.Group}.{package_name}.{unparsed.name}"
+        path = self.yaml.path.relative_path
+
+        parsed = Group(
+            resource_type=NodeType.Group,
+            package_name=package_name,
+            path=path,
+ original_file_path=self.yaml.path.original_file_path, + unique_id=unique_id, + name=unparsed.name, + owner=unparsed.owner, + ) + + self.manifest.add_group(self.yaml.file, parsed) + + def parse(self): + for data in self.get_key_dicts(): + try: + UnparsedGroup.validate(data) + unparsed = UnparsedGroup.from_dict(data) + except (ValidationError, JSONValidationError) as exc: + raise YamlParseDictError(self.yaml.path, self.key, data, exc) + + self.parse_group(unparsed) diff --git a/core/dbt/parser/schemas.py b/core/dbt/parser/schemas.py index 727824f323e..0d187735f29 100644 --- a/core/dbt/parser/schemas.py +++ b/core/dbt/parser/schemas.py @@ -1,45 +1,21 @@ -import itertools -import os -import pathlib +import time from abc import ABCMeta, abstractmethod -from typing import Iterable, Dict, Any, Union, List, Optional, Generic, TypeVar, Type, Callable +from typing import Iterable, Dict, Any, List, Generic, TypeVar, Type, Callable from dataclasses import dataclass, field from dbt.dataclass_schema import ValidationError, dbtClassMixin -from dbt.adapters.factory import get_adapter, get_adapter_package_names -from dbt.clients.jinja import get_rendered, add_rendered_test_kwargs from dbt.clients.yaml_helper import load_yaml_text -from dbt.context.providers import RefArgs from dbt.parser.schema_renderer import SchemaYamlRenderer -from dbt.context.context_config import ( - ContextConfig, - BaseContextConfigGenerator, - ContextConfigGenerator, - UnrenderedConfigGenerator, -) +from dbt.parser.schema_generic_tests import SchemaGenericTestParser +from dbt.context.context_config import ContextConfig from dbt.context.configured import generate_schema_yml_context, SchemaYamlVars -from dbt.context.providers import ( - generate_parse_exposure, - generate_parse_metrics, - generate_test_context, -) -from dbt.context.macro_resolver import MacroResolver -from dbt.contracts.files import FileHash, SchemaSourceFile -from dbt.contracts.graph.model_config import MetricConfig, ExposureConfig +from dbt.contracts.files import SchemaSourceFile from dbt.contracts.graph.nodes import ( ParsedNodePatch, - ColumnInfo, - ColumnLevelConstraint, - GenericTestNode, ParsedMacroPatch, UnpatchedSourceDefinition, - Exposure, - Metric, - Group, - ManifestNode, - GraphMemberNode, ConstraintType, ModelNode, ModelLevelConstraint, @@ -47,53 +23,46 @@ from dbt.contracts.graph.unparsed import ( HasColumnDocs, HasColumnTests, - HasColumnProps, SourcePatch, UnparsedAnalysisUpdate, - UnparsedColumn, UnparsedMacroUpdate, UnparsedNodeUpdate, UnparsedModelUpdate, - UnparsedExposure, - UnparsedMetric, UnparsedSourceDefinition, - UnparsedGroup, - NodeVersion, ) from dbt.exceptions import ( - CompilationError, DuplicateMacroPatchNameError, DuplicatePatchPathError, DuplicateSourcePatchNameError, JSONValidationError, DbtInternalError, - SchemaConfigError, - TestConfigError, ParsingError, DbtValidationError, YamlLoadError, YamlParseDictError, YamlParseListError, + InvalidAccessTypeError, ) from dbt.events.functions import warn_or_error -from dbt.events.types import WrongResourceSchemaFile, NoNodeForYamlKey, MacroNotFoundForPatch -from dbt.node_types import NodeType +from dbt.events.types import ( + WrongResourceSchemaFile, + NoNodeForYamlKey, + MacroNotFoundForPatch, + ValidationWarning, +) +from dbt.node_types import NodeType, AccessType from dbt.parser.base import SimpleParser from dbt.parser.search import FileBlock -from dbt.parser.generic_test_builders import ( - TestBuilder, - GenericTestBlock, - TargetBlock, +from dbt.parser.common import ( YamlBlock, 
+    TargetBlock,
     TestBlock,
     VersionedTestBlock,
-    Testable,
-    Versioned,
+    ParserRef,
+    trimmed,
 )
-from dbt.utils import get_pseudo_test_path, coerce_dict_str, md5, deep_merge
-
+from dbt.utils import coerce_dict_str, deep_merge
 
-TestDef = Union[str, Dict[str, Any]]
 
 schema_file_keys = (
     "models",
@@ -107,6 +76,36 @@
 )
 
 
+# ===============================================================================
+# Schema Parser classes
+#
+# The SchemaParser is a subclass of the SimpleParser from base.py, as is
+# the SchemaGenericTestParser. The schema sub-parsers are all subclasses of
+# the YamlReader parsing class. Most of the action in creating SourceDefinition
+# nodes actually happens in the SourcePatcher class, in sources.py, which is
+# called as a late-stage parsing step in manifest.py.
+#
+# The "patch" parsers read yaml config and properties and apply them to
+# nodes that were already created from sql files.
+#
+# The SchemaParser and SourcePatcher both use the SchemaGenericTestParser
+# (in schema_generic_tests.py) to create generic test nodes.
+#
+# YamlReader
+#   MetricParser (metrics) [schema_yaml_readers.py]
+#   ExposureParser (exposures) [schema_yaml_readers.py]
+#   GroupParser (groups) [schema_yaml_readers.py]
+#   SourceParser (sources)
+#   PatchParser
+#     MacroPatchParser (macros)
+#     NodePatchParser
+#       ModelPatchParser (models)
+#       AnalysisPatchParser (analyses)
+#       TestablePatchParser (seeds, snapshots)
+#
+# ===============================================================================
+
+
 def yaml_from_file(source_file: SchemaSourceFile) -> Dict[str, Any]:
     """If loading the yaml fails, raise an exception."""
     try:
@@ -118,61 +117,9 @@ def yaml_from_file(source_file: SchemaSourceFile) -> Dict[str, Any]:
     )
 
 
-class ParserRef:
-    """A helper object to hold parse-time references."""
-
-    def __init__(self):
-        self.column_info: Dict[str, ColumnInfo] = {}
-
-    def _add(self, column: HasColumnProps):
-        tags: List[str] = []
-        tags.extend(getattr(column, "tags", ()))
-        quote: Optional[bool]
-        if isinstance(column, UnparsedColumn):
-            quote = column.quote
-        else:
-            quote = None
-
-        if any(
-            c
-            for c in column.constraints
-            if "type" not in c or not ConstraintType.is_valid(c["type"])
-        ):
-            raise ParsingError(f"Invalid constraint type on column {column.name}")
-
-        self.column_info[column.name] = ColumnInfo(
-            name=column.name,
-            description=column.description,
-            data_type=column.data_type,
-            constraints=[ColumnLevelConstraint.from_dict(c) for c in column.constraints],
-            meta=column.meta,
-            tags=tags,
-            quote=quote,
-            _extra=column.extra,
-        )
-
-    @classmethod
-    def from_target(cls, target: Union[HasColumnDocs, HasColumnTests]) -> "ParserRef":
-        refs = cls()
-        for column in target.columns:
-            refs._add(column)
-        return refs
-
-    @classmethod
-    def from_versioned_target(cls, target: Versioned, version: NodeVersion) -> "ParserRef":
-        refs = cls()
-        for base_column in target.get_columns_for_version(version):
-            refs._add(base_column)
-        return refs
-
-
-def _trimmed(inp: str) -> str:
-    if len(inp) < 50:
-        return inp
-    return inp[:44] + "..." + inp[-3:]
-
-
-class SchemaParser(SimpleParser[GenericTestBlock, GenericTestNode]):
+# This is the main schema file parser, but almost everything happens in
+# the schema sub-parsers.
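+#
+# For example (contents illustrative), a schema file like
+#   models:
+#     - name: my_model
+#       columns:
+#         - name: id
+#           tests: [unique]
+#   groups:
+#     - name: my_group
+#       owner: {name: someone}
+# is wrapped in a YamlBlock by parse_file below, and each top-level key is
+# dispatched to the matching sub-parser in the hierarchy above.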
+class SchemaParser(SimpleParser[YamlBlock, ModelNode]): def __init__( self, project, @@ -181,14 +128,12 @@ def __init__( ) -> None: super().__init__(project, manifest, root_project) + self.generic_test_parser = SchemaGenericTestParser(project, manifest, root_project) + self.schema_yaml_vars = SchemaYamlVars() self.render_ctx = generate_schema_yml_context( self.root_project, self.project.project_name, self.schema_yaml_vars ) - internal_package_names = get_adapter_package_names(self.root_project.credentials.type) - self.macro_resolver = MacroResolver( - self.manifest.macros, self.root_project.project_name, internal_package_names - ) @classmethod def get_compiled_path(cls, block: FileBlock) -> str: @@ -199,342 +144,6 @@ def get_compiled_path(cls, block: FileBlock) -> str: def resource_type(self) -> NodeType: return NodeType.Test - def parse_from_dict(self, dct, validate=True) -> GenericTestNode: - if validate: - GenericTestNode.validate(dct) - return GenericTestNode.from_dict(dct) - - def parse_column_tests( - self, block: TestBlock, column: UnparsedColumn, version: Optional[NodeVersion] - ) -> None: - if not column.tests: - return - - for test in column.tests: - self.parse_test(block, test, column, version) - - def create_test_node( - self, - target: Union[UnpatchedSourceDefinition, UnparsedNodeUpdate], - path: str, - config: ContextConfig, - tags: List[str], - fqn: List[str], - name: str, - raw_code: str, - test_metadata: Dict[str, Any], - file_key_name: str, - column_name: Optional[str], - ) -> GenericTestNode: - - HASH_LENGTH = 10 - - # N.B: This function builds a hashable string from any given test_metadata dict. - # it's a bit fragile for general use (only supports str, int, float, List, Dict) - # but it gets the job done here without the overhead of complete ser(de). 
- def get_hashable_md(data: Union[str, int, float, List, Dict]) -> Union[str, List, Dict]: - if type(data) == dict: - return {k: get_hashable_md(data[k]) for k in sorted(data.keys())} # type: ignore - elif type(data) == list: - return [get_hashable_md(val) for val in data] # type: ignore - else: - return str(data) - - hashable_metadata = repr(get_hashable_md(test_metadata)) - hash_string = "".join([name, hashable_metadata]) - test_hash = md5(hash_string)[-HASH_LENGTH:] - - dct = { - "alias": name, - "schema": self.default_schema, - "database": self.default_database, - "fqn": fqn, - "name": name, - "resource_type": self.resource_type, - "tags": tags, - "path": path, - "original_file_path": target.original_file_path, - "package_name": self.project.project_name, - "raw_code": raw_code, - "language": "sql", - "unique_id": self.generate_unique_id(name, test_hash), - "config": self.config_dict(config), - "test_metadata": test_metadata, - "column_name": column_name, - "checksum": FileHash.empty().to_dict(omit_none=True), - "file_key_name": file_key_name, - } - try: - GenericTestNode.validate(dct) - return GenericTestNode.from_dict(dct) - except ValidationError as exc: - # this is a bit silly, but build an UnparsedNode just for error - # message reasons - node = self._create_error_node( - name=target.name, - path=path, - original_file_path=target.original_file_path, - raw_code=raw_code, - ) - raise TestConfigError(exc, node) - - # lots of time spent in this method - def _parse_generic_test( - self, - target: Testable, - test: Dict[str, Any], - tags: List[str], - column_name: Optional[str], - schema_file_id: str, - version: Optional[NodeVersion], - ) -> GenericTestNode: - try: - builder = TestBuilder( - test=test, - target=target, - column_name=column_name, - version=version, - package_name=target.package_name, - render_ctx=self.render_ctx, - ) - if self.schema_yaml_vars.env_vars: - self.store_env_vars(target, schema_file_id, self.schema_yaml_vars.env_vars) - self.schema_yaml_vars.env_vars = {} - - except ParsingError as exc: - context = _trimmed(str(target)) - msg = "Invalid test config given in {}:\n\t{}\n\t@: {}".format( - target.original_file_path, exc.msg, context - ) - raise ParsingError(msg) from exc - - except CompilationError as exc: - context = _trimmed(str(target)) - msg = ( - "Invalid generic test configuration given in " - f"{target.original_file_path}: \n{exc.msg}\n\t@: {context}" - ) - raise CompilationError(msg) from exc - - original_name = os.path.basename(target.original_file_path) - compiled_path = get_pseudo_test_path(builder.compiled_name, original_name) - - # fqn is the relative path of the yaml file where this generic test is defined, - # minus the project-level directory and the file name itself - # TODO pass a consistent path object from both UnparsedNode and UnpatchedSourceDefinition - path = pathlib.Path(target.original_file_path) - relative_path = str(path.relative_to(*path.parts[:1])) - fqn = self.get_fqn(relative_path, builder.fqn_name) - - # this is the ContextConfig that is used in render_update - config: ContextConfig = self.initial_config(fqn) - - # builder.args contains keyword args for the test macro, - # not configs which have been separated out in the builder. - # The keyword args are not completely rendered until compilation. 
- metadata = { - "namespace": builder.namespace, - "name": builder.name, - "kwargs": builder.args, - } - tags = sorted(set(itertools.chain(tags, builder.tags()))) - - if isinstance(target, UnpatchedSourceDefinition): - file_key_name = f"{target.source.yaml_key}.{target.source.name}" - else: - file_key_name = f"{target.yaml_key}.{target.name}" - - node = self.create_test_node( - target=target, - path=compiled_path, - config=config, - fqn=fqn, - tags=tags, - name=builder.fqn_name, - raw_code=builder.build_raw_code(), - column_name=column_name, - test_metadata=metadata, - file_key_name=file_key_name, - ) - self.render_test_update(node, config, builder, schema_file_id) - - return node - - def _lookup_attached_node( - self, target: Testable, version: Optional[NodeVersion] - ) -> Optional[Union[ManifestNode, GraphMemberNode]]: - """Look up attached node for Testable target nodes other than sources. Can be None if generic test attached to SQL node with no corresponding .sql file.""" - attached_node = None # type: Optional[Union[ManifestNode, GraphMemberNode]] - if not isinstance(target, UnpatchedSourceDefinition): - attached_node_unique_id = self.manifest.ref_lookup.get_unique_id( - target.name, None, version - ) - if attached_node_unique_id: - attached_node = self.manifest.nodes[attached_node_unique_id] - else: - disabled_node = self.manifest.disabled_lookup.find( - target.name, None - ) or self.manifest.disabled_lookup.find(target.name.upper(), None) - if disabled_node: - attached_node = self.manifest.disabled[disabled_node[0].unique_id][0] - return attached_node - - def store_env_vars(self, target, schema_file_id, env_vars): - self.manifest.env_vars.update(env_vars) - if schema_file_id in self.manifest.files: - schema_file = self.manifest.files[schema_file_id] - if isinstance(target, UnpatchedSourceDefinition): - search_name = target.source.name - yaml_key = target.source.yaml_key - if "." in search_name: # source file definitions - (search_name, _) = search_name.split(".") - else: - search_name = target.name - yaml_key = target.yaml_key - for var in env_vars.keys(): - schema_file.add_env_var(var, yaml_key, search_name) - - # This does special shortcut processing for the two - # most common internal macros, not_null and unique, - # which avoids the jinja rendering to resolve config - # and variables, etc, which might be in the macro. - # In the future we will look at generalizing this - # more to handle additional macros or to use static - # parsing to avoid jinja overhead. 
- def render_test_update(self, node, config, builder, schema_file_id): - macro_unique_id = self.macro_resolver.get_macro_id( - node.package_name, "test_" + builder.name - ) - # Add the depends_on here so we can limit the macros added - # to the context in rendering processing - node.depends_on.add_macro(macro_unique_id) - if macro_unique_id in ["macro.dbt.test_not_null", "macro.dbt.test_unique"]: - config_call_dict = builder.get_static_config() - config._config_call_dict = config_call_dict - # This sets the config from dbt_project - self.update_parsed_node_config(node, config) - # source node tests are processed at patch_source time - if isinstance(builder.target, UnpatchedSourceDefinition): - sources = [builder.target.fqn[-2], builder.target.fqn[-1]] - node.sources.append(sources) - else: # all other nodes - node.refs.append(RefArgs(name=builder.target.name, version=builder.version)) - else: - try: - # make a base context that doesn't have the magic kwargs field - context = generate_test_context( - node, - self.root_project, - self.manifest, - config, - self.macro_resolver, - ) - # update with rendered test kwargs (which collects any refs) - # Note: This does not actually update the kwargs with the rendered - # values. That happens in compilation. - add_rendered_test_kwargs(context, node, capture_macros=True) - # the parsed node is not rendered in the native context. - get_rendered(node.raw_code, context, node, capture_macros=True) - self.update_parsed_node_config(node, config) - # env_vars should have been updated in the context env_var method - except ValidationError as exc: - # we got a ValidationError - probably bad types in config() - raise SchemaConfigError(exc, node=node) from exc - - # Set attached_node for generic test nodes, if available. - # Generic test node inherits attached node's group config value. - attached_node = self._lookup_attached_node(builder.target, builder.version) - if attached_node: - node.attached_node = attached_node.unique_id - node.group, node.group = attached_node.group, attached_node.group - - def parse_node(self, block: GenericTestBlock) -> GenericTestNode: - """In schema parsing, we rewrite most of the part of parse_node that - builds the initial node to be parsed, but rendering is basically the - same - """ - node = self._parse_generic_test( - target=block.target, - test=block.test, - tags=block.tags, - column_name=block.column_name, - schema_file_id=block.file.file_id, - version=block.version, - ) - self.add_test_node(block, node) - return node - - def add_test_node(self, block: GenericTestBlock, node: GenericTestNode): - test_from = {"key": block.target.yaml_key, "name": block.target.name} - if node.config.enabled: - self.manifest.add_node(block.file, node, test_from) - else: - self.manifest.add_disabled(block.file, node, test_from) - - def render_with_context( - self, - node: GenericTestNode, - config: ContextConfig, - ) -> None: - """Given the parsed node and a ContextConfig to use during - parsing, collect all the refs that might be squirreled away in the test - arguments. This includes the implicit "model" argument. - """ - # make a base context that doesn't have the magic kwargs field - context = self._context_for(node, config) - # update it with the rendered test kwargs (which collects any refs) - add_rendered_test_kwargs(context, node, capture_macros=True) - - # the parsed node is not rendered in the native context. 
- get_rendered(node.raw_code, context, node, capture_macros=True) - - def parse_test( - self, - target_block: TestBlock, - test: TestDef, - column: Optional[UnparsedColumn], - version: Optional[NodeVersion], - ) -> None: - if isinstance(test, str): - test = {test: {}} - - if column is None: - column_name: Optional[str] = None - column_tags: List[str] = [] - else: - column_name = column.name - should_quote = column.quote or (column.quote is None and target_block.quote_columns) - if should_quote: - column_name = get_adapter(self.root_project).quote(column_name) - column_tags = column.tags - - block = GenericTestBlock.from_test_block( - src=target_block, - test=test, - column_name=column_name, - tags=column_tags, - version=version, - ) - self.parse_node(block) - - def parse_tests(self, block: TestBlock) -> None: - for column in block.columns: - self.parse_column_tests(block, column, None) - - for test in block.tests: - self.parse_test(block, test, None, None) - - def parse_versioned_tests(self, block: VersionedTestBlock) -> None: - if not block.target.versions: - self.parse_tests(block) - else: - for version in block.target.versions: - for column in block.target.get_columns_for_version(version.v): - self.parse_column_tests(block, column, version.v) - - for test in block.target.get_tests_for_version(version.v): - self.parse_test(block, test, None, version.v) - def parse_file(self, block: FileBlock, dct: Dict = None) -> None: assert isinstance(block.file, SchemaSourceFile) @@ -544,10 +153,10 @@ def parse_file(self, block: FileBlock, dct: Dict = None) -> None: # contains the FileBlock and the data (dictionary) yaml_block = YamlBlock.from_file_block(block, dct) - parser: YamlDocsReader + parser: YamlReader - # There are 7 kinds of parsers: - # Model, Seed, Snapshot, Source, Macro, Analysis, Exposures + # There are 9 different yaml lists which are parsed by different parsers: + # Model, Seed, Snapshot, Source, Macro, Analysis, Exposure, Metric, Group # ModelPatchParser.parse() if "models" in dct: @@ -555,19 +164,19 @@ def parse_file(self, block: FileBlock, dct: Dict = None) -> None: # even if they are disabled in the schema file model_parse_result = ModelPatchParser(self, yaml_block, "models").parse() for versioned_test_block in model_parse_result.versioned_test_blocks: - self.parse_versioned_tests(versioned_test_block) + self.generic_test_parser.parse_versioned_tests(versioned_test_block) - # NonSourceParser.parse() + # PatchParser.parse() if "seeds" in dct: seed_parse_result = TestablePatchParser(self, yaml_block, "seeds").parse() for test_block in seed_parse_result.test_blocks: - self.parse_tests(test_block) + self.generic_test_parser.parse_tests(test_block) - # NonSourceParser.parse() + # PatchParser.parse() if "snapshots" in dct: snapshot_parse_result = TestablePatchParser(self, yaml_block, "snapshots").parse() for test_block in snapshot_parse_result.test_blocks: - self.parse_tests(test_block) + self.generic_test_parser.parse_tests(test_block) # This parser uses SourceParser.parse() which doesn't return # any test blocks. 
Source tests are handled at a later point @@ -576,28 +185,34 @@ def parse_file(self, block: FileBlock, dct: Dict = None) -> None: parser = SourceParser(self, yaml_block, "sources") parser.parse() - # NonSourceParser.parse() (but never test_blocks) + # PatchParser.parse() (but never test_blocks) if "macros" in dct: parser = MacroPatchParser(self, yaml_block, "macros") parser.parse() - # NonSourceParser.parse() (but never test_blocks) + # PatchParser.parse() (but never test_blocks) if "analyses" in dct: parser = AnalysisPatchParser(self, yaml_block, "analyses") parser.parse() - # parse exposures + # ExposureParser.parse() if "exposures" in dct: + from dbt.parser.schema_yaml_readers import ExposureParser + exp_parser = ExposureParser(self, yaml_block) exp_parser.parse() - # parse metrics + # MetricParser.parse() if "metrics" in dct: + from dbt.parser.schema_yaml_readers import MetricParser + metric_parser = MetricParser(self, yaml_block) metric_parser.parse() - # parse groups + # GroupParser.parse() if "groups" in dct: + from dbt.parser.schema_yaml_readers import GroupParser + group_parser = GroupParser(self, yaml_block) group_parser.parse() @@ -620,6 +235,7 @@ class ParseResult: # abstract base class (ABCMeta) +# Five subclasses: MetricParser, ExposureParser, GroupParser, SourceParser, PatchParser class YamlReader(metaclass=ABCMeta): def __init__(self, schema_parser: SchemaParser, yaml: YamlBlock, key: str) -> None: self.schema_parser = schema_parser @@ -659,7 +275,7 @@ def get_key_dicts(self) -> Iterable[Dict[str, Any]]: if not isinstance(data, list): raise ParsingError( "{} must be a list, got {} instead: ({})".format( - self.key, type(data), _trimmed(str(data)) + self.key, type(data), trimmed(str(data)) ) ) path = self.yaml.path.original_file_path @@ -699,8 +315,6 @@ def render_entry(self, dct): ) from exc return dct - -class YamlDocsReader(YamlReader): @abstractmethod def parse(self) -> ParseResult: raise NotImplementedError("parse is abstract") @@ -710,7 +324,7 @@ def parse(self) -> ParseResult: # This parses the 'sources' keys in yaml files. -class SourceParser(YamlDocsReader): +class SourceParser(YamlReader): def _target_from_dict(self, cls: Type[T], data: Dict[str, Any]) -> T: path = self.yaml.path.original_file_path try: @@ -719,8 +333,7 @@ def _target_from_dict(self, cls: Type[T], data: Dict[str, Any]) -> T: except (ValidationError, JSONValidationError) as exc: raise YamlParseDictError(path, self.key, data, exc) - # The other parse method returns TestBlocks. This one doesn't. - # This takes the yaml dictionaries in 'sources' keys and uses them + # This parse method takes the yaml dictionaries in 'sources' keys and uses them # to create UnparsedSourceDefinition objects.
They are then turned # into UnpatchedSourceDefinition objects in 'add_source_definitions' # or SourcePatch objects in 'add_source_patch' @@ -771,9 +384,8 @@ def add_source_definitions(self, source: UnparsedSourceDefinition) -> None: self.manifest.add_source(self.yaml.file, source_def) -# This class has three main subclasses: TestablePatchParser (models, -# seeds, snapshots), MacroPatchParser, and AnalysisPatchParser -class NonSourceParser(YamlDocsReader, Generic[NonSourceTarget, Parsed]): +# This class has two subclasses: NodePatchParser and MacroPatchParser +class PatchParser(YamlReader, Generic[NonSourceTarget, Parsed]): @abstractmethod def _target_type(self) -> Type[NonSourceTarget]: raise NotImplementedError("_target_type not implemented") @@ -896,7 +508,9 @@ def patch_node_config(self, node, patch): self.schema_parser.update_parsed_node_config(node, config, patch_config_dict=patch.config) -class NodePatchParser(NonSourceParser[NodeTarget, ParsedNodePatch], Generic[NodeTarget]): +# Subclasses of NodePatchParser: TestablePatchParser, ModelPatchParser, AnalysisPatchParser, +# so models, seeds, snapshots, analyses +class NodePatchParser(PatchParser[NodeTarget, ParsedNodePatch], Generic[NodeTarget]): def parse_patch(self, block: TargetBlock[NodeTarget], refs: ParserRef) -> None: # We're not passing the ParsedNodePatch around anymore, so we # could possibly skip creating one. Leaving here for now for @@ -914,6 +528,7 @@ def parse_patch(self, block: TargetBlock[NodeTarget], refs: ParserRef) -> None: access=block.target.access, version=None, latest_version=None, + constraints=block.target.constraints, ) assert isinstance(self.yaml.file, SchemaSourceFile) source_file: SchemaSourceFile = self.yaml.file @@ -970,7 +585,7 @@ def parse_patch(self, block: TargetBlock[NodeTarget], refs: ParserRef) -> None: if patch.config: self.patch_node_config(node, patch) - node.patch(patch) + self.patch_node_properties(node, patch) else: warn_or_error( NoNodeForYamlKey( @@ -994,48 +609,30 @@ def parse_patch(self, block: TargetBlock[NodeTarget], refs: ParserRef) -> None: if patch.config: self.patch_node_config(node, patch) - node.patch(patch) - # TODO: We want to do all the actual patching either in the above node.patch() call - # or here, but it will require some thought to the details. For now the patching is - # awkwardly split. - self.patch_constraints(node, block.target.constraints) - node.build_contract_checksum() - - def patch_constraints(self, node, constraints): - contract_config = node.config.get("contract") - if isinstance(node, ModelNode) and contract_config.enforced is True: - self._validate_constraint_prerequisites(node) - - if any( - c for c in constraints if "type" not in c or not ConstraintType.is_valid(c["type"]) - ): - raise ParsingError( - f"Invalid constraint type on model {node.name}: " - f"Type must be one of {[ct.value for ct in ConstraintType]}" - ) - - node.constraints = [ModelLevelConstraint.from_dict(c) for c in constraints] - - def _validate_constraint_prerequisites(self, model_node: ModelNode): - errors = [] - if not model_node.columns: - errors.append( - "Constraints must be defined in a `yml` schema configuration file like `schema.yml`." 
- ) - - if model_node.config.materialized not in ["table", "view", "incremental"]: - errors.append( - f"Only table, view, and incremental materializations are supported for constraints, but found '{model_node.config.materialized}'" - ) - - if str(model_node.language) != "sql": - errors.append(f"Language Error: Expected 'sql' but found '{model_node.language}'") - - if errors: - raise ParsingError( - f"Constraint validation failed for: ({model_node.original_file_path})\n" - + "\n".join(errors) - ) + self.patch_node_properties(node, patch) + + def patch_node_properties(self, node, patch: "ParsedNodePatch"): + """Given a ParsedNodePatch, add the new information to the node.""" + # explicitly pick out the parts to update so we don't inadvertently + # step on the model name or anything + # Note: config should already be updated + node.patch_path = patch.file_id + # update created_at so process_docs will run in partial parsing + node.created_at = time.time() + node.description = patch.description + node.columns = patch.columns + node.name = patch.name + + if not isinstance(node, ModelNode): + for attr in ["latest_version", "access", "version", "constraints"]: + if getattr(patch, attr): + warn_or_error( + ValidationWarning( + field_name=attr, + resource_type=node.resource_type.value, + node_name=patch.name, + ) + ) # TestablePatchParser = seeds, snapshots @@ -1163,17 +760,12 @@ def parse_patch(self, block: TargetBlock[UnparsedModelUpdate], refs: ParserRef) access=unparsed_version.access or target.access, version=unparsed_version.v, latest_version=latest_version, + constraints=unparsed_version.constraints or target.constraints, ) # Node patched before config because config patching depends on model name, # which may have been updated in the version patch - versioned_model_node.patch(versioned_model_patch) - # TODO: We want to do all the actual patching either in the above node.patch() call - # or here, but it will require some thought to the details. For now the patching is - # awkwardly split. 
- self.patch_constraints( - versioned_model_node, unparsed_version.constraints or target.constraints - ) - versioned_model_node.build_contract_checksum() + # versioned_model_node.patch(versioned_model_patch) + self.patch_node_properties(versioned_model_node, versioned_model_patch) # Includes alias recomputation self.patch_node_config(versioned_model_node, versioned_model_patch) @@ -1186,6 +778,57 @@ def parse_patch(self, block: TargetBlock[UnparsedModelUpdate], refs: ParserRef) def _target_type(self) -> Type[UnparsedModelUpdate]: return UnparsedModelUpdate + def patch_node_properties(self, node, patch: "ParsedNodePatch"): + super().patch_node_properties(node, patch) + node.version = patch.version + node.latest_version = patch.latest_version + if patch.access: + if AccessType.is_valid(patch.access): + node.access = AccessType(patch.access) + else: + raise InvalidAccessTypeError( + unique_id=node.unique_id, + field_value=patch.access, + ) + self.patch_constraints(node, patch.constraints) + node.build_contract_checksum() + + def patch_constraints(self, node, constraints): + contract_config = node.config.get("contract") + if contract_config.enforced is True: + self._validate_constraint_prerequisites(node) + + if any( + c for c in constraints if "type" not in c or not ConstraintType.is_valid(c["type"]) + ): + raise ParsingError( + f"Invalid constraint type on model {node.name}: " + f"Type must be one of {[ct.value for ct in ConstraintType]}" + ) + + node.constraints = [ModelLevelConstraint.from_dict(c) for c in constraints] + + def _validate_constraint_prerequisites(self, model_node: ModelNode): + errors = [] + if not model_node.columns: + errors.append( + "Constraints must be defined in a `yml` schema configuration file like `schema.yml`." + ) + + if model_node.config.materialized not in ["table", "view", "incremental"]: + errors.append( + f"Only table, view, and incremental materializations are supported for constraints, but found '{model_node.config.materialized}'" + ) + + if str(model_node.language) != "sql": + errors.append(f"Language Error: Expected 'sql' but found '{model_node.language}'") + + if errors: + raise ParsingError( + f"Constraint validation failed for: ({model_node.original_file_path})\n" + + "\n".join(errors) + ) + class AnalysisPatchParser(NodePatchParser[UnparsedAnalysisUpdate]): def get_block(self, node: UnparsedAnalysisUpdate) -> TargetBlock: @@ -1195,7 +838,7 @@ def _target_type(self) -> Type[UnparsedAnalysisUpdate]: return UnparsedAnalysisUpdate -class MacroPatchParser(NonSourceParser[UnparsedMacroUpdate, ParsedMacroPatch]): +class MacroPatchParser(PatchParser[UnparsedMacroUpdate, ParsedMacroPatch]): def get_block(self, node: UnparsedMacroUpdate) -> TargetBlock: return TargetBlock.from_yaml_block(self.yaml, node) @@ -1226,259 +869,11 @@ def parse_patch(self, block: TargetBlock[UnparsedMacroUpdate], refs: ParserRef) package_name, existing_file_path = macro.patch_path.split("://") raise DuplicateMacroPatchNameError(patch, existing_file_path) source_file.macro_patches[patch.name] = unique_id - macro.patch(patch) - - -class ExposureParser(YamlReader): - def __init__(self, schema_parser: SchemaParser, yaml: YamlBlock): - super().__init__(schema_parser, yaml, NodeType.Exposure.pluralize()) - self.schema_parser = schema_parser - self.yaml = yaml - - def parse_exposure(self, unparsed: UnparsedExposure): - package_name = self.project.project_name - unique_id = f"{NodeType.Exposure}.{package_name}.{unparsed.name}" - path = self.yaml.path.relative_path - - fqn = 
self.schema_parser.get_fqn_prefix(path) - fqn.append(unparsed.name) - - config = self._generate_exposure_config( - target=unparsed, - fqn=fqn, - package_name=package_name, - rendered=True, - ) - - config = config.finalize_and_validate() - - unrendered_config = self._generate_exposure_config( - target=unparsed, - fqn=fqn, - package_name=package_name, - rendered=False, - ) - - if not isinstance(config, ExposureConfig): - raise DbtInternalError( - f"Calculated a {type(config)} for an exposure, but expected an ExposureConfig" - ) - - parsed = Exposure( - resource_type=NodeType.Exposure, - package_name=package_name, - path=path, - original_file_path=self.yaml.path.original_file_path, - unique_id=unique_id, - fqn=fqn, - name=unparsed.name, - type=unparsed.type, - url=unparsed.url, - meta=unparsed.meta, - tags=unparsed.tags, - description=unparsed.description, - label=unparsed.label, - owner=unparsed.owner, - maturity=unparsed.maturity, - config=config, - unrendered_config=unrendered_config, - ) - ctx = generate_parse_exposure( - parsed, - self.root_project, - self.schema_parser.manifest, - package_name, - ) - depends_on_jinja = "\n".join("{{ " + line + "}}" for line in unparsed.depends_on) - get_rendered(depends_on_jinja, ctx, parsed, capture_macros=True) - # parsed now has a populated refs/sources/metrics - - if parsed.config.enabled: - self.manifest.add_exposure(self.yaml.file, parsed) - else: - self.manifest.add_disabled(self.yaml.file, parsed) - - def _generate_exposure_config( - self, target: UnparsedExposure, fqn: List[str], package_name: str, rendered: bool - ): - generator: BaseContextConfigGenerator - if rendered: - generator = ContextConfigGenerator(self.root_project) - else: - generator = UnrenderedConfigGenerator(self.root_project) - - # configs with precendence set - precedence_configs = dict() - # apply exposure configs - precedence_configs.update(target.config) - - return generator.calculate_node_config( - config_call_dict={}, - fqn=fqn, - resource_type=NodeType.Exposure, - project_name=package_name, - base=False, - patch_config_dict=precedence_configs, - ) - - def parse(self): - for data in self.get_key_dicts(): - try: - UnparsedExposure.validate(data) - unparsed = UnparsedExposure.from_dict(data) - except (ValidationError, JSONValidationError) as exc: - raise YamlParseDictError(self.yaml.path, self.key, data, exc) - - self.parse_exposure(unparsed) - - -class MetricParser(YamlReader): - def __init__(self, schema_parser: SchemaParser, yaml: YamlBlock): - super().__init__(schema_parser, yaml, NodeType.Metric.pluralize()) - self.schema_parser = schema_parser - self.yaml = yaml - - def parse_metric(self, unparsed: UnparsedMetric): - package_name = self.project.project_name - unique_id = f"{NodeType.Metric}.{package_name}.{unparsed.name}" - path = self.yaml.path.relative_path - - fqn = self.schema_parser.get_fqn_prefix(path) - fqn.append(unparsed.name) - - config = self._generate_metric_config( - target=unparsed, - fqn=fqn, - package_name=package_name, - rendered=True, - ) - - config = config.finalize_and_validate() - - unrendered_config = self._generate_metric_config( - target=unparsed, - fqn=fqn, - package_name=package_name, - rendered=False, - ) - - if not isinstance(config, MetricConfig): - raise DbtInternalError( - f"Calculated a {type(config)} for a metric, but expected a MetricConfig" - ) - - parsed = Metric( - resource_type=NodeType.Metric, - package_name=package_name, - path=path, - original_file_path=self.yaml.path.original_file_path, - unique_id=unique_id, - fqn=fqn, - 
model=unparsed.model, - name=unparsed.name, - description=unparsed.description, - label=unparsed.label, - calculation_method=unparsed.calculation_method, - expression=str(unparsed.expression), - timestamp=unparsed.timestamp, - dimensions=unparsed.dimensions, - window=unparsed.window, - time_grains=unparsed.time_grains, - filters=unparsed.filters, - meta=unparsed.meta, - tags=unparsed.tags, - config=config, - unrendered_config=unrendered_config, - group=config.group, - ) - - ctx = generate_parse_metrics( - parsed, - self.root_project, - self.schema_parser.manifest, - package_name, - ) - - if parsed.model is not None: - model_ref = "{{ " + parsed.model + " }}" - get_rendered(model_ref, ctx, parsed) - - parsed.expression = get_rendered( - parsed.expression, - ctx, - node=parsed, - ) - - # if the metric is disabled we do not want it included in the manifest, only in the disabled dict - if parsed.config.enabled: - self.manifest.add_metric(self.yaml.file, parsed) - else: - self.manifest.add_disabled(self.yaml.file, parsed) - - def _generate_metric_config( - self, target: UnparsedMetric, fqn: List[str], package_name: str, rendered: bool - ): - generator: BaseContextConfigGenerator - if rendered: - generator = ContextConfigGenerator(self.root_project) - else: - generator = UnrenderedConfigGenerator(self.root_project) - - # configs with precendence set - precedence_configs = dict() - # first apply metric configs - precedence_configs.update(target.config) - - config = generator.calculate_node_config( - config_call_dict={}, - fqn=fqn, - resource_type=NodeType.Metric, - project_name=package_name, - base=False, - patch_config_dict=precedence_configs, - ) - return config - - def parse(self): - for data in self.get_key_dicts(): - try: - UnparsedMetric.validate(data) - unparsed = UnparsedMetric.from_dict(data) - - except (ValidationError, JSONValidationError) as exc: - raise YamlParseDictError(self.yaml.path, self.key, data, exc) - self.parse_metric(unparsed) - - -class GroupParser(YamlReader): - def __init__(self, schema_parser: SchemaParser, yaml: YamlBlock): - super().__init__(schema_parser, yaml, NodeType.Group.pluralize()) - self.schema_parser = schema_parser - self.yaml = yaml - - def parse_group(self, unparsed: UnparsedGroup): - package_name = self.project.project_name - unique_id = f"{NodeType.Group}.{package_name}.{unparsed.name}" - path = self.yaml.path.relative_path - - parsed = Group( - resource_type=NodeType.Group, - package_name=package_name, - path=path, - original_file_path=self.yaml.path.original_file_path, - unique_id=unique_id, - name=unparsed.name, - owner=unparsed.owner, - ) - - self.manifest.add_group(self.yaml.file, parsed) - - def parse(self): - for data in self.get_key_dicts(): - try: - UnparsedGroup.validate(data) - unparsed = UnparsedGroup.from_dict(data) - except (ValidationError, JSONValidationError) as exc: - raise YamlParseDictError(self.yaml.path, self.key, data, exc) - self.parse_group(unparsed) + # former macro.patch code + macro.patch_path = patch.file_id + macro.description = patch.description + macro.created_at = time.time() + macro.meta = patch.meta + macro.docs = patch.docs + macro.arguments = patch.arguments diff --git a/core/dbt/parser/sources.py b/core/dbt/parser/sources.py index 164c885b30f..4095599a9c4 100644 --- a/core/dbt/parser/sources.py +++ b/core/dbt/parser/sources.py @@ -29,7 +29,8 @@ from dbt.exceptions import DbtInternalError from dbt.node_types import NodeType -from dbt.parser.schemas import SchemaParser, ParserRef +from dbt.parser.common import 
ParserRef +from dbt.parser.schema_generic_tests import SchemaGenericTestParser # An UnparsedSourceDefinition is taken directly from the yaml @@ -48,7 +49,7 @@ def __init__( ) -> None: self.root_project = root_project self.manifest = manifest - self.schema_parsers: Dict[str, SchemaParser] = {} + self.generic_test_parsers: Dict[str, SchemaGenericTestParser] = {} self.patches_used: Dict[SourceKey, Set[str]] = {} self.sources: Dict[str, SourceDefinition] = {} @@ -188,18 +189,18 @@ def parse_source(self, target: UnpatchedSourceDefinition) -> SourceDefinition: parsed_source.relation_name = self._get_relation_name(parsed_source) return parsed_source - # This code uses the SchemaParser because it shares the '_parse_generic_test' - # code. It might be nice to separate out the generic test code - # and make it common to the schema parser and source patcher. - def get_schema_parser_for(self, package_name: str) -> "SchemaParser": - if package_name in self.schema_parsers: - schema_parser = self.schema_parsers[package_name] + # Use the SchemaGenericTestParser to parse the source tests + def get_generic_test_parser_for(self, package_name: str) -> "SchemaGenericTestParser": + if package_name in self.generic_test_parsers: + generic_test_parser = self.generic_test_parsers[package_name] else: all_projects = self.root_project.load_dependencies() project = all_projects[package_name] - schema_parser = SchemaParser(project, self.manifest, self.root_project) - self.schema_parsers[package_name] = schema_parser - return schema_parser + generic_test_parser = SchemaGenericTestParser( + project, self.manifest, self.root_project + ) + self.generic_test_parsers[package_name] = generic_test_parser + return generic_test_parser def get_source_tests(self, target: UnpatchedSourceDefinition) -> Iterable[GenericTestNode]: for test, column in target.get_tests(): @@ -226,7 +227,7 @@ def get_patch_for( self.patches_used[key].add(unpatched.table.name) return patch - # This calls _parse_generic_test in the SchemaParser + # This calls parse_generic_test in the SchemaGenericTestParser def parse_source_test( self, target: UnpatchedSourceDefinition, @@ -247,10 +248,8 @@ def parse_source_test( tags_sources.append(column.tags) tags = list(itertools.chain.from_iterable(tags_sources)) - # TODO: make the generic_test code common so we don't need to - # create schema parsers to handle the tests - schema_parser = self.get_schema_parser_for(target.package_name) - node = schema_parser._parse_generic_test( + generic_test_parser = self.get_generic_test_parser_for(target.package_name) + node = generic_test_parser.parse_generic_test( target=target, test=test, tags=tags, diff --git a/test/unit/test_contracts_graph_parsed.py b/test/unit/test_contracts_graph_parsed.py index 651cc41793a..9fc11e985c7 100644 --- a/test/unit/test_contracts_graph_parsed.py +++ b/test/unit/test_contracts_graph_parsed.py @@ -20,7 +20,6 @@ GenericTestNode, SnapshotNode, IntermediateSnapshotNode, - ParsedNodePatch, Macro, Exposure, Metric, @@ -762,94 +761,6 @@ def test_compare_changed_seed(func, basic_parsed_seed_object): assert not node.same_contents(compare, "postgres") -@pytest.fixture -def basic_parsed_model_patch_dict(): - return { - "name": "foo", - "description": "The foo model", - "original_file_path": "path/to/schema.yml", - "docs": {"show": True}, - "meta": {}, - "yaml_key": "models", - "package_name": "test", - "columns": { - "a": { - "name": "a", - "description": "a text field", - "meta": {}, - "tags": [], - "constraints": [], - }, - }, - "config": {}, - "access": 
"public", - "version": "1", - "latest_version": "1", - } - - -@pytest.fixture -def basic_parsed_model_patch_object(): - return ParsedNodePatch( - name="foo", - yaml_key="models", - package_name="test", - description="The foo model", - original_file_path="path/to/schema.yml", - columns={"a": ColumnInfo(name="a", description="a text field", meta={})}, - docs=Docs(), - meta={}, - config={}, - access="public", - version="1", - latest_version="1", - ) - - -@pytest.fixture -def patched_model_object(): - return ModelNode( - package_name="test", - path="/root/x/path.sql", - original_file_path="/root/path.sql", - language="sql", - raw_code="select * from wherever", - name="foo", - resource_type=NodeType.Model, - unique_id="model.test.foo", - fqn=["test", "models", "foo"], - refs=[], - sources=[], - metrics=[], - depends_on=DependsOn(), - description="The foo model", - database="test_db", - schema="test_schema", - alias="bar", - tags=[], - meta={}, - config=NodeConfig(), - patch_path="test://path/to/schema.yml", - columns={"a": ColumnInfo(name="a", description="a text field", meta={})}, - docs=Docs(), - checksum=FileHash.from_contents(""), - unrendered_config={}, - access=AccessType.Public, - version="1", - latest_version="1", - ) - - -def test_patch_parsed_model( - basic_parsed_model_object, basic_parsed_model_patch_object, patched_model_object -): - pre_patch = basic_parsed_model_object - pre_patch.patch(basic_parsed_model_patch_object) - pre_patch.created_at = 1.0 - patched_model_object.created_at = 1.0 - assert patched_model_object == pre_patch - - @pytest.fixture def minimal_parsed_hook_dict(): return { @@ -1894,60 +1805,6 @@ def test_invalid_snapshot_bad_resource_type(basic_timestamp_snapshot_dict): assert_fails_validation(bad_resource_type, SnapshotNode) -def test_basic_parsed_node_patch(basic_parsed_model_patch_object, basic_parsed_model_patch_dict): - assert_symmetric(basic_parsed_model_patch_object, basic_parsed_model_patch_dict) - - -@pytest.fixture -def populated_parsed_node_patch_dict(): - return { - "name": "foo", - "description": "The foo model", - "original_file_path": "path/to/schema.yml", - "columns": { - "a": { - "name": "a", - "description": "a text field", - "meta": {}, - "tags": [], - "constraints": [], - }, - }, - "docs": {"show": False}, - "meta": {"key": ["value"]}, - "yaml_key": "models", - "package_name": "test", - "config": {}, - "access": "public", - "version": "1", - "latest_version": "1", - } - - -@pytest.fixture -def populated_parsed_node_patch_object(): - return ParsedNodePatch( - name="foo", - description="The foo model", - original_file_path="path/to/schema.yml", - columns={"a": ColumnInfo(name="a", description="a text field", meta={})}, - meta={"key": ["value"]}, - yaml_key="models", - package_name="test", - docs=Docs(show=False), - config={}, - access="public", - version="1", - latest_version="1", - ) - - -def test_populated_parsed_node_patch( - populated_parsed_node_patch_dict, populated_parsed_node_patch_object -): - assert_symmetric(populated_parsed_node_patch_object, populated_parsed_node_patch_dict) - - class TestParsedMacro(ContractTestCase): ContractType = Macro diff --git a/test/unit/test_parser.py b/test/unit/test_parser.py index 80a3404d702..eadb081415b 100644 --- a/test/unit/test_parser.py +++ b/test/unit/test_parser.py @@ -33,7 +33,7 @@ SnapshotParser, AnalysisParser, ) -from dbt.parser.generic_test_builders import YamlBlock +from dbt.parser.common import YamlBlock from dbt.parser.models import ( _get_config_call_dict, _shift_sources, @@ -727,31 
+727,6 @@ def test__parsed_versioned_models(self): self.parser.parse_file(block, dct) self.assert_has_manifest_lengths(self.parser.manifest, nodes=2) - all_nodes = sorted(self.parser.manifest.nodes.values(), key=lambda n: n.unique_id) - models = [node for node in all_nodes if node.resource_type == NodeType.Model] - - # test v1 model - parsed_node_patch_v1 = models[0].patch.call_args_list[0][0][0] - self.assertEqual(models[0].unique_id, "model.snowplow.my_model.v1") - self.assertEqual(parsed_node_patch_v1.version, 1) - self.assertEqual(parsed_node_patch_v1.latest_version, 2) - self.assertEqual( - list(parsed_node_patch_v1.columns.keys()), ["color", "location_id", "extra"] - ) - self.assertEqual( - parsed_node_patch_v1.config, {"materialized": "table", "sql_header": "test_sql_header"} - ) - - # test v2 model - parsed_node_patch_v2 = models[1].patch.call_args_list[0][0][0] - self.assertEqual(models[1].unique_id, "model.snowplow.my_model.v2") - self.assertEqual(parsed_node_patch_v2.version, 2) - self.assertEqual(parsed_node_patch_v2.latest_version, 2) - self.assertEqual(list(parsed_node_patch_v2.columns.keys()), ["color", "extra"]) - self.assertEqual( - parsed_node_patch_v2.config, {"materialized": "view", "sql_header": "test_sql_header"} - ) - def test__parsed_versioned_models_v0(self): block = self.file_block_for(MULTIPLE_TABLE_VERSIONED_MODEL_V0, "test_one.yml") self.parser.manifest.files[block.file.file_id] = block.file @@ -759,21 +734,6 @@ def test__parsed_versioned_models_v0(self): self.parser.parse_file(block, dct) self.assert_has_manifest_lengths(self.parser.manifest, nodes=2) - all_nodes = sorted(self.parser.manifest.nodes.values(), key=lambda n: n.unique_id) - models = [node for node in all_nodes if node.resource_type == NodeType.Model] - - # test v0 model - parsed_node_patch_v1 = models[0].patch.call_args_list[0][0][0] - self.assertEqual(models[0].unique_id, "model.snowplow.my_model.v0") - self.assertEqual(parsed_node_patch_v1.version, 0) - self.assertEqual(parsed_node_patch_v1.latest_version, 2) - - # test v2 model - parsed_node_patch_v2 = models[1].patch.call_args_list[0][0][0] - self.assertEqual(models[1].unique_id, "model.snowplow.my_model.v2") - self.assertEqual(parsed_node_patch_v2.version, 2) - self.assertEqual(parsed_node_patch_v2.latest_version, 2) - def test__parsed_versioned_models_v0_latest_version(self): block = self.file_block_for( MULTIPLE_TABLE_VERSIONED_MODEL_V0_LATEST_VERSION, "test_one.yml" @@ -783,21 +743,6 @@ def test__parsed_versioned_models_v0_latest_version(self): self.parser.parse_file(block, dct) self.assert_has_manifest_lengths(self.parser.manifest, nodes=2) - all_nodes = sorted(self.parser.manifest.nodes.values(), key=lambda n: n.unique_id) - models = [node for node in all_nodes if node.resource_type == NodeType.Model] - - # test v0 model - parsed_node_patch_v1 = models[0].patch.call_args_list[0][0][0] - self.assertEqual(models[0].unique_id, "model.snowplow.my_model.v0") - self.assertEqual(parsed_node_patch_v1.version, 0) - self.assertEqual(parsed_node_patch_v1.latest_version, 0) - - # test v2 model - parsed_node_patch_v2 = models[1].patch.call_args_list[0][0][0] - self.assertEqual(models[1].unique_id, "model.snowplow.my_model.v2") - self.assertEqual(parsed_node_patch_v2.version, 2) - self.assertEqual(parsed_node_patch_v2.latest_version, 0) - sql_model = """ {{ config(materialized="table") }} @@ -1540,7 +1485,6 @@ def test_multiple_blocks(self): "{% macro foo(a, b) %}a ~ b{% endmacro %}\n{% macro bar(c, d) %}c + d{% endmacro %}" ) block = 
self.file_block_for(raw_code, "macro.sql") - print(f"--- test_multiple_blocks block: {block}") self.parser.manifest.files[block.file.file_id] = block.file self.parser.parse_file(block) self.assertEqual(len(self.parser.manifest.macros), 2) diff --git a/tests/functional/defer_state/test_modified_state.py b/tests/functional/defer_state/test_modified_state.py index da80bc78095..ad1131490d6 100644 --- a/tests/functional/defer_state/test_modified_state.py +++ b/tests/functional/defer_state/test_modified_state.py @@ -364,7 +364,7 @@ def test_changed_constraint(self, project): with pytest.raises(ContractBreakingChangeError): run_dbt(["run", "--models", "state:modified.contract", "--state", "./state"]) - # This should raise because a model level constraint was removed + # This should raise because a model-level constraint was removed (primary_key on id) write_file(modified_model_constraint_schema_yml, "models", "schema.yml") # we don't have a way to know this failed unless we have a previous state to refer to, so the run succeeds results = run_dbt(["run"])
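Taken together, the net effect of this diff is that `patch()` disappears from the node classes and each parser patches only the properties its node type supports. A condensed sketch of the new shape (class bodies abridged from the diff above, not the full implementation):

```python
import time

class NodePatchParser:
    # Fields common to models, seeds, snapshots, and analyses.
    def patch_node_properties(self, node, patch):
        node.patch_path = patch.file_id
        node.created_at = time.time()  # re-triggers process_docs in partial parsing
        node.description = patch.description
        node.columns = patch.columns
        node.name = patch.name
        # Non-model nodes additionally warn if model-only fields
        # (version, latest_version, access, constraints) appear in the patch.

class ModelPatchParser(NodePatchParser):
    # Model-only fields live in the subclass override.
    def patch_node_properties(self, node, patch):
        super().patch_node_properties(node, patch)
        node.version = patch.version
        node.latest_version = patch.latest_version
        # The real override also validates access, patches constraints,
        # and rebuilds the model's contract checksum.
```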