Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #7502: write run_results.json for run operation #7655

Merged
merged 10 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230522-132924.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: write run_results.json for run operation
time: 2023-05-22T13:29:24.182612-07:00
custom:
Author: aranke
Issue: "7502"
4 changes: 1 addition & 3 deletions core/dbt/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from dbt.contracts.results import (
CatalogArtifact,
RunExecutionResult,
RunOperationResultsArtifact,
)
from dbt.events.base_types import EventMsg
from dbt.task.build import BuildTask
Expand Down Expand Up @@ -53,8 +52,7 @@ class dbtRunnerResult:
List[str], # list/ls
Manifest, # parse
None, # clean, deps, init, source
RunExecutionResult, # build, compile, run, seed, snapshot, test
RunOperationResultsArtifact, # run-operation
RunExecutionResult, # build, compile, run, seed, snapshot, test, run-operation
] = None


Expand Down
34 changes: 0 additions & 34 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,40 +247,6 @@ def write(self, path: str):
write_json(path, self.to_dict(omit_none=False))


@dataclass
class RunOperationResult(ExecutionResult):
success: bool


@dataclass
class RunOperationResultMetadata(BaseArtifactMetadata):
dbt_schema_version: str = field(
default_factory=lambda: str(RunOperationResultsArtifact.dbt_schema_version)
)


@dataclass
@schema_version("run-operation-result", 1)
class RunOperationResultsArtifact(RunOperationResult, ArtifactMixin):
@classmethod
def from_success(
cls,
success: bool,
elapsed_time: float,
generated_at: datetime,
):
meta = RunOperationResultMetadata(
dbt_schema_version=str(cls.dbt_schema_version),
generated_at=generated_at,
)
return cls(
metadata=meta,
results=[],
elapsed_time=elapsed_time,
success=success,
)


# due to issues with typing.Union collapsing subclasses, this can't subclass
# PartialResult

Expand Down
64 changes: 55 additions & 9 deletions core/dbt/task/run_operation.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
from datetime import datetime
import os
import threading
import traceback
from datetime import datetime

import agate

from .base import ConfiguredTask

import dbt.exceptions
from dbt.adapters.factory import get_adapter
from dbt.contracts.results import RunOperationResultsArtifact
from dbt.contracts.files import FileHash
from dbt.contracts.graph.nodes import HookNode
from dbt.contracts.results import RunResultsArtifact, RunResult, RunStatus, TimingInfo
from dbt.events.functions import fire_event
from dbt.events.types import (
RunningOperationCaughtError,
RunningOperationUncaughtError,
LogDebugStackTrace,
)
from dbt.node_types import NodeType
from dbt.task.base import ConfiguredTask

RESULT_FILE_NAME = "run_results.json"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am assuming you don't need to define the name here? the RESULT_FILE_NAME should be defined somewhere else?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, they're defined inline:

RESULT_FILE_NAME = "run_results.json"

RESULT_FILE_NAME = "sources.json"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should put those constants in base task file or somewhere and import them in other tasks so the string is only being defined once. I will leave it to you to either do it here, a follow up PR, or not do anything.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll leave it as-is for now.



class RunOperationTask(ConfiguredTask):
Expand All @@ -22,7 +28,7 @@ def _get_macro_parts(self):
if "." in macro_name:
package_name, macro_name = macro_name.split(".", 1)
else:
package_name = None
package_name = self.config.project_name

return package_name, macro_name

Expand All @@ -40,7 +46,7 @@ def _run_unsafe(self) -> agate.Table:

return res

