Skip to content

Commit

Permalink
Merge pull request #110 from ineiti/template_storage
Browse files Browse the repository at this point in the history
Simplifying the storage and message handling in the template
  • Loading branch information
ineiti authored Aug 30, 2024
2 parents 7b28269 + 3ecea9a commit 499b0c8
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 92 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Following https://keepachangelog.com/en/1.1.0/ and using

### Changed
- Updated versions of most dependencies
- use tokio::sync::watch to pass configuration from `Translate` to `Broker`

### Added

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use async_trait::async_trait;
use flarch::data_storage::DataStorage;
use flarch::{data_storage::DataStorage, spawn_local};
use std::error::Error;
use tokio::sync::watch;

use crate::{
broker::{Broker, BrokerError, Subsystem, SubsystemHandler},
Expand All @@ -9,7 +10,7 @@ use crate::{
};

use super::{
core::{TemplateConfig, TemplateStorageSave},
core::{TemplateConfig, TemplateStorage, TemplateStorageSave},
messages::{MessageNode, TemplateIn, TemplateMessage, TemplateMessages, TemplateOut},
};

Expand All @@ -19,25 +20,37 @@ const MODULE_NAME: &str = "Template";
/// all messages are correctly translated from one to the other.
/// For this example, it uses the RandomConnections module to communicate
/// with other nodes.
///
/// The [TemplateBroker] holds the [Translate] and offers convenience methods
///
/// The [Template] holds the [Translate] and offers convenience methods
/// to interact with [Translate] and [TemplateMessage].
pub struct TemplateBroker {
pub struct Template {
/// Represents the underlying broker.
pub broker: Broker<TemplateMessage>,
our_id: NodeID,
ds: Box<dyn DataStorage>,
rx: watch::Receiver<TemplateStorage>,
}

impl TemplateBroker {
impl Template {
pub async fn start(
ds: Box<dyn DataStorage>,
mut ds: Box<dyn DataStorage + Send>,
our_id: NodeID,
rc: Broker<RandomMessage>,
cfg: TemplateConfig,
config: TemplateConfig,
) -> Result<Self, Box<dyn Error>> {
let broker = Translate::start(ds.clone(), our_id.clone(), rc, cfg).await?;
Ok(TemplateBroker { broker, ds, our_id })
let str = ds.get(MODULE_NAME).unwrap_or("".into());
let storage = TemplateStorageSave::from_str(&str).unwrap_or_default();
let (tx, rx) = watch::channel(storage.clone());
let messages = TemplateMessages::new(storage, config, our_id, tx)?;
let broker = Translate::start(rc, messages).await?;
let mut rx_spawn = rx.clone();
spawn_local(async move {
while rx_spawn.changed().await.is_ok() {
if let Ok(val) = rx_spawn.borrow().to_yaml() {
ds.set(MODULE_NAME, &val).expect("updating storage");
}
}
});
Ok(Template { broker, our_id, rx })
}

pub fn increase_self(&mut self, counter: u32) -> Result<(), BrokerError> {
Expand All @@ -46,34 +59,24 @@ impl TemplateBroker {
}

pub fn get_counter(&self) -> u32 {
TemplateStorageSave::from_str(&self.ds.get(MODULE_NAME).unwrap_or("".into()))
.unwrap_or_default()
.counter
self.rx.borrow().counter
}
}

/// Translates the messages to/from the RandomMessage and calls `TemplateMessages.processMessages`.
struct Translate {
/// This is always updated with the latest view of the TemplateMessages module.
messages: TemplateMessages,
ds: Box<dyn DataStorage + Send>,
}

impl Translate {
async fn start(
ds: Box<dyn DataStorage + Send>,
our_id: NodeID,
random: Broker<RandomMessage>,
config: TemplateConfig,
messages: TemplateMessages,
) -> Result<Broker<TemplateMessage>, Box<dyn Error>> {
let str = ds.get(MODULE_NAME).unwrap_or("".into());
let storage = TemplateStorageSave::from_str(&str).unwrap_or_default();
let mut template = Broker::new();

template
.add_subsystem(Subsystem::Handler(Box::new(Translate {
messages: TemplateMessages::new(storage, config, our_id)?,
ds,
})))
.add_subsystem(Subsystem::Handler(Box::new(Translate { messages })))
.await?;
template
.link_bi(
Expand Down Expand Up @@ -121,42 +124,24 @@ impl Translate {
None
}
}

fn handle_input(&mut self, msg_in: TemplateIn) -> Vec<TemplateOut> {
match self.messages.process_message(msg_in) {
Ok(ret) => return ret.into_iter().map(|m| m.into()).collect(),
Err(e) => log::warn!("While processing message: {e:?}"),
}
vec![]
}

fn handle_output(&mut self, msg_out: &TemplateOut) {
if let TemplateOut::StorageUpdate(ts) = msg_out {
if let Ok(yaml) = ts.to_yaml() {
if let Err(e) = self.ds.set(MODULE_NAME, &yaml) {
log::warn!("While setting TemplateTranslation: {e:?}");
}
}
}
}
}

#[cfg_attr(feature = "nosend", async_trait(?Send))]
#[cfg_attr(not(feature = "nosend"), async_trait)]
impl SubsystemHandler<TemplateMessage> for Translate {
async fn messages(&mut self, msgs: Vec<TemplateMessage>) -> Vec<TemplateMessage> {
let mut out = vec![];
for msg in msgs {
log::trace!("Got msg: {msg:?}");
if let TemplateMessage::Input(msg_in) = msg {
out.extend(self.handle_input(msg_in));
}
}
for msg in out.iter() {
log::trace!("Outputting: {msg:?}");
self.handle_output(msg);
}
out.into_iter().map(|o| o.into()).collect()
let msgs_in = msgs
.into_iter()
.filter_map(|msg| match msg {
TemplateMessage::Input(msg_in) => Some(msg_in),
TemplateMessage::Output(_) => None,
})
.collect();
self.messages
.process_messages(msgs_in)
.into_iter()
.map(|o| o.into())
.collect()
}
}

Expand All @@ -174,7 +159,7 @@ mod tests {
let id0 = NodeID::rnd();
let id1 = NodeID::rnd();
let mut rnd = Broker::new();
let mut tr = TemplateBroker::start(ds, id0, rnd.clone(), TemplateConfig::default()).await?;
let mut tr = Template::start(ds, id0, rnd.clone(), TemplateConfig::default()).await?;
let mut tap = rnd.get_tap().await?;
assert_eq!(0, tr.get_counter());

Expand Down
2 changes: 1 addition & 1 deletion flmodules/src/template/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub struct TemplateCore {
}

impl TemplateCore {
/// Initializes an EventsStorage with two categories.
/// Initializes a new TemplateCore.
pub fn new(storage: TemplateStorage, config: TemplateConfig) -> Self {
Self { storage, config }
}
Expand Down
77 changes: 43 additions & 34 deletions flmodules/src/template/messages.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::error::Error;

use crate::nodeids::{NodeID, NodeIDs};
use itertools::concat;
use serde::{Deserialize, Serialize};
use tokio::sync::watch;

use super::core::*;

Expand All @@ -15,16 +15,15 @@ pub enum TemplateMessage {
}

/// The messages here represent all possible interactions with this module.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Clone)]
pub enum TemplateIn {
Node(NodeID, MessageNode),
UpdateNodeList(NodeIDs),
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Clone)]
pub enum TemplateOut {
Node(NodeID, MessageNode),
StorageUpdate(TemplateStorage),
}

/// These are the messages which will be exchanged between the nodes for this
Expand All @@ -41,6 +40,7 @@ pub struct TemplateMessages {
pub core: TemplateCore,
nodes: NodeIDs,
our_id: NodeID,
tx: watch::Sender<TemplateStorage>,
}

impl TemplateMessages {
Expand All @@ -49,25 +49,28 @@ impl TemplateMessages {
storage: TemplateStorage,
cfg: TemplateConfig,
our_id: NodeID,
tx: watch::Sender<TemplateStorage>,
) -> Result<Self, Box<dyn Error>> {
Ok(Self {
core: TemplateCore::new(storage, cfg),
nodes: NodeIDs::empty(),
our_id,
tx,
})
}

/// Processes one generic message and returns either an error
/// or a Vec<MessageOut>.
pub fn process_message(
&mut self,
msg: TemplateIn,
) -> Result<Vec<TemplateOut>, serde_yaml::Error> {
log::trace!("got message {:?}", msg);
Ok(match msg {
TemplateIn::Node(src, node_msg) => self.process_node_message(src, node_msg),
TemplateIn::UpdateNodeList(ids) => self.node_list(ids),
})
pub fn process_messages(&mut self, msgs: Vec<TemplateIn>) -> Vec<TemplateOut> {
let mut out = vec![];
for msg in msgs {
log::trace!("Got msg: {msg:?}");
out.extend(match msg {
TemplateIn::Node(src, node_msg) => self.process_node_message(src, node_msg),
TemplateIn::UpdateNodeList(ids) => self.node_list(ids),
});
}
out
}

/// Processes a node to node message and returns zero or more
Expand All @@ -78,27 +81,28 @@ impl TemplateMessages {
// When increasing the counter, send 'self' counter to all other nodes.
// Also send a StorageUpdate message.
self.core.increase(c);
return concat([
self.nodes
.0
.iter()
.map(|id| {
TemplateOut::Node(
id.clone(),
MessageNode::Counter(self.core.storage.counter),
)
})
.collect(),
vec![TemplateOut::StorageUpdate(self.core.storage.clone()).into()],
]);
self.tx
.send(self.core.storage.clone())
.expect("while sending new storage");
return self
.nodes
.0
.iter()
.map(|id| {
TemplateOut::Node(
id.clone(),
MessageNode::Counter(self.core.storage.counter),
)
})
.collect();
}
MessageNode::Counter(c) => log::info!("Got counter from {}: {}", _src, c),
}
vec![]
}

/// Stores the new node list, excluding the ID of this node.
pub fn node_list(&mut self, mut ids: NodeIDs) -> Vec<TemplateOut> {
fn node_list(&mut self, mut ids: NodeIDs) -> Vec<TemplateOut> {
self.nodes = ids.remove_missing(&vec![self.our_id].into());
vec![]
}
Expand Down Expand Up @@ -127,13 +131,18 @@ mod tests {
let ids = NodeIDs::new(2);
let id0 = *ids.0.get(0).unwrap();
let id1 = *ids.0.get(1).unwrap();
let mut msg =
TemplateMessages::new(TemplateStorage::default(), TemplateConfig::default(), id0)?;
msg.process_message(TemplateIn::UpdateNodeList(ids))?;
let ret = msg.process_message(TemplateIn::Node(id1, MessageNode::Increase(2)))?;
assert_eq!(2, ret.len());
assert!(matches!(ret[0], TemplateOut::Node(_, MessageNode::Counter(2))));
assert!(matches!(ret[1], TemplateOut::StorageUpdate(_)));
let storage = TemplateStorage::default();
let (tx, rx) = watch::channel(storage.clone());
let mut msg = TemplateMessages::new(storage, TemplateConfig::default(), id0, tx)?;
msg.process_messages(vec![TemplateIn::UpdateNodeList(ids).into()]);
let ret =
msg.process_messages(vec![TemplateIn::Node(id1, MessageNode::Increase(2)).into()]);
assert_eq!(1, ret.len());
assert!(matches!(
ret[0],
TemplateOut::Node(_, MessageNode::Counter(2))
));
assert_eq!(2, rx.borrow().counter);
Ok(())
}
}
2 changes: 1 addition & 1 deletion flmodules/src/template/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ pub mod core;
// Messages for this module
pub mod messages;
// Integrating with other modules
pub mod translation;
pub mod broker;

0 comments on commit 499b0c8

Please sign in to comment.