Skip to content

Commit

Permalink
Move precomputed sum code to a new file (#2036)
Browse files Browse the repository at this point in the history
Co-authored-by: Cijo Thomas <cijo.thomas@gmail.com>
  • Loading branch information
utpilla and cijothomas authored Aug 19, 2024
1 parent 20625f9 commit 3a4b17a
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 190 deletions.
7 changes: 2 additions & 5 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@ use opentelemetry::KeyValue;
use crate::metrics::data::{Aggregation, Gauge, Temporality};

use super::{
exponential_histogram::ExpoHistogram,
histogram::Histogram,
last_value::LastValue,
sum::{PrecomputedSum, Sum},
Number,
exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue,
precomputed_sum::PrecomputedSum, sum::Sum, Number,
};

const STREAM_CARDINALITY_LIMIT: u32 = 2000;
Expand Down
1 change: 1 addition & 0 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod aggregate;
mod exponential_histogram;
mod histogram;
mod last_value;
mod precomputed_sum;
mod sum;

use core::fmt;
Expand Down
193 changes: 193 additions & 0 deletions opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
use opentelemetry::KeyValue;

use crate::metrics::data::{self, Aggregation, DataPoint, Temporality};

use super::{Assign, AtomicTracker, Number, ValueMap};
use std::{
collections::{HashMap, HashSet},
sync::{atomic::Ordering, Arc, Mutex},
time::SystemTime,
};

/// Summarizes a set of pre-computed sums as their arithmetic sum.
pub(crate) struct PrecomputedSum<T: Number<T>> {
value_map: ValueMap<T, T, Assign>,
monotonic: bool,
start: Mutex<SystemTime>,
reported: Mutex<HashMap<Vec<KeyValue>, T>>,
}

impl<T: Number<T>> PrecomputedSum<T> {
pub(crate) fn new(monotonic: bool) -> Self {
PrecomputedSum {
value_map: ValueMap::new(),
monotonic,
start: Mutex::new(SystemTime::now()),
reported: Mutex::new(Default::default()),
}
}

pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
// The argument index is not applicable to PrecomputedSum.
self.value_map.measure(measurement, attrs, 0);
}

pub(crate) fn delta(
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let t = SystemTime::now();
let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);

let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>());
let mut new_agg = if s_data.is_none() {
Some(data::Sum {
data_points: vec![],
temporality: Temporality::Delta,
is_monotonic: self.monotonic,
})
} else {
None
};
let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
s_data.data_points.clear();
s_data.temporality = Temporality::Delta;
s_data.is_monotonic = self.monotonic;

// Max number of data points need to account for the special casing
// of the no attribute value + overflow attribute.
let n = self.value_map.count.load(Ordering::SeqCst) + 2;
if n > s_data.data_points.capacity() {
s_data
.data_points
.reserve_exact(n - s_data.data_points.capacity());
}
let mut new_reported = HashMap::with_capacity(n);
let mut reported = match self.reported.lock() {
Ok(r) => r,
Err(_) => return (0, None),
};

if self
.value_map
.has_no_attribute_value
.swap(false, Ordering::AcqRel)
{
let value = self.value_map.no_attribute_tracker.get_value();
let delta = value - *reported.get(&vec![]).unwrap_or(&T::default());
new_reported.insert(vec![], value);

s_data.data_points.push(DataPoint {
attributes: vec![],
start_time: Some(prev_start),
time: Some(t),
value: delta,
exemplars: vec![],
});
}

let mut trackers = match self.value_map.trackers.write() {
Ok(v) => v,
Err(_) => return (0, None),
};

let mut seen = HashSet::new();
for (attrs, tracker) in trackers.drain() {
if seen.insert(Arc::as_ptr(&tracker)) {
let value = tracker.get_value();
let delta = value - *reported.get(&attrs).unwrap_or(&T::default());
new_reported.insert(attrs.clone(), value);
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: delta,
exemplars: vec![],
});
}
}

// The delta collection cycle resets.
if let Ok(mut start) = self.start.lock() {
*start = t;
}
self.value_map.count.store(0, Ordering::SeqCst);

*reported = new_reported;
drop(reported); // drop before values guard is dropped

(
s_data.data_points.len(),
new_agg.map(|a| Box::new(a) as Box<_>),
)
}

pub(crate) fn cumulative(
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let t = SystemTime::now();
let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);

let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>());
let mut new_agg = if s_data.is_none() {
Some(data::Sum {
data_points: vec![],
temporality: Temporality::Cumulative,
is_monotonic: self.monotonic,
})
} else {
None
};
let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
s_data.data_points.clear();
s_data.temporality = Temporality::Cumulative;
s_data.is_monotonic = self.monotonic;

// Max number of data points need to account for the special casing
// of the no attribute value + overflow attribute.
let n = self.value_map.count.load(Ordering::SeqCst) + 2;
if n > s_data.data_points.capacity() {
s_data
.data_points
.reserve_exact(n - s_data.data_points.capacity());
}

if self
.value_map
.has_no_attribute_value
.load(Ordering::Acquire)
{
s_data.data_points.push(DataPoint {
attributes: vec![],
start_time: Some(prev_start),
time: Some(t),
value: self.value_map.no_attribute_tracker.get_value(),
exemplars: vec![],
});
}

let trackers = match self.value_map.trackers.write() {
Ok(v) => v,
Err(_) => return (0, None),
};

let mut seen = HashSet::new();
for (attrs, tracker) in trackers.iter() {
if seen.insert(Arc::as_ptr(tracker)) {
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: tracker.get_value(),
exemplars: vec![],
});
}
}

(
s_data.data_points.len(),
new_agg.map(|a| Box::new(a) as Box<_>),
)
}
}
Loading

0 comments on commit 3a4b17a

Please sign in to comment.