Skip to content
This repository has been archived by the owner on Feb 27, 2024. It is now read-only.

Commit

Permalink
get control input from kakfa source
Browse files Browse the repository at this point in the history
  • Loading branch information
LorenzSelv committed Aug 20, 2019
1 parent d271acd commit f7fa23a
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 7 deletions.
8 changes: 8 additions & 0 deletions kafka-tools/init-topic.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
topic="megaphone-control"
$KAFKA/bin/kafka-configs.sh --zookeeper localhost --alter --entity-type topics --entity-name $topic --add-config retention.ms=1000
$KAFKA/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic $topic
$KAFKA/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic $topic

GREEN='\033[1;32m'
NC='\033[0m' # No Color
echo -e "${GREEN}Created topic \"$topic\"${NC}"
2 changes: 2 additions & 0 deletions kafka-tools/send-control.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
topic="megaphone-control"
$KAFKA/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic $topic
21 changes: 15 additions & 6 deletions src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub fn control_stream<G: Scope<Timestamp=usize>>(scope: &mut G, input_probe: Pro

source(scope, "ControlStream", |mut cap, info| {

let consumer = if cap.is_some() && widx == 0 { // Only worker 0 subscribes to the kafka topic
let consumer = if widx == 0 { // Only worker 0 subscribes to the kafka topic
let topic = "megaphone-control";

let mut consumer_config = ClientConfig::new();
Expand All @@ -35,11 +35,11 @@ pub fn control_stream<G: Scope<Timestamp=usize>>(scope: &mut G, input_probe: Pro
println!("[W{}@kafka-consumer] subscribed control commands topic", widx);

Some(consumer)
} else { None };
} else { cap = None; None };

let activator = scope.activator_for(&info.address[..]);

let mut seqno : u64 = 0;
let mut seqno: u64 = 0;

move |output| {
if let Some(consumer) = &consumer {
Expand All @@ -52,6 +52,8 @@ pub fn control_stream<G: Scope<Timestamp=usize>>(scope: &mut G, input_probe: Pro
let payload = message.payload().expect("null payload");
let text = std::str::from_utf8(payload).expect("parse error");

println!("text is {:?}", text);

let instructions = text.split(",").map(|text| {
let tokens = text.split(" ").map(|x| x.to_lowercase().trim().to_string()).collect::<Vec<_>>();

Expand All @@ -66,7 +68,6 @@ pub fn control_stream<G: Scope<Timestamp=usize>>(scope: &mut G, input_probe: Pro
let map = tokens[1..].into_iter().map(|x| x.parse::<usize>().unwrap()).collect::<Vec<usize>>();
Some(ControlInst::Map(map))
}

_ => {
println!("{}", "unrecognized command".bold().red());
None
Expand All @@ -86,9 +87,17 @@ pub fn control_stream<G: Scope<Timestamp=usize>>(scope: &mut G, input_probe: Pro

// Downgrade the capability until we have caught up with the input stream.
// What we would like to do: `cap.downgrade(input_probe.time())`.
while !input_probe.less_equal(cap.time()) {
cap.downgrade(&(*cap.time() + 1))
while !input_probe.done() && !input_probe.less_equal(cap.time()) {
let new_time = *cap.time() + 1;
// println!("downgrading cap to {:?}", new_time);
cap.downgrade(&new_time)
}

}

if input_probe.done() {
println!("input stream is done, closing control stream");
cap = None;
}
}
}
Expand Down
80 changes: 79 additions & 1 deletion text/sample.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,82 @@ Now I think you going to see a guy who will go that inch with you.
Your gonna see a guy who will sacrifice himself for this team, because he knows when it comes down to it your gonna do the same for him.
That’s a team, gentlemen, and either, we heal, now, as a team, or we will die as individuals.
That’s football guys, that’s all it is.
Now, what are you gonna do?
Now, what are you gonna do?
I don’t know what to say, really.
Three minutes to the biggest battle of our professional lives.
All comes down to today, and either, we heal as a team, or we’re gonna crumble.
Inch by inch, play by play.
Until we’re finished.
We’re in hell right now, gentlemen.
Believe me.
And, we can stay here, get the shit kicked out of us, or we can fight our way back into the light.
We can climb outta hell… one inch at a time.

Now I can’t do it for you, I’m too old.
I look around, I see these young faces and I think, I mean, I’ve made every wrong choice a middle-aged man can make.
I, uh, I’ve pissed away all my money, believe it or not.
I chased off anyone who’s ever loved me.
And lately, I can’t even stand the face I see in the mirror.

You know, when you get old, in life, things get taken from you.
I mean, that’s… that’s… that’s a part of life.
But, you only learn that when you start losin’ stuff.
You find out life’s this game of inches, so is football.
Because in either game – life or football – the margin for error is so small.
I mean, one half a step too late or too early and you don’t quite make it.
One half second too slow, too fast and you don’t quite catch it.
The inches we need are everywhere around us.
They’re in every break of the game, every minute, every second.

On this team we fight for that inch.
On this team we tear ourselves and everyone else around us to pieces for that inch.
We claw with our fingernails for that inch.
Because we know when we add up all those inches that’s gonna make the f****** difference between WINNING and LOSING, between LIVING and DYING!

I’ll tell you this, in any fight it’s the guy whose willing to die whose gonna win that inch.
And I know, if I’m gonna have any life anymore it’s because I’m still willing to fight and die for that inch, because that’s what living is, the six inches in front of your face.
Now I can’t make you do it. You’ve got to look at the guy next to you, look into his eyes.
Now I think you going to see a guy who will go that inch with you.
Your gonna see a guy who will sacrifice himself for this team, because he knows when it comes down to it your gonna do the same for him.
That’s a team, gentlemen, and either, we heal, now, as a team, or we will die as individuals.
That’s football guys, that’s all it is.
Now, what are you gonna do?
I don’t know what to say, really.
Three minutes to the biggest battle of our professional lives.
All comes down to today, and either, we heal as a team, or we’re gonna crumble.
Inch by inch, play by play.
Until we’re finished.
We’re in hell right now, gentlemen.
Believe me.
And, we can stay here, get the shit kicked out of us, or we can fight our way back into the light.
We can climb outta hell… one inch at a time.

Now I can’t do it for you, I’m too old.
I look around, I see these young faces and I think, I mean, I’ve made every wrong choice a middle-aged man can make.
I, uh, I’ve pissed away all my money, believe it or not.
I chased off anyone who’s ever loved me.
And lately, I can’t even stand the face I see in the mirror.

You know, when you get old, in life, things get taken from you.
I mean, that’s… that’s… that’s a part of life.
But, you only learn that when you start losin’ stuff.
You find out life’s this game of inches, so is football.
Because in either game – life or football – the margin for error is so small.
I mean, one half a step too late or too early and you don’t quite make it.
One half second too slow, too fast and you don’t quite catch it.
The inches we need are everywhere around us.
They’re in every break of the game, every minute, every second.

On this team we fight for that inch.
On this team we tear ourselves and everyone else around us to pieces for that inch.
We claw with our fingernails for that inch.
Because we know when we add up all those inches that’s gonna make the f****** difference between WINNING and LOSING, between LIVING and DYING!

I’ll tell you this, in any fight it’s the guy whose willing to die whose gonna win that inch.
And I know, if I’m gonna have any life anymore it’s because I’m still willing to fight and die for that inch, because that’s what living is, the six inches in front of your face.
Now I can’t make you do it. You’ve got to look at the guy next to you, look into his eyes.
Now I think you going to see a guy who will go that inch with you.
Your gonna see a guy who will sacrifice himself for this team, because he knows when it comes down to it your gonna do the same for him.
That’s a team, gentlemen, and either, we heal, now, as a team, or we will die as individuals.
That’s football guys, that’s all it is.
Now, what are you gonna do?

0 comments on commit f7fa23a

Please sign in to comment.