Skip to content

Commit

Permalink
Struct log for adapter call sites (#4189)
Browse files Browse the repository at this point in the history
graph call sites for structured logging

Co-authored-by: Nathaniel May <nathaniel.may@fishtownanalytics.com>
Co-authored-by: Emily Rockman <emily.rockman@dbtlabs.com>
  • Loading branch information
3 people committed Nov 9, 2021
1 parent 43b39fd commit b2aea11
Show file tree
Hide file tree
Showing 9 changed files with 512 additions and 96 deletions.
39 changes: 20 additions & 19 deletions core/dbt/adapters/base/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,17 @@
from dbt.adapters.base.query_headers import (
MacroQueryStringSetter,
)
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.events.functions import fire_event
from dbt.events.types import (
NewConnection,
ConnectionReused,
ConnectionLeftOpen,
ConnectionLeftOpen2,
ConnectionClosed,
ConnectionClosed2,
Rollback,
RollbackFailed
)
from dbt import flags


Expand Down Expand Up @@ -136,14 +146,10 @@ def set_connection_name(self, name: Optional[str] = None) -> Connection:
if conn.name == conn_name and conn.state == 'open':
return conn

logger.debug(
'Acquiring new {} connection "{}".'.format(self.TYPE, conn_name))
fire_event(NewConnection(conn_name=conn_name, conn_type=self.TYPE))

if conn.state == 'open':
logger.debug(
'Re-using an available connection from the pool (formerly {}).'
.format(conn.name)
)
fire_event(ConnectionReused(conn_name=conn_name))
else:
conn.handle = LazyHandle(self.open)

Expand Down Expand Up @@ -190,11 +196,9 @@ def cleanup_all(self) -> None:
with self.lock:
for connection in self.thread_connections.values():
if connection.state not in {'closed', 'init'}:
logger.debug("Connection '{}' was left open."
.format(connection.name))
fire_event(ConnectionLeftOpen(conn_name=connection.name))
else:
logger.debug("Connection '{}' was properly closed."
.format(connection.name))
fire_event(ConnectionClosed(conn_name=connection.name))
self.close(connection)

# garbage collect these connections
Expand All @@ -220,20 +224,17 @@ def _rollback_handle(cls, connection: Connection) -> None:
try:
connection.handle.rollback()
except Exception:
logger.debug(
'Failed to rollback {}'.format(connection.name),
exc_info=True
)
fire_event(RollbackFailed(conn_name=connection.name))

@classmethod
def _close_handle(cls, connection: Connection) -> None:
"""Perform the actual close operation."""
# On windows, sometimes connection handles don't have a close() attr.
if hasattr(connection.handle, 'close'):
logger.debug(f'On {connection.name}: Close')
fire_event(ConnectionClosed2(conn_name=connection.name))
connection.handle.close()
else:
logger.debug(f'On {connection.name}: No close available on handle')
fire_event(ConnectionLeftOpen2(conn_name=connection.name))

@classmethod
def _rollback(cls, connection: Connection) -> None:
Expand All @@ -244,7 +245,7 @@ def _rollback(cls, connection: Connection) -> None:
f'"{connection.name}", but it does not have one open!'
)

logger.debug(f'On {connection.name}: ROLLBACK')
fire_event(Rollback(conn_name=connection.name))
cls._rollback_handle(connection)

connection.transaction_open = False
Expand All @@ -256,7 +257,7 @@ def close(cls, connection: Connection) -> Connection:
return connection

if connection.transaction_open and connection.handle:
logger.debug('On {}: ROLLBACK'.format(connection.name))
fire_event(Rollback(conn_name=connection.name))
cls._rollback_handle(connection)
connection.transaction_open = False

Expand Down
15 changes: 9 additions & 6 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
from dbt.contracts.graph.manifest import Manifest, MacroManifest
from dbt.contracts.graph.parsed import ParsedSeedNode
from dbt.exceptions import warn_or_error
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.events.functions import fire_event
from dbt.events.types import CacheMiss, ListRelations
from dbt.utils import filter_null_values, executor

from dbt.adapters.base.connections import Connection, AdapterResponse
Expand Down Expand Up @@ -288,9 +289,12 @@ def _schema_is_cached(self, database: Optional[str], schema: str) -> bool:
"""Check if the schema is cached, and by default logs if it is not."""

