Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add min, max, and gaps methods to IntervalTree #174

Merged
merged 1 commit into from
Apr 22, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 116 additions & 2 deletions src/jetstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ use std::{
convert::TryFrom,
fmt::Debug,
io::{self, Error, ErrorKind},
iter::DoubleEndedIterator,
ops::Range,
time::Duration,
};

Expand Down Expand Up @@ -698,13 +700,18 @@ impl Consumer {
None
};

let mut dedupe_window = IntervalTree::default();

// JetStream starts indexing at 1
dedupe_window.mark_processed(0);

Ok(Consumer {
nc,
stream,
cfg,
push_subscriber,
timeout: Duration::from_millis(5),
dedupe_window: Default::default(),
dedupe_window,
})
}

Expand Down Expand Up @@ -1041,7 +1048,7 @@ impl IntervalTree {
/// if this ID was not already marked as processed.
pub fn mark_processed(&mut self, id: u64) -> bool {
if self.inner.is_empty() {
self.inner.insert(1, 1);
self.inner.insert(id, id);
return true;
}

Expand Down Expand Up @@ -1110,6 +1117,90 @@ impl IntervalTree {
false
}
}

/// Returns the minimum ID marked as processed,
/// if any have been.
///
/// # Examples
///
/// ```
/// use nats::jetstream::IntervalTree;
///
/// let mut it = IntervalTree::default();
///
/// it.mark_processed(56);
/// it.mark_processed(259);
///
/// assert_eq!(it.min(), Some(56));
/// ```
pub fn min(&self) -> Option<u64> {
self.inner.iter().next().map(|(l, _h)| *l)
}

/// Returns the maximum ID marked as processed,
/// if any have been.
///
/// # Examples
///
/// ```
/// use nats::jetstream::IntervalTree;
///
/// let mut it = IntervalTree::default();
///
/// it.mark_processed(56);
/// it.mark_processed(259);
///
/// assert_eq!(it.max(), Some(259));
/// ```
pub fn max(&self) -> Option<u64> {
self.inner.iter().next_back().map(|(_l, h)| *h)
}

/// Returns a `DoubleEndedIterator` over
/// non-contiguous gaps that have not been
/// processed yet.
///
/// # Examples
///
/// ```
/// use std::ops::Range;
///
/// use nats::jetstream::IntervalTree;
///
/// let mut it = IntervalTree::default();
///
/// for id in 56..=122 {
/// it.mark_processed(id);
/// }
///
/// for id in 222..=259 {
/// it.mark_processed(id);
/// }
///
/// # assert_eq!(it.min(), Some(56));
/// # assert_eq!(it.max(), Some(259));
///
/// let gaps: Vec<Range<u64>> = it.gaps().collect();
///
/// assert_eq!(gaps, vec![Range { start: 123, end: 222 }]);
/// ```
pub fn gaps<'a>(
&'a self,
) -> impl 'a + DoubleEndedIterator<Item = Range<u64>> {
let mut iter = self.inner.iter();
let mut last_hi = iter.next().map(|(_l, h)| *h);
iter.map(move |(lo, hi)| {
let lh = last_hi.unwrap();
last_hi = Some(*hi);

assert!(lh + 1 < *lo);

Range {
start: lh + 1,
end: *lo,
}
})
}
}

#[cfg(test)]
Expand Down Expand Up @@ -1169,5 +1260,28 @@ mod test {
assert!(rt.already_processed(5));
assert!(rt.already_processed(6));
assert!(!rt.already_processed(7));

let mut it = IntervalTree::default();

for id in 56..=122 {
it.mark_processed(id);
}

for id in 222..=259 {
it.mark_processed(id);
}

assert_eq!(it.min(), Some(56));
assert_eq!(it.max(), Some(259));

let gaps: Vec<Range<u64>> = it.gaps().collect();

assert_eq!(
gaps,
vec![Range {
start: 123,
end: 222
}]
);
}
}