From 90c35412ffc95fb34b7b8b4317d5f73ee60cafc4 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 8 Jan 2025 14:57:43 +0100 Subject: [PATCH] Define loggers in terms of container builders The logging infrastructure had some old assumptions built in, such as the container type and the size of buffers. With this change, we defer to the container builder pattern to re-use the existing infrastructure. This also allows us to switch the container type to something else if we'd like to do so. Signed-off-by: Moritz Hoffmann --- communication/Cargo.toml | 1 + .../src/allocator/zero_copy/initialize.rs | 9 +- communication/src/allocator/zero_copy/tcp.rs | 42 +++--- communication/src/initialize.rs | 6 +- communication/src/lib.rs | 8 ++ logging/Cargo.toml | 3 + logging/src/lib.rs | 126 ++++++------------ timely/examples/logging-send.rs | 17 +-- timely/src/dataflow/channels/pact.rs | 10 +- .../src/dataflow/operators/core/enterleave.rs | 6 +- .../src/dataflow/operators/core/partition.rs | 4 +- .../src/dataflow/operators/generic/handles.rs | 12 +- .../dataflow/operators/generic/notificator.rs | 10 +- timely/src/dataflow/scopes/child.rs | 1 + timely/src/dataflow/stream.rs | 18 +-- timely/src/logging.rs | 26 ++-- timely/src/progress/broadcast.rs | 30 +++-- timely/src/progress/reachability.rs | 48 +++---- timely/src/progress/subgraph.rs | 22 +-- timely/src/worker.rs | 36 ++--- 20 files changed, 221 insertions(+), 214 deletions(-) diff --git a/communication/Cargo.toml b/communication/Cargo.toml index 64cc72c20..e026f9cff 100644 --- a/communication/Cargo.toml +++ b/communication/Cargo.toml @@ -23,4 +23,5 @@ byteorder = "1.5" serde = { version = "1.0", features = ["derive"] } timely_bytes = { path = "../bytes", version = "0.12" } timely_logging = { path = "../logging", version = "0.13" } +timely_container = { path = "../container", version = "0.13.0" } crossbeam-channel = "0.5" diff --git a/communication/src/allocator/zero_copy/initialize.rs b/communication/src/allocator/zero_copy/initialize.rs index 6ca27aba6..962cc24e2 100644 --- a/communication/src/allocator/zero_copy/initialize.rs +++ b/communication/src/allocator/zero_copy/initialize.rs @@ -1,7 +1,6 @@ //! Network initialization. use std::sync::Arc; -// use crate::allocator::Process; use crate::allocator::process::ProcessBuilder; use crate::networking::create_sockets; use super::tcp::{send_loop, recv_loop}; @@ -30,8 +29,8 @@ impl Drop for CommsGuard { } } -use crate::logging::{CommunicationSetup, CommunicationEvent}; -use timely_logging::Logger; +use crate::logging::CommunicationSetup; +use crate::CommLogger; /// Initializes network connections pub fn initialize_networking( @@ -39,7 +38,7 @@ pub fn initialize_networking( my_index: usize, threads: usize, noisy: bool, - log_sender: ArcOption>+Send+Sync>, + log_sender: ArcOption+Send+Sync>, ) -> ::std::io::Result<(Vec>, CommsGuard)> { @@ -58,7 +57,7 @@ pub fn initialize_networking_from_sockets( mut sockets: Vec>, my_index: usize, threads: usize, - log_sender: ArcOption>+Send+Sync>, + log_sender: ArcOption+Send+Sync>, ) -> ::std::io::Result<(Vec>, CommsGuard)> { diff --git a/communication/src/allocator/zero_copy/tcp.rs b/communication/src/allocator/zero_copy/tcp.rs index eb776481c..e6fcd2e7e 100644 --- a/communication/src/allocator/zero_copy/tcp.rs +++ b/communication/src/allocator/zero_copy/tcp.rs @@ -3,16 +3,14 @@ use std::io::{self, Write}; use crossbeam_channel::{Sender, Receiver}; +use crate::CommLogger; +use crate::logging::{CommunicationEvent, MessageEvent, StateEvent}; use crate::networking::MessageHeader; use super::bytes_slab::BytesSlab; use super::bytes_exchange::MergeQueue; use super::stream::Stream; -use timely_logging::Logger; - -use crate::logging::{CommunicationEvent, MessageEvent, StateEvent}; - fn tcp_panic(context: &'static str, cause: io::Error) -> ! { // NOTE: some downstream crates sniff out "timely communication error:" from // the panic message. Avoid removing or rewording this message if possible. @@ -35,12 +33,14 @@ pub fn recv_loop( worker_offset: usize, process: usize, remote: usize, - mut logger: Option>) + logger: Option) where S: Stream, { // Log the receive thread's start. - logger.as_mut().map(|l| l.log(StateEvent { send: false, process, remote, start: true })); + if let Some(logger) = logger.as_ref() { + logger.log(CommunicationEvent::from(StateEvent { send: false, process, remote, start: true })); + } let mut targets: Vec = targets.into_iter().map(|x| x.recv().expect("Failed to receive MergeQueue")).collect(); @@ -88,9 +88,9 @@ where let bytes = buffer.extract(peeled_bytes); // Record message receipt. - logger.as_mut().map(|logger| { - logger.log(MessageEvent { is_send: false, header, }); - }); + if let Some(logger) = logger.as_ref() { + logger.log(CommunicationEvent::from(MessageEvent { is_send: false, header })); + } if header.length > 0 { stageds[header.target - worker_offset].push(bytes); @@ -117,7 +117,9 @@ where } // Log the receive thread's end. - logger.as_mut().map(|l| l.log(StateEvent { send: false, process, remote, start: false, })); + if let Some(logger) = logger.as_ref() { + logger.log(CommunicationEvent::from(StateEvent { send: false, process, remote, start: false, })); + } } /// Repeatedly sends messages into a TcpStream. @@ -134,11 +136,13 @@ pub fn send_loop( sources: Vec>, process: usize, remote: usize, - mut logger: Option>) + logger: Option) { // Log the send thread's start. - logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: true, })); + if let Some(logger) = logger.as_ref() { + logger.log(CommunicationEvent::from(StateEvent { send: true, process, remote, start: true, })); + } let mut sources: Vec = sources.into_iter().map(|x| { let buzzer = crate::buzzer::Buzzer::default(); @@ -176,13 +180,13 @@ pub fn send_loop( for mut bytes in stash.drain(..) { // Record message sends. - logger.as_mut().map(|logger| { + if let Some(logger) = logger.as_ref() { let mut offset = 0; while let Some(header) = MessageHeader::try_read(&mut bytes[offset..]) { - logger.log(MessageEvent { is_send: true, header, }); + logger.log(CommunicationEvent::from(MessageEvent { is_send: true, header, })); offset += header.required_bytes(); } - }); + }; writer.write_all(&bytes[..]).unwrap_or_else(|e| tcp_panic("writing data", e)); } @@ -202,8 +206,12 @@ pub fn send_loop( header.write_to(&mut writer).unwrap_or_else(|e| tcp_panic("writing data", e)); writer.flush().unwrap_or_else(|e| tcp_panic("flushing writer", e)); writer.get_mut().shutdown(::std::net::Shutdown::Write).unwrap_or_else(|e| tcp_panic("shutting down writer", e)); - logger.as_mut().map(|logger| logger.log(MessageEvent { is_send: true, header })); + if let Some(logger) = logger.as_ref() { + logger.log(CommunicationEvent::from(MessageEvent { is_send: true, header })); + } // Log the send thread's end. - logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: false, })); + if let Some(logger) = logger.as_ref() { + logger.log(CommunicationEvent::from(StateEvent { send: true, process, remote, start: false, })); + } } diff --git a/communication/src/initialize.rs b/communication/src/initialize.rs index af78ba4fe..3b1892928 100644 --- a/communication/src/initialize.rs +++ b/communication/src/initialize.rs @@ -14,8 +14,8 @@ use crate::allocator::{AllocateBuilder, Process, Generic, GenericBuilder}; use crate::allocator::zero_copy::allocator_process::ProcessBuilder; use crate::allocator::zero_copy::initialize::initialize_networking; -use crate::logging::{CommunicationSetup, CommunicationEvent}; -use timely_logging::Logger; +use crate::logging::CommunicationSetup; +use crate::CommLogger; use std::fmt::{Debug, Formatter}; @@ -39,7 +39,7 @@ pub enum Config { /// Verbosely report connection process report: bool, /// Closure to create a new logger for a communication thread - log_fn: Arc Option> + Send + Sync>, + log_fn: Arc Option + Send + Sync>, } } diff --git a/communication/src/lib.rs b/communication/src/lib.rs index 3fe7e0542..b255968d9 100644 --- a/communication/src/lib.rs +++ b/communication/src/lib.rs @@ -97,6 +97,8 @@ #![forbid(missing_docs)] +use std::time::Duration; + pub mod allocator; pub mod networking; pub mod initialize; @@ -191,3 +193,9 @@ fn promise_futures(sends: usize, recvs: usize) -> (Vec>>, Vec>>; diff --git a/logging/Cargo.toml b/logging/Cargo.toml index a981412e9..94eda8b90 100644 --- a/logging/Cargo.toml +++ b/logging/Cargo.toml @@ -10,3 +10,6 @@ homepage = "https://github.com/TimelyDataflow/timely-dataflow" repository = "https://github.com/TimelyDataflow/timely-dataflow.git" keywords = ["timely", "dataflow", "logging"] license = "MIT" + +[dependencies] +timely_container = { version = "0.13.0", path = "../container" } diff --git a/logging/src/lib.rs b/logging/src/lib.rs index c21ec513d..704793c8f 100644 --- a/logging/src/lib.rs +++ b/logging/src/lib.rs @@ -6,6 +6,8 @@ use std::collections::HashMap; use std::time::{Instant, Duration}; use std::fmt::{self, Debug}; +use timely_container::{ContainerBuilder, PushInto}; + pub struct Registry { /// A map from names to typed loggers. map: HashMap, Box)>, @@ -25,21 +27,17 @@ impl Registry { /// seen (likely greater or equal to the timestamp of the last event). The end of a /// logging stream is indicated only by dropping the associated action, which can be /// accomplished with `remove` (or a call to insert, though this is not recommended). - pub fn insert)+'static>( + pub fn insert( &mut self, name: &str, action: F) -> Option> { - let logger = Logger::::new(self.time, Duration::default(), action); + let logger = Logger::::new(self.time, Duration::default(), action); self.insert_logger(name, logger) } /// Binds a log name to a logger. - pub fn insert_logger( - &mut self, - name: &str, - logger: Logger) -> Option> - { + pub fn insert_logger(&mut self, name: &str, logger: Logger) -> Option> { self.map.insert(name.to_owned(), (Box::new(logger.clone()), Box::new(logger))).map(|x| x.0) } @@ -54,10 +52,11 @@ impl Registry { } /// Retrieves a shared logger, if one has been inserted. - pub fn get(&self, name: &str) -> Option> { + #[inline] + pub fn get(&self, name: &str) -> Option> { self.map .get(name) - .and_then(|entry| entry.0.downcast_ref::>()) + .and_then(|entry| entry.0.downcast_ref::>()) .map(|x| (*x).clone()) } @@ -76,8 +75,8 @@ impl Registry { } impl Flush for Registry { - fn flush(&mut self) { - for value in self.map.values_mut() { + fn flush(&self) { + for value in self.map.values() { value.1.flush(); } } @@ -85,11 +84,11 @@ impl Flush for Registry { /// A buffering logger. #[derive(Debug)] -pub struct Logger { - inner: Rc)>>>, +pub struct Logger { + inner: Rc>>, } -impl Clone for Logger { +impl Clone for Logger { fn clone(&self) -> Self { Self { inner: self.inner.clone() @@ -97,28 +96,28 @@ impl Clone for Logger { } } -struct LoggerInner)> { +struct LoggerInner { /// common instant used for all loggers. time: Instant, /// offset to allow re-calibration. offset: Duration, - /// shared buffer of accumulated log events - buffer: Vec<(Duration, T)>, + /// container builder to produce buffers of accumulated log events + builder: CB, /// action to take on full log buffers. action: A, } -impl Logger { +impl Logger { /// Allocates a new shareable logger bound to a write destination. pub fn new(time: Instant, offset: Duration, action: F) -> Self where - F: FnMut(&Duration, &mut Vec<(Duration, T)>)+'static + F: FnMut(&Duration, &mut CB::Container)+'static { let inner = LoggerInner { time, offset, action, - buffer: Vec::with_capacity(LoggerInner::::buffer_capacity()), + builder: CB::default(), }; let inner = Rc::new(RefCell::new(inner)); Logger { inner } @@ -133,7 +132,7 @@ impl Logger { /// This implementation borrows a shared (but thread-local) buffer of log events, to ensure /// that the `action` only sees one stream of events with increasing timestamps. This may /// have a cost that we don't entirely understand. - pub fn log>(&self, event: S) { + pub fn log(&self, event: S) where CB: PushInto<(Duration, S)> { self.log_many(Some(event)); } @@ -151,78 +150,57 @@ impl Logger { /// that the `action` only sees one stream of events with increasing timestamps. This may /// have a cost that we don't entirely understand. pub fn log_many(&self, events: I) - where I: IntoIterator, I::Item: Into + where I: IntoIterator, CB: PushInto<(Duration, I::Item)> { self.inner.borrow_mut().log_many(events) } /// Flushes logged messages and communicates the new minimal timestamp. - pub fn flush(&mut self) { + pub fn flush(&self) { ::flush(self); } } -impl)> LoggerInner { - - /// The upper limit for buffers to allocate, size in bytes. [Self::buffer_capacity] converts - /// this to size in elements. - const BUFFER_SIZE_BYTES: usize = 1 << 13; - - /// The maximum buffer capacity in elements. Returns a number between [Self::BUFFER_SIZE_BYTES] - /// and 1, inclusively. - // TODO: This fn is not const because it cannot depend on non-Sized generic parameters - fn buffer_capacity() -> usize { - let size = ::std::mem::size_of::<(Duration, T)>(); - if size == 0 { - Self::BUFFER_SIZE_BYTES - } else if size <= Self::BUFFER_SIZE_BYTES { - Self::BUFFER_SIZE_BYTES / size - } else { - 1 - } - } - +impl LoggerInner { pub fn log_many(&mut self, events: I) - where I: IntoIterator, I::Item: Into + where I: IntoIterator, CB: PushInto<(Duration, I::Item)>, { let elapsed = self.time.elapsed() + self.offset; for event in events { - self.buffer.push((elapsed, event.into())); - if self.buffer.len() == self.buffer.capacity() { - // Would call `self.flush()`, but for `RefCell` panic. - (self.action)(&elapsed, &mut self.buffer); - // The buffer clear could plausibly be removed, changing the semantics but allowing users - // to do in-place updates without forcing them to take ownership. - self.buffer.clear(); - let buffer_capacity = self.buffer.capacity(); - if buffer_capacity < Self::buffer_capacity() { - self.buffer.reserve((buffer_capacity+1).next_power_of_two()); - } + self.builder.push_into((elapsed, event.into())); + while let Some(container) = self.builder.extract() { + (self.action)(&elapsed, container); } } } + + pub fn flush(&mut self) { + // TODO: This previously called action with an empty buffer if there was no data. + let elapsed = self.time.elapsed() + self.offset; + while let Some(buffer) = self.builder.finish() { + (self.action)(&elapsed, buffer); + } + } } /// Flush on the *last* drop of a logger. -impl)> Drop for LoggerInner { +impl Drop for LoggerInner { fn drop(&mut self) { // Avoid sending out empty buffers just because of drops. - if !self.buffer.is_empty() { - self.flush(); - } + self.flush(); } } -impl)> Debug for LoggerInner +impl Debug for LoggerInner where - T: Debug, + CB: ContainerBuilder + Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("LoggerInner") .field("time", &self.time) .field("offset", &self.offset) .field("action", &"FnMut") - .field("buffer", &self.buffer) + .field("builder", &self.builder) .finish() } } @@ -230,29 +208,11 @@ where /// Types that can be flushed. trait Flush { /// Flushes buffered data. - fn flush(&mut self); + fn flush(&self); } -impl Flush for Logger { - fn flush(&mut self) { +impl Flush for Logger { + fn flush(&self) { self.inner.borrow_mut().flush() } } - -impl)> Flush for LoggerInner { - fn flush(&mut self) { - let elapsed = self.time.elapsed() + self.offset; - if !self.buffer.is_empty() { - (self.action)(&elapsed, &mut self.buffer); - self.buffer.clear(); - // NB: This does not re-allocate any specific size if the buffer has been - // taken. The intent is that the geometric growth in `log_many` should be - // enough to ensure that we do not send too many small buffers, nor do we - // allocate too large buffers when they are not needed. - } - else { - // Avoid swapping resources for empty buffers. - (self.action)(&elapsed, &mut Vec::new()); - } - } -} diff --git a/timely/examples/logging-send.rs b/timely/examples/logging-send.rs index 537e93071..83be22f30 100644 --- a/timely/examples/logging-send.rs +++ b/timely/examples/logging-send.rs @@ -1,9 +1,9 @@ +use std::time::Duration; use timely::dataflow::{InputHandle, ProbeHandle}; use timely::dataflow::operators::{Input, Exchange, Probe}; -// use timely::dataflow::operators::capture::EventWriter; -// use timely::dataflow::ScopeParent; -use timely::logging::{TimelyEvent, TimelyProgressEvent}; +use timely::logging::{TimelyEventBuilder, TimelyProgressEventBuilder}; +use timely_container::CapacityContainerBuilder; fn main() { // initializes and runs a timely dataflow. @@ -15,14 +15,14 @@ fn main() { let mut probe = ProbeHandle::new(); // Register timely worker logging. - worker.log_register().insert::("timely", |_time, data| + worker.log_register().insert::("timely", |_time, data| data.iter().for_each(|x| println!("LOG1: {:?}", x)) ); // Register timely progress logging. // Less generally useful: intended for debugging advanced custom operators or timely // internals. - worker.log_register().insert::("timely/progress", |_time, data| + worker.log_register().insert::("timely/progress", |_time, data| data.iter().for_each(|x| { println!("PROGRESS: {:?}", x); let (_, ev) = x; @@ -48,7 +48,7 @@ fn main() { }); // Register timely worker logging. - worker.log_register().insert::("timely", |_time, data| + worker.log_register().insert::("timely", |_time, data| data.iter().for_each(|x| println!("LOG2: {:?}", x)) ); @@ -61,13 +61,14 @@ fn main() { }); // Register user-level logging. - worker.log_register().insert::<(),_>("input", |_time, data| + type MyBuilder = CapacityContainerBuilder>; + worker.log_register().insert::("input", |_time, data| for element in data.iter() { println!("Round tick at: {:?}", element.0); } ); - let input_logger = worker.log_register().get::<()>("input").expect("Input logger absent"); + let input_logger = worker.log_register().get::("input").expect("Input logger absent"); let timer = std::time::Instant::now(); diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 55404015c..dee83aae7 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -15,7 +15,7 @@ use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; use crate::communication::{Push, Pull}; use crate::dataflow::channels::pushers::Exchange as ExchangePusher; use crate::dataflow::channels::Message; -use crate::logging::{TimelyLogger as Logger, MessagesEvent}; +use crate::logging::{TimelyLogger as Logger, MessagesEvent, TimelyEvent}; use crate::progress::Timestamp; use crate::worker::AsWorker; use crate::Data; @@ -141,14 +141,14 @@ impl>> Push> for LogPusher< bundle.from = self.source; if let Some(logger) = self.logging.as_ref() { - logger.log(MessagesEvent { + logger.log(TimelyEvent::from(MessagesEvent { is_send: true, channel: self.channel, source: self.source, target: self.target, seq_no: self.counter - 1, length: bundle.data.len(), - }) + })); } } @@ -188,14 +188,14 @@ impl>> Pull> for LogPuller< let target = self.index; if let Some(logger) = self.logging.as_ref() { - logger.log(MessagesEvent { + logger.log(TimelyEvent::from(MessagesEvent { is_send: false, channel, source: bundle.from, target, seq_no: bundle.seq, length: bundle.data.len(), - }); + })); } } diff --git a/timely/src/dataflow/operators/core/enterleave.rs b/timely/src/dataflow/operators/core/enterleave.rs index 16464a7f6..46a4193c2 100644 --- a/timely/src/dataflow/operators/core/enterleave.rs +++ b/timely/src/dataflow/operators/core/enterleave.rs @@ -21,7 +21,7 @@ use std::marker::PhantomData; -use crate::logging::{TimelyLogger, MessagesEvent}; +use crate::logging::{TimelyLogger, MessagesEvent, TimelyEvent}; use crate::progress::Timestamp; use crate::progress::timestamp::Refines; use crate::progress::{Source, Target}; @@ -225,8 +225,8 @@ where ..send_event }; - self.logger.log(send_event); - self.logger.log(recv_event); + self.logger.log(TimelyEvent::from(send_event)); + self.logger.log(TimelyEvent::from(recv_event)); self.counter += 1; } diff --git a/timely/src/dataflow/operators/core/partition.rs b/timely/src/dataflow/operators/core/partition.rs index c55c55d01..b8c7b7415 100644 --- a/timely/src/dataflow/operators/core/partition.rs +++ b/timely/src/dataflow/operators/core/partition.rs @@ -15,14 +15,14 @@ pub trait Partition { /// ``` /// use timely::dataflow::operators::ToStream; /// use timely::dataflow::operators::core::{Partition, Inspect}; + /// use timely_container::CapacityContainerBuilder; /// /// timely::example(|scope| { /// let streams = (0..10).to_stream(scope) - /// .partition(3, |x| (x % 3, x)); + /// .partition::>, _, _>(3, |x| (x % 3, x)); /// /// for (idx, stream) in streams.into_iter().enumerate() { /// stream - /// .container::>() /// .inspect(move |x| println!("seen {idx}: {x:?}")); /// } /// }); diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index 47811a659..435457bb7 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -17,7 +17,7 @@ use crate::dataflow::channels::Message; use crate::communication::{Push, Pull}; use crate::{Container, Data}; use crate::container::{ContainerBuilder, CapacityContainerBuilder}; -use crate::logging::TimelyLogger as Logger; +use crate::logging::{TimelyEvent, TimelyLogger as Logger}; use crate::dataflow::operators::InputCapability; use crate::dataflow::operators::capability::CapabilityTrait; @@ -82,11 +82,15 @@ impl>> InputHandleCore, &mut C)>(&mut self, mut logic: F) { - let mut logging = self.logging.take(); + let logging = self.logging.take(); while let Some((cap, data)) = self.next() { - logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: true })); + if let Some(l) = logging.as_ref() { + l.log(TimelyEvent::from(crate::logging::GuardedMessageEvent { is_start: true })); + }; logic(cap, data); - logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: false })); + if let Some(l) = logging.as_ref() { + l.log(TimelyEvent::from(crate::logging::GuardedMessageEvent { is_start: false })); + }; } self.logging = logging; } diff --git a/timely/src/dataflow/operators/generic/notificator.rs b/timely/src/dataflow/operators/generic/notificator.rs index 905fdef4f..387aef408 100644 --- a/timely/src/dataflow/operators/generic/notificator.rs +++ b/timely/src/dataflow/operators/generic/notificator.rs @@ -1,7 +1,7 @@ use crate::progress::frontier::{AntichainRef, MutableAntichain}; use crate::progress::Timestamp; use crate::dataflow::operators::Capability; -use crate::logging::TimelyLogger as Logger; +use crate::logging::{TimelyEvent, TimelyLogger as Logger}; /// Tracks requests for notification and delivers available notifications. /// @@ -82,9 +82,13 @@ impl<'a, T: Timestamp> Notificator<'a, T> { #[inline] pub fn for_each, u64, &mut Notificator)>(&mut self, mut logic: F) { while let Some((cap, count)) = self.next() { - self.logging.as_ref().map(|l| l.log(crate::logging::GuardedProgressEvent { is_start: true })); + if let Some(l) = self.logging.as_ref() { + l.log(TimelyEvent::from(crate::logging::GuardedMessageEvent { is_start: true })); + }; logic(cap, count, self); - self.logging.as_ref().map(|l| l.log(crate::logging::GuardedProgressEvent { is_start: false })); + if let Some(l) = self.logging.as_ref() { + l.log(TimelyEvent::from(crate::logging::GuardedMessageEvent { is_start: false })); + }; } } } diff --git a/timely/src/dataflow/scopes/child.rs b/timely/src/dataflow/scopes/child.rs index f7a8a46e6..cfce28c3e 100644 --- a/timely/src/dataflow/scopes/child.rs +++ b/timely/src/dataflow/scopes/child.rs @@ -70,6 +70,7 @@ where fn peek_identifier(&self) -> usize { self.parent.peek_identifier() } + #[inline(always)] fn log_register(&self) -> ::std::cell::RefMut { self.parent.log_register() } diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index d071cac45..ac96e70a6 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -12,8 +12,8 @@ use crate::dataflow::channels::pushers::tee::TeeHelper; use crate::dataflow::channels::Message; use std::fmt::{self, Debug}; use crate::Container; +use crate::logging::TimelyEvent; -// use dataflow::scopes::root::loggers::CHANNELS_Q; /// Abstraction of a stream of `C: Container` records timestamped with `S::Timestamp`. /// @@ -54,13 +54,15 @@ impl StreamCore { /// records should actually be sent. The identifier is unique to the edge and is used only for logging purposes. pub fn connect_to>+'static>(&self, target: Target, pusher: P, identifier: usize) { - let mut logging = self.scope().logging(); - logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent { - id: identifier, - scope_addr: self.scope.addr().to_vec(), - source: (self.name.node, self.name.port), - target: (target.node, target.port), - })); + let logging = self.scope().logging(); + if let Some(l) = logging.as_ref() { + l.log(TimelyEvent::from(crate::logging::ChannelsEvent { + id: identifier, + scope_addr: self.scope.addr().to_vec(), + source: (self.name.node, self.name.port), + target: (target.node, target.port), + })) + }; self.scope.add_edge(self.name, target); self.ports.add_pusher(pusher); diff --git a/timely/src/logging.rs b/timely/src/logging.rs index 1b17c1c33..728f738c6 100644 --- a/timely/src/logging.rs +++ b/timely/src/logging.rs @@ -2,16 +2,24 @@ /// Type alias for logging timely events. pub type WorkerIdentifier = usize; -/// Logger type for worker-local logging. -pub type Logger = crate::logging_core::Logger; + +/// Container builder for timely dataflow system events. +pub type TimelyEventBuilder = CapacityContainerBuilder>; + /// Logger for timely dataflow system events. -pub type TimelyLogger = Logger; +pub type TimelyLogger = Logger; + +/// Container builder for timely dataflow progress events. +pub type TimelyProgressEventBuilder = CapacityContainerBuilder>; + /// Logger for timely dataflow progress events (the "timely/progress" log stream). -pub type TimelyProgressLogger = Logger; +pub type TimelyProgressLogger = Logger; use std::time::Duration; use columnar::Columnar; use serde::{Deserialize, Serialize}; +use timely_container::CapacityContainerBuilder; +use crate::logging_core::Logger; use crate::dataflow::operators::capture::{Event, EventPusher}; /// Logs events as a timely stream, with progress statements. @@ -83,7 +91,7 @@ pub trait ProgressEventTimestamp: std::fmt::Debug + std::any::Any { /// # Example /// ```rust /// let ts = vec![(0usize, 0usize, (23u64, 10u64), -4i64), (0usize, 0usize, (23u64, 11u64), 1i64)]; - /// let ts: &dyn timely::logging::ProgressEventTimestampVec = &ts; + /// let ts: &dyn timely::logging::ProgressEventTimestampVec = &ts.into_boxed_slice(); /// for (n, p, t, d) in ts.iter() { /// print!("{:?}, ", (n, p, t.as_any().downcast_ref::<(u64, u64)>(), d)); /// } @@ -119,7 +127,7 @@ pub trait ProgressEventTimestampVec: std::fmt::Debug + std::any::Any { fn iter<'a>(&'a self) -> Box+'a>; } -impl ProgressEventTimestampVec for Vec<(usize, usize, T, i64)> { +impl ProgressEventTimestampVec for Box<[(usize, usize, T, i64)]> { fn iter<'a>(&'a self) -> Box+'a> { Box::new(<[(usize, usize, T, i64)]>::iter(&self[..]).map(|(n, p, t, d)| { let t: &dyn ProgressEventTimestamp = t; @@ -128,7 +136,7 @@ impl ProgressEventTimestampVec for Vec<(usize, usize, } } -#[derive(Debug)] +#[derive(Debug, Clone)] /// Send or receive of progress information. pub struct TimelyProgressEvent { /// `true` if the event is a send, and `false` if it is a receive. @@ -142,9 +150,9 @@ pub struct TimelyProgressEvent { /// Sequence of nested scope identifiers indicating the path from the root to this instance. pub addr: Vec, /// List of message updates, containing Target descriptor, timestamp as string, and delta. - pub messages: Box, + pub messages: std::rc::Rc, /// List of capability updates, containing Source descriptor, timestamp as string, and delta. - pub internal: Box, + pub internal: std::rc::Rc, } #[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)] diff --git a/timely/src/progress/broadcast.rs b/timely/src/progress/broadcast.rs index 1092929c0..0018c82ae 100644 --- a/timely/src/progress/broadcast.rs +++ b/timely/src/progress/broadcast.rs @@ -4,7 +4,7 @@ use std::rc::Rc; use crate::progress::{ChangeBatch, Timestamp}; use crate::progress::{Location, Port}; use crate::communication::{Push, Pull}; -use crate::logging::TimelyLogger as Logger; +use crate::logging::{ProgressEventTimestampVec, TimelyEvent, TimelyLogger as Logger}; use crate::logging::TimelyProgressLogger as ProgressLogger; use crate::Bincode; @@ -33,14 +33,16 @@ pub struct Progcaster { impl Progcaster { /// Creates a new `Progcaster` using a channel from the supplied worker. - pub fn new(worker: &mut A, addr: Rc<[usize]>, mut logging: Option, progress_logging: Option) -> Progcaster { + pub fn new(worker: &mut A, addr: Rc<[usize]>, logging: Option, progress_logging: Option) -> Progcaster { let channel_identifier = worker.new_identifier(); let (pushers, puller) = worker.allocate(channel_identifier, addr.clone()); - logging.as_mut().map(|l| l.log(crate::logging::CommChannelsEvent { - identifier: channel_identifier, - kind: crate::logging::CommChannelKind::Progress, - })); + if let Some(logger) = logging.as_ref() { + logger.log(TimelyEvent::from(crate::logging::CommChannelsEvent { + identifier: channel_identifier, + kind: crate::logging::CommChannelKind::Progress, + })); + } let worker_index = worker.index(); Progcaster { to_push: None, @@ -65,8 +67,8 @@ impl Progcaster { // Pre-allocate enough space; we transfer ownership, so there is not // an opportunity to re-use allocations (w/o changing the logging // interface to accept references). - let mut messages = Box::new(Vec::with_capacity(changes.len())); - let mut internal = Box::new(Vec::with_capacity(changes.len())); + let mut messages = Vec::with_capacity(changes.len()); + let mut internal = Vec::with_capacity(changes.len()); for ((location, time), diff) in changes.iter() { match location.port { @@ -85,8 +87,8 @@ impl Progcaster { channel: self.channel_identifier, seq_no: self.counter, addr: self.addr.to_vec(), - messages, - internal, + messages: Rc::new(messages.into_boxed_slice()) as Rc, + internal: Rc::new(internal.into_boxed_slice()) as Rc, }); }); @@ -134,8 +136,8 @@ impl Progcaster { // options for improving it if performance limits users who want other logging. self.progress_logging.as_ref().map(|l| { - let mut messages = Box::new(Vec::with_capacity(changes.len())); - let mut internal = Box::new(Vec::with_capacity(changes.len())); + let mut messages = Vec::with_capacity(changes.len()); + let mut internal = Vec::with_capacity(changes.len()); for ((location, time), diff) in recv_changes.iter() { @@ -155,8 +157,8 @@ impl Progcaster { seq_no: counter, channel, addr: addr.to_vec(), - messages, - internal, + messages: Rc::new(messages.into_boxed_slice()) as Rc, + internal: Rc::new(internal.into_boxed_slice()) as Rc, }); }); diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 1f0c45adf..c163c77f1 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -74,7 +74,6 @@ use std::collections::{BinaryHeap, HashMap, VecDeque}; use std::cmp::Reverse; - use crate::progress::Timestamp; use crate::progress::{Source, Target}; use crate::progress::ChangeBatch; @@ -577,7 +576,7 @@ impl Tracker { .collect::>(); if !target_changes.is_empty() { - logger.log_target_updates(Box::new(target_changes)); + logger.log_target_updates(target_changes.into_boxed_slice()); } let source_changes = @@ -587,7 +586,7 @@ impl Tracker { .collect::>(); if !source_changes.is_empty() { - logger.log_source_updates(Box::new(source_changes)); + logger.log_source_updates(source_changes.into_boxed_slice()); } } @@ -832,41 +831,40 @@ fn summarize_outputs( /// Logging types for reachability tracking events. pub mod logging { use std::rc::Rc; - use crate::logging::{Logger, ProgressEventTimestampVec}; + use std::time::Duration; + use timely_container::CapacityContainerBuilder; + use crate::logging_core::Logger; + use crate::logging::ProgressEventTimestampVec; + + /// A container builder for tracker events. + pub type TrackerEventBuilder = CapacityContainerBuilder>; /// A logger with additional identifying information about the tracker. pub struct TrackerLogger { path: Rc<[usize]>, - logger: Logger, + logger: Logger, } impl TrackerLogger { /// Create a new tracker logger from its fields. - pub fn new(path: Rc<[usize]>, logger: Logger) -> Self { + pub fn new(path: Rc<[usize]>, logger: Logger) -> Self { Self { path, logger } } /// Log source update events with additional identifying information. - pub fn log_source_updates(&mut self, updates: Box) { - self.logger.log({ - SourceUpdate { - tracker_id: self.path.to_vec(), - updates, - } - }) + pub fn log_source_updates(&mut self, updates: T) { + let updates = Rc::new(updates) as Rc; + self.logger.log(TrackerEvent::from(SourceUpdate { tracker_id: self.path.to_vec(), updates})) } /// Log target update events with additional identifying information. - pub fn log_target_updates(&mut self, updates: Box) { - self.logger.log({ - TargetUpdate { - tracker_id: self.path.to_vec(), - updates, - } - }) + pub fn log_target_updates(&mut self, updates: T) { + let updates = Rc::new(updates) as Rc; + self.logger.log(TrackerEvent::from(TargetUpdate { tracker_id: self.path.to_vec(), updates }) ) } } /// Events that the tracker may record. + #[derive(Debug, Clone)] pub enum TrackerEvent { /// Updates made at a source of data. SourceUpdate(SourceUpdate), @@ -875,19 +873,21 @@ pub mod logging { } /// An update made at a source of data. + #[derive(Debug, Clone)] pub struct SourceUpdate { /// An identifier for the tracker. pub tracker_id: Vec, /// Updates themselves, as `(node, port, time, diff)`. - pub updates: Box, + pub updates: Rc, } /// An update made at a target of data. + #[derive(Debug, Clone)] pub struct TargetUpdate { /// An identifier for the tracker. pub tracker_id: Vec, /// Updates themselves, as `(node, port, time, diff)`. - pub updates: Box, + pub updates: Rc, } impl From for TrackerEvent { @@ -924,7 +924,7 @@ impl Drop for Tracker { }) .collect::>(); if !target_changes.is_empty() { - logger.log_target_updates(Box::new(target_changes)); + logger.log_target_updates(target_changes.into_boxed_slice()); } let source_changes = per_operator.sources @@ -937,7 +937,7 @@ impl Drop for Tracker { }) .collect::>(); if !source_changes.is_empty() { - logger.log_source_updates(Box::new(source_changes)); + logger.log_source_updates(source_changes.into_boxed_slice()); } } } diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 1fdacac1e..e0eae68ab 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -10,7 +10,7 @@ use std::cell::RefCell; use std::collections::BinaryHeap; use std::cmp::Reverse; -use crate::logging::TimelyLogger as Logger; +use crate::logging::{TimelyEvent, TimelyLogger as Logger}; use crate::logging::TimelyProgressLogger as ProgressLogger; use crate::scheduling::Schedule; @@ -135,11 +135,11 @@ where child_path.extend_from_slice(&self.path[..]); child_path.push(index); - l.log(crate::logging::OperatesEvent { + l.log(TimelyEvent::from(crate::logging::OperatesEvent { id: identifier, addr: child_path, name: child.name().to_owned(), - }); + })); } self.children.push(PerOperatorState::new(child, index, identifier, self.logging.clone())) } @@ -179,7 +179,7 @@ where let path = self.path.clone(); let reachability_logging = worker.log_register() - .get::("timely/reachability") + .get::("timely/reachability") .map(|logger| reachability::logging::TrackerLogger::new(path, logger)); let (tracker, scope_summary) = builder.build(reachability_logging); @@ -685,23 +685,23 @@ impl PerOperatorState { if let Some(ref mut operator) = self.operator { // Perhaps log information about the start of the schedule call. - if let Some(l) = self.logging.as_mut() { + if let Some(l) = self.logging.as_ref() { // FIXME: There is no contract that the operator must consume frontier changes. // This report could be spurious. // TODO: Perhaps fold this in to `ScheduleEvent::start()` as a "reason"? let frontiers = &mut self.shared_progress.borrow_mut().frontiers[..]; if frontiers.iter_mut().any(|buffer| !buffer.is_empty()) { - l.log(crate::logging::PushProgressEvent { op_id: self.id }) + l.log(TimelyEvent::from(crate::logging::PushProgressEvent { op_id: self.id })); } - l.log(crate::logging::ScheduleEvent::start(self.id)); + l.log(TimelyEvent::from(crate::logging::ScheduleEvent::start(self.id))); } let incomplete = operator.schedule(); // Perhaps log information about the stop of the schedule call. - if let Some(l) = self.logging.as_mut() { - l.log(crate::logging::ScheduleEvent::stop(self.id)); + if let Some(l) = self.logging.as_ref() { + l.log(TimelyEvent::from(crate::logging::ScheduleEvent::stop(self.id))); } incomplete @@ -723,8 +723,8 @@ impl PerOperatorState { fn shut_down(&mut self) { if self.operator.is_some() { - if let Some(l) = self.logging.as_mut() { - l.log(crate::logging::ShutdownEvent{ id: self.id }); + if let Some(l) = self.logging.as_ref() { + l.log(TimelyEvent::from(crate::logging::ShutdownEvent{ id: self.id })); } self.operator = None; } diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 51fde209a..927aa2299 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -16,7 +16,7 @@ use crate::progress::timestamp::{Refines}; use crate::progress::SubgraphBuilder; use crate::progress::operate::Operate; use crate::dataflow::scopes::Child; -use crate::logging::TimelyLogger; +use crate::logging::{TimelyEvent, TimelyLogger}; /// Different ways in which timely's progress tracking can work. /// @@ -200,6 +200,7 @@ pub trait AsWorker : Scheduler { /// Provides access to named logging streams. fn log_register(&self) -> ::std::cell::RefMut; /// Provides access to the timely logging stream. + #[inline(always)] fn logging(&self) -> Option { self.log_register().get("timely") } } @@ -245,6 +246,7 @@ impl AsWorker for Worker { fn new_identifier(&mut self) -> usize { self.new_identifier() } fn peek_identifier(&self) -> usize { self.peek_identifier() } + #[inline(always)] fn log_register(&self) -> RefMut { self.log_register() } @@ -365,10 +367,11 @@ impl Worker { }; if delay != Some(Duration::new(0,0)) { + let logger = self.logging(); // Log parking and flush log. - if let Some(l) = self.logging().as_mut() { - l.log(crate::logging::ParkEvent::park(delay)); + if let Some(l) = logger.as_ref() { + l.log(TimelyEvent::from(crate::logging::ParkEvent::park(delay))); l.flush(); } @@ -377,7 +380,9 @@ impl Worker { .await_events(delay); // Log return from unpark. - self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark())); + if let Some(logger) = logger { + logger.log(TimelyEvent::from(crate::logging::ParkEvent::unpark())); + } } else { // Schedule active dataflows. @@ -536,11 +541,12 @@ impl Worker { /// timely::execute_from_args(::std::env::args(), |worker| { /// /// worker.log_register() - /// .insert::("timely", |time, data| + /// .insert::("timely", |time, data| /// println!("{:?}\t{:?}", time, data) /// ); /// }); /// ``` + #[inline(always)] pub fn log_register(&self) -> ::std::cell::RefMut { self.logging.borrow_mut() } @@ -618,7 +624,7 @@ impl Worker { /// ); /// }); /// ``` - pub fn dataflow_core(&mut self, name: &str, mut logging: Option, mut resources: V, func: F) -> R + pub fn dataflow_core(&mut self, name: &str, logging: Option, mut resources: V, func: F) -> R where T: Refines<()>, F: FnOnce(&mut V, &mut Child)->R, @@ -644,12 +650,12 @@ impl Worker { let mut operator = subscope.into_inner().build(self); - if let Some(l) = logging.as_mut() { - l.log(crate::logging::OperatesEvent { + if let Some(l) = logging.as_ref() { + l.log(TimelyEvent::from(crate::logging::OperatesEvent { id: identifier, addr: operator.path().to_vec(), name: operator.name().to_string(), - }); + })); l.flush(); } @@ -748,8 +754,8 @@ impl Wrapper { fn step(&mut self) -> bool { // Perhaps log information about the start of the schedule call. - if let Some(l) = self.logging.as_mut() { - l.log(crate::logging::ScheduleEvent::start(self.identifier)); + if let Some(l) = self.logging.as_ref() { + l.log(TimelyEvent::from(crate::logging::ScheduleEvent::start(self.identifier))); } let incomplete = self.operate.as_mut().map(|op| op.schedule()).unwrap_or(false); @@ -759,8 +765,8 @@ impl Wrapper { } // Perhaps log information about the stop of the schedule call. - if let Some(l) = self.logging.as_mut() { - l.log(crate::logging::ScheduleEvent::stop(self.identifier)); + if let Some(l) = self.logging.as_ref() { + l.log(TimelyEvent::from(crate::logging::ScheduleEvent::stop(self.identifier))); } incomplete @@ -769,8 +775,8 @@ impl Wrapper { impl Drop for Wrapper { fn drop(&mut self) { - if let Some(l) = self.logging.as_mut() { - l.log(crate::logging::ShutdownEvent { id: self.identifier }); + if let Some(l) = self.logging.as_ref() { + l.log(TimelyEvent::from(crate::logging::ShutdownEvent { id: self.identifier })); } // ensure drop order self.operate = None;