Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CT 1549 reorg logging events to have two top level keys #6553

Merged
merged 9 commits into from
Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20230109-095907.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Reorganize structured logging events to have two top-level keys
time: 2023-01-09T09:59:07.842187-05:00
custom:
Author: gshank
Issue: "6311"
8 changes: 4 additions & 4 deletions core/dbt/clients/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
SystemErrorRetrievingModTime,
SystemCouldNotWrite,
SystemExecutingCmd,
SystemStdOutMsg,
SystemStdErrMsg,
SystemStdOut,
SystemStdErr,
SystemReportReturnCode,
)
import dbt.exceptions
Expand Down Expand Up @@ -441,8 +441,8 @@ def run_cmd(cwd: str, cmd: List[str], env: Optional[Dict[str, Any]] = None) -> T
except OSError as exc:
_interpret_oserror(exc, cwd, cmd)

fire_event(SystemStdOutMsg(bmsg=out))
fire_event(SystemStdErrMsg(bmsg=err))
fire_event(SystemStdOut(bmsg=out))
fire_event(SystemStdErr(bmsg=err))

if proc.returncode != 0:
fire_event(SystemReportReturnCode(returncode=proc.returncode))
Expand Down
66 changes: 47 additions & 19 deletions core/dbt/events/base_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import os
import threading
from datetime import datetime
import dbt.events.proto_types as pt
from typing import Protocol

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# These base types define the _required structure_ for the concrete event #
Expand Down Expand Up @@ -58,32 +60,58 @@ class EventLevel(str, Enum):
class BaseEvent:
"""BaseEvent for proto message generated python events"""

def __post_init__(self):
super().__post_init__()
if not self.info.level:
self.info.level = self.level_tag()
assert self.info.level in ["info", "warn", "error", "debug", "test"]
if not hasattr(self.info, "msg") or not self.info.msg:
self.info.msg = self.message()
self.info.invocation_id = get_invocation_id()
self.info.extra = get_global_metadata_vars()
self.info.ts = datetime.utcnow()
self.info.pid = get_pid()
self.info.thread = get_thread_name()
self.info.code = self.code()
self.info.name = type(self).__name__

# This is here because although we know that info should always
# exist, mypy doesn't.
def log_level(self) -> EventLevel:
return self.info.level # type: ignore
# def __post_init__(self):
# super().__post_init__()
# if not self.info.level:
# self.info.level = self.level_tag()
# assert self.info.level in ["info", "warn", "error", "debug", "test"]
# if not hasattr(self.info, "msg") or not self.info.msg:
# self.info.msg = self.message()
# self.info.invocation_id = get_invocation_id()
# self.info.extra = get_global_metadata_vars()
# self.info.ts = datetime.utcnow()
# self.info.pid = get_pid()
# self.info.thread = get_thread_name()
# self.info.code = self.code()
# self.info.name = type(self).__name__

def level_tag(self) -> EventLevel:
return EventLevel.DEBUG

def message(self) -> str:
raise Exception("message() not implemented for event")

def code(self) -> str:
raise Exception("code() not implemented for event")


class EventMsg(Protocol):
    """Structural type for a logging message: an event plus its metadata.

    Any object exposing these two attributes satisfies the protocol.
    """

    # Metadata about the event (level, message text, timestamp, and so on).
    info: pt.EventInfo
    # The event instance that the message wraps.
    data: BaseEvent


