Factor Out Repeated Logic in the PartialParsing Class (#7952)
* CT-2711: Add remove_tests() call to delete_schema_source() so that call sites are more uniform with other node deletion call sites. This will enable further code factorization.

* CT-2711: Factor repeated code section (mostly) out of PartialParsing.handle_schema_file_changes()

* CT-2711: Factor a repeated code section out of schedule_nodes_for_parsing()
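All three changes apply the same refactoring: several near-identical blocks that differ only in a dictionary key and a deletion callback collapse into a single parameterized helper. A minimal, self-contained sketch of the pattern — the names process_elements and registry are illustrative, not dbt-core APIs:

    from typing import Callable, Dict, List

    def process_elements(
        registry: Dict[str, List[str]], dict_key: str, delete: Callable[[str], None]
    ) -> None:
        # One helper replaces a copy-pasted block per key: look up the
        # elements for this key, then apply the caller's delete routine.
        for element in registry.get(dict_key, []):
            delete(element)

    deleted: List[str] = []
    registry = {"sources": ["raw.orders"], "metrics": ["revenue"]}
    process_elements(registry, "sources", deleted.append)  # same helper...
    process_elements(registry, "metrics", deleted.append)  # ...for every key
    assert deleted == ["raw.orders", "revenue"]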
peterallenwebb authored Jun 26, 2023
1 parent f9d4e9e commit b37e5b5
Showing 1 changed file with 51 additions and 120 deletions.
171 changes: 51 additions & 120 deletions core/dbt/parser/partial.py
@@ -1,11 +1,12 @@
 import os
 from copy import deepcopy
-from typing import MutableMapping, Dict, List
+from typing import MutableMapping, Dict, List, Callable
 from dbt.contracts.graph.manifest import Manifest
 from dbt.contracts.files import (
     AnySourceFile,
     ParseFileType,
     parse_file_type_to_parser,
+    SchemaSourceFile,
 )
 from dbt.events.functions import fire_event
 from dbt.events.base_types import EventLevel
@@ -403,41 +404,19 @@ def schedule_nodes_for_parsing(self, unique_ids):
                 self.add_to_pp_files(self.saved_files[file_id])
             elif unique_id in self.saved_manifest.sources:
                 source = self.saved_manifest.sources[unique_id]
-                file_id = source.file_id
-                if file_id in self.saved_files and file_id not in self.file_diff["deleted"]:
-                    schema_file = self.saved_files[file_id]
-                    sources = []
-                    if "sources" in schema_file.dict_from_yaml:
-                        sources = schema_file.dict_from_yaml["sources"]
-                    source_element = self.get_schema_element(sources, source.source_name)
-                    if source_element:
-                        self.delete_schema_source(schema_file, source_element)
-                        self.remove_tests(schema_file, "sources", source_element["name"])
-                        self.merge_patch(schema_file, "sources", source_element)
+                self._schedule_for_parsing(
+                    "sources", source, source.source_name, self.delete_schema_source
+                )
             elif unique_id in self.saved_manifest.exposures:
                 exposure = self.saved_manifest.exposures[unique_id]
-                file_id = exposure.file_id
-                if file_id in self.saved_files and file_id not in self.file_diff["deleted"]:
-                    schema_file = self.saved_files[file_id]
-                    exposures = []
-                    if "exposures" in schema_file.dict_from_yaml:
-                        exposures = schema_file.dict_from_yaml["exposures"]
-                    exposure_element = self.get_schema_element(exposures, exposure.name)
-                    if exposure_element:
-                        self.delete_schema_exposure(schema_file, exposure_element)
-                        self.merge_patch(schema_file, "exposures", exposure_element)
+                self._schedule_for_parsing(
+                    "exposures", exposure, exposure.name, self.delete_schema_exposure
+                )
             elif unique_id in self.saved_manifest.metrics:
                 metric = self.saved_manifest.metrics[unique_id]
-                file_id = metric.file_id
-                if file_id in self.saved_files and file_id not in self.file_diff["deleted"]:
-                    schema_file = self.saved_files[file_id]
-                    metrics = []
-                    if "metrics" in schema_file.dict_from_yaml:
-                        metrics = schema_file.dict_from_yaml["metrics"]
-                    metric_element = self.get_schema_element(metrics, metric.name)
-                    if metric_element:
-                        self.delete_schema_metric(schema_file, metric_element)
-                        self.merge_patch(schema_file, "metrics", metric_element)
+                self._schedule_for_parsing(
+                    "metrics", metric, metric.name, self.delete_schema_metric
+                )
             elif unique_id in self.saved_manifest.macros:
                 macro = self.saved_manifest.macros[unique_id]
                 file_id = macro.file_id
@@ -447,6 +426,19 @@ def schedule_nodes_for_parsing(self, unique_ids):
                     self.saved_files[file_id] = deepcopy(self.new_files[file_id])
                     self.add_to_pp_files(self.saved_files[file_id])
 
+    def _schedule_for_parsing(self, dict_key: str, element, name, delete: Callable) -> None:
+        file_id = element.file_id
+        if file_id in self.saved_files and file_id not in self.file_diff["deleted"]:
+            schema_file = self.saved_files[file_id]
+            elements = []
+            assert isinstance(schema_file, SchemaSourceFile)
+            if dict_key in schema_file.dict_from_yaml:
+                elements = schema_file.dict_from_yaml[dict_key]
+            schema_element = self.get_schema_element(elements, name)
+            if schema_element:
+                delete(schema_file, schema_element)
+                self.merge_patch(schema_file, dict_key, schema_element)
+
     def delete_macro_file(self, source_file, follow_references=False):
         self.check_for_special_deleted_macros(source_file)
         self.handle_macro_file_links(source_file, follow_references)
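The new helper works because delete is passed as a bound method, which already carries self; each former branch now supplies only the dict key, the element, its name, and the matching deletion method. A rough standalone illustration of bound-method callbacks (a hypothetical Cleaner class, not dbt-core code):

    from typing import Callable, List

    class Cleaner:
        def __init__(self) -> None:
            self.log: List[str] = []

        def delete_source(self, name: str) -> None:
            self.log.append(f"deleted source {name}")

        def delete_metric(self, name: str) -> None:
            self.log.append(f"deleted metric {name}")

        def _schedule(self, name: str, delete: Callable[[str], None]) -> None:
            # `delete` is a bound method, so it already closes over the instance.
            delete(name)

    c = Cleaner()
    c._schedule("raw.orders", c.delete_source)
    c._schedule("revenue", c.delete_metric)
    assert c.log == ["deleted source raw.orders", "deleted metric revenue"]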
@@ -538,7 +530,6 @@ def schedule_macro_nodes_for_parsing(self, unique_ids):
                         # This is a source patch; need to re-parse orig source
                         self.remove_source_override_target(patch)
                     self.delete_schema_source(schema_file, patch)
-                    self.remove_tests(schema_file, "sources", patch["name"])
                     self.merge_patch(schema_file, "sources", patch)
             else:
                 file_id = node.file_id
@@ -639,14 +630,12 @@ def handle_schema_file_changes(self, schema_file, saved_yaml_dict, new_yaml_dict
                 if "overrides" in source:  # This is a source patch; need to re-parse orig source
                     self.remove_source_override_target(source)
                 self.delete_schema_source(schema_file, source)
-                self.remove_tests(schema_file, dict_key, source["name"])
                 self.merge_patch(schema_file, dict_key, source)
         if source_diff["deleted"]:
             for source in source_diff["deleted"]:
                 if "overrides" in source:  # This is a source patch; need to re-parse orig source
                     self.remove_source_override_target(source)
                 self.delete_schema_source(schema_file, source)
-                self.remove_tests(schema_file, dict_key, source["name"])
         if source_diff["added"]:
             for source in source_diff["added"]:
                 if "overrides" in source:  # This is a source patch; need to re-parse orig source
@@ -662,99 +651,40 @@ def handle_schema_file_changes(self, schema_file, saved_yaml_dict, new_yaml_dict
                     if "overrides" in source:
                         self.remove_source_override_target(source)
                     self.delete_schema_source(schema_file, source)
-                    self.remove_tests(schema_file, dict_key, source["name"])
                     self.merge_patch(schema_file, dict_key, source)
 
-        # macros
-        dict_key = "macros"
-        macro_diff = self.get_diff_for(dict_key, saved_yaml_dict, new_yaml_dict)
-        if macro_diff["changed"]:
-            for macro in macro_diff["changed"]:
-                self.delete_schema_macro_patch(schema_file, macro)
-                self.merge_patch(schema_file, dict_key, macro)
-        if macro_diff["deleted"]:
-            for macro in macro_diff["deleted"]:
-                self.delete_schema_macro_patch(schema_file, macro)
-        if macro_diff["added"]:
-            for macro in macro_diff["added"]:
-                self.merge_patch(schema_file, dict_key, macro)
-        # Handle schema file updates due to env_var changes
-        if dict_key in env_var_changes and dict_key in new_yaml_dict:
-            for name in env_var_changes[dict_key]:
-                if name in macro_diff["changed_or_deleted_names"]:
-                    continue
-                elem = self.get_schema_element(new_yaml_dict[dict_key], name)
-                if elem:
-                    self.delete_schema_macro_patch(schema_file, elem)
-                    self.merge_patch(schema_file, dict_key, elem)
-
-        # exposures
-        dict_key = "exposures"
-        exposure_diff = self.get_diff_for(dict_key, saved_yaml_dict, new_yaml_dict)
-        if exposure_diff["changed"]:
-            for exposure in exposure_diff["changed"]:
-                self.delete_schema_exposure(schema_file, exposure)
-                self.merge_patch(schema_file, dict_key, exposure)
-        if exposure_diff["deleted"]:
-            for exposure in exposure_diff["deleted"]:
-                self.delete_schema_exposure(schema_file, exposure)
-        if exposure_diff["added"]:
-            for exposure in exposure_diff["added"]:
-                self.merge_patch(schema_file, dict_key, exposure)
-        # Handle schema file updates due to env_var changes
-        if dict_key in env_var_changes and dict_key in new_yaml_dict:
-            for name in env_var_changes[dict_key]:
-                if name in exposure_diff["changed_or_deleted_names"]:
-                    continue
-                elem = self.get_schema_element(new_yaml_dict[dict_key], name)
-                if elem:
-                    self.delete_schema_exposure(schema_file, elem)
-                    self.merge_patch(schema_file, dict_key, elem)
-
-        # metrics
-        dict_key = "metrics"
-        metric_diff = self.get_diff_for("metrics", saved_yaml_dict, new_yaml_dict)
-        if metric_diff["changed"]:
-            for metric in metric_diff["changed"]:
-                self.delete_schema_metric(schema_file, metric)
-                self.merge_patch(schema_file, dict_key, metric)
-        if metric_diff["deleted"]:
-            for metric in metric_diff["deleted"]:
-                self.delete_schema_metric(schema_file, metric)
-        if metric_diff["added"]:
-            for metric in metric_diff["added"]:
-                self.merge_patch(schema_file, dict_key, metric)
-        # Handle schema file updates due to env_var changes
-        if dict_key in env_var_changes and dict_key in new_yaml_dict:
-            for name in env_var_changes[dict_key]:
-                if name in metric_diff["changed_or_deleted_names"]:
-                    continue
-                elem = self.get_schema_element(new_yaml_dict[dict_key], name)
-                if elem:
-                    self.delete_schema_metric(schema_file, elem)
-                    self.merge_patch(schema_file, dict_key, elem)
+        def handle_change(key: str, delete: Callable):
+            self._handle_element_change(
+                schema_file, saved_yaml_dict, new_yaml_dict, env_var_changes, key, delete
+            )
 
-        # groups
-        dict_key = "groups"
-        group_diff = self.get_diff_for("groups", saved_yaml_dict, new_yaml_dict)
-        if group_diff["changed"]:
-            for group in group_diff["changed"]:
-                self.delete_schema_group(schema_file, group)
-                self.merge_patch(schema_file, dict_key, group)
-        if group_diff["deleted"]:
-            for group in group_diff["deleted"]:
-                self.delete_schema_group(schema_file, group)
-        if group_diff["added"]:
-            for group in group_diff["added"]:
-                self.merge_patch(schema_file, dict_key, group)
+        handle_change("macros", self.delete_schema_macro_patch)
+        handle_change("exposures", self.delete_schema_exposure)
+        handle_change("metrics", self.delete_schema_metric)
+        handle_change("groups", self.delete_schema_group)
+
+    def _handle_element_change(
+        self, schema_file, saved_yaml_dict, new_yaml_dict, env_var_changes, dict_key: str, delete
+    ):
+        element_diff = self.get_diff_for(dict_key, saved_yaml_dict, new_yaml_dict)
+        if element_diff["changed"]:
+            for element in element_diff["changed"]:
+                delete(schema_file, element)
+                self.merge_patch(schema_file, dict_key, element)
+        if element_diff["deleted"]:
+            for element in element_diff["deleted"]:
+                delete(schema_file, element)
+        if element_diff["added"]:
+            for element in element_diff["added"]:
+                self.merge_patch(schema_file, dict_key, element)
         # Handle schema file updates due to env_var changes
         if dict_key in env_var_changes and dict_key in new_yaml_dict:
             for name in env_var_changes[dict_key]:
-                if name in group_diff["changed_or_deleted_names"]:
+                if name in element_diff["changed_or_deleted_names"]:
                     continue
                 elem = self.get_schema_element(new_yaml_dict[dict_key], name)
                 if elem:
-                    self.delete_schema_group(schema_file, elem)
+                    delete(schema_file, elem)
                     self.merge_patch(schema_file, dict_key, elem)
 
     # Take a "section" of the schema file yaml dictionary from saved and new schema files
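Note that handle_change is a closure: it captures the arguments shared by all four calls (schema_file, both yaml dicts, env_var_changes), so each call site passes only what varies — the key and the delete callback. A toy sketch of that closure-over-shared-state pattern, under assumed simplified inputs (none of these names are the real dbt-core signatures):

    from typing import Callable, Dict, List, Tuple

    def handle_file_changes(
        saved: Dict[str, List[str]], new: Dict[str, List[str]]
    ) -> List[Tuple[str, str]]:
        events: List[Tuple[str, str]] = []

        def handle_change(key: str, delete: Callable[[str], None]) -> None:
            # Captures `saved` and `new` from the enclosing scope; callers
            # pass only the varying parts.
            for name in saved.get(key, []):
                if name not in new.get(key, []):
                    delete(name)

        handle_change("macros", lambda n: events.append(("macro", n)))
        handle_change("metrics", lambda n: events.append(("metric", n)))
        return events

    out = handle_file_changes({"macros": ["m1"], "metrics": []}, {"macros": []})
    assert out == [("macro", "m1")]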
@@ -887,6 +817,8 @@ def delete_schema_source(self, schema_file, source_dict):
                     schema_file.sources.remove(unique_id)
                     self.schedule_referencing_nodes_for_parsing(unique_id)
 
+        self.remove_tests(schema_file, "sources", source_name)
+
     def delete_schema_macro_patch(self, schema_file, macro):
         # This is just macro patches that need to be reapplied
         macro_unique_id = None
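Folding remove_tests() into delete_schema_source() means callers can no longer forget to pair the two operations; the invariant lives in one place, which is what makes the call sites above uniform enough to factor. A toy sketch of centralizing paired cleanup (a hypothetical Manifest, not the real class):

    from typing import Dict, List

    class Manifest:
        def __init__(self) -> None:
            self.sources: Dict[str, str] = {"raw.orders": "..."}
            self.tests: Dict[str, List[str]] = {"raw.orders": ["not_null"]}

        def remove_tests(self, source_name: str) -> None:
            self.tests.pop(source_name, None)

        def delete_source(self, source_name: str) -> None:
            self.sources.pop(source_name, None)
            # Test cleanup now happens here, so every call site stays uniform.
            self.remove_tests(source_name)

    m = Manifest()
    m.delete_source("raw.orders")  # no separate remove_tests() call needed
    assert m.sources == {} and m.tests == {}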
@@ -970,7 +902,6 @@ def remove_source_override_target(self, source_dict):
         (orig_file, orig_source) = self.get_source_override_file_and_dict(source_dict)
         if orig_source:
             self.delete_schema_source(orig_file, orig_source)
-            self.remove_tests(orig_file, "sources", orig_source["name"])
             self.merge_patch(orig_file, "sources", orig_source)
             self.add_to_pp_files(orig_file)

