Skip to content

Commit

Permalink
Move merge_from_artifact from end of parsing back to task before_run …
Browse files Browse the repository at this point in the history
…to reduce scope of refactor
  • Loading branch information
jtcohen6 committed May 1, 2024
1 parent 5513e16 commit 8e84df8
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 27 deletions.
4 changes: 2 additions & 2 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1465,8 +1465,8 @@ def is_invalid_protected_ref(
node.package_name != target_model.package_name and restrict_package_access
)

# Called by requires.manifest after ManifestLoader.get_full_manifest
def merge_from_artifact(self, other: "Manifest") -> None:
# Called in GraphRunnableTask.before_run, RunTask.before_run, CloneTask.before_run
def merge_from_artifact(self, other: "Manifest", favor_state: bool = False) -> None:
"""Update this manifest by adding the 'defer_relation' attribute to all nodes
with a counterpart in the stateful manifest used for deferral.
Expand Down
22 changes: 0 additions & 22 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from dataclasses import field
import datetime
import os
from pathlib import Path
import traceback
from typing import (
Dict,
Expand All @@ -22,7 +21,6 @@

from dbt.context.query_header import generate_query_header_context
from dbt.contracts.graph.semantic_manifest import SemanticManifest
from dbt.contracts.state import PreviousState
from dbt_common.events.base_types import EventLevel
from dbt_common.exceptions.base import DbtValidationError
import dbt_common.utils
Expand Down Expand Up @@ -119,7 +117,6 @@
TargetNotFoundError,
AmbiguousAliasError,
InvalidAccessTypeError,
DbtRuntimeError,
scrub_secrets,
)
from dbt.parser.base import Parser
Expand Down Expand Up @@ -1903,25 +1900,6 @@ def parse_manifest(
write_perf_info=write_perf_info,
)

# If deferral is enabled, add 'defer_relation' attribute to all nodes
flags = get_flags()
if flags.defer:
defer_state_path = flags.defer_state or flags.state
if not defer_state_path:
raise DbtRuntimeError(
"Deferral is enabled and requires a stateful manifest, but none was provided"
)
defer_state = PreviousState(
state_path=defer_state_path,
target_path=Path(runtime_config.target_path),
project_root=Path(runtime_config.project_root),
)
if not defer_state.manifest:
raise DbtRuntimeError(
f'Could not find manifest in deferral state path: "{defer_state_path}"'
)
manifest.merge_from_artifact(defer_state.manifest)

# If we should (over)write the manifest in the target path, do that now
if write and write_json:
write_manifest(manifest, runtime_config.project_target_path)
Expand Down
3 changes: 2 additions & 1 deletion core/dbt/task/clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ def get_model_schemas(self, adapter, selected_uids: Iterable[str]) -> Set[BaseRe

def before_run(self, adapter, selected_uids: AbstractSet[str]):
with adapter.connection_named("master"):
# only create *our* schemas, but cache *other* schemas in addition
self.defer_to_manifest()

Check warning on line 119 in core/dbt/task/clone.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/clone.py#L119

Added line #L119 was not covered by tests
# only create target schemas, but also cache defer_relation schemas
schemas_to_create = super().get_model_schemas(adapter, selected_uids)
self.create_schemas(adapter, schemas_to_create)
schemas_to_cache = self.get_model_schemas(adapter, selected_uids)
Expand Down
1 change: 1 addition & 0 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ def print_results_line(self, results, execution_time) -> None:

def before_run(self, adapter, selected_uids: AbstractSet[str]) -> None:
with adapter.connection_named("master"):
self.defer_to_manifest()
required_schemas = self.get_model_schemas(adapter, selected_uids)
self.create_schemas(adapter, required_schemas)
self.populate_adapter_cache(adapter, required_schemas)
Expand Down
42 changes: 40 additions & 2 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,8 @@ def __init__(self, args: Flags, config: RuntimeConfig, manifest: Manifest) -> No
self.job_queue: Optional[GraphQueue] = None
self.node_results: List[BaseResult] = []
self.num_nodes: int = 0
# TODO: if --defer is enabled, we have already loaded the "previous state" artifacts into memory
# can we check to see, and reuse them if so?
self.previous_state: Optional[PreviousState] = None
self.previous_defer_state: Optional[PreviousState] = None
self.run_count: int = 0
self.started_at: float = 0

Expand All @@ -89,6 +88,13 @@ def __init__(self, args: Flags, config: RuntimeConfig, manifest: Manifest) -> No
project_root=Path(self.config.project_root),
)

if self.args.defer_state:
self.previous_defer_state = PreviousState(
state_path=self.args.defer_state,
target_path=Path(self.config.target_path),
project_root=Path(self.config.project_root),
)

def index_offset(self, value: int) -> int:
return value

Expand Down Expand Up @@ -127,6 +133,23 @@ def get_graph_queue(self) -> GraphQueue:
spec = self.get_selection_spec()
return selector.get_graph_queue(spec)

def defer_to_manifest(self):
deferred_manifest = self._get_deferred_manifest()
if deferred_manifest is None:
return
if self.manifest is None:
raise DbtInternalError(
"Expected to defer to manifest, but there is no runtime manifest to defer from!"
)
self.manifest.merge_from_artifact(
other=deferred_manifest,
favor_state=bool(self.args.favor_state),
)
# We're rewriting the manifest because it's been mutated during merge_from_artifact.
# This is to reflect which nodes had the defer_relation attribute added.
if self.args.write_json:
write_manifest(self.manifest, self.config.project_target_path)

def _runtime_initialize(self):
self.compile_manifest()
if self.manifest is None or self.graph is None:
Expand Down Expand Up @@ -454,6 +477,7 @@ def populate_adapter_cache(

def before_run(self, adapter, selected_uids: AbstractSet[str]):
with adapter.connection_named("master"):
self.defer_to_manifest()
self.populate_adapter_cache(adapter)

def after_run(self, adapter, results):
Expand Down Expand Up @@ -640,3 +664,17 @@ def get_result(self, results, elapsed_time, generated_at):

def task_end_messages(self, results):
print_run_end_messages(results)

def _get_previous_state(self) -> Optional[Manifest]:
state = self.previous_defer_state or self.previous_state
if not state:
raise DbtRuntimeError(
"--state or --defer-state are required for deferral, but neither was provided"
)

if not state.manifest:
raise DbtRuntimeError(f'Could not find manifest in --state path: "{state.state_path}"')
return state.manifest

def _get_deferred_manifest(self) -> Optional[Manifest]:
return self._get_previous_state() if self.args.defer else None

0 comments on commit 8e84df8

Please sign in to comment.