Skip to content

Commit

Permalink
user configurable event buffer size (#4411) (#4412)
Browse files Browse the repository at this point in the history
Co-authored-by: Ian Knox <81931810+iknox-fa@users.noreply.github.com>
  • Loading branch information
leahwicz and iknox-fa authored Dec 2, 2021
1 parent c92e1ed commit 5c01f90
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 28 deletions.
11 changes: 6 additions & 5 deletions core/dbt/events/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@

# create the global event history buffer with a max size of 100k records
# python 3.7 doesn't support type hints on globals, but mypy requires them. hence the ignore.
# TODO: make the maxlen something configurable from the command line via args(?)
global EVENT_HISTORY
EVENT_HISTORY = deque(maxlen=100000) # type: ignore
EVENT_HISTORY = deque(maxlen=flags.EVENT_BUFFER_SIZE) # type: ignore

# create the global file logger with no configuration
global FILE_LOG
Expand Down Expand Up @@ -310,13 +309,15 @@ def fire_event(e: Event) -> None:
# skip logs when `--log-cache-events` is not passed
if isinstance(e, Cache) and not flags.LOG_CACHE_EVENTS:
return

# if and only if the event history deque will be completely filled by this event
# fire warning that old events are now being dropped
global EVENT_HISTORY
if len(EVENT_HISTORY) == ((EVENT_HISTORY.maxlen or 100000) - 1):
if len(EVENT_HISTORY) == (flags.EVENT_BUFFER_SIZE - 1):
EVENT_HISTORY.append(e)
fire_event(EventBufferFull())

EVENT_HISTORY.append(e)
else:
EVENT_HISTORY.append(e)

# backwards compatibility for plugins that require old logger (dbt-rpc)
if flags.ENABLE_LEGACY_LOGGER:
Expand Down
20 changes: 15 additions & 5 deletions core/dbt/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
WHICH = None
INDIRECT_SELECTION = None
LOG_CACHE_EVENTS = None
EVENT_BUFFER_SIZE = 100000

# Global CLI defaults. These flags are set from three places:
# CLI args, environment variables, and user_config (profiles.yml).
Expand All @@ -53,7 +54,8 @@
"SEND_ANONYMOUS_USAGE_STATS": True,
"PRINTER_WIDTH": 80,
"INDIRECT_SELECTION": 'eager',
"LOG_CACHE_EVENTS": False
"LOG_CACHE_EVENTS": False,
"EVENT_BUFFER_SIZE": 100000
}


Expand Down Expand Up @@ -101,7 +103,7 @@ def set_from_args(args, user_config):
USE_EXPERIMENTAL_PARSER, STATIC_PARSER, WRITE_JSON, PARTIAL_PARSE, \
USE_COLORS, STORE_FAILURES, PROFILES_DIR, DEBUG, LOG_FORMAT, INDIRECT_SELECTION, \
VERSION_CHECK, FAIL_FAST, SEND_ANONYMOUS_USAGE_STATS, PRINTER_WIDTH, \
WHICH, LOG_CACHE_EVENTS
WHICH, LOG_CACHE_EVENTS, EVENT_BUFFER_SIZE

STRICT_MODE = False # backwards compatibility
# cli args without user_config or env var option
Expand All @@ -125,6 +127,7 @@ def set_from_args(args, user_config):
PRINTER_WIDTH = get_flag_value('PRINTER_WIDTH', args, user_config)
INDIRECT_SELECTION = get_flag_value('INDIRECT_SELECTION', args, user_config)
LOG_CACHE_EVENTS = get_flag_value('LOG_CACHE_EVENTS', args, user_config)
EVENT_BUFFER_SIZE = get_flag_value('EVENT_BUFFER_SIZE', args, user_config)


