From 404354f1f93c3d7550a1a959d02edbfc89dfc645 Mon Sep 17 00:00:00 2001 From: Linus Gasser Date: Fri, 23 Aug 2024 14:49:51 +0200 Subject: [PATCH] Simplifying the storage and message handling in the template --- CHANGELOG.md | 1 + .../template/{translation.rs => broker.rs} | 90 +++++++------------ flmodules/src/template/core.rs | 2 +- flmodules/src/template/messages.rs | 71 ++++++++------- flmodules/src/template/mod.rs | 2 +- 5 files changed, 77 insertions(+), 89 deletions(-) rename flmodules/src/template/{translation.rs => broker.rs} (67%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2dfb94e1..6654aa5f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ Following https://keepachangelog.com/en/1.1.0/ and using ### Changed - Updated versions of most dependencies +- use tokio::sync::watch to pass configuration from `Translate` to `Broker` ### Added diff --git a/flmodules/src/template/translation.rs b/flmodules/src/template/broker.rs similarity index 67% rename from flmodules/src/template/translation.rs rename to flmodules/src/template/broker.rs index c86ce11c..04dde749 100644 --- a/flmodules/src/template/translation.rs +++ b/flmodules/src/template/broker.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; -use flarch::data_storage::DataStorage; +use flarch::{data_storage::DataStorage, spawn_local}; use std::error::Error; +use tokio::sync::watch; use crate::{ broker::{Broker, BrokerError, Subsystem, SubsystemHandler}, @@ -9,7 +10,7 @@ use crate::{ }; use super::{ - core::{TemplateConfig, TemplateStorageSave}, + core::{TemplateConfig, TemplateStorage, TemplateStorageSave}, messages::{MessageNode, TemplateIn, TemplateMessage, TemplateMessages, TemplateOut}, }; @@ -19,25 +20,37 @@ const MODULE_NAME: &str = "Template"; /// all messages are correctly translated from one to the other. /// For this example, it uses the RandomConnections module to communicate /// with other nodes. -/// -/// The [TemplateBroker] holds the [Translate] and offers convenience methods +/// +/// The [Template] holds the [Translate] and offers convenience methods /// to interact with [Translate] and [TemplateMessage]. -pub struct TemplateBroker { +pub struct Template { /// Represents the underlying broker. pub broker: Broker, our_id: NodeID, - ds: Box, + rx: watch::Receiver, } -impl TemplateBroker { +impl Template { pub async fn start( - ds: Box, + mut ds: Box, our_id: NodeID, rc: Broker, - cfg: TemplateConfig, + config: TemplateConfig, ) -> Result> { - let broker = Translate::start(ds.clone(), our_id.clone(), rc, cfg).await?; - Ok(TemplateBroker { broker, ds, our_id }) + let str = ds.get(MODULE_NAME).unwrap_or("".into()); + let storage = TemplateStorageSave::from_str(&str).unwrap_or_default(); + let (tx, rx) = watch::channel(storage.clone()); + let messages = TemplateMessages::new(storage, config, our_id, tx)?; + let broker = Translate::start(rc, messages).await?; + let mut rx_spawn = rx.clone(); + spawn_local(async move { + while rx_spawn.changed().await.is_ok() { + if let Ok(val) = rx_spawn.borrow().to_yaml() { + ds.set(MODULE_NAME, &val).expect("updating storage"); + } + } + }); + Ok(Template { broker, our_id, rx }) } pub fn increase_self(&mut self, counter: u32) -> Result<(), BrokerError> { @@ -46,34 +59,24 @@ impl TemplateBroker { } pub fn get_counter(&self) -> u32 { - TemplateStorageSave::from_str(&self.ds.get(MODULE_NAME).unwrap_or("".into())) - .unwrap_or_default() - .counter + self.rx.borrow().counter } } +/// Translates the messages to/from the RandomMessage and calls `TemplateMessages.processMessages`. struct Translate { - /// This is always updated with the latest view of the TemplateMessages module. messages: TemplateMessages, - ds: Box, } impl Translate { async fn start( - ds: Box, - our_id: NodeID, random: Broker, - config: TemplateConfig, + messages: TemplateMessages, ) -> Result, Box> { - let str = ds.get(MODULE_NAME).unwrap_or("".into()); - let storage = TemplateStorageSave::from_str(&str).unwrap_or_default(); let mut template = Broker::new(); template - .add_subsystem(Subsystem::Handler(Box::new(Translate { - messages: TemplateMessages::new(storage, config, our_id)?, - ds, - }))) + .add_subsystem(Subsystem::Handler(Box::new(Translate { messages }))) .await?; template .link_bi( @@ -121,42 +124,17 @@ impl Translate { None } } - - fn handle_input(&mut self, msg_in: TemplateIn) -> Vec { - match self.messages.process_message(msg_in) { - Ok(ret) => return ret.into_iter().map(|m| m.into()).collect(), - Err(e) => log::warn!("While processing message: {e:?}"), - } - vec![] - } - - fn handle_output(&mut self, msg_out: &TemplateOut) { - if let TemplateOut::StorageUpdate(ts) = msg_out { - if let Ok(yaml) = ts.to_yaml() { - if let Err(e) = self.ds.set(MODULE_NAME, &yaml) { - log::warn!("While setting TemplateTranslation: {e:?}"); - } - } - } - } } #[cfg_attr(feature = "nosend", async_trait(?Send))] #[cfg_attr(not(feature = "nosend"), async_trait)] impl SubsystemHandler for Translate { async fn messages(&mut self, msgs: Vec) -> Vec { - let mut out = vec![]; - for msg in msgs { - log::trace!("Got msg: {msg:?}"); - if let TemplateMessage::Input(msg_in) = msg { - out.extend(self.handle_input(msg_in)); - } - } - for msg in out.iter() { - log::trace!("Outputting: {msg:?}"); - self.handle_output(msg); - } - out.into_iter().map(|o| o.into()).collect() + let msgs_in = msgs.into_iter().filter_map(|msg| match msg { + TemplateMessage::Input(msg_in) => Some(msg_in), + TemplateMessage::Output(_) => None, + }).collect(); + self.messages.process_messages(msgs_in).into_iter().map(|o| o.into()).collect() } } @@ -174,7 +152,7 @@ mod tests { let id0 = NodeID::rnd(); let id1 = NodeID::rnd(); let mut rnd = Broker::new(); - let mut tr = TemplateBroker::start(ds, id0, rnd.clone(), TemplateConfig::default()).await?; + let mut tr = Template::start(ds, id0, rnd.clone(), TemplateConfig::default()).await?; let mut tap = rnd.get_tap().await?; assert_eq!(0, tr.get_counter()); diff --git a/flmodules/src/template/core.rs b/flmodules/src/template/core.rs index 11ad191b..4ad305b3 100644 --- a/flmodules/src/template/core.rs +++ b/flmodules/src/template/core.rs @@ -22,7 +22,7 @@ pub struct TemplateCore { } impl TemplateCore { - /// Initializes an EventsStorage with two categories. + /// Initializes a new TemplateCore. pub fn new(storage: TemplateStorage, config: TemplateConfig) -> Self { Self { storage, config } } diff --git a/flmodules/src/template/messages.rs b/flmodules/src/template/messages.rs index 1dbc3d43..96a6e04f 100644 --- a/flmodules/src/template/messages.rs +++ b/flmodules/src/template/messages.rs @@ -1,8 +1,8 @@ use std::error::Error; use crate::nodeids::{NodeID, NodeIDs}; -use itertools::concat; use serde::{Deserialize, Serialize}; +use tokio::sync::watch; use super::core::*; @@ -24,7 +24,6 @@ pub enum TemplateIn { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum TemplateOut { Node(NodeID, MessageNode), - StorageUpdate(TemplateStorage), } /// These are the messages which will be exchanged between the nodes for this @@ -41,6 +40,7 @@ pub struct TemplateMessages { pub core: TemplateCore, nodes: NodeIDs, our_id: NodeID, + tx: watch::Sender, } impl TemplateMessages { @@ -49,25 +49,28 @@ impl TemplateMessages { storage: TemplateStorage, cfg: TemplateConfig, our_id: NodeID, + tx: watch::Sender, ) -> Result> { Ok(Self { core: TemplateCore::new(storage, cfg), nodes: NodeIDs::empty(), our_id, + tx, }) } /// Processes one generic message and returns either an error /// or a Vec. - pub fn process_message( - &mut self, - msg: TemplateIn, - ) -> Result, serde_yaml::Error> { - log::trace!("got message {:?}", msg); - Ok(match msg { - TemplateIn::Node(src, node_msg) => self.process_node_message(src, node_msg), - TemplateIn::UpdateNodeList(ids) => self.node_list(ids), - }) + pub fn process_messages(&mut self, msgs: Vec) -> Vec { + let mut out = vec![]; + for msg in msgs { + log::trace!("Got msg: {msg:?}"); + out.extend(match msg { + TemplateIn::Node(src, node_msg) => self.process_node_message(src, node_msg), + TemplateIn::UpdateNodeList(ids) => self.node_list(ids), + }); + } + out } /// Processes a node to node message and returns zero or more @@ -78,19 +81,20 @@ impl TemplateMessages { // When increasing the counter, send 'self' counter to all other nodes. // Also send a StorageUpdate message. self.core.increase(c); - return concat([ - self.nodes - .0 - .iter() - .map(|id| { - TemplateOut::Node( - id.clone(), - MessageNode::Counter(self.core.storage.counter), - ) - }) - .collect(), - vec![TemplateOut::StorageUpdate(self.core.storage.clone()).into()], - ]); + self.tx + .send(self.core.storage.clone()) + .expect("while sending new storage"); + return self + .nodes + .0 + .iter() + .map(|id| { + TemplateOut::Node( + id.clone(), + MessageNode::Counter(self.core.storage.counter), + ) + }) + .collect(); } MessageNode::Counter(c) => log::info!("Got counter from {}: {}", _src, c), } @@ -127,13 +131,18 @@ mod tests { let ids = NodeIDs::new(2); let id0 = *ids.0.get(0).unwrap(); let id1 = *ids.0.get(1).unwrap(); - let mut msg = - TemplateMessages::new(TemplateStorage::default(), TemplateConfig::default(), id0)?; - msg.process_message(TemplateIn::UpdateNodeList(ids))?; - let ret = msg.process_message(TemplateIn::Node(id1, MessageNode::Increase(2)))?; - assert_eq!(2, ret.len()); - assert!(matches!(ret[0], TemplateOut::Node(_, MessageNode::Counter(2)))); - assert!(matches!(ret[1], TemplateOut::StorageUpdate(_))); + let storage = TemplateStorage::default(); + let (tx, rx) = watch::channel(storage.clone()); + let mut msg = TemplateMessages::new(storage, TemplateConfig::default(), id0, tx)?; + msg.process_messages(vec![TemplateIn::UpdateNodeList(ids).into()]); + let ret = + msg.process_messages(vec![TemplateIn::Node(id1, MessageNode::Increase(2)).into()]); + assert_eq!(1, ret.len()); + assert!(matches!( + ret[0], + TemplateOut::Node(_, MessageNode::Counter(2)) + )); + assert_eq!(2, rx.borrow().counter); Ok(()) } } diff --git a/flmodules/src/template/mod.rs b/flmodules/src/template/mod.rs index a4d24237..80d1dca3 100644 --- a/flmodules/src/template/mod.rs +++ b/flmodules/src/template/mod.rs @@ -3,4 +3,4 @@ pub mod core; // Messages for this module pub mod messages; // Integrating with other modules -pub mod translation; \ No newline at end of file +pub mod broker; \ No newline at end of file