Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
  • Loading branch information
antiguru committed Jun 14, 2024
1 parent f369204 commit 141ebee
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 74 deletions.
4 changes: 2 additions & 2 deletions communication/src/allocator/zero_copy/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub fn initialize_networking(
my_index: usize,
threads: usize,
noisy: bool,
log_sender: Arc<Box<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent, CommunicationSetup>>+Send+Sync>>,
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent, CommunicationSetup>>+Send+Sync>,
)
-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
{
Expand All @@ -58,7 +58,7 @@ pub fn initialize_networking_from_sockets<S: Stream + 'static>(
mut sockets: Vec<Option<S>>,
my_index: usize,
threads: usize,
log_sender: Arc<Box<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent, CommunicationSetup>>+Send+Sync>>,
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent, CommunicationSetup>>+Send+Sync>,
)
-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
{
Expand Down
2 changes: 1 addition & 1 deletion communication/src/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub enum Config {
/// Verbosely report connection process
report: bool,
/// Closure to create a new logger for a communication thread
log_fn: Arc<Box<dyn Fn(CommunicationSetup) -> Option<Logger<CommunicationEvent, CommunicationSetup>> + Send + Sync>>,
log_fn: Arc<dyn Fn(CommunicationSetup) -> Option<Logger<CommunicationEvent, CommunicationSetup>> + Send + Sync>,
}
}

Expand Down
75 changes: 4 additions & 71 deletions timely/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,80 +219,13 @@ where
/// // the extracted data should have data (0..10) thrice at timestamp 0.
/// assert_eq!(recv.extract()[0].1, (0..30).map(|x| x / 3).collect::<Vec<_>>());
/// ```
///
/// ### Communication logging
///
/// For multi-process (cluster) configurations, this functions installs a custom log
/// function, replacing any previous function. The function connects to the host in
/// the environment symbol `TIMELY_COMM_LOG_ADDR`.
///
/// ### Timely logging
///
/// If the environment symbol `TIMELY_WORKER_LOG_ADDR` is set, each worker will try to
/// connect to this address to send its worker logs.
pub fn execute<T, F>(
mut config: Config,
func: F
) -> Result<WorkerGuards<T>,String>
pub fn execute<T, F>(config: Config, func: F) -> Result<WorkerGuards<T>,String>
where
T:Send+'static,
F: Fn(&mut Worker<Allocator>)->T+Send+Sync+'static {

if let CommunicationConfig::Cluster { ref mut log_fn, .. } = config.communication {

*log_fn = std::sync::Arc::new(Box::new(|events_setup| {

let mut result = None;
if let Ok(addr) = ::std::env::var("TIMELY_COMM_LOG_ADDR") {

use ::std::net::TcpStream;
use crate::logging::BatchLogger;
use crate::dataflow::operators::capture::EventWriter;

eprintln!("enabled COMM logging to {}", addr);

if let Ok(stream) = TcpStream::connect(&addr) {
let writer = EventWriter::new(stream);
let mut logger = BatchLogger::new(writer);
result = Some(crate::logging_core::Logger::new(
::std::time::Instant::now(),
::std::time::Duration::default(),
events_setup,
move |time, data| logger.publish_batch(time, data)
));
}
else {
panic!("Could not connect to communication log address: {:?}", addr);
}
}
result
}));
}

F: Fn(&mut Worker<Allocator>)->T+Send+Sync+'static,
{
let (allocators, other) = config.communication.try_build()?;

execute_from(allocators, other, config.worker, move |worker| {
// If an environment variable is set, use it as the default timely logging.
if let Ok(addr) = ::std::env::var("TIMELY_WORKER_LOG_ADDR") {

use ::std::net::TcpStream;
use crate::logging::{BatchLogger, TimelyEvent};
use crate::dataflow::operators::capture::EventWriter;

if let Ok(stream) = TcpStream::connect(&addr) {
let writer = EventWriter::new(stream);
let mut logger = BatchLogger::new(writer);
worker.log_register()
.insert::<TimelyEvent,_>("timely", move |time, data|
logger.publish_batch(time, data)
);
}
else {
panic!("Could not connect logging stream to: {:?}", addr);
}
}
func(worker)
})
execute_from(allocators, other, config.worker, func)
}

/// Executes a timely dataflow from supplied arguments and per-communicator logic.
Expand Down

0 comments on commit 141ebee

Please sign in to comment.