From 8a4086d7939dcf391ab566261be337a5867dbab5 Mon Sep 17 00:00:00 2001 From: Chase Wilson Date: Sun, 2 May 2021 16:20:26 -0500 Subject: [PATCH] Added debug implementations for a bunch of types --- logging/src/lib.rs | 23 +++++- timely/src/dataflow/channels/pact.rs | 71 ++++++++++++------- .../src/dataflow/channels/pushers/buffer.rs | 1 + .../src/dataflow/channels/pushers/counter.rs | 6 +- timely/src/dataflow/channels/pushers/tee.rs | 45 ++++++++++-- .../src/dataflow/operators/capture/event.rs | 6 ++ timely/src/dataflow/operators/feedback.rs | 1 + .../dataflow/operators/generic/builder_raw.rs | 2 + .../dataflow/operators/generic/builder_rc.rs | 1 + .../src/dataflow/operators/generic/handles.rs | 1 + .../dataflow/operators/generic/notificator.rs | 4 +- timely/src/dataflow/operators/input.rs | 3 +- timely/src/dataflow/operators/probe.rs | 10 +++ .../src/dataflow/operators/unordered_input.rs | 1 + timely/src/dataflow/stream.rs | 4 +- timely/src/scheduling/activate.rs | 4 ++ 16 files changed, 144 insertions(+), 39 deletions(-) diff --git a/logging/src/lib.rs b/logging/src/lib.rs index 02394ac6c0..3dab7d3731 100644 --- a/logging/src/lib.rs +++ b/logging/src/lib.rs @@ -4,6 +4,7 @@ use std::cell::RefCell; use std::any::Any; use std::collections::HashMap; use std::time::{Instant, Duration}; +use std::fmt::{self, Debug}; pub struct Registry { /// A worker-specific identifier. @@ -31,7 +32,7 @@ impl Registry { name: &str, action: F) -> Option> { - let logger = Logger::::new(self.time.clone(), Duration::default(), self.id.clone(), action); + let logger = Logger::::new(self.time, Duration::default(), self.id.clone(), action); self.insert_logger(name, logger) } @@ -99,7 +100,7 @@ impl Clone for Logger { Logger { id: self.id.clone(), time: self.time, - offset: self.offset.clone(), + offset: self.offset, action: self.action.clone(), buffer: self.buffer.clone(), } @@ -153,7 +154,7 @@ impl Logger { let mut buffer = self.buffer.borrow_mut(); let elapsed = self.time.elapsed() + self.offset; for event in events { - buffer.push((elapsed.clone(), self.id.clone(), event.into())); + buffer.push((elapsed, self.id.clone(), event.into())); if buffer.len() == buffer.capacity() { // Would call `self.flush()`, but for `RefCell` panic. let mut action = self.action.borrow_mut(); @@ -186,6 +187,22 @@ impl Drop for Logger { } } +impl Debug for Logger +where + E: Debug, + T: Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Logger") + .field("id", &self.id) + .field("time", &self.time) + .field("offset", &self.offset) + .field("action", &Rc::as_ptr(&self.action)) + .field("buffer", &self.buffer) + .finish() + } +} + /// Types that can be flushed. trait Flush { /// Flushes buffered data. diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 2b123683e0..43b5e42792 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -7,7 +7,7 @@ //! The only requirement of a pact is that it not alter the number of `D` records at each time `T`. //! The progress tracking logic assumes that this number is independent of the pact used. -use std::marker::PhantomData; +use std::{fmt::{self, Debug}, marker::PhantomData}; use crate::communication::{Push, Pull, Data}; use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; @@ -16,7 +16,7 @@ use crate::worker::AsWorker; use crate::dataflow::channels::pushers::Exchange as ExchangePusher; use super::{Bundle, Message}; -use crate::logging::TimelyLogger as Logger; +use crate::logging::{TimelyLogger as Logger, MessagesEvent}; /// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors. pub trait ParallelizationContract { @@ -29,7 +29,9 @@ pub trait ParallelizationContract { } /// A direct connection +#[derive(Debug)] pub struct Pipeline; + impl ParallelizationContract for Pipeline { type Pusher = LogPusher>>; type Puller = LogPuller>>; @@ -43,8 +45,9 @@ impl ParallelizationContract for Pipeline { } /// An exchange between multiple observers by data -pub struct Exchangeu64+'static> { hash_func: F, phantom: PhantomData, } -implu64> Exchange { +pub struct Exchange { hash_func: F, phantom: PhantomData } + +implu64+'static> Exchange { /// Allocates a new `Exchange` pact from a distribution function. pub fn new(func: F) -> Exchange { Exchange { @@ -67,16 +70,24 @@ implu64+'static> Parallelization } } +impl Debug for Exchange { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Exchange").finish() + } +} + /// Wraps a `Message` pusher to provide a `Push<(T, Content)>`. +#[derive(Debug)] pub struct LogPusher>> { pusher: P, channel: usize, counter: usize, source: usize, target: usize, - phantom: ::std::marker::PhantomData<(T, D)>, + phantom: PhantomData<(T, D)>, logging: Option, } + impl>> LogPusher { /// Allocates a new pusher. pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option) -> Self { @@ -86,7 +97,7 @@ impl>> LogPusher { counter: 0, source, target, - phantom: ::std::marker::PhantomData, + phantom: PhantomData, logging, } } @@ -97,34 +108,40 @@ impl>> Push> for LogPusher { fn push(&mut self, pair: &mut Option>) { if let Some(bundle) = pair { self.counter += 1; + // Stamp the sequence number and source. // FIXME: Awkward moment/logic. if let Some(message) = bundle.if_mut() { - message.seq = self.counter-1; + message.seq = self.counter - 1; message.from = self.source; } - self.logging.as_ref().map(|l| l.log(crate::logging::MessagesEvent { - is_send: true, - channel: self.channel, - source: self.source, - target: self.target, - seq_no: self.counter-1, - length: bundle.data.len(), - })); + if let Some(logger) = self.logging.as_ref() { + logger.log(MessagesEvent { + is_send: true, + channel: self.channel, + source: self.source, + target: self.target, + seq_no: self.counter - 1, + length: bundle.data.len(), + }) + } } + self.pusher.push(pair); } } /// Wraps a `Message` puller to provide a `Pull<(T, Content)>`. +#[derive(Debug)] pub struct LogPuller>> { puller: P, channel: usize, index: usize, - phantom: ::std::marker::PhantomData<(T, D)>, + phantom: PhantomData<(T, D)>, logging: Option, } + impl>> LogPuller { /// Allocates a new `Puller`. pub fn new(puller: P, index: usize, channel: usize, logging: Option) -> Self { @@ -132,7 +149,7 @@ impl>> LogPuller { puller, channel, index, - phantom: ::std::marker::PhantomData, + phantom: PhantomData, logging, } } @@ -145,15 +162,19 @@ impl>> Pull> for LogPuller { if let Some(bundle) = result { let channel = self.channel; let target = self.index; - self.logging.as_ref().map(|l| l.log(crate::logging::MessagesEvent { - is_send: false, - channel, - source: bundle.from, - target, - seq_no: bundle.seq, - length: bundle.data.len(), - })); + + if let Some(logger) = self.logging.as_ref() { + logger.log(MessagesEvent { + is_send: false, + channel, + source: bundle.from, + target, + seq_no: bundle.seq, + length: bundle.data.len(), + }); + } } + result } } diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs index 7bb4c5553a..288d5ff884 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ b/timely/src/dataflow/channels/pushers/buffer.rs @@ -10,6 +10,7 @@ use crate::communication::Push; /// /// The `Buffer` type should be used by calling `session` with a time, which checks whether /// data must be flushed and creates a `Session` object which allows sending at the given time. +#[derive(Debug)] pub struct Buffer>> { time: Option, // the currently open time, if it is open buffer: Vec, // a buffer for records, to send at self.time diff --git a/timely/src/dataflow/channels/pushers/counter.rs b/timely/src/dataflow/channels/pushers/counter.rs index 32f97b1cdd..c8d6dff72f 100644 --- a/timely/src/dataflow/channels/pushers/counter.rs +++ b/timely/src/dataflow/channels/pushers/counter.rs @@ -1,5 +1,6 @@ //! A wrapper which counts the number of records pushed past and updates a shared count map. +use std::marker::PhantomData; use std::rc::Rc; use std::cell::RefCell; @@ -8,10 +9,11 @@ use crate::dataflow::channels::Bundle; use crate::communication::Push; /// A wrapper which updates shared `produced` based on the number of records pushed. +#[derive(Debug)] pub struct Counter>> { pushee: P, produced: Rc>>, - phantom: ::std::marker::PhantomData, + phantom: PhantomData, } impl Push> for Counter where T : Ord+Clone+'static, P: Push> { @@ -34,7 +36,7 @@ impl>> Counter where T : Ord+Clone+'static { Counter { pushee, produced: Rc::new(RefCell::new(ChangeBatch::new())), - phantom: ::std::marker::PhantomData, + phantom: PhantomData, } } /// A references to shared changes in counts, for cloning or draining. diff --git a/timely/src/dataflow/channels/pushers/tee.rs b/timely/src/dataflow/channels/pushers/tee.rs index ae0b9be8dd..f5e3915eaf 100644 --- a/timely/src/dataflow/channels/pushers/tee.rs +++ b/timely/src/dataflow/channels/pushers/tee.rs @@ -1,17 +1,20 @@ //! A `Push` implementor with a list of `Box` to forward pushes to. -use std::rc::Rc; use std::cell::RefCell; +use std::fmt::{self, Debug}; +use std::rc::Rc; -use crate::Data; use crate::dataflow::channels::{Bundle, Message}; +use crate::Data; use crate::communication::Push; +type PushList = Rc>>>>>; + /// Wraps a shared list of `Box` to forward pushes to. Owned by `Stream`. pub struct Tee { buffer: Vec, - shared: Rc>>>>>, + shared: PushList, } impl Push> for Tee { @@ -58,9 +61,27 @@ impl Clone for Tee { } } +impl Debug for Tee +where + D: Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut debug = f.debug_struct("Tee"); + debug.field("buffer", &self.buffer); + + if let Ok(shared) = self.shared.try_borrow() { + debug.field("shared", &format!("{} pushers", shared.len())); + } else { + debug.field("shared", &"..."); + } + + debug.finish() + } +} + /// A shared list of `Box` used to add `Push` implementors. pub struct TeeHelper { - shared: Rc>>>>> + shared: PushList, } impl TeeHelper { @@ -73,7 +94,21 @@ impl TeeHelper { impl Clone for TeeHelper { fn clone(&self) -> Self { TeeHelper { - shared: self.shared.clone() + shared: self.shared.clone(), } } } + +impl Debug for TeeHelper { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut debug = f.debug_struct("TeeHelper"); + + if let Ok(shared) = self.shared.try_borrow() { + debug.field("shared", &format!("{} pushers", shared.len())); + } else { + debug.field("shared", &"..."); + } + + debug.finish() + } +} diff --git a/timely/src/dataflow/operators/capture/event.rs b/timely/src/dataflow/operators/capture/event.rs index d160a70406..31e3d6b0d8 100644 --- a/timely/src/dataflow/operators/capture/event.rs +++ b/timely/src/dataflow/operators/capture/event.rs @@ -101,6 +101,12 @@ pub mod link { } } + impl Default for EventLink { + fn default() -> Self { + Self::new() + } + } + #[test] fn avoid_stack_overflow_in_drop() { let mut event1 = Rc::new(EventLink::<(),()>::new()); diff --git a/timely/src/dataflow/operators/feedback.rs b/timely/src/dataflow/operators/feedback.rs index 61be7d3edb..8517643034 100644 --- a/timely/src/dataflow/operators/feedback.rs +++ b/timely/src/dataflow/operators/feedback.rs @@ -131,6 +131,7 @@ impl ConnectLoop for Stream { } /// A handle used to bind the source of a loop variable. +#[derive(Debug)] pub struct Handle { builder: OperatorBuilder, summary: ::Summary, diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index ace34cb00c..a4c151ea8b 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -21,6 +21,7 @@ use crate::dataflow::channels::pact::ParallelizationContract; use crate::dataflow::operators::generic::operator_info::OperatorInfo; /// Contains type-free information about the operator properties. +#[derive(Debug)] pub struct OperatorShape { name: String, // A meaningful name for the operator. notify: bool, // Does the operator require progress notifications. @@ -53,6 +54,7 @@ impl OperatorShape { } /// Builds operators with generic shape. +#[derive(Debug)] pub struct OperatorBuilder { scope: G, index: usize, diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index 98677b5f96..21eee25ddf 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -26,6 +26,7 @@ use crate::logging::TimelyLogger as Logger; use super::builder_raw::OperatorBuilder as OperatorBuilderRaw; /// Builds operators with generic shape. +#[derive(Debug)] pub struct OperatorBuilder { builder: OperatorBuilderRaw, frontier: Vec>, diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index d8a0303d40..b9d69f634d 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -161,6 +161,7 @@ pub fn new_input_handle>>(pull_counter: Pu /// An `OutputWrapper` exists to prevent anyone from using the wrapped buffer in any way other /// than with an `OutputHandle`, whose methods ensure that capabilities are used and that the /// pusher is flushed (via the `cease` method) once it is no longer used. +#[derive(Debug)] pub struct OutputWrapper>> { push_buffer: Buffer>, internal_buffer: Rc>>, diff --git a/timely/src/dataflow/operators/generic/notificator.rs b/timely/src/dataflow/operators/generic/notificator.rs index ae3ad741cb..600864454a 100644 --- a/timely/src/dataflow/operators/generic/notificator.rs +++ b/timely/src/dataflow/operators/generic/notificator.rs @@ -14,6 +14,7 @@ use crate::logging::TimelyLogger as Logger; /// Notification requests persist across uses of `Notificator`, and it may help to think of `Notificator` /// as a notification *session*. However, idiomatically it seems you mostly want to restrict your usage /// to such sessions, which is why this is the main notificator type. +#[derive(Debug)] pub struct Notificator<'a, T: Timestamp> { frontiers: &'a [&'a MutableAntichain], inner: &'a mut FrontierNotificator, @@ -230,6 +231,7 @@ fn notificator_delivers_notifications_in_topo_order() { /// in2.close(); /// }).unwrap(); /// ``` +#[derive(Debug)] pub struct FrontierNotificator { pending: Vec<(Capability, u64)>, available: ::std::collections::BinaryHeap>, @@ -403,7 +405,7 @@ impl FrontierNotificator { } } -#[derive(PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq)] struct OrderReversed { element: Capability, } diff --git a/timely/src/dataflow/operators/input.rs b/timely/src/dataflow/operators/input.rs index e66d9a63e1..11d02896d0 100644 --- a/timely/src/dataflow/operators/input.rs +++ b/timely/src/dataflow/operators/input.rs @@ -130,6 +130,7 @@ impl Input for G where ::Timestamp: TotalOrder { } } +#[derive(Debug)] struct Operator { name: String, address: Vec, @@ -168,7 +169,7 @@ impl Operate for Operator { /// A handle to an input `Stream`, used to introduce data to a timely dataflow computation. - +#[derive(Debug)] pub struct Handle { activate: Vec, progress: Vec>>>, diff --git a/timely/src/dataflow/operators/probe.rs b/timely/src/dataflow/operators/probe.rs index e3cfcabf3d..124be5972c 100644 --- a/timely/src/dataflow/operators/probe.rs +++ b/timely/src/dataflow/operators/probe.rs @@ -137,6 +137,7 @@ impl Probe for Stream { } /// Reports information about progress at the probe. +#[derive(Debug)] pub struct Handle { frontier: Rc>> } @@ -178,6 +179,15 @@ impl Clone for Handle { } } +impl Default for Handle +where + T: Timestamp, +{ + fn default() -> Self { + Self::new() + } +} + #[cfg(test)] mod tests { diff --git a/timely/src/dataflow/operators/unordered_input.rs b/timely/src/dataflow/operators/unordered_input.rs index 89b3d1faf9..6dc163bbfd 100644 --- a/timely/src/dataflow/operators/unordered_input.rs +++ b/timely/src/dataflow/operators/unordered_input.rs @@ -145,6 +145,7 @@ impl Operate for UnorderedOperator { } /// A handle to an input `Stream`, used to introduce data to a timely dataflow computation. +#[derive(Debug)] pub struct UnorderedHandle { buffer: PushBuffer>>, } diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index fec4187692..8a3bccfbd1 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -10,7 +10,7 @@ use crate::communication::Push; use crate::dataflow::Scope; use crate::dataflow::channels::pushers::tee::TeeHelper; use crate::dataflow::channels::Bundle; -use std::fmt; +use std::fmt::{self, Debug}; // use dataflow::scopes::root::loggers::CHANNELS_Q; @@ -56,7 +56,7 @@ impl Stream { pub fn scope(&self) -> S { self.scope.clone() } } -impl fmt::Debug for Stream +impl Debug for Stream where S: Scope, { diff --git a/timely/src/scheduling/activate.rs b/timely/src/scheduling/activate.rs index 0b49f1fda8..b4f7ff412d 100644 --- a/timely/src/scheduling/activate.rs +++ b/timely/src/scheduling/activate.rs @@ -187,6 +187,7 @@ impl Activations { } /// A thread-safe handle to an `Activations`. +#[derive(Debug)] pub struct SyncActivations { tx: Sender>, thread: Thread, @@ -217,6 +218,7 @@ impl SyncActivations { } /// A capability to activate a specific path. +#[derive(Debug)] pub struct Activator { path: Vec, queue: Rc>, @@ -251,6 +253,7 @@ impl Activator { } /// A thread-safe version of `Activator`. +#[derive(Debug)] pub struct SyncActivator { path: Vec, queue: SyncActivations, @@ -295,6 +298,7 @@ impl std::error::Error for SyncActivationError { } /// A wrapper that unparks on drop. +#[derive(Debug)] pub struct ActivateOnDrop { wrapped: T, address: Rc>,