def msg_from_base_event(event: BaseEvent, level: EventLevel = None):
    """Wrap ``event`` in its corresponding ``<EventName>Msg`` message object.

    The message class is looked up by name in ``proto_types`` and is built
    with two fields: ``data`` (the event itself) and ``info`` (an
    ``EventInfo`` populated here).

    Args:
        event: The event instance to wrap.
        level: Optional override for the log level. May be an ``EventLevel``
            member or its string value. When omitted (or falsy), the event's
            own ``level_tag()`` is used.

    Returns:
        An instance of the ``<EventName>Msg`` class.

    Raises:
        AttributeError: If ``proto_types`` has no ``<EventName>Msg`` class.
        ValueError: If ``level`` is not a valid ``EventLevel`` value.
    """
    event_name = type(event).__name__
    msg_class_name = f"{event_name}Msg"
    try:
        msg_cls = getattr(pt, msg_class_name)
    except AttributeError as exc:
        # Same exception type as before, but say which class is missing.
        raise AttributeError(
            f"{msg_class_name} is not defined in proto_types; "
            f"cannot build a message for event {event_name}"
        ) from exc

    # level in EventInfo must be a string, not an EventLevel. Going through
    # EventLevel(...) accepts either a member or its plain string value and
    # rejects unknown levels here rather than later in the loggers.
    if level:
        msg_level: str = EventLevel(level).value
    else:
        msg_level = event.level_tag().value

    event_info = pt.EventInfo(
        level=msg_level,
        msg=event.message(),
        invocation_id=get_invocation_id(),
        extra=get_global_metadata_vars(),
        ts=datetime.utcnow(),
        pid=get_pid(),
        thread=get_thread_name(),
        code=event.code(),
        name=event_name,
    )
    return msg_cls(data=event, info=event_info)


# DynamicLevel requires that the level be supplied when the event is
# fired, using the "level" argument of fire_event in functions.py
Expand Down
64 changes: 30 additions & 34 deletions core/dbt/events/eventmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@
from typing import Any, Callable, List, Optional, TextIO
from uuid import uuid4

from dbt.events.base_types import BaseEvent, EventLevel
from dbt.events.base_types import BaseEvent, EventLevel, msg_from_base_event, EventMsg


# A Filter is a function which takes an EventMsg and returns True if the event
# should be logged, False otherwise.
Filter = Callable[[BaseEvent], bool]
Filter = Callable[[EventMsg], bool]


# Default filter which logs every event
def NoFilter(_: BaseEvent) -> bool:
def NoFilter(_: EventMsg) -> bool:
return True


Expand Down Expand Up @@ -47,13 +47,6 @@ class LineFormat(Enum):
}


# We should consider fixing the problem, but log_level() can return a string for
# DynamicLevel events, even though it is supposed to return an EventLevel. This
# function gets a string for the level, no matter what.
def _get_level_str(e: BaseEvent) -> str:
return e.log_level().value if isinstance(e.log_level(), EventLevel) else str(e.log_level())


# We need this function for now because the numeric log severity levels in
# Python do not match those for logbook, so we have to explicitly call the
# correct function by name.
Expand Down Expand Up @@ -113,14 +106,14 @@ def __init__(self, event_manager: "EventManager", config: LoggerConfig) -> None:

self._python_logger = log

def create_line(self, e: BaseEvent) -> str:
def create_line(self, msg: EventMsg) -> str:
raise NotImplementedError()

def write_line(self, e: BaseEvent):
line = self.create_line(e)
python_level = _log_level_map[e.log_level()]
def write_line(self, msg: EventMsg):
line = self.create_line(msg)
python_level = _log_level_map[EventLevel(msg.info.level)]
if self._python_logger is not None:
send_to_logger(self._python_logger, _get_level_str(e), line)
send_to_logger(self._python_logger, msg.info.level, line)
elif self._stream is not None and _log_level_map[self.level] <= python_level:
self._stream.write(line + "\n")

Expand All @@ -138,24 +131,26 @@ def __init__(self, event_manager: "EventManager", config: LoggerConfig) -> None:
self.use_colors = config.use_colors
self.use_debug_format = config.line_format == LineFormat.DebugText

def create_line(self, e: BaseEvent) -> str:
return self.create_debug_line(e) if self.use_debug_format else self.create_info_line(e)
def create_line(self, msg: EventMsg) -> str:
return self.create_debug_line(msg) if self.use_debug_format else self.create_info_line(msg)

def create_info_line(self, e: BaseEvent) -> str:
def create_info_line(self, msg: EventMsg) -> str:
ts: str = datetime.utcnow().strftime("%H:%M:%S")
scrubbed_msg: str = self.scrubber(e.message()) # type: ignore
scrubbed_msg: str = self.scrubber(msg.info.msg) # type: ignore
return f"{self._get_color_tag()}{ts} {scrubbed_msg}"

