Skip to content

Commit

Permalink
Implement scheduler and heartbeat check-ins
Browse files Browse the repository at this point in the history
Implement a scheduler for check-in events, ensuring that the events
are sent asynchronously from the thread in which they are scheduled.

Implement a heartbeat check-in helper, which sends heartbeat check-in
events. When given the `continuous=True` flag, the events will be
sent continously from a separate thread during the duration of the
process.
  • Loading branch information
unflxw committed Oct 7, 2024
1 parent 8a4bae6 commit a9b9358
Show file tree
Hide file tree
Showing 19 changed files with 877 additions and 458 deletions.
26 changes: 26 additions & 0 deletions .changesets/implement-heartbeat-checkins.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
---
bump: minor
type: add
---

Add support for heartbeat check-ins.

Use the `appsignal.check_in.heartbeat` function to send a single heartbeat check-in event from your application. This can be used, for example, in your application's main loop:

```python
from appsignal.check_in import heartbeat

while True:
heartbeat("job_processor")
process_job()
```

Heartbeats are deduplicated and sent asynchronously, without blocking the current thread. Regardless of how often the `.heartbeat` function is called, at most one heartbeat with the same identifier will be sent every ten seconds.

Pass `continuous=True` as the second argument to send heartbeats continuously during the entire lifetime of the current process. This can be used, for example, after your application has finished its boot process:

```python
def main():
start_app()
heartbeat("my_app", continuous=True)
```
6 changes: 6 additions & 0 deletions .changesets/send-checkins-concurrently.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
bump: patch
type: change
---

Send check-ins concurrently. When calling `appsignal.check_in.cron`, instead of blocking the current thread while the check-in events are sent, schedule them to be sent in a separate thread.
10 changes: 10 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from appsignal.agent import agent
from appsignal.client import _reset_client
from appsignal.heartbeat import _heartbeat_class_warning, _heartbeat_helper_warning
from appsignal.check_in.scheduler import _reset_scheduler
from appsignal.check_in.heartbeat import _kill_continuous_heartbeats, _reset_heartbeat_continuous_interval_seconds
from appsignal.internal_logger import _reset_logger
from appsignal.opentelemetry import METRICS_PREFERRED_TEMPORALITY

Expand Down Expand Up @@ -106,6 +108,14 @@ def reset_agent_active_state() -> Any:
def reset_global_client() -> Any:
_reset_client()

@pytest.fixture(scope="function", autouse=True)
def reset_checkins() -> Any:
yield

_kill_continuous_heartbeats()
_reset_scheduler()
_reset_heartbeat_continuous_interval_seconds()


@pytest.fixture(scope="function", autouse=True)
def stop_agent() -> Any:
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies = [
"opentelemetry-api",
"opentelemetry-sdk",
"opentelemetry-exporter-otlp-proto-http",
"typing-extensions"
]
dynamic = ["version"]

Expand Down
6 changes: 3 additions & 3 deletions src/appsignal/check_in/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from os import urandom
from typing import Any, Callable, Literal, TypeVar

from .event import Event
from .event import cron as cron_event
from .scheduler import scheduler

T = TypeVar("T")
Expand All @@ -18,10 +18,10 @@ def __init__(self, identifier: str) -> None:
self.digest = hexlify(urandom(8)).decode("utf-8")

def start(self) -> None:
scheduler.schedule(Event.cron(self.identifier, self.digest, "start"))
scheduler().schedule(cron_event(self.identifier, self.digest, "start"))

def finish(self) -> None:
scheduler.schedule(Event.cron(self.identifier, self.digest, "finish"))
scheduler().schedule(cron_event(self.identifier, self.digest, "finish"))

def __enter__(self) -> None:
self.start()
Expand Down
120 changes: 56 additions & 64 deletions src/appsignal/check_in/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,77 +2,69 @@

from time import time

from typing import TYPE_CHECKING, TypedDict

if TYPE_CHECKING:
from typing import Literal, Optional, Union, Self, List
from typing import TypedDict, Literal, Union, List
from typing_extensions import NotRequired

EventKind = Union[Literal["start"], Literal["finish"]]

EventCheckInType = Union[Literal["cron"], Literal["heartbeat"]]

class Event(TypedDict):
identifier: str
digest: Optional[str]
kind: Optional[EventKind]
timestamp: int
check_in_type: EventCheckInType
identifier: str
digest: NotRequired[str]
kind: NotRequired[EventKind]
timestamp: int
check_in_type: EventCheckInType

def __init__(self, **kwargs: Event) -> None:
super().__init__(**{
**kwargs,
"timestamp": int(time())
})

