diff --git a/custom_components/datadog_agentless/__init__.py b/custom_components/datadog_agentless/__init__.py index 6e71dbc..172866b 100644 --- a/custom_components/datadog_agentless/__init__.py +++ b/custom_components/datadog_agentless/__init__.py @@ -66,7 +66,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: metrics_queue = Queue(maxsize=0) @callback def metrics_dequeue_callback(now) -> None: - entry.async_create_background_task(hass, send_metrics_loop(metrics_queue, metrics_api), name="dequeue metrics to be sent to dd", eager_start=True) + entry.async_create_background_task(hass, send_metrics_loop(metrics_queue, metrics_api, constant_emitter), name="dequeue metrics to be sent to dd", eager_start=True) metrics_cancel_schedule = async_track_time_interval(hass, metrics_dequeue_callback, datetime.timedelta(seconds=10)) def metrics_cancel_dequeuing(*_: Any) -> None: metrics_cancel_schedule() @@ -107,7 +107,12 @@ async def send_events_loop(queue, events_api): _LOGGER.debug("Events queue is now empty, waiting for next run") # this function will constantly dequeue metrics to send them by batch to the dd api -async def send_metrics_loop(queue, metrics_api): +async def send_metrics_loop(queue, metrics_api, constant_emitter): + _LOGGER.debug("Checking if time to flush old series") + if constant_emitter.should_flush(): + old_series = constant_emitter.flush_old_series() + for serie in old_series: + queue.put(serie) _LOGGER.debug("Starting dequeuing from metrics queue") while not queue.empty(): try: @@ -168,7 +173,7 @@ def flush_old_series(self) -> list[MetricSeries]: serie = self.last_sent[metric_id] if serie.points[0].timestamp < now - self.flush_interval: serie.points[0].timestamp = now - _LOGGER.debug(f"Will flush {serie.metric} {serie.tags}") + # _LOGGER.debug(f"Will flush {serie.metric} {serie.tags}") old_series.append(serie) self.last_flush = now _LOGGER.info(f"Flushing {len(old_series)} old metrics (out of {len(self.last_sent)}) to make sure we have regular points") @@ -333,10 +338,6 @@ def full_event_listener(creds: dict, constant_emitter: ConstantMetricEmitter, me points=[MetricPoint(timestamp=int(timestamp.timestamp()), value=value)]) metrics_queue.put(metric_serie) constant_emitter.record_last_sent(metric_serie) - if constant_emitter.should_flush(): - old_series = constant_emitter.flush_old_series() - for serie in old_series: - metrics_queue.put(serie) def full_all_event_listener(creds: dict, events_queue, event: Event): if event.event_type == "state_changed":