def get_flag_value(flag, args, user_config):
Expand All @@ -137,15 +140,21 @@ def get_flag_value(flag, args, user_config):
if env_value is not None and env_value != '':
env_value = env_value.lower()
# non Boolean values
if flag in ['LOG_FORMAT', 'PRINTER_WIDTH', 'PROFILES_DIR', 'INDIRECT_SELECTION']:
if flag in [
'LOG_FORMAT',
'PRINTER_WIDTH',
'PROFILES_DIR',
'INDIRECT_SELECTION',
'EVENT_BUFFER_SIZE'
]:
flag_value = env_value
else:
flag_value = env_set_bool(env_value)
elif user_config is not None and getattr(user_config, lc_flag, None) is not None:
flag_value = getattr(user_config, lc_flag)
else:
flag_value = flag_defaults[flag]
if flag == 'PRINTER_WIDTH': # printer_width must be an int or it hangs
if flag in ['PRINTER_WIDTH', 'EVENT_BUFFER_SIZE']: # must be ints
flag_value = int(flag_value)
if flag == 'PROFILES_DIR':
flag_value = os.path.abspath(flag_value)
Expand All @@ -169,5 +178,6 @@ def get_flag_dict():
"send_anonymous_usage_stats": SEND_ANONYMOUS_USAGE_STATS,
"printer_width": PRINTER_WIDTH,
"indirect_selection": INDIRECT_SELECTION,
"log_cache_events": LOG_CACHE_EVENTS
"log_cache_events": LOG_CACHE_EVENTS,
"event_buffer_size": EVENT_BUFFER_SIZE
}
41 changes: 23 additions & 18 deletions test/unit/test_events.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
from dbt import events
from dbt.events.functions import EVENT_HISTORY, fire_event

from dbt.events.test_types import UnitTestInfo
from argparse import Namespace
from dbt.events import AdapterLogger
from dbt.events.functions import event_to_serializable_dict
from dbt.events.types import *
from dbt.events.test_types import *
from dbt.events.base_types import Event, Node
from dbt.events.base_types import Event
from dbt.events.stubs import _CachedRelation, BaseRelation, _ReferenceKey, ParsedModelNode
from importlib import reload
import dbt.events.functions as event_funcs
import dbt.flags as flags
import inspect
import json
import datetime
from unittest import TestCase
from dbt.contracts.graph.parsed import (
ParsedModelNode, NodeConfig, DependsOn, ParsedMacro
ParsedModelNode, NodeConfig, DependsOn
)
from dbt.contracts.files import FileHash

Expand Down Expand Up @@ -88,25 +89,29 @@ def test_event_codes(self):

class TestEventBuffer(TestCase):

def setUp(self) -> None:
flags.EVENT_BUFFER_SIZE = 10
reload(event_funcs)

# ensure events are populated to the buffer exactly once
def test_buffer_populates(self):
fire_event(UnitTestInfo(msg="Test Event 1"))
fire_event(UnitTestInfo(msg="Test Event 2"))
event_funcs.fire_event(UnitTestInfo(msg="Test Event 1"))
event_funcs.fire_event(UnitTestInfo(msg="Test Event 2"))
self.assertTrue(
EVENT_HISTORY.count(UnitTestInfo(msg='Test Event 1', code='T006')) == 1
event_funcs.EVENT_HISTORY.count(UnitTestInfo(msg='Test Event 1', code='T006')) == 1
)

# ensure events drop from the front of the buffer when buffer maxsize is reached
# TODO commenting out till we can make this not spit out 100k log lines.
# def test_buffer_FIFOs(self):
# for n in range(0,100001):
# fire_event(UnitTestInfo(msg=f"Test Event {n}"))
# self.assertTrue(
# EVENT_HISTORY.count(EventBufferFull(code='Z048')) == 1
# )
# self.assertTrue(
# EVENT_HISTORY.count(UnitTestInfo(msg='Test Event 1', code='T006')) == 0
# )
def test_buffer_FIFOs(self):
for n in range(0,(flags.EVENT_BUFFER_SIZE + 1)):
event_funcs.fire_event(UnitTestInfo(msg=f"Test Event {n}"))

self.assertTrue(
event_funcs.EVENT_HISTORY.count(EventBufferFull(code='Z048')) == 1
)
self.assertTrue(
event_funcs.EVENT_HISTORY.count(UnitTestInfo(msg='Test Event 1', code='T006')) == 0
)

def MockNode():
return ParsedModelNode(
Expand Down

0 comments on commit 5c01f90

Please sign in to comment.