diff --git a/docs/examples/exemplars/semantic_exemplars.py b/docs/examples/exemplars/semantic_exemplars.py
index a7b780a9a18..9b85602d8da 100644
--- a/docs/examples/exemplars/semantic_exemplars.py
+++ b/docs/examples/exemplars/semantic_exemplars.py
@@ -20,14 +20,9 @@
 import time
 
 from opentelemetry import metrics
-from opentelemetry.sdk.metrics import (
-    MeterProvider,
-    ValueRecorder,
-)
+from opentelemetry.sdk.metrics import MeterProvider, ValueRecorder
 from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter
-from opentelemetry.sdk.metrics.export.aggregate import (
-    HistogramAggregator,
-)
+from opentelemetry.sdk.metrics.export.aggregate import HistogramAggregator
 from opentelemetry.sdk.metrics.view import View, ViewConfig
 
 # Set up OpenTelemetry metrics
@@ -35,7 +30,9 @@
 meter = metrics.get_meter(__name__)
 
 # Use the Google Cloud Monitoring Metrics Exporter since it's the only one that currently supports exemplars
-metrics.get_meter_provider().start_pipeline(meter, ConsoleMetricsExporter(), 10)
+metrics.get_meter_provider().start_pipeline(
+    meter, ConsoleMetricsExporter(), 10
+)
 
 # Create our duration metric
 request_duration = meter.create_metric(
@@ -53,8 +50,26 @@
     # [>=0ms, >=25ms, >=50ms, >=75ms, >=100ms, >=200ms, >=400ms, >=600ms, >=800ms, >=1s, >=2s, >=4s, >=6s]
     # We want to generate 1 exemplar per bucket, where each exemplar has a linked trace that was recorded.
    # So we need to set num_exemplars to 1 and not specify statistical_exemplars (defaults to false)
-    HistogramAggregator(config={"bounds": [0, 25, 50, 75, 100, 200, 400, 600, 800, 1000, 2000, 4000, 6000],
-                                "num_exemplars": 1}),
+    HistogramAggregator(
+        config={
+            "bounds": [
+                0,
+                25,
+                50,
+                75,
+                100,
+                200,
+                400,
+                600,
+                800,
+                1000,
+                2000,
+                4000,
+                6000,
+            ],
+            "num_exemplars": 1,
+        }
+    ),
     label_keys=["environment"],
     config=ViewConfig.LABEL_KEYS,
 )
@@ -63,5 +78,8 @@
 
 for i in range(100):
     # Generate some random data for the histogram with a dropped label "customer_id"
-    request_duration.record(random.randint(1, 8000), {"environment": "staging", "customer_id": random.randint(1, 100)})
+    request_duration.record(
+        random.randint(1, 8000),
+        {"environment": "staging", "customer_id": random.randint(1, 100)},
+    )
     time.sleep(1)
diff --git a/docs/examples/exemplars/statistical_exemplars.py b/docs/examples/exemplars/statistical_exemplars.py
index f64866b782a..9f6476fdb01 100644
--- a/docs/examples/exemplars/statistical_exemplars.py
+++ b/docs/examples/exemplars/statistical_exemplars.py
@@ -1,17 +1,18 @@
-import numpy as np
-import matplotlib.pyplot as plt
 import random
-
 from collections import defaultdict
 
+import matplotlib.pyplot as plt
+import numpy as np
 from opentelemetry import metrics
 from opentelemetry.sdk.metrics import Counter, MeterProvider
 from opentelemetry.sdk.metrics.export.aggregate import SumAggregator
 from opentelemetry.sdk.metrics.export.controller import PushController
-from opentelemetry.sdk.metrics.export.in_memory_metrics_exporter import InMemoryMetricsExporter
+from opentelemetry.sdk.metrics.export.in_memory_metrics_exporter import (
+    InMemoryMetricsExporter,
+)
 from opentelemetry.sdk.metrics.view import View, ViewConfig
 
-## set up opentelemetry
+# set up opentelemetry
 
 # Sets the global MeterProvider instance
 metrics.set_meter_provider(MeterProvider())
@@ -46,7 +47,8 @@
 meter.register_view(counter_view)
 
-## generate the random metric data
+# generate the random metric data
+
 
 def unknown_customer_calls():
     """Generate customer call data to our application"""
@@ -57,23 +59,49 @@ def unknown_customer_calls():
 
     random.seed(1)
 
     # customer 123 is a big user, and made 1000 requests in this timeframe
-    requests = np.random.normal(1000, 250, 1000) # 1000 requests with average 1000 bytes, covariance 100
+    requests = np.random.normal(
+        1000, 250, 1000
+    )  # 1000 requests averaging 1000 bytes (standard deviation 250)
 
     for request in requests:
-        bytes_counter.add(int(request), {"environment": "production", "method": "REST", "customer_id": 123})
+        bytes_counter.add(
+            int(request),
+            {
+                "environment": "production",
+                "method": "REST",
+                "customer_id": 123,
+            },
+        )
 
     # customer 247 is another big user, making fewer, but bigger requests
-    requests = np.random.normal(5000, 1250, 200) # 200 requests with average size of 5k bytes
+    requests = np.random.normal(
+        5000, 1250, 200
+    )  # 200 requests with average size of 5k bytes
 
     for request in requests:
-        bytes_counter.add(int(request), {"environment": "production", "method": "REST", "customer_id": 247})
+        bytes_counter.add(
+            int(request),
+            {
+                "environment": "production",
+                "method": "REST",
+                "customer_id": 247,
+            },
+        )
 
     # There are many other smaller customers
     for customer_id in range(250):
         requests = np.random.normal(1000, 250, np.random.randint(1, 10))
         method = "REST" if np.random.randint(2) else "gRPC"
         for request in requests:
-            bytes_counter.add(int(request), {"environment": "production", "method": method, "customer_id": customer_id})
+            bytes_counter.add(
+                int(request),
+                {
+                    "environment": "production",
+                    "method": method,
+                    "customer_id": customer_id,
+                },
+            )
+
 
 unknown_customer_calls()
@@ -93,10 +121,15 @@ def unknown_customer_calls():
         customer_bytes_map[exemplar.dropped_labels] += exemplar.value
 
 
-customer_bytes_list = sorted(list(customer_bytes_map.items()), key=lambda t: t[1], reverse=True)
+customer_bytes_list = sorted(
+    list(customer_bytes_map.items()), key=lambda t: t[1], reverse=True
+)
 
 # Save our top 5 customers and sum all of the rest into "Others".
-top_5_customers = [("Customer {}".format(dict(val[0])["customer_id"]), val[1]) for val in customer_bytes_list[:5]] + [("Other Customers", sum([val[1] for val in customer_bytes_list[5:]]))]
+top_5_customers = [
+    ("Customer {}".format(dict(val[0])["customer_id"]), val[1])
+    for val in customer_bytes_list[:5]
+] + [("Other Customers", sum([val[1] for val in customer_bytes_list[5:]]))]
 
 # unzip the data into X (sizes of each customer's contribution) and labels
 labels, X = zip(*top_5_customers)
@@ -106,7 +139,9 @@ def unknown_customer_calls():
 plt.show()
 
 # Estimate how many bytes customer 123 sent
-customer_123_bytes = customer_bytes_map[(("customer_id", 123), ("method", "REST"))]
+customer_123_bytes = customer_bytes_map[
+    (("customer_id", 123), ("method", "REST"))
+]
 
 # Since the exemplars were randomly sampled, all sample_counts will be the same
 sample_count = exemplars[0].sample_count
@@ -114,18 +149,35 @@
 full_customer_123_bytes = sample_count * customer_123_bytes
 
 # With seed == 1 we get 1008612 - quite close to the statistical mean of 1000000!
 # (more exemplars would make this estimation even more accurate)
-print("Customer 123 sent about {} bytes this interval".format(int(full_customer_123_bytes)))
+print(
+    "Customer 123 sent about {} bytes this interval".format(
+        int(full_customer_123_bytes)
+    )
+)
 
 # Determine the top 25 customers by how many bytes they sent in exemplars
 top_25_customers = customer_bytes_list[:25]
 
 # out of those 25 customers, determine how many used gRPC, and come up with a ratio
-percent_grpc = len(list(filter(lambda customer_value: customer_value[0][1][1] == "gRPC", top_25_customers))) / len(top_25_customers)
-
-print("~{}% of the top 25 customers (by bytes in) used gRPC this interval".format(int(percent_grpc*100)))
+percent_grpc = len(
+    list(
+        filter(
+            lambda customer_value: customer_value[0][1][1] == "gRPC",
+            top_25_customers,
+        )
+    )
+) / len(top_25_customers)
+
+print(
+    "~{}% of the top 25 customers (by bytes in) used gRPC this interval".format(
+        int(percent_grpc * 100)
+    )
+)
 
 # Determine the 50th, 90th, and 99th percentile of byte size sent in
-quantiles = np.quantile([exemplar.value for exemplar in exemplars], [0.5, 0.9, 0.99])
+quantiles = np.quantile(
+    [exemplar.value for exemplar in exemplars], [0.5, 0.9, 0.99]
+)
 print("50th Percentile Bytes In:", int(quantiles[0]))
 print("90th Percentile Bytes In:", int(quantiles[1]))
 print("99th Percentile Bytes In:", int(quantiles[2]))
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py
index 8abada0a3c7..ddd08df13c8 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py
@@ -82,7 +82,7 @@ def export(
                     record.instrument,
                     record.labels,
                     record.aggregator.checkpoint,
-                    record.aggregator.checkpoint_exemplars
+                    record.aggregator.checkpoint_exemplars,
                 )
             )
         return MetricsExportResult.SUCCESS
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py
index a47159b63b8..0df454642d0 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py
@@ -16,17 +16,14 @@
 import logging
 import threading
 from collections import OrderedDict, namedtuple
-import itertools
-from collections import namedtuple, OrderedDict
-from opentelemetry.util import time_ns
 
 from opentelemetry.sdk.metrics.export.exemplars import (
-    Exemplar,
-    RandomExemplarSampler,
-    MinMaxExemplarSampler,
     BucketedExemplarSampler,
-    ExemplarManager
+    ExemplarManager,
+    MinMaxExemplarSampler,
+    RandomExemplarSampler,
 )
+from opentelemetry.util import time_ns
 
 
 logger = logging.getLogger(__name__)
@@ -67,7 +64,9 @@ def __init__(self, config=None):
         self.checkpoint = 0
         self._lock = threading.Lock()
         self.last_update_timestamp = None
-        self.exemplar_manager = ExemplarManager(config, MinMaxExemplarSampler, RandomExemplarSampler)
+        self.exemplar_manager = ExemplarManager(
+            config, MinMaxExemplarSampler, RandomExemplarSampler
+        )
 
     def update(self, value, dropped_labels=None):
         with self._lock:
@@ -89,7 +88,9 @@ def merge(self, other):
             self.last_update_timestamp = get_latest_timestamp(
                 self.last_update_timestamp, other.last_update_timestamp
             )
-            self.checkpoint_exemplars = self.exemplar_manager.merge(self.checkpoint_exemplars, other.checkpoint_exemplars)
+            self.checkpoint_exemplars = self.exemplar_manager.merge(
+                self.checkpoint_exemplars, other.checkpoint_exemplars
+            )
 
 
 class MinMaxSumCountAggregator(Aggregator):
@@ -118,7 +119,9 @@ def __init__(self, config=None):
         self._lock = threading.Lock()
         self.last_update_timestamp = None
 
-        self.exemplar_manager = ExemplarManager(config, MinMaxExemplarSampler, RandomExemplarSampler)
+        self.exemplar_manager = ExemplarManager(
+            config, MinMaxExemplarSampler, RandomExemplarSampler
+        )
 
     def update(self, value, dropped_labels=None):
         with self._lock:
@@ -151,7 +154,9 @@ def merge(self, other):
             self.last_update_timestamp = get_latest_timestamp(
                 self.last_update_timestamp, other.last_update_timestamp
             )
-            self.checkpoint_exemplars = self.exemplar_manager.merge(self.checkpoint_exemplars, other.checkpoint_exemplars)
+            self.checkpoint_exemplars = self.exemplar_manager.merge(
+                self.checkpoint_exemplars, other.checkpoint_exemplars
+            )
 
 
 class HistogramAggregator(Aggregator):
@@ -161,7 +166,7 @@ def __init__(self, config=None):
         super().__init__(config=config)
         self._lock = threading.Lock()
         self.last_update_timestamp = None
-        boundaries = self.config.get('bounds', None)
+        boundaries = self.config.get("bounds", None)
         if boundaries and self._validate_boundaries(boundaries):
             self._boundaries = boundaries
         else:
@@ -169,7 +174,12 @@
         self.current = OrderedDict([(bb, 0) for bb in self._boundaries])
         self.checkpoint = OrderedDict([(bb, 0) for bb in self._boundaries])
 
-        self.exemplar_manager = ExemplarManager(config, BucketedExemplarSampler, BucketedExemplarSampler, boundaries=self._boundaries)
+        self.exemplar_manager = ExemplarManager(
+            config,
+            BucketedExemplarSampler,
+            BucketedExemplarSampler,
+            boundaries=self._boundaries,
+        )
 
         self.current[">"] = 0
         self.checkpoint[">"] = 0
@@ -205,14 +215,18 @@ def update(self, value, dropped_labels=None):
             # greater than max value
             if value >= self._boundaries[len(self._boundaries) - 1]:
                 self.current[">"] += 1
-                self.exemplar_manager.sample(value, dropped_labels, bucket_index=len(self._boundaries))
+                self.exemplar_manager.sample(
+                    value, dropped_labels, bucket_index=len(self._boundaries)
+                )
             else:
                 for index, bb in enumerate(self._boundaries):
                     # find the first bucket whose boundary the value is less than
                     if value < bb:
                         self.current[bb] += 1
-                        self.exemplar_manager.sample(value, dropped_labels, bucket_index=index)
+                        self.exemplar_manager.sample(
+                            value, dropped_labels, bucket_index=index
+                        )
                         break
             self.last_update_timestamp = time_ns()
@@ -232,7 +246,9 @@ def merge(self, other):
                 self.checkpoint, other.checkpoint
             )
 
-            self.checkpoint_exemplars = self.exemplar_manager.merge(self.checkpoint_exemplars, other.checkpoint_exemplars)
+            self.checkpoint_exemplars = self.exemplar_manager.merge(
+                self.checkpoint_exemplars, other.checkpoint_exemplars
+            )
 
             self.last_update_timestamp = get_latest_timestamp(
                 self.last_update_timestamp, other.last_update_timestamp
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py
index 9fc74cebe57..9eb82a5aed8 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py
@@ -27,17 +27,27 @@
 """
 
 import abc
-import random
 import itertools
+import random
 
 from opentelemetry.context import get_current
 from opentelemetry.util import time_ns
 
+
 class Exemplar:
     """
     A sample data point for an aggregator. Exemplars represent individual measurements recorded.
""" - def __init__(self, value, timestamp, dropped_labels=None, span_id=None, trace_id=None, sample_count=None): + + def __init__( + self, + value, + timestamp, + dropped_labels=None, + span_id=None, + trace_id=None, + sample_count=None, + ): self._value = value self._timestamp = timestamp self._span_id = span_id @@ -46,7 +56,13 @@ def __init__(self, value, timestamp, dropped_labels=None, span_id=None, trace_id self._dropped_labels = dropped_labels def __repr__(self): - return f"Exemplar(value={self._value}, timestamp={self._timestamp}, labels={dict(self._dropped_labels) if self._dropped_labels else None}, context={{'span_id':{self._span_id}, 'trace_id':{self._trace_id}}})" + return "Exemplar(value={}, timestamp={}, labels={}, context={{'span_id':{}, 'trace_id':{}}})".format( + self._value, + self._timestamp, + dict(self._dropped_labels) if self._dropped_labels else None, + self._span_id, + self._trace_id, + ) @property def value(self): @@ -67,24 +83,26 @@ def span_id(self): def trace_id(self): """The trace ID of the context when the exemplar was recorded""" return self._trace_id - + @property def dropped_labels(self): """Labels that were dropped by the aggregator but still passed by the user""" return self._dropped_labels - + @property def sample_count(self): """For statistical exemplars, how many measurements a single exemplar represents""" return self._sample_count - + def set_sample_count(self, count): self._sample_count = count + class ExemplarSampler: """ Abstract class to sample exemplars through a stream of incoming measurements """ + def __init__(self, k, statistical=False): self._k = k self._statistical = statistical @@ -95,7 +113,6 @@ def sample(self, exemplar, **kwargs): """ Given an exemplar, determine if it should be sampled or not """ - pass @property @abc.abstractmethod @@ -103,21 +120,19 @@ def sample_set(self): """ Return the list of exemplars that have been sampled """ - pass @abc.abstractmethod def merge(self, set1, set2): """ Given two lists of sampled exemplars, merge them while maintaining the invariants specified by this sampler """ - pass @abc.abstractmethod def reset(self): """ Reset the sampler """ - pass + class RandomExemplarSampler(ExemplarSampler): """ @@ -127,6 +142,7 @@ class RandomExemplarSampler(ExemplarSampler): If RandomExemplarSampler` is specified to be statistical, it will add a sample_count to every exemplar it records. This value will be equal to the number of measurements recorded per every exemplar measured - all exemplars will have the same sample_count value. 
""" + def __init__(self, k, statistical=False): super().__init__(k, statistical=statistical) self.rand_count = 0 @@ -138,40 +154,52 @@ def sample(self, exemplar, **kwargs): self.sample_set.append(exemplar) return - j = random.randint(0, self.rand_count-1) + replace_index = random.randint(0, self.rand_count - 1) - if j < self._k: - self.sample_set[j] = exemplar + if replace_index < self._k: + self.sample_set[replace_index] = exemplar def merge(self, set1, set2): combined = set1 + set2 if len(combined) <= self._k: return combined - else: - return random.sample(combined, self._k) + return random.sample(combined, self._k) @property def sample_set(self): if self._statistical: for exemplar in self._sample_set: - exemplar.set_sample_count(self.rand_count / len(self._sample_set)) + exemplar.set_sample_count( + self.rand_count / len(self._sample_set) + ) return self._sample_set def reset(self): self._sample_set = [] self.rand_count = 0 + class MinMaxExemplarSampler(ExemplarSampler): """ Sample the minimum and maximum measurements recorded only """ + def __init__(self, k, statistical=False): # K will always be 2 (min and max), and selecting min and max can never be statistical super().__init__(2, statistical=False) self._sample_set = [] def sample(self, exemplar, **kwargs): - self._sample_set = [min(self._sample_set + [exemplar], key=lambda exemplar: exemplar.value), max(self._sample_set + [exemplar], key=lambda exemplar: exemplar.value)] + self._sample_set = [ + min( + self._sample_set + [exemplar], + key=lambda exemplar: exemplar.value, + ), + max( + self._sample_set + [exemplar], + key=lambda exemplar: exemplar.value, + ), + ] if self._sample_set[0] == self._sample_set[1]: self._sample_set = [self._sample_set[0]] @@ -189,6 +217,7 @@ def merge(self, set1, set2): def reset(self): self._sample_set = [] + class BucketedExemplarSampler(ExemplarSampler): """ Randomly sample k exemplars for each bucket in the aggregator. @@ -196,10 +225,14 @@ class BucketedExemplarSampler(ExemplarSampler): If `BucketedExemplarSampler` is specified to be statistical, it will add a sample_count to every exemplar it records. This value will be equal to `len(bucket.exemplars) / bucket.count`, that is the number of measurements each exemplar represents. 
""" + def __init__(self, k, statistical=False, boundaries=None): super().__init__(k) self._boundaries = boundaries - self._sample_set = [RandomExemplarSampler(k, statistical=statistical) for _ in range(len(self._boundaries) + 1)] + self._sample_set = [ + RandomExemplarSampler(k, statistical=statistical) + for _ in range(len(self._boundaries) + 1) + ] def sample(self, exemplar, **kwargs): bucket_index = kwargs.get("bucket_index") @@ -210,10 +243,15 @@ def sample(self, exemplar, **kwargs): @property def sample_set(self): - return list(itertools.chain.from_iterable([sampler.sample_set for sampler in self._sample_set])) + return list( + itertools.chain.from_iterable( + [sampler.sample_set for sampler in self._sample_set] + ) + ) def merge(self, set1, set2): exemplar_set = [list() for _ in range(len(self._boundaries) + 1)] + # Sort both sets back into buckets for setx in [set1, set2]: bucket_idx = 0 for exemplar in setx: @@ -224,16 +262,18 @@ def merge(self, set1, set2): while exemplar.value >= self._boundaries[bucket_idx]: bucket_idx += 1 exemplar_set[bucket_idx].append(exemplar) - - for i, inner_set in enumerate(exemplar_set): + + # Pick only k exemplars for every bucket + for index, inner_set in enumerate(exemplar_set): if len(inner_set) > self._k: - exemplar_set[i] = random.sample(inner_set, self._k) + exemplar_set[index] = random.sample(inner_set, self._k) return list(itertools.chain.from_iterable(exemplar_set)) - + def reset(self): for sampler in self._sample_set: sampler.reset() + class ExemplarManager: """ Manages two different exemplar samplers: @@ -241,28 +281,58 @@ class ExemplarManager: 2. A "statistical" exemplar sampler, which samples exemplars without bias (ie no preferenced for traced exemplars) """ - def __init__(self, config, default_exemplar_sampler, statistical_exemplar_sampler=None, **kwargs): + def __init__( + self, + config, + default_exemplar_sampler, + statistical_exemplar_sampler, + **kwargs + ): if config: - self.exemplars_count = config.get('num_exemplars', 0) + self.exemplars_count = config.get("num_exemplars", 0) self.record_exemplars = self.exemplars_count > 0 - self.statistical_exemplars = config.get('statistical_exemplars', False) - if self.statistical_exemplars and statistical_exemplar_sampler: - self.exemplar_sampler = statistical_exemplar_sampler(self.exemplars_count, statistical=self.statistical_exemplars, **kwargs) + self.statistical_exemplars = config.get( + "statistical_exemplars", False + ) + if self.statistical_exemplars: + self.exemplar_sampler = statistical_exemplar_sampler( + self.exemplars_count, + statistical=self.statistical_exemplars, + **kwargs + ) else: - self.exemplar_sampler = default_exemplar_sampler(self.exemplars_count, statistical=self.statistical_exemplars, **kwargs) + self.exemplar_sampler = default_exemplar_sampler( + self.exemplars_count, + statistical=self.statistical_exemplars, + **kwargs + ) else: self.record_exemplars = False def sample(self, value, dropped_labels, **kwargs): context = get_current() - is_sampled = 'current-span' in context and context['current-span'].get_context().trace_flags.sampled if context else False + is_sampled = ( + "current-span" in context + and context["current-span"].get_context().trace_flags.sampled + if context + else False + ) # if not statistical, we want to gather traced exemplars only - so otherwise don't sample - if self.record_exemplars and (is_sampled or self.statistical_exemplars): - span_id = context['current-span'].context.span_id if context else None - trace_id = 
-            self.exemplar_sampler.sample(Exemplar(value, time_ns(), dropped_labels, span_id, trace_id), **kwargs)
+        if self.record_exemplars and (
+            is_sampled or self.statistical_exemplars
+        ):
+            span_id = (
+                context["current-span"].context.span_id if context else None
+            )
+            trace_id = (
+                context["current-span"].context.trace_id if context else None
+            )
+            self.exemplar_sampler.sample(
+                Exemplar(value, time_ns(), dropped_labels, span_id, trace_id),
+                **kwargs
+            )
 
     def take_checkpoint(self):
         if self.record_exemplars:
@@ -273,5 +343,7 @@ def take_checkpoint(self):
 
     def merge(self, checkpoint_exemplars, other_checkpoint_exemplars):
         if self.record_exemplars:
-            return self.exemplar_sampler.merge(checkpoint_exemplars, other_checkpoint_exemplars)
+            return self.exemplar_sampler.merge(
+                checkpoint_exemplars, other_checkpoint_exemplars
+            )
         return []
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/view.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/view.py
index b83e8a71f21..8b5e2126fb7 100644
--- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/view.py
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/view.py
@@ -39,7 +39,12 @@
 
 
 class ViewData:
-    def __init__(self, labels: Tuple[Tuple[str, str]], aggregator: Aggregator, dropped_labels: Tuple[Tuple[str, str]] = None):
+    def __init__(
+        self,
+        labels: Tuple[Tuple[str, str]],
+        aggregator: Aggregator,
+        dropped_labels: Tuple[Tuple[str, str]] = None,
+    ):
         self.labels = labels
         self.aggregator = aggregator
         self.dropped_labels = dropped_labels
@@ -144,7 +149,11 @@ def generate_view_datas(self, metric, labels):
             # ViewData that is duplicate (same labels and aggregator) will be
             # aggregated together as one
             view_datas.add(
-                ViewData(tuple(updated_labels), view.aggregator, dropped_labels=dropped_labels)
+                ViewData(
+                    tuple(updated_labels),
+                    view.aggregator,
+                    dropped_labels=dropped_labels,
+                )
             )
         return view_datas
diff --git a/opentelemetry-sdk/tests/metrics/export/test_exemplars.py b/opentelemetry-sdk/tests/metrics/export/test_exemplars.py
index 95412d2b8e9..9cd4c10f042 100644
--- a/opentelemetry-sdk/tests/metrics/export/test_exemplars.py
+++ b/opentelemetry-sdk/tests/metrics/export/test_exemplars.py
@@ -13,31 +13,32 @@
 # limitations under the License.
 
 import unittest
-from unittest.mock import patch
 from time import time
+from unittest.mock import patch
 
+from opentelemetry import metrics
+from opentelemetry.sdk.metrics import MeterProvider, ValueRecorder
 from opentelemetry.sdk.metrics.export.aggregate import (
-    SumAggregator,
-    MinMaxSumCountAggregator,
     HistogramAggregator,
-    Exemplar,
-    RandomExemplarSampler,
     MinMaxExemplarSampler,
+    MinMaxSumCountAggregator,
+    SumAggregator,
+    ValueObserverAggregator,
+)
+from opentelemetry.sdk.metrics.export.controller import PushController
+from opentelemetry.sdk.metrics.export.exemplars import (
     BucketedExemplarSampler,
+    Exemplar,
     ExemplarManager,
-    ValueObserverAggregator
+    RandomExemplarSampler,
 )
-from opentelemetry.sdk.metrics import (
-    MeterProvider,
-    ValueRecorder,
+from opentelemetry.sdk.metrics.export.in_memory_metrics_exporter import (
+    InMemoryMetricsExporter,
 )
-from opentelemetry import trace, metrics
+from opentelemetry.sdk.metrics.view import View, ViewConfig
 from opentelemetry.sdk.trace import TracerProvider
 from opentelemetry.trace.sampling import ALWAYS_OFF, ALWAYS_ON
-from opentelemetry.sdk.metrics.export.in_memory_metrics_exporter import InMemoryMetricsExporter
-from opentelemetry.sdk.metrics.view import View, ViewConfig
-from opentelemetry.sdk.metrics.export.controller import PushController
 
 
 class TestRandomExemplarSampler(unittest.TestCase):
     def test_sample(self):
@@ -57,8 +58,9 @@ def test_sample(self):
         self.assertEqual(exemplar1.sample_count, 1)
         self.assertEqual(exemplar2.sample_count, 1)
 
-        def _patched_randint(mn, mx):
-            return mn
+        def _patched_randint(minimum, maximum):
+            # pylint: disable=unused-argument
+            return minimum
 
         with patch("random.randint", _patched_randint):
             sampler.sample(exemplar3)
@@ -66,8 +68,9 @@ def _patched_randint(mn, mx):
             self.assertEqual(sampler.sample_set[0], exemplar3)
             self.assertEqual(exemplar3.sample_count, 1.5)
             self.assertEqual(exemplar2.sample_count, 1.5)
-        
-        def _patched_randint(mn, mx):
+
+        def _patched_randint(minimum, maximum):
+            # pylint: disable=unused-argument
             return 1
 
         with patch("random.randint", _patched_randint):
@@ -94,9 +97,9 @@ def test_merge(self):
         set1 = [1, 2, 3]
         set2 = [4, 5, 6]
         sampler = RandomExemplarSampler(6)
-        self.assertEqual(set1+set2, sampler.merge(set1, set2))
+        self.assertEqual(set1 + set2, sampler.merge(set1, set2))
         sampler = RandomExemplarSampler(8)
-        self.assertEqual(set1+set2, sampler.merge(set1, set2))
+        self.assertEqual(set1 + set2, sampler.merge(set1, set2))
         sampler = RandomExemplarSampler(4)
         self.assertEqual(4, len(sampler.merge(set1, set2)))
@@ -145,7 +148,9 @@ def test_merge(self):
 
 class TestBucketedExemplarSampler(unittest.TestCase):
     def test_exemplars(self):
-        sampler = BucketedExemplarSampler(1, boundaries=[2, 4, 7], statistical=True)
+        sampler = BucketedExemplarSampler(
+            1, boundaries=[2, 4, 7], statistical=True
+        )
         sampler.sample(Exemplar(3, time()), bucket_index=1)
         self.assertEqual(len(sampler.sample_set), 1)
         self.assertEqual(sampler.sample_set[0].value, 3)
@@ -156,7 +161,8 @@ def test_exemplars(self):
             self.assertEqual(sampler.sample_set[1].value, 5)
             self.assertEqual(sampler.sample_set[1].sample_count, 1)
 
-        def _patched_randint(mn, mx):
+        def _patched_randint(minimum, maximum):
+            # pylint: disable=unused-argument
             return 0
 
         with patch("random.randint", _patched_randint):
@@ -178,20 +184,35 @@ def _patched_randint(mn, mx):
 
     def test_merge(self):
         sampler = BucketedExemplarSampler(1, boundaries=[3, 4, 6])
-        self.assertEqual(len(sampler.merge([Exemplar(1, time())], [Exemplar(2, time())])), 1)
+        self.assertEqual(
+            len(sampler.merge([Exemplar(1, time())], [Exemplar(2, time())])), 1
+        )
 
-        self.assertEqual(len(sampler.merge([Exemplar(1, time()), Exemplar(5, time())], [Exemplar(2, time())])), 2)
+        self.assertEqual(
+            len(
+                sampler.merge(
+                    [Exemplar(1, time()), Exemplar(5, time())],
+                    [Exemplar(2, time())],
+                )
+            ),
+            2,
+        )
 
 
 class TestExemplarManager(unittest.TestCase):
     def test_statistical(self):
         config = {"statistical_exemplars": True, "num_exemplars": 1}
-        manager = ExemplarManager(config, MinMaxExemplarSampler, RandomExemplarSampler)
+        manager = ExemplarManager(
+            config, MinMaxExemplarSampler, RandomExemplarSampler
+        )
         self.assertIsInstance(manager.exemplar_sampler, RandomExemplarSampler)
         manager.sample(5, {"dropped_label": "value"})
         self.assertEqual(len(manager.exemplar_sampler.sample_set), 1)
         self.assertEqual(manager.exemplar_sampler.sample_set[0].value, 5)
-        self.assertEqual(manager.exemplar_sampler.sample_set[0].dropped_labels, {"dropped_label": "value"})
+        self.assertEqual(
+            manager.exemplar_sampler.sample_set[0].dropped_labels,
+            {"dropped_label": "value"},
+        )
 
         checkpoint = manager.take_checkpoint()
         self.assertEqual(len(checkpoint), 1)
@@ -204,7 +225,9 @@ def test_statistical(self):
 
     def test_semantic(self):
         config = {"statistical_exemplars": True, "num_exemplars": 1}
-        manager = ExemplarManager(config, MinMaxExemplarSampler, RandomExemplarSampler)
+        manager = ExemplarManager(
+            config, MinMaxExemplarSampler, RandomExemplarSampler
+        )
         self.assertIsInstance(manager.exemplar_sampler, RandomExemplarSampler)
         manager.sample(5, {})
         self.assertEqual(len(manager.exemplar_sampler.sample_set), 1)
@@ -229,7 +252,9 @@ def _no_exemplars_test(self, aggregator):
         agg.take_checkpoint()
         self.assertEqual(agg.checkpoint_exemplars, [])
 
-        other_agg = aggregator(config={"num_exemplars": 2, "statistical_exemplars": True})
+        other_agg = aggregator(
+            config={"num_exemplars": 2, "statistical_exemplars": True}
+        )
         other_agg.update(2)
         other_agg.update(4)
         other_agg.take_checkpoint()
@@ -244,7 +269,10 @@ def _simple_exemplars_test(self, aggregator):
         agg.take_checkpoint()
         self.assertEqual(len(agg.checkpoint_exemplars), 1)
         self.assertEqual(agg.checkpoint_exemplars[0].value, 2)
-        self.assertEqual(agg.checkpoint_exemplars[0].dropped_labels, {"dropped_label": "value"})
+        self.assertEqual(
+            agg.checkpoint_exemplars[0].dropped_labels,
+            {"dropped_label": "value"},
+        )
 
         agg.update(2)
         agg.update(5)
@@ -255,8 +283,10 @@ def _simple_exemplars_test(self, aggregator):
         agg.update(2)
         agg.update(5)
 
-        def _patched_randint(mn, mx):
+        def _patched_randint(minimum, maximum):
+            # pylint: disable=unused-argument
             return 1
+
         with patch("random.randint", _patched_randint):
             agg.update(7)
 
@@ -282,11 +312,13 @@ def _record_traces_only_test(self, aggregator):
             agg.update(5)
             agg.update(7)
             agg.update(6)
-        
+
         agg.take_checkpoint()
         self.assertEqual(len(agg.checkpoint_exemplars), 2)
 
-        self.assertEqual(agg.checkpoint_exemplars[0].span_id, span.context.span_id)
+        self.assertEqual(
+            agg.checkpoint_exemplars[0].span_id, span.context.span_id
+        )
         self.assertEqual(agg.checkpoint_exemplars[0].value, 5)
         self.assertEqual(agg.checkpoint_exemplars[1].value, 7)
@@ -346,7 +378,11 @@ def test_no_exemplars(self):
         agg.take_checkpoint()
         self.assertEqual(agg.checkpoint_exemplars, [])
 
-        other_agg = HistogramAggregator(config=dict(config, **{"num_exemplars": 1, "statistical_exemplars": True}))
+        other_agg = HistogramAggregator(
+            config=dict(
+                config, **{"num_exemplars": 1, "statistical_exemplars": True}
+            )
+        )
         other_agg.update(3)
         other_agg.update(5)
 
@@ -357,13 +393,20 @@ def test_no_exemplars(self):
         self.assertEqual(agg.checkpoint_exemplars, [])
 
     def test_simple_exemplars(self):
-        config = {"bounds": [2, 4, 7], "num_exemplars": 1, "statistical_exemplars": True}
+        config = {
+            "bounds": [2, 4, 7],
+            "num_exemplars": 1,
+            "statistical_exemplars": True,
+        }
         agg = HistogramAggregator(config=config)
 
         agg.update(2, dropped_labels={"dropped_label": "value"})
         agg.take_checkpoint()
         self.assertEqual(len(agg.checkpoint_exemplars), 1)
         self.assertEqual(agg.checkpoint_exemplars[0].value, 2)
-        self.assertEqual(agg.checkpoint_exemplars[0].dropped_labels, {"dropped_label": "value"})
+        self.assertEqual(
+            agg.checkpoint_exemplars[0].dropped_labels,
+            {"dropped_label": "value"},
+        )
 
         agg.update(2)
         agg.update(5)
@@ -373,7 +416,8 @@ def test_simple_exemplars(self):
 
         agg.update(5)
 
-        def _patched_randint(mn, mx):
+        def _patched_randint(minimum, maximum):
+            # pylint: disable=unused-argument
             return 0
 
         with patch("random.randint", _patched_randint):
@@ -391,7 +435,11 @@ def _patched_randint(mn, mx):
         self.assertEqual(len(agg.checkpoint_exemplars), 4)
 
     def test_record_traces_only(self):
-        config = {"bounds": [2, 4, 6], "num_exemplars": 2, "statistical_exemplars": False}
+        config = {
+            "bounds": [2, 4, 6],
+            "num_exemplars": 2,
+            "statistical_exemplars": False,
+        }
         agg = HistogramAggregator(config=config)
 
         agg.update(2)
@@ -409,7 +457,9 @@ def test_record_traces_only(self):
 
         agg.take_checkpoint()
         self.assertEqual(len(agg.checkpoint_exemplars), 1)
-        self.assertEqual(agg.checkpoint_exemplars[0].span_id, span.context.span_id)
+        self.assertEqual(
+            agg.checkpoint_exemplars[0].span_id, span.context.span_id
+        )
 
         tp = TracerProvider(sampler=ALWAYS_OFF)
         tracer = tp.get_tracer(__name__)
@@ -420,6 +470,7 @@ def test_record_traces_only(self):
         agg.take_checkpoint()
         self.assertEqual(len(agg.checkpoint_exemplars), 0)
 
+
 class TestFullPipelineExemplars(unittest.TestCase):
     def test_histogram(self):
         # Use the meter type provided by the SDK package
@@ -438,7 +489,13 @@ def test_histogram(self):
 
         size_view = View(
             requests_size,
-            HistogramAggregator(config={"bounds": [20, 40, 60, 80, 100], "num_exemplars": 1, "statistical_exemplars": True}),
+            HistogramAggregator(
+                config={
+                    "bounds": [20, 40, 60, 80, 100],
+                    "num_exemplars": 1,
+                    "statistical_exemplars": True,
+                }
+            ),
             label_keys=["environment"],
             config=ViewConfig.LABEL_KEYS,
         )
@@ -456,5 +513,14 @@ def test_histogram(self):
         self.assertEqual(len(metrics_list), 1)
         exemplars = metrics_list[0].aggregator.checkpoint_exemplars
         self.assertEqual(len(exemplars), 3)
-        self.assertEqual([(exemplar.value, exemplar.dropped_labels) for exemplar in exemplars],
-                         [(1, (("test", "value2"),)), (25, (("test", "value"),)), (200, (("test", "value3"),))])
+        self.assertEqual(
+            [
+                (exemplar.value, exemplar.dropped_labels)
+                for exemplar in exemplars
+            ],
+            [
+                (1, (("test", "value2"),)),
+                (25, (("test", "value"),)),
+                (200, (("test", "value3"),)),
+            ],
+        )
diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py
index 4e90bf305ed..6c844ac5a71 100644
--- a/opentelemetry-sdk/tests/metrics/export/test_export.py
+++ b/opentelemetry-sdk/tests/metrics/export/test_export.py
@@ -50,7 +50,7 @@ def test_export(self):
             labels = {"environment": "staging"}
             aggregator = SumAggregator()
             record = MetricRecord(metric, labels, aggregator)
-            result = '{}(data="{}", labels="{}", value={})'.format(
+            result = '{}(data="{}", labels="{}", value={}, exemplars=[])'.format(
                 ConsoleMetricsExporter.__name__,
                 metric,
                 labels,
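
For reviewers who want to exercise the new exemplar configuration end to end, here is a minimal sketch distilled from TestFullPipelineExemplars.test_histogram and the example scripts in this diff. It assumes the branch's experimental metrics API (create_metric, View, register_view); controller.tick() and exporter.get_exported_metrics() are assumed to behave as the SDK's in-memory test helpers do, and the instrument and label names are illustrative only, not part of this change:

from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider, ValueRecorder
from opentelemetry.sdk.metrics.export.aggregate import HistogramAggregator
from opentelemetry.sdk.metrics.export.controller import PushController
from opentelemetry.sdk.metrics.export.in_memory_metrics_exporter import (
    InMemoryMetricsExporter,
)
from opentelemetry.sdk.metrics.view import View, ViewConfig

# Global meter provider plus an in-memory exporter we can inspect.
metrics.set_meter_provider(MeterProvider())
meter = metrics.get_meter(__name__)
exporter = InMemoryMetricsExporter()
controller = PushController(meter, exporter, 30)

# "requests_size" is a hypothetical instrument for this sketch.
requests_size = meter.create_metric(
    name="requests_size",
    description="size of requests",
    unit="1",
    value_type=int,
    metric_type=ValueRecorder,
)

size_view = View(
    requests_size,
    HistogramAggregator(
        config={
            "bounds": [20, 40, 60, 80, 100],
            "num_exemplars": 1,  # keep at most one exemplar per bucket
            "statistical_exemplars": True,  # sample without trace bias
        }
    ),
    label_keys=["environment"],
    config=ViewConfig.LABEL_KEYS,
)
meter.register_view(size_view)

# "test" is not in label_keys, so the view drops it, but each exemplar
# preserves it as dropped_labels.
requests_size.record(1, {"environment": "staging", "test": "value2"})
requests_size.record(25, {"environment": "staging", "test": "value"})
requests_size.record(200, {"environment": "staging", "test": "value3"})

controller.tick()  # force an export instead of waiting for the interval
for record in exporter.get_exported_metrics():
    for exemplar in record.aggregator.checkpoint_exemplars:
        print(exemplar.value, exemplar.dropped_labels)

With the three recorded values above this should surface one exemplar each in the [0, 20), [20, 40), and overflow (">") buckets, mirroring the (1, 25, 200) expectations in the test.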