Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More debug implementations #387

Merged
merged 1 commit into from
Jun 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions logging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::cell::RefCell;
use std::any::Any;
use std::collections::HashMap;
use std::time::{Instant, Duration};
use std::fmt::{self, Debug};

pub struct Registry<Id> {
/// A worker-specific identifier.
Expand Down Expand Up @@ -31,7 +32,7 @@ impl<Id: Clone+'static> Registry<Id> {
name: &str,
action: F) -> Option<Box<dyn Any>>
{
let logger = Logger::<T, Id>::new(self.time.clone(), Duration::default(), self.id.clone(), action);
let logger = Logger::<T, Id>::new(self.time, Duration::default(), self.id.clone(), action);
self.insert_logger(name, logger)
}

Expand Down Expand Up @@ -99,7 +100,7 @@ impl<T, E: Clone> Clone for Logger<T, E> {
Logger {
id: self.id.clone(),
time: self.time,
offset: self.offset.clone(),
offset: self.offset,
action: self.action.clone(),
buffer: self.buffer.clone(),
}
Expand Down Expand Up @@ -153,7 +154,7 @@ impl<T, E: Clone> Logger<T, E> {
let mut buffer = self.buffer.borrow_mut();
let elapsed = self.time.elapsed() + self.offset;
for event in events {
buffer.push((elapsed.clone(), self.id.clone(), event.into()));
buffer.push((elapsed, self.id.clone(), event.into()));
if buffer.len() == buffer.capacity() {
// Would call `self.flush()`, but for `RefCell` panic.
let mut action = self.action.borrow_mut();
Expand Down Expand Up @@ -186,6 +187,22 @@ impl<T, E> Drop for Logger<T, E> {
}
}

impl<T, E> Debug for Logger<T, E>
where
E: Debug,
T: Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Logger")
.field("id", &self.id)
.field("time", &self.time)
.field("offset", &self.offset)
.field("action", &Rc::as_ptr(&self.action))
.field("buffer", &self.buffer)
.finish()
}
}

/// Types that can be flushed.
trait Flush {
/// Flushes buffered data.
Expand Down
71 changes: 46 additions & 25 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! The only requirement of a pact is that it not alter the number of `D` records at each time `T`.
//! The progress tracking logic assumes that this number is independent of the pact used.

use std::marker::PhantomData;
use std::{fmt::{self, Debug}, marker::PhantomData};

use crate::communication::{Push, Pull, Data};
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
Expand All @@ -16,7 +16,7 @@ use crate::worker::AsWorker;
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
use super::{Bundle, Message};

use crate::logging::TimelyLogger as Logger;
use crate::logging::{TimelyLogger as Logger, MessagesEvent};

/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
pub trait ParallelizationContract<T: 'static, D: 'static> {
Expand All @@ -29,7 +29,9 @@ pub trait ParallelizationContract<T: 'static, D: 'static> {
}

/// A direct connection
#[derive(Debug)]
pub struct Pipeline;

impl<T: 'static, D: 'static> ParallelizationContract<T, D> for Pipeline {
type Pusher = LogPusher<T, D, ThreadPusher<Bundle<T, D>>>;
type Puller = LogPuller<T, D, ThreadPuller<Bundle<T, D>>>;
Expand All @@ -43,8 +45,9 @@ impl<T: 'static, D: 'static> ParallelizationContract<T, D> for Pipeline {
}

/// An exchange between multiple observers by data
pub struct Exchange<D, F: FnMut(&D)->u64+'static> { hash_func: F, phantom: PhantomData<D>, }
impl<D, F: FnMut(&D)->u64> Exchange<D, F> {
pub struct Exchange<D, F> { hash_func: F, phantom: PhantomData<D> }

impl<D, F: FnMut(&D)->u64+'static> Exchange<D, F> {
/// Allocates a new `Exchange` pact from a distribution function.
pub fn new(func: F) -> Exchange<D, F> {
Exchange {
Expand All @@ -67,16 +70,24 @@ impl<T: Eq+Data+Clone, D: Data+Clone, F: FnMut(&D)->u64+'static> Parallelization
}
}

impl<D, F> Debug for Exchange<D, F> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Exchange").finish()
}
}

/// Wraps a `Message<T,D>` pusher to provide a `Push<(T, Content<D>)>`.
#[derive(Debug)]
pub struct LogPusher<T, D, P: Push<Bundle<T, D>>> {
pusher: P,
channel: usize,
counter: usize,
source: usize,
target: usize,
phantom: ::std::marker::PhantomData<(T, D)>,
phantom: PhantomData<(T, D)>,
logging: Option<Logger>,
}

impl<T, D, P: Push<Bundle<T, D>>> LogPusher<T, D, P> {
/// Allocates a new pusher.
pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
Expand All @@ -86,7 +97,7 @@ impl<T, D, P: Push<Bundle<T, D>>> LogPusher<T, D, P> {
counter: 0,
source,
target,
phantom: ::std::marker::PhantomData,
phantom: PhantomData,
logging,
}
}
Expand All @@ -97,42 +108,48 @@ impl<T, D, P: Push<Bundle<T, D>>> Push<Bundle<T, D>> for LogPusher<T, D, P> {
fn push(&mut self, pair: &mut Option<Bundle<T, D>>) {
if let Some(bundle) = pair {
self.counter += 1;

// Stamp the sequence number and source.
// FIXME: Awkward moment/logic.
if let Some(message) = bundle.if_mut() {
message.seq = self.counter-1;
message.seq = self.counter - 1;
message.from = self.source;
}

self.logging.as_ref().map(|l| l.log(crate::logging::MessagesEvent {
is_send: true,
channel: self.channel,
source: self.source,
target: self.target,
seq_no: self.counter-1,
length: bundle.data.len(),
}));
if let Some(logger) = self.logging.as_ref() {
logger.log(MessagesEvent {
is_send: true,
channel: self.channel,
source: self.source,
target: self.target,
seq_no: self.counter - 1,
length: bundle.data.len(),
})
}
}

self.pusher.push(pair);
}
}

/// Wraps a `Message<T,D>` puller to provide a `Pull<(T, Content<D>)>`.
#[derive(Debug)]
pub struct LogPuller<T, D, P: Pull<Bundle<T, D>>> {
puller: P,
channel: usize,
index: usize,
phantom: ::std::marker::PhantomData<(T, D)>,
phantom: PhantomData<(T, D)>,
logging: Option<Logger>,
}

impl<T, D, P: Pull<Bundle<T, D>>> LogPuller<T, D, P> {
/// Allocates a new `Puller`.
pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
LogPuller {
puller,
channel,
index,
phantom: ::std::marker::PhantomData,
phantom: PhantomData,
logging,
}
}
Expand All @@ -145,15 +162,19 @@ impl<T, D, P: Pull<Bundle<T, D>>> Pull<Bundle<T, D>> for LogPuller<T, D, P> {
if let Some(bundle) = result {
let channel = self.channel;
let target = self.index;
self.logging.as_ref().map(|l| l.log(crate::logging::MessagesEvent {
is_send: false,
channel,
source: bundle.from,
target,
seq_no: bundle.seq,
length: bundle.data.len(),
}));

if let Some(logger) = self.logging.as_ref() {
logger.log(MessagesEvent {
is_send: false,
channel,
source: bundle.from,
target,
seq_no: bundle.seq,
length: bundle.data.len(),
});
}
}

result
}
}
1 change: 1 addition & 0 deletions timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::communication::Push;
///
/// The `Buffer` type should be used by calling `session` with a time, which checks whether
/// data must be flushed and creates a `Session` object which allows sending at the given time.
#[derive(Debug)]
pub struct Buffer<T, D, P: Push<Bundle<T, D>>> {
time: Option<T>, // the currently open time, if it is open
buffer: Vec<D>, // a buffer for records, to send at self.time
Expand Down
6 changes: 4 additions & 2 deletions timely/src/dataflow/channels/pushers/counter.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! A wrapper which counts the number of records pushed past and updates a shared count map.

use std::marker::PhantomData;
use std::rc::Rc;
use std::cell::RefCell;

Expand All @@ -8,10 +9,11 @@ use crate::dataflow::channels::Bundle;
use crate::communication::Push;

/// A wrapper which updates shared `produced` based on the number of records pushed.
#[derive(Debug)]
pub struct Counter<T: Ord, D, P: Push<Bundle<T, D>>> {
pushee: P,
produced: Rc<RefCell<ChangeBatch<T>>>,
phantom: ::std::marker::PhantomData<D>,
phantom: PhantomData<D>,
}

impl<T, D, P> Push<Bundle<T, D>> for Counter<T, D, P> where T : Ord+Clone+'static, P: Push<Bundle<T, D>> {
Expand All @@ -34,7 +36,7 @@ impl<T, D, P: Push<Bundle<T, D>>> Counter<T, D, P> where T : Ord+Clone+'static {
Counter {
pushee,
produced: Rc::new(RefCell::new(ChangeBatch::new())),
phantom: ::std::marker::PhantomData,
phantom: PhantomData,
}
}
/// A references to shared changes in counts, for cloning or draining.
Expand Down
45 changes: 40 additions & 5 deletions timely/src/dataflow/channels/pushers/tee.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
//! A `Push` implementor with a list of `Box<Push>` to forward pushes to.

use std::rc::Rc;
use std::cell::RefCell;
use std::fmt::{self, Debug};
use std::rc::Rc;

use crate::Data;
use crate::dataflow::channels::{Bundle, Message};
use crate::Data;

use crate::communication::Push;

type PushList<T, D> = Rc<RefCell<Vec<Box<dyn Push<Bundle<T, D>>>>>>;

/// Wraps a shared list of `Box<Push>` to forward pushes to. Owned by `Stream`.
pub struct Tee<T: 'static, D: 'static> {
buffer: Vec<D>,
shared: Rc<RefCell<Vec<Box<dyn Push<Bundle<T, D>>>>>>,
shared: PushList<T, D>,
}

impl<T: Data, D: Data> Push<Bundle<T, D>> for Tee<T, D> {
Expand Down Expand Up @@ -58,9 +61,27 @@ impl<T, D> Clone for Tee<T, D> {
}
}

impl<T, D> Debug for Tee<T, D>
where
D: Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut debug = f.debug_struct("Tee");
debug.field("buffer", &self.buffer);

if let Ok(shared) = self.shared.try_borrow() {
debug.field("shared", &format!("{} pushers", shared.len()));
} else {
debug.field("shared", &"...");
}

debug.finish()
}
}

