Skip to content

Commit

Permalink
add comments, make ThreadingTimer the default, fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pmcollins committed Oct 18, 2023
1 parent f54bdd8 commit 97db147
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,44 @@

class SpanAccumulator:
"""
Accumulates and batches spans in a thread-safe manner.
A thread-safe container designed to collect and batch spans. It accumulates spans until a specified batch size is
reached, at which point the accumulated spans are moved into a FIFO queue. Provides methods to add spans, check if
the accumulator is non-empty, and retrieve the earliest batch of spans from the queue.
"""

def __init__(self, max_len: int):
self._max_len = max_len
def __init__(self, batch_size: int):
self._batch_size = batch_size
self._spans: typing.List[ReadableSpan] = []
self._batches = collections.deque() # fixme set max size?
self._batches = collections.deque() # fixme set max size
self._lock = threading.Lock()

def nonempty(self):
return len(self._spans) > 0 or len(self._batches) > 0
def nonempty(self) -> bool:
"""
Checks if the accumulator contains any spans or batches. It returns True if either the span list or the batch
queue is non-empty, and False otherwise.
"""
with self._lock:
return len(self._spans) > 0 or len(self._batches) > 0

def push(self, span: ReadableSpan) -> bool:
"""
Adds a span to the accumulator. If the addition causes the number of spans to reach the
specified batch size, the accumulated spans are moved into a FIFO queue as a new batch. Returns True if a new
batch was created, otherwise returns False.
"""
with self._lock:
self._spans.append(span)
if len(self._spans) < self._max_len:
if len(self._spans) < self._batch_size:
return False
self._batches.appendleft(self._spans)
self._spans = []
return True

def batch(self) -> typing.List[ReadableSpan]:
"""
Returns the earliest (first in line) batch of spans from the FIFO queue. If the queue is empty, returns any
remaining spans that haven't been batched.
"""
try:
return self._batches.pop()
except IndexError:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor, Span
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from opentelemetry.sdk.trace.export.experimental.accumulator import SpanAccumulator
from opentelemetry.sdk.trace.export.experimental.timer import TimerABC, PeriodicTimer
from opentelemetry.sdk.trace.export.experimental.timer import TimerABC, PeriodicTimer, ThreadingTimer


class BatchSpanProcessor2(SpanProcessor):
Expand All @@ -22,7 +22,7 @@ def __init__(
):
self._exporter = exporter
self._accumulator = SpanAccumulator(max_batch_size)
self._timer = timer or PeriodicTimer(interval_sec)
self._timer = timer or ThreadingTimer(interval_sec)
self._timer.set_callback(self._export_batch)
self._timer.start()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
import threading
import typing

from icecream import icecream


class TimerABC(abc.ABC):
"""
Expand All @@ -27,9 +25,47 @@ def stop(self) -> None:
pass


class ThreadingTimer(TimerABC):

def __init__(self, interval_sec: int):
self.interval_sec = interval_sec
self.cb = lambda: None
self.timer = None
self.lock = threading.Lock()

def set_callback(self, cb) -> None:
with self.lock:
self.cb = cb

def start(self) -> None:
with self.lock:
self.timer = threading.Timer(self.interval_sec, self._work)
self.timer.daemon = True
self.timer.start()

def _work(self):
self.cb()
self.start()

def poke(self) -> None:
with self.lock:
self._do_stop()
threading.Thread(target=self._work, daemon=True).start()

def stop(self) -> None:
with self.lock:
self._do_stop()

def _do_stop(self):
if self.timer is None:
return
self.timer.cancel()
self.timer = None


class PeriodicTimer(TimerABC):
"""
Executes the passed-in callback on a timer at the specified interval. The callback can be run sooner than the
DEPRECATED. Executes the passed-in callback on a timer at the specified interval. The callback can be run sooner than the
interval via the poke() method, which also resets the timer.
"""

Expand Down Expand Up @@ -82,80 +118,6 @@ def stopped(self) -> bool:
return self._stop.is_set()


class IntervalTimer(TimerABC):

def __init__(self, interval_sec):
self._thread = threading.Thread(target=self._work)
self._interval_sec = interval_sec
self._cb = lambda: None
self._poke = threading.Event()
self._stop = threading.Event()

def set_callback(self, cb) -> None:
self._cb = cb

def start(self) -> None:
self._thread.start()

def _work(self):
while True:
self._poke.wait(self._interval_sec)
if self._stop.is_set():
return
self._poke.clear()
self._cb()

def poke(self) -> None:
self._poke.set()

def stop(self) -> None:
self._stop.set()

def started(self) -> bool:
pass

def stopped(self) -> bool:
pass


class ThreadingTimer(TimerABC):

def __init__(self, interval_sec: int):
self.interval_sec = interval_sec
self.cb = lambda: None
self.timer = None
self.lock = threading.Lock()

def set_callback(self, cb) -> None:
with self.lock:
self.cb = cb

def start(self) -> None:
with self.lock:
self.timer = threading.Timer(self.interval_sec, self._work)
self.timer.start()

def _work(self):
self.cb()
self.start()

def poke(self) -> None:
with self.lock:
self._do_stop()
# start a new thread from a thread that's just about to be shut down
threading.Thread(target=self._work, daemon=True).start()

def stop(self) -> None:
with self.lock:
self._do_stop()

def _do_stop(self):
if self.timer is None:
return
self.timer.cancel()
self.timer = None


class ThreadlessTimer(TimerABC):
"""
For testing/experimentation. Only executes the callback when you run poke().
Expand Down
16 changes: 1 addition & 15 deletions opentelemetry-sdk/tests/trace/export/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,20 @@ def test_full_speed(self):
server.start()
max_interval_sec = 4

# timer = ThreadingTimer(max_interval_sec)
timer = PeriodicTimer(max_interval_sec)

bsp = BatchSpanProcessor2(OTLPSpanExporter2(), timer=timer)
bsp = BatchSpanProcessor2(OTLPSpanExporter2())
num_spans_per_firehose = 1_000
sf = SpanFirehose(bsp, num_spans=num_spans_per_firehose, sleep_sec=0)

start = time.time()

threads = []
num_threads = 128
for _ in range(num_threads):
thread = threading.Thread(target=sf.run)
thread.start()
threads.append(thread)

checkpoint = time.time()

for thread in threads:
thread.join()

joined = time.time()

# ThreadingTimer: checkpoint: 0.72, joined: 5.89
# PeriodicTimer: checkpoint: 0.8266980648040771, joined: 4.759222030639648
print(f'checkpoint: {checkpoint - start}, joined: {joined - start}')

time.sleep(max_interval_sec * 2)

num_span_received = server.get_num_spans_received()
Expand Down Expand Up @@ -85,7 +72,6 @@ def test_slow_enough_to_engage_timer(self):
sf = SpanFirehose(bsp, num_spans=num_spans, sleep_sec=1)
sf.run()
time.sleep(5)
# bsp.force_flush()
num_span_received = server.get_num_spans_received()
self.assertEqual(num_spans, num_span_received)
server.stop()
Expand Down

0 comments on commit 97db147

Please sign in to comment.