From 27af2fd70addc52876764293eb1a8245e3ca8f8b Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com> Date: Mon, 19 Aug 2024 18:08:43 +0000 Subject: [PATCH] Move precomputed sum code to a new file --- .../src/metrics/internal/aggregate.rs | 7 +- opentelemetry-sdk/src/metrics/internal/mod.rs | 1 + .../src/metrics/internal/precomputed_sum.rs | 193 ++++++++++++++++++ opentelemetry-sdk/src/metrics/internal/sum.rs | 187 +---------------- 4 files changed, 198 insertions(+), 190 deletions(-) create mode 100644 opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs index 61619b2039..79a06f7242 100644 --- a/opentelemetry-sdk/src/metrics/internal/aggregate.rs +++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs @@ -5,11 +5,8 @@ use opentelemetry::KeyValue; use crate::metrics::data::{Aggregation, Gauge, Temporality}; use super::{ - exponential_histogram::ExpoHistogram, - histogram::Histogram, - last_value::LastValue, - sum::{PrecomputedSum, Sum}, - Number, + exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue, + precomputed_sum::PrecomputedSum, sum::Sum, Number, }; const STREAM_CARDINALITY_LIMIT: u32 = 2000; diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 3d89479f5a..41f97aa20b 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -2,6 +2,7 @@ mod aggregate; mod exponential_histogram; mod histogram; mod last_value; +mod precomputed_sum; mod sum; use core::fmt; diff --git a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs new file mode 100644 index 0000000000..14e9c19b25 --- /dev/null +++ b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs @@ -0,0 +1,193 @@ +use opentelemetry::KeyValue; + +use crate::metrics::data::{self, Aggregation, DataPoint, Temporality}; + +use super::{Assign, AtomicTracker, Number, ValueMap}; +use std::{ + collections::{HashMap, HashSet}, + sync::{atomic::Ordering, Arc, Mutex}, + time::SystemTime, +}; + +/// Summarizes a set of pre-computed sums as their arithmetic sum. +pub(crate) struct PrecomputedSum> { + value_map: ValueMap, + monotonic: bool, + start: Mutex, + reported: Mutex, T>>, +} + +impl> PrecomputedSum { + pub(crate) fn new(monotonic: bool) -> Self { + PrecomputedSum { + value_map: ValueMap::new(), + monotonic, + start: Mutex::new(SystemTime::now()), + reported: Mutex::new(Default::default()), + } + } + + pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { + // The argument index is not applicable to PrecomputedSum. + self.value_map.measure(measurement, attrs, 0); + } + + pub(crate) fn delta( + &self, + dest: Option<&mut dyn Aggregation>, + ) -> (usize, Option>) { + let t = SystemTime::now(); + let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); + + let s_data = dest.and_then(|d| d.as_mut().downcast_mut::>()); + let mut new_agg = if s_data.is_none() { + Some(data::Sum { + data_points: vec![], + temporality: Temporality::Delta, + is_monotonic: self.monotonic, + }) + } else { + None + }; + let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); + s_data.data_points.clear(); + s_data.temporality = Temporality::Delta; + s_data.is_monotonic = self.monotonic; + + // Max number of data points need to account for the special casing + // of the no attribute value + overflow attribute. + let n = self.value_map.count.load(Ordering::SeqCst) + 2; + if n > s_data.data_points.capacity() { + s_data + .data_points + .reserve_exact(n - s_data.data_points.capacity()); + } + let mut new_reported = HashMap::with_capacity(n); + let mut reported = match self.reported.lock() { + Ok(r) => r, + Err(_) => return (0, None), + }; + + if self + .value_map + .has_no_attribute_value + .swap(false, Ordering::AcqRel) + { + let value = self.value_map.no_attribute_tracker.get_value(); + let delta = value - *reported.get(&vec![]).unwrap_or(&T::default()); + new_reported.insert(vec![], value); + + s_data.data_points.push(DataPoint { + attributes: vec![], + start_time: Some(prev_start), + time: Some(t), + value: delta, + exemplars: vec![], + }); + } + + let mut trackers = match self.value_map.trackers.write() { + Ok(v) => v, + Err(_) => return (0, None), + }; + + let mut seen = HashSet::new(); + for (attrs, tracker) in trackers.drain() { + if seen.insert(Arc::as_ptr(&tracker)) { + let value = tracker.get_value(); + let delta = value - *reported.get(&attrs).unwrap_or(&T::default()); + new_reported.insert(attrs.clone(), value); + s_data.data_points.push(DataPoint { + attributes: attrs.clone(), + start_time: Some(prev_start), + time: Some(t), + value: delta, + exemplars: vec![], + }); + } + } + + // The delta collection cycle resets. + if let Ok(mut start) = self.start.lock() { + *start = t; + } + self.value_map.count.store(0, Ordering::SeqCst); + + *reported = new_reported; + drop(reported); // drop before values guard is dropped + + ( + s_data.data_points.len(), + new_agg.map(|a| Box::new(a) as Box<_>), + ) + } + + pub(crate) fn cumulative( + &self, + dest: Option<&mut dyn Aggregation>, + ) -> (usize, Option>) { + let t = SystemTime::now(); + let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); + + let s_data = dest.and_then(|d| d.as_mut().downcast_mut::>()); + let mut new_agg = if s_data.is_none() { + Some(data::Sum { + data_points: vec![], + temporality: Temporality::Cumulative, + is_monotonic: self.monotonic, + }) + } else { + None + }; + let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); + s_data.data_points.clear(); + s_data.temporality = Temporality::Cumulative; + s_data.is_monotonic = self.monotonic; + + // Max number of data points need to account for the special casing + // of the no attribute value + overflow attribute. + let n = self.value_map.count.load(Ordering::SeqCst) + 2; + if n > s_data.data_points.capacity() { + s_data + .data_points + .reserve_exact(n - s_data.data_points.capacity()); + } + + if self + .value_map + .has_no_attribute_value + .load(Ordering::Acquire) + { + s_data.data_points.push(DataPoint { + attributes: vec![], + start_time: Some(prev_start), + time: Some(t), + value: self.value_map.no_attribute_tracker.get_value(), + exemplars: vec![], + }); + } + + let trackers = match self.value_map.trackers.write() { + Ok(v) => v, + Err(_) => return (0, None), + }; + + let mut seen = HashSet::new(); + for (attrs, tracker) in trackers.iter() { + if seen.insert(Arc::as_ptr(tracker)) { + s_data.data_points.push(DataPoint { + attributes: attrs.clone(), + start_time: Some(prev_start), + time: Some(t), + value: tracker.get_value(), + exemplars: vec![], + }); + } + } + + ( + s_data.data_points.len(), + new_agg.map(|a| Box::new(a) as Box<_>), + ) + } +} diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 819c8ff58f..68a58d1e8d 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -2,13 +2,13 @@ use std::collections::HashSet; use std::sync::atomic::Ordering; use std::sync::Arc; use std::vec; -use std::{collections::HashMap, sync::Mutex, time::SystemTime}; +use std::{sync::Mutex, time::SystemTime}; use crate::metrics::data::{self, Aggregation, DataPoint, Temporality}; use opentelemetry::KeyValue; -use super::{Assign, Increment, ValueMap}; use super::{AtomicTracker, Number}; +use super::{Increment, ValueMap}; /// Summarizes a set of measurements made as their arithmetic sum. pub(crate) struct Sum> { @@ -185,186 +185,3 @@ impl> Sum { ) } } - -/// Summarizes a set of pre-computed sums as their arithmetic sum. -pub(crate) struct PrecomputedSum> { - value_map: ValueMap, - monotonic: bool, - start: Mutex, - reported: Mutex, T>>, -} - -impl> PrecomputedSum { - pub(crate) fn new(monotonic: bool) -> Self { - PrecomputedSum { - value_map: ValueMap::new(), - monotonic, - start: Mutex::new(SystemTime::now()), - reported: Mutex::new(Default::default()), - } - } - - pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { - // The argument index is not applicable to PrecomputedSum. - self.value_map.measure(measurement, attrs, 0); - } - - pub(crate) fn delta( - &self, - dest: Option<&mut dyn Aggregation>, - ) -> (usize, Option>) { - let t = SystemTime::now(); - let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - - let s_data = dest.and_then(|d| d.as_mut().downcast_mut::>()); - let mut new_agg = if s_data.is_none() { - Some(data::Sum { - data_points: vec![], - temporality: Temporality::Delta, - is_monotonic: self.monotonic, - }) - } else { - None - }; - let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); - s_data.data_points.clear(); - s_data.temporality = Temporality::Delta; - s_data.is_monotonic = self.monotonic; - - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. - let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > s_data.data_points.capacity() { - s_data - .data_points - .reserve_exact(n - s_data.data_points.capacity()); - } - let mut new_reported = HashMap::with_capacity(n); - let mut reported = match self.reported.lock() { - Ok(r) => r, - Err(_) => return (0, None), - }; - - if self - .value_map - .has_no_attribute_value - .swap(false, Ordering::AcqRel) - { - let value = self.value_map.no_attribute_tracker.get_value(); - let delta = value - *reported.get(&vec![]).unwrap_or(&T::default()); - new_reported.insert(vec![], value); - - s_data.data_points.push(DataPoint { - attributes: vec![], - start_time: Some(prev_start), - time: Some(t), - value: delta, - exemplars: vec![], - }); - } - - let mut trackers = match self.value_map.trackers.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.drain() { - if seen.insert(Arc::as_ptr(&tracker)) { - let value = tracker.get_value(); - let delta = value - *reported.get(&attrs).unwrap_or(&T::default()); - new_reported.insert(attrs.clone(), value); - s_data.data_points.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(prev_start), - time: Some(t), - value: delta, - exemplars: vec![], - }); - } - } - - // The delta collection cycle resets. - if let Ok(mut start) = self.start.lock() { - *start = t; - } - self.value_map.count.store(0, Ordering::SeqCst); - - *reported = new_reported; - drop(reported); // drop before values guard is dropped - - ( - s_data.data_points.len(), - new_agg.map(|a| Box::new(a) as Box<_>), - ) - } - - pub(crate) fn cumulative( - &self, - dest: Option<&mut dyn Aggregation>, - ) -> (usize, Option>) { - let t = SystemTime::now(); - let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - - let s_data = dest.and_then(|d| d.as_mut().downcast_mut::>()); - let mut new_agg = if s_data.is_none() { - Some(data::Sum { - data_points: vec![], - temporality: Temporality::Cumulative, - is_monotonic: self.monotonic, - }) - } else { - None - }; - let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); - s_data.data_points.clear(); - s_data.temporality = Temporality::Cumulative; - s_data.is_monotonic = self.monotonic; - - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. - let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > s_data.data_points.capacity() { - s_data - .data_points - .reserve_exact(n - s_data.data_points.capacity()); - } - - if self - .value_map - .has_no_attribute_value - .load(Ordering::Acquire) - { - s_data.data_points.push(DataPoint { - attributes: vec![], - start_time: Some(prev_start), - time: Some(t), - value: self.value_map.no_attribute_tracker.get_value(), - exemplars: vec![], - }); - } - - let trackers = match self.value_map.trackers.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.iter() { - if seen.insert(Arc::as_ptr(tracker)) { - s_data.data_points.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(prev_start), - time: Some(t), - value: tracker.get_value(), - exemplars: vec![], - }); - } - } - - ( - s_data.data_points.len(), - new_agg.map(|a| Box::new(a) as Box<_>), - ) - } -}