Skip to content

Commit

Permalink
Define loggers in terms of container builders
Browse files Browse the repository at this point in the history
The logging infrastructure had some old assumptions built in, such as the
container type and the size of buffers. With this change, we defer to the
container builder pattern to re-use the existing infrastructure. This also
allows us to switch the container type to something else if we'd like to do
so.

Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
  • Loading branch information
antiguru committed Jan 8, 2025
1 parent 486c988 commit 90c3541
Show file tree
Hide file tree
Showing 20 changed files with 221 additions and 214 deletions.
1 change: 1 addition & 0 deletions communication/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ byteorder = "1.5"
serde = { version = "1.0", features = ["derive"] }
timely_bytes = { path = "../bytes", version = "0.12" }
timely_logging = { path = "../logging", version = "0.13" }
timely_container = { path = "../container", version = "0.13.0" }
crossbeam-channel = "0.5"
9 changes: 4 additions & 5 deletions communication/src/allocator/zero_copy/initialize.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Network initialization.
use std::sync::Arc;
// use crate::allocator::Process;
use crate::allocator::process::ProcessBuilder;
use crate::networking::create_sockets;
use super::tcp::{send_loop, recv_loop};
Expand Down Expand Up @@ -30,16 +29,16 @@ impl Drop for CommsGuard {
}
}

use crate::logging::{CommunicationSetup, CommunicationEvent};
use timely_logging::Logger;
use crate::logging::CommunicationSetup;
use crate::CommLogger;

/// Initializes network connections
pub fn initialize_networking(
addresses: Vec<String>,
my_index: usize,
threads: usize,
noisy: bool,
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent>>+Send+Sync>,
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<CommLogger>+Send+Sync>,
)
-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
{
Expand All @@ -58,7 +57,7 @@ pub fn initialize_networking_from_sockets<S: Stream + 'static>(
mut sockets: Vec<Option<S>>,
my_index: usize,
threads: usize,
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent>>+Send+Sync>,
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<CommLogger>+Send+Sync>,
)
-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
{
Expand Down
42 changes: 25 additions & 17 deletions communication/src/allocator/zero_copy/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,14 @@
use std::io::{self, Write};
use crossbeam_channel::{Sender, Receiver};

use crate::CommLogger;
use crate::logging::{CommunicationEvent, MessageEvent, StateEvent};
use crate::networking::MessageHeader;

use super::bytes_slab::BytesSlab;
use super::bytes_exchange::MergeQueue;
use super::stream::Stream;

use timely_logging::Logger;

use crate::logging::{CommunicationEvent, MessageEvent, StateEvent};

fn tcp_panic(context: &'static str, cause: io::Error) -> ! {
// NOTE: some downstream crates sniff out "timely communication error:" from
// the panic message. Avoid removing or rewording this message if possible.
Expand All @@ -35,12 +33,14 @@ pub fn recv_loop<S>(
worker_offset: usize,
process: usize,
remote: usize,
mut logger: Option<Logger<CommunicationEvent>>)
logger: Option<CommLogger>)
where
S: Stream,
{
// Log the receive thread's start.
logger.as_mut().map(|l| l.log(StateEvent { send: false, process, remote, start: true }));
if let Some(logger) = logger.as_ref() {
logger.log(CommunicationEvent::from(StateEvent { send: false, process, remote, start: true }));
}

let mut targets: Vec<MergeQueue> = targets.into_iter().map(|x| x.recv().expect("Failed to receive MergeQueue")).collect();

Expand Down Expand Up @@ -88,9 +88,9 @@ where
let bytes = buffer.extract(peeled_bytes);

// Record message receipt.
logger.as_mut().map(|logger| {
logger.log(MessageEvent { is_send: false, header, });
});
if let Some(logger) = logger.as_ref() {
logger.log(CommunicationEvent::from(MessageEvent { is_send: false, header }));
}

if header.length > 0 {
stageds[header.target - worker_offset].push(bytes);
Expand All @@ -117,7 +117,9 @@ where
}

// Log the receive thread's end.
logger.as_mut().map(|l| l.log(StateEvent { send: false, process, remote, start: false, }));
if let Some(logger) = logger.as_ref() {
logger.log(CommunicationEvent::from(StateEvent { send: false, process, remote, start: false, }));
}
}

/// Repeatedly sends messages into a TcpStream.
Expand All @@ -134,11 +136,13 @@ pub fn send_loop<S: Stream>(
sources: Vec<Sender<MergeQueue>>,
process: usize,
remote: usize,
mut logger: Option<Logger<CommunicationEvent>>)
logger: Option<CommLogger>)
{

// Log the send thread's start.
logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: true, }));
if let Some(logger) = logger.as_ref() {
logger.log(CommunicationEvent::from(StateEvent { send: true, process, remote, start: true, }));
}

