Skip to content

Commit

Permalink
Merge pull request #122 from ineiti/debug_reconnections
Browse files Browse the repository at this point in the history
Better handling of reconnection of the webrtc libc part
  • Loading branch information
ineiti authored Sep 18, 2024
2 parents 25c0802 + 631395d commit b14b73b
Show file tree
Hide file tree
Showing 18 changed files with 220 additions and 151 deletions.
12 changes: 6 additions & 6 deletions flarch/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions flarch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ pub fn start_logging_filter_level(filters: Vec<&str>, level: log::LevelFilter) {
}

pub use flarch_macro::platform_async_trait;
pub use rand::random;
51 changes: 42 additions & 9 deletions flarch/src/web_rtc/libc/web_rtc_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::sync::{
Arc,
};

use async_trait::async_trait;
use crate::broker::{Broker, Subsystem, SubsystemHandler};
use async_trait::async_trait;
use futures::lock::Mutex;
use webrtc::{
api::{
Expand All @@ -27,10 +27,12 @@ use webrtc::{
};

use crate::web_rtc::{
connection::{ConnectionConfig, HostLogin}, messages::{
connection::{ConnectionConfig, HostLogin},
messages::{
ConnType, ConnectionStateMap, DataChannelState, PeerMessage, SetupError, SignalingState,
WebRTCInput, WebRTCMessage, WebRTCOutput, WebRTCSpawner,
}, node_connection::Direction
},
node_connection::Direction,
};

fn get_ice_server(host: HostLogin) -> RTCIceServer {
Expand Down Expand Up @@ -184,7 +186,6 @@ impl WebRTCConnectionSetupLibc {
/// Returns the offer string that needs to be sent to the `Follower` node.
async fn make_offer(&mut self) -> Result<String, SetupError> {
if self.direction.is_some() {
log::warn!("Resetting connection because of new offer");
self.reset().await?;
}
self.direction = Some(Direction::Outgoing);
Expand All @@ -206,13 +207,13 @@ impl WebRTCConnectionSetupLibc {
.set_local_description(offer.clone())
.await
.map_err(to_error)?;

Ok(offer.sdp)
}

/// Takes the offer string
async fn make_answer(&mut self, offer: String) -> Result<String, SetupError> {
if self.direction.is_some() {
log::warn!("Resetting connection because of new answer");
self.reset().await?;
}
self.direction = Some(Direction::Incoming);
Expand Down Expand Up @@ -375,8 +376,9 @@ impl WebRTCConnectionSetupLibc {
log::warn!("Got message for deprecated on_open");
return Box::pin(async {});
}

log::trace!("DataChannel is opened");
Box::pin(async move {
log::trace!("DataChannel opened");
broker_cl
.emit_msg(WebRTCMessage::Output(WebRTCOutput::Connected))
.err()
Expand All @@ -401,6 +403,11 @@ impl WebRTCConnectionSetupLibc {
.map(|e| log::warn!("Text queued but not processed: {:?}", e));
})
}));
if let Some(dc) = rtc_data.lock().await.take() {
if let Err(e) = dc.close().await {
log::warn!("While closing datachannel: {e:?}");
}
}
rtc_data.lock().await.replace(data_channel);
}

Expand All @@ -420,6 +427,11 @@ impl WebRTCConnectionSetupLibc {
self.get_state().await?,
))));
}
WebRTCInput::Disconnect => {
if let Err(e) = self.reset().await {
log::warn!("While closing old connection: {e:?}");
}
}
WebRTCInput::Reset => {
if self.direction.is_some() {
self.reset().await?;
Expand All @@ -430,13 +442,34 @@ impl WebRTCConnectionSetupLibc {
}

async fn reset(&mut self) -> Result<(), SetupError> {
if self.direction.is_none() {
return Ok(());
}
self.direction = None;

// Replacing all listeners with empty listeners
if let Some(mut rd) = self.rtc_data.try_lock() {
if let Some(ref mut dc) = rd.as_ref() {
dc.on_message(Box::new(|_: DataChannelMessage| Box::pin(async {})));
dc.on_open(Box::new(|| Box::pin(async {})));
}
*rd = None;
}
self.connection
.on_data_channel(Box::new(|_: Arc<RTCDataChannel>| Box::pin(async {})));
self.connection
.on_peer_connection_state_change(Box::new(|_: RTCPeerConnectionState| {
Box::pin(async {})
}));
self.connection
.on_ice_candidate(Box::new(|_: Option<RTCIceCandidate>| Box::pin(async {})));

if let Err(e) = self.connection.close().await {
log::warn!("While closing old connection: {e:?}");
}

self.connection = Self::make_connection(self.connection_cfg.clone()).await?;
self.setup_connection().await?;
self.direction = None;
Ok(())
}
}
Expand All @@ -447,11 +480,11 @@ impl SubsystemHandler<WebRTCMessage> for WebRTCConnectionSetupLibc {
let mut out = vec![];
for msg in msgs {
if let WebRTCMessage::Input(msg_in) = msg {
match self.msg_in(msg_in).await {
match self.msg_in(msg_in.clone()).await {
Ok(Some(msg)) => out.push(msg),
Ok(None) => {}
Err(e) => {
log::warn!("{:p} Error processing message: {:?}", self, e);
log::trace!("{:p} Error processing message {msg_in:?}: {:?}", self, e);
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions flarch/src/web_rtc/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ pub enum WebRTCInput {
UpdateState,
/// Try to reconnect, or throw an error if incoming connection
Reset,
/// Disconnect this node
Disconnect,
}

#[cfg(target_family="unix")]
Expand Down
65 changes: 41 additions & 24 deletions flarch/src/web_rtc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,54 +5,59 @@
//! a connection, it doesn't describe how the nodes need to communicate to
//! set up the connection.
//! The messages for setting up the connection are described in the [`crate::signal::SignalServer`].
//!
//!
//! # Bidirectional connections
//!
//!
//! Every WebRTC connection has one or two connections inside:
//! - _outgoing_ is the connection initiated by this node
//! - _incoming_ is the connection initiated by the other node
//!
//!
//! This is necessary, as it is always possible that two nodes want to start
//! connecting to each other concurrently.
use std::collections::HashMap;

use flarch_macro::platform_async_trait;

use crate::{broker::{Broker, BrokerError, Subsystem, SubsystemHandler, Translate}, nodeids::NodeID};
use crate::{
broker::{Broker, BrokerError, Subsystem, SubsystemHandler, Translate},
nodeids::NodeID,
};

use self::{
messages::WebRTCSpawner,
node_connection::{NCError, NCInput, NCMessage, NCOutput, NodeConnection},
};

pub mod messages;
pub mod connection;
pub mod messages;
pub mod node_connection;
pub mod websocket;

#[cfg(target_family="windows")]
#[cfg(target_family = "windows")]
compile_error!("flarch is not available for windows");

#[cfg(target_family="wasm")]
#[cfg(target_family = "wasm")]
mod wasm;
#[cfg(target_family="wasm")]
#[cfg(target_family = "wasm")]
pub use wasm::*;

#[cfg(target_family="unix")]
#[cfg(target_family = "unix")]
mod libc;
#[cfg(target_family="unix")]
#[cfg(target_family = "unix")]
pub use libc::*;

#[derive(Debug, Clone, PartialEq)]
/// All messages for the [`WebRTCConn`] broker.
pub enum WebRTCConnMessage {
/// Messages going to the WebRTC interface
InputNC((NodeID, NCInput)),
InputNC(NodeID, NCInput),
/// Messages coming from the WebRTC interface
OutputNC((NodeID, NCOutput)),
OutputNC(NodeID, NCOutput),
/// Connection request
Connect(NodeID),
/// Disconnect this node
Disconnect(NodeID),
}

/// The actual implementation of the WebRTC connection setup.
Expand All @@ -63,7 +68,7 @@ pub struct WebRTCConn {
}

impl WebRTCConn {
/// Creates a new [`Broker<WebRTCConnMessage>`] that will accept incoming connections and set up
/// Creates a new [`Broker<WebRTCConnMessage>`] that will accept incoming connections and set up
/// new outgoing connections.
pub async fn new(web_rtc: WebRTCSpawner) -> Result<Broker<WebRTCConnMessage>, BrokerError> {
let mut br = Broker::new();
Expand All @@ -88,10 +93,20 @@ impl WebRTCConn {
Ok(())
}

fn try_send(&mut self, dst: NodeID, msg: NCInput) {
if let Some(conn) = self.connections.get_mut(&dst) {
conn.emit_msg(NCMessage::Input(msg.clone()))
.err()
.map(|e| log::error!("When sending message {msg:?} to webrtc: {e:?}"));
} else {
log::warn!("Dropping message {:?} to unconnected node {}", msg, dst);
}
}

fn from_nc(id: NodeID) -> Translate<NCMessage, WebRTCConnMessage> {
Box::new(move |msg| {
if let NCMessage::Output(ncmsg) = msg {
return Some(WebRTCConnMessage::OutputNC((id, ncmsg)));
return Some(WebRTCConnMessage::OutputNC(id, ncmsg));
}
None
})
Expand All @@ -103,22 +118,24 @@ impl SubsystemHandler<WebRTCConnMessage> for WebRTCConn {
async fn messages(&mut self, msgs: Vec<WebRTCConnMessage>) -> Vec<WebRTCConnMessage> {
for msg in msgs {
match msg {
WebRTCConnMessage::InputNC((dst, msg_in)) => {
if let Some(conn) = self.connections.get_mut(&dst) {
conn.emit_msg(NCMessage::Input(msg_in.clone()))
.err()
.map(|e| {
log::error!("When sending message {msg_in:?} to webrtc: {e:?}")
});
} else {
log::warn!("Dropping message {:?} to unconnected node {}", msg_in, dst);
}
WebRTCConnMessage::InputNC(dst, msg_in) => {
self.try_send(dst, msg_in);
}
WebRTCConnMessage::Disconnect(dst) => {
self.try_send(dst, NCInput::Disconnect);
}
WebRTCConnMessage::Connect(dst) => {
self.ensure_connection(&dst)
.await
.err()
.map(|e| log::error!("When starting webrtc-connection {e:?}"));
self.try_send(
dst,
NCInput::Setup(
node_connection::Direction::Outgoing,
messages::PeerMessage::Init,
),
);
}
_ => {}
};
Expand Down
Loading

0 comments on commit b14b73b

Please sign in to comment.