def run(self) -> RunOperationResultsArtifact:
def run(self) -> RunResultsArtifact:
start = datetime.utcnow()
self.compile_manifest()
try:
Expand All @@ -56,11 +62,51 @@ def run(self) -> RunOperationResultsArtifact:
else:
success = True
end = datetime.utcnow()
return RunOperationResultsArtifact.from_success(

package_name, macro_name = self._get_macro_parts()
fqn = [NodeType.Operation, package_name, macro_name]
unique_id = ".".join(fqn)

run_result = RunResult(
adapter_response={},
status=RunStatus.Success if success else RunStatus.Error,
execution_time=(end - start).total_seconds(),
failures=0 if success else 1,
message=None,
node=HookNode(
alias=macro_name,
checksum=FileHash.from_contents(unique_id),
database=self.config.credentials.database,
schema=self.config.credentials.schema,
resource_type=NodeType.Operation,
fqn=fqn,
name=macro_name,
unique_id=unique_id,
package_name=package_name,
path="",
original_file_path="",
),
thread_id=threading.current_thread().name,
timing=[TimingInfo(name=macro_name, started_at=start, completed_at=end)],
)

results = RunResultsArtifact.from_execution_results(
generated_at=end,
elapsed_time=(end - start).total_seconds(),
success=success,
args={
k: v
for k, v in self.args.__dict__.items()
if k.islower() and type(v) in (str, int, float, bool, list, dict)
},
results=[run_result],
)

result_path = os.path.join(self.config.target_path, RESULT_FILE_NAME)

if self.args.write_json:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we settle on lower case or upper case? I remember the other place used WRITE_JSON.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It gets written to both, I saw lowercase in several places, so just ran with it.

results.write(result_path)

return results

def interpret_results(self, results):
return results.success
return results.results[0].status == RunStatus.Success
56 changes: 52 additions & 4 deletions tests/functional/artifacts/test_artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import dbt
import jsonschema

from dbt.tests.util import run_dbt, get_artifact, check_datetime_between
from dbt.tests.util import run_dbt, get_artifact, check_datetime_between, run_dbt_and_capture
from tests.functional.artifacts.expected_manifest import (
expected_seeded_manifest,
expected_references_manifest,
Expand All @@ -17,7 +17,7 @@
)

from dbt.contracts.graph.manifest import WritableManifest
from dbt.contracts.results import RunResultsArtifact
from dbt.contracts.results import RunResultsArtifact, RunStatus

models__schema_yml = """
version: 2
Expand Down Expand Up @@ -129,6 +129,17 @@
select * from {{ ref('seed') }}
"""

models__model_with_pre_hook_sql = """
{{
config(
pre_hook={
"sql": "{{ alter_timezone(timezone='Etc/UTC') }}"
}
)
}}
select current_setting('timezone') as timezone
"""

seed__schema_yml = """
version: 2
seeds:
Expand Down Expand Up @@ -184,6 +195,17 @@
{% endtest %}
"""

macros__alter_timezone_sql = """
{% macro alter_timezone(timezone='America/Los_Angeles') %}
{% set sql %}
SET TimeZone='{{ timezone }}';
{% endset %}
{% do run_query(sql) %}
{% do log("Timezone set to: " + timezone, info=True) %}
{% endmacro %}
"""

snapshot__snapshot_seed_sql = """
{% snapshot snapshot_seed %}
{{
Expand Down Expand Up @@ -328,7 +350,6 @@
"""


versioned_models__schema_yml = """
version: 2
Expand Down Expand Up @@ -509,7 +530,7 @@ def verify_run_results(project, expected_run_results, start_time, run_results_sc
# sort the results so we can make reasonable assertions
run_results["results"].sort(key=lambda r: r["unique_id"])
assert run_results["results"] == expected_run_results
set(run_results) == {"elapsed_time", "results", "metadata"}
assert set(run_results) == {"elapsed_time", "results", "metadata", "args"}


class BaseVerifyProject:
Expand Down Expand Up @@ -650,3 +671,30 @@ def test_versions(self, project, manifest_schema_path, run_results_schema_path):
verify_run_results(
project, expected_versions_run_results(), start_time, run_results_schema_path
)


class TestVerifyRunOperation(BaseVerifyProject):
@pytest.fixture(scope="class")
def macros(self):
return {"alter_timezone.sql": macros__alter_timezone_sql}

@pytest.fixture(scope="class")
def models(self):
return {
"model_with_pre_hook.sql": models__model_with_pre_hook_sql,
}

def test_run_operation(self, project):
results, log_output = run_dbt_and_capture(["run-operation", "alter_timezone"])
assert len(results) == 1
assert results[0].status == RunStatus.Success
assert results[0].unique_id == "operation.test.alter_timezone"
assert "Timezone set to: America/Los_Angeles" in log_output

def test_run_model_with_operation(self, project):
# pre-hooks are not included in run_results since they are an attribute of the node and not a node in their
# own right
results, log_output = run_dbt_and_capture(["run", "--select", "model_with_pre_hook"])
assert len(results) == 1
assert results[0].status == RunStatus.Success
assert "Timezone set to: Etc/UTC" in log_output
3 changes: 2 additions & 1 deletion tests/functional/run_query/test_types.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pytest

from dbt.contracts.results import NodeStatus
from dbt.tests.util import run_dbt

macros_sql = """
Expand Down Expand Up @@ -30,4 +31,4 @@ def macros(self):

def test_nested_types(self, project):
result = run_dbt(["run-operation", "test_array_results"])
assert result.success
assert result.results[0].status == NodeStatus.Success