diff --git a/timely/examples/logging-send.rs b/timely/examples/logging-send.rs index 93e6a8867..23228f4e8 100644 --- a/timely/examples/logging-send.rs +++ b/timely/examples/logging-send.rs @@ -5,7 +5,7 @@ use timely::dataflow::operators::{Input, Exchange, Probe}; // use timely::dataflow::operators::capture::EventWriter; // use timely::dataflow::ScopeParent; -use timely::logging::TimelyEvent; +use timely::logging::{TimelyEvent, TimelyProgressEvent}; fn main() { // initializes and runs a timely dataflow. @@ -21,6 +21,26 @@ fn main() { data.iter().for_each(|x| println!("LOG1: {:?}", x)) ); + // Register timely progress logging. + // Less generally useful: intended for debugging advanced custom operators or timely + // internals. + worker.log_register().insert::("timely/progress", |_time, data| + data.iter().for_each(|x| { + println!("PROGRESS: {:?}", x); + let (_, _, ev) = x; + print!("PROGRESS: TYPED MESSAGES: "); + for (n, p, t, d) in ev.messages.iter() { + print!("{:?}, ", (n, p, t.as_any().downcast_ref::(), d)); + } + println!(); + print!("PROGRESS: TYPED INTERNAL: "); + for (n, p, t, d) in ev.internal.iter() { + print!("{:?}, ", (n, p, t.as_any().downcast_ref::(), d)); + } + println!(); + }) + ); + // create a new input, exchange data, and inspect its output worker.dataflow(|scope| { scope diff --git a/timely/src/dataflow/scopes/child.rs b/timely/src/dataflow/scopes/child.rs index 824490376..98f255e07 100644 --- a/timely/src/dataflow/scopes/child.rs +++ b/timely/src/dataflow/scopes/child.rs @@ -12,6 +12,7 @@ use crate::progress::{Source, Target}; use crate::progress::timestamp::Refines; use crate::order::Product; use crate::logging::TimelyLogger as Logger; +use crate::logging::TimelyProgressLogger as ProgressLogger; use crate::worker::{AsWorker, Config}; use super::{ScopeParent, Scope}; @@ -32,6 +33,8 @@ where pub parent: G, /// The log writer for this scope. pub logging: Option, + /// The progress log writer for this scope. 
+ pub progress_logging: Option, } impl<'a, G, T> Child<'a, G, T> @@ -115,12 +118,13 @@ where let index = self.subgraph.borrow_mut().allocate_child_id(); let path = self.subgraph.borrow().path.clone(); - let subscope = RefCell::new(SubgraphBuilder::new_from(index, path, self.logging().clone(), name)); + let subscope = RefCell::new(SubgraphBuilder::new_from(index, path, self.logging().clone(), self.progress_logging.clone(), name)); let result = { let mut builder = Child { subgraph: &subscope, parent: self.clone(), logging: self.logging.clone(), + progress_logging: self.progress_logging.clone(), }; func(&mut builder) }; @@ -143,7 +147,8 @@ where Child { subgraph: self.subgraph, parent: self.parent.clone(), - logging: self.logging.clone() + logging: self.logging.clone(), + progress_logging: self.progress_logging.clone(), } } } diff --git a/timely/src/logging.rs b/timely/src/logging.rs index ac0316304..0be7f98d3 100644 --- a/timely/src/logging.rs +++ b/timely/src/logging.rs @@ -6,6 +6,8 @@ pub type WorkerIdentifier = usize; pub type Logger = crate::logging_core::Logger; /// Logger for timely dataflow system events. pub type TimelyLogger = Logger; +/// Logger for timely dataflow progress events (the "timely/progress" log stream). +pub type TimelyProgressLogger = Logger; use std::time::Duration; use crate::dataflow::operators::capture::{Event, EventPusher}; @@ -70,9 +72,63 @@ pub struct ChannelsEvent { pub target: (usize, usize), } -#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)] +/// Encapsulates Any and Debug for dynamically typed timestamps in logs +pub trait ProgressEventTimestamp: std::fmt::Debug + std::any::Any { + /// Upcasts this `ProgressEventTimestamp` to `Any`. 
+ /// + /// NOTE: This is required until https://github.com/rust-lang/rfcs/issues/2765 is fixed + /// + /// # Example + /// ```rust + /// let ts = vec![(0usize, 0usize, (23u64, 10u64), -4i64), (0usize, 0usize, (23u64, 11u64), 1i64)]; + /// let ts: &timely::logging::ProgressEventTimestampVec = &ts; + /// for (n, p, t, d) in ts.iter() { + /// print!("{:?}, ", (n, p, t.as_any().downcast_ref::<(u64, u64)>(), d)); + /// } + /// println!(); + /// ``` + fn as_any(&self) -> &dyn std::any::Any; + + /// Returns the name of the concrete type of this object. + /// + /// # Note + /// + /// This is intended for diagnostic use. The exact contents and format of the + /// string returned are not specified, other than being a best-effort + /// description of the type. For example, amongst the strings + /// that `type_name::<Option<String>>()` might return are `"Option<String>"` and + /// `"std::option::Option<std::string::String>"`. + fn type_name(&self) -> &'static str; +} +impl ProgressEventTimestamp for T { + fn as_any(&self) -> &dyn std::any::Any { self } + + fn type_name(&self) -> &'static str { std::any::type_name::() } +} + +/// A vector of progress updates in logs +/// +/// This exists to support upcasting of the concrete progress update vectors to +/// `dyn ProgressEventTimestamp`. Doing so at the vector granularity allows us to +/// use a single allocation for the entire vector (as opposed to a `Box` allocation +/// for each dynamically typed element). +pub trait ProgressEventTimestampVec: std::fmt::Debug + std::any::Any { + /// Iterate over the contents of the vector + fn iter<'a>(&'a self) -> Box+'a>; +} + +impl ProgressEventTimestampVec for Vec<(usize, usize, T, i64)> { + fn iter<'a>(&'a self) -> Box+'a> { + Box::new(<[(usize, usize, T, i64)]>::iter(&self[..]).map(|(n, p, t, d)| { + let t: &dyn ProgressEventTimestamp = t; + (n, p, t, d) + })) + } +} + +#[derive(Debug)] /// Send or receive of progress information. 
-pub struct ProgressEvent { +pub struct TimelyProgressEvent { /// `true` if the event is a send, and `false` if it is a receive. pub is_send: bool, /// Source worker index. @@ -84,9 +140,9 @@ pub struct ProgressEvent { /// Sequence of nested scope identifiers indicating the path from the root to this instance. pub addr: Vec, /// List of message updates, containing Target descriptor, timestamp as string, and delta. - pub messages: Vec<(usize, usize, String, i64)>, + pub messages: Box, /// List of capability updates, containing Source descriptor, timestamp as string, and delta. - pub internal: Vec<(usize, usize, String, i64)>, + pub internal: Box, } #[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)] @@ -225,8 +281,6 @@ pub enum TimelyEvent { Operates(OperatesEvent), /// Channel creation. Channels(ChannelsEvent), - /// Progress message send or receive. - Progress(ProgressEvent), /// Progress propagation (reasoning). PushProgress(PushProgressEvent), /// Message send or receive. @@ -259,10 +313,6 @@ impl From for TimelyEvent { fn from(v: ChannelsEvent) -> TimelyEvent { TimelyEvent::Channels(v) } } -impl From for TimelyEvent { - fn from(v: ProgressEvent) -> TimelyEvent { TimelyEvent::Progress(v) } -} - impl From for TimelyEvent { fn from(v: PushProgressEvent) -> TimelyEvent { TimelyEvent::PushProgress(v) } } diff --git a/timely/src/progress/broadcast.rs b/timely/src/progress/broadcast.rs index 81c5e71a9..5eead8b92 100644 --- a/timely/src/progress/broadcast.rs +++ b/timely/src/progress/broadcast.rs @@ -1,9 +1,10 @@ //! Broadcasts progress information among workers. 
use crate::progress::{ChangeBatch, Timestamp}; -use crate::progress::Location; +use crate::progress::{Location, Port}; use crate::communication::{Message, Push, Pull}; use crate::logging::TimelyLogger as Logger; +use crate::logging::TimelyProgressLogger as ProgressLogger; /// A list of progress updates corresponding to `((child_scope, [in/out]_port, timestamp), delta)` pub type ProgressVec = Vec<((Location, T), i64)>; @@ -25,12 +26,12 @@ pub struct Progcaster { /// Communication channel identifier channel_identifier: usize, - logging: Option, + progress_logging: Option, } impl Progcaster { /// Creates a new `Progcaster` using a channel from the supplied worker. - pub fn new(worker: &mut A, path: &Vec, mut logging: Option) -> Progcaster { + pub fn new(worker: &mut A, path: &Vec, mut logging: Option, progress_logging: Option) -> Progcaster { let channel_identifier = worker.new_identifier(); let (pushers, puller) = worker.allocate(channel_identifier, &path[..]); @@ -48,7 +49,7 @@ impl Progcaster { counter: 0, addr, channel_identifier, - logging, + progress_logging, } } @@ -58,16 +59,35 @@ impl Progcaster { changes.compact(); if !changes.is_empty() { - self.logging.as_ref().map(|l| l.log(crate::logging::ProgressEvent { - is_send: true, - source: self.source, - channel: self.channel_identifier, - seq_no: self.counter, - addr: self.addr.clone(), - // TODO: fill with additional data - messages: Vec::new(), - internal: Vec::new(), - })); + self.progress_logging.as_ref().map(|l| { + + // Pre-allocate enough space; we transfer ownership, so there is not + // an opportunity to re-use allocations (w/o changing the logging + // interface to accept references). 
+ let mut messages = Box::new(Vec::with_capacity(changes.len())); + let mut internal = Box::new(Vec::with_capacity(changes.len())); + + for ((location, time), diff) in changes.iter() { + match location.port { + Port::Target(port) => { + messages.push((location.node, port, time.clone(), *diff)) + }, + Port::Source(port) => { + internal.push((location.node, port, time.clone(), *diff)) + } + } + } + + l.log(crate::logging::TimelyProgressEvent { + is_send: true, + source: self.source, + channel: self.channel_identifier, + seq_no: self.counter, + addr: self.addr.clone(), + messages, + internal, + }); + }); for pusher in self.pushers.iter_mut() { @@ -108,16 +128,36 @@ impl Progcaster { let addr = &mut self.addr; let channel = self.channel_identifier; - self.logging.as_ref().map(|l| l.log(crate::logging::ProgressEvent { - is_send: false, - source: source, - seq_no: counter, - channel, - addr: addr.clone(), - // TODO: fill with additional data - messages: Vec::new(), - internal: Vec::new(), - })); + + // See comments above about the relatively high cost of this logging, and our + // options for improving it if performance limits users who want other logging. + self.progress_logging.as_ref().map(|l| { + + let mut messages = Box::new(Vec::with_capacity(changes.len())); + let mut internal = Box::new(Vec::with_capacity(changes.len())); + + for ((location, time), diff) in recv_changes.iter() { + + match location.port { + Port::Target(port) => { + messages.push((location.node, port, time.clone(), *diff)) + }, + Port::Source(port) => { + internal.push((location.node, port, time.clone(), *diff)) + } + } + } + + l.log(crate::logging::TimelyProgressEvent { + is_send: false, + source: source, + seq_no: counter, + channel, + addr: addr.clone(), + messages: messages, + internal: internal, + }); + }); // We clone rather than drain to avoid deserialization. 
for &(ref update, delta) in recv_changes.iter() { diff --git a/timely/src/progress/change_batch.rs b/timely/src/progress/change_batch.rs index 348a098c0..09f4cb5ec 100644 --- a/timely/src/progress/change_batch.rs +++ b/timely/src/progress/change_batch.rs @@ -201,6 +201,27 @@ impl ChangeBatch { } } + /// Number of compacted updates. + /// + /// This method requires mutable access to `self` because it may need to compact the + /// representation to determine the number of actual updates. + /// + /// # Examples + /// + ///``` + /// use timely::progress::ChangeBatch; + /// + /// let mut batch = ChangeBatch::::new_from(17, 1); + /// batch.update(17, -1); + /// batch.update(14, -1); + /// assert_eq!(batch.len(), 1); + ///``` + #[inline] + pub fn len(&mut self) -> usize { + self.compact(); + self.updates.len() + } + /// Drains `self` into `other`. /// /// This method has similar a effect to calling `other.extend(self.drain())`, but has the diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index bf571d2da..2babf5d27 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -11,6 +11,7 @@ use std::collections::BinaryHeap; use std::cmp::Reverse; use crate::logging::TimelyLogger as Logger; +use crate::logging::TimelyProgressLogger as ProgressLogger; use crate::scheduling::Schedule; use crate::scheduling::activate::Activations; @@ -63,6 +64,9 @@ where /// Logging handle logging: Option, + + /// Progress logging handle + progress_logging: Option, } impl SubgraphBuilder @@ -95,6 +99,7 @@ where index: usize, mut path: Vec, logging: Option, + progress_logging: Option, name: &str, ) -> SubgraphBuilder @@ -114,6 +119,7 @@ where input_messages: Vec::new(), output_capabilities: Vec::new(), logging, + progress_logging, } } @@ -169,7 +175,7 @@ where let (tracker, scope_summary) = builder.build(); - let progcaster = Progcaster::new(worker, &self.path, self.logging.clone()); + let progcaster = Progcaster::new(worker, &self.path, 
self.logging.clone(), self.progress_logging.clone()); let mut incomplete = vec![true; self.children.len()]; incomplete[0] = false; diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 93e80c596..62196b970 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -570,7 +570,8 @@ impl Worker { let dataflow_index = self.allocate_dataflow_index(); let identifier = self.new_identifier(); - let subscope = SubgraphBuilder::new_from(dataflow_index, addr, logging.clone(), name); + let progress_logging = self.logging.borrow_mut().get("timely/progress"); + let subscope = SubgraphBuilder::new_from(dataflow_index, addr, logging.clone(), progress_logging.clone(), name); let subscope = RefCell::new(subscope); let result = { @@ -578,6 +579,7 @@ impl Worker { subgraph: &subscope, parent: self.clone(), logging: logging.clone(), + progress_logging: progress_logging.clone(), }; func(&mut resources, &mut builder) };