Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport] user configurable event buffer size #4412

Merged
merged 1 commit into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions core/dbt/events/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@

# create the global event history buffer with a max size of 100k records
# python 3.7 doesn't support type hints on globals, but mypy requires them. hence the ignore.
# TODO: make the maxlen something configurable from the command line via args(?)
global EVENT_HISTORY
EVENT_HISTORY = deque(maxlen=100000) # type: ignore
EVENT_HISTORY = deque(maxlen=flags.EVENT_BUFFER_SIZE) # type: ignore

# create the global file logger with no configuration
global FILE_LOG
Expand Down Expand Up @@ -310,13 +309,15 @@ def fire_event(e: Event) -> None:
# skip logs when `--log-cache-events` is not passed
if isinstance(e, Cache) and not flags.LOG_CACHE_EVENTS:
return

# if and only if the event history deque will be completely filled by this event
# fire warning that old events are now being dropped
global EVENT_HISTORY
if len(EVENT_HISTORY) == ((EVENT_HISTORY.maxlen or 100000) - 1):
if len(EVENT_HISTORY) == (flags.EVENT_BUFFER_SIZE - 1):
EVENT_HISTORY.append(e)
fire_event(EventBufferFull())

EVENT_HISTORY.append(e)
else:
EVENT_HISTORY.append(e)

# backwards compatibility for plugins that require old logger (dbt-rpc)
if flags.ENABLE_LEGACY_LOGGER:
Expand Down
20 changes: 15 additions & 5 deletions core/dbt/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
WHICH = None
INDIRECT_SELECTION = None
LOG_CACHE_EVENTS = None
EVENT_BUFFER_SIZE = 100000

# Global CLI defaults. These flags are set from three places:
# CLI args, environment variables, and user_config (profiles.yml).
Expand All @@ -53,7 +54,8 @@
"SEND_ANONYMOUS_USAGE_STATS": True,
"PRINTER_WIDTH": 80,
"INDIRECT_SELECTION": 'eager',
"LOG_CACHE_EVENTS": False
"LOG_CACHE_EVENTS": False,
"EVENT_BUFFER_SIZE": 100000
}


Expand Down Expand Up @@ -101,7 +103,7 @@ def set_from_args(args, user_config):
USE_EXPERIMENTAL_PARSER, STATIC_PARSER, WRITE_JSON, PARTIAL_PARSE, \
USE_COLORS, STORE_FAILURES, PROFILES_DIR, DEBUG, LOG_FORMAT, INDIRECT_SELECTION, \
VERSION_CHECK, FAIL_FAST, SEND_ANONYMOUS_USAGE_STATS, PRINTER_WIDTH, \
WHICH, LOG_CACHE_EVENTS
WHICH, LOG_CACHE_EVENTS, EVENT_BUFFER_SIZE

STRICT_MODE = False # backwards compatibility
# cli args without user_config or env var option
Expand All @@ -125,6 +127,7 @@ def set_from_args(args, user_config):
PRINTER_WIDTH = get_flag_value('PRINTER_WIDTH', args, user_config)
INDIRECT_SELECTION = get_flag_value('INDIRECT_SELECTION', args, user_config)
LOG_CACHE_EVENTS = get_flag_value('LOG_CACHE_EVENTS', args, user_config)
EVENT_BUFFER_SIZE = get_flag_value('EVENT_BUFFER_SIZE', args, user_config)


