Skip to content
This repository has been archived by the owner on Feb 27, 2024. It is now read-only.

Commit

Permalink
try with flatmap
Browse files Browse the repository at this point in the history
  • Loading branch information
LorenzSelv committed Aug 30, 2019
1 parent 7e37bec commit e2e6eb4
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 19 deletions.
49 changes: 32 additions & 17 deletions src/bin/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ use dynamic_scaling_mechanism::{Control, ControlInst, BinId, BIN_SHIFT};
use dynamic_scaling_mechanism::state_machine::BinnedStateMachine;

use timely::dataflow::operators::input::Handle;
use rescaling_examples::{verify, WordGenerator, LoadBalancer};
use rescaling_examples::{verify, LoadBalancer, LinesGenerator};
use timely::dataflow::operators::inspect::Inspect;
use std::process::Command;
use std::fs::File;
use colored::Colorize;
use std::collections::VecDeque;
use std::io::Write;
use timely::dataflow::operators::map::Map;
use timely::dataflow::operators::exchange::Exchange;

const WORKER_BOOTSTRAP_MARGIN: u64 = 500_000_000; // wait 500 millis after spawning before sending move commands

Expand All @@ -44,10 +46,12 @@ fn calculate_hash<T: Hash>(t: &T) -> u64 {
}

fn main() {
let rate: u64 = 10_000;
let duration_ns: u64 = 20*1_000_000_000;
let rate: u64 = 10_00;
let duration_ns: u64 = 40*1_000_000_000;
let validate = false;
let key_space = 1000;
let words_per_line = 100;
let word_length = 10;

let timelines: Vec<_> = timely::execute_from_args(std::env::args(), move |worker| {

Expand Down Expand Up @@ -77,10 +81,10 @@ fn main() {
control.inspect(move |c| println!("[W{}] {}", index, format!("control message is {:?}", c).bold().yellow()));

// Construct the data generator
let input = input
let lines = input
.to_stream(scope)
.unary_frontier(Pipeline, "Data generator", |mut cap, _info| {
let mut word_generator = WordGenerator::new_uniform(index, key_space);
let mut lines_generator = LinesGenerator::new(key_space, words_per_line, word_length);
let mut last_production_time = 0;

move |input, output| {
Expand All @@ -99,13 +103,13 @@ fn main() {
// If there are actual elements to be produced, open a session and produce them
if let Some(_) = it.next() {
let mut session = output.session(cap);
session.give((word_generator.word_rand(), 1));
let mut word_count = 1;
session.give(lines_generator.next());
let mut word_count = words_per_line;
for _t in it {
session.give((word_generator.word_rand(), 1));
word_count += 1;
session.give(lines_generator.next());
word_count += words_per_line;
}
element_hdr2.borrow_mut().add_value(word_count);
element_hdr2.borrow_mut().add_value(word_count as u64);
last_production_time = current_time;
}
}
Expand All @@ -115,21 +119,32 @@ fn main() {
}
});

let rr = RefCell::new(0);

let words =
lines
.exchange(move |_| { let mut rr = rr.borrow_mut(); *rr+=1; *rr }) // round-robin
.flat_map(|text: String|
text.split_whitespace()
.map(move |word| (word.to_owned(), 1))
.collect::<Vec<_>>()
);

let sst_output =
input
.stateful_state_machine(|key: &_, val, agg: &mut u64| {
words
.stateful_state_machine(|key: &String, val, agg: &mut u64| {
*agg += val;
(false, Some((*key, *agg)))
(false, Some((key.clone(), *agg)))
}, |key| calculate_hash(key), &control)
//.inspect(move |x| println!("{:?}", x))
.probe_with(&mut probe);

if validate {
use timely::dataflow::operators::aggregation::StateMachine;
let correct = input
.state_machine(|_key: &_, val, agg: &mut u64| {
let correct = words
.state_machine(|key: &String, val, agg: &mut u64| {
*agg += val;
(false, Some((*_key, *agg)))
(false, Some((key.clone(), *agg)))
}, |_key| 0); // plain exchange won't compute correct counts when after rescaling (no routing table)
verify(&sst_output, &correct).probe_with(&mut probe);
}
Expand Down Expand Up @@ -174,7 +189,7 @@ fn main() {
let elapsed_ns = timer.elapsed().to_nanos();

if let Some(spawn_at_time) = spawn_at_times.front() {
if elapsed_ns >= *spawn_at_time {
if elapsed_ns >= *spawn_at_time && spawn_info.is_none() {
spawn_at_times.pop_front();

let old_peers = worker.peers();
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ pub struct LinesGenerator {
}

impl LinesGenerator {
pub fn new(distinct_words: usize, words_per_line: usize) -> Self {
pub fn new(distinct_words: usize, words_per_line: usize, word_length: usize) -> Self {
let mut rng = rand::thread_rng();
let distinct_words = (0..distinct_words).map(|_| {
(0..10).map(|_| rng.sample(rand::distributions::Alphanumeric)).collect::<String>()
(0..word_length).map(|_| rng.sample(rand::distributions::Alphanumeric)).collect::<String>()
}).collect();

LinesGenerator {
Expand Down

0 comments on commit e2e6eb4

Please sign in to comment.