Skip to content

Commit

Permalink
add node type codes to more events + more hook log data (#4378)
Browse files Browse the repository at this point in the history
* add node type codes to more events + more hook log

* minor fixes

* renames started/finished keys

* made process more clear

* fixed errors

* Put back report_node_data in fresshness.py

Co-authored-by: Gerda Shank <gerda@dbtlabs.com>
  • Loading branch information
emmyoop and gshank authored Dec 3, 2021
1 parent 9bdf5fe commit b3039fd
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 22 deletions.
3 changes: 2 additions & 1 deletion core/dbt/adapters/sql/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ def add_query(

fire_event(
SQLQueryStatus(
status=str(self.get_response(cursor)), elapsed=round((time.time() - pre), 2)
status=self.get_response(cursor)._message,
elapsed=round((time.time() - pre), 2)
)
)

Expand Down
16 changes: 8 additions & 8 deletions core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from dbt.events.stubs import (
_CachedRelation,
BaseRelation,
ParsedModelNode,
ParsedHookNode,
ParsedModelNode,
RunResult
)
from dbt import ui
Expand Down Expand Up @@ -543,7 +543,7 @@ def message(self) -> str:

@dataclass
class SQLQueryStatus(DebugLevel, Cli, File):
status: str # could include AdapterResponse if we resolve circular imports
status: str
elapsed: float
code: str = "E017"

Expand Down Expand Up @@ -1727,7 +1727,7 @@ class PrintStartLine(InfoLevel, Cli, File, NodeInfo):
index: int
total: int
report_node_data: ParsedModelNode
code: str = "Z031"
code: str = "Q033"

def message(self) -> str:
msg = f"START {self.description}"
Expand All @@ -1745,8 +1745,8 @@ class PrintHookStartLine(InfoLevel, Cli, File, NodeInfo):
index: int
total: int
truncate: bool
report_node_data: Any # TODO use ParsedHookNode here
code: str = "Z032"
report_node_data: Any # TODO: resolve ParsedHookNode circular import
code: str = "Q032"

def message(self) -> str:
msg = f"START hook: {self.statement}"
Expand All @@ -1765,7 +1765,7 @@ class PrintHookEndLine(InfoLevel, Cli, File, NodeInfo):
total: int
execution_time: int
truncate: bool
report_node_data: Any # TODO use ParsedHookNode here
report_node_data: Any # TODO: resolve ParsedHookNode circular import
code: str = "Q007"

def message(self) -> str:
Expand All @@ -1786,7 +1786,7 @@ class SkippingDetails(InfoLevel, Cli, File, NodeInfo):
index: int
total: int
report_node_data: ParsedModelNode
code: str = "Z033"
code: str = "Q034"

def message(self) -> str:
if self.resource_type in NodeType.refable():
Expand Down Expand Up @@ -1901,7 +1901,7 @@ class PrintModelErrorResultLine(ErrorLevel, Cli, File, NodeInfo):
total: int
execution_time: int
report_node_data: ParsedModelNode
code: str = "Z035"
code: str = "Q035"

def message(self) -> str:
info = "ERROR creating"
Expand Down
25 changes: 17 additions & 8 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
print_run_end_messages,
get_counts,
)

from datetime import datetime
from dbt import tracking
from dbt import utils
from dbt.adapters.base import BaseRelation
Expand All @@ -21,7 +21,7 @@
from dbt.contracts.graph.manifest import WritableManifest
from dbt.contracts.graph.model_config import Hook
from dbt.contracts.graph.parsed import ParsedHookNode
from dbt.contracts.results import NodeStatus, RunResult, RunStatus
from dbt.contracts.results import NodeStatus, RunResult, RunStatus, RunningStatus
from dbt.exceptions import (
CompilationException,
InternalException,
Expand Down Expand Up @@ -342,6 +342,8 @@ def run_hooks(self, adapter, hook_type: RunHookType, extra_context):
finishctx = TimestampNamed('node_finished_at')

for idx, hook in enumerate(ordered_hooks, start=1):
hook._event_status['started_at'] = datetime.utcnow().isoformat()
hook._event_status['node_status'] = RunningStatus.Started
sql = self.get_hook_sql(adapter, hook, idx, num_hooks,
extra_context)

Expand All @@ -360,26 +362,33 @@ def run_hooks(self, adapter, hook_type: RunHookType, extra_context):
)
)

status = 'OK'

with Timer() as timer:
if len(sql.strip()) > 0:
status, _ = adapter.execute(sql, auto_begin=False,
fetch=False)
self.ran_hooks.append(hook)
response, _ = adapter.execute(sql, auto_begin=False, fetch=False)
status = response._message
else:
status = 'OK'

self.ran_hooks.append(hook)
hook._event_status['finished_at'] = datetime.utcnow().isoformat()
with finishctx, DbtModelState({'node_status': 'passed'}):
hook._event_status['node_status'] = RunStatus.Success
fire_event(
PrintHookEndLine(
statement=hook_text,
status=str(status),
status=status,
index=idx,
total=num_hooks,
execution_time=timer.elapsed,
truncate=True,
report_node_data=hook
)
)
# `_event_status` dict is only used for logging. Make sure
# it gets deleted when we're done with it
del hook._event_status["started_at"]
del hook._event_status["finished_at"]
del hook._event_status["node_status"]

self._total_executed += len(ordered_hooks)

Expand Down
9 changes: 4 additions & 5 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def call_runner(self, runner):
with RUNNING_STATE, uid_context:
startctx = TimestampNamed('node_started_at')
index = self.index_offset(runner.node_index)
runner.node._event_status['dbt_internal__started_at'] = datetime.utcnow().isoformat()
runner.node._event_status['started_at'] = datetime.utcnow().isoformat()
runner.node._event_status['node_status'] = RunningStatus.Started
extended_metadata = ModelMetadata(runner.node, index)

Expand All @@ -225,8 +225,7 @@ def call_runner(self, runner):
result = runner.run_with_hooks(self.manifest)
status = runner.get_result_status(result)
runner.node._event_status['node_status'] = result.status
runner.node._event_status['dbt_internal__finished_at'] = \
datetime.utcnow().isoformat()
runner.node._event_status['finished_at'] = datetime.utcnow().isoformat()
finally:
finishctx = TimestampNamed('finished_at')
with finishctx, DbtModelState(status):
Expand All @@ -239,8 +238,8 @@ def call_runner(self, runner):
)
# `_event_status` dict is only used for logging. Make sure
# it gets deleted when we're done with it
del runner.node._event_status["dbt_internal__started_at"]
del runner.node._event_status["dbt_internal__finished_at"]
del runner.node._event_status["started_at"]
del runner.node._event_status["finished_at"]
del runner.node._event_status["node_status"]

fail_fast = flags.FAIL_FAST
Expand Down

0 comments on commit b3039fd

Please sign in to comment.