Skip to content

Commit

Permalink
add generic snowplow tracker with file logger for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
mikealfare committed Sep 14, 2024
1 parent 9798ca7 commit e671471
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 12 deletions.
32 changes: 32 additions & 0 deletions dbt_common/events/cookie.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from pathlib import Path
import uuid
from typing import Any, Dict

import yaml

# the C version is faster, but it doesn't always exist
try:
from yaml import CSafeLoader as SafeLoader
except ImportError:
from yaml import SafeLoader


class Cookie:
def __init__(self, directory: Path) -> None:
self.id: str = str(uuid.uuid4())
self.path: Path = directory / ".user.yml"
self.save()

def as_dict(self) -> Dict[str, Any]:
return {"id": self.id}

def save(self) -> None:
with open(self.path, "w") as fh:
yaml.dump(self.as_dict(), fh)

def load(self) -> Dict[str, Any]:
with open(self.path, "r") as fh:
try:
return yaml.load(fh, Loader=SafeLoader)
except yaml.reader.ReaderError:
return {}
9 changes: 9 additions & 0 deletions dbt_common/events/event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

from dbt_common.events.base_types import BaseEvent, EventLevel, msg_from_base_event, TCallback
from dbt_common.events.logger import LoggerConfig, _Logger, _TextLogger, _JsonLogger, LineFormat
from dbt_common.events.tracker import TrackerConfig, _Tracker


class EventManager:
def __init__(self) -> None:
self.loggers: List[_Logger] = []
self.trackers: List[_Tracker] = []
self.callbacks: List[TCallback] = []

def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None:
Expand Down Expand Up @@ -37,6 +39,9 @@ def add_logger(self, config: LoggerConfig) -> None:
)
self.loggers.append(logger)

def add_tracker(self, config: TrackerConfig) -> None:
self.trackers.append(_Tracker(config))

def add_callback(self, callback: TCallback) -> None:
self.callbacks.append(callback)

Expand All @@ -48,13 +53,17 @@ def flush(self) -> None:
class IEventManager(Protocol):
callbacks: List[TCallback]
loggers: List[_Logger]
trackers: List[_Tracker]

def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None:
...

def add_logger(self, config: LoggerConfig) -> None:
...

def add_tracker(self, config: TrackerConfig) -> None:
...

def add_callback(self, callback: TCallback) -> None:
...

Expand Down
6 changes: 6 additions & 0 deletions dbt_common/events/event_manager_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ def add_logger_to_manager(logger) -> None:
_EVENT_MANAGER.add_logger(logger)


def add_tracker_to_manager(tracker) -> None:
global _EVENT_MANAGER
_EVENT_MANAGER.add_tracker(tracker)


def add_callback_to_manager(callback: TCallback) -> None:
global _EVENT_MANAGER
_EVENT_MANAGER.add_callback(callback)
Expand All @@ -32,4 +37,5 @@ def cleanup_event_logger() -> None:
# especially important for tests, since pytest replaces the stdout stream
# during test runs, and closes the stream after the test is over.
_EVENT_MANAGER.loggers.clear()
_EVENT_MANAGER.trackers.clear()
_EVENT_MANAGER.callbacks.clear()
111 changes: 99 additions & 12 deletions dbt_common/events/functions.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,34 @@
from pathlib import Path

from dbt_common.events.event_manager_client import get_event_manager
from dbt_common.exceptions import EventCompilationError
from dbt_common.invocation import get_invocation_id
from dbt_common.helper_types import WarnErrorOptions
from dbt_common.utils.encoding import ForgivingJSONEncoder
from dbt_common.events.base_types import BaseEvent, EventLevel, EventMsg
from dbt_common.events.logger import LoggerConfig, LineFormat
from dbt_common.exceptions import scrub_secrets, env_secrets
from dbt_common.events.types import Note
from functools import partial
import json
import os
from pathlib import Path
import sys
from typing import Callable, Dict, Optional, TextIO, Union
from typing import Any, Callable, Dict, Optional, TextIO, Union

