Skip to content

Commit

Permalink
Implement scheduler and heartbeat check-ins
Browse files Browse the repository at this point in the history
Implement a scheduler for check-in events, ensuring that the events
are sent asynchronously from the thread in which they are scheduled.

Implement a heartbeat check-in helper, which sends heartbeat check-in
events. When given the `continuous=True` flag, the events will be
sent continously from a separate thread during the duration of the
process.
  • Loading branch information
unflxw committed Oct 7, 2024
1 parent 91ca1f9 commit 97c05b9
Show file tree
Hide file tree
Showing 20 changed files with 1,003 additions and 457 deletions.
26 changes: 26 additions & 0 deletions .changesets/implement-heartbeat-checkins.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
---
bump: minor
type: add
---

Add support for heartbeat check-ins.

Use the `appsignal.check_in.heartbeat` function to send a single heartbeat check-in event from your application. This can be used, for example, in your application's main loop:

```python
from appsignal.check_in import heartbeat

while True:
heartbeat("job_processor")
process_job()
```

Heartbeats are deduplicated and sent asynchronously, without blocking the current thread. Regardless of how often the `.heartbeat` function is called, at most one heartbeat with the same identifier will be sent every ten seconds.

Pass `continuous=True` as the second argument to send heartbeats continuously during the entire lifetime of the current process. This can be used, for example, after your application has finished its boot process:

```python
def main():
start_app()
heartbeat("my_app", continuous=True)
```
6 changes: 6 additions & 0 deletions .changesets/send-checkins-concurrently.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
bump: patch
type: change
---

Send check-ins concurrently. When calling `appsignal.check_in.cron`, instead of blocking the current thread while the check-in events are sent, schedule them to be sent in a separate thread.
29 changes: 23 additions & 6 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,30 @@
import os
import platform
import tempfile
from typing import Any, Callable, Generator

import pytest
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider, ReadableSpan
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from opentelemetry.trace import set_tracer_provider

from appsignal import probes
from appsignal.agent import agent
from appsignal.check_in.heartbeat import (
_kill_continuous_heartbeats,
_reset_heartbeat_continuous_interval_seconds,
)
from appsignal.check_in.scheduler import _reset_scheduler
from appsignal.client import _reset_client
from appsignal.heartbeat import _heartbeat_class_warning, _heartbeat_helper_warning
from appsignal.internal_logger import _reset_logger
from appsignal.opentelemetry import METRICS_PREFERRED_TEMPORALITY

from typing import Any, Generator, Callable, Tuple


