From f7fa23a0648f01f802b8bac479fb83180f54a6c0 Mon Sep 17 00:00:00 2001 From: Lorenzo Selvatici Date: Tue, 20 Aug 2019 18:42:59 +0200 Subject: [PATCH] get control input from kakfa source --- kafka-tools/init-topic.sh | 8 ++++ kafka-tools/send-control.sh | 2 + src/kafka.rs | 21 +++++++--- text/sample.txt | 80 ++++++++++++++++++++++++++++++++++++- 4 files changed, 104 insertions(+), 7 deletions(-) create mode 100755 kafka-tools/init-topic.sh create mode 100755 kafka-tools/send-control.sh diff --git a/kafka-tools/init-topic.sh b/kafka-tools/init-topic.sh new file mode 100755 index 0000000..6f64b2d --- /dev/null +++ b/kafka-tools/init-topic.sh @@ -0,0 +1,8 @@ +topic="megaphone-control" +$KAFKA/bin/kafka-configs.sh --zookeeper localhost --alter --entity-type topics --entity-name $topic --add-config retention.ms=1000 +$KAFKA/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic $topic +$KAFKA/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic $topic + +GREEN='\033[1;32m' +NC='\033[0m' # No Color +echo -e "${GREEN}Created topic \"$topic\"${NC}" diff --git a/kafka-tools/send-control.sh b/kafka-tools/send-control.sh new file mode 100755 index 0000000..aa132d0 --- /dev/null +++ b/kafka-tools/send-control.sh @@ -0,0 +1,2 @@ +topic="megaphone-control" +$KAFKA/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic $topic diff --git a/src/kafka.rs b/src/kafka.rs index ef8d0d6..e03759b 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -13,7 +13,7 @@ pub fn control_stream>(scope: &mut G, input_probe: Pro source(scope, "ControlStream", |mut cap, info| { - let consumer = if cap.is_some() && widx == 0 { // Only worker 0 subscribes to the kafka topic + let consumer = if widx == 0 { // Only worker 0 subscribes to the kafka topic let topic = "megaphone-control"; let mut consumer_config = ClientConfig::new(); @@ -35,11 +35,11 @@ pub fn control_stream>(scope: &mut G, input_probe: Pro println!("[W{}@kafka-consumer] subscribed control commands topic", widx); Some(consumer) - } else { None }; + } else { cap = None; None }; let activator = scope.activator_for(&info.address[..]); - let mut seqno : u64 = 0; + let mut seqno: u64 = 0; move |output| { if let Some(consumer) = &consumer { @@ -52,6 +52,8 @@ pub fn control_stream>(scope: &mut G, input_probe: Pro let payload = message.payload().expect("null payload"); let text = std::str::from_utf8(payload).expect("parse error"); + println!("text is {:?}", text); + let instructions = text.split(",").map(|text| { let tokens = text.split(" ").map(|x| x.to_lowercase().trim().to_string()).collect::>(); @@ -66,7 +68,6 @@ pub fn control_stream>(scope: &mut G, input_probe: Pro let map = tokens[1..].into_iter().map(|x| x.parse::().unwrap()).collect::>(); Some(ControlInst::Map(map)) } - _ => { println!("{}", "unrecognized command".bold().red()); None @@ -86,9 +87,17 @@ pub fn control_stream>(scope: &mut G, input_probe: Pro // Downgrade the capability until we have caught up with the input stream. // What we would like to do: `cap.downgrade(input_probe.time())`. - while !input_probe.less_equal(cap.time()) { - cap.downgrade(&(*cap.time() + 1)) + while !input_probe.done() && !input_probe.less_equal(cap.time()) { + let new_time = *cap.time() + 1; + // println!("downgrading cap to {:?}", new_time); + cap.downgrade(&new_time) } + + } + + if input_probe.done() { + println!("input stream is done, closing control stream"); + cap = None; } } } diff --git a/text/sample.txt b/text/sample.txt index 3c3d750..dcec796 100644 --- a/text/sample.txt +++ b/text/sample.txt @@ -36,4 +36,82 @@ Now I think you going to see a guy who will go that inch with you. Your gonna see a guy who will sacrifice himself for this team, because he knows when it comes down to it your gonna do the same for him. That’s a team, gentlemen, and either, we heal, now, as a team, or we will die as individuals. That’s football guys, that’s all it is. -Now, what are you gonna do? \ No newline at end of file +Now, what are you gonna do? +I don’t know what to say, really. +Three minutes to the biggest battle of our professional lives. +All comes down to today, and either, we heal as a team, or we’re gonna crumble. +Inch by inch, play by play. +Until we’re finished. +We’re in hell right now, gentlemen. +Believe me. +And, we can stay here, get the shit kicked out of us, or we can fight our way back into the light. +We can climb outta hell… one inch at a time. + +Now I can’t do it for you, I’m too old. +I look around, I see these young faces and I think, I mean, I’ve made every wrong choice a middle-aged man can make. +I, uh, I’ve pissed away all my money, believe it or not. +I chased off anyone who’s ever loved me. +And lately, I can’t even stand the face I see in the mirror. + +You know, when you get old, in life, things get taken from you. +I mean, that’s… that’s… that’s a part of life. +But, you only learn that when you start losin’ stuff. +You find out life’s this game of inches, so is football. +Because in either game – life or football – the margin for error is so small. +I mean, one half a step too late or too early and you don’t quite make it. +One half second too slow, too fast and you don’t quite catch it. +The inches we need are everywhere around us. +They’re in every break of the game, every minute, every second. + +On this team we fight for that inch. +On this team we tear ourselves and everyone else around us to pieces for that inch. +We claw with our fingernails for that inch. +Because we know when we add up all those inches that’s gonna make the f****** difference between WINNING and LOSING, between LIVING and DYING! + +I’ll tell you this, in any fight it’s the guy whose willing to die whose gonna win that inch. +And I know, if I’m gonna have any life anymore it’s because I’m still willing to fight and die for that inch, because that’s what living is, the six inches in front of your face. +Now I can’t make you do it. You’ve got to look at the guy next to you, look into his eyes. +Now I think you going to see a guy who will go that inch with you. +Your gonna see a guy who will sacrifice himself for this team, because he knows when it comes down to it your gonna do the same for him. +That’s a team, gentlemen, and either, we heal, now, as a team, or we will die as individuals. +That’s football guys, that’s all it is. +Now, what are you gonna do? +I don’t know what to say, really. +Three minutes to the biggest battle of our professional lives. +All comes down to today, and either, we heal as a team, or we’re gonna crumble. +Inch by inch, play by play. +Until we’re finished. +We’re in hell right now, gentlemen. +Believe me. +And, we can stay here, get the shit kicked out of us, or we can fight our way back into the light. +We can climb outta hell… one inch at a time. + +Now I can’t do it for you, I’m too old. +I look around, I see these young faces and I think, I mean, I’ve made every wrong choice a middle-aged man can make. +I, uh, I’ve pissed away all my money, believe it or not. +I chased off anyone who’s ever loved me. +And lately, I can’t even stand the face I see in the mirror. + +You know, when you get old, in life, things get taken from you. +I mean, that’s… that’s… that’s a part of life. +But, you only learn that when you start losin’ stuff. +You find out life’s this game of inches, so is football. +Because in either game – life or football – the margin for error is so small. +I mean, one half a step too late or too early and you don’t quite make it. +One half second too slow, too fast and you don’t quite catch it. +The inches we need are everywhere around us. +They’re in every break of the game, every minute, every second. + +On this team we fight for that inch. +On this team we tear ourselves and everyone else around us to pieces for that inch. +We claw with our fingernails for that inch. +Because we know when we add up all those inches that’s gonna make the f****** difference between WINNING and LOSING, between LIVING and DYING! + +I’ll tell you this, in any fight it’s the guy whose willing to die whose gonna win that inch. +And I know, if I’m gonna have any life anymore it’s because I’m still willing to fight and die for that inch, because that’s what living is, the six inches in front of your face. +Now I can’t make you do it. You’ve got to look at the guy next to you, look into his eyes. +Now I think you going to see a guy who will go that inch with you. +Your gonna see a guy who will sacrifice himself for this team, because he knows when it comes down to it your gonna do the same for him. +That’s a team, gentlemen, and either, we heal, now, as a team, or we will die as individuals. +That’s football guys, that’s all it is. +Now, what are you gonna do?