diff --git a/.changesets/implement-heartbeat-checkins.md b/.changesets/implement-heartbeat-checkins.md new file mode 100644 index 0000000..56d8faf --- /dev/null +++ b/.changesets/implement-heartbeat-checkins.md @@ -0,0 +1,26 @@ +--- +bump: minor +type: add +--- + +Add support for heartbeat check-ins. + +Use the `appsignal.check_in.heartbeat` function to send a single heartbeat check-in event from your application. This can be used, for example, in your application's main loop: + +```python +from appsignal.check_in import heartbeat + +while True: + heartbeat("job_processor") + process_job() +``` + +Heartbeats are deduplicated and sent asynchronously, without blocking the current thread. Regardless of how often the `.heartbeat` function is called, at most one heartbeat with the same identifier will be sent every ten seconds. + +Pass `continuous=True` as the second argument to send heartbeats continuously during the entire lifetime of the current process. This can be used, for example, after your application has finished its boot process: + +```python +def main(): + start_app() + heartbeat("my_app", continuous=True) +``` diff --git a/.changesets/send-checkins-concurrently.md b/.changesets/send-checkins-concurrently.md new file mode 100644 index 0000000..25aa31e --- /dev/null +++ b/.changesets/send-checkins-concurrently.md @@ -0,0 +1,6 @@ +--- +bump: patch +type: change +--- + +Send check-ins concurrently. When calling `appsignal.check_in.cron`, instead of blocking the current thread while the check-in events are sent, schedule them to be sent in a separate thread. diff --git a/conftest.py b/conftest.py index e2fddf8..48cb986 100644 --- a/conftest.py +++ b/conftest.py @@ -3,26 +3,30 @@ import os import platform import tempfile +from typing import Any, Callable, Generator import pytest from opentelemetry.metrics import set_meter_provider from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import InMemoryMetricReader from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.trace import TracerProvider, ReadableSpan +from opentelemetry.sdk.trace import ReadableSpan, TracerProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter from opentelemetry.trace import set_tracer_provider from appsignal import probes from appsignal.agent import agent +from appsignal.check_in.heartbeat import ( + _kill_continuous_heartbeats, + _reset_heartbeat_continuous_interval_seconds, +) +from appsignal.check_in.scheduler import _reset_scheduler from appsignal.client import _reset_client from appsignal.heartbeat import _heartbeat_class_warning, _heartbeat_helper_warning from appsignal.internal_logger import _reset_logger from appsignal.opentelemetry import METRICS_PREFERRED_TEMPORALITY -from typing import Any, Generator, Callable, Tuple - @pytest.fixture(scope="function", autouse=True) def disable_start_opentelemetry(mocker: Any) -> Any: @@ -53,7 +57,9 @@ def start_in_memory_span_exporter() -> Generator[InMemorySpanExporter, None, Non @pytest.fixture(scope="function") -def metrics(start_in_memory_metric_reader: InMemoryMetricReader) -> Generator[Callable[[], Any], None, None]: +def metrics( + start_in_memory_metric_reader: InMemoryMetricReader, +) -> Generator[Callable[[], Any], None, None]: # Getting the metrics data implicitly wipes its state start_in_memory_metric_reader.get_metrics_data() @@ -61,10 +67,12 @@ def metrics(start_in_memory_metric_reader: InMemoryMetricReader) -> Generator[Ca @pytest.fixture(scope="function") -def spans(start_in_memory_span_exporter: InMemorySpanExporter) -> Generator[Callable[[], Tuple[ReadableSpan, ...]], None, None]: +def spans( + start_in_memory_span_exporter: InMemorySpanExporter, +) -> Generator[Callable[[], tuple[ReadableSpan, ...]], None, None]: start_in_memory_span_exporter.clear() - def get_and_clear_spans() -> Tuple[ReadableSpan, ...]: + def get_and_clear_spans() -> tuple[ReadableSpan, ...]: spans = start_in_memory_span_exporter.get_finished_spans() start_in_memory_span_exporter.clear() return spans @@ -107,6 +115,15 @@ def reset_global_client() -> Any: _reset_client() +@pytest.fixture(scope="function", autouse=True) +def reset_checkins() -> Any: + yield + + _reset_heartbeat_continuous_interval_seconds() + _kill_continuous_heartbeats() + _reset_scheduler() + + @pytest.fixture(scope="function", autouse=True) def stop_agent() -> Any: tmp_path = "/tmp" if platform.system() == "Darwin" else tempfile.gettempdir() diff --git a/pyproject.toml b/pyproject.toml index 647372f..04d35d5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ dependencies = [ "opentelemetry-api", "opentelemetry-sdk", "opentelemetry-exporter-otlp-proto-http", + "typing-extensions" ] dynamic = ["version"] diff --git a/src/appsignal/check_in/__init__.py b/src/appsignal/check_in/__init__.py index 28197cd..d332c0f 100644 --- a/src/appsignal/check_in/__init__.py +++ b/src/appsignal/check_in/__init__.py @@ -1,2 +1,5 @@ from .cron import Cron, cron from .heartbeat import heartbeat + + +__all__ = ["Cron", "cron", "heartbeat"] diff --git a/src/appsignal/check_in/cron.py b/src/appsignal/check_in/cron.py index 830ea7d..c68f125 100644 --- a/src/appsignal/check_in/cron.py +++ b/src/appsignal/check_in/cron.py @@ -4,11 +4,13 @@ from os import urandom from typing import Any, Callable, Literal, TypeVar -from .event import Event +from .event import cron as cron_event from .scheduler import scheduler + T = TypeVar("T") + class Cron: identifier: str digest: str @@ -18,10 +20,10 @@ def __init__(self, identifier: str) -> None: self.digest = hexlify(urandom(8)).decode("utf-8") def start(self) -> None: - scheduler.schedule(Event.cron(self.identifier, self.digest, "start")) + scheduler().schedule(cron_event(self.identifier, self.digest, "start")) def finish(self) -> None: - scheduler.schedule(Event.cron(self.identifier, self.digest, "finish")) + scheduler().schedule(cron_event(self.identifier, self.digest, "finish")) def __enter__(self) -> None: self.start() diff --git a/src/appsignal/check_in/event.py b/src/appsignal/check_in/event.py index 66d157b..a970091 100644 --- a/src/appsignal/check_in/event.py +++ b/src/appsignal/check_in/event.py @@ -1,78 +1,72 @@ from __future__ import annotations from time import time +from typing import Literal, TypedDict, Union -from typing import TYPE_CHECKING, TypedDict +from typing_extensions import NotRequired -if TYPE_CHECKING: - from typing import Literal, Optional, Union, Self, List EventKind = Union[Literal["start"], Literal["finish"]] EventCheckInType = Union[Literal["cron"], Literal["heartbeat"]] + class Event(TypedDict): identifier: str - digest: Optional[str] - kind: Optional[EventKind] + digest: NotRequired[str] + kind: NotRequired[EventKind] timestamp: int check_in_type: EventCheckInType - def __init__(self, **kwargs: Event) -> None: - super().__init__(**{ - **kwargs, - "timestamp": int(time()) - }) - - @classmethod - def cron(cls, identifier: str, digest: str, kind: EventKind) -> Self: - return cls( + +def cron(identifier: str, digest: str, kind: EventKind) -> Event: + return Event( identifier=identifier, digest=digest, kind=kind, - check_in_type="cron" - ) - - @classmethod - def heartbeat(cls, identifier: str) -> Self: - return cls( - identifier=identifier, - check_in_type="heartbeat" - ) - - def is_redundant(self, other: Self) -> bool: - if ( - self["check_in_type"] not in ["cron", "heartbeat"] or - self["check_in_type"] != other["check_in_type"] or - self["identifier"] != other["identifier"] - ): + timestamp=int(time()), + check_in_type="cron", + ) + + +def heartbeat(identifier: str) -> Event: + return Event( + identifier=identifier, timestamp=int(time()), check_in_type="heartbeat" + ) + + +def is_redundant(event: Event, existing_event: Event) -> bool: + if ( + event["check_in_type"] not in ["cron", "heartbeat"] + or event["check_in_type"] != existing_event["check_in_type"] + or event["identifier"] != existing_event["identifier"] + ): return False - if self["check_in_type"] == "cron" and ( - self["digest"] != other["digest"] or - self["kind"] != other["kind"] - ): + if event["check_in_type"] == "cron" and ( # noqa: SIM103 + event.get("digest") != existing_event.get("digest") + or event.get("kind") != existing_event.get("kind") + ): return False - - return True - - @classmethod - def describe(cls, events: List[Self]) -> str: - if not events: + return True + + +def describe(events: list[Event]) -> str: + if not events: # This shouldn't happen. return "no check-in events" - elif len(events) > 1: + if len(events) > 1: return f"{len(events)} check-in events" - else: - event = events[0] - if event["check_in_type"] == "cron": - return ( + + event = events[0] + if event["check_in_type"] == "cron": + return ( f"cron check-in `{event.get('identifier', 'unknown')}` " f"{event.get('kind', 'unknown')} event " f"(digest {event.get('digest', 'unknown')})" - ) - elif event["check_in_type"] == "heartbeat": - return f"heartbeat check-in `{event.get('identifier', 'unknown')}` event" - else: - # This shouldn't happen. - return "unknown check-in event" + ) + if event["check_in_type"] == "heartbeat": + return f"heartbeat check-in `{event.get('identifier', 'unknown')}` event" + + # This shouldn't happen. + return "unknown check-in event" # type: ignore[unreachable] diff --git a/src/appsignal/check_in/heartbeat.py b/src/appsignal/check_in/heartbeat.py index 6d1da4a..8d5ed95 100644 --- a/src/appsignal/check_in/heartbeat.py +++ b/src/appsignal/check_in/heartbeat.py @@ -1,17 +1,56 @@ +from __future__ import annotations + +from threading import Event, Thread + +from .event import heartbeat as heartbeat_event from .scheduler import scheduler -from .event import Event -from threading import Thread -from time import sleep -_HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS = 30 -def _continuous_heartbeat(name: str) -> None: - while True: - sleep(_HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS) - heartbeat(name) +_HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS = 30.0 + + +def _heartbeat_continuous_interval_seconds() -> float: + return _HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS + + +def _set_heartbeat_continuous_interval_seconds(seconds: float) -> None: + global _HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS + _HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS = seconds + + +def _reset_heartbeat_continuous_interval_seconds() -> None: + global _HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS + _HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS = 30.0 + + +_started_continuous_heartbeats: list[tuple[Event, Thread]] = [] + + +def _kill_continuous_heartbeats() -> None: + for event, thread in _started_continuous_heartbeats: + event.set() + thread.join() + + _started_continuous_heartbeats.clear() + + +def _start_continuous_heartbeat(name: str) -> None: + kill = Event() + + def _run_continuous_heartbeat() -> None: + while True: + if kill.wait(_heartbeat_continuous_interval_seconds()): + break + + heartbeat(name) + + thread = Thread(target=_run_continuous_heartbeat) + thread.start() + _started_continuous_heartbeats.append((kill, thread)) + def heartbeat(name: str, continuous: bool = False) -> None: if continuous: - thread = Thread(target=_continuous_heartbeat, args=(name,)) - thread.start() - scheduler.schedule(Event.heartbeat(name)) + _start_continuous_heartbeat(name) + + scheduler().schedule(heartbeat_event(name)) diff --git a/src/appsignal/check_in/scheduler.py b/src/appsignal/check_in/scheduler.py index d7d3bd0..6bd878b 100644 --- a/src/appsignal/check_in/scheduler.py +++ b/src/appsignal/check_in/scheduler.py @@ -1,37 +1,30 @@ from __future__ import annotations -from threading import Thread, Lock from queue import Queue -from typing import List, Optional, cast -from time import sleep +from threading import Event as ThreadEvent +from threading import Lock, Thread +from typing import cast -from .event import Event -from ..client import Client from .. import internal_logger as logger +from ..client import Client +from ..config import Config from ..transmitter import transmit +from .event import Event, describe, is_redundant -class AcquiredLock: - def __new__(_cls): - raise TypeError("AcquiredLock instances cannot be constructed") - -class TypedLock(Lock): - def __enter__(self) -> AcquiredLock: - super().__enter__() - return cast(AcquiredLock, None) class Scheduler: - events: List[Event] - lock: TypedLock + events: list[Event] + lock: Lock queue: Queue stopped: bool - thread: Optional[Thread] - waker: Optional[Thread] + thread: Thread | None + waker: ThreadEvent | None _transmitted: int - INITIAL_DEBOUNCE_SECONDS = 0.1 - BETWEEN_TRANSMISSIONS_DEBOUNCE_SECONDS = 10 + INITIAL_DEBOUNCE_SECONDS: float = 0.1 + BETWEEN_TRANSMISSIONS_DEBOUNCE_SECONDS: float = 10.0 - def __init__(self): + def __init__(self) -> None: self.lock = Lock() self.thread = None self.queue = Queue() @@ -40,88 +33,121 @@ def __init__(self): self.waker = None self._transmitted = 0 - def schedule(self, event: Event): - if not Client.config().is_active(): - logger.debug(f"Cannot transmit {Event.describe([event])}: AppSignal is not active") + def schedule(self, event: Event) -> None: + config = Client.config() + if config is None or not config.is_active(): + logger.debug( + f"Cannot transmit {describe([event])}: AppSignal is not active" + ) return - with self.lock as locked: + with self.lock: if self.stopped: - logger.debug(f"Cannot transmit {Event.describe([event])}: AppSignal is stopped") + logger.debug( + f"Cannot transmit {describe([event])}: AppSignal is stopped" + ) return - self._add_event(locked, event) + + self._add_event(event) if self.waker is None: - self._start_waker(locked, self.INITIAL_DEBOUNCE_SECONDS) + self._start_waker(self.INITIAL_DEBOUNCE_SECONDS) - logger.debug(f"Scheduling {Event.describe([event])} to be transmitted") + logger.debug(f"Scheduling {describe([event])} to be transmitted") if self.thread is None: self.thread = Thread(target=self._run) self.thread.start() - def stop(self): - with self.lock as locked: - self._push_events(locked) + def stop(self) -> None: + with self.lock: + self._push_events() self.stopped = True - self._stop_waker(locked) + self._stop_waker() if self.thread is not None: + self.queue.put(None) self.thread.join() self.thread = None - def _run(self): + def _run(self) -> None: while True: events = self.queue.get() if events is None: break - self._transmit(events) - self.transmitted += 1 + self._transmitted += 1 - def _transmit(self, events: List[Event]): - description = Event.describe(events) + def _transmit(self, events: list[Event]) -> None: + description = describe(events) try: - response = transmit(f"{Client.config().option('logging_endpoint')}/check_ins/json", json=events) + response = transmit( + f"{cast(Config, Client.config()).option('logging_endpoint')}" + "/check_ins/json", + ndjson=events, + ) if 200 <= response.status_code <= 299: logger.debug(f"Transmitted {description}") else: - logger.error(f"Failed to transmit {description}: {response.status_code} status code") + logger.error( + f"Failed to transmit {description}: " + f"{response.status_code} status code" + ) except Exception as e: logger.error(f"Failed to transmit {description}: {e}") - def _add_event(self, locked: AcquiredLock, event: Event): + # Must be called from within a `with self.lock` block. + def _add_event(self, event: Event) -> None: self.events = [ - existing_event for existing_event in self.events - if not event.is_redundant(existing_event) + existing_event + for existing_event in self.events + if not is_redundant(event, existing_event) ] - - self.events.append(event) - def _run_waker(self, debounce: float): - sleep(debounce) + self.events.append(event) - with self.lock as locked: + def _run_waker(self, debounce: float, kill: ThreadEvent) -> None: + if kill.wait(debounce): + return + with self.lock: self.waker = None - self._push_events(locked) + self._push_events() - def _start_waker(self, locked: AcquiredLock, debounce: float): - self._stop_waker(locked) + # Must be called from within a `with self.lock` block. + def _start_waker(self, debounce: float) -> None: + self._stop_waker() - self.waker = Thread(debounce, target=self._run_waker, args=(debounce,)) - self.waker.start() + kill = ThreadEvent() + thread = Thread(target=self._run_waker, args=(debounce, kill)) + thread.start() + self.waker = kill - def _stop_waker(self, locked: AcquiredLock): + # Must be called from within a `with self.lock` block. + def _stop_waker(self) -> None: if self.waker is not None: - self.waker.cancel() + kill = self.waker + kill.set() self.waker = None - def _push_events(self, locked: AcquiredLock): + # Must be called from within a `with self.lock` block. + def _push_events(self) -> None: if not self.events: return - self.queue.put(self.events.copy()) self.events.clear() - self._start_waker(locked, self.BETWEEN_TRANSMISSIONS_DEBOUNCE_SECONDS) + self._start_waker(self.BETWEEN_TRANSMISSIONS_DEBOUNCE_SECONDS) + + +_scheduler = Scheduler() + + +def scheduler() -> Scheduler: + return _scheduler + -scheduler = Scheduler() +def _reset_scheduler() -> None: + global _scheduler + _scheduler.stop() + _scheduler = Scheduler() + Scheduler.INITIAL_DEBOUNCE_SECONDS = 0.1 + Scheduler.BETWEEN_TRANSMISSIONS_DEBOUNCE_SECONDS = 10.0 diff --git a/src/appsignal/ndjson.py b/src/appsignal/ndjson.py new file mode 100644 index 0000000..153a080 --- /dev/null +++ b/src/appsignal/ndjson.py @@ -0,0 +1,12 @@ +from __future__ import annotations + +import json +from typing import Any + + +def dumps(objs: list[Any]) -> str: + return "\n".join([json.dumps(line) for line in objs]) + + +def loads(s: str) -> list[Any]: + return [json.loads(line) for line in s.split("\n")] diff --git a/src/appsignal/transmitter.py b/src/appsignal/transmitter.py index 4c1527e..f22bdba 100644 --- a/src/appsignal/transmitter.py +++ b/src/appsignal/transmitter.py @@ -5,8 +5,9 @@ import requests -from appsignal.client import Client -from appsignal.config import Config +from .client import Client +from .config import Config +from .ndjson import dumps as ndjson_dumps if TYPE_CHECKING: @@ -14,11 +15,17 @@ def transmit( - url: str, json: Any | None = None, config: Config | None = None + url: str, + json: Any | None = None, + ndjson: list[Any] | None = None, + config: Config | None = None, ) -> Response: if config is None: config = Client.config() or Config() + if json is not None and ndjson is not None: + raise ValueError("Cannot send both `json` and `ndjson`") + params = urllib.parse.urlencode( { "api_key": config.option("push_api_key") or "", @@ -37,4 +44,11 @@ def transmit( cert = config.option("ca_file_path") + if ndjson is not None: + data = ndjson_dumps(ndjson) + headers = {"Content-Type": "application/x-ndjson"} + return requests.post( + url, data=data, headers=headers, proxies=proxies, verify=cert + ) + return requests.post(url, json=json, proxies=proxies, verify=cert) diff --git a/tests/check_in/__init__.py b/tests/check_in/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/check_in/test_cron.py b/tests/check_in/test_cron.py new file mode 100644 index 0000000..8f38297 --- /dev/null +++ b/tests/check_in/test_cron.py @@ -0,0 +1,336 @@ +from time import sleep + +from pytest import raises + +from appsignal import Heartbeat as DeprecatedHeartbeat +from appsignal import heartbeat as deprecated_heartbeat +from appsignal.check_in import Cron, cron +from appsignal.check_in.scheduler import scheduler + +from .utils import ( + assert_called_once, + assert_called_times, + assert_url_config, + init_client, + mock_internal_logger, + mock_print, + mock_requests, + received_events, + received_messages, + received_url, +) + + +def assert_cron_event(event, kind): + assert event["identifier"] == "some-cron-checkin" + assert event["kind"] == kind + assert isinstance(event["timestamp"], int) + assert isinstance(event["digest"], str) + assert event["check_in_type"] == "cron" + + +def test_cron_start_and_finish_when_appsignal_is_not_active_sends_nothing(mocker): + requests_mock = mock_requests(mocker) + init_client(active=False) + + cron = Cron("some-cron-checkin") + cron.start() + cron.finish() + scheduler().stop() + + assert not requests_mock.called + + +def test_cron_start_sends_cron_checkin_start_event(mocker): + requests_mock = mock_requests(mocker) + init_client() + + cron = Cron("some-cron-checkin") + cron.start() + scheduler().stop() + + assert_called_once(requests_mock) + assert_url_config(received_url(requests_mock)) + + events = received_events(requests_mock) + assert len(events) == 1 + assert_cron_event(events[0], "start") + + +def test_cron_finish_sends_cron_checkin_finish_event(mocker): + requests_mock = mock_requests(mocker) + init_client() + + cron = Cron("some-cron-checkin") + cron.finish() + scheduler().stop() + + assert_called_once(requests_mock) + + assert_url_config(received_url(requests_mock)) + + events = received_events(requests_mock) + assert len(events) == 1 + assert_cron_event(events[0], "finish") + + +def test_cron_sends_cron_checkin_finish_event(mocker): + requests_mock = mock_requests(mocker) + init_client() + + cron("some-cron-checkin") + scheduler().stop() + + assert_called_once(requests_mock) + + events = received_events(requests_mock) + assert len(events) == 1 + assert_cron_event(events[0], "finish") + + +def test_cron_with_function_sends_cron_checkin_start_and_finish_event(mocker): + requests_mock = mock_requests(mocker) + init_client() + + def some_function(): + sleep(1.1) + return "output" + + assert cron("some-cron-checkin", some_function) == "output" + + scheduler().stop() + + assert_called_times(requests_mock, 2) + + first_events = received_events(requests_mock, call=0) + assert len(first_events) == 1 + assert_cron_event(first_events[0], "start") + start_event = first_events[0] + + second_events = received_events(requests_mock, call=1) + assert len(second_events) == 1 + assert_cron_event(second_events[0], "finish") + finish_event = second_events[0] + + assert start_event["timestamp"] < finish_event["timestamp"] + assert start_event["digest"] == finish_event["digest"] + + +def test_cron_with_function_does_not_send_cron_checkin_finish_event_on_exception( + mocker, +): + requests_mock = mock_requests(mocker) + init_client() + + def some_function(): + raise Exception("Whoops!") + + with raises(Exception, match="Whoops!"): + cron("some-cron-checkin", some_function) + + scheduler().stop() + + assert_called_once(requests_mock) + events = received_events(requests_mock) + assert len(events) == 1 + assert_cron_event(events[0], "start") + + +def test_cron_context_manager_sends_cron_checkin_start_and_finish_event(mocker): + requests_mock = mock_requests(mocker) + init_client() + + with Cron("some-cron-checkin"): + sleep(1.1) + + scheduler().stop() + + assert_called_times(requests_mock, 2) + + first_events = received_events(requests_mock, call=0) + assert len(first_events) == 1 + assert_cron_event(first_events[0], "start") + start_event = first_events[0] + + second_events = received_events(requests_mock, call=1) + assert len(second_events) == 1 + assert_cron_event(second_events[0], "finish") + finish_event = second_events[0] + + assert start_event["timestamp"] < finish_event["timestamp"] + assert start_event["digest"] == finish_event["digest"] + + +def test_cron_context_manager_does_not_send_cron_checkin_finish_event_on_exception( + mocker, +): + requests_mock = mock_requests(mocker) + init_client() + + with raises(Exception, match="Whoops!"): + with Cron("some-cron-checkin"): + raise Exception("Whoops!") + + scheduler().stop() + + assert_called_once(requests_mock) + events = received_events(requests_mock) + assert len(events) == 1 + assert_cron_event(events[0], "start") + + +def test_cron_logs_failure_to_send_event_when_status_code(mocker): + mock_requests(mocker, status_code=500) + init_client() + + internal_logger_mock = mock_internal_logger(mocker, "error") + + cron = Cron("some-cron-checkin") + cron.start() + scheduler().stop() + + log_messages = received_messages(internal_logger_mock) + assert len(log_messages) == 1 + assert ( + "Failed to transmit cron check-in `some-cron-checkin` start event" + in log_messages[0] + ) + assert ": 500 status code" in log_messages[0] + + +def test_cron_logs_failure_to_send_event_when_exception(mocker): + mock_requests(mocker, raise_exception=True) + init_client() + + internal_logger_mock = mock_internal_logger(mocker, "error") + + cron = Cron("some-cron-checkin") + cron.start() + scheduler().stop() + + log_messages = received_messages(internal_logger_mock) + assert len(log_messages) == 1 + assert ( + "Failed to transmit cron check-in `some-cron-checkin` start event" + in log_messages[0] + ) + assert ": Whoops!" in log_messages[0] + + +def test_deprecated_heartbeat_helper_behaves_like_cron_helper(mocker): + requests_mock = mock_requests(mocker) + init_client() + + def some_function(): + return "output" + + assert deprecated_heartbeat("some-cron-checkin", some_function) == "output" + scheduler().stop() + + assert_called_times(requests_mock, 1) + + events = received_events(requests_mock) + assert len(events) == 2 + assert_cron_event(events[0], "start") + assert_cron_event(events[1], "finish") + + +def test_deprecated_heartbeat_helper_emits_deprecation_warning( + mocker, reset_heartbeat_warnings +): + internal_logger_mock = mock_internal_logger(mocker, "warning") + print_mock = mock_print(mocker) + + deprecated_heartbeat("some-cron-checkin") + + for mock in [internal_logger_mock, print_mock]: + messages = received_messages(mock) + assert ( + len( + [ + message + for message in messages + if "The helper `heartbeat` has been deprecated" in message + ] + ) + == 1 + ) + + +def test_deprecated_heartbeat_helper_only_emits_deprecation_warning_once( + mocker, reset_heartbeat_warnings +): + internal_logger_mock = mock_internal_logger(mocker, "warning") + print_mock = mock_print(mocker) + + deprecated_heartbeat("some-heartbeat") + deprecated_heartbeat("some-heartbeat") + + for mock in [internal_logger_mock, print_mock]: + messages = received_messages(mock) + assert ( + len( + [ + message + for message in messages + if "The helper `heartbeat` has been deprecated" in message + ] + ) + == 1 + ) + + +def test_deprecated_heartbeat_class_returns_cron_instance(): + cron_instance = DeprecatedHeartbeat("some-heartbeat") + assert isinstance(cron_instance, Cron) + + +def test_deprecated_heartbeat_instance_is_instance_of_cron_class(): + for instance_class in [Cron, DeprecatedHeartbeat]: + for checked_class in [Cron, DeprecatedHeartbeat]: + assert isinstance(instance_class("some-instance"), checked_class) + + +def test_deprecated_heartbeat_class_emits_deprecation_warning( + mocker, reset_heartbeat_warnings +): + internal_logger_mock = mock_internal_logger(mocker, "warning") + print_mock = mock_print(mocker) + + DeprecatedHeartbeat("some-heartbeat") + + for mock in [internal_logger_mock, print_mock]: + messages = received_messages(mock) + assert ( + len( + [ + message + for message in messages + if "The class `Heartbeat` has been deprecated" in message + ] + ) + == 1 + ) + + +def test_deprecated_heartbeat_class_only_emits_deprecation_warning_once( + mocker, reset_heartbeat_warnings +): + internal_logger_mock = mock_internal_logger(mocker, "warning") + print_mock = mock_print(mocker) + + DeprecatedHeartbeat("some-heartbeat") + DeprecatedHeartbeat("some-heartbeat") + + for mock in [internal_logger_mock, print_mock]: + messages = received_messages(mock) + assert ( + len( + [ + message + for message in messages + if "The class `Heartbeat` has been deprecated" in message + ] + ) + == 1 + ) diff --git a/tests/check_in/test_event.py b/tests/check_in/test_event.py new file mode 100644 index 0000000..e747c00 --- /dev/null +++ b/tests/check_in/test_event.py @@ -0,0 +1,94 @@ +from typing import cast + +from appsignal.check_in.event import Event, cron, describe, heartbeat, is_redundant +import json + +def test_describe_no_events(): + assert describe([]) == "no check-in events" + + +def test_describe_heartbeat_event(): + events = [heartbeat("some-event")] + assert describe(events) == "heartbeat check-in `some-event` event" + + +def test_describe_cron_event(): + events = [cron("some-event", "abc", "start")] + assert describe(events) == "cron check-in `some-event` start event (digest abc)" + + +def test_describe_multiple_events(): + events = [heartbeat("event"), cron("some-event", "abc", "start")] + assert describe(events) == "2 check-in events" + + +def test_describe_unknown_event(): + event = cast(Event, {"check_in_type": "unknown"}) + assert describe([event]) == "unknown check-in event" + + +def test_is_redundant_heartbeat_with_same_name(): + event = heartbeat("some-event") + redundant = heartbeat("some-event") + assert is_redundant(event, redundant) + + +def test_is_redundant_heartbeat_with_different_name(): + event = heartbeat("some-event") + redundant = heartbeat("other-event") + assert not is_redundant(event, redundant) + + +def test_is_redundant_heartbeat_and_cron(): + event = heartbeat("some-event") + redundant = cron("some-event", "abc", "start") + assert not is_redundant(event, redundant) + + +def test_is_redundant_cron_with_same_name_digest_and_kind(): + event = cron("some-event", "abc", "start") + redundant = cron("some-event", "abc", "start") + assert is_redundant(event, redundant) + + +def test_is_redundant_cron_with_same_name_and_digest_but_different_kind(): + event = cron("some-event", "abc", "start") + redundant = cron("some-event", "abc", "finish") + assert not is_redundant(event, redundant) + + +def test_is_redundant_cron_with_same_name_and_kind_but_different_digest(): + event = cron("some-event", "abc", "start") + redundant = cron("some-event", "def", "start") + assert not is_redundant(event, redundant) + + +def test_is_redundant_cron_with_same_digest_and_kind_but_different_name(): + event = cron("some-event", "abc", "start") + redundant = cron("other-event", "abc", "start") + assert not is_redundant(event, redundant) + + +def test_is_redundant_unknown_event(): + event = cast(Event, {"check_in_type": "unknown"}) + assert not is_redundant(event, event) + + +def test_cron_event_json_serialised(): + event = cron("some-event", "abc", "start") + serialised = json.dumps(event) + assert "\"check_in_type\": \"cron\"" in serialised + assert "\"kind\": \"start\"" in serialised + assert "\"digest\": \"abc\"" in serialised + assert "\"identifier\": \"some-event\"" in serialised + assert "\"timestamp\":" in serialised + + +def test_heartbeat_event_json_serialised(): + event = heartbeat("some-event") + serialised = json.dumps(event) + assert "\"check_in_type\": \"heartbeat\"" in serialised + assert "\"identifier\": \"some-event\"" in serialised + assert "\"timestamp\":" in serialised + assert "\"kind\":" not in serialised + assert "\"digest\":" not in serialised diff --git a/tests/check_in/test_heartbeat.py b/tests/check_in/test_heartbeat.py new file mode 100644 index 0000000..cb9a67a --- /dev/null +++ b/tests/check_in/test_heartbeat.py @@ -0,0 +1,59 @@ +from appsignal.check_in import heartbeat +from appsignal.check_in.heartbeat import _set_heartbeat_continuous_interval_seconds +from appsignal.check_in.scheduler import Scheduler, scheduler + +from ..utils import wait_until +from .utils import ( + assert_called_at_least, + assert_called_once, + assert_url_config, + init_client, + mock_requests, + received_events, + received_url, +) + + +def assert_heartbeat_event(event): + assert event["identifier"] == "some-heartbeat" + assert "kind" not in event + assert "digest" not in event + assert isinstance(event["timestamp"], int) + assert event["check_in_type"] == "heartbeat" + + +def test_heartbeat_sends_heartbeat_event(mocker): + requests_mock = mock_requests(mocker) + init_client() + + heartbeat("some-heartbeat") + scheduler().stop() + + assert_called_once(requests_mock) + + assert_url_config(received_url(requests_mock)) + + events = received_events(requests_mock) + assert len(events) == 1 + assert_heartbeat_event(events[0]) + + +def test_heartbeat_continuous_sends_heartbeat_events_forever(mocker): + _set_heartbeat_continuous_interval_seconds(0.05) + Scheduler.BETWEEN_TRANSMISSIONS_DEBOUNCE_SECONDS = 0.1 + + requests_mock = mock_requests(mocker) + init_client() + + heartbeat("some-heartbeat", continuous=True) + wait_until( + lambda: scheduler()._transmitted >= 2, + "the scheduler did not transmit two events", + ) + + assert_called_at_least(requests_mock, 2) + + for call in [0, 1]: + events = received_events(requests_mock, call=call) + assert len(events) == 1 + assert_heartbeat_event(events[0]) diff --git a/tests/check_in/test_scheduler.py b/tests/check_in/test_scheduler.py new file mode 100644 index 0000000..da4efcc --- /dev/null +++ b/tests/check_in/test_scheduler.py @@ -0,0 +1,152 @@ +from appsignal.check_in.event import heartbeat +from appsignal.check_in.scheduler import Scheduler, scheduler + +from ..utils import wait_until +from .utils import ( + assert_called_once, + assert_called_times, + assert_url_config, + init_client, + mock_internal_logger, + mock_requests, + received_events, + received_messages, + received_url, +) + + +def test_scheduler_transmits_scheduled_event(mocker): + requests_mock = mock_requests(mocker) + init_client() + + internal_logger_mock = mock_internal_logger(mocker, "debug") + + scheduler().schedule(heartbeat("event")) + assert not requests_mock.called + + wait_until( + lambda: scheduler()._transmitted == 1, + "the scheduler did not transmit the event", + ) + + assert_called_once(requests_mock) + assert_url_config(received_url(requests_mock)) + + events = received_events(requests_mock) + assert len(events) == 1 + assert events[0]["identifier"] == "event" + + log_messages = received_messages(internal_logger_mock) + assert len(log_messages) == 2 + assert ( + log_messages[0] + == "Scheduling heartbeat check-in `event` event to be transmitted" + ) + assert log_messages[1] == "Transmitted heartbeat check-in `event` event" + + +def test_scheduler_debounces_events_scheduled_quickly(mocker): + requests_mock = mock_requests(mocker) + init_client() + + internal_logger_mock = mock_internal_logger(mocker, "debug") + + scheduler().schedule(heartbeat("first")) + scheduler().schedule(heartbeat("second")) + assert not requests_mock.called + + wait_until( + lambda: scheduler()._transmitted == 1, + "the scheduler did not transmit the events", + ) + + assert_called_once(requests_mock) + assert_url_config(received_url(requests_mock)) + + events = received_events(requests_mock) + assert len(events) == 2 + assert events[0]["identifier"] == "first" + assert events[1]["identifier"] == "second" + + log_messages = received_messages(internal_logger_mock) + assert len(log_messages) == 3 + assert ( + log_messages[0] + == "Scheduling heartbeat check-in `first` event to be transmitted" + ) + assert ( + log_messages[1] + == "Scheduling heartbeat check-in `second` event to be transmitted" + ) + assert log_messages[2] == "Transmitted 2 check-in events" + + +def test_scheduler_sends_events_scheduled_slowly_separately(mocker): + requests_mock = mock_requests(mocker) + init_client() + Scheduler.BETWEEN_TRANSMISSIONS_DEBOUNCE_SECONDS = 0.1 + + scheduler().schedule(heartbeat("first")) + wait_until( + lambda: scheduler()._transmitted == 1, + "the scheduler did not transmit the first event", + ) + scheduler().schedule(heartbeat("second")) + wait_until( + lambda: scheduler()._transmitted == 2, + "the scheduler did not transmit the second event", + ) + + assert_called_times(requests_mock, 2) + + first_events = received_events(requests_mock, call=0) + assert len(first_events) == 1 + assert first_events[0]["identifier"] == "first" + + second_events = received_events(requests_mock, call=1) + assert len(second_events) == 1 + assert second_events[0]["identifier"] == "second" + + +def test_scheduler_sends_events_scheduled_immediately_on_stop(mocker): + # Set all debounce intervals to 10 seconds, to make the test + # fail if it waits for the debounce -- this ensures that what is being + # tested is that the events are transmitted immediately when the + # scheduler is stopped, without waiting for any debounce. + + requests_mock = mock_requests(mocker) + init_client() + Scheduler.INITIAL_DEBOUNCE_SECONDS = 10 + Scheduler.BETWEEN_TRANSMISSIONS_DEBOUNCE_SECONDS = 10 + + scheduler().schedule(heartbeat("first")) + scheduler().stop() + wait_until( + lambda: scheduler()._transmitted == 1, + "the scheduler did not transmit the event", + ) + + assert_called_once(requests_mock) + + events = received_events(requests_mock) + assert len(events) == 1 + assert events[0]["identifier"] == "first" + + +def test_scheduler_does_not_transmit_when_stopped(mocker): + requests_mock = mock_requests(mocker) + init_client() + + internal_logger_mock = mock_internal_logger(mocker, "debug") + + scheduler().stop() + scheduler().schedule(heartbeat("some-heartbeat-checkin")) + + assert not requests_mock.called + + log_messages = received_messages(internal_logger_mock) + assert len(log_messages) == 1 + assert log_messages[0] == ( + "Cannot transmit heartbeat check-in `some-heartbeat-checkin` event: " + "AppSignal is stopped" + ) diff --git a/tests/check_in/utils.py b/tests/check_in/utils.py new file mode 100644 index 0000000..eda78be --- /dev/null +++ b/tests/check_in/utils.py @@ -0,0 +1,69 @@ +from appsignal import ndjson +from appsignal.client import Client + + +def init_client(active=True): + Client( + push_api_key="some-push-api-key", + name="some-app", + environment="staging", + hostname="beepboop.local", + active=active, + ) + + +def mock_requests(mocker, status_code=200, raise_exception=False): + requests_mock = mocker.patch("requests.post") + requests_mock.return_value.status_code = status_code + if raise_exception: + + def side_effect(*args, **kwargs): + raise Exception("Whoops!") + + requests_mock.side_effect = side_effect + + return requests_mock + + +def mock_internal_logger(mocker, level): + return mocker.patch(f"appsignal.internal_logger.{level}") + + +def mock_print(mocker): + return mocker.patch("builtins.print") + + +def assert_called_once(mock): + assert_called_times(mock, 1) + + +def assert_called_times(requests_mock, times: int): + assert requests_mock.called + assert len(requests_mock.call_args_list) == times + + +def assert_called_at_least(requests_mock, times: int): + assert requests_mock.called + assert len(requests_mock.call_args_list) >= times + + +def received_events(request_mock, call: int = 0): + data = request_mock.call_args_list[call][1]["data"] + return ndjson.loads(data) + + +def received_url(request_mock, call: int = 0): + return request_mock.call_args_list[call][0][0] + + +def received_messages(internal_logger_mock): + return [args[0][0] for args in internal_logger_mock.call_args_list] + + +def assert_url_config(url): + assert "https://appsignal-endpoint.net/check_ins/json?" in url + # The ordering of query parameters is not guaranteed. + assert "api_key=some-push-api-key" in url + assert "environment=staging" in url + assert "hostname=beepboop.local" in url + assert "name=some-app" in url diff --git a/tests/test_check_in.py b/tests/test_check_in.py deleted file mode 100644 index eaa94e6..0000000 --- a/tests/test_check_in.py +++ /dev/null @@ -1,328 +0,0 @@ -from time import sleep - -from pytest import raises - -from appsignal import Heartbeat, heartbeat -from appsignal.check_in import Cron, cron -from appsignal.client import Client - - -def init_client(active=True): - Client( - push_api_key="some-push-api-key", - name="some-app", - environment="staging", - hostname="beepboop.local", - active=active, - ) - - -def mock_requests(mocker, status_code=200, raise_exception=False): - requests_mock = mocker.patch("requests.post") - requests_mock.return_value.status_code = status_code - if raise_exception: - - def side_effect(*args, **kwargs): - raise Exception("Whoops!") - - requests_mock.side_effect = side_effect - - return requests_mock - - -def mock_internal_logger(mocker, level): - return mocker.patch(f"appsignal.internal_logger.{level}") - - -def mock_print(mocker): - return mocker.patch("builtins.print") - - -def test_cron_start_and_finish_when_appsignal_is_not_active_sends_nothing(mocker): - requests_mock = mock_requests(mocker) - init_client(active=False) - - cron = Cron("some-cron-checkin") - cron.start() - cron.finish() - - assert not requests_mock.called - - -def test_cron_start_sends_cron_checkin_start_event(mocker): - requests_mock = mock_requests(mocker) - init_client() - - cron = Cron("some-cron-checkin") - cron.start() - - assert requests_mock.called - assert ( - "https://appsignal-endpoint.net/check_ins/json?" - in requests_mock.call_args[0][0] - ) - # The ordering of query parameters is not guaranteed. - assert "api_key=some-push-api-key" in requests_mock.call_args[0][0] - assert "environment=staging" in requests_mock.call_args[0][0] - assert "hostname=beepboop.local" in requests_mock.call_args[0][0] - assert "name=some-app" in requests_mock.call_args[0][0] - - assert requests_mock.call_args[1]["json"]["identifier"] == "some-cron-checkin" - assert requests_mock.call_args[1]["json"]["kind"] == "start" - assert isinstance(requests_mock.call_args[1]["json"]["timestamp"], int) - assert isinstance(requests_mock.call_args[1]["json"]["digest"], str) - assert requests_mock.call_args[1]["json"]["check_in_type"] == "cron" - - -def test_cron_finish_sends_cron_checkin_finish_event(mocker): - requests_mock = mock_requests(mocker) - init_client() - - cron = Cron("some-cron-checkin") - cron.finish() - - assert requests_mock.called - assert ( - "https://appsignal-endpoint.net/check_ins/json?" - in requests_mock.call_args[0][0] - ) - # The ordering of query parameters is not guaranteed. - assert "api_key=some-push-api-key" in requests_mock.call_args[0][0] - assert "environment=staging" in requests_mock.call_args[0][0] - assert "hostname=beepboop.local" in requests_mock.call_args[0][0] - assert "name=some-app" in requests_mock.call_args[0][0] - - assert requests_mock.call_args[1]["json"]["identifier"] == "some-cron-checkin" - assert requests_mock.call_args[1]["json"]["kind"] == "finish" - assert isinstance(requests_mock.call_args[1]["json"]["timestamp"], int) - assert isinstance(requests_mock.call_args[1]["json"]["digest"], str) - assert requests_mock.call_args[1]["json"]["check_in_type"] == "cron" - - -def test_cron_sends_cron_checkin_finish_event(mocker): - requests_mock = mock_requests(mocker) - init_client() - - cron("some-cron-checkin") - - assert requests_mock.called - assert len(requests_mock.call_args_list) == 1 - - assert requests_mock.call_args[1]["json"]["identifier"] == "some-cron-checkin" - assert requests_mock.call_args[1]["json"]["kind"] == "finish" - - -def test_cron_with_function_sends_cron_checkin_start_and_finish_event(mocker): - requests_mock = mock_requests(mocker) - init_client() - - def some_function(): - sleep(1.1) - return "output" - - assert cron("some-cron-checkin", some_function) == "output" - - assert requests_mock.called - assert len(requests_mock.call_args_list) == 2 - - assert ( - requests_mock.call_args_list[0][1]["json"]["identifier"] == "some-cron-checkin" - ) - assert requests_mock.call_args_list[0][1]["json"]["kind"] == "start" - assert ( - requests_mock.call_args_list[1][1]["json"]["identifier"] == "some-cron-checkin" - ) - assert requests_mock.call_args_list[1][1]["json"]["kind"] == "finish" - assert ( - requests_mock.call_args_list[0][1]["json"]["timestamp"] - < requests_mock.call_args_list[1][1]["json"]["timestamp"] - ) - assert ( - requests_mock.call_args_list[0][1]["json"]["digest"] - == requests_mock.call_args_list[1][1]["json"]["digest"] - ) - assert requests_mock.call_args_list[0][1]["json"]["check_in_type"] == "cron" - assert requests_mock.call_args_list[1][1]["json"]["check_in_type"] == "cron" - - -def test_cron_with_function_does_not_send_cron_checkin_finish_event_on_exception( - mocker, -): - requests_mock = mock_requests(mocker) - init_client() - - def some_function(): - raise Exception("Whoops!") - - with raises(Exception, match="Whoops!"): - cron("some-cron-checkin", some_function) - - assert requests_mock.called - assert len(requests_mock.call_args_list) == 1 - - assert ( - requests_mock.call_args_list[0][1]["json"]["identifier"] == "some-cron-checkin" - ) - assert requests_mock.call_args_list[0][1]["json"]["kind"] == "start" - - -def test_cron_context_manager_sends_cron_checkin_start_and_finish_event(mocker): - requests_mock = mock_requests(mocker) - init_client() - - with Cron("some-cron-checkin"): - sleep(1.1) - - assert requests_mock.called - assert len(requests_mock.call_args_list) == 2 - - assert ( - requests_mock.call_args_list[0][1]["json"]["identifier"] == "some-cron-checkin" - ) - assert requests_mock.call_args_list[0][1]["json"]["kind"] == "start" - assert ( - requests_mock.call_args_list[1][1]["json"]["identifier"] == "some-cron-checkin" - ) - assert requests_mock.call_args_list[1][1]["json"]["kind"] == "finish" - assert ( - requests_mock.call_args_list[0][1]["json"]["timestamp"] - < requests_mock.call_args_list[1][1]["json"]["timestamp"] - ) - assert ( - requests_mock.call_args_list[0][1]["json"]["digest"] - == requests_mock.call_args_list[1][1]["json"]["digest"] - ) - assert requests_mock.call_args_list[0][1]["json"]["check_in_type"] == "cron" - assert requests_mock.call_args_list[1][1]["json"]["check_in_type"] == "cron" - - -def test_cron_context_manager_does_not_send_cron_checkin_finish_event_on_exception( - mocker, -): - requests_mock = mock_requests(mocker) - init_client() - - with raises(Exception, match="Whoops!"): - with Cron("some-cron-checkin"): - raise Exception("Whoops!") - - assert requests_mock.called - assert len(requests_mock.call_args_list) == 1 - - assert ( - requests_mock.call_args_list[0][1]["json"]["identifier"] == "some-cron-checkin" - ) - assert requests_mock.call_args_list[0][1]["json"]["kind"] == "start" - - -def test_cron_logs_failure_to_send_event_when_status_code(mocker): - mock_requests(mocker, status_code=500) - init_client() - - internal_logger_mock = mock_internal_logger(mocker, "error") - - cron = Cron("some-cron-checkin") - cron.start() - - assert internal_logger_mock.called - assert len(internal_logger_mock.call_args_list) == 1 - assert ( - "Failed to transmit cron check-in start event: status code was 500" - in internal_logger_mock.call_args[0][0] - ) - - -def test_cron_logs_failure_to_send_event_when_exception(mocker): - mock_requests(mocker, raise_exception=True) - init_client() - - internal_logger_mock = mock_internal_logger(mocker, "error") - - cron = Cron("some-cron-checkin") - cron.start() - - assert internal_logger_mock.called - assert len(internal_logger_mock.call_args_list) == 1 - assert ( - "Failed to transmit cron check-in start event: Whoops!" - in internal_logger_mock.call_args[0][0] - ) - - -def test_heartbeat_helper_behaves_like_cron_helper(mocker): - requests_mock = mock_requests(mocker) - init_client() - - def some_function(): - return "output" - - assert heartbeat("some-heartbeat", some_function) == "output" - - assert requests_mock.called - assert len(requests_mock.call_args_list) == 2 - - assert requests_mock.call_args_list[0][1]["json"]["identifier"] == "some-heartbeat" - assert requests_mock.call_args_list[0][1]["json"]["kind"] == "start" - assert requests_mock.call_args_list[1][1]["json"]["identifier"] == "some-heartbeat" - assert requests_mock.call_args_list[1][1]["json"]["kind"] == "finish" - - -def test_heartbeat_helper_emits_deprecation_warning(mocker, reset_heartbeat_warnings): - internal_logger_mock = mock_internal_logger(mocker, "warning") - print_mock = mock_print(mocker) - - heartbeat("some-heartbeat") - - for mock in [internal_logger_mock, print_mock]: - assert mock.called - assert len(mock.call_args_list) == 1 - assert "The helper `heartbeat` has been deprecated" in mock.call_args[0][0] - - -def test_heartbeat_helper_only_emits_deprecation_warning_once( - mocker, reset_heartbeat_warnings -): - internal_logger_mock = mock_internal_logger(mocker, "warning") - print_mock = mock_print(mocker) - - heartbeat("some-heartbeat") - heartbeat("some-heartbeat") - - for mock in [internal_logger_mock, print_mock]: - assert mock.call_count == 1 - - -def test_heartbeat_class_returns_cron_instance(): - cron_instance = Heartbeat("some-heartbeat") - assert isinstance(cron_instance, Cron) - - -def test_cron_instance_as_instance_of_heartbeat(): - for instance_class in [Cron, Heartbeat]: - for checked_class in [Cron, Heartbeat]: - assert isinstance(instance_class("some-instance"), checked_class) - - -def test_heartbeat_class_emits_deprecation_warning(mocker, reset_heartbeat_warnings): - internal_logger_mock = mock_internal_logger(mocker, "warning") - print_mock = mock_print(mocker) - - Heartbeat("some-heartbeat") - - for mock in [internal_logger_mock, print_mock]: - assert mock.called - assert len(mock.call_args_list) == 1 - assert "The class `Heartbeat` has been deprecated" in mock.call_args[0][0] - - -def test_heartbeat_class_only_emits_deprecation_warning_once( - mocker, reset_heartbeat_warnings -): - internal_logger_mock = mock_internal_logger(mocker, "warning") - print_mock = mock_print(mocker) - - Heartbeat("some-heartbeat") - Heartbeat("some-heartbeat") - - for mock in [internal_logger_mock, print_mock]: - assert mock.call_count == 1 diff --git a/tests/test_transmitter.py b/tests/test_transmitter.py index 9a85330..d4b1109 100644 --- a/tests/test_transmitter.py +++ b/tests/test_transmitter.py @@ -52,6 +52,20 @@ def test_transmitter_send_body_as_json(mocker): assert mock_request.call_args[1]["json"] == {"key": "value"} +def test_transmitter_send_body_as_ndjson(mocker): + mock_request = mocker.patch("requests.post") + + transmit( + "https://example.url/", ndjson=[{"key": "value"}, {"foo": "bar"}], config=config + ) + + assert mock_request.called + assert mock_request.call_args[1]["data"] == '{"key": "value"}\n{"foo": "bar"}' + assert mock_request.call_args[1]["headers"] == { + "Content-Type": "application/x-ndjson" + } + + def test_transmitter_set_proxies_from_config(mocker): mock_request = mocker.patch("requests.post") diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000..92f256e --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,16 @@ +import time +from typing import Callable + + +def wait_until( + predicate: Callable[[], bool], + message: str = "The predicate was not met", + timeout: float = 1.0, +) -> None: + start = time.time() + + while not predicate(): + if time.time() - start > timeout: + raise TimeoutError(message + f"(after {timeout} seconds)") + + time.sleep(0.1)