Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add node type codes to more events + more hook log data #4378

Merged
merged 6 commits into from
Dec 3, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/dbt/adapters/sql/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ def add_query(

fire_event(
SQLQueryStatus(
status=str(self.get_response(cursor)), elapsed=round((time.time() - pre), 2)
status=self.get_response(cursor)._message,
elapsed=round((time.time() - pre), 2)
)
)

Expand Down
16 changes: 8 additions & 8 deletions core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from dbt.events.stubs import (
_CachedRelation,
BaseRelation,
ParsedModelNode,
ParsedHookNode,
ParsedModelNode,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only conflict I fixed myself with the rebase was removing _ReferenceKey from here since @nathaniel-may added it above in his PR

RunResult
)
from dbt import ui
Expand Down Expand Up @@ -543,7 +543,7 @@ def message(self) -> str:

@dataclass
class SQLQueryStatus(DebugLevel, Cli, File):
status: str # could include AdapterResponse if we resolve circular imports
status: str
elapsed: float
code: str = "E017"

Expand Down Expand Up @@ -1727,7 +1727,7 @@ class PrintStartLine(InfoLevel, Cli, File, NodeInfo):
index: int
total: int
report_node_data: ParsedModelNode
code: str = "Z031"
code: str = "Q033"

def message(self) -> str:
msg = f"START {self.description}"
Expand All @@ -1745,8 +1745,8 @@ class PrintHookStartLine(InfoLevel, Cli, File, NodeInfo):
index: int
total: int
truncate: bool
report_node_data: Any # TODO use ParsedHookNode here
code: str = "Z032"
report_node_data: Any # TODO: resolve ParsedHookNode circular import
code: str = "Q032"

def message(self) -> str:
msg = f"START hook: {self.statement}"
Expand All @@ -1765,7 +1765,7 @@ class PrintHookEndLine(InfoLevel, Cli, File, NodeInfo):
total: int
execution_time: int
truncate: bool
report_node_data: Any # TODO use ParsedHookNode here
report_node_data: Any # TODO: resolve ParsedHookNode circular import
code: str = "Q007"

def message(self) -> str:
Expand All @@ -1786,7 +1786,7 @@ class SkippingDetails(InfoLevel, Cli, File, NodeInfo):
index: int
total: int
report_node_data: ParsedModelNode
code: str = "Z033"
code: str = "Q034"

def message(self) -> str:
if self.resource_type in NodeType.refable():
Expand Down Expand Up @@ -1901,7 +1901,7 @@ class PrintModelErrorResultLine(ErrorLevel, Cli, File, NodeInfo):
total: int
execution_time: int
report_node_data: ParsedModelNode
code: str = "Z035"
code: str = "Q035"

def message(self) -> str:
info = "ERROR creating"
Expand Down
12 changes: 4 additions & 8 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ def after_execute(self, result):
table_name=table_name,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
report_node_data=self.node
execution_time=result.execution_time
)
)
elif result.status == FreshnessStatus.Error:
Expand All @@ -70,8 +69,7 @@ def after_execute(self, result):
table_name=table_name,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
report_node_data=self.node
execution_time=result.execution_time
)
)
elif result.status == FreshnessStatus.Warn:
Expand All @@ -81,8 +79,7 @@ def after_execute(self, result):
table_name=table_name,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
report_node_data=self.node
execution_time=result.execution_time
)
)
else:
Expand All @@ -92,8 +89,7 @@ def after_execute(self, result):
table_name=table_name,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
report_node_data=self.node
execution_time=result.execution_time
)
)

Expand Down
25 changes: 17 additions & 8 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
print_run_end_messages,
get_counts,
)

from datetime import datetime
from dbt import tracking
from dbt import utils
from dbt.adapters.base import BaseRelation
Expand All @@ -21,7 +21,7 @@
from dbt.contracts.graph.manifest import WritableManifest
from dbt.contracts.graph.model_config import Hook
from dbt.contracts.graph.parsed import ParsedHookNode
from dbt.contracts.results import NodeStatus, RunResult, RunStatus
from dbt.contracts.results import NodeStatus, RunResult, RunStatus, RunningStatus
from dbt.exceptions import (
CompilationException,
InternalException,
Expand Down Expand Up @@ -342,6 +342,8 @@ def run_hooks(self, adapter, hook_type: RunHookType, extra_context):
finishctx = TimestampNamed('node_finished_at')

for idx, hook in enumerate(ordered_hooks, start=1):
hook._event_status['started_at'] = datetime.utcnow().isoformat()
hook._event_status['node_status'] = RunningStatus.Started
sql = self.get_hook_sql(adapter, hook, idx, num_hooks,
extra_context)

Expand All @@ -360,26 +362,33 @@ def run_hooks(self, adapter, hook_type: RunHookType, extra_context):
)
)

status = 'OK'

with Timer() as timer:
if len(sql.strip()) > 0:
status, _ = adapter.execute(sql, auto_begin=False,
fetch=False)
self.ran_hooks.append(hook)
response, _ = adapter.execute(sql, auto_begin=False, fetch=False)
status = response._message
else:
status = 'OK'

self.ran_hooks.append(hook)
hook._event_status['finished_at'] = datetime.utcnow().isoformat()
with finishctx, DbtModelState({'node_status': 'passed'}):
hook._event_status['node_status'] = RunStatus.Success
fire_event(
PrintHookEndLine(
statement=hook_text,
status=str(status),
status=status,
index=idx,
total=num_hooks,
execution_time=timer.elapsed,
truncate=True,
report_node_data=hook
)
)
# `_event_status` dict is only used for logging. Make sure
# it gets deleted when we're done with it
del hook._event_status["started_at"]
del hook._event_status["finished_at"]
del hook._event_status["node_status"]

self._total_executed += len(ordered_hooks)

Expand Down
9 changes: 4 additions & 5 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def call_runner(self, runner):
with RUNNING_STATE, uid_context:
startctx = TimestampNamed('node_started_at')
index = self.index_offset(runner.node_index)
runner.node._event_status['dbt_internal__started_at'] = datetime.utcnow().isoformat()
runner.node._event_status['started_at'] = datetime.utcnow().isoformat()
runner.node._event_status['node_status'] = RunningStatus.Started
extended_metadata = ModelMetadata(runner.node, index)

Expand All @@ -225,8 +225,7 @@ def call_runner(self, runner):
result = runner.run_with_hooks(self.manifest)
status = runner.get_result_status(result)
runner.node._event_status['node_status'] = result.status
runner.node._event_status['dbt_internal__finished_at'] = \
datetime.utcnow().isoformat()
runner.node._event_status['finished_at'] = datetime.utcnow().isoformat()
finally:
finishctx = TimestampNamed('finished_at')
with finishctx, DbtModelState(status):
Expand All @@ -239,8 +238,8 @@ def call_runner(self, runner):
)
# `_event_status` dict is only used for logging. Make sure
# it gets deleted when we're done with it
del runner.node._event_status["dbt_internal__started_at"]
del runner.node._event_status["dbt_internal__finished_at"]
del runner.node._event_status["started_at"]
del runner.node._event_status["finished_at"]
del runner.node._event_status["node_status"]

fail_fast = flags.FAIL_FAST
Expand Down