/// A shared list of `Box<Push>` used to add `Push` implementors.
pub struct TeeHelper<T, D> {
shared: Rc<RefCell<Vec<Box<dyn Push<Bundle<T, D>>>>>>
shared: PushList<T, D>,
}

impl<T, D> TeeHelper<T, D> {
Expand All @@ -73,7 +94,21 @@ impl<T, D> TeeHelper<T, D> {
impl<T, D> Clone for TeeHelper<T, D> {
fn clone(&self) -> Self {
TeeHelper {
shared: self.shared.clone()
shared: self.shared.clone(),
}
}
}

impl<T, D> Debug for TeeHelper<T, D> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut debug = f.debug_struct("TeeHelper");

if let Ok(shared) = self.shared.try_borrow() {
debug.field("shared", &format!("{} pushers", shared.len()));
} else {
debug.field("shared", &"...");
}

debug.finish()
}
}
6 changes: 6 additions & 0 deletions timely/src/dataflow/operators/capture/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ pub mod link {
}
}

impl<T, D> Default for EventLink<T, D> {
fn default() -> Self {
Self::new()
}
}

#[test]
fn avoid_stack_overflow_in_drop() {
let mut event1 = Rc::new(EventLink::<(),()>::new());
Expand Down
1 change: 1 addition & 0 deletions timely/src/dataflow/operators/feedback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ impl<G: Scope, D: Data> ConnectLoop<G, D> for Stream<G, D> {
}

/// A handle used to bind the source of a loop variable.
#[derive(Debug)]
pub struct Handle<G: Scope, D: Data> {
builder: OperatorBuilder<G>,
summary: <G::Timestamp as Timestamp>::Summary,
Expand Down
2 changes: 2 additions & 0 deletions timely/src/dataflow/operators/generic/builder_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::dataflow::channels::pact::ParallelizationContract;
use crate::dataflow::operators::generic::operator_info::OperatorInfo;

/// Contains type-free information about the operator properties.
#[derive(Debug)]
pub struct OperatorShape {
name: String, // A meaningful name for the operator.
notify: bool, // Does the operator require progress notifications.
Expand Down Expand Up @@ -53,6 +54,7 @@ impl OperatorShape {
}

/// Builds operators with generic shape.
#[derive(Debug)]
pub struct OperatorBuilder<G: Scope> {
scope: G,
index: usize,
Expand Down
1 change: 1 addition & 0 deletions timely/src/dataflow/operators/generic/builder_rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::logging::TimelyLogger as Logger;
use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;

/// Builds operators with generic shape.
#[derive(Debug)]
pub struct OperatorBuilder<G: Scope> {
builder: OperatorBuilderRaw<G>,
frontier: Vec<MutableAntichain<G::Timestamp>>,
Expand Down
1 change: 1 addition & 0 deletions timely/src/dataflow/operators/generic/handles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ pub fn new_input_handle<T: Timestamp, D, P: Pull<Bundle<T, D>>>(pull_counter: Pu
/// An `OutputWrapper` exists to prevent anyone from using the wrapped buffer in any way other
/// than with an `OutputHandle`, whose methods ensure that capabilities are used and that the
/// pusher is flushed (via the `cease` method) once it is no longer used.
#[derive(Debug)]
pub struct OutputWrapper<T: Timestamp, D, P: Push<Bundle<T, D>>> {
push_buffer: Buffer<T, D, PushCounter<T, D, P>>,
internal_buffer: Rc<RefCell<ChangeBatch<T>>>,
Expand Down
Loading