@classmethod
def cron(cls, identifier: str, digest: str, kind: EventKind) -> Self:
return cls(
identifier=identifier,
digest=digest,
kind=kind,
check_in_type="cron"
)

@classmethod
def heartbeat(cls, identifier: str) -> Self:
return cls(
identifier=identifier,
check_in_type="heartbeat"
)
def cron(identifier: str, digest: str, kind: EventKind) -> Event:
return Event(
identifier=identifier,
digest=digest,
kind=kind,
timestamp=int(time()),
check_in_type="cron"
)

def heartbeat(identifier: str) -> Event:
return Event(
identifier=identifier,
timestamp=int(time()),
check_in_type="heartbeat"
)

def is_redundant(self, other: Self) -> bool:
if (
self["check_in_type"] not in ["cron", "heartbeat"] or
self["check_in_type"] != other["check_in_type"] or
self["identifier"] != other["identifier"]
):
return False

if self["check_in_type"] == "cron" and (
self["digest"] != other["digest"] or
self["kind"] != other["kind"]
):
return False

return True
def is_redundant(event: Event, existing_event: Event) -> bool:
if (
event["check_in_type"] not in ["cron", "heartbeat"] or
event["check_in_type"] != existing_event["check_in_type"] or
event["identifier"] != existing_event["identifier"]
):
return False

if event["check_in_type"] == "cron" and (
event.get("digest") != existing_event.get("digest") or
event.get("kind") != existing_event.get("kind")
):
return False

return True

@classmethod
def describe(cls, events: List[Self]) -> str:
if not events:
# This shouldn't happen.
return "no check-in events"
elif len(events) > 1:
return f"{len(events)} check-in events"
else:
event = events[0]
if event["check_in_type"] == "cron":
return (
f"cron check-in `{event.get('identifier', 'unknown')}` "
f"{event.get('kind', 'unknown')} event "
f"(digest {event.get('digest', 'unknown')})"
)
elif event["check_in_type"] == "heartbeat":
return f"heartbeat check-in `{event.get('identifier', 'unknown')}` event"
else:
# This shouldn't happen.
return "unknown check-in event"
def describe(events: List[Event]) -> str:
if not events:
# This shouldn't happen.
return "no check-in events"
elif len(events) > 1:
return f"{len(events)} check-in events"
else:
event = events[0]
if event["check_in_type"] == "cron":
return (
f"cron check-in `{event.get('identifier', 'unknown')}` "
f"{event.get('kind', 'unknown')} event "
f"(digest {event.get('digest', 'unknown')})"
)
elif event["check_in_type"] == "heartbeat":
return f"heartbeat check-in `{event.get('identifier', 'unknown')}` event"
else:
# This shouldn't happen.
return "unknown check-in event" # type: ignore[unreachable]
53 changes: 42 additions & 11 deletions src/appsignal/check_in/heartbeat.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,48 @@
from .scheduler import scheduler
from .event import Event
from threading import Thread
from time import sleep
from .event import heartbeat as heartbeat_event
from threading import Thread, Event

_HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS = 30
from typing import List, Tuple

def _continuous_heartbeat(name: str) -> None:
while True:
sleep(_HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS)
heartbeat(name)
_HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS = 30.0

def _heartbeat_continuous_interval_seconds() -> float:
return _HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS

def _set_heartbeat_continuous_interval_seconds(seconds: float) -> None:
global _HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS
_HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS = seconds

def _reset_heartbeat_continuous_interval_seconds() -> None:
global _HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS
_HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS = 30.0

_started_continuous_heartbeats: List[Tuple[Event, Thread]] = []

def _kill_continuous_heartbeats() -> None:
for event, thread in _started_continuous_heartbeats:
event.set()
thread.join()

_started_continuous_heartbeats.clear()

def _start_continuous_heartbeat(name: str) -> None:
kill = Event()

def _run_continuous_heartbeat() -> None:
while True:
if kill.wait(_heartbeat_continuous_interval_seconds()):
break

heartbeat(name)

thread = Thread(target=_run_continuous_heartbeat)
thread.start()
_started_continuous_heartbeats.append((kill, thread))

def heartbeat(name: str, continuous: bool = False) -> None:
if continuous:
thread = Thread(target=_continuous_heartbeat, args=(name,))
thread.start()
scheduler.schedule(Event.heartbeat(name))
print("schedule is ", _heartbeat_continuous_interval_seconds())
_start_continuous_heartbeat(name)

scheduler().schedule(heartbeat_event(name))
Loading

0 comments on commit a9b9358

Please sign in to comment.