diff --git a/conftest.py b/conftest.py index 2c6eb79..e2fddf8 100644 --- a/conftest.py +++ b/conftest.py @@ -9,7 +9,7 @@ from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import InMemoryMetricReader from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace import TracerProvider, ReadableSpan from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter from opentelemetry.trace import set_tracer_provider @@ -21,15 +21,17 @@ from appsignal.internal_logger import _reset_logger from appsignal.opentelemetry import METRICS_PREFERRED_TEMPORALITY +from typing import Any, Generator, Callable, Tuple + @pytest.fixture(scope="function", autouse=True) -def disable_start_opentelemetry(mocker): +def disable_start_opentelemetry(mocker: Any) -> Any: mocker.patch("appsignal.opentelemetry._start_tracer") mocker.patch("appsignal.opentelemetry._start_metrics") @pytest.fixture(scope="session", autouse=True) -def start_in_memory_metric_reader(): +def start_in_memory_metric_reader() -> Generator[InMemoryMetricReader, None, None]: metric_reader = InMemoryMetricReader( preferred_temporality=METRICS_PREFERRED_TEMPORALITY ) @@ -40,7 +42,7 @@ def start_in_memory_metric_reader(): @pytest.fixture(scope="session", autouse=True) -def start_in_memory_span_exporter(): +def start_in_memory_span_exporter() -> Generator[InMemorySpanExporter, None, None]: span_exporter = InMemorySpanExporter() exporter_processor = SimpleSpanProcessor(span_exporter) provider = TracerProvider() @@ -51,7 +53,7 @@ def start_in_memory_span_exporter(): @pytest.fixture(scope="function") -def metrics(start_in_memory_metric_reader): +def metrics(start_in_memory_metric_reader: InMemoryMetricReader) -> Generator[Callable[[], Any], None, None]: # Getting the metrics data implicitly wipes its state start_in_memory_metric_reader.get_metrics_data() @@ -59,10 +61,10 @@ def metrics(start_in_memory_metric_reader): @pytest.fixture(scope="function") -def spans(start_in_memory_span_exporter): +def spans(start_in_memory_span_exporter: InMemorySpanExporter) -> Generator[Callable[[], Tuple[ReadableSpan, ...]], None, None]: start_in_memory_span_exporter.clear() - def get_and_clear_spans(): + def get_and_clear_spans() -> Tuple[ReadableSpan, ...]: spans = start_in_memory_span_exporter.get_finished_spans() start_in_memory_span_exporter.clear() return spans @@ -71,7 +73,7 @@ def get_and_clear_spans(): @pytest.fixture(scope="function", autouse=True) -def reset_environment_between_tests(): +def reset_environment_between_tests() -> Any: old_environ = dict(os.environ) yield @@ -81,14 +83,14 @@ def reset_environment_between_tests(): @pytest.fixture(scope="function", autouse=True) -def reset_internal_logger_after_tests(): +def reset_internal_logger_after_tests() -> Any: yield _reset_logger() @pytest.fixture(scope="function", autouse=True) -def stop_and_clear_probes_after_tests(): +def stop_and_clear_probes_after_tests() -> Any: yield probes.stop() @@ -96,17 +98,17 @@ def stop_and_clear_probes_after_tests(): @pytest.fixture(scope="function", autouse=True) -def reset_agent_active_state(): +def reset_agent_active_state() -> Any: agent.active = False @pytest.fixture(scope="function", autouse=True) -def reset_global_client(): +def reset_global_client() -> Any: _reset_client() @pytest.fixture(scope="function", autouse=True) -def stop_agent(): +def stop_agent() -> Any: tmp_path = "/tmp" if platform.system() == "Darwin" else tempfile.gettempdir() working_dir = os.path.join(tmp_path, "appsignal") if os.path.isdir(working_dir): @@ -114,7 +116,7 @@ def stop_agent(): @pytest.fixture(scope="function") -def reset_heartbeat_warnings(): +def reset_heartbeat_warnings() -> Any: _heartbeat_class_warning.reset() _heartbeat_helper_warning.reset() diff --git a/src/appsignal/check_in.py b/src/appsignal/check_in.py deleted file mode 100644 index 5f915cd..0000000 --- a/src/appsignal/check_in.py +++ /dev/null @@ -1,94 +0,0 @@ -from __future__ import annotations - -from binascii import hexlify -from os import urandom -from time import time -from typing import Any, Callable, Literal, TypedDict, TypeVar, Union - -from . import internal_logger as logger -from .client import Client -from .config import Config -from .transmitter import transmit - - -T = TypeVar("T") - -EventKind = Union[Literal["start"], Literal["finish"]] - - -class Event(TypedDict): - identifier: str - digest: str - kind: EventKind - timestamp: int - check_in_type: Literal["cron"] - - -class Cron: - identifier: str - digest: str - - def __init__(self, identifier: str) -> None: - self.identifier = identifier - self.digest = hexlify(urandom(8)).decode("utf-8") - - def _event(self, kind: EventKind) -> Event: - return Event( - identifier=self.identifier, - digest=self.digest, - kind=kind, - timestamp=int(time()), - check_in_type="cron", - ) - - def _transmit(self, event: Event) -> None: - config = Client.config() or Config() - - if not config.is_active(): - logger.debug("AppSignal not active, not transmitting cron check-in event") - return - - url = f"{config.option('logging_endpoint')}/check_ins/json" - try: - response = transmit(url, json=event) - if 200 <= response.status_code <= 299: - logger.debug( - f"Transmitted cron check-in `{event['identifier']}` " - f"({event['digest']}) {event['kind']} event" - ) - else: - logger.error( - f"Failed to transmit cron check-in {event['kind']} event: " - f"status code was {response.status_code}" - ) - except Exception as e: - logger.error(f"Failed to transmit cron check-in {event['kind']} event: {e}") - - def start(self) -> None: - self._transmit(self._event("start")) - - def finish(self) -> None: - self._transmit(self._event("finish")) - - def __enter__(self) -> None: - self.start() - - def __exit__( - self, exc_type: Any = None, exc_value: Any = None, traceback: Any = None - ) -> Literal[False]: - if exc_type is None: - self.finish() - - return False - - -def cron(identifier: str, fn: Callable[[], T] | None = None) -> None | T: - cron = Cron(identifier) - output = None - - if fn is not None: - cron.start() - output = fn() - - cron.finish() - return output diff --git a/src/appsignal/check_in/__init__.py b/src/appsignal/check_in/__init__.py new file mode 100644 index 0000000..28197cd --- /dev/null +++ b/src/appsignal/check_in/__init__.py @@ -0,0 +1,2 @@ +from .cron import Cron, cron +from .heartbeat import heartbeat diff --git a/src/appsignal/check_in/cron.py b/src/appsignal/check_in/cron.py new file mode 100644 index 0000000..830ea7d --- /dev/null +++ b/src/appsignal/check_in/cron.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +from binascii import hexlify +from os import urandom +from typing import Any, Callable, Literal, TypeVar + +from .event import Event +from .scheduler import scheduler + +T = TypeVar("T") + +class Cron: + identifier: str + digest: str + + def __init__(self, identifier: str) -> None: + self.identifier = identifier + self.digest = hexlify(urandom(8)).decode("utf-8") + + def start(self) -> None: + scheduler.schedule(Event.cron(self.identifier, self.digest, "start")) + + def finish(self) -> None: + scheduler.schedule(Event.cron(self.identifier, self.digest, "finish")) + + def __enter__(self) -> None: + self.start() + + def __exit__( + self, exc_type: Any = None, exc_value: Any = None, traceback: Any = None + ) -> Literal[False]: + if exc_type is None: + self.finish() + + return False + + +def cron(identifier: str, fn: Callable[[], T] | None = None) -> None | T: + cron = Cron(identifier) + output = None + + if fn is not None: + cron.start() + output = fn() + + cron.finish() + return output diff --git a/src/appsignal/check_in/event.py b/src/appsignal/check_in/event.py new file mode 100644 index 0000000..66d157b --- /dev/null +++ b/src/appsignal/check_in/event.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +from time import time + +from typing import TYPE_CHECKING, TypedDict + +if TYPE_CHECKING: + from typing import Literal, Optional, Union, Self, List + +EventKind = Union[Literal["start"], Literal["finish"]] + +EventCheckInType = Union[Literal["cron"], Literal["heartbeat"]] + +class Event(TypedDict): + identifier: str + digest: Optional[str] + kind: Optional[EventKind] + timestamp: int + check_in_type: EventCheckInType + + def __init__(self, **kwargs: Event) -> None: + super().__init__(**{ + **kwargs, + "timestamp": int(time()) + }) + + @classmethod + def cron(cls, identifier: str, digest: str, kind: EventKind) -> Self: + return cls( + identifier=identifier, + digest=digest, + kind=kind, + check_in_type="cron" + ) + + @classmethod + def heartbeat(cls, identifier: str) -> Self: + return cls( + identifier=identifier, + check_in_type="heartbeat" + ) + + def is_redundant(self, other: Self) -> bool: + if ( + self["check_in_type"] not in ["cron", "heartbeat"] or + self["check_in_type"] != other["check_in_type"] or + self["identifier"] != other["identifier"] + ): + return False + + if self["check_in_type"] == "cron" and ( + self["digest"] != other["digest"] or + self["kind"] != other["kind"] + ): + return False + + return True + + @classmethod + def describe(cls, events: List[Self]) -> str: + if not events: + # This shouldn't happen. + return "no check-in events" + elif len(events) > 1: + return f"{len(events)} check-in events" + else: + event = events[0] + if event["check_in_type"] == "cron": + return ( + f"cron check-in `{event.get('identifier', 'unknown')}` " + f"{event.get('kind', 'unknown')} event " + f"(digest {event.get('digest', 'unknown')})" + ) + elif event["check_in_type"] == "heartbeat": + return f"heartbeat check-in `{event.get('identifier', 'unknown')}` event" + else: + # This shouldn't happen. + return "unknown check-in event" diff --git a/src/appsignal/check_in/heartbeat.py b/src/appsignal/check_in/heartbeat.py new file mode 100644 index 0000000..6d1da4a --- /dev/null +++ b/src/appsignal/check_in/heartbeat.py @@ -0,0 +1,17 @@ +from .scheduler import scheduler +from .event import Event +from threading import Thread +from time import sleep + +_HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS = 30 + +def _continuous_heartbeat(name: str) -> None: + while True: + sleep(_HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS) + heartbeat(name) + +def heartbeat(name: str, continuous: bool = False) -> None: + if continuous: + thread = Thread(target=_continuous_heartbeat, args=(name,)) + thread.start() + scheduler.schedule(Event.heartbeat(name)) diff --git a/src/appsignal/check_in/scheduler.py b/src/appsignal/check_in/scheduler.py new file mode 100644 index 0000000..d7d3bd0 --- /dev/null +++ b/src/appsignal/check_in/scheduler.py @@ -0,0 +1,127 @@ +from __future__ import annotations + +from threading import Thread, Lock +from queue import Queue +from typing import List, Optional, cast +from time import sleep + +from .event import Event +from ..client import Client +from .. import internal_logger as logger +from ..transmitter import transmit + +class AcquiredLock: + def __new__(_cls): + raise TypeError("AcquiredLock instances cannot be constructed") + +class TypedLock(Lock): + def __enter__(self) -> AcquiredLock: + super().__enter__() + return cast(AcquiredLock, None) + +class Scheduler: + events: List[Event] + lock: TypedLock + queue: Queue + stopped: bool + thread: Optional[Thread] + waker: Optional[Thread] + _transmitted: int + + INITIAL_DEBOUNCE_SECONDS = 0.1 + BETWEEN_TRANSMISSIONS_DEBOUNCE_SECONDS = 10 + + def __init__(self): + self.lock = Lock() + self.thread = None + self.queue = Queue() + self.stopped = False + self.events = [] + self.waker = None + self._transmitted = 0 + + def schedule(self, event: Event): + if not Client.config().is_active(): + logger.debug(f"Cannot transmit {Event.describe([event])}: AppSignal is not active") + return + + with self.lock as locked: + if self.stopped: + logger.debug(f"Cannot transmit {Event.describe([event])}: AppSignal is stopped") + return + self._add_event(locked, event) + if self.waker is None: + self._start_waker(locked, self.INITIAL_DEBOUNCE_SECONDS) + + logger.debug(f"Scheduling {Event.describe([event])} to be transmitted") + + if self.thread is None: + self.thread = Thread(target=self._run) + self.thread.start() + + def stop(self): + with self.lock as locked: + self._push_events(locked) + self.stopped = True + self._stop_waker(locked) + if self.thread is not None: + self.thread.join() + self.thread = None + + def _run(self): + while True: + events = self.queue.get() + if events is None: + break + + self._transmit(events) + self.transmitted += 1 + + def _transmit(self, events: List[Event]): + description = Event.describe(events) + + try: + response = transmit(f"{Client.config().option('logging_endpoint')}/check_ins/json", json=events) + + if 200 <= response.status_code <= 299: + logger.debug(f"Transmitted {description}") + else: + logger.error(f"Failed to transmit {description}: {response.status_code} status code") + except Exception as e: + logger.error(f"Failed to transmit {description}: {e}") + + def _add_event(self, locked: AcquiredLock, event: Event): + self.events = [ + existing_event for existing_event in self.events + if not event.is_redundant(existing_event) + ] + + self.events.append(event) + + def _run_waker(self, debounce: float): + sleep(debounce) + + with self.lock as locked: + self.waker = None + self._push_events(locked) + + def _start_waker(self, locked: AcquiredLock, debounce: float): + self._stop_waker(locked) + + self.waker = Thread(debounce, target=self._run_waker, args=(debounce,)) + self.waker.start() + + def _stop_waker(self, locked: AcquiredLock): + if self.waker is not None: + self.waker.cancel() + self.waker = None + + def _push_events(self, locked: AcquiredLock): + if not self.events: + return + + self.queue.put(self.events.copy()) + self.events.clear() + self._start_waker(locked, self.BETWEEN_TRANSMISSIONS_DEBOUNCE_SECONDS) + +scheduler = Scheduler()