def create_debug_line(self, e: BaseEvent) -> str:
def create_debug_line(self, msg: EventMsg) -> str:
log_line: str = ""
# Create a separator if this is the beginning of an invocation
# TODO: This is an ugly hack, get rid of it if we can
if type(e).__name__ == "MainReportVersion":
if msg.info.name == "MainReportVersion":
separator = 30 * "="
log_line = f"\n\n{separator} {datetime.utcnow()} | {self.event_manager.invocation_id} {separator}\n"
ts: str = datetime.utcnow().strftime("%H:%M:%S.%f")
scrubbed_msg: str = self.scrubber(e.message()) # type: ignore
level = _get_level_str(e)
log_line = (
f"\n\n{separator} {msg.info.ts} | {self.event_manager.invocation_id} {separator}\n"
)
ts: str = msg.info.ts.strftime("%H:%M:%S.%f")
scrubbed_msg: str = self.scrubber(msg.info.msg) # type: ignore
level = msg.info.level
log_line += (
f"{self._get_color_tag()}{ts} [{level:<5}]{self._get_thread_name()} {scrubbed_msg}"
)
Expand All @@ -175,28 +170,29 @@ def _get_thread_name(self) -> str:


class _JsonLogger(_Logger):
def create_line(self, e: BaseEvent) -> str:
from dbt.events.functions import event_to_dict
def create_line(self, msg: EventMsg) -> str:
from dbt.events.functions import msg_to_dict

event_dict = event_to_dict(e)
raw_log_line = json.dumps(event_dict, sort_keys=True)
msg_dict = msg_to_dict(msg)
raw_log_line = json.dumps(msg_dict, sort_keys=True)
line = self.scrubber(raw_log_line) # type: ignore
return line


class EventManager:
def __init__(self) -> None:
self.loggers: List[_Logger] = []
self.callbacks: List[Callable[[BaseEvent], None]] = []
self.callbacks: List[Callable[[EventMsg], None]] = []
self.invocation_id: str = str(uuid4())

def fire_event(self, e: BaseEvent) -> None:
def fire_event(self, e: BaseEvent, level: EventLevel = None) -> None:
msg = msg_from_base_event(e, level=level)
for logger in self.loggers:
if logger.filter(e): # type: ignore
logger.write_line(e)
if logger.filter(msg): # type: ignore
logger.write_line(msg)

for callback in self.callbacks:
callback(e)
callback(msg)

def add_logger(self, config: LoggerConfig):
logger = (
Expand Down
69 changes: 33 additions & 36 deletions core/dbt/events/functions.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import betterproto
from dbt.constants import METADATA_ENV_PREFIX
from dbt.events.base_types import BaseEvent, Cache, EventLevel, NoFile, NoStdOut
from dbt.events.base_types import BaseEvent, Cache, EventLevel, NoFile, NoStdOut, EventMsg
from dbt.events.eventmgr import EventManager, LoggerConfig, LineFormat, NoFilter
from dbt.events.helpers import env_secrets, scrub_secrets
from dbt.events.proto_types import EventInfo
from dbt.events.types import EmptyLine
import dbt.flags as flags
from dbt.logger import GLOBAL_LOGGER, make_log_dir_if_missing
Expand Down Expand Up @@ -59,14 +58,14 @@ def _get_stdout_config(level: Optional[EventLevel] = None) -> LoggerConfig:


def _stdout_filter(
log_cache_events: bool, debug_mode: bool, quiet_mode: bool, evt: BaseEvent
log_cache_events: bool, debug_mode: bool, quiet_mode: bool, msg: EventMsg
) -> bool:
return (
not isinstance(evt, NoStdOut)
and (not isinstance(evt, Cache) or log_cache_events)
and (evt.log_level() != EventLevel.DEBUG or debug_mode)
and (evt.log_level() == EventLevel.ERROR or not quiet_mode)
and not (flags.LOG_FORMAT == "json" and type(evt) == EmptyLine)
not isinstance(msg.data, NoStdOut)
and (not isinstance(msg.data, Cache) or log_cache_events)
and (EventLevel(msg.info.level) != EventLevel.DEBUG or debug_mode)
and (EventLevel(msg.info.level) == EventLevel.ERROR or not quiet_mode)
and not (flags.LOG_FORMAT == "json" and type(msg.data) == EmptyLine)
)


Expand All @@ -82,18 +81,18 @@ def _get_logfile_config(log_path: str) -> LoggerConfig:
)


