diff --git a/CHANGELOG.md b/CHANGELOG.md index a21835bf3cd..7c7fa321e19 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ - Bump `snowflake-connector-python` and releated dependencies, support Python 3.9 ([#2985](https://github.com/fishtown-analytics/dbt/issues/2985), [#3148](https://github.com/fishtown-analytics/dbt/pull/3148)) - General development environment clean up and improve experience running tests locally ([#3194](https://github.com/fishtown-analytics/dbt/issues/3194), [#3204](https://github.com/fishtown-analytics/dbt/pull/3204)) - Add a new materialization for tests, update data tests to use test materialization when executing. ([#3154](https://github.com/fishtown-analytics/dbt/issues/3154), [#3181](https://github.com/fishtown-analytics/dbt/pull/3181)) +- Switch from externally storing parsing state in ParseResult object to using Manifest ([#3163](http://github.com/fishtown-analytics/dbt/issues/3163), [#3219](https://github.com/fishtown-analytics/dbt/pull/3219)) Contributors: - [@yu-iskw](https://github.com/yu-iskw) ([#2928](https://github.com/fishtown-analytics/dbt/pull/2928)) diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py index ab7534acd3d..69dd0d3b12d 100644 --- a/core/dbt/adapters/base/impl.py +++ b/core/dbt/adapters/base/impl.py @@ -273,8 +273,8 @@ def check_macro_manifest(self) -> Optional[MacroManifest]: def load_macro_manifest(self) -> MacroManifest: if self._macro_manifest_lazy is None: # avoid a circular import - from dbt.parser.manifest import load_macro_manifest - manifest = load_macro_manifest( + from dbt.parser.manifest import ManifestLoader + manifest = ManifestLoader.load_macros( self.config, self.connections.set_query_header ) self._macro_manifest_lazy = manifest diff --git a/core/dbt/context/manifest.py b/core/dbt/context/manifest.py index e9c99e33952..e361c57da03 100644 --- a/core/dbt/context/manifest.py +++ b/core/dbt/context/manifest.py @@ -2,7 +2,7 @@ from dbt.clients.jinja import MacroStack from dbt.contracts.connection import AdapterRequiredConfig -from dbt.contracts.graph.manifest import Manifest, AnyManifest +from dbt.contracts.graph.manifest import Manifest from dbt.context.macro_resolver import TestMacroNamespace @@ -20,7 +20,7 @@ class ManifestContext(ConfiguredContext): def __init__( self, config: AdapterRequiredConfig, - manifest: AnyManifest, + manifest: Manifest, search_package: str, ) -> None: super().__init__(config) diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index 2d9ee3bb208..2794b296bbf 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -20,7 +20,7 @@ from .manifest import ManifestContext from dbt.contracts.connection import AdapterResponse from dbt.contracts.graph.manifest import ( - Manifest, AnyManifest, Disabled, MacroManifest + Manifest, Disabled ) from dbt.contracts.graph.compiled import ( CompiledResource, @@ -1210,7 +1210,7 @@ def __init__( self, model: ParsedMacro, config: RuntimeConfig, - manifest: AnyManifest, + manifest: Manifest, provider: Provider, search_package: Optional[str], ) -> None: @@ -1300,7 +1300,7 @@ def this(self) -> Optional[RelationProxy]: def generate_parser_model( model: ManifestNode, config: RuntimeConfig, - manifest: MacroManifest, + manifest: Manifest, context_config: ContextConfig, ) -> Dict[str, Any]: # The __init__ method of ModelContext also initializes @@ -1317,7 +1317,7 @@ def generate_parser_model( def 
generate_generate_component_name_macro( macro: ParsedMacro, config: RuntimeConfig, - manifest: MacroManifest, + manifest: Manifest, ) -> Dict[str, Any]: ctx = MacroContext( macro, config, manifest, GenerateNameProvider(), None @@ -1370,7 +1370,7 @@ def __call__(self, *args) -> str: def generate_parse_exposure( exposure: ParsedExposure, config: RuntimeConfig, - manifest: MacroManifest, + manifest: Manifest, package_name: str, ) -> Dict[str, Any]: project = config.load_dependencies()[package_name] diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py index d6000c88fd7..43f0f76553c 100644 --- a/core/dbt/contracts/graph/manifest.py +++ b/core/dbt/contracts/graph/manifest.py @@ -15,19 +15,24 @@ ) from dbt.contracts.graph.parsed import ( ParsedMacro, ParsedDocumentation, ParsedNodePatch, ParsedMacroPatch, - ParsedSourceDefinition, ParsedExposure + ParsedSourceDefinition, ParsedExposure, HasUniqueID, + UnpatchedSourceDefinition, ManifestNodes ) -from dbt.contracts.files import SourceFile +from dbt.contracts.graph.unparsed import SourcePatch +from dbt.contracts.files import SourceFile, FileHash, RemoteFile from dbt.contracts.util import ( BaseArtifactMetadata, MacroKey, SourceKey, ArtifactMixin, schema_version ) from dbt.exceptions import ( + InternalException, CompilationException, raise_duplicate_resource_name, raise_compiler_error, warn_or_error, - raise_invalid_patch, + raise_invalid_patch, raise_duplicate_patch_name, + raise_duplicate_macro_patch_name, raise_duplicate_source_patch_name, ) from dbt.helper_types import PathSet from dbt.logger import GLOBAL_LOGGER as logger from dbt.node_types import NodeType +from dbt.ui import line_wrap_message from dbt import deprecations from dbt import flags from dbt import tracking @@ -115,7 +120,8 @@ def add_source(self, source: ParsedSourceDefinition): def populate(self): for source in self._manifest.sources.values(): - self.add_source(source) + if hasattr(source, 'source_name'): + self.add_source(source) def perform_lookup( self, unique_id: UniqueID @@ -505,6 +511,13 @@ def _find_macros_by_name( return candidates +@dataclass +class ManifestStateCheck(): + vars_hash: FileHash + profile_hash: FileHash + project_hashes: MutableMapping[str, FileHash] + + @dataclass class Manifest(MacroMethods): """The manifest for the full graph, after parsing and during compilation. 
@@ -522,6 +535,13 @@ class Manifest(MacroMethods): files: MutableMapping[str, SourceFile] metadata: ManifestMetadata = field(default_factory=ManifestMetadata) flat_graph: Dict[str, Any] = field(default_factory=dict) + state_check: Optional[ManifestStateCheck] = None + # Moved from the ParseResult object + macro_patches: MutableMapping[MacroKey, ParsedMacroPatch] = field(default_factory=dict) + patches: MutableMapping[str, ParsedNodePatch] = field(default_factory=dict) + source_patches: MutableMapping[SourceKey, SourcePatch] = field(default_factory=dict) + # following is from ParseResult + _disabled: MutableMapping[str, List[CompileResultNode]] = field(default_factory=dict) _docs_cache: Optional[DocCache] = None _sources_cache: Optional[SourceCache] = None _refs_cache: Optional[RefableCache] = None @@ -648,26 +668,51 @@ def add_nodes(self, new_nodes: Mapping[str, ManifestNode]): if node.resource_type in NodeType.refable(): self._refs_cache.add_node(node) - def patch_macros( - self, patches: MutableMapping[MacroKey, ParsedMacroPatch] + def add_patch( + self, source_file: SourceFile, patch: ParsedNodePatch, ) -> None: + # patches can't be overwritten + if patch.name in self.patches: + raise_duplicate_patch_name(patch, self.patches[patch.name]) + self.patches[patch.name] = patch + self.get_file(source_file).patches.append(patch.name) + + def add_macro_patch( + self, source_file: SourceFile, patch: ParsedMacroPatch, + ) -> None: + # macros are fully namespaced + key = (patch.package_name, patch.name) + if key in self.macro_patches: + raise_duplicate_macro_patch_name(patch, self.macro_patches[key]) + self.macro_patches[key] = patch + self.get_file(source_file).macro_patches.append(key) + + def add_source_patch( + self, source_file: SourceFile, patch: SourcePatch, + ) -> None: + # source patches must be unique + key = (patch.overrides, patch.name) + if key in self.source_patches: + raise_duplicate_source_patch_name(patch, self.source_patches[key]) + self.source_patches[key] = patch + self.get_file(source_file).source_patches.append(key) + + def patch_macros(self) -> None: for macro in self.macros.values(): key = (macro.package_name, macro.name) - patch = patches.pop(key, None) + patch = self.macro_patches.pop(key, None) if not patch: continue macro.patch(patch) - if patches: - for patch in patches.values(): + if self.macro_patches: + for patch in self.macro_patches.values(): warn_or_error( f'WARNING: Found documentation for macro "{patch.name}" ' f'which was not found' ) - def patch_nodes( - self, patches: MutableMapping[str, ParsedNodePatch] - ) -> None: + def patch_nodes(self) -> None: """Patch nodes with the given dict of patches. Note that this consumes the input! This relies on the fact that all nodes have unique _name_ fields, not @@ -677,8 +722,10 @@ def patch_nodes( # only have the node name in the patch, we have to iterate over all the # nodes looking for matching names. We could use a NameSearcher if we # were ok with doing an O(n*m) search (one nodes scan per patch) + # Q: could we save patches by node unique_ids instead, or convert + # between names and node ids? for node in self.nodes.values(): - patch = patches.pop(node.name, None) + patch = self.patches.pop(node.name, None) if not patch: continue @@ -696,9 +743,10 @@ def patch_nodes( node.patch(patch) - # log debug-level warning about nodes we couldn't find - if patches: - for patch in patches.values(): + # If anything is left in self.patches, it means that the node for + # that patch wasn't found. 
+ if self.patches: + for patch in self.patches.values(): # since patches aren't nodes, we can't use the existing # target_not_found warning logger.debug(( @@ -719,6 +767,7 @@ def get_used_databases(self): chain(self.nodes.values(), self.sources.values()) ) + # This is used in dbt.task.rpc.sql_commands 'add_new_refs' def deepcopy(self): return Manifest( nodes={k: _deepcopy(v) for k, v in self.nodes.items()}, @@ -908,6 +957,212 @@ def merge_from_artifact( f'Merged {len(merged)} items from state (sample: {sample})' ) + # Methods that were formerly in ParseResult + def get_file(self, source_file: SourceFile) -> SourceFile: + key = source_file.search_key + if key is None: + return source_file + if key not in self.files: + self.files[key] = source_file + return self.files[key] + + def add_macro(self, source_file: SourceFile, macro: ParsedMacro): + if macro.unique_id in self.macros: + # detect that the macro exists and emit an error + other_path = self.macros[macro.unique_id].original_file_path + # subtract 2 for the "Compilation Error" indent + # note that the line wrap eats newlines, so if you want newlines, + # this is the result :( + msg = line_wrap_message( + f'''\ + dbt found two macros named "{macro.name}" in the project + "{macro.package_name}". + + + To fix this error, rename or remove one of the following + macros: + + - {macro.original_file_path} + + - {other_path} + ''', + subtract=2 + ) + raise_compiler_error(msg) + + self.macros[macro.unique_id] = macro + self.get_file(source_file).macros.append(macro.unique_id) + + def has_file(self, source_file: SourceFile) -> bool: + key = source_file.search_key + if key is None: + return False + if key not in self.files: + return False + my_checksum = self.files[key].checksum + return my_checksum == source_file.checksum + + def add_source( + self, source_file: SourceFile, source: UnpatchedSourceDefinition + ): + # sources can't be overwritten! + _check_duplicates(source, self.sources) + self.sources[source.unique_id] = source # type: ignore + self.get_file(source_file).sources.append(source.unique_id) + + def add_node_nofile(self, node: ManifestNodes): + # nodes can't be overwritten! 
+ _check_duplicates(node, self.nodes) + self.nodes[node.unique_id] = node + + def add_node(self, source_file: SourceFile, node: ManifestNodes): + self.add_node_nofile(node) + self.get_file(source_file).nodes.append(node.unique_id) + + def add_exposure(self, source_file: SourceFile, exposure: ParsedExposure): + _check_duplicates(exposure, self.exposures) + self.exposures[exposure.unique_id] = exposure + self.get_file(source_file).exposures.append(exposure.unique_id) + + def add_disabled_nofile(self, node: CompileResultNode): + if node.unique_id in self._disabled: + self._disabled[node.unique_id].append(node) + else: + self._disabled[node.unique_id] = [node] + + def add_disabled(self, source_file: SourceFile, node: CompileResultNode): + self.add_disabled_nofile(node) + self.get_file(source_file).nodes.append(node.unique_id) + + def add_doc(self, source_file: SourceFile, doc: ParsedDocumentation): + _check_duplicates(doc, self.docs) + self.docs[doc.unique_id] = doc + self.get_file(source_file).docs.append(doc.unique_id) + + def _get_disabled( + self, + unique_id: str, + match_file: SourceFile, + ) -> List[CompileResultNode]: + if unique_id not in self._disabled: + raise InternalException( + 'called _get_disabled with id={}, but it does not exist' + .format(unique_id) + ) + return [ + n for n in self._disabled[unique_id] + if n.original_file_path == match_file.path.original_file_path + ] + + # This is only used by 'sanitized_update' which processes "old_manifest" + def _process_node( + self, + node_id: str, + source_file: SourceFile, + old_file: SourceFile, + old_manifest: Any, + ) -> None: + """Nodes are a special kind of complicated - there can be multiple + with the same name, as long as all but one are disabled. + + Only handle nodes where the matching node has the same resource type + as the current parser. + """ + source_path = source_file.path.original_file_path + found: bool = False + if node_id in old_manifest.nodes: + old_node = old_manifest.nodes[node_id] + if old_node.original_file_path == source_path: + self.add_node(source_file, old_node) + found = True + + if node_id in old_manifest._disabled: + matches = old_manifest._get_disabled(node_id, source_file) + for match in matches: + self.add_disabled(source_file, match) + found = True + + if not found: + raise CompilationException( + 'Expected to find "{}" in cached "manifest.nodes" or ' + '"manifest.disabled" based on cached file information: {}!' + .format(node_id, old_file) + ) + + # This is called by ManifestLoader._get_cached/parse_with_cache, + # which handles updating the ManifestLoader results with information + # from the "old_manifest", i.e. the pickle file if the checksums are + # the same. 
+ def sanitized_update( + self, + source_file: SourceFile, + old_manifest: Any, + resource_type: NodeType, + ) -> bool: + + if isinstance(source_file.path, RemoteFile): + return False + + old_file = old_manifest.get_file(source_file) + for doc_id in old_file.docs: + doc = _expect_value(doc_id, old_manifest.docs, old_file, "docs") + self.add_doc(source_file, doc) + + for macro_id in old_file.macros: + macro = _expect_value( + macro_id, old_manifest.macros, old_file, "macros" + ) + self.add_macro(source_file, macro) + + for source_id in old_file.sources: + source = _expect_value( + source_id, old_manifest.sources, old_file, "sources" + ) + self.add_source(source_file, source) + + # because we know this is how we _parsed_ the node, we can safely + # assume if it's disabled it was done by the project or file, and + # we can keep our old data + # the node ID could be in old_manifest.disabled AND in old_manifest.nodes. + # In that case, we have to make sure the path also matches. + for node_id in old_file.nodes: + # cheat: look at the first part of the node ID and compare it to + # the parser resource type. On a mismatch, bail out. + if resource_type != node_id.split('.')[0]: + continue + self._process_node(node_id, source_file, old_file, old_manifest) + + for exposure_id in old_file.exposures: + exposure = _expect_value( + exposure_id, old_manifest.exposures, old_file, "exposures" + ) + self.add_exposure(source_file, exposure) + + # Note: There shouldn't be any patches in here after the cleanup. + # The pickled Manifest should have had all patches applied. + patched = False + for name in old_file.patches: + patch = _expect_value( + name, old_manifest.patches, old_file, "patches" + ) + self.add_patch(source_file, patch) + patched = True + if patched: + self.get_file(source_file).patches.sort() + + macro_patched = False + for key in old_file.macro_patches: + macro_patch = _expect_value( + key, old_manifest.macro_patches, old_file, "macro_patches" + ) + self.add_macro_patch(source_file, macro_patch) + macro_patched = True + if macro_patched: + self.get_file(source_file).macro_patches.sort() + + return True + # end of methods formerly in ParseResult + # Provide support for copy.deepcopy() - we just need to avoid the lock! # pickle and deepcopy use this. It returns a callable object used to # create the initial version of the object and a tuple of arguments @@ -927,6 +1182,11 @@ def __reduce_ex__(self, protocol): self.files, self.metadata, self.flat_graph, + self.state_check, + self.macro_patches, + self.patches, + self.source_patches, + self._disabled, self._docs_cache, self._sources_cache, self._refs_cache, @@ -992,3 +1252,22 @@ class WritableManifest(ArtifactMixin): metadata: ManifestMetadata = field(metadata=dict( description='Metadata about the manifest', )) + + +def _check_duplicates( + value: HasUniqueID, src: Mapping[str, HasUniqueID] +): + if value.unique_id in src: + raise_duplicate_resource_name(value, src[value.unique_id]) + + +def _expect_value( + key: K_T, src: Mapping[K_T, V_T], old_file: SourceFile, name: str +) -> V_T: + if key not in src: + raise CompilationException( + 'Expected to find "{}" in cached "result.{}" based ' + 'on cached file information: {}!' 
+ .format(key, name, old_file) + ) + return src[key] diff --git a/core/dbt/contracts/graph/parsed.py b/core/dbt/contracts/graph/parsed.py index fb62d805e27..0282292eb03 100644 --- a/core/dbt/contracts/graph/parsed.py +++ b/core/dbt/contracts/graph/parsed.py @@ -720,6 +720,18 @@ def same_contents(self, old: Optional['ParsedExposure']) -> bool: ) +ManifestNodes = Union[ + ParsedAnalysisNode, + ParsedDataTestNode, + ParsedHookNode, + ParsedModelNode, + ParsedRPCNode, + ParsedSchemaTestNode, + ParsedSeedNode, + ParsedSnapshotNode, +] + + ParsedResource = Union[ ParsedDocumentation, ParsedMacro, diff --git a/core/dbt/parser/__init__.py b/core/dbt/parser/__init__.py index c509e357f75..b17d2925548 100644 --- a/core/dbt/parser/__init__.py +++ b/core/dbt/parser/__init__.py @@ -5,12 +5,11 @@ from .hooks import HookParser # noqa from .macros import MacroParser # noqa from .models import ModelParser # noqa -from .results import ParseResult # noqa from .schemas import SchemaParser # noqa from .seeds import SeedParser # noqa from .snapshots import SnapshotParser # noqa from . import ( # noqa - analysis, base, data_test, docs, hooks, macros, models, results, schemas, + analysis, base, data_test, docs, hooks, macros, models, schemas, snapshots ) diff --git a/core/dbt/parser/base.py b/core/dbt/parser/base.py index d2b9d9cd100..f48d2197779 100644 --- a/core/dbt/parser/base.py +++ b/core/dbt/parser/base.py @@ -23,15 +23,14 @@ from dbt.contracts.files import ( SourceFile, FilePath, FileHash ) -from dbt.contracts.graph.manifest import MacroManifest -from dbt.contracts.graph.parsed import HasUniqueID +from dbt.contracts.graph.manifest import Manifest +from dbt.contracts.graph.parsed import HasUniqueID, ManifestNodes from dbt.contracts.graph.unparsed import UnparsedNode from dbt.exceptions import ( CompilationException, validator_error_message, InternalException ) from dbt import hooks from dbt.node_types import NodeType -from dbt.parser.results import ParseResult, ManifestNodes from dbt.parser.search import FileBlock # internally, the parser may store a less-restrictive type that will be @@ -48,10 +47,10 @@ class BaseParser(Generic[FinalValue]): - def __init__(self, results: ParseResult, project: Project) -> None: - self.results = results + def __init__(self, project: Project, manifest: Manifest) -> None: self.project = project - # this should be a superset of [x.path for x in self.results.files] + self.manifest = manifest + # this should be a superset of [x.path for x in self.manifest.files] # because we fill it via search() self.searched: List[FilePath] = [] @@ -96,22 +95,20 @@ def load_file( class Parser(BaseParser[FinalValue], Generic[FinalValue]): def __init__( self, - results: ParseResult, project: Project, + manifest: Manifest, root_project: RuntimeConfig, - macro_manifest: MacroManifest, ) -> None: - super().__init__(results, project) + super().__init__(project, manifest) self.root_project = root_project - self.macro_manifest = macro_manifest class RelationUpdate: def __init__( - self, config: RuntimeConfig, macro_manifest: MacroManifest, + self, config: RuntimeConfig, manifest: Manifest, component: str ) -> None: - macro = macro_manifest.find_generate_macro_by_name( + macro = manifest.find_generate_macro_by_name( component=component, root_project_name=config.project_name, ) @@ -121,7 +118,7 @@ def __init__( ) root_context = generate_generate_component_name_macro( - macro, config, macro_manifest + macro, config, manifest ) self.updater = MacroGenerator(macro, root_context) self.component = component @@ 
-142,23 +139,22 @@ class ConfiguredParser( ): def __init__( self, - results: ParseResult, project: Project, + manifest: Manifest, root_project: RuntimeConfig, - macro_manifest: MacroManifest, ) -> None: - super().__init__(results, project, root_project, macro_manifest) + super().__init__(project, manifest, root_project) self._update_node_database = RelationUpdate( - macro_manifest=macro_manifest, config=root_project, + manifest=manifest, config=root_project, component='database' ) self._update_node_schema = RelationUpdate( - macro_manifest=macro_manifest, config=root_project, + manifest=manifest, config=root_project, component='schema' ) self._update_node_alias = RelationUpdate( - macro_manifest=macro_manifest, config=root_project, + manifest=manifest, config=root_project, component='alias' ) @@ -273,7 +269,7 @@ def _context_for( self, parsed_node: IntermediateNode, config: ContextConfig ) -> Dict[str, Any]: return generate_parser_model( - parsed_node, self.root_project, self.macro_manifest, config + parsed_node, self.root_project, self.manifest, config ) def render_with_context( @@ -386,9 +382,9 @@ def render_update( def add_result_node(self, block: FileBlock, node: ManifestNodes): if node.config.enabled: - self.results.add_node(block.file, node) + self.manifest.add_node(block.file, node) else: - self.results.add_disabled(block.file, node) + self.manifest.add_disabled(block.file, node) def parse_node(self, block: ConfiguredBlockType) -> FinalNode: compiled_path: str = self.get_compiled_path(block) diff --git a/core/dbt/parser/docs.py b/core/dbt/parser/docs.py index 43b84009526..aa3b7b07fc3 100644 --- a/core/dbt/parser/docs.py +++ b/core/dbt/parser/docs.py @@ -60,6 +60,6 @@ def parse_file(self, file_block: FileBlock): ) for block in searcher: for parsed in self.parse_block(block): - self.results.add_doc(file_block.file, parsed) + self.manifest.add_doc(file_block.file, parsed) # mark the file as seen, even if there are no macros in it - self.results.get_file(file_block.file) + self.manifest.get_file(file_block.file) diff --git a/core/dbt/parser/macros.py b/core/dbt/parser/macros.py index 7f796106b63..5fb877910ee 100644 --- a/core/dbt/parser/macros.py +++ b/core/dbt/parser/macros.py @@ -90,7 +90,7 @@ def parse_unparsed_macros( def parse_file(self, block: FileBlock): # mark the file as seen, even if there are no macros in it - self.results.get_file(block.file) + self.manifest.get_file(block.file) source_file = block.file original_file_path = source_file.path.original_file_path @@ -108,4 +108,4 @@ def parse_file(self, block: FileBlock): ) for node in self.parse_unparsed_macros(base_node): - self.results.add_macro(block.file, node) + self.manifest.add_macro(block.file, node) diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index df7768c79ed..35e97312782 100644 --- a/core/dbt/parser/manifest.py +++ b/core/dbt/parser/manifest.py @@ -3,7 +3,7 @@ import os import pickle from typing import ( - Dict, Optional, Mapping, Callable, Any, List, Type, Union, MutableMapping + Dict, Optional, Mapping, Callable, Any, List, Type, Union ) import time @@ -12,6 +12,7 @@ import dbt.flags as flags from dbt.adapters.factory import ( + get_adapter, get_relation_class_by_name, ) from dbt.helper_types import PathSet @@ -24,7 +25,7 @@ from dbt.contracts.files import FilePath, FileHash from dbt.contracts.graph.compiled import ManifestNode from dbt.contracts.graph.manifest import ( - Manifest, MacroManifest, AnyManifest, Disabled + Manifest, Disabled, MacroManifest, ManifestStateCheck ) from 
dbt.contracts.graph.parsed import ( ParsedSourceDefinition, ParsedNode, ParsedMacro, ColumnInfo, ParsedExposure @@ -44,7 +45,6 @@ from dbt.parser.hooks import HookParser from dbt.parser.macros import MacroParser from dbt.parser.models import ModelParser -from dbt.parser.results import ParseResult from dbt.parser.schemas import SchemaParser from dbt.parser.search import FileBlock from dbt.parser.seeds import SeedParser @@ -98,67 +98,30 @@ class ManifestLoaderInfo(dbtClassMixin, Writable): ] -# TODO: this should be calculated per-file based on the vars() calls made in -# parsing, so changing one var doesn't invalidate everything. also there should -# be something like that for env_var - currently changing env_vars in way that -# impact graph selection or configs will result in weird test failures. -# finally, we should hash the actual profile used, not just root project + -# profiles.yml + relevant args. While sufficient, it is definitely overkill. -def make_parse_result( - config: RuntimeConfig, all_projects: Mapping[str, Project] -) -> ParseResult: - """Make a ParseResult from the project configuration and the profile.""" - # if any of these change, we need to reject the parser - vars_hash = FileHash.from_contents( - '\x00'.join([ - getattr(config.args, 'vars', '{}') or '{}', - getattr(config.args, 'profile', '') or '', - getattr(config.args, 'target', '') or '', - __version__ - ]) - ) - profile_path = os.path.join(config.args.profiles_dir, 'profiles.yml') - with open(profile_path) as fp: - profile_hash = FileHash.from_contents(fp.read()) - - project_hashes = {} - for name, project in all_projects.items(): - path = os.path.join(project.project_root, 'dbt_project.yml') - with open(path) as fp: - project_hashes[name] = FileHash.from_contents(fp.read()) - - return ParseResult( - vars_hash=vars_hash, - profile_hash=profile_hash, - project_hashes=project_hashes, - ) - - class ManifestLoader: def __init__( self, root_project: RuntimeConfig, all_projects: Mapping[str, Project], - macro_hook: Optional[Callable[[AnyManifest], Any]] = None, + macro_hook: Optional[Callable[[Manifest], Any]] = None, ) -> None: self.root_project: RuntimeConfig = root_project self.all_projects: Mapping[str, Project] = all_projects - self.macro_hook: Callable[[AnyManifest], Any] + self.manifest: Manifest = Manifest({}, {}, {}, {}, {}, {}, [], {}) + self.manifest.metadata = root_project.get_metadata() + self.macro_hook: Callable[[Manifest], Any] if macro_hook is None: self.macro_hook = lambda m: None else: self.macro_hook = macro_hook - # results holds all of the nodes created by parsing, - # in dictionaries: nodes, sources, docs, macros, exposures, - # macro_patches, patches, source_patches, files, etc - self.results: ParseResult = make_parse_result( - root_project, all_projects, - ) self._loaded_file_cache: Dict[str, FileBlock] = {} self._perf_info = ManifestLoaderInfo( is_partial_parse_enabled=self._partial_parse_enabled() ) + # Creating state_check must go before read_saved_manifest + self.manifest.state_check = self.build_manifest_state_check() + self.old_manifest: Optional[Manifest] = self.read_saved_manifest() def track_project_load(self): invocation_id = dbt.tracking.active_user.invocation_id @@ -177,30 +140,40 @@ def track_project_load(self): ), }) + # This is where we use the partial-parse state from the + # pickle file (if it exists) def parse_with_cache( self, path: FilePath, parser: BaseParser, - old_results: Optional[ParseResult], ) -> None: block = self._get_file(path, parser) - if not 
self._get_cached(block, old_results, parser): + # _get_cached actually copies the nodes, etc, that were + # generated from the file to the results, in 'sanitized_update' + if not self._get_cached(block, parser): parser.parse_file(block) + # check if we have a stored parse file, then check if + # file checksums are the same or not and either return + # the old ... stuff or return false (not cached) def _get_cached( self, block: FileBlock, - old_results: Optional[ParseResult], parser: BaseParser, ) -> bool: # TODO: handle multiple parsers w/ same files, by # tracking parser type vs node type? Or tracking actual # parser type during parsing? - if old_results is None: + if self.old_manifest is None: return False - if old_results.has_file(block.file): - return self.results.sanitized_update( - block.file, old_results, parser.resource_type + # The 'has_file' method is where we check to see if + # the checksum of the old file is the same as the new + # file. If the checksum is different, 'has_file' returns + # false. If it's the same, the file and the things that + # were generated from it are used. + if self.old_manifest.has_file(block.file): + return self.manifest.sanitized_update( + block.file, self.old_manifest, parser.resource_type ) return False @@ -215,13 +188,10 @@ def _get_file(self, path: FilePath, parser: BaseParser) -> FileBlock: def parse_project( self, project: Project, - macro_manifest: MacroManifest, - old_results: Optional[ParseResult], ) -> None: parsers: List[Parser] = [] for cls in _parser_types: - parser = cls(self.results, project, self.root_project, - macro_manifest) + parser = cls(project, self.manifest, self.root_project) parsers.append(parser) # per-project cache. @@ -234,7 +204,7 @@ def parse_project( parser_path_count = 0 parser_start_timer = time.perf_counter() for path in parser.search(): - self.parse_with_cache(path, parser, old_results) + self.parse_with_cache(path, parser) parser_path_count = parser_path_count + 1 if parser_path_count > 0: @@ -257,76 +227,58 @@ def parse_project( self._perf_info.path_count + total_path_count ) - def load_only_macros(self) -> MacroManifest: - old_results = self.read_parse_results() - - for project in self.all_projects.values(): - parser = MacroParser(self.results, project) - for path in parser.search(): - self.parse_with_cache(path, parser, old_results) - - # make a manifest with just the macros to get the context - macro_manifest = MacroManifest( - macros=self.results.macros, - files=self.results.files - ) - self.macro_hook(macro_manifest) - return macro_manifest - # This is where the main action happens - def load(self, macro_manifest: MacroManifest): - # if partial parse is enabled, load old results - old_results = self.read_parse_results() - if old_results is not None: - logger.debug('Got an acceptable cached parse result') - # store the macros & files from the adapter macro manifest - self.results.macros.update(macro_manifest.macros) - self.results.files.update(macro_manifest.files) - + def load(self): start_timer = time.perf_counter() + if self.old_manifest is not None: + logger.debug('Got an acceptable cached parse result') + for project in self.all_projects.values(): # parse a single project - self.parse_project(project, macro_manifest, old_results) + self.parse_project(project) self._perf_info.parse_project_elapsed = ( time.perf_counter() - start_timer ) - def write_parse_results(self): + def write_manifest_for_partial_parse(self): path = os.path.join(self.root_project.target_path, PARTIAL_PARSE_FILE_NAME) 
make_directory(self.root_project.target_path) with open(path, 'wb') as fp: - pickle.dump(self.results, fp) + pickle.dump(self.manifest, fp) - def matching_parse_results(self, result: ParseResult) -> bool: + def matching_parse_results(self, manifest: Manifest) -> bool: """Compare the global hashes of the read-in parse results' values to the known ones, and return if it is ok to re-use the results. """ try: - if result.dbt_version != __version__: + if manifest.metadata.dbt_version != __version__: logger.debug( 'dbt version mismatch: {} != {}, cache invalidated' - .format(result.dbt_version, __version__) + .format(manifest.metadata.dbt_version, __version__) ) return False - except AttributeError: - logger.debug('malformed result file, cache invalidated') + except AttributeError as exc: + logger.debug(f"malformed result file, cache invalidated: {exc}") return False valid = True - if self.results.vars_hash != result.vars_hash: + if not self.manifest.state_check or not manifest.state_check: + return False + + if self.manifest.state_check.vars_hash != manifest.state_check.vars_hash: logger.debug('vars hash mismatch, cache invalidated') valid = False - if self.results.profile_hash != result.profile_hash: + if self.manifest.state_check.profile_hash != manifest.state_check.profile_hash: logger.debug('profile hash mismatch, cache invalidated') valid = False missing_keys = { - k for k in self.results.project_hashes - if k not in result.project_hashes + k for k in self.manifest.state_check.project_hashes + if k not in manifest.state_check.project_hashes } if missing_keys: logger.debug( @@ -335,9 +287,9 @@ def matching_parse_results(self, result: ParseResult) -> bool: ) valid = False - for key, new_value in self.results.project_hashes.items(): - if key in result.project_hashes: - old_value = result.project_hashes[key] + for key, new_value in self.manifest.state_check.project_hashes.items(): + if key in manifest.state_check.project_hashes: + old_value = manifest.state_check.project_hashes[key] if new_value != old_value: logger.debug( 'For key {}, hash mismatch ({} -> {}), cache ' @@ -357,7 +309,7 @@ def _partial_parse_enabled(self): else: return DEFAULT_PARTIAL_PARSE - def read_parse_results(self) -> Optional[ParseResult]: + def read_saved_manifest(self) -> Optional[Manifest]: if not self._partial_parse_enabled(): logger.debug('Partial parsing not enabled') return None @@ -367,82 +319,132 @@ def read_parse_results(self) -> Optional[ParseResult]: if os.path.exists(path): try: with open(path, 'rb') as fp: - result: ParseResult = pickle.load(fp) + manifest: Manifest = pickle.load(fp) # keep this check inside the try/except in case something about # the file has changed in weird ways, perhaps due to being a # different version of dbt - if self.matching_parse_results(result): - return result + if self.matching_parse_results(manifest): + return manifest except Exception as exc: logger.debug( 'Failed to load parsed file from disk at {}: {}' .format(path, exc), exc_info=True ) - return None - def process_manifest(self, manifest: Manifest): + # This find the sources, refs, and docs and resolves them + # for nodes and exposures + def process_manifest(self): project_name = self.root_project.project_name - process_sources(manifest, project_name) - process_refs(manifest, project_name) - process_docs(manifest, self.root_project) - - def create_manifest(self) -> Manifest: - # before we do anything else, patch the sources. 
This mutates - # results.disabled, so it needs to come before the final 'disabled' - # list is created + process_sources(self.manifest, project_name) + process_refs(self.manifest, project_name) + process_docs(self.manifest, self.root_project) + + def update_manifest(self) -> Manifest: start_patch = time.perf_counter() - sources = patch_sources(self.results, self.root_project) + # patch_sources converts the UnparsedSourceDefinitions in the + # Manifest.sources to ParsedSourceDefinition via 'patch_source' + # in SourcePatcher + sources = patch_sources(self.root_project, self.manifest) + self.manifest.sources = sources + # ParseResults had a 'disabled' attribute which was a dictionary + # which is now named '_disabled'. This used to copy from + # ParseResults to the Manifest. Can this be normalized so + # there's only one disabled? + disabled = [] + for value in self.manifest._disabled.values(): + disabled.extend(value) + self.manifest.disabled = disabled self._perf_info.patch_sources_elapsed = ( time.perf_counter() - start_patch ) - disabled = [] - for value in self.results.disabled.values(): - disabled.extend(value) - nodes: MutableMapping[str, ManifestNode] = { - k: v for k, v in self.results.nodes.items() - } + self.manifest.selectors = self.root_project.manifest_selectors - manifest = Manifest( - nodes=nodes, - sources=sources, - macros=self.results.macros, - docs=self.results.docs, - exposures=self.results.exposures, - metadata=self.root_project.get_metadata(), - disabled=disabled, - files=self.results.files, - selectors=self.root_project.manifest_selectors, - ) - manifest.patch_nodes(self.results.patches) - manifest.patch_macros(self.results.macro_patches) + # do the node and macro patches + self.manifest.patch_nodes() + self.manifest.patch_macros() + + # process_manifest updates the refs, sources, and docs start_process = time.perf_counter() - self.process_manifest(manifest) + self.process_manifest() self._perf_info.process_manifest_elapsed = ( time.perf_counter() - start_process ) - return manifest + return self.manifest + + # TODO: this should be calculated per-file based on the vars() calls made in + # parsing, so changing one var doesn't invalidate everything. also there should + # be something like that for env_var - currently changing env_vars in way that + # impact graph selection or configs will result in weird test failures. + # finally, we should hash the actual profile used, not just root project + + # profiles.yml + relevant args. While sufficient, it is definitely overkill. 
+ def build_manifest_state_check(self): + config = self.root_project + all_projects = self.all_projects + # if any of these change, we need to reject the parser + vars_hash = FileHash.from_contents( + '\x00'.join([ + getattr(config.args, 'vars', '{}') or '{}', + getattr(config.args, 'profile', '') or '', + getattr(config.args, 'target', '') or '', + __version__ + ]) + ) + + profile_path = os.path.join(config.args.profiles_dir, 'profiles.yml') + with open(profile_path) as fp: + profile_hash = FileHash.from_contents(fp.read()) + + project_hashes = {} + for name, project in all_projects.items(): + path = os.path.join(project.project_root, 'dbt_project.yml') + with open(path) as fp: + project_hashes[name] = FileHash.from_contents(fp.read()) + + state_check = ManifestStateCheck( + vars_hash=vars_hash, + profile_hash=profile_hash, + project_hashes=project_hashes, + ) + return state_check + # Get "full" manifest means a manifest with the macros already loaded + # We sometimes build a manifest without that, so it can't be done in + # an init routine. We call the adapter for this because apparently + # it stores a lightweight manifest with only macros and we want to + # reuse that if it exists. @classmethod - def load_all( + def get_full_manifest( cls, - root_config: RuntimeConfig, - macro_manifest: MacroManifest, - macro_hook: Callable[[AnyManifest], Any], + config: RuntimeConfig, + *, + reset: bool = False, ) -> Manifest: + + adapter = get_adapter(config) # type: ignore + # reset is set in a TaskManager load_manifest call + if reset: + config.clear_dependencies() + adapter.clear_macro_manifest() + # load_macro_manifest here calls load_macros in ManifestLoader + # The MacroManifest is like the Manifest but only contains + # macros and files. + macro_hook = adapter.connections.set_query_header + with PARSING_STATE: start_load_all = time.perf_counter() - projects = root_config.load_dependencies() - loader = cls(root_config, projects, macro_hook) - loader.load(macro_manifest=macro_manifest) - loader.write_parse_results() - manifest = loader.create_manifest() - _check_manifest(manifest, root_config) + projects = config.load_dependencies() + loader = ManifestLoader(config, projects, macro_hook) + loader.load_macros_from_adapter(adapter) + loader.load() + loader.write_manifest_for_partial_parse() + manifest = loader.update_manifest() + _check_manifest(manifest, config) manifest.build_flat_graph() loader._perf_info.load_all_elapsed = ( @@ -451,18 +453,57 @@ def load_all( loader.track_project_load() - return manifest + return manifest + # The point of this is to store the lightweight + # MacroManifest in the adapter, with the same files + # and macros in the Manifest + def load_macros_from_adapter(self, adapter): + if adapter._macro_manifest_lazy is None: + macro_manifest = self.create_macro_manifest() + adapter._macro_manifest_lazy = macro_manifest + # macro_hook = adapter MacroQueryStringSetter callable + self.macro_hook(macro_manifest) + # the files and macros are already in self.manifest + else: + macro_manifest = adapter._macro_manifest_lazy + self.manifest.files = macro_manifest.files + self.manifest.macros = macro_manifest.macros + # macro_hook = adapter MacroQueryStringSetter callable + self.macro_hook(macro_manifest) + + # This creates a MacroManifest with 'files' and 'macros' to + # save in the adapter + def create_macro_manifest(self): + for project in self.all_projects.values(): + # what is the manifest passed in actually used for? 
+ parser = MacroParser(project, self.manifest) + for path in parser.search(): + self.parse_with_cache(path, parser) + macro_manifest = MacroManifest(self.manifest.macros, self.manifest.files) + return macro_manifest + + # This is called by the adapter code only, to create the + # MacroManifest that's stored in the adapter. + # 'get_full_manifest' uses a persistent ManifestLoader while this + # creates a temporary ManifestLoader and throws it away. + # Not sure when this would actually get used. The + # ManifestLoader uses 'load_macros_from_adapter' above. @classmethod def load_macros( cls, root_config: RuntimeConfig, - macro_hook: Callable[[AnyManifest], Any], - ) -> MacroManifest: + macro_hook: Callable[[Manifest], Any], + ) -> Manifest: with PARSING_STATE: projects = root_config.load_dependencies() + # This creates a loader object, including result, + # and then throws it away, returning only the + # manifest loader = cls(root_config, projects, macro_hook) - return loader.load_only_macros() + macro_manifest = loader.create_macro_manifest() + + return macro_manifest def invalid_ref_fail_unless_test(node, target_model_name, @@ -559,6 +600,7 @@ def _check_manifest(manifest: Manifest, config: RuntimeConfig) -> None: _warn_for_unused_resource_config_paths(manifest, config) +# This is just used in test cases def _load_projects(config, paths): for path in paths: try: @@ -592,6 +634,7 @@ def _get_node_column(node, column_name): ] +# node and column descriptions def _process_docs_for_node( context: Dict[str, Any], node: ManifestNode, @@ -601,6 +644,7 @@ def _process_docs_for_node( column.description = get_rendered(column.description, context) +# source and table descriptions, column descriptions def _process_docs_for_source( context: Dict[str, Any], source: ParsedSourceDefinition, @@ -618,6 +662,7 @@ def _process_docs_for_source( column.description = column_desc +# macro argument descriptions def _process_docs_for_macro( context: Dict[str, Any], macro: ParsedMacro ) -> None: @@ -626,12 +671,17 @@ def _process_docs_for_macro( arg.description = get_rendered(arg.description, context) +# exposure descriptions def _process_docs_for_exposure( context: Dict[str, Any], exposure: ParsedExposure ) -> None: exposure.description = get_rendered(exposure.description, context) +# nodes: node and column descriptions +# sources: source and table descriptions, column descriptions +# macros: macro argument descriptions +# exposures: exposure descriptions def process_docs(manifest: Manifest, config: RuntimeConfig): for node in manifest.nodes.values(): ctx = generate_runtime_docs( @@ -750,9 +800,12 @@ def _process_refs_for_node( # TODO: I think this is extraneous, node should already be the same # as manifest.nodes[node.unique_id] (we're mutating node here, not # making a new one) + # Q: could we stop doing this? 
manifest.update_node(node) +# Takes references in 'refs' array of nodes and exposures, finds the target +# node, and updates 'depends_on.nodes' with the unique id def process_refs(manifest: Manifest, current_project: str): for node in manifest.nodes.values(): _process_refs_for_node(manifest, current_project, node) @@ -812,6 +865,9 @@ def _process_sources_for_node( manifest.update_node(node) +# Loops through all nodes and exposures, for each element in +# 'sources' array finds the source node and updates the +# 'depends_on.nodes' array with the unique id def process_sources(manifest: Manifest, current_project: str): for node in manifest.nodes.values(): if node.resource_type == NodeType.Source: @@ -823,6 +879,8 @@ def process_sources(manifest: Manifest, current_project: str): return manifest +# This is called in task.rpc.sql_commands when a "dynamic" node is +# created in the manifest, in 'add_refs' def process_macro( config: RuntimeConfig, manifest: Manifest, macro: ParsedMacro ) -> None: @@ -835,6 +893,8 @@ def process_macro( _process_docs_for_macro(ctx, macro) +# This is called in task.rpc.sql_commands when a "dynamic" node is +# created in the manifest, in 'add_refs' def process_node( config: RuntimeConfig, manifest: Manifest, node: ManifestNode ): @@ -845,18 +905,3 @@ def process_node( _process_refs_for_node(manifest, config.project_name, node) ctx = generate_runtime_docs(config, node, manifest, config.project_name) _process_docs_for_node(ctx, node) - - -def load_macro_manifest( - config: RuntimeConfig, - macro_hook: Callable[[AnyManifest], Any], -) -> MacroManifest: - return ManifestLoader.load_macros(config, macro_hook) - - -def load_manifest( - config: RuntimeConfig, - macro_manifest: MacroManifest, - macro_hook: Callable[[AnyManifest], Any], -) -> Manifest: - return ManifestLoader.load_all(config, macro_manifest, macro_hook) diff --git a/core/dbt/parser/results.py b/core/dbt/parser/results.py deleted file mode 100644 index cfadfe52b34..00000000000 --- a/core/dbt/parser/results.py +++ /dev/null @@ -1,347 +0,0 @@ -from dataclasses import dataclass, field -from typing import TypeVar, MutableMapping, Mapping, Union, List - -from dbt.dataclass_schema import dbtClassMixin - -from dbt.contracts.files import RemoteFile, FileHash, SourceFile -from dbt.contracts.graph.compiled import CompileResultNode -from dbt.contracts.graph.parsed import ( - HasUniqueID, - ParsedAnalysisNode, - ParsedDataTestNode, - ParsedDocumentation, - ParsedHookNode, - ParsedMacro, - ParsedMacroPatch, - ParsedModelNode, - ParsedNodePatch, - ParsedExposure, - ParsedRPCNode, - ParsedSeedNode, - ParsedSchemaTestNode, - ParsedSnapshotNode, - UnpatchedSourceDefinition, -) -from dbt.contracts.graph.unparsed import SourcePatch -from dbt.contracts.util import Writable, Replaceable, MacroKey, SourceKey -from dbt.exceptions import ( - raise_duplicate_resource_name, raise_duplicate_patch_name, - raise_duplicate_macro_patch_name, CompilationException, InternalException, - raise_compiler_error, raise_duplicate_source_patch_name -) -from dbt.node_types import NodeType -from dbt.ui import line_wrap_message -from dbt.version import __version__ - - -# Parsers can return anything as long as it's a unique ID -ParsedValueType = TypeVar('ParsedValueType', bound=HasUniqueID) - - -def _check_duplicates( - value: HasUniqueID, src: Mapping[str, HasUniqueID] -): - if value.unique_id in src: - raise_duplicate_resource_name(value, src[value.unique_id]) - - -ManifestNodes = Union[ - ParsedAnalysisNode, - ParsedDataTestNode, - ParsedHookNode, - 
ParsedModelNode, - ParsedRPCNode, - ParsedSchemaTestNode, - ParsedSeedNode, - ParsedSnapshotNode, -] - - -def dict_field(): - return field(default_factory=dict) - - -@dataclass -class ParseResult(dbtClassMixin, Writable, Replaceable): - vars_hash: FileHash - profile_hash: FileHash - project_hashes: MutableMapping[str, FileHash] - nodes: MutableMapping[str, ManifestNodes] = dict_field() - sources: MutableMapping[str, UnpatchedSourceDefinition] = dict_field() - docs: MutableMapping[str, ParsedDocumentation] = dict_field() - macros: MutableMapping[str, ParsedMacro] = dict_field() - exposures: MutableMapping[str, ParsedExposure] = dict_field() - macro_patches: MutableMapping[MacroKey, ParsedMacroPatch] = dict_field() - patches: MutableMapping[str, ParsedNodePatch] = dict_field() - source_patches: MutableMapping[SourceKey, SourcePatch] = dict_field() - files: MutableMapping[str, SourceFile] = dict_field() - disabled: MutableMapping[str, List[CompileResultNode]] = dict_field() - dbt_version: str = __version__ - - def get_file(self, source_file: SourceFile) -> SourceFile: - key = source_file.search_key - if key is None: - return source_file - if key not in self.files: - self.files[key] = source_file - return self.files[key] - - def add_source( - self, source_file: SourceFile, source: UnpatchedSourceDefinition - ): - # sources can't be overwritten! - _check_duplicates(source, self.sources) - self.sources[source.unique_id] = source - self.get_file(source_file).sources.append(source.unique_id) - - def add_node_nofile(self, node: ManifestNodes): - # nodes can't be overwritten! - _check_duplicates(node, self.nodes) - self.nodes[node.unique_id] = node - - def add_node(self, source_file: SourceFile, node: ManifestNodes): - self.add_node_nofile(node) - self.get_file(source_file).nodes.append(node.unique_id) - - def add_exposure(self, source_file: SourceFile, exposure: ParsedExposure): - _check_duplicates(exposure, self.exposures) - self.exposures[exposure.unique_id] = exposure - self.get_file(source_file).exposures.append(exposure.unique_id) - - def add_disabled_nofile(self, node: CompileResultNode): - if node.unique_id in self.disabled: - self.disabled[node.unique_id].append(node) - else: - self.disabled[node.unique_id] = [node] - - def add_disabled(self, source_file: SourceFile, node: CompileResultNode): - self.add_disabled_nofile(node) - self.get_file(source_file).nodes.append(node.unique_id) - - def add_macro(self, source_file: SourceFile, macro: ParsedMacro): - if macro.unique_id in self.macros: - # detect that the macro exists and emit an error - other_path = self.macros[macro.unique_id].original_file_path - # subtract 2 for the "Compilation Error" indent - # note that the line wrap eats newlines, so if you want newlines, - # this is the result :( - dup_macro_msg = line_wrap_message( - f'''\ - dbt found two macros named "{macro.name}" in the project - "{macro.package_name}". - - - To fix this error, rename or remove one of the following - macros: - - - {macro.original_file_path} - - - {other_path} - ''', - subtract=2 - ) - dup_path_msg = line_wrap_message( - f"""\ - The macro {macro.name} in file {macro.original_file_path} - was parsed multiple times by dbt. This error happens when a - macro is defined multiple times in the same file, or when a - path is duplicated in a dbt_project.yml configuration. 
- To fix this error, check: - - - {macro.original_file_path} - - - The `macro-paths` configurations in dbt_project.yml - """, - subtract=2, - ) - msg = ( - dup_path_msg - if macro.original_file_path == other_path - else dup_macro_msg - ) - raise_compiler_error(msg) - - self.macros[macro.unique_id] = macro - self.get_file(source_file).macros.append(macro.unique_id) - - def add_doc(self, source_file: SourceFile, doc: ParsedDocumentation): - _check_duplicates(doc, self.docs) - self.docs[doc.unique_id] = doc - self.get_file(source_file).docs.append(doc.unique_id) - - def add_patch( - self, source_file: SourceFile, patch: ParsedNodePatch - ) -> None: - # patches can't be overwritten - if patch.name in self.patches: - raise_duplicate_patch_name(patch, self.patches[patch.name]) - self.patches[patch.name] = patch - self.get_file(source_file).patches.append(patch.name) - - def add_macro_patch( - self, source_file: SourceFile, patch: ParsedMacroPatch - ) -> None: - # macros are fully namespaced - key = (patch.package_name, patch.name) - if key in self.macro_patches: - raise_duplicate_macro_patch_name(patch, self.macro_patches[key]) - self.macro_patches[key] = patch - self.get_file(source_file).macro_patches.append(key) - - def add_source_patch( - self, source_file: SourceFile, patch: SourcePatch - ) -> None: - # source patches must be unique - key = (patch.overrides, patch.name) - if key in self.source_patches: - raise_duplicate_source_patch_name(patch, self.source_patches[key]) - self.source_patches[key] = patch - self.get_file(source_file).source_patches.append(key) - - def _get_disabled( - self, - unique_id: str, - match_file: SourceFile, - ) -> List[CompileResultNode]: - if unique_id not in self.disabled: - raise InternalException( - 'called _get_disabled with id={}, but it does not exist' - .format(unique_id) - ) - return [ - n for n in self.disabled[unique_id] - if n.original_file_path == match_file.path.original_file_path - ] - - def _process_node( - self, - node_id: str, - source_file: SourceFile, - old_file: SourceFile, - old_result: 'ParseResult', - ) -> None: - """Nodes are a special kind of complicated - there can be multiple - with the same name, as long as all but one are disabled. - - Only handle nodes where the matching node has the same resource type - as the current parser. - """ - source_path = source_file.path.original_file_path - found: bool = False - if node_id in old_result.nodes: - old_node = old_result.nodes[node_id] - if old_node.original_file_path == source_path: - self.add_node(source_file, old_node) - found = True - - if node_id in old_result.disabled: - matches = old_result._get_disabled(node_id, source_file) - for match in matches: - self.add_disabled(source_file, match) - found = True - - if not found: - raise CompilationException( - 'Expected to find "{}" in cached "manifest.nodes" or ' - '"manifest.disabled" based on cached file information: {}!' - .format(node_id, old_file) - ) - - def sanitized_update( - self, - source_file: SourceFile, - old_result: 'ParseResult', - resource_type: NodeType, - ) -> bool: - """Perform a santized update. If the file can't be updated, invalidate - it and return false. 
- """ - if isinstance(source_file.path, RemoteFile): - return False - - old_file = old_result.get_file(source_file) - for doc_id in old_file.docs: - doc = _expect_value(doc_id, old_result.docs, old_file, "docs") - self.add_doc(source_file, doc) - - for macro_id in old_file.macros: - macro = _expect_value( - macro_id, old_result.macros, old_file, "macros" - ) - self.add_macro(source_file, macro) - - for source_id in old_file.sources: - source = _expect_value( - source_id, old_result.sources, old_file, "sources" - ) - self.add_source(source_file, source) - - # because we know this is how we _parsed_ the node, we can safely - # assume if it's disabled it was done by the project or file, and - # we can keep our old data - # the node ID could be in old_result.disabled AND in old_result.nodes. - # In that case, we have to make sure the path also matches. - for node_id in old_file.nodes: - # cheat: look at the first part of the node ID and compare it to - # the parser resource type. On a mismatch, bail out. - if resource_type != node_id.split('.')[0]: - continue - self._process_node(node_id, source_file, old_file, old_result) - - for exposure_id in old_file.exposures: - exposure = _expect_value( - exposure_id, old_result.exposures, old_file, "exposures" - ) - self.add_exposure(source_file, exposure) - - patched = False - for name in old_file.patches: - patch = _expect_value( - name, old_result.patches, old_file, "patches" - ) - self.add_patch(source_file, patch) - patched = True - if patched: - self.get_file(source_file).patches.sort() - - macro_patched = False - for key in old_file.macro_patches: - macro_patch = _expect_value( - key, old_result.macro_patches, old_file, "macro_patches" - ) - self.add_macro_patch(source_file, macro_patch) - macro_patched = True - if macro_patched: - self.get_file(source_file).macro_patches.sort() - - return True - - def has_file(self, source_file: SourceFile) -> bool: - key = source_file.search_key - if key is None: - return False - if key not in self.files: - return False - my_checksum = self.files[key].checksum - return my_checksum == source_file.checksum - - @classmethod - def rpc(cls): - # ugh! - return cls(FileHash.empty(), FileHash.empty(), {}) - - -K_T = TypeVar('K_T') -V_T = TypeVar('V_T') - - -def _expect_value( - key: K_T, src: Mapping[K_T, V_T], old_file: SourceFile, name: str -) -> V_T: - if key not in src: - raise CompilationException( - 'Expected to find "{}" in cached "result.{}" based ' - 'on cached file information: {}!' 
- .format(key, name, old_file) - ) - return src[key] diff --git a/core/dbt/parser/schemas.py b/core/dbt/parser/schemas.py index 1a165b7a452..73ec1caa12b 100644 --- a/core/dbt/parser/schemas.py +++ b/core/dbt/parser/schemas.py @@ -158,9 +158,9 @@ def merge_freshness( class SchemaParser(SimpleParser[SchemaTestBlock, ParsedSchemaTestNode]): def __init__( - self, results, project, root_project, macro_manifest, + self, project, manifest, root_project, ) -> None: - super().__init__(results, project, root_project, macro_manifest) + super().__init__(project, manifest, root_project) all_v_2 = ( self.root_project.config_version == 2 and self.project.config_version == 2 @@ -180,7 +180,7 @@ def __init__( self.root_project.credentials.type ) self.macro_resolver = MacroResolver( - self.macro_manifest.macros, + self.manifest.macros, self.root_project.project_name, internal_package_names ) @@ -284,6 +284,8 @@ def _get_relation_name(self, node: ParsedSourceDefinition): relation_cls = adapter.Relation return str(relation_cls.create_from(self.root_project, node)) + # This converts an UnpatchedSourceDefinition to a ParsedSourceDefinition + # it is used by the SourcePatcher. def parse_source( self, target: UnpatchedSourceDefinition ) -> ParsedSourceDefinition: @@ -496,7 +498,7 @@ def render_test_update(self, node, config, builder): try: # make a base context that doesn't have the magic kwargs field context = generate_test_context( - node, self.root_project, self.macro_manifest, config, + node, self.root_project, self.manifest, config, self.macro_resolver, ) # update with rendered test kwargs (which collects any refs) @@ -542,9 +544,9 @@ def parse_source_test( ) # we can't go through result.add_node - no file... instead! if node.config.enabled: - self.results.add_node_nofile(node) + self.manifest.add_node_nofile(node) else: - self.results.add_disabled_nofile(node) + self.manifest.add_disabled_nofile(node) return node def parse_node(self, block: SchemaTestBlock) -> ParsedSchemaTestNode: @@ -618,13 +620,13 @@ def parse_tests(self, block: TestBlock) -> None: def parse_exposures(self, block: YamlBlock) -> None: parser = ExposureParser(self, block) for node in parser.parse(): - self.results.add_exposure(block.file, node) + self.manifest.add_exposure(block.file, node) def parse_file(self, block: FileBlock) -> None: dct = self._yaml_from_file(block.file) - # mark the file as seen, in ParseResult.files - self.results.get_file(block.file) + # mark the file as seen, in Manifest.files + self.manifest.get_file(block.file) if dct: try: @@ -716,8 +718,8 @@ def __init__( self.yaml = yaml @property - def results(self): - return self.schema_parser.results + def manifest(self): + return self.schema_parser.manifest @property def project(self): @@ -765,6 +767,7 @@ def parse(self) -> List[TestBlock]: T = TypeVar('T', bound=dbtClassMixin) +# This parses the 'sources' keys in yaml files. class SourceParser(YamlDocsReader): def _target_from_dict(self, cls: Type[T], data: Dict[str, Any]) -> T: path = self.yaml.path.original_file_path @@ -775,7 +778,11 @@ def _target_from_dict(self, cls: Type[T], data: Dict[str, Any]) -> T: msg = error_context(path, self.key, data, exc) raise CompilationException(msg) from exc - # the other parse method returns TestBlocks. This one doesn't. + # The other parse method returns TestBlocks. This one doesn't. + # This takes the yaml dictionaries in 'sources' keys and uses them + # to create UnparsedSourceDefinition objects. 
They are then turned + # into UnpatchedSourceDefinition objects in 'add_source_definitions' + # or SourcePatch objects in 'add_source_patch' def parse(self) -> List[TestBlock]: # get a verified list of dicts for the key handled by this parser for data in self.get_key_dicts(): @@ -787,7 +794,7 @@ def parse(self) -> List[TestBlock]: if is_override: data['path'] = self.yaml.path.original_file_path patch = self._target_from_dict(SourcePatch, data) - self.results.add_source_patch(self.yaml.file, patch) + self.manifest.add_source_patch(self.yaml.file, patch) else: source = self._target_from_dict(UnparsedSourceDefinition, data) self.add_source_definitions(source) @@ -817,7 +824,7 @@ def add_source_definitions(self, source: UnparsedSourceDefinition) -> None: resource_type=NodeType.Source, fqn=fqn, ) - self.results.add_source(self.yaml.file, result) + self.manifest.add_source(self.yaml.file, result) # This class has three main subclasses: TestablePatchParser (models, @@ -860,8 +867,8 @@ def parse(self) -> List[TestBlock]: refs: ParserRef = ParserRef.from_target(node) else: refs = ParserRef() - # This adds the node_block to self.results (a ParseResult - # object) as a ParsedNodePatch or ParsedMacroPatch + # This adds the node_block to self.manifest + # as a ParsedNodePatch or ParsedMacroPatch self.parse_patch(node_block, refs) return test_blocks @@ -898,7 +905,7 @@ class NodePatchParser( def parse_patch( self, block: TargetBlock[NodeTarget], refs: ParserRef ) -> None: - result = ParsedNodePatch( + patch = ParsedNodePatch( name=block.target.name, original_file_path=block.target.original_file_path, yaml_key=block.target.yaml_key, @@ -908,7 +915,7 @@ def parse_patch( meta=block.target.meta, docs=block.target.docs, ) - self.results.add_patch(self.yaml.file, result) + self.manifest.add_patch(self.yaml.file, patch) class TestablePatchParser(NodePatchParser[UnparsedNodeUpdate]): @@ -937,7 +944,7 @@ def _target_type(self) -> Type[UnparsedMacroUpdate]: def parse_patch( self, block: TargetBlock[UnparsedMacroUpdate], refs: ParserRef ) -> None: - result = ParsedMacroPatch( + patch = ParsedMacroPatch( name=block.target.name, original_file_path=block.target.original_file_path, yaml_key=block.target.yaml_key, @@ -947,7 +954,7 @@ def parse_patch( meta=block.target.meta, docs=block.target.docs, ) - self.results.add_macro_patch(self.yaml.file, result) + self.manifest.add_macro_patch(self.yaml.file, patch) class ExposureParser(YamlReader): @@ -981,7 +988,7 @@ def parse_exposure(self, unparsed: UnparsedExposure) -> ParsedExposure: ctx = generate_parse_exposure( parsed, self.root_project, - self.schema_parser.macro_manifest, + self.schema_parser.manifest, package_name, ) depends_on_jinja = '\n'.join( diff --git a/core/dbt/parser/snapshots.py b/core/dbt/parser/snapshots.py index db977ab4c1d..d4f788df304 100644 --- a/core/dbt/parser/snapshots.py +++ b/core/dbt/parser/snapshots.py @@ -86,4 +86,4 @@ def parse_file(self, file_block: FileBlock) -> None: # in case there are no snapshots declared, we still want to mark this # file as seen. 
But after we've finished, because we don't want to add # files with syntax errors - self.results.get_file(file_block.file) + self.manifest.get_file(file_block.file) diff --git a/core/dbt/parser/sources.py b/core/dbt/parser/sources.py index b6ef559ffd2..97d3bd1907f 100644 --- a/core/dbt/parser/sources.py +++ b/core/dbt/parser/sources.py @@ -4,9 +4,10 @@ Dict, Optional, Set, + Union, ) from dbt.config import RuntimeConfig -from dbt.contracts.graph.manifest import MacroManifest, SourceKey +from dbt.contracts.graph.manifest import Manifest, SourceKey from dbt.contracts.graph.parsed import ( UnpatchedSourceDefinition, ParsedSourceDefinition, @@ -21,22 +22,17 @@ from dbt.exceptions import warn_or_error from dbt.parser.schemas import SchemaParser, ParserRef -from dbt.parser.results import ParseResult from dbt import ui class SourcePatcher: def __init__( self, - results: ParseResult, root_project: RuntimeConfig, + manifest: Manifest, ) -> None: - self.results = results self.root_project = root_project - self.macro_manifest = MacroManifest( - macros=self.results.macros, - files=self.results.files - ) + self.manifest = manifest self.schema_parsers: Dict[str, SchemaParser] = {} self.patches_used: Dict[SourceKey, Set[str]] = {} self.sources: Dict[str, ParsedSourceDefinition] = {} @@ -85,7 +81,7 @@ def get_schema_parser_for(self, package_name: str) -> 'SchemaParser': all_projects = self.root_project.load_dependencies() project = all_projects[package_name] schema_parser = SchemaParser( - self.results, project, self.root_project, self.macro_manifest + project, self.manifest, self.root_project ) self.schema_parsers[package_name] = schema_parser return schema_parser @@ -103,10 +99,12 @@ def get_source_tests( def get_patch_for( self, - unpatched: UnpatchedSourceDefinition, + unpatched: Union[UnpatchedSourceDefinition, ParsedSourceDefinition], ) -> Optional[SourcePatch]: + if isinstance(unpatched, ParsedSourceDefinition): + return None key = (unpatched.package_name, unpatched.source.name) - patch: Optional[SourcePatch] = self.results.source_patches.get(key) + patch: Optional[SourcePatch] = self.manifest.source_patches.get(key) if patch is None: return None if key not in self.patches_used: @@ -119,7 +117,9 @@ def get_patch_for( def construct_sources(self) -> None: # given the UnpatchedSourceDefinition and SourcePatches, combine them # to make a beautiful baby ParsedSourceDefinition. - for unique_id, unpatched in self.results.sources.items(): + for unique_id, unpatched in self.manifest.sources.items(): + if isinstance(unpatched, ParsedSourceDefinition): + continue patch = self.get_patch_for(unpatched) patched = self.patch_source(unpatched, patch) @@ -127,22 +127,22 @@ def construct_sources(self) -> None: # data. 
for test in self.get_source_tests(patched): if test.config.enabled: - self.results.add_node_nofile(test) + self.manifest.add_node_nofile(test) else: - self.results.add_disabled_nofile(test) + self.manifest.add_disabled_nofile(test) schema_parser = self.get_schema_parser_for(unpatched.package_name) parsed = schema_parser.parse_source(patched) if parsed.config.enabled: self.sources[unique_id] = parsed else: - self.results.add_disabled_nofile(parsed) + self.manifest.add_disabled_nofile(parsed) self.warn_unused() def warn_unused(self) -> None: unused_tables: Dict[SourceKey, Optional[Set[str]]] = {} - for patch in self.results.source_patches.values(): + for patch in self.manifest.source_patches.values(): key = (patch.overrides, patch.name) if key not in self.patches_used: unused_tables[key] = None @@ -168,7 +168,7 @@ def get_unused_msg( 'target:', ] for key, table_names in unused_tables.items(): - patch = self.results.source_patches[key] + patch = self.manifest.source_patches[key] patch_name = f'{patch.overrides}.{patch.name}' if table_names is None: msg.append( @@ -185,8 +185,8 @@ def get_unused_msg( def patch_sources( - results: ParseResult, root_project: RuntimeConfig, + manifest: Manifest, ) -> Dict[str, ParsedSourceDefinition]: """Patch all the sources found in the results. Updates results.disabled and results.nodes. @@ -194,6 +194,6 @@ def patch_sources( Return a dict of ParsedSourceDefinitions, suitable for use in manifest.sources. """ - patcher = SourcePatcher(results, root_project) + patcher = SourcePatcher(root_project, manifest) patcher.construct_sources() return patcher.sources diff --git a/core/dbt/perf_utils.py b/core/dbt/perf_utils.py deleted file mode 100644 index e73941a918e..00000000000 --- a/core/dbt/perf_utils.py +++ /dev/null @@ -1,32 +0,0 @@ -"""A collection of performance-enhancing functions that have to know just a -little bit too much to go anywhere else. -""" -from dbt.adapters.factory import get_adapter -from dbt.parser.manifest import load_manifest -from dbt.contracts.graph.manifest import Manifest, MacroManifest -from dbt.config import RuntimeConfig - - -def get_full_manifest( - config: RuntimeConfig, - *, - reset: bool = False, -) -> Manifest: - """Load the full manifest, using the adapter's internal manifest if it - exists to skip parsing internal (dbt + plugins) macros a second time. - - Also, make sure that we force-laod the adapter's manifest, so it gets - attached to the adapter for any methods that need it. 
- """ - adapter = get_adapter(config) # type: ignore - if reset: - config.clear_dependencies() - adapter.clear_macro_manifest() - - macro_manifest: MacroManifest = adapter.load_macro_manifest() - - return load_manifest( - config, - macro_manifest, - adapter.connections.set_query_header, - ) diff --git a/core/dbt/rpc/task_manager.py b/core/dbt/rpc/task_manager.py index 91466fb9b59..6e3eada382b 100644 --- a/core/dbt/rpc/task_manager.py +++ b/core/dbt/rpc/task_manager.py @@ -19,7 +19,7 @@ TaskID, ) from dbt.logger import LogMessage, list_handler -from dbt.perf_utils import get_full_manifest +from dbt.parser.manifest import ManifestLoader from dbt.rpc.error import dbt_error from dbt.rpc.gc import GarbageCollector from dbt.rpc.task_handler_protocol import TaskHandlerProtocol, TaskHandlerMap @@ -187,7 +187,7 @@ def set_parsing(self) -> bool: return True def parse_manifest(self) -> None: - self.manifest = get_full_manifest(self.config, reset=True) + self.manifest = ManifestLoader.get_full_manifest(self.config, reset=True) def set_compile_exception(self, exc, logs=List[LogMessage]) -> None: assert self.last_parse.state == ManifestStatus.Compiling, \ diff --git a/core/dbt/task/generate.py b/core/dbt/task/generate.py index e37041e39a1..84821129725 100644 --- a/core/dbt/task/generate.py +++ b/core/dbt/task/generate.py @@ -17,7 +17,7 @@ from dbt.exceptions import InternalException from dbt.include.global_project import DOCS_INDEX_FILE_PATH from dbt.logger import GLOBAL_LOGGER as logger, print_timestamped_line -from dbt.perf_utils import get_full_manifest +from dbt.parser.manifest import ManifestLoader import dbt.utils import dbt.compilation import dbt.exceptions @@ -223,7 +223,7 @@ def run(self) -> CatalogArtifact: compile_results=compile_results ) else: - self.manifest = get_full_manifest(self.config) + self.manifest = ManifestLoader.get_full_manifest(self.config) shutil.copyfile( DOCS_INDEX_FILE_PATH, diff --git a/core/dbt/task/parse.py b/core/dbt/task/parse.py index 6aa122153dc..90dd65f6687 100644 --- a/core/dbt/task/parse.py +++ b/core/dbt/task/parse.py @@ -9,7 +9,7 @@ from dbt.task.base import ConfiguredTask from dbt.adapters.factory import get_adapter from dbt.parser.manifest import ( - Manifest, MacroManifest, ManifestLoader, _check_manifest + Manifest, ManifestLoader, _check_manifest ) from dbt.logger import DbtProcessState, print_timestamped_line from dbt.clients.system import write_file @@ -53,8 +53,6 @@ def write_perf_info(self): def get_full_manifest(self): adapter = get_adapter(self.config) # type: ignore - macro_manifest: MacroManifest = adapter.load_macro_manifest() - print_timestamped_line("Macro manifest loaded") root_config = self.config macro_hook = adapter.connections.set_query_header with PARSING_STATE: @@ -63,12 +61,14 @@ def get_full_manifest(self): print_timestamped_line("Dependencies loaded") loader = ManifestLoader(root_config, projects, macro_hook) print_timestamped_line("ManifestLoader created") - loader.load(macro_manifest=macro_manifest) + loader.load_macros_from_adapter(adapter) + print_timestamped_line("Macros loaded from adapter") + loader.load() print_timestamped_line("Manifest loaded") - loader.write_parse_results() - print_timestamped_line("Parse results written") - manifest = loader.create_manifest() - print_timestamped_line("Manifest created") + loader.write_manifest_for_partial_parse() + print_timestamped_line("Manifest for partial parse saved") + manifest = loader.update_manifest() + print_timestamped_line("Manifest updated") _check_manifest(manifest, 
root_config) print_timestamped_line("Manifest checked") manifest.build_flat_graph() diff --git a/core/dbt/task/rpc/cli.py b/core/dbt/task/rpc/cli.py index f05dee3e9f7..153e632c314 100644 --- a/core/dbt/task/rpc/cli.py +++ b/core/dbt/task/rpc/cli.py @@ -14,7 +14,7 @@ Result, ) from dbt.exceptions import InternalException -from dbt.perf_utils import get_full_manifest +from dbt.parser.manifest import ManifestLoader from .base import RPCTask @@ -104,7 +104,7 @@ def handle_request(self) -> Result: if dumped != self.args.vars: self.real_task.args.vars = dumped if isinstance(self.real_task, RemoteManifestMethod): - self.real_task.manifest = get_full_manifest( + self.real_task.manifest = ManifestLoader.get_full_manifest( self.config, reset=True ) diff --git a/core/dbt/task/rpc/sql_commands.py b/core/dbt/task/rpc/sql_commands.py index deea0c92580..557c5bac9a5 100644 --- a/core/dbt/task/rpc/sql_commands.py +++ b/core/dbt/task/rpc/sql_commands.py @@ -14,7 +14,6 @@ from dbt.contracts.rpc import RemoteExecutionResult from dbt.exceptions import RPCKilledException, InternalException from dbt.logger import GLOBAL_LOGGER as logger -from dbt.parser.results import ParseResult from dbt.parser.manifest import process_node, process_macro from dbt.parser.rpc import RPCCallParser, RPCMacroParser from dbt.rpc.error import invalid_params @@ -42,7 +41,9 @@ def add_new_refs( for macro in macros.values(): process_macro(config, manifest, macro) - manifest.add_nodes({node.unique_id: node}) + # We used to do 'manifest.add_nodes({node.unique_id: node}) here, but the + # node has already been added to the Manifest by the RPCCallParser + # now that we save nodes to the Manifest instead of ParseResults. process_node(config, manifest, node) @@ -102,22 +103,20 @@ def _get_exec_node(self): 'manifest not set in _get_exec_node' ) - results = ParseResult.rpc() macro_overrides = {} macros = self.args.macros sql, macros = self._extract_request_data(self.args.sql) if macros: - macro_parser = RPCMacroParser(results, self.config) + macro_parser = RPCMacroParser(self.config, self.manifest) for node in macro_parser.parse_remote(macros): macro_overrides[node.unique_id] = node self.manifest.macros.update(macro_overrides) rpc_parser = RPCCallParser( - results=results, project=self.config, + manifest=self.manifest, root_project=self.config, - macro_manifest=self.manifest, ) rpc_node = rpc_parser.parse_remote(sql, self.args.name) add_new_refs( diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index f191aecf7ce..cfe46571347 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -40,7 +40,7 @@ FailFastException ) from dbt.graph import GraphQueue, NodeSelector, SelectionSpec, Graph -from dbt.perf_utils import get_full_manifest +from dbt.parser.manifest import ManifestLoader import dbt.exceptions from dbt import flags @@ -63,7 +63,7 @@ def write_manifest(self): self.manifest.write(path) def load_manifest(self): - self.manifest = get_full_manifest(self.config) + self.manifest = ManifestLoader.get_full_manifest(self.config) self.write_manifest() def compile_manifest(self): diff --git a/test/unit/test_compiler.py b/test/unit/test_compiler.py index 694427f276d..4b556df6035 100644 --- a/test/unit/test_compiler.py +++ b/test/unit/test_compiler.py @@ -349,7 +349,6 @@ def test__prepend_ctes(self): 'select * from source_table' ') ' 'select * from __dbt__cte__ephemeral')) - print(f"\n---- line 349 ----") self.assertFalse(manifest.nodes['model.root.ephemeral'].extra_ctes_injected) diff --git 
a/test/unit/test_docs_blocks.py b/test/unit/test_docs_blocks.py index 7870fdb3457..224e6c85300 100644 --- a/test/unit/test_docs_blocks.py +++ b/test/unit/test_docs_blocks.py @@ -2,11 +2,10 @@ import unittest from dbt.contracts.files import SourceFile, FileHash, FilePath -from dbt.contracts.graph.manifest import Manifest, MacroManifest +from dbt.contracts.graph.manifest import Manifest from dbt.contracts.graph.parsed import ParsedDocumentation from dbt.node_types import NodeType from dbt.parser import docs -from dbt.parser.results import ParseResult from dbt.parser.search import FileBlock from .utils import config_from_parts_or_dicts @@ -144,17 +143,17 @@ def _build_file(self, contents, relative_path) -> FileBlock: def test_load_file(self): parser = docs.DocumentationParser( - results=ParseResult.rpc(), root_project=self.root_project_config, + manifest=Manifest({}, {}, {}, {}, {}, {}, [], {}), project=self.subdir_project_config, - macro_manifest=MacroManifest({}, {})) + ) file_block = self._build_file(TEST_DOCUMENTATION_FILE, 'test_file.md') parser.parse_file(file_block) - results = sorted(parser.results.docs.values(), key=lambda n: n.name) - self.assertEqual(len(results), 2) - for result in results: + docs_values = sorted(parser.manifest.docs.values(), key=lambda n: n.name) + self.assertEqual(len(docs_values), 2) + for result in docs_values: self.assertIsInstance(result, ParsedDocumentation) self.assertEqual(result.package_name, 'some_package') self.assertEqual(result.original_file_path, self.testfile_path) @@ -162,41 +161,41 @@ def test_load_file(self): self.assertEqual(result.resource_type, NodeType.Documentation) self.assertEqual(result.path, 'test_file.md') - self.assertEqual(results[0].name, 'snowplow_sessions') - self.assertEqual(results[1].name, 'snowplow_sessions__session_id') + self.assertEqual(docs_values[0].name, 'snowplow_sessions') + self.assertEqual(docs_values[1].name, 'snowplow_sessions__session_id') def test_load_file_extras(self): TEST_DOCUMENTATION_FILE + '{% model foo %}select 1 as id{% endmodel %}' parser = docs.DocumentationParser( - results=ParseResult.rpc(), root_project=self.root_project_config, + manifest=Manifest({}, {}, {}, {}, {}, {}, [], {}), project=self.subdir_project_config, - macro_manifest=MacroManifest({}, {})) + ) file_block = self._build_file(TEST_DOCUMENTATION_FILE, 'test_file.md') parser.parse_file(file_block) - results = sorted(parser.results.docs.values(), key=lambda n: n.name) - self.assertEqual(len(results), 2) - for result in results: + docs_values = sorted(parser.manifest.docs.values(), key=lambda n: n.name) + self.assertEqual(len(docs_values), 2) + for result in docs_values: self.assertIsInstance(result, ParsedDocumentation) - self.assertEqual(results[0].name, 'snowplow_sessions') - self.assertEqual(results[1].name, 'snowplow_sessions__session_id') + self.assertEqual(docs_values[0].name, 'snowplow_sessions') + self.assertEqual(docs_values[1].name, 'snowplow_sessions__session_id') def test_multiple_raw_blocks(self): parser = docs.DocumentationParser( - results=ParseResult.rpc(), root_project=self.root_project_config, + manifest=Manifest({}, {}, {}, {}, {}, {}, [], {}), project=self.subdir_project_config, - macro_manifest=MacroManifest({}, {})) + ) file_block = self._build_file(MULTIPLE_RAW_BLOCKS, 'test_file.md') parser.parse_file(file_block) - results = sorted(parser.results.docs.values(), key=lambda n: n.name) - self.assertEqual(len(results), 2) - for result in results: + docs_values = sorted(parser.manifest.docs.values(), key=lambda n: 
n.name) + self.assertEqual(len(docs_values), 2) + for result in docs_values: self.assertIsInstance(result, ParsedDocumentation) self.assertEqual(result.package_name, 'some_package') self.assertEqual(result.original_file_path, self.testfile_path) @@ -204,7 +203,7 @@ def test_multiple_raw_blocks(self): self.assertEqual(result.resource_type, NodeType.Documentation) self.assertEqual(result.path, 'test_file.md') - self.assertEqual(results[0].name, 'other_doc') - self.assertEqual(results[0].block_contents, '```\n {% docs %}other doc{% enddocs %}\n ```') - self.assertEqual(results[1].name, 'some_doc') - self.assertEqual(results[1].block_contents, '```\n {% docs %}some doc{% enddocs %}\n ```', ) + self.assertEqual(docs_values[0].name, 'other_doc') + self.assertEqual(docs_values[0].block_contents, '```\n {% docs %}other doc{% enddocs %}\n ```') + self.assertEqual(docs_values[1].name, 'some_doc') + self.assertEqual(docs_values[1].block_contents, '```\n {% docs %}some doc{% enddocs %}\n ```', ) diff --git a/test/unit/test_graph.py b/test/unit/test_graph.py index 944d39219c6..2ab864f262d 100644 --- a/test/unit/test_graph.py +++ b/test/unit/test_graph.py @@ -13,8 +13,7 @@ import dbt.utils import dbt.parser.manifest from dbt.contracts.files import SourceFile, FileHash, FilePath -from dbt.contracts.graph.manifest import Manifest, MacroManifest -from dbt.parser.results import ParseResult +from dbt.contracts.graph.manifest import Manifest, MacroManifest, ManifestStateCheck from dbt.parser.base import BaseParser from dbt.graph import NodeSelector, parse_difference @@ -38,35 +37,14 @@ def tearDown(self): self.get_adapter_patcher_parser.stop() self.mock_filesystem_constructor.stop() self.mock_hook_constructor.stop() - self.load_patch.stop() + self.load_state_check.stop() self.load_source_file_patcher.stop() reset_adapters() def setUp(self): + # create various attributes dbt.flags.STRICT_MODE = True self.graph_result = None - - self.write_gpickle_patcher = patch('networkx.write_gpickle') - self.load_projects_patcher = patch('dbt.parser.manifest._load_projects') - self.file_system_patcher = patch.object( - dbt.parser.search.FilesystemSearcher, '__new__' - ) - self.hook_patcher = patch.object( - dbt.parser.hooks.HookParser, '__new__' - ) - self.get_adapter_patcher = patch('dbt.context.providers.get_adapter') - self.factory = self.get_adapter_patcher.start() - # also patch this one - - self.get_adapter_patcher_parser = patch('dbt.parser.base.get_adapter') - self.factory_cmn = self.get_adapter_patcher_parser.start() - - - def mock_write_gpickle(graph, outfile): - self.graph_result = graph - self.mock_write_gpickle = self.write_gpickle_patcher.start() - self.mock_write_gpickle.side_effect = mock_write_gpickle - self.profile = { 'outputs': { 'test': { @@ -82,55 +60,79 @@ def mock_write_gpickle(graph, outfile): }, 'target': 'test' } - - self.mock_load_projects = self.load_projects_patcher.start() - def _load_projects(config, paths): - yield config.project_name, config - self.mock_load_projects.side_effect = _load_projects - - self.mock_models = [] - - def _mock_parse_result(config, all_projects): - return ParseResult( - vars_hash=FileHash.from_contents('vars'), - project_hashes={name: FileHash.from_contents(name) for name in all_projects}, - profile_hash=FileHash.from_contents('profile'), - ) - - self.load_patch = patch('dbt.parser.manifest.make_parse_result') - self.mock_parse_result = self.load_patch.start() - self.mock_parse_result.side_effect = _mock_parse_result - - self.load_source_file_patcher = 
patch.object(BaseParser, 'load_file') - self.mock_source_file = self.load_source_file_patcher.start() - self.mock_source_file.side_effect = lambda path: [n for n in self.mock_models if n.path == path][0] - self.macro_manifest = MacroManifest( {n.unique_id: n for n in generate_name_macros('test_models_compile')}, {}) + self.mock_models = [] # used by filesystem_searcher + + # Create gpickle patcher + self.write_gpickle_patcher = patch('networkx.write_gpickle') + def mock_write_gpickle(graph, outfile): + self.graph_result = graph + self.mock_write_gpickle = self.write_gpickle_patcher.start() + self.mock_write_gpickle.side_effect = mock_write_gpickle + # Create file system patcher and filesystem searcher + self.file_system_patcher = patch.object( + dbt.parser.search.FilesystemSearcher, '__new__' + ) + self.mock_filesystem_constructor = self.file_system_patcher.start() def filesystem_iter(iter_self): if 'sql' not in iter_self.extension: return [] if 'models' not in iter_self.relative_dirs: return [] return [model.path for model in self.mock_models] - def create_filesystem_searcher(cls, project, relative_dirs, extension): result = MagicMock(project=project, relative_dirs=relative_dirs, extension=extension) result.__iter__.side_effect = lambda: iter(filesystem_iter(result)) return result + self.mock_filesystem_constructor.side_effect = create_filesystem_searcher - def create_hook_patcher(cls, results, project, relative_dirs, extension): - result = MagicMock(results=results, project=project, relative_dirs=relative_dirs, extension=extension) + # Create HookParser patcher + self.hook_patcher = patch.object( + dbt.parser.hooks.HookParser, '__new__' + ) + def create_hook_patcher(cls, project, manifest, root_project): + result = MagicMock(project=project, manifest=manifest, root_project=root_project) result.__iter__.side_effect = lambda: iter([]) return result - - self.mock_filesystem_constructor = self.file_system_patcher.start() - self.mock_filesystem_constructor.side_effect = create_filesystem_searcher self.mock_hook_constructor = self.hook_patcher.start() self.mock_hook_constructor.side_effect = create_hook_patcher + + # Create get_adapter patcher + self.get_adapter_patcher = patch('dbt.context.providers.get_adapter') + self.factory = self.get_adapter_patcher.start() + # Also patch the base class + self.get_adapter_patcher_parser = patch('dbt.parser.base.get_adapter') + self.factory_cmn = self.get_adapter_patcher_parser.start() inject_plugin(PostgresPlugin) + # Create load_projects patcher + self.load_projects_patcher = patch('dbt.parser.manifest._load_projects') + self.mock_load_projects = self.load_projects_patcher.start() + def _load_projects(config, paths): + yield config.project_name, config + self.mock_load_projects.side_effect = _load_projects + + # Create the Manifest.state_check patcher + @patch('dbt.parser.manifest.ManifestLoader.build_manifest_state_check') + def _mock_state_check(self): + config = self.root_project + all_projects = self.all_projects + return ManifestStateCheck( + vars_hash=FileHash.from_contents('vars'), + project_hashes={name: FileHash.from_contents(name) for name in all_projects}, + profile_hash=FileHash.from_contents('profile'), + ) + self.load_state_check = patch('dbt.parser.manifest.ManifestLoader.build_manifest_state_check') + self.mock_state_check = self.load_state_check.start() + self.mock_state_check.side_effect = _mock_state_check + + # Create the source file patcher + self.load_source_file_patcher = patch.object(BaseParser, 'load_file') + 
self.mock_source_file = self.load_source_file_patcher.start() + self.mock_source_file.side_effect = lambda path: [n for n in self.mock_models if n.path == path][0] + def get_config(self, extra_cfg=None): if extra_cfg is None: extra_cfg = {} @@ -162,8 +164,10 @@ def use_models(self, models): def load_manifest(self, config): loader = dbt.parser.manifest.ManifestLoader(config, {config.project_name: config}) - loader.load(macro_manifest=self.macro_manifest) - return loader.create_manifest() + loader.manifest.macros = self.macro_manifest.macros + loader.load() + loader.update_manifest() + return loader.manifest def test__single_model(self): self.use_models({ @@ -319,13 +323,16 @@ def test__dependency_list(self): def test__partial_parse(self): config = self.get_config() + manifest = self.load_manifest(config) + + # we need a loader to compare the two manifests loader = dbt.parser.manifest.ManifestLoader(config, {config.project_name: config}) - loader.load(macro_manifest=self.macro_manifest) - loader.create_manifest() - results = loader.results - - self.assertTrue(loader.matching_parse_results(results)) - too_low = results.replace(dbt_version='0.0.1a1') - self.assertFalse(loader.matching_parse_results(too_low)) - too_high = results.replace(dbt_version='99999.99.99') - self.assertFalse(loader.matching_parse_results(too_high)) + loader.manifest.macros = self.macro_manifest.macros + loader.load() + loader.update_manifest() + + self.assertTrue(loader.matching_parse_results(manifest)) + manifest.metadata.dbt_version = '0.0.1a1' + self.assertFalse(loader.matching_parse_results(manifest)) + manifest.metadata.dbt_version = '99999.99.99' + self.assertFalse(loader.matching_parse_results(manifest)) diff --git a/test/unit/test_parse_manifest.py b/test/unit/test_parse_manifest.py index 20c9a4ca97f..d73e34ec98d 100644 --- a/test/unit/test_parse_manifest.py +++ b/test/unit/test_parse_manifest.py @@ -1,10 +1,11 @@ import unittest from unittest import mock +from unittest.mock import patch from .utils import config_from_parts_or_dicts, normalize from dbt.contracts.files import SourceFile, FileHash, FilePath -from dbt.parser import ParseResult +from dbt.contracts.graph.manifest import Manifest, ManifestStateCheck from dbt.parser.search import FileBlock from dbt.parser import manifest @@ -57,16 +58,31 @@ def setUp(self): cli_vars='{"test_schema_name": "foo"}' ) self.parser = mock.MagicMock() - self.patched_result_builder = mock.patch('dbt.parser.manifest.make_parse_result') - self.mock_result_builder = self.patched_result_builder.start() - self.patched_result_builder.return_value = self._new_results() + + # Create the Manifest.state_check patcher + @patch('dbt.parser.manifest.ManifestLoader.build_manifest_state_check') + def _mock_state_check(self): + config = self.root_project + all_projects = self.all_projects + return ManifestStateCheck( + vars_hash=FileHash.from_contents('vars'), + project_hashes={name: FileHash.from_contents(name) for name in all_projects}, + profile_hash=FileHash.from_contents('profile'), + ) + self.load_state_check = patch('dbt.parser.manifest.ManifestLoader.build_manifest_state_check') + self.mock_state_check = self.load_state_check.start() + self.mock_state_check.side_effect = _mock_state_check + self.loader = manifest.ManifestLoader( self.root_project_config, {'root': self.root_project_config} ) - def _new_results(self): - return ParseResult(MatchingHash(), MatchingHash(), {}) + def _new_manifest(self): + state_check = ManifestStateCheck(MatchingHash(), MatchingHash, []) + manifest = 
Manifest({}, {}, {}, {}, {}, {}, [], {}) + manifest.state_check = state_check + return manifest def _mismatched_file(self, searched, name): return self._new_file(searched, name, False) @@ -90,9 +106,8 @@ def test_model_no_cache(self): source_file = self._matching_file('models', 'model_1.sql') self.parser.load_file.return_value = source_file - old_results = None - - self.loader.parse_with_cache(source_file.path, self.parser, old_results) + self.loader.old_manifest = None + self.loader.parse_with_cache(source_file.path, self.parser) # there was nothing in the cache, so parse_file should get called # with a FileBlock that has the given source file in it self.parser.parse_file.assert_called_once_with(FileBlock(file=source_file)) @@ -104,11 +119,12 @@ def test_model_cache_hit(self): source_file_dupe = self._matching_file('models', 'model_1.sql') source_file_dupe.nodes.append('model.root.model_1') - old_results = self._new_results() - old_results.files[source_file_dupe.path.search_key] = source_file_dupe - old_results.nodes = {'model.root.model_1': mock.MagicMock()} + old_manifest = self._new_manifest() + old_manifest.files[source_file_dupe.path.search_key] = source_file_dupe + self.loader.old_manifest = old_manifest + self.loader.old_manifest.nodes = {'model.root.model_1': mock.MagicMock()} - self.loader.parse_with_cache(source_file.path, self.parser, old_results) + self.loader.parse_with_cache(source_file.path, self.parser) # there was a cache hit, so parse_file should never have been called self.parser.parse_file.assert_not_called() @@ -119,11 +135,12 @@ def test_model_cache_mismatch_checksum(self): source_file_dupe = self._mismatched_file('models', 'model_1.sql') source_file_dupe.nodes.append('model.root.model_1') - old_results = self._new_results() - old_results.files[source_file_dupe.path.search_key] = source_file_dupe - old_results.nodes = {'model.root.model_1': mock.MagicMock()} + old_manifest = self._new_manifest() + old_manifest.files[source_file_dupe.path.search_key] = source_file_dupe + old_manifest.nodes = {'model.root.model_1': mock.MagicMock()} + self.loader.old_manifest = old_manifest - self.loader.parse_with_cache(source_file.path, self.parser, old_results) + self.loader.parse_with_cache(source_file.path, self.parser) # there was a cache checksum mismatch, so parse_file should get called # with a FileBlock that has the given source file in it self.parser.parse_file.assert_called_once_with(FileBlock(file=source_file)) @@ -135,11 +152,12 @@ def test_model_cache_missing_file(self): source_file_different = self._matching_file('models', 'model_2.sql') source_file_different.nodes.append('model.root.model_2') - old_results = self._new_results() - old_results.files[source_file_different.path.search_key] = source_file_different - old_results.nodes = {'model.root.model_2': mock.MagicMock()} + old_manifest = self._new_manifest() + old_manifest.files[source_file_different.path.search_key] = source_file_different + old_manifest.nodes = {'model.root.model_2': mock.MagicMock()} - self.loader.parse_with_cache(source_file.path, self.parser, old_results) + self.loader.old_manifest = old_manifest + self.loader.parse_with_cache(source_file.path, self.parser) # the filename wasn't in the cache, so parse_file should get called # with a FileBlock that has the given source file in it. 
self.parser.parse_file.assert_called_once_with(FileBlock(file=source_file)) diff --git a/test/unit/test_parser.py b/test/unit/test_parser.py index 90b9d012a9a..d59b2852329 100644 --- a/test/unit/test_parser.py +++ b/test/unit/test_parser.py @@ -1,3 +1,4 @@ +import ipdb import unittest from unittest import mock @@ -8,7 +9,7 @@ import dbt.parser from dbt.exceptions import CompilationException from dbt.parser import ( - ModelParser, MacroParser, DataTestParser, SchemaParser, ParseResult, + ModelParser, MacroParser, DataTestParser, SchemaParser, SnapshotParser, AnalysisParser ) from dbt.parser.schemas import ( @@ -20,7 +21,7 @@ from dbt.node_types import NodeType from dbt.contracts.files import SourceFile, FileHash, FilePath -from dbt.contracts.graph.manifest import Manifest, MacroManifest +from dbt.contracts.graph.manifest import Manifest from dbt.contracts.graph.model_config import ( NodeConfig, TestConfig, SnapshotConfig ) @@ -126,8 +127,9 @@ def setUp(self): self.parser_patcher = mock.patch('dbt.parser.base.get_adapter') self.factory_parser = self.parser_patcher.start() - self.macro_manifest = MacroManifest( - {m.unique_id: m for m in generate_name_macros('root')}, {} + self.manifest = Manifest( + macros={m.unique_id: m for m in generate_name_macros('root')}, + nodes={}, sources={}, docs={}, exposures={}, selectors={}, disabled={}, files={}, ) def tearDown(self): @@ -149,15 +151,15 @@ def file_block_for(self, data: str, filename: str, searched: str): source_file.contents = data return FileBlock(file=source_file) - def assert_has_results_length(self, results, files=1, macros=0, nodes=0, + def assert_has_manifest_lengths(self, manifest, files=1, macros=3, nodes=0, sources=0, docs=0, patches=0, disabled=0): - self.assertEqual(len(results.files), files) - self.assertEqual(len(results.macros), macros) - self.assertEqual(len(results.nodes), nodes) - self.assertEqual(len(results.sources), sources) - self.assertEqual(len(results.docs), docs) - self.assertEqual(len(results.patches), patches) - self.assertEqual(sum(len(v) for v in results.disabled.values()), disabled) + self.assertEqual(len(manifest.files), files) + self.assertEqual(len(manifest.macros), macros) + self.assertEqual(len(manifest.nodes), nodes) + self.assertEqual(len(manifest.sources), sources) + self.assertEqual(len(manifest.docs), docs) + self.assertEqual(sum(len(v) for v in manifest.disabled.values()), disabled) + SINGLE_TABLE_SOURCE = ''' @@ -222,10 +224,9 @@ class SchemaParserTest(BaseParserTest): def setUp(self): super().setUp() self.parser = SchemaParser( - results=ParseResult.rpc(), project=self.snowplow_project_config, + manifest=self.manifest, root_project=self.root_project_config, - macro_manifest=self.macro_manifest, ) def file_block_for(self, data, filename): @@ -250,20 +251,20 @@ def test__read_basic_source(self): self.assertEqual(len(model_blocks), 0) self.assertEqual(len(source_blocks), 0) self.assertEqual(len(macro_blocks), 0) - self.assertEqual(len(list(self.parser.results.patches)), 0) - self.assertEqual(len(list(self.parser.results.nodes)), 0) - results = list(self.parser.results.sources.values()) - self.assertEqual(len(results), 1) - self.assertEqual(results[0].source.name, 'my_source') - self.assertEqual(results[0].table.name, 'my_table') - self.assertEqual(results[0].table.description, '') - self.assertEqual(len(results[0].table.columns), 0) + self.assertEqual(len(list(self.parser.manifest.patches)), 0) + self.assertEqual(len(list(self.parser.manifest.nodes)), 0) + source_values = 
list(self.parser.manifest.sources.values()) + self.assertEqual(len(source_values), 1) + self.assertEqual(source_values[0].source.name, 'my_source') + self.assertEqual(source_values[0].table.name, 'my_table') + self.assertEqual(source_values[0].table.description, '') + self.assertEqual(len(source_values[0].table.columns), 0) def test__parse_basic_source(self): block = self.file_block_for(SINGLE_TABLE_SOURCE, 'test_one.yml') self.parser.parse_file(block) - self.assert_has_results_length(self.parser.results, sources=1) - src = list(self.parser.results.sources.values())[0] + self.assert_has_manifest_lengths(self.parser.manifest, sources=1) + src = list(self.parser.manifest.sources.values())[0] assert isinstance(src, UnpatchedSourceDefinition) assert src.package_name == 'snowplow' assert src.source.name == 'my_source' @@ -281,23 +282,23 @@ def test__read_basic_source_tests(self): self.assertEqual(len(model_tests), 0) self.assertEqual(len(source_tests), 0) self.assertEqual(len(macro_tests), 0) - self.assertEqual(len(list(self.parser.results.nodes)), 0) - self.assertEqual(len(list(self.parser.results.patches)), 0) - self.assertEqual(len(list(self.parser.results.source_patches)), 0) - results = list(self.parser.results.sources.values()) - self.assertEqual(len(results), 1) - self.assertEqual(results[0].source.name, 'my_source') - self.assertEqual(results[0].table.name, 'my_table') - self.assertEqual(results[0].table.description, 'A description of my table') - self.assertEqual(len(results[0].table.columns), 1) + self.assertEqual(len(list(self.parser.manifest.nodes)), 0) + self.assertEqual(len(list(self.parser.manifest.patches)), 0) + self.assertEqual(len(list(self.parser.manifest.source_patches)), 0) + source_values = list(self.parser.manifest.sources.values()) + self.assertEqual(len(source_values), 1) + self.assertEqual(source_values[0].source.name, 'my_source') + self.assertEqual(source_values[0].table.name, 'my_table') + self.assertEqual(source_values[0].table.description, 'A description of my table') + self.assertEqual(len(source_values[0].table.columns), 1) def test__parse_basic_source_tests(self): block = self.file_block_for(SINGLE_TABLE_SOURCE_TESTS, 'test_one.yml') self.parser.parse_file(block) - self.assertEqual(len(self.parser.results.nodes), 0) - self.assertEqual(len(self.parser.results.sources), 1) - self.assertEqual(len(self.parser.results.patches), 0) - src = list(self.parser.results.sources.values())[0] + self.assertEqual(len(self.parser.manifest.nodes), 0) + self.assertEqual(len(self.parser.manifest.sources), 1) + self.assertEqual(len(self.parser.manifest.patches), 0) + src = list(self.parser.manifest.sources.values())[0] self.assertEqual(src.source.name, 'my_source') self.assertEqual(src.source.schema, None) self.assertEqual(src.table.name, 'my_table') @@ -321,12 +322,12 @@ def test__parse_basic_source_tests(self): self.assertEqual(tests[1].fqn, ['snowplow', 'schema_test', tests[1].name]) path = get_abs_os_path('./dbt_modules/snowplow/models/test_one.yml') - self.assertIn(path, self.parser.results.files) - self.assertEqual(self.parser.results.files[path].nodes, []) - self.assertIn(path, self.parser.results.files) - self.assertEqual(self.parser.results.files[path].sources, + self.assertIn(path, self.parser.manifest.files) + self.assertEqual(self.parser.manifest.files[path].nodes, []) + self.assertIn(path, self.parser.manifest.files) + self.assertEqual(self.parser.manifest.files[path].sources, ['source.snowplow.my_source.my_table']) - 
self.assertEqual(self.parser.results.files[path].source_patches, []) + self.assertEqual(self.parser.manifest.files[path].source_patches, []) def test__read_source_patch(self): block = self.yaml_block_for(SINGLE_TABLE_SOURCE_PATCH, 'test_one.yml') @@ -338,16 +339,16 @@ def test__read_source_patch(self): self.assertEqual(len(model_tests), 0) self.assertEqual(len(source_tests), 0) self.assertEqual(len(macro_tests), 0) - self.assertEqual(len(list(self.parser.results.nodes)), 0) - self.assertEqual(len(list(self.parser.results.patches)), 0) - self.assertEqual(len(list(self.parser.results.sources)), 0) - results = list(self.parser.results.source_patches.values()) - self.assertEqual(len(results), 1) - self.assertEqual(results[0].name, 'my_source') - self.assertEqual(results[0].overrides, 'snowplow') - self.assertIsNone(results[0].description) - self.assertEqual(len(results[0].tables), 1) - table = results[0].tables[0] + self.assertEqual(len(list(self.parser.manifest.nodes)), 0) + self.assertEqual(len(list(self.parser.manifest.patches)), 0) + self.assertEqual(len(list(self.parser.manifest.sources)), 0) + source_patches = list(self.parser.manifest.source_patches.values()) + self.assertEqual(len(source_patches), 1) + self.assertEqual(source_patches[0].name, 'my_source') + self.assertEqual(source_patches[0].overrides, 'snowplow') + self.assertIsNone(source_patches[0].description) + self.assertEqual(len(source_patches[0].tables), 1) + table = source_patches[0].tables[0] self.assertEqual(table.name, 'my_table') self.assertIsNone(table.description) self.assertEqual(len(table.columns), 1) @@ -358,16 +359,17 @@ class SchemaParserModelsTest(SchemaParserTest): def test__read_basic_model_tests(self): block = self.yaml_block_for(SINGLE_TABLE_MODEL_TESTS, 'test_one.yml') self.parser.parse_file(block) - self.assertEqual(len(list(self.parser.results.patches)), 1) - self.assertEqual(len(list(self.parser.results.sources)), 0) - self.assertEqual(len(list(self.parser.results.nodes)), 3) + self.assertEqual(len(list(self.parser.manifest.patches)), 1) + self.assertEqual(len(list(self.parser.manifest.sources)), 0) + self.assertEqual(len(list(self.parser.manifest.nodes)), 3) def test__parse_basic_model_tests(self): block = self.file_block_for(SINGLE_TABLE_MODEL_TESTS, 'test_one.yml') self.parser.parse_file(block) - self.assert_has_results_length(self.parser.results, patches=1, nodes=3) + self.assert_has_manifest_lengths(self.parser.manifest, nodes=3) + self.assertEqual(len(self.parser.manifest.patches), 1) - patch = list(self.parser.results.patches.values())[0] + patch = list(self.parser.manifest.patches.values())[0] self.assertEqual(len(patch.columns), 1) self.assertEqual(patch.name, 'my_model') self.assertEqual(patch.description, 'A description of my model') @@ -383,7 +385,7 @@ def test__parse_basic_model_tests(self): ) self.assertEqual(patch, expected_patch) - tests = sorted(self.parser.results.nodes.values(), key=lambda n: n.unique_id) + tests = sorted(self.parser.manifest.nodes.values(), key=lambda n: n.unique_id) self.assertEqual(tests[0].config.severity, 'ERROR') self.assertEqual(tests[0].tags, ['schema']) self.assertEqual(tests[0].refs, [['my_model']]) @@ -444,21 +446,20 @@ def test__parse_basic_model_tests(self): ) path = get_abs_os_path('./dbt_modules/snowplow/models/test_one.yml') - self.assertIn(path, self.parser.results.files) - self.assertEqual(sorted(self.parser.results.files[path].nodes), + self.assertIn(path, self.parser.manifest.files) + self.assertEqual(sorted(self.parser.manifest.files[path].nodes), 
[t.unique_id for t in tests]) - self.assertIn(path, self.parser.results.files) - self.assertEqual(self.parser.results.files[path].patches, ['my_model']) + self.assertIn(path, self.parser.manifest.files) + self.assertEqual(self.parser.manifest.files[path].patches, ['my_model']) class ModelParserTest(BaseParserTest): def setUp(self): super().setUp() self.parser = ModelParser( - results=ParseResult.rpc(), project=self.snowplow_project_config, + manifest=self.manifest, root_project=self.root_project_config, - macro_manifest=self.macro_manifest, ) def file_block_for(self, data, filename): @@ -468,8 +469,8 @@ def test_basic(self): raw_sql = '{{ config(materialized="table") }}select 1 as id' block = self.file_block_for(raw_sql, 'nested/model_1.sql') self.parser.parse_file(block) - self.assert_has_results_length(self.parser.results, nodes=1) - node = list(self.parser.results.nodes.values())[0] + self.assert_has_manifest_lengths(self.parser.manifest, nodes=1) + node = list(self.parser.manifest.nodes.values())[0] expected = ParsedModelNode( alias='model_1', name='model_1', @@ -489,24 +490,23 @@ def test_basic(self): ) self.assertEqual(node, expected) path = get_abs_os_path('./dbt_modules/snowplow/models/nested/model_1.sql') - self.assertIn(path, self.parser.results.files) - self.assertEqual(self.parser.results.files[path].nodes, ['model.snowplow.model_1']) + self.assertIn(path, self.parser.manifest.files) + self.assertEqual(self.parser.manifest.files[path].nodes, ['model.snowplow.model_1']) def test_parse_error(self): block = self.file_block_for('{{ SYNTAX ERROR }}', 'nested/model_1.sql') with self.assertRaises(CompilationException): self.parser.parse_file(block) - self.assert_has_results_length(self.parser.results, files=0) + self.assert_has_manifest_lengths(self.parser.manifest, files=0) class SnapshotParserTest(BaseParserTest): def setUp(self): super().setUp() self.parser = SnapshotParser( - results=ParseResult.rpc(), project=self.snowplow_project_config, + manifest=self.manifest, root_project=self.root_project_config, - macro_manifest=self.macro_manifest, ) def file_block_for(self, data, filename): @@ -516,7 +516,7 @@ def test_parse_error(self): block = self.file_block_for('{% snapshot foo %}select 1 as id{%snapshot bar %}{% endsnapshot %}', 'nested/snap_1.sql') with self.assertRaises(CompilationException): self.parser.parse_file(block) - self.assert_has_results_length(self.parser.results, files=0) + self.assert_has_manifest_lengths(self.parser.manifest, files=0) def test_single_block(self): raw_sql = '''{{ @@ -530,8 +530,8 @@ def test_single_block(self): '''.format(raw_sql) block = self.file_block_for(full_file, 'nested/snap_1.sql') self.parser.parse_file(block) - self.assert_has_results_length(self.parser.results, nodes=1) - node = list(self.parser.results.nodes.values())[0] + self.assert_has_manifest_lengths(self.parser.manifest, nodes=1) + node = list(self.parser.manifest.nodes.values())[0] expected = ParsedSnapshotNode( alias='foo', name='foo', @@ -565,8 +565,8 @@ def test_single_block(self): ) self.assertEqual(node, expected) path = get_abs_os_path('./dbt_modules/snowplow/snapshots/nested/snap_1.sql') - self.assertIn(path, self.parser.results.files) - self.assertEqual(self.parser.results.files[path].nodes, ['snapshot.snowplow.foo']) + self.assertIn(path, self.parser.manifest.files) + self.assertEqual(self.parser.manifest.files[path].nodes, ['snapshot.snowplow.foo']) def test_multi_block(self): raw_1 = ''' @@ -591,8 +591,8 @@ def test_multi_block(self): '''.format(raw_1, raw_2) block = 
self.file_block_for(full_file, 'nested/snap_1.sql') self.parser.parse_file(block) - self.assert_has_results_length(self.parser.results, nodes=2) - nodes = sorted(self.parser.results.nodes.values(), key=lambda n: n.name) + self.assert_has_manifest_lengths(self.parser.manifest, nodes=2) + nodes = sorted(self.parser.manifest.nodes.values(), key=lambda n: n.name) expect_foo = ParsedSnapshotNode( alias='foo', name='foo', @@ -656,8 +656,8 @@ def test_multi_block(self): self.assertEqual(nodes[0], expect_bar) self.assertEqual(nodes[1], expect_foo) path = get_abs_os_path('./dbt_modules/snowplow/snapshots/nested/snap_1.sql') - self.assertIn(path, self.parser.results.files) - self.assertEqual(sorted(self.parser.results.files[path].nodes), + self.assertIn(path, self.parser.manifest.files) + self.assertEqual(sorted(self.parser.manifest.files[path].nodes), ['snapshot.snowplow.bar', 'snapshot.snowplow.foo']) @@ -665,8 +665,8 @@ class MacroParserTest(BaseParserTest): def setUp(self): super().setUp() self.parser = MacroParser( - results=ParseResult.rpc(), project=self.snowplow_project_config, + manifest=Manifest({},{},{},{},{},{},{},{}) ) def file_block_for(self, data, filename): @@ -676,8 +676,8 @@ def test_single_block(self): raw_sql = '{% macro foo(a, b) %}a ~ b{% endmacro %}' block = self.file_block_for(raw_sql, 'macro.sql') self.parser.parse_file(block) - self.assert_has_results_length(self.parser.results, macros=1) - macro = list(self.parser.results.macros.values())[0] + self.assertEqual(len(self.parser.manifest.macros), 1) + macro = list(self.parser.manifest.macros.values())[0] expected = ParsedMacro( name='foo', resource_type=NodeType.Macro, @@ -690,15 +690,15 @@ def test_single_block(self): ) self.assertEqual(macro, expected) path = get_abs_os_path('./dbt_modules/snowplow/macros/macro.sql') - self.assertIn(path, self.parser.results.files) - self.assertEqual(self.parser.results.files[path].macros, ['macro.snowplow.foo']) + self.assertIn(path, self.parser.manifest.files) + self.assertEqual(self.parser.manifest.files[path].macros, ['macro.snowplow.foo']) def test_multiple_blocks(self): raw_sql = '{% macro foo(a, b) %}a ~ b{% endmacro %}\n{% macro bar(c, d) %}c + d{% endmacro %}' block = self.file_block_for(raw_sql, 'macro.sql') self.parser.parse_file(block) - self.assert_has_results_length(self.parser.results, macros=2) - macros = sorted(self.parser.results.macros.values(), key=lambda m: m.name) + self.assertEqual(len(self.parser.manifest.macros), 2) + macros = sorted(self.parser.manifest.macros.values(), key=lambda m: m.name) expected_bar = ParsedMacro( name='bar', resource_type=NodeType.Macro, @@ -721,9 +721,9 @@ def test_multiple_blocks(self): ) self.assertEqual(macros, [expected_bar, expected_foo]) path = get_abs_os_path('./dbt_modules/snowplow/macros/macro.sql') - self.assertIn(path, self.parser.results.files) + self.assertIn(path, self.parser.manifest.files) self.assertEqual( - sorted(self.parser.results.files[path].macros), + sorted(self.parser.manifest.files[path].macros), ['macro.snowplow.bar', 'macro.snowplow.foo'], ) @@ -732,10 +732,9 @@ class DataTestParserTest(BaseParserTest): def setUp(self): super().setUp() self.parser = DataTestParser( - results=ParseResult.rpc(), project=self.snowplow_project_config, + manifest=self.manifest, root_project=self.root_project_config, - macro_manifest=self.macro_manifest, ) def file_block_for(self, data, filename): @@ -745,8 +744,8 @@ def test_basic(self): raw_sql = 'select * from {{ ref("blah") }} limit 0' block = self.file_block_for(raw_sql, 
'test_1.sql') self.parser.parse_file(block) - self.assert_has_results_length(self.parser.results, nodes=1) - node = list(self.parser.results.nodes.values())[0] + self.assert_has_manifest_lengths(self.parser.manifest, nodes=1) + node = list(self.parser.manifest.nodes.values())[0] expected = ParsedDataTestNode( alias='test_1', name='test_1', @@ -768,18 +767,17 @@ def test_basic(self): ) self.assertEqual(node, expected) path = get_abs_os_path('./dbt_modules/snowplow/tests/test_1.sql') - self.assertIn(path, self.parser.results.files) - self.assertEqual(self.parser.results.files[path].nodes, ['test.snowplow.test_1']) + self.assertIn(path, self.parser.manifest.files) + self.assertEqual(self.parser.manifest.files[path].nodes, ['test.snowplow.test_1']) class AnalysisParserTest(BaseParserTest): def setUp(self): super().setUp() self.parser = AnalysisParser( - results=ParseResult.rpc(), project=self.snowplow_project_config, + manifest=self.manifest, root_project=self.root_project_config, - macro_manifest=self.macro_manifest, ) def file_block_for(self, data, filename): @@ -789,8 +787,8 @@ def test_basic(self): raw_sql = 'select 1 as id' block = self.file_block_for(raw_sql, 'nested/analysis_1.sql') self.parser.parse_file(block) - self.assert_has_results_length(self.parser.results, nodes=1) - node = list(self.parser.results.nodes.values())[0] + self.assert_has_manifest_lengths(self.parser.manifest, nodes=1) + node = list(self.parser.manifest.nodes.values())[0] expected = ParsedAnalysisNode( alias='analysis_1', name='analysis_1', @@ -811,8 +809,8 @@ def test_basic(self): ) self.assertEqual(node, expected) path = get_abs_os_path('./dbt_modules/snowplow/analyses/nested/analysis_1.sql') - self.assertIn(path, self.parser.results.files) - self.assertEqual(self.parser.results.files[path].nodes, ['analysis.snowplow.analysis_1']) + self.assertIn(path, self.parser.manifest.files) + self.assertEqual(self.parser.manifest.files[path].nodes, ['analysis.snowplow.analysis_1']) class ProcessingTest(BaseParserTest): diff --git a/test/unit/test_postgres_adapter.py b/test/unit/test_postgres_adapter.py index 08363c1fdaf..85e9ce6475e 100644 --- a/test/unit/test_postgres_adapter.py +++ b/test/unit/test_postgres_adapter.py @@ -9,10 +9,11 @@ from dbt.adapters.base.query_headers import MacroQueryStringSetter from dbt.adapters.postgres import PostgresAdapter from dbt.adapters.postgres import Plugin as PostgresPlugin +from dbt.contracts.files import FileHash +from dbt.contracts.graph.manifest import ManifestStateCheck from dbt.clients import agate_helper from dbt.exceptions import ValidationException, DbtConfigError from dbt.logger import GLOBAL_LOGGER as logger # noqa -from dbt.parser.results import ParseResult from psycopg2 import extensions as psycopg2_extensions from psycopg2 import DatabaseError @@ -314,9 +315,19 @@ def setUp(self): self.patcher = mock.patch('dbt.adapters.postgres.connections.psycopg2') self.psycopg2 = self.patcher.start() - self.load_patch = mock.patch('dbt.parser.manifest.make_parse_result') - self.mock_parse_result = self.load_patch.start() - self.mock_parse_result.return_value = ParseResult.rpc() + # Create the Manifest.state_check patcher + @mock.patch('dbt.parser.manifest.ManifestLoader.build_manifest_state_check') + def _mock_state_check(self): + config = self.root_project + all_projects = self.all_projects + return ManifestStateCheck( + vars_hash=FileHash.from_contents('vars'), + project_hashes={name: FileHash.from_contents(name) for name in all_projects}, + 
profile_hash=FileHash.from_contents('profile'), + ) + self.load_state_check = mock.patch('dbt.parser.manifest.ManifestLoader.build_manifest_state_check') + self.mock_state_check = self.load_state_check.start() + self.mock_state_check.side_effect = _mock_state_check self.psycopg2.connect.return_value = self.handle self.adapter = PostgresAdapter(self.config) @@ -334,7 +345,7 @@ def tearDown(self): self.adapter.cleanup_connections() self.qh_patch.stop() self.patcher.stop() - self.load_patch.stop() + self.load_state_check.stop() clear_plugin(PostgresPlugin) def test_quoting_on_drop_schema(self): diff --git a/test/unit/test_snowflake_adapter.py b/test/unit/test_snowflake_adapter.py index 5403f209dd5..fafeae21efb 100644 --- a/test/unit/test_snowflake_adapter.py +++ b/test/unit/test_snowflake_adapter.py @@ -10,9 +10,10 @@ from dbt.adapters.snowflake import Plugin as SnowflakePlugin from dbt.adapters.snowflake.column import SnowflakeColumn from dbt.adapters.base.query_headers import MacroQueryStringSetter +from dbt.contracts.files import FileHash +from dbt.contracts.graph.manifest import ManifestStateCheck from dbt.clients import agate_helper from dbt.logger import GLOBAL_LOGGER as logger # noqa -from dbt.parser.results import ParseResult from snowflake import connector as snowflake_connector from .utils import config_from_parts_or_dicts, inject_adapter, mock_connection, TestAdapterConversions, load_internal_manifest_macros @@ -61,9 +62,19 @@ def setUp(self): ) self.snowflake = self.patcher.start() - self.load_patch = mock.patch('dbt.parser.manifest.make_parse_result') - self.mock_parse_result = self.load_patch.start() - self.mock_parse_result.return_value = ParseResult.rpc() + # Create the Manifest.state_check patcher + @mock.patch('dbt.parser.manifest.ManifestLoader.build_manifest_state_check') + def _mock_state_check(self): + config = self.root_project + all_projects = self.all_projects + return ManifestStateCheck( + vars_hash=FileHash.from_contents('vars'), + project_hashes={name: FileHash.from_contents(name) for name in all_projects}, + profile_hash=FileHash.from_contents('profile'), + ) + self.load_state_check = mock.patch('dbt.parser.manifest.ManifestLoader.build_manifest_state_check') + self.mock_state_check = self.load_state_check.start() + self.mock_state_check.side_effect = _mock_state_check self.snowflake.return_value = self.handle self.adapter = SnowflakeAdapter(self.config) @@ -82,7 +93,7 @@ def tearDown(self): self.adapter.cleanup_connections() self.qh_patch.stop() self.patcher.stop() - self.load_patch.stop() + self.load_state_check.stop() def test_quoting_on_drop_schema(self): relation = SnowflakeAdapter.Relation.create( diff --git a/test/unit/utils.py b/test/unit/utils.py index fe26e472042..118768d68af 100644 --- a/test/unit/utils.py +++ b/test/unit/utils.py @@ -343,12 +343,8 @@ def MockDocumentation(package, name, **kwargs): def load_internal_manifest_macros(config, macro_hook = lambda m: None): - from dbt.adapters.factory import get_include_paths from dbt.parser.manifest import ManifestLoader - paths = get_include_paths(config.credentials.type) - projects = {k: v for k, v in config.load_dependencies().items() if k.startswith('dbt')} - loader = ManifestLoader(config, projects, macro_hook) - return loader.load_only_macros() + return ManifestLoader.load_macros(config, macro_hook)
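
For reference, a minimal usage sketch of the two ManifestLoader entry points this change introduces; it is not part of the patch and simply mirrors the call sites updated above (core/dbt/task/runnable.py, core/dbt/rpc/task_manager.py, core/dbt/task/rpc/cli.py, and test/unit/utils.py). It assumes `config` is an already-loaded RuntimeConfig whose adapter plugin has been registered; the helper name `load_manifests` is illustrative only.

from dbt.parser.manifest import ManifestLoader

def load_manifests(config):
    # Full parse, replacing the removed dbt.perf_utils.get_full_manifest
    # helper; reset=True requests a fresh load, as in rpc/task_manager.py
    # and task/rpc/cli.py.
    manifest = ManifestLoader.get_full_manifest(config, reset=True)

    # Macro-only load, as used by load_internal_manifest_macros in
    # test/unit/utils.py; the second argument is the query-header macro
    # hook (a no-op lambda here).
    macro_manifest = ManifestLoader.load_macros(config, lambda m: None)

    return manifest, macro_manifest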