Skip to content

Commit

Permalink
fix #7502: write run_results.json for run operation (#7655)
Browse files Browse the repository at this point in the history
  • Loading branch information
aranke authored May 23, 2023
1 parent 4a4b7be commit c25d0c9
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 51 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230522-132924.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: write run_results.json for run operation
time: 2023-05-22T13:29:24.182612-07:00
custom:
Author: aranke
Issue: "7502"
4 changes: 1 addition & 3 deletions core/dbt/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from dbt.contracts.results import (
CatalogArtifact,
RunExecutionResult,
RunOperationResultsArtifact,
)
from dbt.events.base_types import EventMsg
from dbt.task.build import BuildTask
Expand Down Expand Up @@ -53,8 +52,7 @@ class dbtRunnerResult:
List[str], # list/ls
Manifest, # parse
None, # clean, deps, init, source
RunExecutionResult, # build, compile, run, seed, snapshot, test
RunOperationResultsArtifact, # run-operation
RunExecutionResult, # build, compile, run, seed, snapshot, test, run-operation
] = None


Expand Down
34 changes: 0 additions & 34 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,40 +247,6 @@ def write(self, path: str):
write_json(path, self.to_dict(omit_none=False))


@dataclass
class RunOperationResult(ExecutionResult):
success: bool


@dataclass
class RunOperationResultMetadata(BaseArtifactMetadata):
dbt_schema_version: str = field(
default_factory=lambda: str(RunOperationResultsArtifact.dbt_schema_version)
)


@dataclass
@schema_version("run-operation-result", 1)
class RunOperationResultsArtifact(RunOperationResult, ArtifactMixin):
@classmethod
def from_success(
cls,
success: bool,
elapsed_time: float,
generated_at: datetime,
):
meta = RunOperationResultMetadata(
dbt_schema_version=str(cls.dbt_schema_version),
generated_at=generated_at,
)
return cls(
metadata=meta,
results=[],
elapsed_time=elapsed_time,
success=success,
)


# due to issues with typing.Union collapsing subclasses, this can't subclass
# PartialResult

Expand Down
64 changes: 55 additions & 9 deletions core/dbt/task/run_operation.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
from datetime import datetime
import os
import threading
import traceback
from datetime import datetime

import agate

from .base import ConfiguredTask

import dbt.exceptions
from dbt.adapters.factory import get_adapter
from dbt.contracts.results import RunOperationResultsArtifact
from dbt.contracts.files import FileHash
from dbt.contracts.graph.nodes import HookNode
from dbt.contracts.results import RunResultsArtifact, RunResult, RunStatus, TimingInfo
from dbt.events.functions import fire_event
from dbt.events.types import (
RunningOperationCaughtError,
RunningOperationUncaughtError,
LogDebugStackTrace,
)
from dbt.node_types import NodeType
from dbt.task.base import ConfiguredTask

RESULT_FILE_NAME = "run_results.json"


class RunOperationTask(ConfiguredTask):
Expand All @@ -22,7 +28,7 @@ def _get_macro_parts(self):
if "." in macro_name:
package_name, macro_name = macro_name.split(".", 1)
else:
package_name = None
package_name = self.config.project_name

return package_name, macro_name

Expand All @@ -40,7 +46,7 @@ def _run_unsafe(self) -> agate.Table:

return res

