Skip to content

Commit

Permalink
Add type hints to conftest.py
Browse files Browse the repository at this point in the history
This allows us to type-check it with mypy -- more importantly, it
makes it so tools that type-check every file in the workspace don't
freak out about every single line in it.
  • Loading branch information
unflxw committed Oct 7, 2024
1 parent 3c712c9 commit 91ca1f9
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 108 deletions.
30 changes: 16 additions & 14 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace import TracerProvider, ReadableSpan
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from opentelemetry.trace import set_tracer_provider
Expand All @@ -21,15 +21,17 @@
from appsignal.internal_logger import _reset_logger
from appsignal.opentelemetry import METRICS_PREFERRED_TEMPORALITY

from typing import Any, Generator, Callable, Tuple


@pytest.fixture(scope="function", autouse=True)
def disable_start_opentelemetry(mocker):
def disable_start_opentelemetry(mocker: Any) -> Any:
mocker.patch("appsignal.opentelemetry._start_tracer")
mocker.patch("appsignal.opentelemetry._start_metrics")


@pytest.fixture(scope="session", autouse=True)
def start_in_memory_metric_reader():
def start_in_memory_metric_reader() -> Generator[InMemoryMetricReader, None, None]:
metric_reader = InMemoryMetricReader(
preferred_temporality=METRICS_PREFERRED_TEMPORALITY
)
Expand All @@ -40,7 +42,7 @@ def start_in_memory_metric_reader():


@pytest.fixture(scope="session", autouse=True)
def start_in_memory_span_exporter():
def start_in_memory_span_exporter() -> Generator[InMemorySpanExporter, None, None]:
span_exporter = InMemorySpanExporter()
exporter_processor = SimpleSpanProcessor(span_exporter)
provider = TracerProvider()
Expand All @@ -51,18 +53,18 @@ def start_in_memory_span_exporter():


@pytest.fixture(scope="function")
def metrics(start_in_memory_metric_reader):
def metrics(start_in_memory_metric_reader: InMemoryMetricReader) -> Generator[Callable[[], Any], None, None]:
# Getting the metrics data implicitly wipes its state
start_in_memory_metric_reader.get_metrics_data()

yield start_in_memory_metric_reader.get_metrics_data


@pytest.fixture(scope="function")
def spans(start_in_memory_span_exporter):
def spans(start_in_memory_span_exporter: InMemorySpanExporter) -> Generator[Callable[[], Tuple[ReadableSpan, ...]], None, None]:
start_in_memory_span_exporter.clear()

def get_and_clear_spans():
def get_and_clear_spans() -> Tuple[ReadableSpan, ...]:
spans = start_in_memory_span_exporter.get_finished_spans()
start_in_memory_span_exporter.clear()
return spans
Expand All @@ -71,7 +73,7 @@ def get_and_clear_spans():


@pytest.fixture(scope="function", autouse=True)
def reset_environment_between_tests():
def reset_environment_between_tests() -> Any:
old_environ = dict(os.environ)

yield
Expand All @@ -81,40 +83,40 @@ def reset_environment_between_tests():


@pytest.fixture(scope="function", autouse=True)
def reset_internal_logger_after_tests():
def reset_internal_logger_after_tests() -> Any:
yield

_reset_logger()


@pytest.fixture(scope="function", autouse=True)
def stop_and_clear_probes_after_tests():
def stop_and_clear_probes_after_tests() -> Any:
yield

probes.stop()
probes.clear()


@pytest.fixture(scope="function", autouse=True)
def reset_agent_active_state():
def reset_agent_active_state() -> Any:
agent.active = False


@pytest.fixture(scope="function", autouse=True)
def reset_global_client():
def reset_global_client() -> Any:
_reset_client()


@pytest.fixture(scope="function", autouse=True)
def stop_agent():
def stop_agent() -> Any:
tmp_path = "/tmp" if platform.system() == "Darwin" else tempfile.gettempdir()
working_dir = os.path.join(tmp_path, "appsignal")
if os.path.isdir(working_dir):
os.system(f"rm -rf {working_dir}")


