diff --git a/flarch/Cargo.lock b/flarch/Cargo.lock index 2b307bdf..7f824003 100644 --- a/flarch/Cargo.lock +++ b/flarch/Cargo.lock @@ -1575,9 +1575,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.12" +version = "0.23.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044" +checksum = "f2dabaac7466917e566adb06783a81ca48944c6898a1b08b9374106dd671f4c8" dependencies = [ "once_cell", "ring", @@ -1618,9 +1618,9 @@ checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0" [[package]] name = "rustls-webpki" -version = "0.102.7" +version = "0.102.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84678086bd54edf2b415183ed7a94d0efb049f1b646a33e22a36f3794be6ae56" +checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" dependencies = [ "ring", "rustls-pki-types", @@ -2171,9 +2171,9 @@ checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" [[package]] name = "unicode-normalization" -version = "0.1.23" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5" +checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956" dependencies = [ "tinyvec", ] diff --git a/flarch/src/lib.rs b/flarch/src/lib.rs index 226a111e..edab40c6 100644 --- a/flarch/src/lib.rs +++ b/flarch/src/lib.rs @@ -28,3 +28,4 @@ pub fn start_logging_filter_level(filters: Vec<&str>, level: log::LevelFilter) { } pub use flarch_macro::platform_async_trait; +pub use rand::random; \ No newline at end of file diff --git a/flarch/src/web_rtc/libc/web_rtc_setup.rs b/flarch/src/web_rtc/libc/web_rtc_setup.rs index 86b6ba98..e12ba5e6 100644 --- a/flarch/src/web_rtc/libc/web_rtc_setup.rs +++ b/flarch/src/web_rtc/libc/web_rtc_setup.rs @@ -3,8 +3,8 @@ use std::sync::{ Arc, }; -use async_trait::async_trait; use crate::broker::{Broker, Subsystem, SubsystemHandler}; +use async_trait::async_trait; use futures::lock::Mutex; use webrtc::{ api::{ @@ -27,10 +27,12 @@ use webrtc::{ }; use crate::web_rtc::{ - connection::{ConnectionConfig, HostLogin}, messages::{ + connection::{ConnectionConfig, HostLogin}, + messages::{ ConnType, ConnectionStateMap, DataChannelState, PeerMessage, SetupError, SignalingState, WebRTCInput, WebRTCMessage, WebRTCOutput, WebRTCSpawner, - }, node_connection::Direction + }, + node_connection::Direction, }; fn get_ice_server(host: HostLogin) -> RTCIceServer { @@ -184,7 +186,6 @@ impl WebRTCConnectionSetupLibc { /// Returns the offer string that needs to be sent to the `Follower` node. async fn make_offer(&mut self) -> Result { if self.direction.is_some() { - log::warn!("Resetting connection because of new offer"); self.reset().await?; } self.direction = Some(Direction::Outgoing); @@ -206,13 +207,13 @@ impl WebRTCConnectionSetupLibc { .set_local_description(offer.clone()) .await .map_err(to_error)?; + Ok(offer.sdp) } /// Takes the offer string async fn make_answer(&mut self, offer: String) -> Result { if self.direction.is_some() { - log::warn!("Resetting connection because of new answer"); self.reset().await?; } self.direction = Some(Direction::Incoming); @@ -375,8 +376,9 @@ impl WebRTCConnectionSetupLibc { log::warn!("Got message for deprecated on_open"); return Box::pin(async {}); } + + log::trace!("DataChannel is opened"); Box::pin(async move { - log::trace!("DataChannel opened"); broker_cl .emit_msg(WebRTCMessage::Output(WebRTCOutput::Connected)) .err() @@ -401,6 +403,11 @@ impl WebRTCConnectionSetupLibc { .map(|e| log::warn!("Text queued but not processed: {:?}", e)); }) })); + if let Some(dc) = rtc_data.lock().await.take() { + if let Err(e) = dc.close().await { + log::warn!("While closing datachannel: {e:?}"); + } + } rtc_data.lock().await.replace(data_channel); } @@ -420,6 +427,11 @@ impl WebRTCConnectionSetupLibc { self.get_state().await?, )))); } + WebRTCInput::Disconnect => { + if let Err(e) = self.reset().await { + log::warn!("While closing old connection: {e:?}"); + } + } WebRTCInput::Reset => { if self.direction.is_some() { self.reset().await?; @@ -430,13 +442,34 @@ impl WebRTCConnectionSetupLibc { } async fn reset(&mut self) -> Result<(), SetupError> { + if self.direction.is_none() { + return Ok(()); + } + self.direction = None; + + // Replacing all listeners with empty listeners if let Some(mut rd) = self.rtc_data.try_lock() { + if let Some(ref mut dc) = rd.as_ref() { + dc.on_message(Box::new(|_: DataChannelMessage| Box::pin(async {}))); + dc.on_open(Box::new(|| Box::pin(async {}))); + } *rd = None; } + self.connection + .on_data_channel(Box::new(|_: Arc| Box::pin(async {}))); + self.connection + .on_peer_connection_state_change(Box::new(|_: RTCPeerConnectionState| { + Box::pin(async {}) + })); + self.connection + .on_ice_candidate(Box::new(|_: Option| Box::pin(async {}))); + + if let Err(e) = self.connection.close().await { + log::warn!("While closing old connection: {e:?}"); + } self.connection = Self::make_connection(self.connection_cfg.clone()).await?; self.setup_connection().await?; - self.direction = None; Ok(()) } } @@ -447,11 +480,11 @@ impl SubsystemHandler for WebRTCConnectionSetupLibc { let mut out = vec![]; for msg in msgs { if let WebRTCMessage::Input(msg_in) = msg { - match self.msg_in(msg_in).await { + match self.msg_in(msg_in.clone()).await { Ok(Some(msg)) => out.push(msg), Ok(None) => {} Err(e) => { - log::warn!("{:p} Error processing message: {:?}", self, e); + log::trace!("{:p} Error processing message {msg_in:?}: {:?}", self, e); } } } diff --git a/flarch/src/web_rtc/messages.rs b/flarch/src/web_rtc/messages.rs index 39593dd6..46b3ccf5 100644 --- a/flarch/src/web_rtc/messages.rs +++ b/flarch/src/web_rtc/messages.rs @@ -76,6 +76,8 @@ pub enum WebRTCInput { UpdateState, /// Try to reconnect, or throw an error if incoming connection Reset, + /// Disconnect this node + Disconnect, } #[cfg(target_family="unix")] diff --git a/flarch/src/web_rtc/mod.rs b/flarch/src/web_rtc/mod.rs index ff252444..40b3c5d6 100644 --- a/flarch/src/web_rtc/mod.rs +++ b/flarch/src/web_rtc/mod.rs @@ -5,13 +5,13 @@ //! a connection, it doesn't describe how the nodes need to communicate to //! set up the connection. //! The messages for setting up the connection are described in the [`crate::signal::SignalServer`]. -//! +//! //! # Bidirectional connections -//! +//! //! Every WebRTC connection has one or two connections inside: //! - _outgoing_ is the connection initiated by this node //! - _incoming_ is the connection initiated by the other node -//! +//! //! This is necessary, as it is always possible that two nodes want to start //! connecting to each other concurrently. @@ -19,40 +19,45 @@ use std::collections::HashMap; use flarch_macro::platform_async_trait; -use crate::{broker::{Broker, BrokerError, Subsystem, SubsystemHandler, Translate}, nodeids::NodeID}; +use crate::{ + broker::{Broker, BrokerError, Subsystem, SubsystemHandler, Translate}, + nodeids::NodeID, +}; use self::{ messages::WebRTCSpawner, node_connection::{NCError, NCInput, NCMessage, NCOutput, NodeConnection}, }; -pub mod messages; pub mod connection; +pub mod messages; pub mod node_connection; pub mod websocket; -#[cfg(target_family="windows")] +#[cfg(target_family = "windows")] compile_error!("flarch is not available for windows"); -#[cfg(target_family="wasm")] +#[cfg(target_family = "wasm")] mod wasm; -#[cfg(target_family="wasm")] +#[cfg(target_family = "wasm")] pub use wasm::*; -#[cfg(target_family="unix")] +#[cfg(target_family = "unix")] mod libc; -#[cfg(target_family="unix")] +#[cfg(target_family = "unix")] pub use libc::*; #[derive(Debug, Clone, PartialEq)] /// All messages for the [`WebRTCConn`] broker. pub enum WebRTCConnMessage { /// Messages going to the WebRTC interface - InputNC((NodeID, NCInput)), + InputNC(NodeID, NCInput), /// Messages coming from the WebRTC interface - OutputNC((NodeID, NCOutput)), + OutputNC(NodeID, NCOutput), /// Connection request Connect(NodeID), + /// Disconnect this node + Disconnect(NodeID), } /// The actual implementation of the WebRTC connection setup. @@ -63,7 +68,7 @@ pub struct WebRTCConn { } impl WebRTCConn { - /// Creates a new [`Broker`] that will accept incoming connections and set up + /// Creates a new [`Broker`] that will accept incoming connections and set up /// new outgoing connections. pub async fn new(web_rtc: WebRTCSpawner) -> Result, BrokerError> { let mut br = Broker::new(); @@ -88,10 +93,20 @@ impl WebRTCConn { Ok(()) } + fn try_send(&mut self, dst: NodeID, msg: NCInput) { + if let Some(conn) = self.connections.get_mut(&dst) { + conn.emit_msg(NCMessage::Input(msg.clone())) + .err() + .map(|e| log::error!("When sending message {msg:?} to webrtc: {e:?}")); + } else { + log::warn!("Dropping message {:?} to unconnected node {}", msg, dst); + } + } + fn from_nc(id: NodeID) -> Translate { Box::new(move |msg| { if let NCMessage::Output(ncmsg) = msg { - return Some(WebRTCConnMessage::OutputNC((id, ncmsg))); + return Some(WebRTCConnMessage::OutputNC(id, ncmsg)); } None }) @@ -103,22 +118,24 @@ impl SubsystemHandler for WebRTCConn { async fn messages(&mut self, msgs: Vec) -> Vec { for msg in msgs { match msg { - WebRTCConnMessage::InputNC((dst, msg_in)) => { - if let Some(conn) = self.connections.get_mut(&dst) { - conn.emit_msg(NCMessage::Input(msg_in.clone())) - .err() - .map(|e| { - log::error!("When sending message {msg_in:?} to webrtc: {e:?}") - }); - } else { - log::warn!("Dropping message {:?} to unconnected node {}", msg_in, dst); - } + WebRTCConnMessage::InputNC(dst, msg_in) => { + self.try_send(dst, msg_in); + } + WebRTCConnMessage::Disconnect(dst) => { + self.try_send(dst, NCInput::Disconnect); } WebRTCConnMessage::Connect(dst) => { self.ensure_connection(&dst) .await .err() .map(|e| log::error!("When starting webrtc-connection {e:?}")); + self.try_send( + dst, + NCInput::Setup( + node_connection::Direction::Outgoing, + messages::PeerMessage::Init, + ), + ); } _ => {} }; diff --git a/flarch/src/web_rtc/node_connection.rs b/flarch/src/web_rtc/node_connection.rs index 37c9ccb0..6f9b538a 100644 --- a/flarch/src/web_rtc/node_connection.rs +++ b/flarch/src/web_rtc/node_connection.rs @@ -56,9 +56,9 @@ pub enum NCOutput { /// Received a text from any connection Text(String), /// Return a changed state from one of the connections - State((Direction, ConnectionStateMap)), + State(Direction, ConnectionStateMap), /// Setup message for the connection in the given direction - Setup((Direction, PeerMessage)), + Setup(Direction, PeerMessage), } #[derive(Debug, Clone, PartialEq)] @@ -72,7 +72,7 @@ pub enum NCInput { GetStates, /// Treat the [`PeerMessage`] to setup a new connection with the /// given direction - Setup((Direction, PeerMessage)), + Setup(Direction, PeerMessage), } #[derive(Debug, Clone, PartialEq)] @@ -191,34 +191,36 @@ impl NodeConnection { out } NCInput::Disconnect => vec![ - NCMessage::Incoming(WebRTCMessage::Input(WebRTCInput::Reset)), - NCMessage::Outgoing(WebRTCMessage::Input(WebRTCInput::Reset)), + NCMessage::Incoming(WebRTCMessage::Input(WebRTCInput::Disconnect)), + NCMessage::Outgoing(WebRTCMessage::Input(WebRTCInput::Disconnect)), ], NCInput::GetStates => { let mut out = vec![]; if let Some(state) = self.state_incoming { - out.push(NCMessage::Output(NCOutput::State(( + out.push(NCMessage::Output(NCOutput::State( Direction::Incoming, state.clone(), - )))); + ))); } if let Some(state) = self.state_outgoing { - out.push(NCMessage::Output(NCOutput::State(( + out.push(NCMessage::Output(NCOutput::State( Direction::Outgoing, state.clone(), - )))); + ))); } out } - NCInput::Setup((dir, pm)) => match dir { - Direction::Incoming => vec![NCMessage::Incoming(WebRTCMessage::Input( - WebRTCInput::Setup(pm), - ))], - Direction::Outgoing => vec![NCMessage::Outgoing(WebRTCMessage::Input( - WebRTCInput::Setup(pm), - ))], - }, + NCInput::Setup(dir, pm) => { + match dir { + Direction::Incoming => vec![NCMessage::Incoming(WebRTCMessage::Input( + WebRTCInput::Setup(pm), + ))], + Direction::Outgoing => vec![NCMessage::Outgoing(WebRTCMessage::Input( + WebRTCInput::Setup(pm), + ))], + } + } } } @@ -238,7 +240,7 @@ impl NodeConnection { out.extend(self.send_queue()); out } - WebRTCOutput::Setup(pm) => vec![NCMessage::Output(NCOutput::Setup((dir, pm)))], + WebRTCOutput::Setup(pm) => vec![NCMessage::Output(NCOutput::Setup(dir, pm))], WebRTCOutput::Text(msg_str) => { vec![NCMessage::Output(NCOutput::Text(msg_str))] } @@ -247,7 +249,7 @@ impl NodeConnection { Direction::Incoming => self.state_incoming = Some(state), Direction::Outgoing => self.state_outgoing = Some(state), } - vec![NCMessage::Output(NCOutput::State((dir, state)))] + vec![NCMessage::Output(NCOutput::State(dir, state))] } WebRTCOutput::Disconnected | WebRTCOutput::Error(_) => { let msg = match dir { diff --git a/flarch/src/web_rtc/wasm/web_rtc_setup.rs b/flarch/src/web_rtc/wasm/web_rtc_setup.rs index 483c9b9a..eb1e48e7 100644 --- a/flarch/src/web_rtc/wasm/web_rtc_setup.rs +++ b/flarch/src/web_rtc/wasm/web_rtc_setup.rs @@ -1,7 +1,6 @@ use async_trait::async_trait; use futures::lock::Mutex; use js_sys::Reflect; -use log::{error, warn}; use serde::{Deserialize, Serialize}; use std::sync::Arc; use wasm_bindgen::{prelude::*, JsCast}; @@ -13,14 +12,16 @@ use web_sys::{ RtcSessionDescriptionInit, RtcSignalingState, }; +use crate::broker::{Broker, Subsystem, SubsystemHandler}; use crate::web_rtc::{ - connection::{ConnectionConfig, HostLogin}, messages::{ + connection::{ConnectionConfig, HostLogin}, + messages::{ ConnType, ConnectionStateMap, DataChannelState, IceConnectionState, IceGatheringState, PeerMessage, SetupError, SignalingState, WebRTCInput, WebRTCMessage, WebRTCOutput, WebRTCSpawner, - }, node_connection::Direction + }, + node_connection::Direction, }; -use crate::broker::{Broker, Subsystem, SubsystemHandler}; pub struct WebRTCConnectionSetup { pub rp_conn: RtcPeerConnection, @@ -82,6 +83,11 @@ impl WebRTCConnectionSetup { } pub fn reset(&mut self) -> Result<(), SetupError> { + if self.direction.is_none() { + return Ok(()); + } + self.direction = None; + let empty_callback = Closure::wrap(Box::new(move |_: MessageEvent| { log::warn!("Got callback after reset"); }) as Box); @@ -99,15 +105,18 @@ impl WebRTCConnectionSetup { empty_callback.forget(); - self.rp_conn.close(); + self.close(); self.rp_conn = Self::create_rp_conn(self.config.clone())?; WebRTCConnectionSetup::ice_start(&self.rp_conn, self.broker.clone()); - self.direction = None; + Ok(()) + } + + fn close(&mut self) { + self.rp_conn.close(); if let Some(mut rd) = self.rtc_data.try_lock() { rd.as_ref().map(|r| r.close()); *rd = None; } - Ok(()) } pub fn ice_start(rp_conn: &RtcPeerConnection, broker: Broker) { @@ -156,7 +165,6 @@ impl WebRTCConnectionSetup { // Returns the offer string that needs to be sent to the `Follower` node. pub async fn make_offer(&mut self) -> Result { if self.direction.is_some() { - log::warn!("Resetting with offer in already opened connection"); self.reset()?; }; self.direction = Some(Direction::Outgoing); @@ -185,7 +193,6 @@ impl WebRTCConnectionSetup { // Takes the offer string pub async fn make_answer(&mut self, offer: String) -> Result { if self.direction.is_some() { - log::warn!("Resetting with offer in already opened connection"); self.reset()?; }; self.direction = Some(Direction::Incoming); @@ -197,17 +204,16 @@ impl WebRTCConnectionSetup { let srd_promise = self.rp_conn.set_remote_description(&offer_obj); JsFuture::from(srd_promise) .await - .map_err(|e| SetupError::SetupFail(e.as_string().unwrap()))?; + .map_err(|e| SetupError::SetupFail(format!("{e:?}")))?; let answer = match JsFuture::from(self.rp_conn.create_answer()).await { Ok(f) => f, Err(e) => { - error!("Error answer: {:?}", e); - return Err(SetupError::SetupFail(e.as_string().unwrap())); + return Err(SetupError::SetupFail(format!("{e:?}"))); } }; let answer_sdp = Reflect::get(&answer, &JsValue::from_str("sdp")) - .map_err(|e| SetupError::SetupFail(e.as_string().unwrap()))? + .map_err(|e| SetupError::SetupFail(format!("{e:?}")))? .as_string() .unwrap(); @@ -216,7 +222,7 @@ impl WebRTCConnectionSetup { let sld_promise = self.rp_conn.set_local_description(&answer_obj); JsFuture::from(sld_promise) .await - .map_err(|e| SetupError::SetupFail(e.as_string().unwrap()))?; + .map_err(|e| SetupError::SetupFail(format!("{e:?}")))?; Ok(answer_sdp) } @@ -255,7 +261,7 @@ impl WebRTCConnectionSetup { ) .await { - warn!("Couldn't add ice candidate: {:?}", err); + log::warn!("Couldn't add ice candidate: {:?}", err); } Ok(()) } @@ -297,6 +303,7 @@ impl WebRTCConnectionSetup { ) { let dc_clone = dc.clone(); let ondatachannel_open = Closure::wrap(Box::new(move |_ev: Event| { + log::trace!("DataChannel is opened"); let mut broker_clone = broker.clone(); let rtc_data = Arc::clone(&rtc_data); let dc_clone2 = dc_clone.clone(); @@ -324,20 +331,17 @@ impl WebRTCConnectionSetup { onmessage_callback.forget(); let broker_cl = broker.clone(); - let onerror_callback = Closure::wrap(Box::new(move |ev: MessageEvent| { + let onclose_callback = Closure::wrap(Box::new(move |_: MessageEvent| { let mut broker = broker_cl.clone(); wasm_bindgen_futures::spawn_local(async move { broker - .emit_msg(WebRTCMessage::Output(WebRTCOutput::Error(format!( - "{:?}", - ev - )))) + .emit_msg(WebRTCMessage::Output(WebRTCOutput::Disconnected)) .err() .map(|e| log::error!("While sending message: {:?}", e)); }); }) as Box); - dc_clone.set_onclose(Some(onerror_callback.as_ref().unchecked_ref())); - onerror_callback.forget(); + dc_clone.set_onclose(Some(onclose_callback.as_ref().unchecked_ref())); + onclose_callback.forget(); }) as Box); dc.set_onopen(Some(ondatachannel_open.as_ref().unchecked_ref())); ondatachannel_open.forget(); @@ -479,6 +483,7 @@ impl WebRTCConnection { self.setup.get_state().await?, )))); } + WebRTCInput::Disconnect => self.setup.reset()?, WebRTCInput::Reset => self.setup.reset()?, } Ok(None) @@ -491,11 +496,11 @@ impl SubsystemHandler for WebRTCConnection { let mut out = vec![]; for msg in msgs { if let WebRTCMessage::Input(msg_in) = msg { - match self.msg_in(msg_in).await { + match self.msg_in(msg_in.clone()).await { Ok(Some(msg)) => out.push(msg), Ok(None) => {} Err(e) => { - log::warn!("Error processing message: {:?}", e); + log::trace!("{:p} Error processing message {msg_in:?}: {:?}", self, e); } } } diff --git a/flmodules/src/gossip_events/broker.rs b/flmodules/src/gossip_events/broker.rs index 73eb7cee..63140a78 100644 --- a/flmodules/src/gossip_events/broker.rs +++ b/flmodules/src/gossip_events/broker.rs @@ -117,7 +117,7 @@ impl Translate { if let RandomMessage::Output(msg_out) = msg { match msg_out { RandomOut::ListUpdate(list) => Some(GossipIn::NodeList(list.into()).into()), - RandomOut::NodeMessageFromNetwork((id, msg)) => { + RandomOut::NodeMessageFromNetwork(id, msg) => { if msg.module == MODULE_NAME { serde_yaml::from_str::(&msg.msg) .ok() @@ -136,13 +136,13 @@ impl Translate { fn link_gossip_rnd(msg: GossipMessage) -> Option { if let GossipMessage::Output(GossipOut::Node(id, msg_node)) = msg { Some( - RandomIn::NodeMessageToNetwork(( + RandomIn::NodeMessageToNetwork( id, ModuleMessage { module: MODULE_NAME.into(), msg: serde_yaml::to_string(&msg_node).unwrap(), }, - )) + ) .into(), ) } else { @@ -222,13 +222,13 @@ mod tests { let msg = MessageNode::Events(vec![event.clone()]); broker_rnd .settle_msg( - RandomOut::NodeMessageFromNetwork(( + RandomOut::NodeMessageFromNetwork( id2, ModuleMessage { module: MODULE_NAME.into(), msg: serde_yaml::to_string(&msg).unwrap(), }, - )) + ) .into(), ) .await?; @@ -242,7 +242,7 @@ mod tests { fn assert_msg_reid(tap: &Receiver, id2: &NodeID) -> Result<(), Box> { for msg in tap.try_iter() { - if let RandomMessage::Input(RandomIn::NodeMessageToNetwork((id, msg_mod))) = msg { + if let RandomMessage::Input(RandomIn::NodeMessageToNetwork(id, msg_mod)) = msg { assert_eq!(id2, &id); assert_eq!(MODULE_NAME.to_string(), msg_mod.module); let msg_yaml = serde_yaml::from_str(&msg_mod.msg)?; diff --git a/flmodules/src/network/network.rs b/flmodules/src/network/network.rs index 09a99bc7..f9e926f2 100644 --- a/flmodules/src/network/network.rs +++ b/flmodules/src/network/network.rs @@ -218,7 +218,7 @@ impl NetworkBroker { vec![] }, vec![NetworkMessage::from_nc( - NCInput::Setup((pi.get_direction(&own_id), pi.message)), + NCInput::Setup(pi.get_direction(&own_id), pi.message), remote_node, )], ]) @@ -239,10 +239,6 @@ impl NetworkBroker { Ok(concat(vec![ if !self.connections.contains(&id) { - log::warn!( - "Got message to unconnected node and connecting first to {}", - id - ); self.connect(&id) } else { vec![] @@ -271,7 +267,7 @@ impl NetworkBroker { NCOutput::Connected(_) => vec![NetReply::Connected(id).into()], NCOutput::Disconnected(_) => vec![NetReply::Disconnected(id).into()], NCOutput::Text(msg) => vec![NetReply::RcvNodeMessage(id, msg).into()], - NCOutput::State((dir, state)) => { + NCOutput::State(dir, state) => { vec![NetReply::ConnectionState(NetworkConnectionState { id, dir, @@ -286,7 +282,7 @@ impl NetworkBroker { }) .into()] } - NCOutput::Setup((dir, pm)) => { + NCOutput::Setup(dir, pm) => { let mut id_init = self.node_config.info.get_id(); let mut id_follow = id; if dir == Direction::Incoming { @@ -342,8 +338,8 @@ impl NetworkBroker { fn to_web_rtc(msg: NetworkMessage) -> Option { if let NetworkMessage::WebRTC(msg_webrtc) = msg { match msg_webrtc { - WebRTCConnMessage::InputNC(_) | WebRTCConnMessage::Connect(_) => Some(msg_webrtc), - WebRTCConnMessage::OutputNC(_) => None, + WebRTCConnMessage::OutputNC(_, _) => None, + _ => Some(msg_webrtc), } } else { None @@ -351,7 +347,7 @@ impl NetworkBroker { } fn from_web_rtc(msg: WebRTCConnMessage) -> Option { - matches!(msg, WebRTCConnMessage::OutputNC(_)).then(|| NetworkMessage::WebRTC(msg)) + matches!(msg, WebRTCConnMessage::OutputNC(_, _)).then(|| NetworkMessage::WebRTC(msg)) } } @@ -369,7 +365,7 @@ impl SubsystemHandler for NetworkBroker { NetworkMessage::WebSocket(WSClientMessage::Output(ws)) => { out.extend(self.msg_ws(ws).await) } - NetworkMessage::WebRTC(WebRTCConnMessage::OutputNC((id, msg))) => { + NetworkMessage::WebRTC(WebRTCConnMessage::OutputNC(id, msg)) => { out.extend(self.msg_node(id, msg).await) } _ => {} @@ -409,7 +405,7 @@ impl fmt::Display for NetworkMessage { impl NetworkMessage { /// Convert a [`NCInput`] to Self pub fn from_nc(input: NCInput, dst: NodeID) -> Self { - Self::WebRTC(WebRTCConnMessage::InputNC((dst, input))) + Self::WebRTC(WebRTCConnMessage::InputNC(dst, input)) } } diff --git a/flmodules/src/network/signal.rs b/flmodules/src/network/signal.rs index eb597e00..2951d870 100644 --- a/flmodules/src/network/signal.rs +++ b/flmodules/src/network/signal.rs @@ -166,7 +166,7 @@ impl SignalServer { fn msg_in(&mut self, msg_in: SignalInput) -> Vec { match msg_in { - // SignalInput::WebSocket((dst, msg)) => { + // SignalInput::WebSocket(dst, msg) => { // if let Some(index) = self.connection_ids.get_by_left(&dst) { // return self.send_msg_node(*index, msg.clone()); // } diff --git a/flmodules/src/ping/broker.rs b/flmodules/src/ping/broker.rs index b2a7a4dd..08f9db8d 100644 --- a/flmodules/src/ping/broker.rs +++ b/flmodules/src/ping/broker.rs @@ -1,6 +1,9 @@ use std::sync::mpsc::{channel, Receiver, Sender}; -use flarch::{broker::{Broker, BrokerError, Subsystem, SubsystemHandler}, platform_async_trait}; +use flarch::{ + broker::{Broker, BrokerError, Subsystem, SubsystemHandler}, + platform_async_trait, +}; use crate::{ random_connections::messages::{ModuleMessage, RandomIn, RandomMessage, RandomOut}, @@ -85,12 +88,13 @@ impl Translate { fn link_rnd_ping(msg: RandomMessage) -> Option { if let RandomMessage::Output(msg_out) = msg { match msg_out { + RandomOut::DisconnectNode(id) => Some(PingIn::DisconnectNode(id).into()), RandomOut::ListUpdate(list) => Some(PingIn::NodeList(list.into()).into()), - RandomOut::NodeMessageFromNetwork((id, msg)) => { + RandomOut::NodeMessageFromNetwork(id, msg) => { if msg.module == MODULE_NAME { serde_yaml::from_str::(&msg.msg) .ok() - .map(|msg_node| PingIn::Message((id, msg_node)).into()) + .map(|msg_node| PingIn::Message(id, msg_node).into()) } else { None } @@ -105,18 +109,18 @@ impl Translate { fn link_ping_rnd(msg: PingMessage) -> Option { if let PingMessage::Output(msg_out) = msg { match msg_out { - PingOut::Message((id, msg_node)) => Some( - RandomIn::NodeMessageToNetwork(( + PingOut::Message(id, msg_node) => Some( + RandomIn::NodeMessageToNetwork( id, ModuleMessage { module: MODULE_NAME.into(), msg: serde_yaml::to_string(&msg_node).unwrap(), }, - )) + ) .into(), ), PingOut::Failed(id) => Some(RandomIn::NodeFailure(id).into()), - _ => None + _ => None, } } else { None @@ -148,8 +152,6 @@ impl SubsystemHandler for Translate { self.handle_output(msg); } - out.into_iter() - .map(|o| o.into()) - .collect() + out.into_iter().map(|o| o.into()).collect() } } diff --git a/flmodules/src/ping/messages.rs b/flmodules/src/ping/messages.rs index 7626cef2..b374c823 100644 --- a/flmodules/src/ping/messages.rs +++ b/flmodules/src/ping/messages.rs @@ -18,13 +18,14 @@ pub enum PingMessage { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum PingIn { Tick, - Message((NodeID, MessageNode)), + Message(NodeID, MessageNode), NodeList(NodeIDs), + DisconnectNode(NodeID), } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum PingOut { - Message((NodeID, MessageNode)), + Message(NodeID, MessageNode), Storage(PingStorage), Failed(NodeID), } @@ -51,8 +52,12 @@ impl Ping { pub fn process_msg(&mut self, msg: PingIn) -> Vec { match msg { PingIn::Tick => self.tick(), - PingIn::Message((id, msg_node)) => self.message(id, msg_node), + PingIn::Message(id, msg_node) => self.message(id, msg_node), PingIn::NodeList(ids) => self.new_nodes(ids), + PingIn::DisconnectNode(id) => { + self.storage.remove_node(&id); + vec![] + } } } @@ -67,7 +72,7 @@ impl Ping { pub fn message(&mut self, id: NodeID, msg: MessageNode) -> Vec { match msg { MessageNode::Ping => { - vec![PingOut::Message((id, MessageNode::Pong))] + vec![PingOut::Message(id, MessageNode::Pong)] } MessageNode::Pong => { self.storage.pong(id); @@ -86,7 +91,7 @@ impl Ping { fn create_messages(&mut self) -> Vec { let mut out = vec![]; for id in self.storage.ping.drain(..) { - out.push(PingOut::Message((id, MessageNode::Ping)).into()); + out.push(PingOut::Message(id, MessageNode::Ping).into()); } for id in self.storage.failed.drain(..) { out.push(PingOut::Failed(id).into()); diff --git a/flmodules/src/ping/storage.rs b/flmodules/src/ping/storage.rs index 8458bc8d..7c5132e4 100644 --- a/flmodules/src/ping/storage.rs +++ b/flmodules/src/ping/storage.rs @@ -34,17 +34,19 @@ impl PingStorage { if self.stats.contains_key(&id) { return; } - self.stats - .insert(id, PingStat{ + self.stats.insert( + id, + PingStat { lastping: 0, rx: 0, tx: 1, - }); + }, + ); self.ping.push(id); } pub fn pong(&mut self, id: NodeID) { - if let Some(mut stat) = self.stats.remove(&id){ + if let Some(mut stat) = self.stats.remove(&id) { stat.lastping = 0; stat.rx += 1; self.stats.insert(id, stat); @@ -59,6 +61,11 @@ impl PingStorage { self.tick_countdown(); } + pub fn remove_node(&mut self, id: &NodeID) { + self.stats.remove(id); + self.failed.push(*id); + } + fn tick_countdown(&mut self) { let mut failed = vec![]; for (id, stat) in self.stats.iter_mut() { @@ -71,8 +78,7 @@ impl PingStorage { } } for id in failed { - self.stats.remove(&id); - self.failed.push(id); + self.remove_node(&id); } } } diff --git a/flmodules/src/random_connections/broker.rs b/flmodules/src/random_connections/broker.rs index faab1dc8..c990a961 100644 --- a/flmodules/src/random_connections/broker.rs +++ b/flmodules/src/random_connections/broker.rs @@ -84,7 +84,7 @@ impl Translate { match msg_net { NetReply::RcvNodeMessage(id, msg_str) => { if let Ok(msg_rnd) = serde_yaml::from_str::(&msg_str) { - return Some(RandomIn::NodeMessageFromNetwork((id, msg_rnd)).into()); + return Some(RandomIn::NodeMessageFromNetwork(id, msg_rnd).into()); } } NetReply::RcvWSUpdateList(list) => { @@ -113,7 +113,7 @@ impl Translate { match msg_out { RandomOut::ConnectNode(id) => return Some(NetCall::Connect(id).into()), RandomOut::DisconnectNode(id) => return Some(NetCall::Disconnect(id).into()), - RandomOut::NodeMessageToNetwork((id, msg)) => { + RandomOut::NodeMessageToNetwork(id, msg) => { let msg_str = serde_yaml::to_string(&msg).unwrap(); return Some(NetCall::SendNodeMessage(id, msg_str).into()); } diff --git a/flmodules/src/random_connections/messages.rs b/flmodules/src/random_connections/messages.rs index bc7ca523..2a2cedc0 100644 --- a/flmodules/src/random_connections/messages.rs +++ b/flmodules/src/random_connections/messages.rs @@ -31,8 +31,8 @@ pub enum RandomIn { NodeFailure(NodeID), NodeConnected(NodeID), NodeDisconnected(NodeID), - NodeMessageFromNetwork((NodeID, NodeMessage)), - NodeMessageToNetwork((NodeID, ModuleMessage)), + NodeMessageFromNetwork(NodeID, NodeMessage), + NodeMessageToNetwork(NodeID, ModuleMessage), Tick, } @@ -42,8 +42,8 @@ pub enum RandomOut { DisconnectNode(NodeID), ListUpdate(NodeIDs), NodeInfoConnected(Vec), - NodeMessageToNetwork((NodeID, NodeMessage)), - NodeMessageFromNetwork((NodeID, ModuleMessage)), + NodeMessageToNetwork(NodeID, NodeMessage), + NodeMessageFromNetwork(NodeID, ModuleMessage), Storage(RandomStorage), } @@ -100,16 +100,16 @@ impl RandomConnections { self.update(), ]) } - RandomIn::NodeMessageFromNetwork((id, node_msg)) => self.network_msg(id, node_msg), - RandomIn::NodeMessageToNetwork((dst, msg)) => { + RandomIn::NodeMessageFromNetwork(id, node_msg) => self.network_msg(id, node_msg), + RandomIn::NodeMessageToNetwork(dst, msg) => { if self.storage.connected.contains(&dst) { - vec![RandomOut::NodeMessageToNetwork(( + vec![RandomOut::NodeMessageToNetwork( dst, NodeMessage::Module(msg), - ))] + )] } else { log::warn!( - "{self:p} Dropping message to unconnected node {dst} - trying to connect" + "{self:p} Dropping message to unconnected node {dst} - making sure we're disconnected" ); vec![ RandomOut::DisconnectNode(dst), @@ -124,7 +124,7 @@ impl RandomConnections { /// Processes one message from the network. pub fn network_msg(&mut self, id: U256, msg: NodeMessage) -> Vec { match msg { - NodeMessage::Module(msg_mod) => vec![RandomOut::NodeMessageFromNetwork((id, msg_mod))], + NodeMessage::Module(msg_mod) => vec![RandomOut::NodeMessageFromNetwork(id, msg_mod)], NodeMessage::DropConnection => { self.storage.disconnect((&vec![id]).into()); concat([vec![RandomOut::DisconnectNode(id)], self.new_connection()]) @@ -157,7 +157,7 @@ impl RandomConnections { .into_iter() .flat_map(|n| { vec![ - RandomOut::NodeMessageToNetwork((n, NodeMessage::DropConnection)), + RandomOut::NodeMessageToNetwork(n, NodeMessage::DropConnection), RandomOut::DisconnectNode(n), ] }) diff --git a/flmodules/src/template/broker.rs b/flmodules/src/template/broker.rs index ec7fab5e..7d6070e6 100644 --- a/flmodules/src/template/broker.rs +++ b/flmodules/src/template/broker.rs @@ -101,7 +101,7 @@ impl Translate { if let RandomMessage::Output(msg_out) = msg { match msg_out { RandomOut::ListUpdate(list) => Some(TemplateIn::UpdateNodeList(list.into()).into()), - RandomOut::NodeMessageFromNetwork((id, msg)) => { + RandomOut::NodeMessageFromNetwork(id, msg) => { if msg.module == MODULE_NAME { serde_yaml::from_str::(&msg.msg) .ok() @@ -120,13 +120,13 @@ impl Translate { fn link_template_rnd(msg: TemplateMessage) -> Option { if let TemplateMessage::Output(TemplateOut::Node(id, msg_node)) = msg { Some( - RandomIn::NodeMessageToNetwork(( + RandomIn::NodeMessageToNetwork( id, ModuleMessage { module: MODULE_NAME.into(), msg: serde_yaml::to_string(&msg_node).unwrap(), }, - )) + ) .into(), ) } else { diff --git a/flmodules/src/web_proxy/broker.rs b/flmodules/src/web_proxy/broker.rs index 54585738..d784a786 100644 --- a/flmodules/src/web_proxy/broker.rs +++ b/flmodules/src/web_proxy/broker.rs @@ -134,7 +134,7 @@ impl Translate { RandomOut::NodeInfoConnected(list) => { Some(WebProxyIn::NodeInfoConnected(list).into()) } - RandomOut::NodeMessageFromNetwork((id, msg)) => { + RandomOut::NodeMessageFromNetwork(id, msg) => { if msg.module == MODULE_NAME { serde_yaml::from_str::(&msg.msg) .ok() @@ -153,13 +153,13 @@ impl Translate { fn link_proxy_rnd(msg: WebProxyMessage) -> Option { if let WebProxyMessage::Output(WebProxyOut::Node(id, msg_node)) = msg { Some( - RandomIn::NodeMessageToNetwork(( + RandomIn::NodeMessageToNetwork( id, ModuleMessage { module: MODULE_NAME.into(), msg: serde_yaml::to_string(&msg_node).unwrap(), }, - )) + ) .into(), ) } else { @@ -232,25 +232,25 @@ mod tests { return Ok(()); } - if let Ok(RandomMessage::Input(RandomIn::NodeMessageToNetwork((dst, msg)))) = + if let Ok(RandomMessage::Input(RandomIn::NodeMessageToNetwork(dst, msg))) = cl_tap.try_recv() { log::debug!("Sending to WP: {msg:?}"); wp_rnd - .emit_msg(RandomMessage::Output(RandomOut::NodeMessageFromNetwork(( + .emit_msg(RandomMessage::Output(RandomOut::NodeMessageFromNetwork( dst, msg, - )))) + ))) .expect("sending to wp"); } - if let Ok(RandomMessage::Input(RandomIn::NodeMessageToNetwork((dst, msg)))) = + if let Ok(RandomMessage::Input(RandomIn::NodeMessageToNetwork(dst, msg))) = wp_tap.try_recv() { log::debug!("Sending to CL: {msg:?}"); cl_rnd - .emit_msg(RandomMessage::Output(RandomOut::NodeMessageFromNetwork(( + .emit_msg(RandomMessage::Output(RandomOut::NodeMessageFromNetwork( dst, msg, - )))) + ))) .expect("sending to wp"); } diff --git a/flnode/tests/helpers/mod.rs b/flnode/tests/helpers/mod.rs index a82ef2b3..829cc595 100644 --- a/flnode/tests/helpers/mod.rs +++ b/flnode/tests/helpers/mod.rs @@ -138,16 +138,16 @@ impl NetworkSimul { from_id, NetReply::RcvNodeMessage(id.clone(), msg_str).into(), )], - NetworkMessage::WebRTC(WebRTCConnMessage::InputNC(( + NetworkMessage::WebRTC(WebRTCConnMessage::InputNC( id_dst, NCInput::Text(msg_node), - ))) => { + )) => { vec![( id_dst.clone(), - NetworkMessage::WebRTC(WebRTCConnMessage::InputNC(( + NetworkMessage::WebRTC(WebRTCConnMessage::InputNC( *id, NCInput::Text(msg_node.clone()).into(), - ))) + )) .into(), )] }