Skip to content

Commit

Permalink
Always write run_results.json (#7539)
Browse files Browse the repository at this point in the history
  • Loading branch information
iknox-fa authored May 9, 2023
1 parent 272beb2 commit dffbb6a
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 19 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230508-093732.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: '`run_results.json` is now written after every node completes.'
time: 2023-05-08T09:37:32.809356-05:00
custom:
Author: iknox-fa
Issue: "7302"
3 changes: 3 additions & 0 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,9 @@ def from_node_results(
meta = FreshnessMetadata(generated_at=generated_at)
return cls(metadata=meta, results=results, elapsed_time=elapsed_time)

def write(self, path):
    """Write this freshness result to *path*.

    Converts self into a FreshnessExecutionResultArtifact via from_result()
    and delegates serialization to the artifact's own write().
    """
    FreshnessExecutionResultArtifact.from_result(self).write(path)


@dataclass
@schema_version("sources", 3)
Expand Down
5 changes: 0 additions & 5 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from .runnable import GraphRunnableTask

from dbt.contracts.results import (
FreshnessExecutionResultArtifact,
FreshnessResult,
PartialSourceFreshnessResult,
SourceFreshnessResult,
Expand Down Expand Up @@ -178,10 +177,6 @@ def get_node_selector(self):
def get_runner_type(self, _):
    """Return FreshnessRunner as this task's runner class (the argument is ignored)."""
    return FreshnessRunner

def write_result(self, result):
    """Convert *result* to a FreshnessExecutionResultArtifact and write it to self.result_path().

    NOTE(review): this method is the removed side of the diff rendered on this
    page (the commit moves the artifact conversion into FreshnessResult.write).
    """
    artifact = FreshnessExecutionResultArtifact.from_result(result)
    artifact.write(self.result_path())

def get_result(self, results, elapsed_time, generated_at):
return FreshnessResult.from_node_results(
elapsed_time=elapsed_time, generated_at=generated_at, results=results
Expand Down
34 changes: 21 additions & 13 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,16 @@ class GraphRunnableTask(ConfiguredTask):

def __init__(self, args, config, manifest):
    """Initialize per-run bookkeeping state for a graph-runnable task.

    NOTE(review): this span interleaves removed and added lines from a
    rendered diff — several attributes (job_queue, node_results, num_nodes,
    run_count, _skipped_children) appear twice. Confirm the final attribute
    list against the repository; the added set introduces `started_at`,
    used by _handle_result to compute interim elapsed time.
    """
    super().__init__(args, config, manifest)
    self.job_queue: Optional[GraphQueue] = None
    self._flattened_nodes: Optional[List[ResultNode]] = None

    self.run_count: int = 0
    self.num_nodes: int = 0
    self.node_results = []
    self._skipped_children = {}
    self._raise_next_tick = None
    self._skipped_children = {}
    self.job_queue: Optional[GraphQueue] = None
    self.node_results = []
    self.num_nodes: int = 0
    self.previous_state: Optional[PreviousState] = None
    self.run_count: int = 0
    # Wall-clock start time; 0 until execute_with_hooks() sets it.
    self.started_at: float = 0

    self.set_previous_state()

def set_previous_state(self):
Expand Down Expand Up @@ -302,6 +303,15 @@ def _handle_result(self, result):
cause = None
self._mark_dependent_errors(node.unique_id, result, cause)

interim_run_result = self.get_result(
results=self.node_results,
elapsed_time=time.time() - self.started_at,
generated_at=datetime.utcnow(),
)

if self.args.write_json and hasattr(interim_run_result, "write"):
interim_run_result.write(self.result_path())

def _cancel_connections(self, pool):
"""Given a pool, cancel all adapter connections and wait until all
runners gentle terminates.
Expand Down Expand Up @@ -393,24 +403,21 @@ def print_results_line(self, node_results, elapsed):

def execute_with_hooks(self, selected_uids: AbstractSet[str]):
    """Run the task's node graph bracketed by before_run/after_run hooks.

    Always cleans up adapter connections, prints the results line, and
    builds the final result object (via get_result) in the finally block,
    so a result is produced even when execution raises.

    NOTE(review): both the removed local `started` and the added attribute
    `self.started_at` appear in this span — a diff-render artifact of this
    page. The commit's intent is to record the start time on the instance
    so _handle_result can compute interim elapsed times.
    """
    adapter = get_adapter(self.config)
    started = time.time()
    self.started_at = time.time()
    try:
        self.before_run(adapter, selected_uids)
        res = self.execute_nodes()
        self.after_run(adapter, res)
    finally:
        adapter.cleanup_connections()
        elapsed = time.time() - started
        elapsed = time.time() - self.started_at
        self.print_results_line(self.node_results, elapsed)
        result = self.get_result(
            results=self.node_results, elapsed_time=elapsed, generated_at=datetime.utcnow()
        )

    return result

def write_result(self, result):
    """Persist *result* by delegating to its own write() with self.result_path().

    NOTE(review): this is the removed side of the diff on this page — the
    commit inlines the call at the use site in run().
    """
    result.write(self.result_path())

def run(self):
"""
Run dbt for the query, based on the graph.
Expand Down Expand Up @@ -447,9 +454,10 @@ def run(self):
)
)

if get_flags().WRITE_JSON:
if self.args.write_json:
write_manifest(self.manifest, self.config.target_path)
self.write_result(result)
if hasattr(result, "write"):
result.write(self.result_path())

self.task_end_messages(result.results)
return result
Expand Down
37 changes: 37 additions & 0 deletions tests/functional/artifacts/test_run_results.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import json
import platform
import time
from multiprocessing import Process
from pathlib import Path

import pytest

from dbt.tests.util import run_dbt

good_model_sql = """
Expand All @@ -9,6 +13,11 @@
something bad
"""

slow_model_sql = """
{{ config(materialized='table') }}
select id from {{ ref('good_model') }}, pg_sleep(5)
"""


class TestRunResultsTimingSuccess:
@pytest.fixture(scope="class")
Expand All @@ -30,3 +39,31 @@ def test_timing_exists(self, project):
results = run_dbt(["run"], expect_pass=False)
assert len(results.results) == 1
assert len(results.results[0].timing) > 0


@pytest.mark.skipif(platform.system() != "Darwin", reason="Fails on linux in github actions")
class TestRunResultsWritesFileOnSignal:
    """Verify run_results.json is written incrementally, so killing dbt
    mid-run still leaves a file containing the nodes completed so far."""

    @pytest.fixture(scope="class")
    def models(self):
        return {"good_model.sql": good_model_sql, "slow_model.sql": slow_model_sql}

    def test_run_results_are_written_on_signal(self, project):
        # Start the runner in a separate process.
        external_process_dbt = Process(target=run_dbt, args=(["run"],), kwargs={"expect_pass": False})
        external_process_dbt.start()
        assert external_process_dbt.is_alive()

        # Wait until the first file write, then kill the process.
        # The original `while ...: pass` spin-wait pegged a CPU core and could
        # hang CI forever; poll with a sleep and fail after a deadline instead.
        run_results_file = Path(project.project_root) / "target/run_results.json"
        deadline = time.monotonic() + 60
        while not run_results_file.is_file():
            assert time.monotonic() < deadline, "run_results.json was never written"
            time.sleep(0.1)
        external_process_dbt.terminate()

        # Wait (bounded) until the process is dead, then check the file has
        # only the one result that completed before the signal.
        external_process_dbt.join(timeout=30)
        assert not external_process_dbt.is_alive()
        with run_results_file.open() as run_results_str:
            run_results = json.loads(run_results_str.read())
        assert len(run_results["results"]) == 1
11 changes: 10 additions & 1 deletion tests/functional/fail_fast/test_fail_fast_run.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import pytest
import json
from pathlib import Path


from dbt.contracts.results import RunResult
from dbt.tests.util import run_dbt


models__one_sql = """
select 1 /failed
select 1
"""

models__two_sql = """
Expand All @@ -28,6 +31,12 @@ def test_fail_fast_run(
res = run_dbt(["run", "--fail-fast", "--threads", "1"], expect_pass=False)
# a RunResult contains only one node so we can be sure only one model was run
assert type(res) == RunResult
run_results_file = Path(project.project_root) / "target/run_results.json"
assert run_results_file.is_file()
with run_results_file.open() as run_results_str:
run_results = json.loads(run_results_str.read())
assert run_results["results"][0]["status"] == "success"
assert run_results["results"][1]["status"] == "error"


class TestFailFastFromConfig(FailFastBase):
Expand Down

0 comments on commit dffbb6a

Please sign in to comment.