@pytest.fixture(scope="function")
def reset_heartbeat_warnings():
def reset_heartbeat_warnings() -> Any:
_heartbeat_class_warning.reset()
_heartbeat_helper_warning.reset()

Expand Down
94 changes: 0 additions & 94 deletions src/appsignal/check_in.py

This file was deleted.

2 changes: 2 additions & 0 deletions src/appsignal/check_in/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .cron import Cron, cron
from .heartbeat import heartbeat
47 changes: 47 additions & 0 deletions src/appsignal/check_in/cron.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from __future__ import annotations

from binascii import hexlify
from os import urandom
from typing import Any, Callable, Literal, TypeVar

from .event import Event
from .scheduler import scheduler

T = TypeVar("T")

class Cron:
identifier: str
digest: str

def __init__(self, identifier: str) -> None:
self.identifier = identifier
self.digest = hexlify(urandom(8)).decode("utf-8")

def start(self) -> None:
scheduler.schedule(Event.cron(self.identifier, self.digest, "start"))

def finish(self) -> None:
scheduler.schedule(Event.cron(self.identifier, self.digest, "finish"))

def __enter__(self) -> None:
self.start()

def __exit__(
self, exc_type: Any = None, exc_value: Any = None, traceback: Any = None
) -> Literal[False]:
if exc_type is None:
self.finish()

return False


def cron(identifier: str, fn: Callable[[], T] | None = None) -> None | T:
cron = Cron(identifier)
output = None

if fn is not None:
cron.start()
output = fn()

cron.finish()
return output
78 changes: 78 additions & 0 deletions src/appsignal/check_in/event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from __future__ import annotations

from time import time

from typing import TYPE_CHECKING, TypedDict

if TYPE_CHECKING:
from typing import Literal, Optional, Union, Self, List

EventKind = Union[Literal["start"], Literal["finish"]]

EventCheckInType = Union[Literal["cron"], Literal["heartbeat"]]

class Event(TypedDict):
identifier: str
digest: Optional[str]
kind: Optional[EventKind]
timestamp: int
check_in_type: EventCheckInType

def __init__(self, **kwargs: Event) -> None:
super().__init__(**{
**kwargs,
"timestamp": int(time())
})

@classmethod
def cron(cls, identifier: str, digest: str, kind: EventKind) -> Self:
return cls(
identifier=identifier,
digest=digest,
kind=kind,
check_in_type="cron"
)

@classmethod
def heartbeat(cls, identifier: str) -> Self:
return cls(
identifier=identifier,
check_in_type="heartbeat"
)

def is_redundant(self, other: Self) -> bool:
if (
self["check_in_type"] not in ["cron", "heartbeat"] or
self["check_in_type"] != other["check_in_type"] or
self["identifier"] != other["identifier"]
):
return False

if self["check_in_type"] == "cron" and (
self["digest"] != other["digest"] or
self["kind"] != other["kind"]
):
return False

return True

@classmethod
def describe(cls, events: List[Self]) -> str:
if not events:
# This shouldn't happen.
return "no check-in events"
elif len(events) > 1:
return f"{len(events)} check-in events"
else:
event = events[0]
if event["check_in_type"] == "cron":
return (
f"cron check-in `{event.get('identifier', 'unknown')}` "
f"{event.get('kind', 'unknown')} event "
f"(digest {event.get('digest', 'unknown')})"
)
elif event["check_in_type"] == "heartbeat":
return f"heartbeat check-in `{event.get('identifier', 'unknown')}` event"
else:
# This shouldn't happen.
return "unknown check-in event"
17 changes: 17 additions & 0 deletions src/appsignal/check_in/heartbeat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from .scheduler import scheduler
from .event import Event
from threading import Thread
from time import sleep

_HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS = 30

def _continuous_heartbeat(name: str) -> None:
while True:
sleep(_HEARTBEAT_CONTINUOUS_INTERVAL_SECONDS)
heartbeat(name)

def heartbeat(name: str, continuous: bool = False) -> None:
if continuous:
thread = Thread(target=_continuous_heartbeat, args=(name,))
thread.start()
scheduler.schedule(Event.heartbeat(name))
Loading

0 comments on commit 91ca1f9

Please sign in to comment.