🏭 Refactor metrics/events send mechanism #23

Merged · 1 commit · Jan 11, 2025
87 changes: 61 additions & 26 deletions custom_components/datadog_agentless/__init__.py
@@ -8,6 +8,7 @@
import orjson
from threading import Lock
import asyncio
from queue import Queue


from datadog_api_client import AsyncApiClient, ApiClient, Configuration
@@ -21,12 +22,13 @@
from datadog_api_client.v1.api.events_api import EventsApi
from datadog_api_client.v1.model.event_create_request import EventCreateRequest

from homeassistant.const import Platform, EVENT_STATE_CHANGED, MATCH_ALL
from homeassistant.const import Platform, EVENT_STATE_CHANGED, MATCH_ALL, EVENT_HOMEASSISTANT_STOP
from homeassistant.helpers.device_registry import EVENT_DEVICE_REGISTRY_UPDATED
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.components.automation import EVENT_AUTOMATION_TRIGGERED
from homeassistant.const import __version__ as HAVERSION
import homeassistant.const
from homeassistant.core import HomeAssistant, Event, EventStateChangedData, State
from homeassistant.core import HomeAssistant, Event, EventStateChangedData, State, callback
from homeassistant.config_entries import ConfigEntry
from .const import DOMAIN
from homeassistant.helpers.state import state_as_number
@@ -60,22 +62,69 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    )
    api_client = AsyncApiClient(configuration)
    metrics_api = MetricsApi(api_client)

    event_listener = partial(full_event_listener, metrics_api, entry.data, constant_emitter, hass)

    metrics_queue = Queue(maxsize=0)
    @callback
    def metrics_dequeue_callback(now) -> None:
        entry.async_create_background_task(hass, send_metrics_loop(metrics_queue, metrics_api), name="dequeue metrics to be sent to dd", eager_start=True)
    metrics_cancel_schedule = async_track_time_interval(hass, metrics_dequeue_callback, datetime.timedelta(seconds=10))
    def metrics_cancel_dequeuing(*_: Any) -> None:
        metrics_cancel_schedule()
    hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, metrics_cancel_dequeuing)

    event_listener = partial(full_event_listener, entry.data, constant_emitter, metrics_queue)
    unsubscribe = hass.bus.async_listen(EVENT_STATE_CHANGED, event_listener)
    hass.data[DOMAIN][entry.entry_id]["unsubscribe_handler"] = unsubscribe

    events_api = EventsApi(api_client)

    all_event_listener = partial(full_all_event_listener, entry.data, events_api, hass)
    events_queue = Queue(maxsize=0)
    @callback
    def events_dequeue_callback(now) -> None:
        entry.async_create_background_task(hass, send_events_loop(events_queue, events_api), name="dequeue events to be sent to dd", eager_start=True)
    events_cancel_schedule = async_track_time_interval(hass, events_dequeue_callback, datetime.timedelta(seconds=10))
    def events_cancel_dequeuing(*_: Any) -> None:
        events_cancel_schedule()
    hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, events_cancel_dequeuing)

    all_event_listener = partial(full_all_event_listener, entry.data, events_queue)
    unsubscribe_all_events = hass.bus.async_listen(MATCH_ALL, all_event_listener)
    hass.data[DOMAIN][entry.entry_id]["unsubscribe_all_event_handler"] = unsubscribe_all_events


    return True
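Review note: the metrics and events scheduling blocks above are identical apart from the queue and the drain coroutine. A minimal sketch of how they could be factored into one helper; the names `schedule_queue_drain`, `_dequeue`, and `_cancel` are hypothetical and not part of this PR:

```python
import datetime
from queue import Queue
from typing import Any, Callable, Coroutine

from homeassistant.config_entries import ConfigEntry
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.event import async_track_time_interval


def schedule_queue_drain(
    hass: HomeAssistant,
    entry: ConfigEntry,
    queue: Queue,
    drain: Callable[[Queue], Coroutine[Any, Any, None]],
    name: str,
) -> None:
    """Run drain(queue) as a background task every 10 seconds until shutdown."""

    @callback
    def _dequeue(now) -> None:
        # Each tick spawns a fresh drain task tied to the config entry,
        # so it is cancelled automatically when the entry unloads.
        entry.async_create_background_task(hass, drain(queue), name=name, eager_start=True)

    cancel_schedule = async_track_time_interval(hass, _dequeue, datetime.timedelta(seconds=10))

    def _cancel(*_: Any) -> None:
        cancel_schedule()

    hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _cancel)
```

The two blocks would then collapse to `schedule_queue_drain(hass, entry, metrics_queue, partial(send_metrics_loop, metrics_api=metrics_api), "dequeue metrics to be sent to dd")` and the equivalent call for events.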

# this function drains the events queue on each scheduled run, sending events one at a time to the Datadog API
async def send_events_loop(queue, events_api):
    _LOGGER.debug("Starting dequeuing from events queue")
    while not queue.empty():
        try:
            event_request = queue.get()
            _LOGGER.debug("Sending one event to the Datadog API")
            response = await events_api.create_event(body=event_request)
            if response.status != "ok":
                _LOGGER.error(f"Error sending event to Datadog: {response['errors']}")
        except Exception as e:
            _LOGGER.error(e)
    _LOGGER.debug("Events queue is now empty, waiting for next run")

# this function drains the metrics queue on each scheduled run, sending the series to the Datadog API in batches of up to 1024
async def send_metrics_loop(queue, metrics_api):
    _LOGGER.debug("Starting dequeuing from metrics queue")
    while not queue.empty():
        try:
            series = []
            while len(series) < 1024 and not queue.empty():
                series.append(queue.get())

            if len(series) > 0:
                payload = MetricPayload(series=series)
                _LOGGER.debug(f"Sending {len(series)} metrics to the API")
                response = await metrics_api.submit_metrics(content_encoding=MetricContentEncoding.ZSTD1, body=payload)
                if len(response["errors"]) > 0:
                    _LOGGER.error(f"Error sending metrics to Datadog: {response['errors'][0]}")
        except Exception as e:
            _LOGGER.error(e)
    _LOGGER.debug("Metrics queue is now empty, waiting for next run")


async def update_entry(hass, entry):
"""
@@ -264,7 +313,7 @@ def _extract_state(new_state: State, entity_id: str, value: Any, main_state: boo
_LOGGER.warn(f"Cannot treat this state changed event: {entity_id} to convert to metric. Error was: %s", e)
return None

def full_event_listener(metrics_api: MetricsApi, creds: dict, constant_emitter: ConstantMetricEmitter, hass, event: Event[EventStateChangedData]):
def full_event_listener(creds: dict, constant_emitter: ConstantMetricEmitter, metrics_queue, event: Event[EventStateChangedData]):
    new_state = event.data["new_state"]
    if new_state is None:
        _LOGGER.warning("This event has no new state, which is unexpected. Event: %s", event)
@@ -274,7 +323,6 @@ def full_event_listener(metrics_api: MetricsApi, creds: dict, constant_emitter:
    values = extract_states(event)
    if len(values) == 0:
        return
    series = []
    for (name, value) in values:
        tags = [f"entity:{name}", "service:home-assistant", f"version:{HAVERSION}", f"env:{creds['env']}"]
        unit = None
@@ -283,23 +331,14 @@ def full_event_listener(metrics_api: MetricsApi, creds: dict, constant_emitter:
            value = int(value)
        metric_serie = MetricSeries(metric=metric_name, type=MetricIntakeType.GAUGE, tags=tags, unit=unit,
                                    points=[MetricPoint(timestamp=int(timestamp.timestamp()), value=value)])
        series.append(metric_serie)
        metrics_queue.put(metric_serie)
        constant_emitter.record_last_sent(metric_serie)
    if constant_emitter.should_flush():
        old_series = constant_emitter.flush_old_series()
        for serie in old_series:
            series.append(serie)


    payload = MetricPayload(series=series)
    return asyncio.run_coroutine_threadsafe(send_metrics(metrics_api, payload), hass.loop)

async def send_metrics(metrics_api, payload) -> None:
    response = await metrics_api.submit_metrics(content_encoding=MetricContentEncoding.ZSTD1, body=payload)
    if len(response["errors"]) > 0:
        _LOGGER.error(f"Error sending metric to Datadog {response['errors'][0]}")
            metrics_queue.put(serie)
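With this refactor the listener side is fire-and-forget: `queue.Queue.put` is thread-safe, so listeners can enqueue from any thread without `asyncio.run_coroutine_threadsafe`. A minimal sketch of the producer contract, reusing the models this file already imports (the `enqueue_gauge` helper is hypothetical):

```python
from queue import Queue

from datadog_api_client.v2.model.metric_intake_type import MetricIntakeType
from datadog_api_client.v2.model.metric_point import MetricPoint
from datadog_api_client.v2.model.metric_series import MetricSeries


def enqueue_gauge(queue: Queue, metric: str, value: float, ts: int, tags: list[str]) -> None:
    # put() never blocks on an unbounded queue (maxsize=0) and is safe
    # to call from listener callbacks regardless of which thread runs them.
    queue.put(MetricSeries(metric=metric, type=MetricIntakeType.GAUGE, tags=tags,
                           points=[MetricPoint(timestamp=ts, value=value)]))
```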

def full_all_event_listener(creds: dict, events_api: EventsApi, hass, event: Event):
def full_all_event_listener(creds: dict, events_queue, event: Event):
    if event.event_type == "state_changed":
        if len(extract_states(event)) > 0:
            # these events are converted to metrics, so there is no need to send them twice
@@ -318,12 +357,8 @@ def full_all_event_listener(creds: dict, events_api: EventsApi, hass, event: Eve
        text=text,
        tags=tags + event_specific_tags,
    )
    return asyncio.run_coroutine_threadsafe(send_events(events_api, event_request), hass.loop)
    events_queue.put(event_request)

async def send_events(events_api, event_request) -> None:
    response = await events_api.create_event(body=event_request)
    if response.status != "ok":
        _LOGGER.error(f"Error sending event to Datadog: {response['errors']}")


def generate_message(event: Event) -> Tuple[str, list[str]]: