From a9b9358fb199018d50d170f71a68a72f132bf5be Mon Sep 17 00:00:00 2001 From: Noemi <45180344+unflxw@users.noreply.github.com> Date: Mon, 7 Oct 2024 17:35:02 +0200 Subject: [PATCH] Implement scheduler and heartbeat check-ins Implement a scheduler for check-in events, ensuring that the events are sent asynchronously from the thread in which they are scheduled. Implement a heartbeat check-in helper, which sends heartbeat check-in events. When given the `continuous=True` flag, the events will be sent continously from a separate thread during the duration of the process. --- .changesets/implement-heartbeat-checkins.md | 26 ++ .changesets/send-checkins-concurrently.md | 6 + conftest.py | 10 + pyproject.toml | 1 + src/appsignal/check_in/cron.py | 6 +- src/appsignal/check_in/event.py | 120 ++++--- src/appsignal/check_in/heartbeat.py | 53 +++- src/appsignal/check_in/scheduler.py | 116 ++++--- src/appsignal/ndjson.py | 9 + src/appsignal/transmitter.py | 20 +- tests/check_in/__init__.py | 0 tests/check_in/test_cron.py | 304 ++++++++++++++++++ tests/check_in/test_event.py | 61 ++++ tests/check_in/test_heartbeat.py | 60 ++++ tests/check_in/test_scheduler.py | 123 ++++++++ tests/check_in/utils.py | 68 ++++ tests/test_check_in.py | 328 -------------------- tests/test_transmitter.py | 9 + tests/utils.py | 15 + 19 files changed, 877 insertions(+), 458 deletions(-) create mode 100644 .changesets/implement-heartbeat-checkins.md create mode 100644 .changesets/send-checkins-concurrently.md create mode 100644 src/appsignal/ndjson.py create mode 100644 tests/check_in/__init__.py create mode 100644 tests/check_in/test_cron.py create mode 100644 tests/check_in/test_event.py create mode 100644 tests/check_in/test_heartbeat.py create mode 100644 tests/check_in/test_scheduler.py create mode 100644 tests/check_in/utils.py delete mode 100644 tests/test_check_in.py create mode 100644 tests/utils.py diff --git a/.changesets/implement-heartbeat-checkins.md b/.changesets/implement-heartbeat-checkins.md new file mode 100644 index 00000000..56d8faf5 --- /dev/null +++ b/.changesets/implement-heartbeat-checkins.md @@ -0,0 +1,26 @@ +--- +bump: minor +type: add +--- + +Add support for heartbeat check-ins. + +Use the `appsignal.check_in.heartbeat` function to send a single heartbeat check-in event from your application. This can be used, for example, in your application's main loop: + +```python +from appsignal.check_in import heartbeat + +while True: + heartbeat("job_processor") + process_job() +``` + +Heartbeats are deduplicated and sent asynchronously, without blocking the current thread. Regardless of how often the `.heartbeat` function is called, at most one heartbeat with the same identifier will be sent every ten seconds. + +Pass `continuous=True` as the second argument to send heartbeats continuously during the entire lifetime of the current process. This can be used, for example, after your application has finished its boot process: + +```python +def main(): + start_app() + heartbeat("my_app", continuous=True) +``` diff --git a/.changesets/send-checkins-concurrently.md b/.changesets/send-checkins-concurrently.md new file mode 100644 index 00000000..25aa31ee --- /dev/null +++ b/.changesets/send-checkins-concurrently.md @@ -0,0 +1,6 @@ +--- +bump: patch +type: change +--- + +Send check-ins concurrently. When calling `appsignal.check_in.cron`, instead of blocking the current thread while the check-in events are sent, schedule them to be sent in a separate thread. diff --git a/conftest.py b/conftest.py index e2fddf83..8ca7033d 100644 --- a/conftest.py +++ b/conftest.py @@ -18,6 +18,8 @@ from appsignal.agent import agent from appsignal.client import _reset_client from appsignal.heartbeat import _heartbeat_class_warning, _heartbeat_helper_warning +from appsignal.check_in.scheduler import _reset_scheduler +from appsignal.check_in.heartbeat import _kill_continuous_heartbeats, _reset_heartbeat_continuous_interval_seconds from appsignal.internal_logger import _reset_logger from appsignal.opentelemetry import METRICS_PREFERRED_TEMPORALITY @@ -106,6 +108,14 @@ def reset_agent_active_state() -> Any: def reset_global_client() -> Any: _reset_client() +@pytest.fixture(scope="function", autouse=True) +def reset_checkins() -> Any: + yield + + _kill_continuous_heartbeats() + _reset_scheduler() + _reset_heartbeat_continuous_interval_seconds() + @pytest.fixture(scope="function", autouse=True) def stop_agent() -> Any: diff --git a/pyproject.toml b/pyproject.toml index 647372fe..04d35d51 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ dependencies = [ "opentelemetry-api", "opentelemetry-sdk", "opentelemetry-exporter-otlp-proto-http", + "typing-extensions" ] dynamic = ["version"] diff --git a/src/appsignal/check_in/cron.py b/src/appsignal/check_in/cron.py index 830ea7db..efd74491 100644 --- a/src/appsignal/check_in/cron.py +++ b/src/appsignal/check_in/cron.py @@ -4,7 +4,7 @@ from os import urandom from typing import Any, Callable, Literal, TypeVar -from .event import Event +from .event import cron as cron_event from .scheduler import scheduler T = TypeVar("T") @@ -18,10 +18,10 @@ def __init__(self, identifier: str) -> None: self.digest = hexlify(urandom(8)).decode("utf-8") def start(self) -> None: - scheduler.schedule(Event.cron(self.identifier, self.digest, "start")) + scheduler().schedule(cron_event(self.identifier, self.digest, "start")) def finish(self) -> None: - scheduler.schedule(Event.cron(self.identifier, self.digest, "finish")) + scheduler().schedule(cron_event(self.identifier, self.digest, "finish")) def __enter__(self) -> None: self.start() diff --git a/src/appsignal/check_in/event.py b/src/appsignal/check_in/event.py index 66d157bd..13cf0f0d 100644 --- a/src/appsignal/check_in/event.py +++ b/src/appsignal/check_in/event.py @@ -2,77 +2,69 @@ from time import time -from typing import TYPE_CHECKING, TypedDict - -if TYPE_CHECKING: - from typing import Literal, Optional, Union, Self, List +from typing import TypedDict, Literal, Union, List +from typing_extensions import NotRequired EventKind = Union[Literal["start"], Literal["finish"]] EventCheckInType = Union[Literal["cron"], Literal["heartbeat"]] class Event(TypedDict): - identifier: str - digest: Optional[str] - kind: Optional[EventKind] - timestamp: int - check_in_type: EventCheckInType + identifier: str + digest: NotRequired[str] + kind: NotRequired[EventKind] + timestamp: int + check_in_type: EventCheckInType - def __init__(self, **kwargs: Event) -> None: - super().__init__(**{ - **kwargs, - "timestamp": int(time()) - }) - - @classmethod - def cron(cls, identifier: str, digest: str, kind: EventKind) -> Self: - return cls( - identifier=identifier, - digest=digest, - kind=kind, - check_in_type="cron" - ) - - @classmethod - def heartbeat(cls, identifier: str) -> Self: - return cls( - identifier=identifier, - check_in_type="heartbeat" - ) +def cron(identifier: str, digest: str, kind: EventKind) -> Event: + return Event( + identifier=identifier, + digest=digest, + kind=kind, + timestamp=int(time()), + check_in_type="cron" + ) + +def heartbeat(identifier: str) -> Event: + return Event( + identifier=identifier, + timestamp=int(time()), + check_in_type="heartbeat" + ) - def is_redundant(self, other: Self) -> bool: - if ( - self["check_in_type"] not in ["cron", "heartbeat"] or - self["check_in_type"] != other["check_in_type"] or - self["identifier"] != other["identifier"] - ): - return False - if self["check_in_type"] == "cron" and ( - self["digest"] != other["digest"] or - self["kind"] != other["kind"] - ): - return False - - return True +def is_redundant(event: Event, existing_event: Event) -> bool: + if ( + event["check_in_type"] not in ["cron", "heartbeat"] or + event["check_in_type"] != existing_event["check_in_type"] or + event["identifier"] != existing_event["identifier"] + ): + return False + + if event["check_in_type"] == "cron" and ( + event.get("digest") != existing_event.get("digest") or + event.get("kind") != existing_event.get("kind") + ): + return False + + return True - @classmethod - def describe(cls, events: List[Self]) -> str: - if not events: - # This shouldn't happen. - return "no check-in events" - elif len(events) > 1: - return f"{len(events)} check-in events" - else: - event = events[0] - if event["check_in_type"] == "cron": - return ( - f"cron check-in `{event.get('identifier', 'unknown')}` " - f"{event.get('kind', 'unknown')} event " - f"(digest {event.get('digest', 'unknown')})" - ) - elif event["check_in_type"] == "heartbeat": - return f"heartbeat check-in `{event.get('identifier', 'unknown')}` event" - else: - # This shouldn't happen. - return "unknown check-in event" +def describe(events: List[Event]) -> str: + if not events: + # This shouldn't happen. + return "no check-in events" + elif len(events) > 1: + return f"{len(events)} check-in events" + else: + event = events[0] + if event["check_in_type"] == "cron": + return ( + f"cron check-in `{event.get('identifier', 'unknown')}` " + f"{event.get('kind', 'unknown')} event " + f"(digest {event.get('digest', 'unknown')})" + ) + elif event["check_in_type"] == "heartbeat": + return f"heartbeat check-in `{event.get('identifier', 'unknown')}` event" + else: + # This shouldn't happen. + return "unknown check-in event" # type: ignore[unreachable] diff --git a/src/appsignal/check_in/heartbeat.py b/src/appsignal/check_in/heartbeat.py index 6d1da4ad..b0bf1459 100644 --- a/src/appsignal/check_in/heartbeat.py +++ b/src/appsignal/check_in/heartbeat.py @@ -1,17 +1,48 @@ from .scheduler import scheduler -from .event import Event -from threading import Thread -from time import sleep +from .event import heartbeat as heartbeat_event +from threading import Thread, Event -_HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS = 30 +from typing import List, Tuple -def _continuous_heartbeat(name: str) -> None: - while True: - sleep(_HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS) - heartbeat(name) +_HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS = 30.0 + +def _heartbeat_continuous_interval_seconds() -> float: + return _HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS + +def _set_heartbeat_continuous_interval_seconds(seconds: float) -> None: + global _HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS + _HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS = seconds + +def _reset_heartbeat_continuous_interval_seconds() -> None: + global _HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS + _HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS = 30.0 + +_started_continuous_heartbeats: List[Tuple[Event, Thread]] = [] + +def _kill_continuous_heartbeats() -> None: + for event, thread in _started_continuous_heartbeats: + event.set() + thread.join() + + _started_continuous_heartbeats.clear() + +def _start_continuous_heartbeat(name: str) -> None: + kill = Event() + + def _run_continuous_heartbeat() -> None: + while True: + if kill.wait(_heartbeat_continuous_interval_seconds()): + break + + heartbeat(name) + + thread = Thread(target=_run_continuous_heartbeat) + thread.start() + _started_continuous_heartbeats.append((kill, thread)) def heartbeat(name: str, continuous: bool = False) -> None: if continuous: - thread = Thread(target=_continuous_heartbeat, args=(name,)) - thread.start() - scheduler.schedule(Event.heartbeat(name)) + print("schedule is ", _heartbeat_continuous_interval_seconds()) + _start_continuous_heartbeat(name) + + scheduler().schedule(heartbeat_event(name)) diff --git a/src/appsignal/check_in/scheduler.py b/src/appsignal/check_in/scheduler.py index d7d3bd0a..9affea72 100644 --- a/src/appsignal/check_in/scheduler.py +++ b/src/appsignal/check_in/scheduler.py @@ -1,37 +1,30 @@ from __future__ import annotations -from threading import Thread, Lock +from threading import Thread, Lock, Event as ThreadEvent from queue import Queue -from typing import List, Optional, cast +from typing import List, Optional, Tuple, cast from time import sleep -from .event import Event +from .event import Event, describe, is_redundant from ..client import Client from .. import internal_logger as logger from ..transmitter import transmit +from ..config import Config -class AcquiredLock: - def __new__(_cls): - raise TypeError("AcquiredLock instances cannot be constructed") - -class TypedLock(Lock): - def __enter__(self) -> AcquiredLock: - super().__enter__() - return cast(AcquiredLock, None) class Scheduler: events: List[Event] - lock: TypedLock + lock: Lock queue: Queue stopped: bool thread: Optional[Thread] - waker: Optional[Thread] + waker: Optional[Tuple[Thread, ThreadEvent]] _transmitted: int - INITIAL_DEBOUNCE_SECONDS = 0.1 - BETWEEN_TRANSMISSIONS_DEBOUNCE_SECONDS = 10 + INITIAL_DEBOUNCE_SECONDS: float = 0.1 + BETWEEN_TRANSMISSIONS_DEBOUNCE_SECONDS: float = 10 - def __init__(self): + def __init__(self) -> None: self.lock = Lock() self.thread = None self.queue = Queue() @@ -40,48 +33,55 @@ def __init__(self): self.waker = None self._transmitted = 0 - def schedule(self, event: Event): - if not Client.config().is_active(): - logger.debug(f"Cannot transmit {Event.describe([event])}: AppSignal is not active") + def schedule(self, event: Event) -> None: + print("schedule called for event!", event, self.stopped) + + config = Client.config() + if config is None or not config.is_active(): + logger.debug(f"Cannot transmit {describe([event])}: AppSignal is not active") + print(f"Cannot transmit {describe([event])}: AppSignal is not active") return - with self.lock as locked: + with self.lock: if self.stopped: - logger.debug(f"Cannot transmit {Event.describe([event])}: AppSignal is stopped") + logger.debug(f"Cannot transmit {describe([event])}: AppSignal is stopped") + print(f"Cannot transmit {describe([event])}: AppSignal is stopped") return - self._add_event(locked, event) + self._add_event(event) if self.waker is None: - self._start_waker(locked, self.INITIAL_DEBOUNCE_SECONDS) + self._start_waker(self.INITIAL_DEBOUNCE_SECONDS) - logger.debug(f"Scheduling {Event.describe([event])} to be transmitted") + logger.debug(f"Scheduling {describe([event])} to be transmitted") + print(f"Scheduling {describe([event])} to be transmitted") if self.thread is None: self.thread = Thread(target=self._run) self.thread.start() - def stop(self): - with self.lock as locked: - self._push_events(locked) + def stop(self) -> None: + with self.lock: + self._push_events() self.stopped = True - self._stop_waker(locked) + self._stop_waker() if self.thread is not None: + self.queue.put(None) self.thread.join() self.thread = None - def _run(self): + def _run(self) -> None: while True: events = self.queue.get() if events is None: break self._transmit(events) - self.transmitted += 1 + self._transmitted += 1 - def _transmit(self, events: List[Event]): - description = Event.describe(events) + def _transmit(self, events: List[Event]) -> None: + description = describe(events) try: - response = transmit(f"{Client.config().option('logging_endpoint')}/check_ins/json", json=events) + response = transmit(f"{cast(Config, Client.config()).option('logging_endpoint')}/check_ins/json", ndjson=events) if 200 <= response.status_code <= 299: logger.debug(f"Transmitted {description}") @@ -90,38 +90,58 @@ def _transmit(self, events: List[Event]): except Exception as e: logger.error(f"Failed to transmit {description}: {e}") - def _add_event(self, locked: AcquiredLock, event: Event): + # Must be called from within a `with self.lock` block. + def _add_event(self, event: Event) -> None: self.events = [ existing_event for existing_event in self.events - if not event.is_redundant(existing_event) + if not is_redundant(event, existing_event) ] self.events.append(event) - def _run_waker(self, debounce: float): - sleep(debounce) + def _run_waker(self, debounce: float, kill: ThreadEvent) -> None: + if kill.wait(debounce): + return - with self.lock as locked: + with self.lock: self.waker = None - self._push_events(locked) + self._push_events() - def _start_waker(self, locked: AcquiredLock, debounce: float): - self._stop_waker(locked) + # Must be called from within a `with self.lock` block. + def _start_waker(self, debounce: float) -> None: + self._stop_waker() - self.waker = Thread(debounce, target=self._run_waker, args=(debounce,)) - self.waker.start() + kill = ThreadEvent() + thread = Thread(target=self._run_waker, args=(debounce, kill)) + thread.start() - def _stop_waker(self, locked: AcquiredLock): + self.waker = (thread, kill) + + # Must be called from within a `with self.lock` block. + def _stop_waker(self) -> None: if self.waker is not None: - self.waker.cancel() + thread, kill = self.waker + kill.set() + thread.join() self.waker = None - def _push_events(self, locked: AcquiredLock): + # Must be called from within a `with self.lock` block. + def _push_events(self) -> None: if not self.events: return self.queue.put(self.events.copy()) self.events.clear() - self._start_waker(locked, self.BETWEEN_TRANSMISSIONS_DEBOUNCE_SECONDS) + self._start_waker(self.BETWEEN_TRANSMISSIONS_DEBOUNCE_SECONDS) + +_scheduler = Scheduler() + +def scheduler() -> Scheduler: + return _scheduler -scheduler = Scheduler() +def _reset_scheduler() -> None: + global _scheduler + _scheduler.stop() + _scheduler = Scheduler() + Scheduler.INITIAL_DEBOUNCE_SECONDS = 0.1 + Scheduler.BETWEEN_TRANSMISSIONS_DEBOUNCE_SECONDS = 10 diff --git a/src/appsignal/ndjson.py b/src/appsignal/ndjson.py new file mode 100644 index 00000000..ccd76e37 --- /dev/null +++ b/src/appsignal/ndjson.py @@ -0,0 +1,9 @@ +import json + +from typing import List, Any + +def dumps(objs: List[Any]) -> str: + return "\n".join([json.dumps(line) for line in objs]) + +def loads(s: str) -> List[Any]: + return [json.loads(line) for line in s.split("\n")] diff --git a/src/appsignal/transmitter.py b/src/appsignal/transmitter.py index 4c1527ea..a3573c27 100644 --- a/src/appsignal/transmitter.py +++ b/src/appsignal/transmitter.py @@ -1,23 +1,30 @@ from __future__ import annotations import urllib -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, List import requests -from appsignal.client import Client -from appsignal.config import Config +from .client import Client +from .config import Config +from .ndjson import dumps as ndjson_dumps if TYPE_CHECKING: from requests import Response def transmit( - url: str, json: Any | None = None, config: Config | None = None + url: str, + json: Any | None = None, + ndjson: List[Any] | None = None, + config: Config | None = None ) -> Response: if config is None: config = Client.config() or Config() + + if json is not None and ndjson is not None: + raise ValueError("Cannot send both `json` and `ndjson`") params = urllib.parse.urlencode( { @@ -37,4 +44,9 @@ def transmit( cert = config.option("ca_file_path") + if ndjson is not None: + data = ndjson_dumps(ndjson) + headers = {"Content-Type": "application/x-ndjson"} + return requests.post(url, data=data, headers=headers, proxies=proxies, verify=cert) + return requests.post(url, json=json, proxies=proxies, verify=cert) diff --git a/tests/check_in/__init__.py b/tests/check_in/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/check_in/test_cron.py b/tests/check_in/test_cron.py new file mode 100644 index 00000000..8cf9522c --- /dev/null +++ b/tests/check_in/test_cron.py @@ -0,0 +1,304 @@ +from time import sleep + +from pytest import raises + +from appsignal import Heartbeat as DeprecatedHeartbeat, heartbeat as deprecated_heartbeat +from appsignal.check_in import Cron, cron +from appsignal.check_in.scheduler import scheduler +from appsignal.check_in.heartbeat import _set_heartbeat_continuous_interval_seconds + +from .utils import ( + init_client, + mock_requests, + received_events, + received_url, + mock_internal_logger, + mock_print, + received_messages, + assert_called_once, + assert_called_times, + assert_url_config, +) + + +def assert_cron_event(event, kind): + assert event["identifier"] == "some-cron-checkin" + assert event["kind"] == kind + assert isinstance(event["timestamp"], int) + assert isinstance(event["digest"], str) + assert event["check_in_type"] == "cron" + + +def test_cron_start_and_finish_when_appsignal_is_not_active_sends_nothing(mocker): + requests_mock = mock_requests(mocker) + init_client(active=False) + + cron = Cron("some-cron-checkin") + cron.start() + cron.finish() + scheduler().stop() + + assert not requests_mock.called + + +def test_cron_start_sends_cron_checkin_start_event(mocker): + requests_mock = mock_requests(mocker) + init_client() + + cron = Cron("some-cron-checkin") + cron.start() + scheduler().stop() + + assert_called_once(requests_mock) + assert_url_config(received_url(requests_mock)) + + events = received_events(requests_mock) + assert len(events) == 1 + assert_cron_event(events[0], "start") + + +def test_cron_finish_sends_cron_checkin_finish_event(mocker): + requests_mock = mock_requests(mocker) + init_client() + + cron = Cron("some-cron-checkin") + cron.finish() + scheduler().stop() + + assert_called_once(requests_mock) + + assert_url_config(received_url(requests_mock)) + + events = received_events(requests_mock) + assert len(events) == 1 + assert_cron_event(events[0], "finish") + + +def test_cron_sends_cron_checkin_finish_event(mocker): + requests_mock = mock_requests(mocker) + init_client() + + cron("some-cron-checkin") + scheduler().stop() + + assert_called_once(requests_mock) + + events = received_events(requests_mock) + assert len(events) == 1 + assert_cron_event(events[0], "finish") + + +def test_cron_with_function_sends_cron_checkin_start_and_finish_event(mocker): + requests_mock = mock_requests(mocker) + init_client() + + def some_function(): + sleep(1.1) + return "output" + + assert cron("some-cron-checkin", some_function) == "output" + + scheduler().stop() + + assert_called_times(requests_mock, 2) + + print(requests_mock.call_args_list) + + first_events = received_events(requests_mock, call=0) + assert len(first_events) == 1 + assert_cron_event(first_events[0], "start") + start_event = first_events[0] + + second_events = received_events(requests_mock, call=1) + assert len(second_events) == 1 + assert_cron_event(second_events[0], "finish") + finish_event = second_events[0] + + assert start_event["timestamp"] < finish_event["timestamp"] + assert start_event["digest"] == finish_event["digest"] + + +def test_cron_with_function_does_not_send_cron_checkin_finish_event_on_exception( + mocker, +): + requests_mock = mock_requests(mocker) + init_client() + + def some_function(): + raise Exception("Whoops!") + + with raises(Exception, match="Whoops!"): + cron("some-cron-checkin", some_function) + + scheduler().stop() + + assert_called_once(requests_mock) + events = received_events(requests_mock) + assert len(events) == 1 + assert_cron_event(events[0], "start") + + +def test_cron_context_manager_sends_cron_checkin_start_and_finish_event(mocker): + requests_mock = mock_requests(mocker) + init_client() + + with Cron("some-cron-checkin"): + sleep(1.1) + + scheduler().stop() + + assert_called_times(requests_mock, 2) + + print(requests_mock.call_args_list) + + first_events = received_events(requests_mock, call=0) + assert len(first_events) == 1 + assert_cron_event(first_events[0], "start") + start_event = first_events[0] + + second_events = received_events(requests_mock, call=1) + assert len(second_events) == 1 + assert_cron_event(second_events[0], "finish") + finish_event = second_events[0] + + assert start_event["timestamp"] < finish_event["timestamp"] + assert start_event["digest"] == finish_event["digest"] + + +def test_cron_context_manager_does_not_send_cron_checkin_finish_event_on_exception( + mocker, +): + requests_mock = mock_requests(mocker) + init_client() + + with raises(Exception, match="Whoops!"): + with Cron("some-cron-checkin"): + raise Exception("Whoops!") + + scheduler().stop() + + assert_called_once(requests_mock) + events = received_events(requests_mock) + assert len(events) == 1 + assert_cron_event(events[0], "start") + + +def test_cron_logs_failure_to_send_event_when_status_code(mocker): + mock_requests(mocker, status_code=500) + init_client() + + internal_logger_mock = mock_internal_logger(mocker, "error") + + cron = Cron("some-cron-checkin") + cron.start() + scheduler().stop() + + log_messages = received_messages(internal_logger_mock) + assert len(log_messages) == 1 + assert "Failed to transmit cron check-in `some-cron-checkin` start event" in log_messages[0] + assert ": 500 status code" in log_messages[0] + + +def test_cron_logs_failure_to_send_event_when_exception(mocker): + mock_requests(mocker, raise_exception=True) + init_client() + + internal_logger_mock = mock_internal_logger(mocker, "error") + + cron = Cron("some-cron-checkin") + cron.start() + scheduler().stop() + + log_messages = received_messages(internal_logger_mock) + assert len(log_messages) == 1 + assert "Failed to transmit cron check-in `some-cron-checkin` start event" in log_messages[0] + assert ": Whoops!" in log_messages[0] + + +def test_deprecated_heartbeat_helper_behaves_like_cron_helper(mocker): + requests_mock = mock_requests(mocker) + init_client() + + def some_function(): + return "output" + + assert deprecated_heartbeat("some-cron-checkin", some_function) == "output" + scheduler().stop() + + assert_called_times(requests_mock, 1) + + events = received_events(requests_mock) + assert len(events) == 2 + assert_cron_event(events[0], "start") + assert_cron_event(events[1], "finish") + + +def test_deprecated_heartbeat_helper_emits_deprecation_warning(mocker, reset_heartbeat_warnings): + internal_logger_mock = mock_internal_logger(mocker, "warning") + print_mock = mock_print(mocker) + + deprecated_heartbeat("some-cron-checkin") + + for mock in [internal_logger_mock, print_mock]: + messages = received_messages(mock) + assert len([ + message for message in messages + if "The helper `heartbeat` has been deprecated" in message + ]) == 1 + +def test_deprecated_heartbeat_helper_only_emits_deprecation_warning_once( + mocker, reset_heartbeat_warnings +): + internal_logger_mock = mock_internal_logger(mocker, "warning") + print_mock = mock_print(mocker) + + deprecated_heartbeat("some-heartbeat") + deprecated_heartbeat("some-heartbeat") + + for mock in [internal_logger_mock, print_mock]: + messages = received_messages(mock) + assert len([ + message for message in messages + if "The helper `heartbeat` has been deprecated" in message + ]) == 1 + +def test_deprecated_heartbeat_class_returns_cron_instance(): + cron_instance = DeprecatedHeartbeat("some-heartbeat") + assert isinstance(cron_instance, Cron) + + +def test_deprecated_heartbeat_instance_is_instance_of_cron_class(): + for instance_class in [Cron, DeprecatedHeartbeat]: + for checked_class in [Cron, DeprecatedHeartbeat]: + assert isinstance(instance_class("some-instance"), checked_class) + + +def test_deprecated_heartbeat_class_emits_deprecation_warning(mocker, reset_heartbeat_warnings): + internal_logger_mock = mock_internal_logger(mocker, "warning") + print_mock = mock_print(mocker) + + DeprecatedHeartbeat("some-heartbeat") + + for mock in [internal_logger_mock, print_mock]: + messages = received_messages(mock) + assert len([ + message for message in messages + if "The class `Heartbeat` has been deprecated" in message + ]) == 1 + + +def test_deprecated_heartbeat_class_only_emits_deprecation_warning_once( + mocker, reset_heartbeat_warnings +): + internal_logger_mock = mock_internal_logger(mocker, "warning") + print_mock = mock_print(mocker) + + DeprecatedHeartbeat("some-heartbeat") + DeprecatedHeartbeat("some-heartbeat") + + for mock in [internal_logger_mock, print_mock]: + messages = received_messages(mock) + assert len([ + message for message in messages + if "The class `Heartbeat` has been deprecated" in message + ]) == 1 diff --git a/tests/check_in/test_event.py b/tests/check_in/test_event.py new file mode 100644 index 00000000..15e72107 --- /dev/null +++ b/tests/check_in/test_event.py @@ -0,0 +1,61 @@ +from appsignal.check_in.event import Event, heartbeat, cron, describe, is_redundant + +from typing import cast + +def test_describe_no_events(): + assert describe([]) == "no check-in events" + +def test_describe_heartbeat_event(): + events = [heartbeat("some-event")] + assert describe(events) == "heartbeat check-in `some-event` event" + +def test_describe_cron_event(): + events = [cron("some-event", "abc", "start")] + assert describe(events) == "cron check-in `some-event` start event (digest abc)" + +def test_describe_multiple_events(): + events = [heartbeat("event"), cron("some-event", "abc", "start")] + assert describe(events) == "2 check-in events" + +def test_describe_unknown_event(): + event = cast(Event, {"check_in_type": "unknown"}) + assert describe([event]) == "unknown check-in event" + +def test_is_redundant_heartbeat_with_same_name(): + event = heartbeat("some-event") + redundant = heartbeat("some-event") + assert is_redundant(event, redundant) + +def test_is_redundant_heartbeat_with_different_name(): + event = heartbeat("some-event") + redundant = heartbeat("other-event") + assert not is_redundant(event, redundant) + +def test_is_redundant_heartbeat_and_cron(): + event = heartbeat("some-event") + redundant = cron("some-event", "abc", "start") + assert not is_redundant(event, redundant) + +def test_is_redundant_cron_with_same_name_digest_and_kind(): + event = cron("some-event", "abc", "start") + redundant = cron("some-event", "abc", "start") + assert is_redundant(event, redundant) + +def test_is_redundant_cron_with_same_name_and_digest_but_different_kind(): + event = cron("some-event", "abc", "start") + redundant = cron("some-event", "abc", "finish") + assert not is_redundant(event, redundant) + +def test_is_redundant_cron_with_same_name_and_kind_but_different_digest(): + event = cron("some-event", "abc", "start") + redundant = cron("some-event", "def", "start") + assert not is_redundant(event, redundant) + +def test_is_redundant_cron_with_same_digest_and_kind_but_different_name(): + event = cron("some-event", "abc", "start") + redundant = cron("other-event", "abc", "start") + assert not is_redundant(event, redundant) + +def test_is_redundant_unknown_event(): + event = cast(Event, {"check_in_type": "unknown"}) + assert not is_redundant(event, event) diff --git a/tests/check_in/test_heartbeat.py b/tests/check_in/test_heartbeat.py new file mode 100644 index 00000000..f5527008 --- /dev/null +++ b/tests/check_in/test_heartbeat.py @@ -0,0 +1,60 @@ +from appsignal.check_in import heartbeat +from appsignal.check_in.scheduler import scheduler, Scheduler +from appsignal.check_in.heartbeat import _set_heartbeat_continuous_interval_seconds + + +from .utils import ( + init_client, + mock_requests, + received_events, + received_url, + assert_called_once, + assert_called_at_least, + assert_url_config, +) + +from ..utils import ( + wait_until, +) + + +def assert_heartbeat_event(event): + assert event["identifier"] == "some-heartbeat" + assert "kind" not in event + assert "digest" not in event + assert isinstance(event["timestamp"], int) + assert event["check_in_type"] == "heartbeat" + + +def test_heartbeat_sends_heartbeat_event(mocker): + requests_mock = mock_requests(mocker) + init_client() + + heartbeat("some-heartbeat") + scheduler().stop() + + assert_called_once(requests_mock) + + assert_url_config(received_url(requests_mock)) + + events = received_events(requests_mock) + assert len(events) == 1 + assert_heartbeat_event(events[0]) + + +def test_heartbeat_continuous_sends_heartbeat_events_forever(mocker): + _set_heartbeat_continuous_interval_seconds(0.01) + Scheduler.BETWEEN_TRANSMISSIONS_DEBOUNCE_SECONDS = 0.1 + + requests_mock = mock_requests(mocker) + init_client() + + heartbeat("some-heartbeat", continuous=True) + wait_until(lambda: scheduler()._transmitted == 2, "the scheduler did not transmit two events") + + assert_called_at_least(requests_mock, 2) + + for call in [0, 1]: + events = received_events(requests_mock, call=call) + assert len(events) == 1 + assert_heartbeat_event(events[0]) diff --git a/tests/check_in/test_scheduler.py b/tests/check_in/test_scheduler.py new file mode 100644 index 00000000..2c81d2f3 --- /dev/null +++ b/tests/check_in/test_scheduler.py @@ -0,0 +1,123 @@ +from appsignal.check_in.event import heartbeat +from appsignal.check_in.scheduler import scheduler, Scheduler + +from .utils import ( + init_client, + mock_requests, + received_events, + received_url, + assert_called_once, + assert_called_times, + assert_url_config, + mock_internal_logger, + received_messages, +) + +from ..utils import wait_until + + +def test_scheduler_transmits_scheduled_event(mocker): + requests_mock = mock_requests(mocker) + init_client() + + internal_logger_mock = mock_internal_logger(mocker, "debug") + + scheduler().schedule(heartbeat("event")) + assert not requests_mock.called + + wait_until(lambda: scheduler()._transmitted == 1, "the scheduler did not transmit the event") + + assert_called_once(requests_mock) + assert_url_config(received_url(requests_mock)) + + events = received_events(requests_mock) + assert len(events) == 1 + assert events[0]["identifier"] == "event" + + log_messages = received_messages(internal_logger_mock) + assert len(log_messages) == 2 + assert "Scheduling heartbeat check-in `event` event to be transmitted" == log_messages[0] + assert "Transmitted heartbeat check-in `event` event" == log_messages[1] + + +def test_scheduler_debounces_events_scheduled_quickly(mocker): + requests_mock = mock_requests(mocker) + init_client() + + internal_logger_mock = mock_internal_logger(mocker, "debug") + + scheduler().schedule(heartbeat("first")) + scheduler().schedule(heartbeat("second")) + assert not requests_mock.called + + wait_until(lambda: scheduler()._transmitted == 1, "the scheduler did not transmit the events") + + assert_called_once(requests_mock) + assert_url_config(received_url(requests_mock)) + + events = received_events(requests_mock) + assert len(events) == 2 + assert events[0]["identifier"] == "first" + assert events[1]["identifier"] == "second" + + log_messages = received_messages(internal_logger_mock) + assert len(log_messages) == 3 + assert "Scheduling heartbeat check-in `first` event to be transmitted" == log_messages[0] + assert "Scheduling heartbeat check-in `second` event to be transmitted" == log_messages[1] + assert "Transmitted 2 check-in events" == log_messages[2] + +def test_scheduler_sends_events_scheduled_slowly_separately(mocker): + requests_mock = mock_requests(mocker) + init_client() + Scheduler.BETWEEN_TRANSMISSIONS_DEBOUNCE_SECONDS = 0.1 + + scheduler().schedule(heartbeat("first")) + wait_until(lambda: scheduler()._transmitted == 1, "the scheduler did not transmit the first event") + scheduler().schedule(heartbeat("second")) + wait_until(lambda: scheduler()._transmitted == 2, "the scheduler did not transmit the second event") + + assert_called_times(requests_mock, 2) + + first_events = received_events(requests_mock, call=0) + assert len(first_events) == 1 + assert first_events[0]["identifier"] == "first" + + second_events = received_events(requests_mock, call=1) + assert len(second_events) == 1 + assert second_events[0]["identifier"] == "second" + +def test_scheduler_sends_events_scheduled_immediately_on_stop(mocker): + # Set all debounce intervals to 10 seconds, to make the test + # fail if it waits for the debounce -- this ensures that what is being + # tested is that the events are transmitted immediately when the + # scheduler is stopped, without waiting for any debounce. + + requests_mock = mock_requests(mocker) + init_client() + Scheduler.INITIAL_DEBOUNCE_SECONDS = 10 + Scheduler.BETWEEN_TRANSMISSIONS_DEBOUNCE_SECONDS = 10 + + scheduler().schedule(heartbeat("first")) + scheduler().stop() + wait_until(lambda: scheduler()._transmitted == 1, "the scheduler did not transmit the event") + + assert_called_once(requests_mock) + + events = received_events(requests_mock) + assert len(events) == 1 + assert events[0]["identifier"] == "first" + +def test_scheduler_does_not_transmit_when_stopped(mocker): + requests_mock = mock_requests(mocker) + init_client() + + internal_logger_mock = mock_internal_logger(mocker, "debug") + + scheduler().stop() + scheduler().schedule(heartbeat("some-heartbeat-checkin")) + + assert not requests_mock.called + + log_messages = received_messages(internal_logger_mock) + assert len(log_messages) == 1 + assert "Cannot transmit heartbeat check-in `some-heartbeat-checkin` event: AppSignal is stopped" == log_messages[0] diff --git a/tests/check_in/utils.py b/tests/check_in/utils.py new file mode 100644 index 00000000..bbd945ff --- /dev/null +++ b/tests/check_in/utils.py @@ -0,0 +1,68 @@ +from appsignal.client import Client +from appsignal import ndjson + +def init_client(active=True): + Client( + push_api_key="some-push-api-key", + name="some-app", + environment="staging", + hostname="beepboop.local", + active=active, + ) + + +def mock_requests(mocker, status_code=200, raise_exception=False): + requests_mock = mocker.patch("requests.post") + requests_mock.return_value.status_code = status_code + if raise_exception: + + def side_effect(*args, **kwargs): + raise Exception("Whoops!") + + requests_mock.side_effect = side_effect + + return requests_mock + + +def mock_internal_logger(mocker, level): + return mocker.patch(f"appsignal.internal_logger.{level}") + + +def mock_print(mocker): + return mocker.patch("builtins.print") + + +def assert_called_once(mock): + assert_called_times(mock, 1) + + +def assert_called_times(requests_mock, times: int): + assert requests_mock.called + assert len(requests_mock.call_args_list) == times + +def assert_called_at_least(requests_mock, times: int): + assert requests_mock.called + assert len(requests_mock.call_args_list) >= times + + +def received_events(request_mock, call: int = 0): + data = request_mock.call_args_list[call][1]["data"] + return ndjson.loads(data) + + +def received_url(request_mock, call: int = 0): + return request_mock.call_args_list[call][0][0] + + +def received_messages(internal_logger_mock): + return [args[0][0] for args in internal_logger_mock.call_args_list] + + +def assert_url_config(url): + assert "https://appsignal-endpoint.net/check_ins/json?" in url + # The ordering of query parameters is not guaranteed. + assert "api_key=some-push-api-key" in url + assert "environment=staging" in url + assert "hostname=beepboop.local" in url + assert "name=some-app" in url + diff --git a/tests/test_check_in.py b/tests/test_check_in.py deleted file mode 100644 index eaa94e62..00000000 --- a/tests/test_check_in.py +++ /dev/null @@ -1,328 +0,0 @@ -from time import sleep - -from pytest import raises - -from appsignal import Heartbeat, heartbeat -from appsignal.check_in import Cron, cron -from appsignal.client import Client - - -def init_client(active=True): - Client( - push_api_key="some-push-api-key", - name="some-app", - environment="staging", - hostname="beepboop.local", - active=active, - ) - - -def mock_requests(mocker, status_code=200, raise_exception=False): - requests_mock = mocker.patch("requests.post") - requests_mock.return_value.status_code = status_code - if raise_exception: - - def side_effect(*args, **kwargs): - raise Exception("Whoops!") - - requests_mock.side_effect = side_effect - - return requests_mock - - -def mock_internal_logger(mocker, level): - return mocker.patch(f"appsignal.internal_logger.{level}") - - -def mock_print(mocker): - return mocker.patch("builtins.print") - - -def test_cron_start_and_finish_when_appsignal_is_not_active_sends_nothing(mocker): - requests_mock = mock_requests(mocker) - init_client(active=False) - - cron = Cron("some-cron-checkin") - cron.start() - cron.finish() - - assert not requests_mock.called - - -def test_cron_start_sends_cron_checkin_start_event(mocker): - requests_mock = mock_requests(mocker) - init_client() - - cron = Cron("some-cron-checkin") - cron.start() - - assert requests_mock.called - assert ( - "https://appsignal-endpoint.net/check_ins/json?" - in requests_mock.call_args[0][0] - ) - # The ordering of query parameters is not guaranteed. - assert "api_key=some-push-api-key" in requests_mock.call_args[0][0] - assert "environment=staging" in requests_mock.call_args[0][0] - assert "hostname=beepboop.local" in requests_mock.call_args[0][0] - assert "name=some-app" in requests_mock.call_args[0][0] - - assert requests_mock.call_args[1]["json"]["identifier"] == "some-cron-checkin" - assert requests_mock.call_args[1]["json"]["kind"] == "start" - assert isinstance(requests_mock.call_args[1]["json"]["timestamp"], int) - assert isinstance(requests_mock.call_args[1]["json"]["digest"], str) - assert requests_mock.call_args[1]["json"]["check_in_type"] == "cron" - - -def test_cron_finish_sends_cron_checkin_finish_event(mocker): - requests_mock = mock_requests(mocker) - init_client() - - cron = Cron("some-cron-checkin") - cron.finish() - - assert requests_mock.called - assert ( - "https://appsignal-endpoint.net/check_ins/json?" - in requests_mock.call_args[0][0] - ) - # The ordering of query parameters is not guaranteed. - assert "api_key=some-push-api-key" in requests_mock.call_args[0][0] - assert "environment=staging" in requests_mock.call_args[0][0] - assert "hostname=beepboop.local" in requests_mock.call_args[0][0] - assert "name=some-app" in requests_mock.call_args[0][0] - - assert requests_mock.call_args[1]["json"]["identifier"] == "some-cron-checkin" - assert requests_mock.call_args[1]["json"]["kind"] == "finish" - assert isinstance(requests_mock.call_args[1]["json"]["timestamp"], int) - assert isinstance(requests_mock.call_args[1]["json"]["digest"], str) - assert requests_mock.call_args[1]["json"]["check_in_type"] == "cron" - - -def test_cron_sends_cron_checkin_finish_event(mocker): - requests_mock = mock_requests(mocker) - init_client() - - cron("some-cron-checkin") - - assert requests_mock.called - assert len(requests_mock.call_args_list) == 1 - - assert requests_mock.call_args[1]["json"]["identifier"] == "some-cron-checkin" - assert requests_mock.call_args[1]["json"]["kind"] == "finish" - - -def test_cron_with_function_sends_cron_checkin_start_and_finish_event(mocker): - requests_mock = mock_requests(mocker) - init_client() - - def some_function(): - sleep(1.1) - return "output" - - assert cron("some-cron-checkin", some_function) == "output" - - assert requests_mock.called - assert len(requests_mock.call_args_list) == 2 - - assert ( - requests_mock.call_args_list[0][1]["json"]["identifier"] == "some-cron-checkin" - ) - assert requests_mock.call_args_list[0][1]["json"]["kind"] == "start" - assert ( - requests_mock.call_args_list[1][1]["json"]["identifier"] == "some-cron-checkin" - ) - assert requests_mock.call_args_list[1][1]["json"]["kind"] == "finish" - assert ( - requests_mock.call_args_list[0][1]["json"]["timestamp"] - < requests_mock.call_args_list[1][1]["json"]["timestamp"] - ) - assert ( - requests_mock.call_args_list[0][1]["json"]["digest"] - == requests_mock.call_args_list[1][1]["json"]["digest"] - ) - assert requests_mock.call_args_list[0][1]["json"]["check_in_type"] == "cron" - assert requests_mock.call_args_list[1][1]["json"]["check_in_type"] == "cron" - - -def test_cron_with_function_does_not_send_cron_checkin_finish_event_on_exception( - mocker, -): - requests_mock = mock_requests(mocker) - init_client() - - def some_function(): - raise Exception("Whoops!") - - with raises(Exception, match="Whoops!"): - cron("some-cron-checkin", some_function) - - assert requests_mock.called - assert len(requests_mock.call_args_list) == 1 - - assert ( - requests_mock.call_args_list[0][1]["json"]["identifier"] == "some-cron-checkin" - ) - assert requests_mock.call_args_list[0][1]["json"]["kind"] == "start" - - -def test_cron_context_manager_sends_cron_checkin_start_and_finish_event(mocker): - requests_mock = mock_requests(mocker) - init_client() - - with Cron("some-cron-checkin"): - sleep(1.1) - - assert requests_mock.called - assert len(requests_mock.call_args_list) == 2 - - assert ( - requests_mock.call_args_list[0][1]["json"]["identifier"] == "some-cron-checkin" - ) - assert requests_mock.call_args_list[0][1]["json"]["kind"] == "start" - assert ( - requests_mock.call_args_list[1][1]["json"]["identifier"] == "some-cron-checkin" - ) - assert requests_mock.call_args_list[1][1]["json"]["kind"] == "finish" - assert ( - requests_mock.call_args_list[0][1]["json"]["timestamp"] - < requests_mock.call_args_list[1][1]["json"]["timestamp"] - ) - assert ( - requests_mock.call_args_list[0][1]["json"]["digest"] - == requests_mock.call_args_list[1][1]["json"]["digest"] - ) - assert requests_mock.call_args_list[0][1]["json"]["check_in_type"] == "cron" - assert requests_mock.call_args_list[1][1]["json"]["check_in_type"] == "cron" - - -def test_cron_context_manager_does_not_send_cron_checkin_finish_event_on_exception( - mocker, -): - requests_mock = mock_requests(mocker) - init_client() - - with raises(Exception, match="Whoops!"): - with Cron("some-cron-checkin"): - raise Exception("Whoops!") - - assert requests_mock.called - assert len(requests_mock.call_args_list) == 1 - - assert ( - requests_mock.call_args_list[0][1]["json"]["identifier"] == "some-cron-checkin" - ) - assert requests_mock.call_args_list[0][1]["json"]["kind"] == "start" - - -def test_cron_logs_failure_to_send_event_when_status_code(mocker): - mock_requests(mocker, status_code=500) - init_client() - - internal_logger_mock = mock_internal_logger(mocker, "error") - - cron = Cron("some-cron-checkin") - cron.start() - - assert internal_logger_mock.called - assert len(internal_logger_mock.call_args_list) == 1 - assert ( - "Failed to transmit cron check-in start event: status code was 500" - in internal_logger_mock.call_args[0][0] - ) - - -def test_cron_logs_failure_to_send_event_when_exception(mocker): - mock_requests(mocker, raise_exception=True) - init_client() - - internal_logger_mock = mock_internal_logger(mocker, "error") - - cron = Cron("some-cron-checkin") - cron.start() - - assert internal_logger_mock.called - assert len(internal_logger_mock.call_args_list) == 1 - assert ( - "Failed to transmit cron check-in start event: Whoops!" - in internal_logger_mock.call_args[0][0] - ) - - -def test_heartbeat_helper_behaves_like_cron_helper(mocker): - requests_mock = mock_requests(mocker) - init_client() - - def some_function(): - return "output" - - assert heartbeat("some-heartbeat", some_function) == "output" - - assert requests_mock.called - assert len(requests_mock.call_args_list) == 2 - - assert requests_mock.call_args_list[0][1]["json"]["identifier"] == "some-heartbeat" - assert requests_mock.call_args_list[0][1]["json"]["kind"] == "start" - assert requests_mock.call_args_list[1][1]["json"]["identifier"] == "some-heartbeat" - assert requests_mock.call_args_list[1][1]["json"]["kind"] == "finish" - - -def test_heartbeat_helper_emits_deprecation_warning(mocker, reset_heartbeat_warnings): - internal_logger_mock = mock_internal_logger(mocker, "warning") - print_mock = mock_print(mocker) - - heartbeat("some-heartbeat") - - for mock in [internal_logger_mock, print_mock]: - assert mock.called - assert len(mock.call_args_list) == 1 - assert "The helper `heartbeat` has been deprecated" in mock.call_args[0][0] - - -def test_heartbeat_helper_only_emits_deprecation_warning_once( - mocker, reset_heartbeat_warnings -): - internal_logger_mock = mock_internal_logger(mocker, "warning") - print_mock = mock_print(mocker) - - heartbeat("some-heartbeat") - heartbeat("some-heartbeat") - - for mock in [internal_logger_mock, print_mock]: - assert mock.call_count == 1 - - -def test_heartbeat_class_returns_cron_instance(): - cron_instance = Heartbeat("some-heartbeat") - assert isinstance(cron_instance, Cron) - - -def test_cron_instance_as_instance_of_heartbeat(): - for instance_class in [Cron, Heartbeat]: - for checked_class in [Cron, Heartbeat]: - assert isinstance(instance_class("some-instance"), checked_class) - - -def test_heartbeat_class_emits_deprecation_warning(mocker, reset_heartbeat_warnings): - internal_logger_mock = mock_internal_logger(mocker, "warning") - print_mock = mock_print(mocker) - - Heartbeat("some-heartbeat") - - for mock in [internal_logger_mock, print_mock]: - assert mock.called - assert len(mock.call_args_list) == 1 - assert "The class `Heartbeat` has been deprecated" in mock.call_args[0][0] - - -def test_heartbeat_class_only_emits_deprecation_warning_once( - mocker, reset_heartbeat_warnings -): - internal_logger_mock = mock_internal_logger(mocker, "warning") - print_mock = mock_print(mocker) - - Heartbeat("some-heartbeat") - Heartbeat("some-heartbeat") - - for mock in [internal_logger_mock, print_mock]: - assert mock.call_count == 1 diff --git a/tests/test_transmitter.py b/tests/test_transmitter.py index 9a853301..394b7321 100644 --- a/tests/test_transmitter.py +++ b/tests/test_transmitter.py @@ -51,6 +51,15 @@ def test_transmitter_send_body_as_json(mocker): assert mock_request.called assert mock_request.call_args[1]["json"] == {"key": "value"} +def test_transmitter_send_body_as_ndjson(mocker): + mock_request = mocker.patch("requests.post") + + transmit("https://example.url/", ndjson=[{"key": "value"}, {"foo": "bar"}], config=config) + + assert mock_request.called + assert mock_request.call_args[1]["data"] == '{"key": "value"}\n{"foo": "bar"}' + assert mock_request.call_args[1]["headers"] == {"Content-Type": "application/x-ndjson"} + def test_transmitter_set_proxies_from_config(mocker): mock_request = mocker.patch("requests.post") diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 00000000..a4f1d58d --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,15 @@ +from typing import Callable +import time + +def wait_until( + predicate: Callable[[], bool], + message: str = "The predicate was not met", + timeout: float = 1.0 +) -> None: + start = time.time() + + while not predicate(): + if time.time() - start > timeout: + raise TimeoutError(message + f"(after {timeout} seconds)") + + time.sleep(0.1)