def _logfile_filter(log_cache_events: bool, evt: BaseEvent) -> bool:
def _logfile_filter(log_cache_events: bool, msg: EventMsg) -> bool:
return (
not isinstance(evt, NoFile)
and not (isinstance(evt, Cache) and not log_cache_events)
and not (flags.LOG_FORMAT == "json" and type(evt) == EmptyLine)
not isinstance(msg.data, NoFile)
and not (isinstance(msg.data, Cache) and not log_cache_events)
and not (flags.LOG_FORMAT == "json" and type(msg.data) == EmptyLine)
)


def _get_logbook_log_config(level: Optional[EventLevel] = None) -> LoggerConfig:
config = _get_stdout_config(level)
config.name = "logbook_log"
config.filter = NoFilter if flags.LOG_CACHE_EVENTS else lambda e: not isinstance(e, Cache)
config.filter = NoFilter if flags.LOG_CACHE_EVENTS else lambda e: not isinstance(e.data, Cache)
config.logger = GLOBAL_LOGGER
return config

Expand Down Expand Up @@ -138,48 +137,54 @@ def stop_capture_stdout_logs():

# returns a JSON string representation of the message fields.
# the message may contain secrets which must be scrubbed at the usage site.
def event_to_json(event: BaseEvent) -> str:
event_dict = event_to_dict(event)
raw_log_line = json.dumps(event_dict, sort_keys=True)
def msg_to_json(msg: EventMsg) -> str:
msg_dict = msg_to_dict(msg)
raw_log_line = json.dumps(msg_dict, sort_keys=True)
return raw_log_line


def event_to_dict(event: BaseEvent) -> dict:
event_dict = dict()
def msg_to_dict(msg: EventMsg) -> dict:
msg_dict = dict()
try:
event_dict = event.to_dict(casing=betterproto.Casing.SNAKE, include_default_values=True) # type: ignore
msg_dict = msg.to_dict(casing=betterproto.Casing.SNAKE, include_default_values=True) # type: ignore
except AttributeError as exc:
event_type = type(event).__name__
event_type = type(msg).__name__
raise Exception(f"type {event_type} is not serializable. {str(exc)}")
# We don't want an empty NodeInfo in output
if "node_info" in event_dict and event_dict["node_info"]["node_name"] == "":
del event_dict["node_info"]
return event_dict
if (
"data" in msg_dict
and "node_info" in msg_dict["data"]
and msg_dict["data"]["node_info"]["node_name"] == ""
):
del msg_dict["data"]["node_info"]
return msg_dict


def warn_or_error(event, node=None):
if flags.WARN_ERROR:
# TODO: resolve this circular import when at top
from dbt.exceptions import EventCompilationException

raise EventCompilationException(event.info.msg, node)
raise EventCompilationException(event.message(), node)
else:
fire_event(event)


# an alternative to fire_event which only creates and logs the event value
# if the condition is met. Does nothing otherwise.
def fire_event_if(conditional: bool, lazy_e: Callable[[], BaseEvent]) -> None:
def fire_event_if(
conditional: bool, lazy_e: Callable[[], BaseEvent], level: EventLevel = None
) -> None:
if conditional:
fire_event(lazy_e())
fire_event(lazy_e(), level=level)


# top-level method for accessing the new eventing system
# this is where all the side effects happen branched by event type
# (i.e. - mutating the event history, printing to stdout, logging
# to files, etc.)
def fire_event(e: BaseEvent) -> None:
EVENT_MANAGER.fire_event(e)
def fire_event(e: BaseEvent, level: EventLevel = None) -> None:
EVENT_MANAGER.fire_event(e, level=level)


def get_metadata_vars() -> Dict[str, str]:
Expand All @@ -206,11 +211,3 @@ def set_invocation_id() -> None:
# This is primarily for setting the invocation_id for separate
# commands in the dbt servers. It shouldn't be necessary for the CLI.
EVENT_MANAGER.invocation_id = str(uuid.uuid4())


# Currently used to set the level in EventInfo, so logging events can
# provide more than one "level". Might be used in the future to set
# more fields in EventInfo, once some of that information is no longer global
def info(level="info"):
info = EventInfo(level=level)
return info
Loading