Skip to content
This repository has been archived by the owner on Feb 27, 2024. It is now read-only.

Commit

Permalink
emit metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
LorenzSelv committed Aug 29, 2019
1 parent d31d823 commit 65f3db1
Showing 1 changed file with 50 additions and 45 deletions.
95 changes: 50 additions & 45 deletions src/bin/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ use std::process::Command;
use std::fs::File;
use colored::Colorize;
use std::collections::VecDeque;
use std::io::{Stdout, Write};
use rdkafka::message::ToBytes;

const WORKER_BOOTSTRAP_MARGIN: u64 = 500_000_000; // wait 500 millis after spawning before sending move commands

Expand All @@ -50,8 +52,8 @@ fn calculate_hash<T: Hash>(t: &T) -> u64 {

fn main() {

let rate: u64 = 1_000_000;
let duration_ns: u64 = 20*1_000_000_000;
let rate: u64 = 1_0;
let duration_ns: u64 = 10*1_000_000_000;
let validate = false;
let key_space = 1000;

Expand All @@ -77,8 +79,6 @@ fn main() {
let element_hdr = Rc::new(RefCell::new(::hdrhist::HDRHist::new()));
let element_hdr2 = Rc::clone(&element_hdr);

let count = 1;

// Construct the dataflow
worker.dataflow(|scope: &mut ::timely::dataflow::scopes::Child<_, usize>| {
let control = control_input.to_stream(scope).broadcast();
Expand All @@ -91,18 +91,20 @@ fn main() {
let mut word_generator = WordGenerator::new_uniform(index, key_space);
let mut last_production_time = 0;

let mut first_time = true;

move |input, output| {
// Input closed, we're done
if input.frontier().is_empty() {
cap.take();
} else if let Some(cap) = cap.as_mut() {
let current_time = input.frontier().frontier()[0];
let probe_time = probe2.with_frontier(|f| if f.is_empty() { 1 } else { f[0] });
let probe_time = probe2.with_frontier(|f| if f.is_empty() { 0 } else { f[0] });
let delta_probe = current_time - probe_time;
let delta_production = current_time - last_production_time;
// if the delta to the probe is at most twice the delta to the last production, consider producing more data
if delta_probe <= delta_production * 2 {
if let Some(mut it) = input_times_gen.iter_until((current_time - count) as u64) {
if let Some(mut it) = input_times_gen.iter_until((current_time) as u64) {
// `it` is some => we are still running!
// If there are actual elements to be produced, open a session and produce them
if let Some(_) = it.next() {
Expand All @@ -129,6 +131,7 @@ fn main() {
*agg += val;
(false, Some((*key, *agg)))
}, |key| calculate_hash(key), &control)
.inspect(move |x| println!("{:?}", x))
.probe_with(&mut probe);

if validate {
Expand All @@ -142,35 +145,20 @@ fn main() {
}
});

if worker.bootstrap() { return None; }

let mut spawn_at_times: VecDeque<u64> = VecDeque::new();
spawn_at_times.push_back(duration_ns/3);
spawn_at_times.push_back(2*duration_ns/3);

let mut spawn_metrics = Vec::new();

let mut output_metric_collector =
::streaming_harness::output::default::hdrhist_timeline_collector(
input_times(),
0, 2_000_000_000, duration_ns - 2_000_000_000, duration_ns,
250_000_000);

let mut control_sequence = 0;
let mut control_input = Some(control_input);
if index != 0 {
control_input.take().unwrap().close();
}

// Wait for initialization in `count` rounds
for i in 1..=count {
input.advance_to(i);
if let Some(control_input) = control_input.as_mut() {
control_input.advance_to(count);
}
while probe.less_than(&i) { worker.step(); }
}

input.advance_to(count);
if let Some(control_input) = control_input.as_mut() {
control_input.advance_to(count);
}
while probe.less_than(&count) { worker.step(); }

let n = std::env::var("N").expect("missing N env var -- number of processes").parse::<usize>().unwrap();
let w = std::env::var("W").expect("missing W env var -- number of workers").parse::<usize>().unwrap();
let mut p = n;
Expand All @@ -181,12 +169,15 @@ fn main() {
let mut load_balancer = LoadBalancer::new((0..peers).collect(), 1 << BIN_SHIFT);

let mut input = Some(input);
let mut control_input = Some(control_input);
let mut control_sequence = 0;

let timer = ::std::time::Instant::now();

loop {
if index != 0 {
input.take().unwrap();
input.take().unwrap().close();
control_input.take().unwrap().close();
break;
}

Expand All @@ -198,13 +189,15 @@ fn main() {

let old_peers = worker.peers();

let file = File::create("foo.txt").unwrap();
let stdout = File::create(format!("/tmp/process-{}-stdout", p)).unwrap();
let stderr = File::create(format!("/tmp/process-{}-stderr", p)).unwrap();

Command::new("cargo")
.stdout(file)
.stdout(stdout)
.stderr(stderr)
.arg("run")
.arg("--bin")
.arg("wordcount_bench")
.arg("benchmark")
.arg("--")
.arg("-n")
.arg(n.to_string())
Expand Down Expand Up @@ -255,14 +248,17 @@ fn main() {

control_sequence += 1;
bin_moved = true;

println!("bootstrap worker:\tbootstrap={}\tmoves={}", bootstrap_time, elapsed_ns);
spawn_metrics.push((bootstrap_time, elapsed_ns));
}
}
if bin_moved { spawn_info = None; }

output_metric_collector.acknowledge_while(
elapsed_ns,
|t| {
!probe.less_than(&(t as usize + count))
!probe.less_than(&(t as usize)) // TODO(lorenzo) +1 ?
});

if input.is_none() {
Expand All @@ -271,10 +267,10 @@ fn main() {

if elapsed_ns < duration_ns {
let input = input.as_mut().unwrap();
input.advance_to(elapsed_ns as usize + count);
input.advance_to(elapsed_ns as usize);
if let Some(control_input) = control_input.as_mut() {
if *control_input.time() < elapsed_ns as usize + count {
control_input.advance_to(elapsed_ns as usize + count);
if *control_input.time() < elapsed_ns as usize {
control_input.advance_to(elapsed_ns as usize);
}
}
} else {
Expand All @@ -293,19 +289,28 @@ fn main() {
for (value, prob, count) in element_hdr.ccdf() {
println!("count_ccdf\t{}\t{}\t{}", value, prob, count);
}
output_metric_collector.into_inner()

if index == 0 { Some((output_metric_collector.into_inner(), spawn_metrics)) } else { None }

}).expect("unsuccessful execution").join().into_iter().map(|x| x.unwrap()).collect();

let ::streaming_harness::timeline::Timeline { timeline, latency_metrics, .. } = ::streaming_harness::output::combine_all(timelines);
let result: Vec<(streaming_harness::timeline::Timeline<_,_,_,_>, Vec<_>)> = timelines.into_iter().filter_map(|mut x| x.take()).collect();

if !result.is_empty() {
let (timelines, spawn_metrics): (Vec<streaming_harness::timeline::Timeline<_,_,_,_>>, Vec<_>) = result.into_iter().unzip();
let ::streaming_harness::timeline::Timeline { timeline, latency_metrics, .. } = ::streaming_harness::output::combine_all(timelines);

let latency_metrics = latency_metrics.into_inner();
// println!("DEBUG_summary\t{}", latency_metrics.summary_string().replace("\n", "\nDEBUG_summary\t"));
// println!("{}",
// timeline.clone().into_iter().map(|::streaming_harness::timeline::TimelineElement { time, metrics, samples }|
// format!("DEBUG_timeline\t-- {} ({} samples) --\nDEBUG_timeline\t{}", time, samples, metrics.summary_string().replace("\n", "\nDEBUG_timeline\t"))).collect::<Vec<_>>().join("\n"));
let spawn_metrics = spawn_metrics.first().unwrap(); // only worker 0 measures the spawning of new processes

for (value, prob, count) in latency_metrics.ccdf() {
println!("latency_ccdf\t{}\t{}\t{}", value, prob, count);
println!("{}", ::streaming_harness::format::format_summary_timeline("summary_timeline".to_string(), timeline.clone()));
for (bootstrap, mv) in spawn_metrics.iter() {
println!("spawn_metric\t{}\t{}", bootstrap, mv);
}

let mut metrics = File::create("metrics").unwrap();
metrics.write(::streaming_harness::format::format_summary_timeline("summary_timeline".to_string(), timeline.clone()).as_bytes()).unwrap();
for (bootstrap, mv) in spawn_metrics.iter() {
metrics.write(format!("spawn_metric\t{}\t{}\n", bootstrap, mv).as_bytes()).unwrap();
}
}
println!("{}", ::streaming_harness::format::format_summary_timeline("summary_timeline".to_string(), timeline.clone()));
}

0 comments on commit 65f3db1

Please sign in to comment.