Skip to content

Commit

Permalink
[#3996] Fix or avoid several partial parsing bugs (#4020) (#4067)
Browse files Browse the repository at this point in the history
  • Loading branch information
gshank authored Oct 15, 2021
1 parent a1c8374 commit 162f3a1
Show file tree
Hide file tree
Showing 14 changed files with 289 additions and 32 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

### Fixes
- Performance: Use child_map to find tests for nodes in resolve_graph ([#4012](https://github.com/dbt-labs/dbt/issues/4012), [#4057](https://github.com/dbt-labs/dbt/pull/4057))
- Fix multiple partial parsing errors ([#3996](https://github.com/dbt-labs/dbt/issues/3996), [#4020](https://github.com/dbt-labs/dbt/pull/4020))


## dbt 0.21.0 (October 04, 2021)


## dbt 0.21.0rc2 (September 27, 2021)


Expand Down
62 changes: 61 additions & 1 deletion core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,43 @@ def perform_lookup(
return manifest.nodes[unique_id]


class DisabledLookup(dbtClassMixin):
    """Index of disabled refable nodes, keyed first by node name and
    then by package name.
    """
    # Only refable resource types are indexed: model, seed, snapshot.
    _lookup_types: ClassVar[set] = set(NodeType.refable())

    def __init__(self, manifest: 'Manifest'):
        # storage[node_name][package_name] -> list of disabled nodes
        self.storage: Dict[str, Dict[PackageName, List[Any]]] = {}
        self.populate(manifest)

    def populate(self, manifest):
        """Index every disabled node, drawing from both the manifest's
        disabled list and the in-progress _disabled mapping."""
        for disabled_node in manifest.disabled:
            self.add_node(disabled_node)
        for disabled_node in chain.from_iterable(manifest._disabled.values()):
            self.add_node(disabled_node)

    def add_node(self, node: ManifestNode):
        """Add node to the index, skipping non-refable resource types."""
        if node.resource_type not in self._lookup_types:
            return
        by_package = self.storage.setdefault(node.name, {})
        by_package.setdefault(node.package_name, []).append(node)

    def find(self, key, package: PackageName):
        """Return the list of disabled nodes named `key` in `package`,
        or None when there is no match."""
        pkg_dct: Mapping[PackageName, List[ManifestNode]] = self.storage.get(key, {})
        # dict.get returns None both when the name is unknown and when
        # the package has no entry — same result as the original branches.
        return pkg_dct.get(package)


class AnalysisLookup(RefableLookup):
    # Restrict the refable lookup to analysis nodes only.
    # Idiom fix: use a set literal instead of set([...]) (ruff C405).
    _lookup_types: ClassVar[set] = {NodeType.Analysis}

Expand Down Expand Up @@ -568,7 +605,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
state_check: ManifestStateCheck = field(default_factory=ManifestStateCheck)
# Moved from the ParseResult object
source_patches: MutableMapping[SourceKey, SourcePatch] = field(default_factory=dict)
# following is from ParseResult
# following contains new disabled nodes until parsing is finished. This changes in 1.0.0
_disabled: MutableMapping[str, List[CompileResultNode]] = field(default_factory=dict)
_doc_lookup: Optional[DocLookup] = field(
default=None, metadata={'serialize': lambda x: None, 'deserialize': lambda x: None}
Expand All @@ -579,6 +616,9 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
_ref_lookup: Optional[RefableLookup] = field(
default=None, metadata={'serialize': lambda x: None, 'deserialize': lambda x: None}
)
_disabled_lookup: Optional[DisabledLookup] = field(
default=None, metadata={'serialize': lambda x: None, 'deserialize': lambda x: None}
)
_analysis_lookup: Optional[AnalysisLookup] = field(
default=None, metadata={'serialize': lambda x: None, 'deserialize': lambda x: None}
)
Expand Down Expand Up @@ -652,6 +692,15 @@ def build_flat_graph(self):
}
}

def build_disabled_by_file_id(self):
    """Return a mapping of file_id -> disabled node.

    Covers both the manifest's `disabled` list and the in-progress
    `_disabled` mapping. When several disabled nodes share a file_id,
    the one encountered last wins — entries from `_disabled` override
    entries from `disabled`, matching the original iteration order.
    """
    all_disabled = chain(self.disabled, chain.from_iterable(self._disabled.values()))
    return {disabled_node.file_id: disabled_node for disabled_node in all_disabled}

def find_disabled_by_name(
self, name: str, package: Optional[str] = None
) -> Optional[ManifestNode]:
Expand Down Expand Up @@ -822,6 +871,15 @@ def ref_lookup(self) -> RefableLookup:
def rebuild_ref_lookup(self):
    # Discard and rebuild the cached ref lookup so it reflects the
    # manifest's current nodes (e.g. after partial parsing changes them).
    self._ref_lookup = RefableLookup(self)

@property
def disabled_lookup(self) -> DisabledLookup:
    # Lazily build the disabled-node lookup on first access and cache it.
    if self._disabled_lookup is None:
        self._disabled_lookup = DisabledLookup(self)
    return self._disabled_lookup

def rebuild_disabled_lookup(self):
    # Discard and rebuild the cached disabled-node lookup, mirroring
    # rebuild_ref_lookup / rebuild_doc_lookup.
    self._disabled_lookup = DisabledLookup(self)

@property
def analysis_lookup(self) -> AnalysisLookup:
if self._analysis_lookup is None:
Expand Down Expand Up @@ -1054,6 +1112,8 @@ def __reduce_ex__(self, protocol):
self._doc_lookup,
self._source_lookup,
self._ref_lookup,
self._disabled_lookup,
self._analysis_lookup
)
return self.__class__, args

Expand Down
18 changes: 12 additions & 6 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,12 +247,17 @@ def load(self):
# get file info for local logs
parse_file_type = None
file_id = partial_parsing.processing_file
if file_id and file_id in self.manifest.files:
old_file = self.manifest.files[file_id]
parse_file_type = old_file.parse_file_type
logger.debug(f"Partial parsing exception processing file {file_id}")
file_dict = old_file.to_dict()
logger.debug(f"PP file: {file_dict}")
if file_id:
source_file = None
if file_id in self.saved_manifest.files:
source_file = self.saved_manifest.files[file_id]
elif file_id in self.manifest.files:
source_file = self.manifest.files[file_id]
if source_file:
parse_file_type = source_file.parse_file_type
logger.debug(f"Partial parsing exception processing file {file_id}")
file_dict = source_file.to_dict()
logger.debug(f"PP file: {file_dict}")
exc_info['parse_file_type'] = parse_file_type
logger.debug(f"PP exception info: {exc_info}")

Expand Down Expand Up @@ -310,6 +315,7 @@ def load(self):
# aren't in place yet
self.manifest.rebuild_ref_lookup()
self.manifest.rebuild_doc_lookup()
self.manifest.rebuild_disabled_lookup()

# Load yaml files
parser_types = [SchemaParser]
Expand Down
54 changes: 40 additions & 14 deletions core/dbt/parser/partial.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __init__(self, saved_manifest: Manifest, new_files: MutableMapping[str, AnyS
self.macro_child_map: Dict[str, List[str]] = {}
self.build_file_diff()
self.processing_file = None
self.disabled_by_file_id = self.saved_manifest.build_disabled_by_file_id()

def skip_parsing(self):
return (
Expand Down Expand Up @@ -233,24 +234,38 @@ def update_mssat_in_saved(self, new_source_file, old_source_file):
return

# These files only have one node.
unique_id = old_source_file.nodes[0]
unique_id = None
if old_source_file.nodes:
unique_id = old_source_file.nodes[0]
else:
# It's not clear when this would actually happen.
# Logging in case there are other associated errors.
logger.debug(f"Partial parsing: node not found for source_file {old_source_file}")

# replace source_file in saved and add to parsing list
file_id = new_source_file.file_id
self.deleted_manifest.files[file_id] = old_source_file
self.saved_files[file_id] = new_source_file
self.add_to_pp_files(new_source_file)
self.remove_node_in_saved(new_source_file, unique_id)
if unique_id:
self.remove_node_in_saved(new_source_file, unique_id)

def remove_node_in_saved(self, source_file, unique_id):
# Has already been deleted by another action
if unique_id not in self.saved_manifest.nodes:
if unique_id in self.saved_manifest.nodes:
# delete node in saved
node = self.saved_manifest.nodes.pop(unique_id)
self.deleted_manifest.nodes[unique_id] = node
elif source_file.file_id in self.disabled_by_file_id:
for dis_index, dis_node in self.saved_manifest.disabled:
if dis_node.file_id == source_file.file_id:
node = dis_node
break
if dis_node:
del self.saved_manifest.disabled[dis_index]
else:
# Has already been deleted by another action
return

# delete node in saved
node = self.saved_manifest.nodes.pop(unique_id)
self.deleted_manifest.nodes[unique_id] = node

# look at patch_path in model node to see if we need
# to reapply a patch from a schema_file.
if node.patch_path:
Expand All @@ -261,15 +276,22 @@ def remove_node_in_saved(self, source_file, unique_id):
schema_file = self.saved_files[file_id]
dict_key = parse_file_type_to_key[source_file.parse_file_type]
# look for a matching list dictionary
for elem in schema_file.dict_from_yaml[dict_key]:
if elem['name'] == node.name:
elem_patch = elem
break
elem_patch = None
if dict_key in schema_file.dict_from_yaml:
for elem in schema_file.dict_from_yaml[dict_key]:
if elem['name'] == node.name:
elem_patch = elem
break
if elem_patch:
self.delete_schema_mssa_links(schema_file, dict_key, elem_patch)
self.merge_patch(schema_file, dict_key, elem_patch)
if unique_id in schema_file.node_patches:
schema_file.node_patches.remove(unique_id)
if unique_id in self.saved_manifest.disabled:
# We have a patch_path in disabled nodes with a patch so
# that we can connect the patch to the node
for node in self.saved_manifest.disabled[unique_id]:
node.patch_path = None

def update_macro_in_saved(self, new_source_file, old_source_file):
if self.already_scheduled_for_parsing(old_source_file):
Expand All @@ -290,7 +312,8 @@ def remove_mssat_file(self, source_file):
# nodes [unique_ids] -- SQL files
# There should always be a node for a SQL file
if not source_file.nodes:
raise Exception(f"No nodes found for source file {source_file.file_id}")
logger.debug(f"No nodes found for source file {source_file.file_id}")
return
# There is generally only 1 node for SQL files, except for macros
for unique_id in source_file.nodes:
self.remove_node_in_saved(source_file, unique_id)
Expand All @@ -299,7 +322,10 @@ def remove_mssat_file(self, source_file):
# We need to re-parse nodes that reference another removed node
def schedule_referencing_nodes_for_parsing(self, unique_id):
# Look at "children", i.e. nodes that reference this node
self.schedule_nodes_for_parsing(self.saved_manifest.child_map[unique_id])
if unique_id in self.saved_manifest.child_map:
self.schedule_nodes_for_parsing(self.saved_manifest.child_map[unique_id])
else:
logger.debug(f"Partial parsing: {unique_id} not found in child_map")

def schedule_nodes_for_parsing(self, unique_ids):
for unique_id in unique_ids:
Expand Down
4 changes: 4 additions & 0 deletions core/dbt/parser/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,10 @@ def parse_patch(
)
if unique_id is None:
# This will usually happen when a node is disabled
disabled_nodes = self.manifest.disabled_lookup.find(patch.name, patch.package_name)
if disabled_nodes:
for node in disabled_nodes:
node.patch_path = source_file.file_id
return

# patches can't be overwritten
Expand Down
5 changes: 5 additions & 0 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import time
import json
from abc import abstractmethod
from concurrent.futures import as_completed
from datetime import datetime
Expand All @@ -14,6 +15,7 @@
)

from dbt import ui
from dbt.clients.system import write_file
from dbt.task.base import ConfiguredTask
from dbt.adapters.base import BaseRelation
from dbt.adapters.factory import get_adapter
Expand Down Expand Up @@ -69,6 +71,9 @@ def write_manifest(self):
if flags.WRITE_JSON:
path = os.path.join(self.config.target_path, MANIFEST_FILE_NAME)
self.manifest.write(path)
if os.getenv('DBT_WRITE_FILES'):
path = os.path.join(self.config.target_path, 'files.json')
write_file(path, json.dumps(self.manifest.files, cls=dbt.utils.JSONEncoder, indent=4))

def load_manifest(self):
self.manifest = ManifestLoader.get_full_manifest(self.config)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{{ config(materialized='table', enabled=False) }}

with source_data as (

select 1 as id
union all
select null as id

)

select *
from source_data
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
- Disabled model
{{ config(materialized='table', enabled=False) }}

with source_data as (

select 1 as id
union all
select null as id

)

select *
from source_data
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{{ config(materialized='table') }}

with source_data as (

{#- This is model three #}

select 1 as id
union all
select null as id

)

select *
from source_data
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{% snapshot orders_snapshot %}

{{
config(
target_schema=schema,
strategy='check',
unique_key='id',
check_cols=['status'],
)
}}

select * from {{ ref('orders') }}

{% endsnapshot %}

Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{% snapshot orders_snapshot %}

{{
config(
target_schema=schema,
strategy='check',
unique_key='id',
check_cols=['status'],
)
}}
select * from {{ ref('orders') }}

{% endsnapshot %}

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
select 1 as id, 101 as user_id, 'pending' as status
Loading

0 comments on commit 162f3a1

Please sign in to comment.