if (database, schema) not in self.cache:
logger.debug(
'On "{}": cache miss for schema "{}.{}", this is inefficient'
.format(self.nice_connection_name(), database, schema)
fire_event(
CacheMiss(
conn_name=self.nice_connection_name,
database=database,
schema=schema
)
)
return False
else:
Expand Down Expand Up @@ -672,9 +676,8 @@ def list_relations(
relations = self.list_relations_without_caching(
schema_relation
)
fire_event(ListRelations(database=database, schema=schema, relations=relations))

logger.debug('with database={}, schema={}, relations={}'
.format(database, schema, relations))
return relations

def _make_match_kwargs(
Expand Down
75 changes: 34 additions & 41 deletions core/dbt/adapters/cache.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,26 @@
import threading
from collections import namedtuple
from copy import deepcopy
from typing import List, Iterable, Optional, Dict, Set, Tuple, Any
import threading
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple

from dbt.logger import CACHE_LOGGER as logger
from dbt.utils import lowercase
import dbt.exceptions
from dbt.events.functions import fire_event
from dbt.events.types import (
AddLink,
AddRelation,
DropCascade,
DropMissingRelation,
DropRelation,
DumpAfterAddGraph,
DumpAfterRenameSchema,
DumpBeforeAddGraph,
DumpBeforeRenameSchema,
RenameSchema,
TemporaryRelation,
UncachedRelation,
UpdateReference
)
from dbt.utils import lowercase

_ReferenceKey = namedtuple('_ReferenceKey', 'database schema identifier')

Expand Down Expand Up @@ -157,12 +172,6 @@ def dump_graph_entry(self):
return [dot_separated(r) for r in self.referenced_by]


def lazy_log(msg, func):
if logger.disabled:
return
logger.debug(msg.format(func()))


class RelationsCache:
"""A cache of the relations known to dbt. Keeps track of relationships
declared between tables and handles renames/drops as a real database would.
Expand Down Expand Up @@ -278,6 +287,7 @@ def _add_link(self, referenced_key, dependent_key):

referenced.add_reference(dependent)

# TODO: Is this dead code? I can't seem to find it grepping the codebase.
def add_link(self, referenced, dependent):
"""Add a link between two relations to the database. If either relation
does not exist, it will be added as an "external" relation.
Expand All @@ -297,11 +307,7 @@ def add_link(self, referenced, dependent):
# if we have not cached the referenced schema at all, we must be
# referring to a table outside our control. There's no need to make
# a link - we will never drop the referenced relation during a run.
logger.debug(
'{dep!s} references {ref!s} but {ref.database}.{ref.schema} '
'is not in the cache, skipping assumed external relation'
.format(dep=dependent, ref=ref_key)
)
fire_event(UncachedRelation(dep_key=dependent, ref_key=ref_key))
return
if ref_key not in self.relations:
# Insert a dummy "external" relation.
Expand All @@ -317,9 +323,7 @@ def add_link(self, referenced, dependent):
type=referenced.External
)
self.add(dependent)
logger.debug(
'adding link, {!s} references {!s}'.format(dep_key, ref_key)
)
fire_event(AddLink(dep_key=dep_key, ref_key=ref_key))
with self.lock:
self._add_link(ref_key, dep_key)

Expand All @@ -330,14 +334,12 @@ def add(self, relation):
:param BaseRelation relation: The underlying relation.
"""
cached = _CachedRelation(relation)
logger.debug('Adding relation: {!s}'.format(cached))

lazy_log('before adding: {!s}', self.dump_graph)
fire_event(AddRelation(relation=cached))
fire_event(DumpBeforeAddGraph(graph_func=self.dump_graph))

with self.lock:
self._setdefault(cached)

lazy_log('after adding: {!s}', self.dump_graph)
fire_event(DumpAfterAddGraph(graph_func=self.dump_graph))

def _remove_refs(self, keys):
"""Removes all references to all entries in keys. This does not
Expand All @@ -359,13 +361,10 @@ def _drop_cascade_relation(self, dropped):
:param _CachedRelation dropped: An existing _CachedRelation to drop.
"""
if dropped not in self.relations:
logger.debug('dropped a nonexistent relationship: {!s}'
.format(dropped))
fire_event(DropMissingRelation(relation=dropped))
return
consequences = self.relations[dropped].collect_consequences()
logger.debug(
'drop {} is cascading to {}'.format(dropped, consequences)
)
fire_event(DropCascade(dropped=dropped, consequences=consequences))
self._remove_refs(consequences)

def drop(self, relation):
Expand All @@ -380,7 +379,7 @@ def drop(self, relation):
:param str identifier: The identifier of the relation to drop.
"""
dropped = _make_key(relation)
logger.debug('Dropping relation: {!s}'.format(dropped))
fire_event(DropRelation(dropped=dropped))
with self.lock:
self._drop_cascade_relation(dropped)

Expand All @@ -403,9 +402,8 @@ def _rename_relation(self, old_key, new_relation):
# update all the relations that refer to it
for cached in self.relations.values():
if cached.is_referenced_by(old_key):
logger.debug(
'updated reference from {0} -> {2} to {1} -> {2}'
.format(old_key, new_key, cached.key())
fire_event(
UpdateReference(old_key=old_key, new_key=new_key, cached_key=cached.key())
)
cached.rename_key(old_key, new_key)

Expand Down Expand Up @@ -435,10 +433,7 @@ def _check_rename_constraints(self, old_key, new_key):
)

if old_key not in self.relations:
logger.debug(
'old key {} not found in self.relations, assuming temporary'
.format(old_key)
)
fire_event(TemporaryRelation(key=old_key))
return False
return True

Expand All @@ -456,19 +451,17 @@ def rename(self, old, new):
"""
old_key = _make_key(old)
new_key = _make_key(new)
logger.debug('Renaming relation {!s} to {!s}'.format(
old_key, new_key
))
fire_event(RenameSchema(old_key=old_key, new_key=new_key))

lazy_log('before rename: {!s}', self.dump_graph)
fire_event(DumpBeforeRenameSchema(graph_func=self.dump_graph))

with self.lock:
if self._check_rename_constraints(old_key, new_key):
self._rename_relation(old_key, _CachedRelation(new))
else:
self._setdefault(_CachedRelation(new))

lazy_log('after rename: {!s}', self.dump_graph)
fire_event(DumpAfterRenameSchema(graph_func=self.dump_graph))

def get_relations(
self, database: Optional[str], schema: Optional[str]
Expand Down
10 changes: 5 additions & 5 deletions core/dbt/adapters/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@
PACKAGE_PATH as GLOBAL_PROJECT_PATH,
PROJECT_NAME as GLOBAL_PROJECT_NAME,
)
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.events.functions import fire_event
from dbt.events.types import AdapterImportError, PluginLoadError
from dbt.contracts.connection import Credentials, AdapterRequiredConfig


from dbt.adapters.protocol import (
AdapterProtocol,
AdapterConfig,
Expand Down Expand Up @@ -67,11 +66,12 @@ def load_plugin(self, name: str) -> Type[Credentials]:
# if we failed to import the target module in particular, inform
# the user about it via a runtime error
if exc.name == 'dbt.adapters.' + name:
fire_event(AdapterImportError(exc=exc))
raise RuntimeException(f'Could not find adapter type {name}!')
logger.info(f'Error importing adapter: {exc}')
# otherwise, the error had to have come from some underlying
# library. Log the stack trace.
logger.debug('', exc_info=True)

fire_event(PluginLoadError())
raise
plugin: AdapterPlugin = mod.Plugin
plugin_type = plugin.adapter.type()
Expand Down
24 changes: 10 additions & 14 deletions core/dbt/adapters/sql/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from dbt.contracts.connection import (
Connection, ConnectionState, AdapterResponse
)
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.events.functions import fire_event
from dbt.events.types import ConnectionUsed, SQLQuery, SQLCommit, SQLQueryStatus


class SQLConnectionManager(BaseConnectionManager):
Expand Down Expand Up @@ -58,29 +59,24 @@ def add_query(
connection = self.get_thread_connection()
if auto_begin and connection.transaction_open is False:
self.begin()

logger.debug('Using {} connection "{}".'
.format(self.TYPE, connection.name))
fire_event(ConnectionUsed(conn_type=self.TYPE, conn_name=connection.name))

with self.exception_handler(sql):
if abridge_sql_log:
log_sql = '{}...'.format(sql[:512])
else:
log_sql = sql

logger.debug(
'On {connection_name}: {sql}',
connection_name=connection.name,
sql=log_sql,
)
fire_event(SQLQuery(conn_name=connection.name, sql=log_sql))
pre = time.time()

cursor = connection.handle.cursor()
cursor.execute(sql, bindings)
logger.debug(
"SQL status: {status} in {elapsed:0.2f} seconds",
status=self.get_response(cursor),
elapsed=(time.time() - pre)

fire_event(
SQLQueryStatus(
status=self.get_response(cursor), elapsed=round((time.time() - pre), 2)
)
)

return connection, cursor
Expand Down Expand Up @@ -160,7 +156,7 @@ def commit(self):
'Tried to commit transaction on connection "{}", but '
'it does not have one open!'.format(connection.name))

logger.debug('On {}: COMMIT'.format(connection.name))
fire_event(SQLCommit(conn_name=connection.name))
self.add_commit_query()

connection.transaction_open = False
Expand Down
Loading

0 comments on commit b2aea11

Please sign in to comment.