From 97db14769cd367b9e4e6441824389f04ce00c21b Mon Sep 17 00:00:00 2001 From: Pablo Collins Date: Wed, 18 Oct 2023 18:47:54 -0400 Subject: [PATCH] add comments, make ThreadingTimer the default, fix tests --- .../trace/export/experimental/accumulator.py | 30 +++-- .../trace/export/experimental/processor.py | 4 +- .../sdk/trace/export/experimental/timer.py | 116 ++++++------------ .../tests/trace/export/test_integration.py | 16 +-- 4 files changed, 65 insertions(+), 101 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/accumulator.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/accumulator.py index d814fe2fc2c..bd25103d973 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/accumulator.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/accumulator.py @@ -7,28 +7,44 @@ class SpanAccumulator: """ - Accumulates and batches spans in a thread-safe manner. + A thread-safe container designed to collect and batch spans. It accumulates spans until a specified batch size is + reached, at which point the accumulated spans are moved into a FIFO queue. Provides methods to add spans, check if + the accumulator is non-empty, and retrieve the earliest batch of spans from the queue. """ - def __init__(self, max_len: int): - self._max_len = max_len + def __init__(self, batch_size: int): + self._batch_size = batch_size self._spans: typing.List[ReadableSpan] = [] - self._batches = collections.deque() # fixme set max size? + self._batches = collections.deque() # fixme set max size self._lock = threading.Lock() - def nonempty(self): - return len(self._spans) > 0 or len(self._batches) > 0 + def nonempty(self) -> bool: + """ + Checks if the accumulator contains any spans or batches. It returns True if either the span list or the batch + queue is non-empty, and False otherwise. + """ + with self._lock: + return len(self._spans) > 0 or len(self._batches) > 0 def push(self, span: ReadableSpan) -> bool: + """ + Adds a span to the accumulator. If the addition causes the number of spans to reach the + specified batch size, the accumulated spans are moved into a FIFO queue as a new batch. Returns True if a new + batch was created, otherwise returns False. + """ with self._lock: self._spans.append(span) - if len(self._spans) < self._max_len: + if len(self._spans) < self._batch_size: return False self._batches.appendleft(self._spans) self._spans = [] return True def batch(self) -> typing.List[ReadableSpan]: + """ + Returns the earliest (first in line) batch of spans from the FIFO queue. If the queue is empty, returns any + remaining spans that haven't been batched. + """ try: return self._batches.pop() except IndexError: diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py index 6bf5ab56c12..d2db600ece4 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py @@ -4,7 +4,7 @@ from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor, Span from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult from opentelemetry.sdk.trace.export.experimental.accumulator import SpanAccumulator -from opentelemetry.sdk.trace.export.experimental.timer import TimerABC, PeriodicTimer +from opentelemetry.sdk.trace.export.experimental.timer import TimerABC, PeriodicTimer, ThreadingTimer class BatchSpanProcessor2(SpanProcessor): @@ -22,7 +22,7 @@ def __init__( ): self._exporter = exporter self._accumulator = SpanAccumulator(max_batch_size) - self._timer = timer or PeriodicTimer(interval_sec) + self._timer = timer or ThreadingTimer(interval_sec) self._timer.set_callback(self._export_batch) self._timer.start() diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py index bd0fc43e204..f32419c1d1e 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py @@ -2,8 +2,6 @@ import threading import typing -from icecream import icecream - class TimerABC(abc.ABC): """ @@ -27,9 +25,47 @@ def stop(self) -> None: pass +class ThreadingTimer(TimerABC): + + def __init__(self, interval_sec: int): + self.interval_sec = interval_sec + self.cb = lambda: None + self.timer = None + self.lock = threading.Lock() + + def set_callback(self, cb) -> None: + with self.lock: + self.cb = cb + + def start(self) -> None: + with self.lock: + self.timer = threading.Timer(self.interval_sec, self._work) + self.timer.daemon = True + self.timer.start() + + def _work(self): + self.cb() + self.start() + + def poke(self) -> None: + with self.lock: + self._do_stop() + threading.Thread(target=self._work, daemon=True).start() + + def stop(self) -> None: + with self.lock: + self._do_stop() + + def _do_stop(self): + if self.timer is None: + return + self.timer.cancel() + self.timer = None + + class PeriodicTimer(TimerABC): """ - Executes the passed-in callback on a timer at the specified interval. The callback can be run sooner than the + DEPRECATED. Executes the passed-in callback on a timer at the specified interval. The callback can be run sooner than the interval via the poke() method, which also resets the timer. """ @@ -82,80 +118,6 @@ def stopped(self) -> bool: return self._stop.is_set() -class IntervalTimer(TimerABC): - - def __init__(self, interval_sec): - self._thread = threading.Thread(target=self._work) - self._interval_sec = interval_sec - self._cb = lambda: None - self._poke = threading.Event() - self._stop = threading.Event() - - def set_callback(self, cb) -> None: - self._cb = cb - - def start(self) -> None: - self._thread.start() - - def _work(self): - while True: - self._poke.wait(self._interval_sec) - if self._stop.is_set(): - return - self._poke.clear() - self._cb() - - def poke(self) -> None: - self._poke.set() - - def stop(self) -> None: - self._stop.set() - - def started(self) -> bool: - pass - - def stopped(self) -> bool: - pass - - -class ThreadingTimer(TimerABC): - - def __init__(self, interval_sec: int): - self.interval_sec = interval_sec - self.cb = lambda: None - self.timer = None - self.lock = threading.Lock() - - def set_callback(self, cb) -> None: - with self.lock: - self.cb = cb - - def start(self) -> None: - with self.lock: - self.timer = threading.Timer(self.interval_sec, self._work) - self.timer.start() - - def _work(self): - self.cb() - self.start() - - def poke(self) -> None: - with self.lock: - self._do_stop() - # start a new thread from a thread that's just about to be shut down - threading.Thread(target=self._work, daemon=True).start() - - def stop(self) -> None: - with self.lock: - self._do_stop() - - def _do_stop(self): - if self.timer is None: - return - self.timer.cancel() - self.timer = None - - class ThreadlessTimer(TimerABC): """ For testing/experimentation. Only executes the callback when you run poke(). diff --git a/opentelemetry-sdk/tests/trace/export/test_integration.py b/opentelemetry-sdk/tests/trace/export/test_integration.py index 343fb3bce73..facc78e8895 100644 --- a/opentelemetry-sdk/tests/trace/export/test_integration.py +++ b/opentelemetry-sdk/tests/trace/export/test_integration.py @@ -24,15 +24,10 @@ def test_full_speed(self): server.start() max_interval_sec = 4 - # timer = ThreadingTimer(max_interval_sec) - timer = PeriodicTimer(max_interval_sec) - - bsp = BatchSpanProcessor2(OTLPSpanExporter2(), timer=timer) + bsp = BatchSpanProcessor2(OTLPSpanExporter2()) num_spans_per_firehose = 1_000 sf = SpanFirehose(bsp, num_spans=num_spans_per_firehose, sleep_sec=0) - start = time.time() - threads = [] num_threads = 128 for _ in range(num_threads): @@ -40,17 +35,9 @@ def test_full_speed(self): thread.start() threads.append(thread) - checkpoint = time.time() - for thread in threads: thread.join() - joined = time.time() - - # ThreadingTimer: checkpoint: 0.72, joined: 5.89 - # PeriodicTimer: checkpoint: 0.8266980648040771, joined: 4.759222030639648 - print(f'checkpoint: {checkpoint - start}, joined: {joined - start}') - time.sleep(max_interval_sec * 2) num_span_received = server.get_num_spans_received() @@ -85,7 +72,6 @@ def test_slow_enough_to_engage_timer(self): sf = SpanFirehose(bsp, num_spans=num_spans, sleep_sec=1) sf.run() time.sleep(5) - # bsp.force_flush() num_span_received = server.get_num_spans_received() self.assertEqual(num_spans, num_span_received) server.stop()