let mut sources: Vec<MergeQueue> = sources.into_iter().map(|x| {
let buzzer = crate::buzzer::Buzzer::default();
Expand Down Expand Up @@ -176,13 +180,13 @@ pub fn send_loop<S: Stream>(
for mut bytes in stash.drain(..) {

// Record message sends.
logger.as_mut().map(|logger| {
if let Some(logger) = logger.as_ref() {
let mut offset = 0;
while let Some(header) = MessageHeader::try_read(&mut bytes[offset..]) {
logger.log(MessageEvent { is_send: true, header, });
logger.log(CommunicationEvent::from(MessageEvent { is_send: true, header, }));
offset += header.required_bytes();
}
});
};

writer.write_all(&bytes[..]).unwrap_or_else(|e| tcp_panic("writing data", e));
}
Expand All @@ -202,8 +206,12 @@ pub fn send_loop<S: Stream>(
header.write_to(&mut writer).unwrap_or_else(|e| tcp_panic("writing data", e));
writer.flush().unwrap_or_else(|e| tcp_panic("flushing writer", e));
writer.get_mut().shutdown(::std::net::Shutdown::Write).unwrap_or_else(|e| tcp_panic("shutting down writer", e));
logger.as_mut().map(|logger| logger.log(MessageEvent { is_send: true, header }));
if let Some(logger) = logger.as_ref() {
logger.log(CommunicationEvent::from(MessageEvent { is_send: true, header }));
}

// Log the send thread's end.
logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: false, }));
if let Some(logger) = logger.as_ref() {
logger.log(CommunicationEvent::from(StateEvent { send: true, process, remote, start: false, }));
}
}
6 changes: 3 additions & 3 deletions communication/src/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use crate::allocator::{AllocateBuilder, Process, Generic, GenericBuilder};
use crate::allocator::zero_copy::allocator_process::ProcessBuilder;
use crate::allocator::zero_copy::initialize::initialize_networking;

use crate::logging::{CommunicationSetup, CommunicationEvent};
use timely_logging::Logger;
use crate::logging::CommunicationSetup;
use crate::CommLogger;
use std::fmt::{Debug, Formatter};


Expand All @@ -39,7 +39,7 @@ pub enum Config {
/// Verbosely report connection process
report: bool,
/// Closure to create a new logger for a communication thread
log_fn: Arc<dyn Fn(CommunicationSetup) -> Option<Logger<CommunicationEvent>> + Send + Sync>,
log_fn: Arc<dyn Fn(CommunicationSetup) -> Option<CommLogger> + Send + Sync>,
}
}

Expand Down
8 changes: 8 additions & 0 deletions communication/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@
#![forbid(missing_docs)]

use std::time::Duration;

pub mod allocator;
pub mod networking;
pub mod initialize;
Expand Down Expand Up @@ -191,3 +193,9 @@ fn promise_futures<T>(sends: usize, recvs: usize) -> (Vec<Vec<Sender<T>>>, Vec<V

(senders, recvers)
}

use timely_container::CapacityContainerBuilder;
use timely_logging::Logger;
use crate::logging::CommunicationEvent;

pub(crate) type CommLogger = Logger<CapacityContainerBuilder<Vec<(Duration, CommunicationEvent)>>>;
3 changes: 3 additions & 0 deletions logging/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@ homepage = "https://github.com/TimelyDataflow/timely-dataflow"
repository = "https://github.com/TimelyDataflow/timely-dataflow.git"
keywords = ["timely", "dataflow", "logging"]
license = "MIT"

[dependencies]
timely_container = { version = "0.13.0", path = "../container" }
Loading

0 comments on commit 90c3541

Please sign in to comment.