Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better handling of reconnection of the webrtc libc part #122

Merged
merged 5 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions flarch/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions flarch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ pub fn start_logging_filter_level(filters: Vec<&str>, level: log::LevelFilter) {
}

pub use flarch_macro::platform_async_trait;
pub use rand::random;
51 changes: 42 additions & 9 deletions flarch/src/web_rtc/libc/web_rtc_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::sync::{
Arc,
};

use async_trait::async_trait;
use crate::broker::{Broker, Subsystem, SubsystemHandler};
use async_trait::async_trait;
use futures::lock::Mutex;
use webrtc::{
api::{
Expand All @@ -27,10 +27,12 @@ use webrtc::{
};

use crate::web_rtc::{
connection::{ConnectionConfig, HostLogin}, messages::{
connection::{ConnectionConfig, HostLogin},
messages::{
ConnType, ConnectionStateMap, DataChannelState, PeerMessage, SetupError, SignalingState,
WebRTCInput, WebRTCMessage, WebRTCOutput, WebRTCSpawner,
}, node_connection::Direction
},
node_connection::Direction,
};

fn get_ice_server(host: HostLogin) -> RTCIceServer {
Expand Down Expand Up @@ -184,7 +186,6 @@ impl WebRTCConnectionSetupLibc {
/// Returns the offer string that needs to be sent to the `Follower` node.
async fn make_offer(&mut self) -> Result<String, SetupError> {
if self.direction.is_some() {
log::warn!("Resetting connection because of new offer");
self.reset().await?;
}
self.direction = Some(Direction::Outgoing);
Expand All @@ -206,13 +207,13 @@ impl WebRTCConnectionSetupLibc {
.set_local_description(offer.clone())
.await
.map_err(to_error)?;

Ok(offer.sdp)
}

/// Takes the offer string
async fn make_answer(&mut self, offer: String) -> Result<String, SetupError> {
if self.direction.is_some() {
log::warn!("Resetting connection because of new answer");
self.reset().await?;
}
self.direction = Some(Direction::Incoming);
Expand Down Expand Up @@ -375,8 +376,9 @@ impl WebRTCConnectionSetupLibc {
log::warn!("Got message for deprecated on_open");
return Box::pin(async {});
}

log::trace!("DataChannel is opened");
Box::pin(async move {
log::trace!("DataChannel opened");
broker_cl
.emit_msg(WebRTCMessage::Output(WebRTCOutput::Connected))
.err()
Expand All @@ -401,6 +403,11 @@ impl WebRTCConnectionSetupLibc {
.map(|e| log::warn!("Text queued but not processed: {:?}", e));
})
}));
if let Some(dc) = rtc_data.lock().await.take() {
if let Err(e) = dc.close().await {
log::warn!("While closing datachannel: {e:?}");
}
}
rtc_data.lock().await.replace(data_channel);
}

Expand All @@ -420,6 +427,11 @@ impl WebRTCConnectionSetupLibc {
self.get_state().await?,
))));
}
WebRTCInput::Disconnect => {
if let Err(e) = self.reset().await {
log::warn!("While closing old connection: {e:?}");
}
}
WebRTCInput::Reset => {
if self.direction.is_some() {
self.reset().await?;
Expand All @@ -430,13 +442,34 @@ impl WebRTCConnectionSetupLibc {
}

async fn reset(&mut self) -> Result<(), SetupError> {
if self.direction.is_none() {
return Ok(());
}
self.direction = None;

// Replacing all listeners with empty listeners
if let Some(mut rd) = self.rtc_data.try_lock() {
if let Some(ref mut dc) = rd.as_ref() {
dc.on_message(Box::new(|_: DataChannelMessage| Box::pin(async {})));
dc.on_open(Box::new(|| Box::pin(async {})));
}
*rd = None;
}
self.connection
.on_data_channel(Box::new(|_: Arc<RTCDataChannel>| Box::pin(async {})));
self.connection
.on_peer_connection_state_change(Box::new(|_: RTCPeerConnectionState| {
Box::pin(async {})
}));
self.connection
.on_ice_candidate(Box::new(|_: Option<RTCIceCandidate>| Box::pin(async {})));

if let Err(e) = self.connection.close().await {
log::warn!("While closing old connection: {e:?}");
}

self.connection = Self::make_connection(self.connection_cfg.clone()).await?;
self.setup_connection().await?;
self.direction = None;
Ok(())
}
}
Expand All @@ -447,11 +480,11 @@ impl SubsystemHandler<WebRTCMessage> for WebRTCConnectionSetupLibc {
let mut out = vec![];
for msg in msgs {
if let WebRTCMessage::Input(msg_in) = msg {
match self.msg_in(msg_in).await {
match self.msg_in(msg_in.clone()).await {
Ok(Some(msg)) => out.push(msg),
Ok(None) => {}
Err(e) => {
log::warn!("{:p} Error processing message: {:?}", self, e);
log::trace!("{:p} Error processing message {msg_in:?}: {:?}", self, e);
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions flarch/src/web_rtc/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ pub enum WebRTCInput {
UpdateState,
/// Try to reconnect, or throw an error if incoming connection
Reset,
/// Disconnect this node
Disconnect,
}

#[cfg(target_family="unix")]
Expand Down
65 changes: 41 additions & 24 deletions flarch/src/web_rtc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,54 +5,59 @@
//! a connection, it doesn't describe how the nodes need to communicate to
//! set up the connection.
//! The messages for setting up the connection are described in the [`crate::signal::SignalServer`].
//!
//!
//! # Bidirectional connections
//!
//!
//! Every WebRTC connection has one or two connections inside:
//! - _outgoing_ is the connection initiated by this node
//! - _incoming_ is the connection initiated by the other node
//!
//!
//! This is necessary, as it is always possible that two nodes want to start
//! connecting to each other concurrently.

use std::collections::HashMap;

use flarch_macro::platform_async_trait;

use crate::{broker::{Broker, BrokerError, Subsystem, SubsystemHandler, Translate}, nodeids::NodeID};
use crate::{
broker::{Broker, BrokerError, Subsystem, SubsystemHandler, Translate},
nodeids::NodeID,
};

use self::{
messages::WebRTCSpawner,
node_connection::{NCError, NCInput, NCMessage, NCOutput, NodeConnection},
};

pub mod messages;
pub mod connection;
pub mod messages;
pub mod node_connection;
pub mod websocket;

#[cfg(target_family="windows")]
#[cfg(target_family = "windows")]
compile_error!("flarch is not available for windows");

#[cfg(target_family="wasm")]
#[cfg(target_family = "wasm")]
mod wasm;
#[cfg(target_family="wasm")]
#[cfg(target_family = "wasm")]
pub use wasm::*;

#[cfg(target_family="unix")]
#[cfg(target_family = "unix")]
mod libc;
#[cfg(target_family="unix")]
#[cfg(target_family = "unix")]
pub use libc::*;

#[derive(Debug, Clone, PartialEq)]
/// All messages for the [`WebRTCConn`] broker.
pub enum WebRTCConnMessage {
/// Messages going to the WebRTC interface
InputNC((NodeID, NCInput)),
InputNC(NodeID, NCInput),
/// Messages coming from the WebRTC interface
OutputNC((NodeID, NCOutput)),
OutputNC(NodeID, NCOutput),
/// Connection request
Connect(NodeID),
/// Disconnect this node
Disconnect(NodeID),
}

/// The actual implementation of the WebRTC connection setup.
Expand All @@ -63,7 +68,7 @@ pub struct WebRTCConn {
}

impl WebRTCConn {
/// Creates a new [`Broker<WebRTCConnMessage>`] that will accept incoming connections and set up
/// Creates a new [`Broker<WebRTCConnMessage>`] that will accept incoming connections and set up
/// new outgoing connections.
pub async fn new(web_rtc: WebRTCSpawner) -> Result<Broker<WebRTCConnMessage>, BrokerError> {
let mut br = Broker::new();
Expand All @@ -88,10 +93,20 @@ impl WebRTCConn {
Ok(())
}

fn try_send(&mut self, dst: NodeID, msg: NCInput) {
if let Some(conn) = self.connections.get_mut(&dst) {
conn.emit_msg(NCMessage::Input(msg.clone()))
.err()
.map(|e| log::error!("When sending message {msg:?} to webrtc: {e:?}"));
} else {
log::warn!("Dropping message {:?} to unconnected node {}", msg, dst);
}
}

fn from_nc(id: NodeID) -> Translate<NCMessage, WebRTCConnMessage> {
Box::new(move |msg| {
if let NCMessage::Output(ncmsg) = msg {
return Some(WebRTCConnMessage::OutputNC((id, ncmsg)));
return Some(WebRTCConnMessage::OutputNC(id, ncmsg));
}
None
})
Expand All @@ -103,22 +118,24 @@ impl SubsystemHandler<WebRTCConnMessage> for WebRTCConn {
async fn messages(&mut self, msgs: Vec<WebRTCConnMessage>) -> Vec<WebRTCConnMessage> {
for msg in msgs {
match msg {
WebRTCConnMessage::InputNC((dst, msg_in)) => {
if let Some(conn) = self.connections.get_mut(&dst) {
conn.emit_msg(NCMessage::Input(msg_in.clone()))
.err()
.map(|e| {
log::error!("When sending message {msg_in:?} to webrtc: {e:?}")
});
} else {
log::warn!("Dropping message {:?} to unconnected node {}", msg_in, dst);
}
WebRTCConnMessage::InputNC(dst, msg_in) => {
self.try_send(dst, msg_in);
}
WebRTCConnMessage::Disconnect(dst) => {
self.try_send(dst, NCInput::Disconnect);
}
WebRTCConnMessage::Connect(dst) => {
self.ensure_connection(&dst)
.await
.err()
.map(|e| log::error!("When starting webrtc-connection {e:?}"));
self.try_send(
dst,
NCInput::Setup(
node_connection::Direction::Outgoing,
messages::PeerMessage::Init,
),
);
}
_ => {}
};
Expand Down
Loading