Skip to content

Commit

Permalink
Merge pull request #120 from ineiti/websocket_reconnection
Browse files Browse the repository at this point in the history
Websocket reconnection
  • Loading branch information
ineiti authored Sep 9, 2024
2 parents 411c767 + 71f9fa2 commit eb2a6e9
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 83 deletions.
88 changes: 66 additions & 22 deletions flarch/src/web_rtc/libc/web_socket_client.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,39 @@
use async_trait::async_trait;
use futures::stream::SplitStream;
use futures::{stream::SplitSink, Sink, SinkExt, StreamExt};
use std::pin::Pin;
use tokio::net::TcpStream;
use tokio_tungstenite::{connect_async, tungstenite, MaybeTlsStream, WebSocketStream};

use crate::broker::{Broker, Subsystem, SubsystemHandler};
use crate::tasks::wait_ms;
use crate::web_rtc::websocket::{WSClientInput, WSClientMessage, WSClientOutput, WSError, WSSError};
use crate::web_rtc::websocket::{
WSClientInput, WSClientMessage, WSClientOutput, WSError, WSSError,
};

pub struct WebSocketClient {
write: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
url: String,
write: Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>>,
broker: Broker<WSClientMessage>,
}

impl WebSocketClient {
pub async fn connect(url: &str) -> Result<Broker<WSClientMessage>, WSSError> {
log::debug!("Connecting to websocket at {}", url);
let (websocket, _) = connect_async(url).await?;

let (write, mut read) = websocket.split();
let wsc = WebSocketClient { write };
let mut broker = Broker::new();
let wsb = WebSocketClient {
url: url.to_string(),
write: None,
broker: Broker::new(),
};
let mut broker = wsb.broker.clone();
broker
.add_subsystem(Subsystem::Handler(Box::new(wsc)))
.add_subsystem(Subsystem::Handler(Box::new(wsb)))
.await?;
let mut broker_cl = broker.clone();
broker.emit_msg(WSClientMessage::Input(WSClientInput::Connect))?;
Ok(broker)
}

fn listen(&mut self, mut read: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>) {
let mut broker_cl = self.broker.clone();
tokio::spawn(async move {
wait_ms(1000).await;
loop {
Expand All @@ -37,14 +47,32 @@ impl WebSocketClient {
}
}
Err(e) => {
log::warn!("Closing connection: {:?}", e);
log::warn!("Closing connection because of error: {:?}", e);
log::warn!("Trying to reconnect");
broker_cl
.emit_msg(WSClientMessage::Input(WSClientInput::Connect))
.expect("tried to reconnect");
return;
}
}
}
}
});
Ok(broker)
}

async fn connect_ws(&mut self) -> Result<(), WSSError> {
if let Some(mut write) = self.write.take() {
log::debug!("Reconnecting to websocket at {}", self.url);
write.close().await?;
} else {
log::debug!("Connecting to websocket at {}", self.url);
}
let (websocket, _) = connect_async(self.url.clone()).await?;
let (write, read) = websocket.split();
self.write = Some(write);
self.listen(read);

Ok(())
}
}

Expand All @@ -55,20 +83,36 @@ impl SubsystemHandler<WSClientMessage> for WebSocketClient {
if let WSClientMessage::Input(msg_in) = msg {
match msg_in {
WSClientInput::Message(msg) => {
Pin::new(&mut self.write)
.start_send(tungstenite::Message::text(msg))
.map_err(|e| WSError::Underlying(e.to_string()))
.expect("Error sending message");
Pin::new(&mut self.write)
.flush()
.await
.map_err(|e| WSError::Underlying(e.to_string()))
.expect("msg flush error");
if let Some(mut write) = self.write.as_mut() {
Pin::new(&mut write)
.start_send(tungstenite::Message::text(msg))
.map_err(|e| WSError::Underlying(e.to_string()))
.expect("Error sending message");
Pin::new(&mut write)
.flush()
.await
.map_err(|e| WSError::Underlying(e.to_string()))
.expect("msg flush error");
} else {
log::warn!("Tried to write a message to a closed connection");
if let Err(e) = self.connect_ws().await {
log::error!("Couldn't connect: {e}");
}
}
}
WSClientInput::Disconnect => {
self.write.close().await.unwrap();
if let Some(mut write) = self.write.take() {
write.close().await.unwrap();
} else {
log::warn!("Trying to disconnect a disconnected connection");
}
return vec![WSClientOutput::Disconnect.into()];
}
WSClientInput::Connect => {
if let Err(e) = self.connect_ws().await {
log::error!("Couldn't connect: {e}");
}
}
}
}
}
Expand Down
158 changes: 99 additions & 59 deletions flarch/src/web_rtc/wasm/web_socket_client.rs
Original file line number Diff line number Diff line change
@@ -1,100 +1,140 @@
use async_trait::async_trait;
use futures::lock::Mutex;
use std::sync::Arc;
use wasm_bindgen::{prelude::Closure, JsCast, JsValue};
use web_sys::{ErrorEvent, MessageEvent, WebSocket};

