Skip to content

Commit

Permalink
Instrumented infra
Browse files Browse the repository at this point in the history
  • Loading branch information
akoshelev committed Oct 24, 2023
1 parent 41fe707 commit f3a0ece
Show file tree
Hide file tree
Showing 15 changed files with 594 additions and 199 deletions.
2 changes: 2 additions & 0 deletions .clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ disallowed-methods = [
{ path = "futures::future::join_all", reason = "We don't have a replacement for this method yet. Consider extending `SeqJoin` trait." },
{ path = "futures::future::try_join_all", reason = "Use Context.try_join instead." },
]

allow-private-module-inception = true
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,11 @@ toml = { version = "0.8", optional = true }
tower = { version = "0.4.13", optional = true }
tower-http = { version = "0.4.0", optional = true, features = ["trace"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-subscriber = { version = "0.3", features = ["env-filter", "ansi"] }
typenum = "1.16"
# hpke is pinned to it
x25519-dalek = "2.0.0-rc.3"
delegate = "0.10.0"

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.5.0"
Expand Down
34 changes: 0 additions & 34 deletions src/helpers/buffers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,3 @@ mod unordered_receiver;
pub use ordering_mpsc::{ordering_mpsc, OrderingMpscReceiver, OrderingMpscSender};
pub use ordering_sender::{OrderedStream, OrderingSender};
pub use unordered_receiver::UnorderedReceiver;

#[cfg(debug_assertions)]
#[allow(unused)] // todo(alex): make test world print the state again
mod waiting {
use std::collections::HashMap;

use crate::helpers::ChannelId;

pub(in crate::helpers) struct WaitingTasks<'a> {
tasks: HashMap<&'a ChannelId, Vec<u32>>,
}

impl<'a> WaitingTasks<'a> {
pub fn new(tasks: HashMap<&'a ChannelId, Vec<u32>>) -> Self {
Self { tasks }
}

pub fn is_empty(&self) -> bool {
self.tasks.is_empty()
}
}

impl std::fmt::Debug for WaitingTasks<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[")?;
for (channel, records) in &self.tasks {
write!(f, "\n {channel:?}: {records:?}")?;
}
write!(f, "\n]")?;

Ok(())
}
}
}
18 changes: 18 additions & 0 deletions src/helpers/buffers/ordering_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ impl WaitingShard {
self.wakers.pop_front().unwrap().w.wake();
}
}

pub fn waiting(&self) -> impl Iterator<Item = usize> + '_ {
self.wakers.iter().map(|waker| waker.i)
}
}

/// A collection of wakers that are indexed by the send index (`i`).
Expand Down Expand Up @@ -224,6 +228,12 @@ impl Waiting {
fn wake(&self, i: usize) {
self.shard(i).wake(i);
}

fn waiting(&self, indices: &mut Vec<usize>) {
self.shards
.iter()
.for_each(|shard| indices.extend(shard.lock().unwrap().waiting()));
}
}

/// An `OrderingSender` accepts messages for sending in any order, but
Expand Down Expand Up @@ -375,6 +385,14 @@ impl OrderingSender {
) -> OrderedStream<crate::sync::Arc<Self>> {
OrderedStream { sender: self }
}

pub fn waiting(&self) -> Vec<usize> {
let mut buf = Vec::new();
self.waiting.waiting(&mut buf);
buf.sort_unstable();

buf
}
}

/// A future for writing item `i` into an `OrderingSender`.
Expand Down
50 changes: 47 additions & 3 deletions src/helpers/buffers/unordered_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ where
/// Note: in protocols we try to send before receiving, so we can rely on
/// that easing load on this mechanism. There might also need to be some
/// end-to-end back pressure for tasks that do not involve sending at all.
overflow_wakers: Vec<Waker>,
overflow_wakers: Vec<(Waker, usize)>,
/// If this receiver is closed and no longer capable of receiving data.
closed: bool,
_marker: PhantomData<C>,
Expand Down Expand Up @@ -184,7 +184,7 @@ where
);
// We don't save a waker at `self.next`, so `>` and not `>=`.
if i > self.next + self.wakers.len() {
self.overflow_wakers.push(waker);
self.overflow_wakers.push((waker, i));
} else {
let index = i % self.wakers.len();
if let Some(old) = self.wakers[index].as_ref() {
Expand All @@ -207,7 +207,8 @@ where
}
if self.next % (self.wakers.len() / 2) == 0 {
// Wake all the overflowed wakers. See comments on `overflow_wakers`.
for w in take(&mut self.overflow_wakers) {
// todo: we may want to wake specific wakers now
for (w, _) in take(&mut self.overflow_wakers) {
w.wake();
}
}
Expand Down Expand Up @@ -288,6 +289,42 @@ where
self.spare = Spare::default();
}
}

fn waiting(&self) -> impl Iterator<Item = usize> + '_ {
// we are under-reporting overflow wakers as there is no information about their
// let mut r = Vec::new();
let start = self.next % self.wakers.len();
self.wakers
.iter()
.enumerate()
.filter_map(|(i, waker)| waker.as_ref().map(|_| i))
.map(move |i| {
if i < start {
self.next + (self.wakers.len() - start + i)
} else {
self.next + (i - start)
}
})
.chain(self.overflow_wakers.iter().map(|v| v.1))

// for (i, maybe_waker) in self.wakers.iter().enumerate() {
// if let Some(waker) = maybe_waker {
// if i < start {
// r.push(self.next + (self.wakers.len() - start + i))
// } else {
// r.push(self.next + (i - start))
// }
// }
// }
//
// for (_, i) in &self.overflow_wakers {
// r.push(*i);
// }
//
// r.sort_unstable();
//
// r.into_iter()
}
}

/// Take an ordered stream of bytes and make messages from that stream
Expand Down Expand Up @@ -357,6 +394,13 @@ where
// Closed streams cannot move back to open.
self.inner.lock().unwrap().is_closed()
}

pub fn waiting(&self) -> Vec<usize> {
let mut r = self.inner.lock().unwrap().waiting().collect::<Vec<_>>();
r.sort_unstable();

r
}
}

impl<S, C> Clone for UnorderedReceiver<S, C>
Expand Down
Loading

0 comments on commit f3a0ece

Please sign in to comment.