
Commit

sparse histogram with cumulative counts with read-only struct
thinkingfish committed Jul 8, 2024
1 parent 09e8616 commit 8ebc80f
Showing 2 changed files with 158 additions and 0 deletions.
2 changes: 2 additions & 0 deletions histogram/src/errors.rs
@@ -22,4 +22,6 @@ pub enum Error {
Underflow,
#[error("the histogram is not a subset")]
InvalidSubset,
#[error("the bucket index and count vectors must have the same length")]
LengthMismatch,
}
156 changes: 156 additions & 0 deletions histogram/src/sparse.rs
@@ -19,6 +19,114 @@ pub struct SparseHistogram {
pub count: Vec<u64>,
}

/// A read-only sparse histogram which stores cumulative rather than
/// per-bucket counts, so that percentile queries reduce to a binary search.
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
pub struct SparseHistogramRO {
/// parameters representing the resolution and the range of
/// the histogram tracking request latencies
pub config: Config,
/// indices for the non-zero buckets in the histogram
pub(crate) index: Vec<usize>,
/// histogram cumulative bucket counts corresponding to the indices
pub(crate) cumulative: Vec<u64>,
}

impl SparseHistogramRO {
/// Construct a new histogram from the provided parameters, bucket indices,
/// and bucket counts. See the documentation for [`crate::Config`] to
/// understand the meaning of the parameters `grouping_power` and
/// `max_value_power`.
///
/// The `index` and `count` vectors must have the same length.
pub fn from_index_count(
grouping_power: u8,
max_value_power: u8,
index: Vec<usize>,
mut count: Vec<u64>,
) -> Result<Self, Error> {
let config = Config::new(grouping_power, max_value_power)?;

if index.len() != count.len() {
return Err(Error::LengthMismatch);
}

count_to_cumulative(count.as_mut_slice())?;

Ok(Self {
config,
index,
cumulative: count,
})
}

/// Return a collection of percentiles from this histogram.
///
/// Each percentile should be in the inclusive range `0.0..=100.0`. For
/// example, the 50th percentile (median) can be found using `50.0`.
///
/// The results will be sorted by the percentile.
pub fn percentiles(&self, percentiles: &[f64]) -> Result<Option<Vec<(f64, Bucket)>>, Error> {
// validate all the percentiles
if percentiles.is_empty() {
return Err(Error::InvalidPercentile);
}

for percentile in percentiles {
if !(0.0..=100.0).contains(percentile) {
return Err(Error::InvalidPercentile);
}
}

let total: u64 = self.cumulative.last().map_or(0, |x| *x);

// empty histogram, no percentiles available
if total == 0 {
return Ok(None);
}

// sort the requested percentiles so we can find them in a single pass
let mut percentiles = percentiles.to_vec();
percentiles.sort_by(|a, b| a.partial_cmp(b).unwrap());

let searches: Vec<u64> = percentiles
.iter()
.map(|p| ((total as f64) * *p / 100.0).ceil() as u64)
.collect();
let mut result: Vec<(f64, Bucket)> = Vec::with_capacity(percentiles.len());

let mut sliced = self.cumulative.as_slice();
let mut idx = 0;
for (percentile, value) in percentiles.iter().zip(searches.iter()) {
let p = sliced.binary_search(value).unwrap_or_else(|p| p);
idx += p;
result.push((
*percentile,
Bucket {
count: self.cumulative[idx]
- if idx == 0 {
0
} else {
self.cumulative[idx - 1]
},
range: self.config.index_to_range(self.index[idx]),
},
));
// shrink the slice for the next percentile
sliced = &sliced[p..];
}

Ok(Some(result))
}

/// Return a single percentile from this histogram.
///
/// The percentile should be in the inclusive range `0.0..=100.0`. For
/// example, the 50th percentile (median) can be found using `50.0`.
pub fn percentile(&self, percentile: f64) -> Result<Option<Bucket>, Error> {
self.percentiles(&[percentile])
.map(|v| v.map(|x| x.first().unwrap().1.clone()))
}
}
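
For reference, a minimal usage sketch of the constructor and the percentile queries above, assuming the crate is consumed as `histogram` and that `SparseHistogramRO` is re-exported from the crate root (neither assumption is shown in this diff):

use histogram::{Error, SparseHistogramRO};

fn sketch() -> Result<(), Error> {
    // Three non-zero buckets (indices 3, 7, 9) with counts 2, 3, and 5;
    // internally the counts become the cumulative vector [2, 5, 10].
    let h = SparseHistogramRO::from_index_count(4, 10, vec![3, 7, 9], vec![2, 3, 5])?;

    // The rank for p50 is ceil(10 * 50.0 / 100.0) = 5, which the binary
    // search finds in the second bucket, so the bucket at index 7 is returned.
    let median = h.percentile(50.0)?;
    assert!(median.is_some());

    // Several percentiles are resolved in a single pass over the cumulative
    // counts; the results come back sorted by percentile.
    let _ = h.percentiles(&[50.0, 90.0, 99.0])?;

    Ok(())
}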

impl SparseHistogram {
/// Construct a new histogram from the provided parameters. See the
/// documentation for [`crate::Config`] to understand their meaning.
@@ -314,6 +422,48 @@ impl From<&Histogram> for SparseHistogram {
}
}

impl From<&SparseHistogram> for SparseHistogramRO {
fn from(histogram: &SparseHistogram) -> Self {
let mut count = histogram.count.clone();
// `From` cannot return an error, so a potential overflow here is ignored
_ = count_to_cumulative(&mut count);

Self {
config: histogram.config,
index: histogram.index.clone(),
cumulative: count,
}
}
}

impl From<&SparseHistogramRO> for SparseHistogram {
fn from(histogram: &SparseHistogramRO) -> Self {
let mut cumulative = histogram.cumulative.clone();
cumulative_to_count(&mut cumulative);

Self {
config: histogram.config,
index: histogram.index.clone(),
count: cumulative,
}
}
}
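
The two `From` implementations above are inverses: the first replaces per-bucket counts with a running prefix sum, the second takes adjacent differences to recover them. A small round-trip sketch, under the same crate-name and export assumptions as the example above:

use histogram::{SparseHistogram, SparseHistogramRO};

fn round_trip(sparse: &SparseHistogram) -> SparseHistogram {
    // Per-bucket counts become cumulative counts here...
    let read_only = SparseHistogramRO::from(sparse);
    // ...and the reverse conversion recovers them, so (absent counter
    // overflow) the result compares equal to the input.
    SparseHistogram::from(&read_only)
}
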
fn count_to_cumulative(count: &mut [u64]) -> Result<(), Error> {
for offset in 1..count.len() {
count[offset] = count[offset].wrapping_add(count[offset - 1]);
if count[offset] < count[offset - 1] {
return Err(Error::Overflow);
}
}

Ok(())
}

fn cumulative_to_count(cumulative: &mut [u64]) {
for offset in (1..cumulative.len()).rev() {
cumulative[offset] -= cumulative[offset - 1];
}
}
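
As a standalone illustration of the two helpers above (plain std, no crate types), the prefix-sum pass and the adjacent-difference pass undo each other:

fn main() {
    let counts = vec![2u64, 3, 5];

    // Equivalent of count_to_cumulative: [2, 3, 5] -> [2, 5, 10]
    let mut v = counts.clone();
    for i in 1..v.len() {
        v[i] += v[i - 1];
    }
    assert_eq!(v, vec![2, 5, 10]);

    // Equivalent of cumulative_to_count: [2, 5, 10] -> [2, 3, 5]
    for i in (1..v.len()).rev() {
        v[i] -= v[i - 1];
    }
    assert_eq!(v, counts);
}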

#[cfg(test)]
mod tests {
use rand::Rng;
@@ -419,20 +569,26 @@ mod tests {
fn percentiles() {
let mut hstandard = Histogram::new(4, 10).unwrap();
let hempty = SparseHistogram::from(&hstandard);
let hempty_ro = SparseHistogramRO::from(&hempty);

for v in 1..1024 {
let _ = hstandard.increment(v);
}

let hsparse = SparseHistogram::from(&hstandard);
let hsparse_ro = SparseHistogramRO::from(&hsparse);
let percentiles = [1.0, 10.0, 25.0, 50.0, 75.0, 90.0, 99.0, 99.9];
for percentile in percentiles {
let bempty = hempty.percentile(percentile).unwrap();
let bempty_ro = hempty_ro.percentile(percentile).unwrap();
let bstandard = hstandard.percentile(percentile).unwrap();
let bsparse = hsparse.percentile(percentile).unwrap();
let bsparse_ro = hsparse_ro.percentile(percentile).unwrap();

assert_eq!(bempty, None);
assert_eq!(bempty_ro, None);
assert_eq!(bsparse, bstandard);
assert_eq!(bsparse_ro, bstandard);
}

assert_eq!(hempty.percentiles(&percentiles), Ok(None));