from google.protobuf.json_format import MessageToDict
from snowplow_tracker import Subject
from snowplow_tracker.typing import FailureCallback

from dbt_common.helper_types import WarnErrorOptions
from dbt_common.invocation import get_invocation_id
from dbt_common.events.base_types import BaseEvent, EventLevel, EventMsg
from dbt_common.events.cookie import Cookie
from dbt_common.events.event_manager_client import get_event_manager
from dbt_common.events.logger import LoggerConfig, LineFormat
from dbt_common.events.tracker import TrackerConfig
from dbt_common.events.types import DisableTracking, Note
from dbt_common.events.user import User
from dbt_common.exceptions import EventCompilationError, scrub_secrets, env_secrets
from dbt_common.utils.encoding import ForgivingJSONEncoder


LOG_VERSION = 3
metadata_vars: Optional[Dict[str, str]] = None
_METADATA_ENV_PREFIX = "DBT_ENV_CUSTOM_ENV_"
WARN_ERROR_OPTIONS = WarnErrorOptions(include=[], exclude=[])
WARN_ERROR = False


# This global, and the following two functions for capturing stdout logs are
# an unpleasant hack we intend to remove as part of API-ification. The GitHub
# issue #6350 was opened for that work.
Expand Down Expand Up @@ -58,6 +65,24 @@ def get_stdout_config(
)


def get_logfile_config(
name: str,
log_path: str,
line_format: Optional[LineFormat] = LineFormat.PlainText,
use_colors: Optional[bool] = False,
log_file_max_bytes: Optional[int] = 10 * 1024 * 1024,
) -> LoggerConfig:
return LoggerConfig(
name=name,
line_format=line_format,
level=EventLevel.DEBUG, # File log is *always* debug level
use_colors=use_colors,
invocation_id=get_invocation_id(),
output_file_name=log_path,
output_file_max_bytes=log_file_max_bytes,
)


def make_log_dir_if_missing(log_path: Union[Path, str]) -> None:
if isinstance(log_path, str):
log_path = Path(log_path)
Expand Down Expand Up @@ -153,3 +178,65 @@ def get_metadata_vars() -> Dict[str, str]:
def reset_metadata_vars() -> None:
global metadata_vars
metadata_vars = None


def _default_on_failure(num_ok, unsent):
"""
num_ok will always be 0, unsent will always be 1 entry long
because the buffer is length 1, so not much to talk about
TODO: add `disable_tracking` as a callback on `DisableTracking`
"""
fire_event(DisableTracking())


def snowplow_config(
user: User,
endpoint: str,
protocol: Optional[str] = "https",
on_failure: Optional[FailureCallback] = _default_on_failure,
) -> TrackerConfig:
return TrackerConfig(
invocation_id=user.invocation_id,
endpoint=endpoint,
protocol=protocol,
on_failure=on_failure,
)


def enable_tracking(tracker, user: User):
cookie = _get_cookie(user)
user.enable_tracking(cookie)

subject = Subject()
subject.set_user_id(cookie.get("id"))
tracker.set_subject(subject)


def disable_tracking(tracker, user: User):
user.disable_tracking()
tracker.set_subject(None)


def _get_cookie(user: User) -> Dict[str, Any]:
if cookie := user.cookie:
return cookie
return _set_cookie(user)


def _set_cookie(user: User) -> Dict[str, Any]:
"""
If the user points dbt to a profile directory which exists AND
contains a profiles.yml file, then we can set a cookie. If the
specified folder does not exist, or if there is not a profiles.yml
file in this folder, then an inconsistent cookie can be used. This
will change in every dbt invocation until the user points to a
profile dir file which contains a valid profiles.yml file.
See: https://github.com/dbt-labs/dbt-core/issues/1645
"""
if user.profile.exists():
cookie = Cookie(user.directory)
user.cookie = cookie.as_dict()
return user.cookie
return {}
71 changes: 71 additions & 0 deletions dbt_common/events/tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from dataclasses import dataclass
import logging
from logging.handlers import RotatingFileHandler
from typing import Optional

