From 3327d9932082a84c17ec0c4f1a3711de865a2eef Mon Sep 17 00:00:00 2001 From: Ian Knox <81931810+iknox-fa@users.noreply.github.com> Date: Thu, 2 Dec 2021 16:47:31 -0600 Subject: [PATCH] user configurable event buffer size (#4411) --- core/dbt/events/functions.py | 11 +++++----- core/dbt/flags.py | 20 +++++++++++++----- test/unit/test_events.py | 41 ++++++++++++++++++++---------------- 3 files changed, 44 insertions(+), 28 deletions(-) diff --git a/core/dbt/events/functions.py b/core/dbt/events/functions.py index 101095b5a39..6565ed64385 100644 --- a/core/dbt/events/functions.py +++ b/core/dbt/events/functions.py @@ -25,9 +25,8 @@ # create the global event history buffer with a max size of 100k records # python 3.7 doesn't support type hints on globals, but mypy requires them. hence the ignore. -# TODO: make the maxlen something configurable from the command line via args(?) global EVENT_HISTORY -EVENT_HISTORY = deque(maxlen=100000) # type: ignore +EVENT_HISTORY = deque(maxlen=flags.EVENT_BUFFER_SIZE) # type: ignore # create the global file logger with no configuration global FILE_LOG @@ -310,13 +309,15 @@ def fire_event(e: Event) -> None: # skip logs when `--log-cache-events` is not passed if isinstance(e, Cache) and not flags.LOG_CACHE_EVENTS: return + # if and only if the event history deque will be completely filled by this event # fire warning that old events are now being dropped global EVENT_HISTORY - if len(EVENT_HISTORY) == ((EVENT_HISTORY.maxlen or 100000) - 1): + if len(EVENT_HISTORY) == (flags.EVENT_BUFFER_SIZE - 1): + EVENT_HISTORY.append(e) fire_event(EventBufferFull()) - - EVENT_HISTORY.append(e) + else: + EVENT_HISTORY.append(e) # backwards compatibility for plugins that require old logger (dbt-rpc) if flags.ENABLE_LEGACY_LOGGER: diff --git a/core/dbt/flags.py b/core/dbt/flags.py index cf75ccc1894..540d0ce66d2 100644 --- a/core/dbt/flags.py +++ b/core/dbt/flags.py @@ -34,6 +34,7 @@ WHICH = None INDIRECT_SELECTION = None LOG_CACHE_EVENTS = None +EVENT_BUFFER_SIZE = 100000 # Global CLI defaults. These flags are set from three places: # CLI args, environment variables, and user_config (profiles.yml). @@ -53,7 +54,8 @@ "SEND_ANONYMOUS_USAGE_STATS": True, "PRINTER_WIDTH": 80, "INDIRECT_SELECTION": 'eager', - "LOG_CACHE_EVENTS": False + "LOG_CACHE_EVENTS": False, + "EVENT_BUFFER_SIZE": 100000 } @@ -101,7 +103,7 @@ def set_from_args(args, user_config): USE_EXPERIMENTAL_PARSER, STATIC_PARSER, WRITE_JSON, PARTIAL_PARSE, \ USE_COLORS, STORE_FAILURES, PROFILES_DIR, DEBUG, LOG_FORMAT, INDIRECT_SELECTION, \ VERSION_CHECK, FAIL_FAST, SEND_ANONYMOUS_USAGE_STATS, PRINTER_WIDTH, \ - WHICH, LOG_CACHE_EVENTS + WHICH, LOG_CACHE_EVENTS, EVENT_BUFFER_SIZE STRICT_MODE = False # backwards compatibility # cli args without user_config or env var option @@ -125,6 +127,7 @@ def set_from_args(args, user_config): PRINTER_WIDTH = get_flag_value('PRINTER_WIDTH', args, user_config) INDIRECT_SELECTION = get_flag_value('INDIRECT_SELECTION', args, user_config) LOG_CACHE_EVENTS = get_flag_value('LOG_CACHE_EVENTS', args, user_config) + EVENT_BUFFER_SIZE = get_flag_value('EVENT_BUFFER_SIZE', args, user_config) def get_flag_value(flag, args, user_config): @@ -137,7 +140,13 @@ def get_flag_value(flag, args, user_config): if env_value is not None and env_value != '': env_value = env_value.lower() # non Boolean values - if flag in ['LOG_FORMAT', 'PRINTER_WIDTH', 'PROFILES_DIR', 'INDIRECT_SELECTION']: + if flag in [ + 'LOG_FORMAT', + 'PRINTER_WIDTH', + 'PROFILES_DIR', + 'INDIRECT_SELECTION', + 'EVENT_BUFFER_SIZE' + ]: flag_value = env_value else: flag_value = env_set_bool(env_value) @@ -145,7 +154,7 @@ def get_flag_value(flag, args, user_config): flag_value = getattr(user_config, lc_flag) else: flag_value = flag_defaults[flag] - if flag == 'PRINTER_WIDTH': # printer_width must be an int or it hangs + if flag in ['PRINTER_WIDTH', 'EVENT_BUFFER_SIZE']: # must be ints flag_value = int(flag_value) if flag == 'PROFILES_DIR': flag_value = os.path.abspath(flag_value) @@ -169,5 +178,6 @@ def get_flag_dict(): "send_anonymous_usage_stats": SEND_ANONYMOUS_USAGE_STATS, "printer_width": PRINTER_WIDTH, "indirect_selection": INDIRECT_SELECTION, - "log_cache_events": LOG_CACHE_EVENTS + "log_cache_events": LOG_CACHE_EVENTS, + "event_buffer_size": EVENT_BUFFER_SIZE } diff --git a/test/unit/test_events.py b/test/unit/test_events.py index 73be8aba212..871df051b83 100644 --- a/test/unit/test_events.py +++ b/test/unit/test_events.py @@ -1,19 +1,20 @@ -from dbt import events -from dbt.events.functions import EVENT_HISTORY, fire_event + from dbt.events.test_types import UnitTestInfo from argparse import Namespace from dbt.events import AdapterLogger from dbt.events.functions import event_to_serializable_dict from dbt.events.types import * from dbt.events.test_types import * -from dbt.events.base_types import Event, Node +from dbt.events.base_types import Event from dbt.events.stubs import _CachedRelation, BaseRelation, _ReferenceKey, ParsedModelNode +from importlib import reload +import dbt.events.functions as event_funcs +import dbt.flags as flags import inspect import json -import datetime from unittest import TestCase from dbt.contracts.graph.parsed import ( - ParsedModelNode, NodeConfig, DependsOn, ParsedMacro + ParsedModelNode, NodeConfig, DependsOn ) from dbt.contracts.files import FileHash @@ -88,25 +89,29 @@ def test_event_codes(self): class TestEventBuffer(TestCase): + def setUp(self) -> None: + flags.EVENT_BUFFER_SIZE = 10 + reload(event_funcs) + # ensure events are populated to the buffer exactly once def test_buffer_populates(self): - fire_event(UnitTestInfo(msg="Test Event 1")) - fire_event(UnitTestInfo(msg="Test Event 2")) + event_funcs.fire_event(UnitTestInfo(msg="Test Event 1")) + event_funcs.fire_event(UnitTestInfo(msg="Test Event 2")) self.assertTrue( - EVENT_HISTORY.count(UnitTestInfo(msg='Test Event 1', code='T006')) == 1 + event_funcs.EVENT_HISTORY.count(UnitTestInfo(msg='Test Event 1', code='T006')) == 1 ) # ensure events drop from the front of the buffer when buffer maxsize is reached - # TODO commenting out till we can make this not spit out 100k log lines. - # def test_buffer_FIFOs(self): - # for n in range(0,100001): - # fire_event(UnitTestInfo(msg=f"Test Event {n}")) - # self.assertTrue( - # EVENT_HISTORY.count(EventBufferFull(code='Z048')) == 1 - # ) - # self.assertTrue( - # EVENT_HISTORY.count(UnitTestInfo(msg='Test Event 1', code='T006')) == 0 - # ) + def test_buffer_FIFOs(self): + for n in range(0,(flags.EVENT_BUFFER_SIZE + 1)): + event_funcs.fire_event(UnitTestInfo(msg=f"Test Event {n}")) + + self.assertTrue( + event_funcs.EVENT_HISTORY.count(EventBufferFull(code='Z048')) == 1 + ) + self.assertTrue( + event_funcs.EVENT_HISTORY.count(UnitTestInfo(msg='Test Event 1', code='T006')) == 0 + ) def MockNode(): return ParsedModelNode(