def get_flag_value(flag, args, user_config):
Expand All @@ -137,15 +140,21 @@ def get_flag_value(flag, args, user_config):
if env_value is not None and env_value != '':
env_value = env_value.lower()
# non Boolean values
if flag in ['LOG_FORMAT', 'PRINTER_WIDTH', 'PROFILES_DIR', 'INDIRECT_SELECTION']:
if flag in [
'LOG_FORMAT',
'PRINTER_WIDTH',
'PROFILES_DIR',
'INDIRECT_SELECTION',
'EVENT_BUFFER_SIZE'
]:
flag_value = env_value
else:
flag_value = env_set_bool(env_value)
elif user_config is not None and getattr(user_config, lc_flag, None) is not None:
flag_value = getattr(user_config, lc_flag)
else:
flag_value = flag_defaults[flag]
if flag == 'PRINTER_WIDTH': # printer_width must be an int or it hangs
if flag in ['PRINTER_WIDTH', 'EVENT_BUFFER_SIZE']: # must be ints
flag_value = int(flag_value)
if flag == 'PROFILES_DIR':
flag_value = os.path.abspath(flag_value)
Expand All @@ -169,5 +178,6 @@ def get_flag_dict():
"send_anonymous_usage_stats": SEND_ANONYMOUS_USAGE_STATS,
"printer_width": PRINTER_WIDTH,
"indirect_selection": INDIRECT_SELECTION,
"log_cache_events": LOG_CACHE_EVENTS
"log_cache_events": LOG_CACHE_EVENTS,
"event_buffer_size": EVENT_BUFFER_SIZE
}
41 changes: 23 additions & 18 deletions test/unit/test_events.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
from dbt import events
from dbt.events.functions import EVENT_HISTORY, fire_event

from dbt.events.test_types import UnitTestInfo
from argparse import Namespace
from dbt.events import AdapterLogger
from dbt.events.functions import event_to_serializable_dict
from dbt.events.types import *
from dbt.events.test_types import *
from dbt.events.base_types import Event, Node
from dbt.events.base_types import Event
from dbt.events.stubs import _CachedRelation, BaseRelation, _ReferenceKey, ParsedModelNode
from importlib import reload
import dbt.events.functions as event_funcs
import dbt.flags as flags
import inspect
import json
import datetime
from unittest import TestCase
from dbt.contracts.graph.parsed import (
ParsedModelNode, NodeConfig, DependsOn, ParsedMacro
ParsedModelNode, NodeConfig, DependsOn
)
from dbt.contracts.files import FileHash

Expand Down Expand Up @@ -88,25 +89,29 @@ def test_event_codes(self):

class TestEventBuffer(TestCase):

def setUp(self) -> None:
flags.EVENT_BUFFER_SIZE = 10
reload(event_funcs)

# ensure events are populated to the buffer exactly once
def test_buffer_populates(self):
fire_event(UnitTestInfo(msg="Test Event 1"))
fire_event(UnitTestInfo(msg="Test Event 2"))
event_funcs.fire_event(UnitTestInfo(msg="Test Event 1"))
event_funcs.fire_event(UnitTestInfo(msg="Test Event 2"))
self.assertTrue(
EVENT_HISTORY.count(UnitTestInfo(msg='Test Event 1', code='T006')) == 1
event_funcs.EVENT_HISTORY.count(UnitTestInfo(msg='Test Event 1', code='T006')) == 1
)

# ensure events drop from the front of the buffer when buffer maxsize is reached
# TODO commenting out till we can make this not spit out 100k log lines.
# def test_buffer_FIFOs(self):
# for n in range(0,100001):
# fire_event(UnitTestInfo(msg=f"Test Event {n}"))
# self.assertTrue(
# EVENT_HISTORY.count(EventBufferFull(code='Z048')) == 1
# )
# self.assertTrue(
# EVENT_HISTORY.count(UnitTestInfo(msg='Test Event 1', code='T006')) == 0
# )
def test_buffer_FIFOs(self):
for n in range(0,(flags.EVENT_BUFFER_SIZE + 1)):
event_funcs.fire_event(UnitTestInfo(msg=f"Test Event {n}"))

self.assertTrue(
event_funcs.EVENT_HISTORY.count(EventBufferFull(code='Z048')) == 1
)
self.assertTrue(
event_funcs.EVENT_HISTORY.count(UnitTestInfo(msg='Test Event 1', code='T006')) == 0
)

def MockNode():
return ParsedModelNode(
Expand Down