from snowplow_tracker import Emitter, Tracker
from snowplow_tracker.typing import FailureCallback

from dbt_common.events.base_types import EventMsg


@dataclass
class TrackerConfig:
invocation_id: Optional[str] = None
endpoint: Optional[str] = None
protocol: Optional[str] = None
on_failure: Optional[FailureCallback] = None
name: Optional[str] = None
output_file_name: Optional[str] = None
output_file_max_bytes: Optional[int] = 10 * 1024 * 1024 # 10 mb


class _Tracker:
def __init__(self, config: TrackerConfig) -> None:
self.invocation_id: Optional[str] = config.invocation_id

if all([config.name, config.output_file_name]):
file_handler = RotatingFileHandler(
filename=str(config.output_file_name),
encoding="utf8",
maxBytes=config.output_file_max_bytes, # type: ignore
backupCount=5,
)
self._tracker = self._python_file_logger(config.name, file_handler)

elif all([config.endpoint, config.protocol]):
self._tracker = self._snowplow_tracker(config.endpoint, config.protocol)

def track(self, msg: EventMsg) -> str:
raise NotImplementedError()

def _python_file_logger(self, name: str, handler: logging.Handler) -> logging.Logger:
log = logging.getLogger(name)
log.setLevel(logging.DEBUG)
handler.setFormatter(logging.Formatter(fmt="%(message)s"))
log.handlers.clear()
log.propagate = False
log.addHandler(handler)
return log

def _snowplow_tracker(
self,
endpoint: str,
protocol: Optional[str] = "https",
on_failure: Optional[FailureCallback] = None,
) -> Tracker:
emitter = Emitter(
endpoint,
protocol,
method="post",
batch_size=30,
on_failure=on_failure,
byte_limit=None,
request_timeout=5.0,
)
tracker = Tracker(
emitters=emitter,
namespace="cf",
app_id="dbt",
)
return tracker
9 changes: 9 additions & 0 deletions dbt_common/events/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,15 @@ message FormattingMsg {
Formatting data = 2;
}

// Z039
message DisableTracking {
}

message DisableTrackingMsg {
CoreEventInfo info = 1;
DisableTracking data = 2;
}

// Z050
message Note {
string msg = 1;
Expand Down
12 changes: 12 additions & 0 deletions dbt_common/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,18 @@ def message(self) -> str:
return self.msg


class DisableTracking(DebugLevel):
def code(self) -> str:
return "Z039"

def message(self) -> str:
return (
"Error sending anonymous usage statistics. Disabling tracking for this execution. "
"If you wish to permanently disable tracking, see: "
"https://docs.getdbt.com/reference/global-configs#send-anonymous-usage-stats."
)


class Note(InfoLevel):
"""Unstructured events.
Expand Down
37 changes: 37 additions & 0 deletions dbt_common/events/user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional, Union

import pytz

from dbt_common.events.functions import get_invocation_id


class User:
def __init__(self, directory: Union[str, Path]) -> None:
self.cookie: Dict[str, Any] = {}
self.directory: Path = Path(directory)
self.invocation_id: str = get_invocation_id()
self.run_started_at: datetime = datetime.now(tz=pytz.utc)

@property
def id(self) -> Optional[str]:
if self.cookie:
return self.cookie.get("id")

@property
def do_not_track(self) -> bool:
return self.cookie != {}

def state(self):
return "do not track" if self.do_not_track else "tracking"

@property
def profile(self) -> Path:
return Path(self.directory) / "profiles.yml"

def enable_tracking(self, cookie: Dict[str, Any]):
self.cookie = cookie

def disable_tracking(self):
self.cookie = {}

0 comments on commit e671471

Please sign in to comment.