diff --git a/.changes/unreleased/Fixes-20230508-093732.yaml b/.changes/unreleased/Fixes-20230508-093732.yaml
new file mode 100644
index 00000000000..b0605d11ac6
--- /dev/null
+++ b/.changes/unreleased/Fixes-20230508-093732.yaml
@@ -0,0 +1,6 @@
+kind: Fixes
+body: '`run_results.json` is now written after every node completes.'
+time: 2023-05-08T09:37:32.809356-05:00
+custom:
+  Author: iknox-fa
+  Issue: "7302"
diff --git a/core/dbt/contracts/results.py b/core/dbt/contracts/results.py
index 63655f972c9..00a95b573fb 100644
--- a/core/dbt/contracts/results.py
+++ b/core/dbt/contracts/results.py
@@ -393,6 +393,9 @@ def from_node_results(
         meta = FreshnessMetadata(generated_at=generated_at)
         return cls(metadata=meta, results=results, elapsed_time=elapsed_time)
 
+    def write(self, path):
+        FreshnessExecutionResultArtifact.from_result(self).write(path)
+
 
 @dataclass
 @schema_version("sources", 3)
diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py
index fbdb20b44ff..d662e35dd66 100644
--- a/core/dbt/task/freshness.py
+++ b/core/dbt/task/freshness.py
@@ -9,7 +9,6 @@
 from .runnable import GraphRunnableTask
 
 from dbt.contracts.results import (
-    FreshnessExecutionResultArtifact,
     FreshnessResult,
     PartialSourceFreshnessResult,
     SourceFreshnessResult,
@@ -178,10 +177,6 @@ def get_node_selector(self):
     def get_runner_type(self, _):
         return FreshnessRunner
 
-    def write_result(self, result):
-        artifact = FreshnessExecutionResultArtifact.from_result(result)
-        artifact.write(self.result_path())
-
     def get_result(self, results, elapsed_time, generated_at):
         return FreshnessResult.from_node_results(
             elapsed_time=elapsed_time, generated_at=generated_at, results=results
diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py
index a9055fc27ce..494acf98904 100644
--- a/core/dbt/task/runnable.py
+++ b/core/dbt/task/runnable.py
@@ -65,15 +65,16 @@ class GraphRunnableTask(ConfiguredTask):
 
     def __init__(self, args, config, manifest):
         super().__init__(args, config, manifest)
-        self.job_queue: Optional[GraphQueue] = None
         self._flattened_nodes: Optional[List[ResultNode]] = None
-
-        self.run_count: int = 0
-        self.num_nodes: int = 0
-        self.node_results = []
-        self._skipped_children = {}
         self._raise_next_tick = None
+        self._skipped_children = {}
+        self.job_queue: Optional[GraphQueue] = None
+        self.node_results = []
+        self.num_nodes: int = 0
         self.previous_state: Optional[PreviousState] = None
+        self.run_count: int = 0
+        self.started_at: float = 0
+
         self.set_previous_state()
 
     def set_previous_state(self):
@@ -302,6 +303,15 @@ def _handle_result(self, result):
             cause = None
         self._mark_dependent_errors(node.unique_id, result, cause)
 
+        interim_run_result = self.get_result(
+            results=self.node_results,
+            elapsed_time=time.time() - self.started_at,
+            generated_at=datetime.utcnow(),
+        )
+
+        if self.args.write_json and hasattr(interim_run_result, "write"):
+            interim_run_result.write(self.result_path())
+
     def _cancel_connections(self, pool):
         """Given a pool, cancel all adapter connections and wait until all
         runners gentle terminates.
@@ -393,14 +403,14 @@ def print_results_line(self, node_results, elapsed):
 
     def execute_with_hooks(self, selected_uids: AbstractSet[str]):
         adapter = get_adapter(self.config)
-        started = time.time()
+        self.started_at = time.time()
         try:
             self.before_run(adapter, selected_uids)
             res = self.execute_nodes()
             self.after_run(adapter, res)
         finally:
             adapter.cleanup_connections()
-            elapsed = time.time() - started
+            elapsed = time.time() - self.started_at
             self.print_results_line(self.node_results, elapsed)
             result = self.get_result(
                 results=self.node_results, elapsed_time=elapsed, generated_at=datetime.utcnow()
@@ -408,9 +418,6 @@ def execute_with_hooks(self, selected_uids: AbstractSet[str]):
 
         return result
 
-    def write_result(self, result):
-        result.write(self.result_path())
-
     def run(self):
         """
         Run dbt for the query, based on the graph.
@@ -447,9 +454,10 @@ def run(self):
                 )
             )
 
-        if get_flags().WRITE_JSON:
+        if self.args.write_json:
             write_manifest(self.manifest, self.config.target_path)
-            self.write_result(result)
+            if hasattr(result, "write"):
+                result.write(self.result_path())
 
         self.task_end_messages(result.results)
         return result
diff --git a/tests/functional/artifacts/test_run_results.py b/tests/functional/artifacts/test_run_results.py
index a1e1b92187f..c03c7abdf8f 100644
--- a/tests/functional/artifacts/test_run_results.py
+++ b/tests/functional/artifacts/test_run_results.py
@@ -1,4 +1,8 @@
+from multiprocessing import Process
+from pathlib import Path
+import json
 import pytest
+import platform
 from dbt.tests.util import run_dbt
 
 good_model_sql = """
@@ -9,6 +13,11 @@
 something bad
 """
 
+slow_model_sql = """
+{{ config(materialized='table') }}
+select id from {{ ref('good_model') }}, pg_sleep(5)
+"""
+
 
 class TestRunResultsTimingSuccess:
     @pytest.fixture(scope="class")
@@ -30,3 +39,31 @@ def test_timing_exists(self, project):
         results = run_dbt(["run"], expect_pass=False)
         assert len(results.results) == 1
         assert len(results.results[0].timing) > 0
+
+
+@pytest.mark.skipif(platform.system() != "Darwin", reason="Fails on Linux in GitHub Actions")
+class TestRunResultsWritesFileOnSignal:
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {"good_model.sql": good_model_sql, "slow_model.sql": slow_model_sql}
+
+    def test_run_results_are_written_on_signal(self, project):
+        # Start the runner in a separate process.
+        external_process_dbt = Process(
+            target=run_dbt, args=([["run"]]), kwargs={"expect_pass": False}
+        )
+        external_process_dbt.start()
+        assert external_process_dbt.is_alive()
+
+        # Wait until the first file write, then kill the process.
+        run_results_file = Path(project.project_root) / "target/run_results.json"
+        while not run_results_file.is_file():
+            pass
+        external_process_dbt.terminate()
+
+        # Wait until the process is dead, then check that the file contains only one result.
+        while external_process_dbt.is_alive():
+            pass
+        with run_results_file.open() as run_results_str:
+            run_results = json.loads(run_results_str.read())
+        assert len(run_results["results"]) == 1
diff --git a/tests/functional/fail_fast/test_fail_fast_run.py b/tests/functional/fail_fast/test_fail_fast_run.py
index 4fb7821d072..92ed1c7aea4 100644
--- a/tests/functional/fail_fast/test_fail_fast_run.py
+++ b/tests/functional/fail_fast/test_fail_fast_run.py
@@ -1,11 +1,14 @@
 import pytest
+import json
+from pathlib import Path
+
 
 from dbt.contracts.results import RunResult
 from dbt.tests.util import run_dbt
 
 
 models__one_sql = """
-select 1 /failed
+select 1
 """
 
 models__two_sql = """
@@ -28,6 +31,12 @@ def test_fail_fast_run(
         res = run_dbt(["run", "--fail-fast", "--threads", "1"], expect_pass=False)
         # a RunResult contains only one node so we can be sure only one model was run
         assert type(res) == RunResult
+        run_results_file = Path(project.project_root) / "target/run_results.json"
+        assert run_results_file.is_file()
+        with run_results_file.open() as run_results_str:
+            run_results = json.loads(run_results_str.read())
+        assert run_results["results"][0]["status"] == "success"
+        assert run_results["results"][1]["status"] == "error"
 
 
 class TestFailFastFromConfig(FailFastBase):
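
The heart of the change is the interim write in `_handle_result`: after every node finishes, the task rebuilds the full result object from everything completed so far and, when JSON writing is enabled, persists it through a duck-typed `write` check. Below is a minimal self-contained sketch of that pattern; the `RunResults` and `Task` classes and all their fields are illustrative stand-ins, not dbt's actual types.

```python
import json
import time
from datetime import datetime, timezone
from pathlib import Path


class RunResults:
    """Illustrative stand-in for dbt's run-results artifact."""

    def __init__(self, results, elapsed_time, generated_at):
        self.results = results
        self.elapsed_time = elapsed_time
        self.generated_at = generated_at

    def write(self, path):
        # Serialize the artifact; dbt writes target/run_results.json.
        out = Path(path)
        out.parent.mkdir(parents=True, exist_ok=True)
        out.write_text(json.dumps({
            "elapsed_time": self.elapsed_time,
            "generated_at": self.generated_at.isoformat(),
            "results": self.results,
        }))


class Task:
    """Sketch of the incremental-write pattern this PR introduces."""

    def __init__(self, write_json=True, result_path="target/run_results.json"):
        self.write_json = write_json   # stands in for args.write_json
        self.result_path = result_path
        self.node_results = []
        self.started_at = time.time()  # set once at the start of the run

    def handle_result(self, result):
        self.node_results.append(result)
        # Rebuild the full artifact from everything finished so far.
        interim = RunResults(
            results=list(self.node_results),
            elapsed_time=time.time() - self.started_at,
            generated_at=datetime.now(timezone.utc),
        )
        # Duck-typed write: any result type exposing `write` gets persisted.
        if self.write_json and hasattr(interim, "write"):
            interim.write(self.result_path)
```

Giving `FreshnessResult` its own `write` method is what makes the duck typing pay off: the base task no longer needs the per-task `write_result` overrides, which is why both `GraphRunnableTask.write_result` and `FreshnessTask.write_result` are deleted above.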
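The new signal test polls for the artifact in unbounded busy loops, which can hang CI forever if the file never appears. A bounded variant of the same wait-terminate-verify pattern might look like the sketch below; `wait_for` and `run_and_interrupt` are hypothetical helpers, not part of dbt's test utilities.

```python
import json
import time
from multiprocessing import Process
from pathlib import Path


def wait_for(predicate, timeout=30.0, interval=0.1):
    """Poll `predicate` until it returns True or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return False


def run_and_interrupt(target, artifact="target/run_results.json"):
    """Start `target` in a child process, terminate it after the first
    artifact write, and return the partially written results."""
    proc = Process(target=target)
    proc.start()
    path = Path(artifact)
    assert wait_for(path.is_file), "run_results.json was never written"
    proc.terminate()       # SIGTERM on POSIX, matching the test's intent
    proc.join(timeout=30)  # bounded replacement for the is_alive() loop
    return json.loads(path.read_text())
```

The timeout turns a regression into a clean test failure rather than a hung job, which may also be a gentler remedy than skipping the test everywhere but Darwin.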