Skip to content
This repository has been archived by the owner on Feb 27, 2024. It is now read-only.

Commit

Permalink
exchange before flatmap
Browse files Browse the repository at this point in the history
  • Loading branch information
LorenzSelv committed Aug 26, 2019
1 parent 71155f6 commit 92f471a
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions src/bin/wordcount_kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use timely::dataflow::operators::broadcast::Broadcast;
use timely::dataflow::operators::exchange::Exchange;
use colored::Colorize;
use rescaling_examples::verify;
use timely::dataflow::operators::to_stream::ToStream;
use std::cell::RefCell;

fn calculate_hash<T: Hash>(t: &T) -> u64 {
let mut h = DefaultHasher::new();
Expand All @@ -51,9 +51,12 @@ fn main() {

let mut words_probe = ProbeHandle::new();

let rr = RefCell::new(0_u64);

let words_in =
lines_in
.to_stream(scope)
.exchange(move |_| { let mut rr = rr.borrow_mut(); *rr+=1; *rr }) // round-robin
.flat_map(|text: String|
text.split_whitespace()
.map(move |word| (word.to_owned(), 1))
Expand All @@ -66,7 +69,6 @@ fn main() {

let stateful_out =
words_in
.exchange(|(word, _)| calculate_hash(word)) // routing table of an arbitrary worker should route properly
.stateful_state_machine(|key: &String, val, agg: &mut u64| {
*agg += val;
(false, Some((key.clone(), *agg)))
Expand Down

0 comments on commit 92f471a

Please sign in to comment.