diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py index c8fa7f1453..22d1ee9f75 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py @@ -13,8 +13,18 @@ # limitations under the License. from abc import ABC, abstractmethod +from collections import defaultdict from random import randrange -from typing import Any, Callable, Dict, List, Optional, Sequence, Union +from typing import ( + Any, + Callable, + Dict, + List, + Mapping, + Optional, + Sequence, + Union, +) from opentelemetry import trace from opentelemetry.context import Context @@ -155,9 +165,9 @@ class FixedSizeExemplarReservoirABC(ExemplarReservoir): def __init__(self, size: int, **kwargs) -> None: super().__init__(**kwargs) self._size: int = size - self._reservoir_storage: List[ExemplarBucket] = [ - ExemplarBucket() for _ in range(self._size) - ] + self._reservoir_storage: Mapping[int, ExemplarBucket] = defaultdict( + ExemplarBucket + ) def collect(self, point_attributes: Attributes) -> List[Exemplar]: """Returns accumulated Exemplars and also resets the reservoir for the next @@ -171,15 +181,16 @@ def collect(self, point_attributes: Attributes) -> List[Exemplar]: exemplars contain the attributes that were filtered out by the aggregator, but recorded alongside the original measurement. """ - exemplars = filter( - lambda e: e is not None, - map( - lambda bucket: bucket.collect(point_attributes), - self._reservoir_storage, - ), - ) + exemplars = [ + e + for e in ( + bucket.collect(point_attributes) + for _, bucket in sorted(self._reservoir_storage.items()) + ) + if e is not None + ] self._reset() - return [*exemplars] + return exemplars def offer( self,