def run(self) -> RunOperationResultsArtifact:
def run(self) -> RunResultsArtifact:
start = datetime.utcnow()
self.compile_manifest()
try:
Expand All @@ -56,11 +62,51 @@ def run(self) -> RunOperationResultsArtifact:
else:
success = True
end = datetime.utcnow()
return RunOperationResultsArtifact.from_success(

package_name, macro_name = self._get_macro_parts()
fqn = [NodeType.Operation, package_name, macro_name]
unique_id = ".".join(fqn)

run_result = RunResult(
adapter_response={},
status=RunStatus.Success if success else RunStatus.Error,
execution_time=(end - start).total_seconds(),
failures=0 if success else 1,
message=None,
node=HookNode(
alias=macro_name,
checksum=FileHash.from_contents(unique_id),
database=self.config.credentials.database,
schema=self.config.credentials.schema,
resource_type=NodeType.Operation,
fqn=fqn,
name=macro_name,
unique_id=unique_id,
package_name=package_name,
path="",
original_file_path="",
),
thread_id=threading.current_thread().name,
timing=[TimingInfo(name=macro_name, started_at=start, completed_at=end)],
)

results = RunResultsArtifact.from_execution_results(
generated_at=end,
elapsed_time=(end - start).total_seconds(),
success=success,
args={
k: v
for k, v in self.args.__dict__.items()
if k.islower() and type(v) in (str, int, float, bool, list, dict)
},
results=[run_result],
)

result_path = os.path.join(self.config.target_path, RESULT_FILE_NAME)

if self.args.write_json:
results.write(result_path)

return results

def interpret_results(self, results):
return results.success
return results.results[0].status == RunStatus.Success
56 changes: 52 additions & 4 deletions tests/functional/artifacts/test_artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import dbt
import jsonschema

from dbt.tests.util import run_dbt, get_artifact, check_datetime_between
from dbt.tests.util import run_dbt, get_artifact, check_datetime_between, run_dbt_and_capture
from tests.functional.artifacts.expected_manifest import (
expected_seeded_manifest,
expected_references_manifest,
Expand All @@ -17,7 +17,7 @@
)

from dbt.contracts.graph.manifest import WritableManifest
from dbt.contracts.results import RunResultsArtifact
from dbt.contracts.results import RunResultsArtifact, RunStatus

models__schema_yml = """
version: 2
Expand Down Expand Up @@ -129,6 +129,17 @@
select * from {{ ref('seed') }}
"""

models__model_with_pre_hook_sql = """
{{
config(
pre_hook={
"sql": "{{ alter_timezone(timezone='Etc/UTC') }}"
}
)
}}
select current_setting('timezone') as timezone
"""

seed__schema_yml = """
version: 2
seeds:
Expand Down Expand Up @@ -184,6 +195,17 @@
{% endtest %}
"""

macros__alter_timezone_sql = """
{% macro alter_timezone(timezone='America/Los_Angeles') %}
{% set sql %}
SET TimeZone='{{ timezone }}';
{% endset %}
{% do run_query(sql) %}
{% do log("Timezone set to: " + timezone, info=True) %}
{% endmacro %}
"""

snapshot__snapshot_seed_sql = """
{% snapshot snapshot_seed %}
{{
Expand Down Expand Up @@ -328,7 +350,6 @@
"""


versioned_models__schema_yml = """
version: 2
Expand Down Expand Up @@ -509,7 +530,7 @@ def verify_run_results(project, expected_run_results, start_time, run_results_sc
# sort the results so we can make reasonable assertions
run_results["results"].sort(key=lambda r: r["unique_id"])
assert run_results["results"] == expected_run_results
set(run_results) == {"elapsed_time", "results", "metadata"}
assert set(run_results) == {"elapsed_time", "results", "metadata", "args"}


class BaseVerifyProject:
Expand Down Expand Up @@ -650,3 +671,30 @@ def test_versions(self, project, manifest_schema_path, run_results_schema_path):
verify_run_results(
project, expected_versions_run_results(), start_time, run_results_schema_path
)


class TestVerifyRunOperation(BaseVerifyProject):
@pytest.fixture(scope="class")
def macros(self):
return {"alter_timezone.sql": macros__alter_timezone_sql}

@pytest.fixture(scope="class")
def models(self):
return {
"model_with_pre_hook.sql": models__model_with_pre_hook_sql,
}

def test_run_operation(self, project):
results, log_output = run_dbt_and_capture(["run-operation", "alter_timezone"])
assert len(results) == 1
assert results[0].status == RunStatus.Success
assert results[0].unique_id == "operation.test.alter_timezone"
assert "Timezone set to: America/Los_Angeles" in log_output

def test_run_model_with_operation(self, project):
# pre-hooks are not included in run_results since they are an attribute of the node and not a node in their
# own right
results, log_output = run_dbt_and_capture(["run", "--select", "model_with_pre_hook"])
assert len(results) == 1
assert results[0].status == RunStatus.Success
assert "Timezone set to: Etc/UTC" in log_output
3 changes: 2 additions & 1 deletion tests/functional/run_query/test_types.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pytest

from dbt.contracts.results import NodeStatus
from dbt.tests.util import run_dbt

macros_sql = """
Expand Down Expand Up @@ -30,4 +31,4 @@ def macros(self):

def test_nested_types(self, project):
result = run_dbt(["run-operation", "test_array_results"])
assert result.success
assert result.results[0].status == NodeStatus.Success

0 comments on commit c25d0c9

Please sign in to comment.