use crate::broker::{Broker, Subsystem, SubsystemHandler};

use crate::tasks::{now, wait_ms};
use crate::web_rtc::websocket::{WSClientError, WSClientInput, WSClientMessage, WSClientOutput};

pub struct WebSocketClient {
ws: Arc<Mutex<WebSocket>>,
url: String,
ws: Option<WebSocket>,
broker: Broker<WSClientMessage>,
last_connection: i64,
}

unsafe impl Send for WebSocketClient {}

impl WebSocketClient {
pub async fn connect(url: &str) -> Result<Broker<WSClientMessage>, WSClientError> {
log::info!("connecting to: {}", url);
let ws = WebSocket::new(url).map_err(|e| WSClientError::Connection(format!("{:?}", e)))?;
let mut wsw = WebSocketClient {
ws: Arc::new(Mutex::new(ws)),
let wsw = WebSocketClient {
url: url.to_string(),
ws: None,
broker: Broker::new(),
last_connection: 0,
};
let mut broker = wsw.attach_callbacks().await;
let mut broker = wsw.broker.clone();
broker
.add_subsystem(Subsystem::Handler(Box::new(wsw)))
.await?;
broker.emit_msg(WSClientMessage::Input(WSClientInput::Connect))?;
Ok(broker)
}

async fn attach_callbacks(&mut self) -> Broker<WSClientMessage> {
let broker = Broker::new();
let ws = self.ws.lock().await;
async fn connect_ws(&mut self) -> Result<(), WSClientError> {
if self.ws.is_some() && now() / 1000 < self.last_connection + 10 {
return Ok(())
}
self.last_connection = now() / 1000;

// create callback
let mut broker_clone = broker.clone();
let onmessage_callback = Closure::wrap(Box::new(move |e: MessageEvent| {
if let Ok(txt) = e.data().dyn_into::<js_sys::JsString>() {
let txt_str = txt.as_string().unwrap();
broker_clone
.emit_msg(WSClientOutput::Message(txt_str).into())
.err()
.map(|e| log::error!("On_message_callback error: {e:?}"));
} else {
log::warn!("message event, received Unknown: {:?}", e);
if let Some(ws) = self.ws.take() {
log::debug!("Reconnecting websocket to {}", self.url);
ws.set_onmessage(None);
ws.set_onerror(None);
ws.set_onopen(None);
ws.close()
.map_err(|e| WSClientError::Connection(format!("{:?}", e)))?;
} else {
log::debug!("Connecting websocket to {}", self.url);
}
self.ws = Some(
WebSocket::new(&self.url).map_err(|e| WSClientError::Connection(format!("{:?}", e)))?,
);
self.attach_callbacks().await;

// Wait up to 5 seconds for connection, so that another connection request doesn't
// interrupt the ongoing connection.
for _ in 0..50 {
if self.ws.as_ref().unwrap().ready_state() == WebSocket::OPEN {
log::info!("Successfully opened WebSocket connection to {}", self.url);
return Ok(());
}
}) as Box<dyn FnMut(MessageEvent)>);
// set message event handler on WebSocket
ws.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
// forget the callback to keep it alive
onmessage_callback.forget();
wait_ms(100).await;
}

Ok(())
}

let mut broker_clone = broker.clone();
let onerror_callback = Closure::wrap(Box::new(move |e: ErrorEvent| {
log::error!("error event: {:?}", e);
broker_clone
.emit_msg(WSClientMessage::Output(WSClientOutput::Error(
e.as_string().unwrap_or("not an error string".into()),
)))
.err()
.map(|e| log::error!("On_error_callback error: {e:?}"));
}) as Box<dyn FnMut(ErrorEvent)>);
ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
onerror_callback.forget();
async fn attach_callbacks(&mut self) {
if let Some(ws) = self.ws.as_ref() {
// create callback
let mut broker_clone = self.broker.clone();
let onmessage_callback = Closure::wrap(Box::new(move |e: MessageEvent| {
if let Ok(txt) = e.data().dyn_into::<js_sys::JsString>() {
let txt_str = txt.as_string().unwrap();
broker_clone
.emit_msg(WSClientOutput::Message(txt_str).into())
.err()
.map(|e| log::error!("On_message_callback error: {e:?}"));
} else {
log::warn!("message event, received Unknown: {:?}", e);
}
}) as Box<dyn FnMut(MessageEvent)>);
// set message event handler on WebSocket
ws.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
// forget the callback to keep it alive
onmessage_callback.forget();

let mut broker_clone = broker.clone();
let onopen_callback = Closure::wrap(Box::new(move |_| {
broker_clone
.emit_msg(WSClientMessage::Output(WSClientOutput::Connected))
.err()
.map(|e| log::error!("On_open_callback error: {e:?}"));
}) as Box<dyn FnMut(JsValue)>);
ws.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
onopen_callback.forget();
let mut broker_clone = self.broker.clone();
let onerror_callback = Closure::wrap(Box::new(move |_: ErrorEvent| {
broker_clone
.emit_msg(WSClientMessage::Output(WSClientOutput::Error(
"WS-error".into(),
)))
.err()
.map(|e| log::error!("On_error_callback error: {e:?}"));
}) as Box<dyn FnMut(ErrorEvent)>);
ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
onerror_callback.forget();

broker
let mut broker_clone = self.broker.clone();
let onopen_callback = Closure::wrap(Box::new(move |_| {
broker_clone
.emit_msg(WSClientMessage::Output(WSClientOutput::Connected))
.err()
.map(|e| log::error!("On_open_callback error: {e:?}"));
}) as Box<dyn FnMut(JsValue)>);
ws.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
onopen_callback.forget();
}
}
}

#[async_trait(?Send)]
impl SubsystemHandler<WSClientMessage> for WebSocketClient {
async fn messages(&mut self, msgs: Vec<WSClientMessage>) -> Vec<WSClientMessage> {
if let Some(ws) = self.ws.try_lock() {
for msg in msgs {
if let WSClientMessage::Input(msg_in) = msg {
match msg_in {
WSClientInput::Message(msg) => {
for msg in msgs {
if let WSClientMessage::Input(msg_in) = msg {
match msg_in {
WSClientInput::Message(msg) => {
if let Some(ws) = self.ws.as_mut() {
if ws.ready_state() != WebSocket::OPEN {
log::error!("WebSocket is not open for {msg:?}");
log::debug!("WebSocket is not open for {msg:?}");
if let Err(e) = self.connect_ws().await {
log::warn!("While reconnecting: {e}");
}
return vec![];
}
ws.send_with_str(&msg)
.err()
.map(|e| log::error!("Error sending message: {:?}", e));
}
WSClientInput::Disconnect => {
// ws.disconnect();
}
WSClientInput::Disconnect => {
// ws.disconnect();
}
WSClientInput::Connect => {
if let Err(e) = self.connect_ws().await {
log::warn!("While reconnecting: {e}");
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions flarch/src/web_rtc/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ pub enum WSClientInput {
Message(String),
/// Disconnect the websocket - no further messages will be sent after this message.
Disconnect,
/// Connect the websocket - this starts or resets the connection
Connect,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
Expand Down
4 changes: 4 additions & 0 deletions flmodules/src/network/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ impl NetworkBroker {
async fn msg_ws(&mut self, msg: WSClientOutput) -> Vec<NetworkMessage> {
let msg_node_str = match msg {
WSClientOutput::Message(msg) => msg,
WSClientOutput::Error(e) => {
log::warn!("Websocket client error: {e}");
return vec![];
}
_ => return vec![],
};
let msg_node =
Expand Down
9 changes: 7 additions & 2 deletions flmodules/src/random_connections/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,13 @@ impl RandomConnections {
NodeMessage::Module(msg),
))]
} else {
log::warn!("{self:p} Dropping message to unconnected node {dst}");
vec![]
log::warn!(
"{self:p} Dropping message to unconnected node {dst} - trying to connect"
);
vec![
RandomOut::DisconnectNode(dst),
RandomOut::ListUpdate(self.storage.connected.get_nodes()),
]
}
}
};
Expand Down

0 comments on commit eb2a6e9

Please sign in to comment.