@pytest.fixture(scope="function", autouse=True)
def disable_start_opentelemetry(mocker: Any) -> Any:
Expand Down Expand Up @@ -53,18 +57,22 @@ def start_in_memory_span_exporter() -> Generator[InMemorySpanExporter, None, Non


@pytest.fixture(scope="function")
def metrics(start_in_memory_metric_reader: InMemoryMetricReader) -> Generator[Callable[[], Any], None, None]:
def metrics(
start_in_memory_metric_reader: InMemoryMetricReader,
) -> Generator[Callable[[], Any], None, None]:
# Getting the metrics data implicitly wipes its state
start_in_memory_metric_reader.get_metrics_data()

yield start_in_memory_metric_reader.get_metrics_data


@pytest.fixture(scope="function")
def spans(start_in_memory_span_exporter: InMemorySpanExporter) -> Generator[Callable[[], Tuple[ReadableSpan, ...]], None, None]:
def spans(
start_in_memory_span_exporter: InMemorySpanExporter,
) -> Generator[Callable[[], tuple[ReadableSpan, ...]], None, None]:
start_in_memory_span_exporter.clear()

def get_and_clear_spans() -> Tuple[ReadableSpan, ...]:
def get_and_clear_spans() -> tuple[ReadableSpan, ...]:
spans = start_in_memory_span_exporter.get_finished_spans()
start_in_memory_span_exporter.clear()
return spans
Expand Down Expand Up @@ -107,6 +115,15 @@ def reset_global_client() -> Any:
_reset_client()


@pytest.fixture(scope="function", autouse=True)
def reset_checkins() -> Any:
yield

_kill_continuous_heartbeats()
_reset_scheduler()
_reset_heartbeat_continuous_interval_seconds()


@pytest.fixture(scope="function", autouse=True)
def stop_agent() -> Any:
tmp_path = "/tmp" if platform.system() == "Darwin" else tempfile.gettempdir()
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies = [
"opentelemetry-api",
"opentelemetry-sdk",
"opentelemetry-exporter-otlp-proto-http",
"typing-extensions"
]
dynamic = ["version"]

Expand Down
3 changes: 3 additions & 0 deletions src/appsignal/check_in/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
from .cron import Cron, cron
from .heartbeat import heartbeat


__all__ = ["Cron", "cron", "heartbeat"]
8 changes: 5 additions & 3 deletions src/appsignal/check_in/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
from os import urandom
from typing import Any, Callable, Literal, TypeVar

from .event import Event
from .event import cron as cron_event
from .scheduler import scheduler


T = TypeVar("T")


class Cron:
identifier: str
digest: str
Expand All @@ -18,10 +20,10 @@ def __init__(self, identifier: str) -> None:
self.digest = hexlify(urandom(8)).decode("utf-8")

def start(self) -> None:
scheduler.schedule(Event.cron(self.identifier, self.digest, "start"))
scheduler().schedule(cron_event(self.identifier, self.digest, "start"))

def finish(self) -> None:
scheduler.schedule(Event.cron(self.identifier, self.digest, "finish"))
scheduler().schedule(cron_event(self.identifier, self.digest, "finish"))

def __enter__(self) -> None:
self.start()
Expand Down
96 changes: 45 additions & 51 deletions src/appsignal/check_in/event.py
Original file line number Diff line number Diff line change
@@ -1,78 +1,72 @@
from __future__ import annotations

from time import time
from typing import Literal, TypedDict, Union

from typing import TYPE_CHECKING, TypedDict
from typing_extensions import NotRequired

if TYPE_CHECKING:
from typing import Literal, Optional, Union, Self, List

EventKind = Union[Literal["start"], Literal["finish"]]

EventCheckInType = Union[Literal["cron"], Literal["heartbeat"]]


class Event(TypedDict):
identifier: str
digest: Optional[str]
kind: Optional[EventKind]
digest: NotRequired[str]
kind: NotRequired[EventKind]
timestamp: int
check_in_type: EventCheckInType

def __init__(self, **kwargs: Event) -> None:
super().__init__(**{
**kwargs,
"timestamp": int(time())
})

@classmethod
def cron(cls, identifier: str, digest: str, kind: EventKind) -> Self:
return cls(

def cron(identifier: str, digest: str, kind: EventKind) -> Event:
return Event(
identifier=identifier,
digest=digest,
kind=kind,
check_in_type="cron"
)

@classmethod
def heartbeat(cls, identifier: str) -> Self:
return cls(
identifier=identifier,
check_in_type="heartbeat"
)

def is_redundant(self, other: Self) -> bool:
if (
self["check_in_type"] not in ["cron", "heartbeat"] or
self["check_in_type"] != other["check_in_type"] or
self["identifier"] != other["identifier"]
):
timestamp=int(time()),
check_in_type="cron",
)


def heartbeat(identifier: str) -> Event:
return Event(
identifier=identifier, timestamp=int(time()), check_in_type="heartbeat"
)


def is_redundant(event: Event, existing_event: Event) -> bool:
if (
event["check_in_type"] not in ["cron", "heartbeat"]
or event["check_in_type"] != existing_event["check_in_type"]
or event["identifier"] != existing_event["identifier"]
):
return False

if self["check_in_type"] == "cron" and (
self["digest"] != other["digest"] or
self["kind"] != other["kind"]
):
if event["check_in_type"] == "cron" and ( # noqa: SIM103
event.get("digest") != existing_event.get("digest")
or event.get("kind") != existing_event.get("kind")
):
return False

return True

@classmethod
def describe(cls, events: List[Self]) -> str:
if not events:
return True


def describe(events: list[Event]) -> str:
if not events:
# This shouldn't happen.
return "no check-in events"
elif len(events) > 1:
if len(events) > 1:
return f"{len(events)} check-in events"
else:
event = events[0]
if event["check_in_type"] == "cron":
return (

event = events[0]
if event["check_in_type"] == "cron":
return (
f"cron check-in `{event.get('identifier', 'unknown')}` "
f"{event.get('kind', 'unknown')} event "
f"(digest {event.get('digest', 'unknown')})"
)
elif event["check_in_type"] == "heartbeat":
return f"heartbeat check-in `{event.get('identifier', 'unknown')}` event"
else:
# This shouldn't happen.
return "unknown check-in event"
)
if event["check_in_type"] == "heartbeat":
return f"heartbeat check-in `{event.get('identifier', 'unknown')}` event"

# This shouldn't happen.
return "unknown check-in event" # type: ignore[unreachable]
62 changes: 51 additions & 11 deletions src/appsignal/check_in/heartbeat.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,57 @@
from __future__ import annotations

from threading import Event, Thread

from .event import heartbeat as heartbeat_event
from .scheduler import scheduler
from .event import Event
from threading import Thread
from time import sleep

_HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS = 30

def _continuous_heartbeat(name: str) -> None:
while True:
sleep(_HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS)
heartbeat(name)
_HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS = 30.0


def _heartbeat_continuous_interval_seconds() -> float:
return _HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS


def _set_heartbeat_continuous_interval_seconds(seconds: float) -> None:
global _HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS
_HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS = seconds


def _reset_heartbeat_continuous_interval_seconds() -> None:
global _HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS
_HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS = 30.0


_started_continuous_heartbeats: list[tuple[Event, Thread]] = []


def _kill_continuous_heartbeats() -> None:
for event, thread in _started_continuous_heartbeats:
event.set()
thread.join()

_started_continuous_heartbeats.clear()


def _start_continuous_heartbeat(name: str) -> None:
kill = Event()

def _run_continuous_heartbeat() -> None:
while True:
if kill.wait(_heartbeat_continuous_interval_seconds()):
break

heartbeat(name)

thread = Thread(target=_run_continuous_heartbeat)
thread.start()
_started_continuous_heartbeats.append((kill, thread))


def heartbeat(name: str, continuous: bool = False) -> None:
if continuous:
thread = Thread(target=_continuous_heartbeat, args=(name,))
thread.start()
scheduler.schedule(Event.heartbeat(name))
print("schedule is ", _heartbeat_continuous_interval_seconds())
_start_continuous_heartbeat(name)

scheduler().schedule(heartbeat_event(name))
Loading

0 comments on commit 97c05b9

Please sign in to comment.