Skip to content

Commit

Permalink
🏭 Migrate ConstantEmitter to queue dequeue time
Browse files Browse the repository at this point in the history
Instead of checking if it's the right moment to send old metrics in the
event_changed callback, we now do it on the regular timer used to
dequeue metrics to send.

This is a better design (less contention, less resource consumption).
  • Loading branch information
kamaradclimber committed Jan 11, 2025
1 parent 1495458 commit 1bb7585
Showing 1 changed file with 8 additions and 7 deletions.
15 changes: 8 additions & 7 deletions custom_components/datadog_agentless/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
metrics_queue = Queue(maxsize=0)
@callback
def metrics_dequeue_callback(now) -> None:
entry.async_create_background_task(hass, send_metrics_loop(metrics_queue, metrics_api), name="dequeue metrics to be sent to dd", eager_start=True)
entry.async_create_background_task(hass, send_metrics_loop(metrics_queue, metrics_api, constant_emitter), name="dequeue metrics to be sent to dd", eager_start=True)
metrics_cancel_schedule = async_track_time_interval(hass, metrics_dequeue_callback, datetime.timedelta(seconds=10))
def metrics_cancel_dequeuing(*_: Any) -> None:
metrics_cancel_schedule()
Expand Down Expand Up @@ -107,7 +107,12 @@ async def send_events_loop(queue, events_api):
_LOGGER.debug("Events queue is now empty, waiting for next run")

# this function will constantly dequeue metrics to send them by batch to the dd api
async def send_metrics_loop(queue, metrics_api):
async def send_metrics_loop(queue, metrics_api, constant_emitter):
_LOGGER.debug("Checking if time to flush old series")
if constant_emitter.should_flush():
old_series = constant_emitter.flush_old_series()
for serie in old_series:
queue.put(serie)
_LOGGER.debug("Starting dequeuing from metrics queue")
while not queue.empty():
try:
Expand Down Expand Up @@ -168,7 +173,7 @@ def flush_old_series(self) -> list[MetricSeries]:
serie = self.last_sent[metric_id]
if serie.points[0].timestamp < now - self.flush_interval:
serie.points[0].timestamp = now
_LOGGER.debug(f"Will flush {serie.metric} {serie.tags}")
# _LOGGER.debug(f"Will flush {serie.metric} {serie.tags}")
old_series.append(serie)
self.last_flush = now
_LOGGER.info(f"Flushing {len(old_series)} old metrics (out of {len(self.last_sent)}) to make sure we have regular points")
Expand Down Expand Up @@ -333,10 +338,6 @@ def full_event_listener(creds: dict, constant_emitter: ConstantMetricEmitter, me
points=[MetricPoint(timestamp=int(timestamp.timestamp()), value=value)])
metrics_queue.put(metric_serie)
constant_emitter.record_last_sent(metric_serie)
if constant_emitter.should_flush():
old_series = constant_emitter.flush_old_series()
for serie in old_series:
metrics_queue.put(serie)

def full_all_event_listener(creds: dict, events_queue, event: Event):
if event.event_type == "state_changed":
Expand Down

0 comments on commit 1bb7585

Please sign in to comment.