Skip to content

Commit

Permalink
Adding reconnection to websocket client
Browse files Browse the repository at this point in the history
  • Loading branch information
ineiti committed Sep 9, 2024
1 parent 1fff5ce commit 71f9fa2
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 60 deletions.
158 changes: 99 additions & 59 deletions flarch/src/web_rtc/wasm/web_socket_client.rs
Original file line number Diff line number Diff line change
@@ -1,100 +1,140 @@
use async_trait::async_trait;
use futures::lock::Mutex;
use std::sync::Arc;
use wasm_bindgen::{prelude::Closure, JsCast, JsValue};
use web_sys::{ErrorEvent, MessageEvent, WebSocket};

use crate::broker::{Broker, Subsystem, SubsystemHandler};

use crate::tasks::{now, wait_ms};
use crate::web_rtc::websocket::{WSClientError, WSClientInput, WSClientMessage, WSClientOutput};

pub struct WebSocketClient {
ws: Arc<Mutex<WebSocket>>,
url: String,
ws: Option<WebSocket>,
broker: Broker<WSClientMessage>,
last_connection: i64,
}

unsafe impl Send for WebSocketClient {}

impl WebSocketClient {
pub async fn connect(url: &str) -> Result<Broker<WSClientMessage>, WSClientError> {
log::info!("connecting to: {}", url);
let ws = WebSocket::new(url).map_err(|e| WSClientError::Connection(format!("{:?}", e)))?;
let mut wsw = WebSocketClient {
ws: Arc::new(Mutex::new(ws)),
let wsw = WebSocketClient {
url: url.to_string(),
ws: None,
broker: Broker::new(),
last_connection: 0,
};
let mut broker = wsw.attach_callbacks().await;
let mut broker = wsw.broker.clone();
broker
.add_subsystem(Subsystem::Handler(Box::new(wsw)))
.await?;
broker.emit_msg(WSClientMessage::Input(WSClientInput::Connect))?;
Ok(broker)
}

async fn attach_callbacks(&mut self) -> Broker<WSClientMessage> {
let broker = Broker::new();
let ws = self.ws.lock().await;
async fn connect_ws(&mut self) -> Result<(), WSClientError> {
if self.ws.is_some() && now() / 1000 < self.last_connection + 10 {
return Ok(())
}
self.last_connection = now() / 1000;

// create callback
let mut broker_clone = broker.clone();
let onmessage_callback = Closure::wrap(Box::new(move |e: MessageEvent| {
if let Ok(txt) = e.data().dyn_into::<js_sys::JsString>() {
let txt_str = txt.as_string().unwrap();
broker_clone
.emit_msg(WSClientOutput::Message(txt_str).into())
.err()
.map(|e| log::error!("On_message_callback error: {e:?}"));
} else {
log::warn!("message event, received Unknown: {:?}", e);
if let Some(ws) = self.ws.take() {
log::debug!("Reconnecting websocket to {}", self.url);
ws.set_onmessage(None);
ws.set_onerror(None);
ws.set_onopen(None);
ws.close()
.map_err(|e| WSClientError::Connection(format!("{:?}", e)))?;
} else {
log::debug!("Connecting websocket to {}", self.url);
}
self.ws = Some(
WebSocket::new(&self.url).map_err(|e| WSClientError::Connection(format!("{:?}", e)))?,
);
self.attach_callbacks().await;

// Wait up to 5 seconds for connection, so that another connection request doesn't
// interrupt the ongoing connection.
for _ in 0..50 {
if self.ws.as_ref().unwrap().ready_state() == WebSocket::OPEN {
log::info!("Successfully opened WebSocket connection to {}", self.url);
return Ok(());
}
}) as Box<dyn FnMut(MessageEvent)>);
// set message event handler on WebSocket
ws.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
// forget the callback to keep it alive
onmessage_callback.forget();
wait_ms(100).await;
}

Ok(())
}

let mut broker_clone = broker.clone();
let onerror_callback = Closure::wrap(Box::new(move |e: ErrorEvent| {
log::error!("error event: {:?}", e);
broker_clone
.emit_msg(WSClientMessage::Output(WSClientOutput::Error(
e.as_string().unwrap_or("not an error string".into()),
)))
.err()
.map(|e| log::error!("On_error_callback error: {e:?}"));
}) as Box<dyn FnMut(ErrorEvent)>);
ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
onerror_callback.forget();
async fn attach_callbacks(&mut self) {
if let Some(ws) = self.ws.as_ref() {
// create callback
let mut broker_clone = self.broker.clone();
let onmessage_callback = Closure::wrap(Box::new(move |e: MessageEvent| {
if let Ok(txt) = e.data().dyn_into::<js_sys::JsString>() {
let txt_str = txt.as_string().unwrap();
broker_clone
.emit_msg(WSClientOutput::Message(txt_str).into())
.err()
.map(|e| log::error!("On_message_callback error: {e:?}"));
} else {
log::warn!("message event, received Unknown: {:?}", e);
}
}) as Box<dyn FnMut(MessageEvent)>);
// set message event handler on WebSocket
ws.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
// forget the callback to keep it alive
onmessage_callback.forget();

let mut broker_clone = broker.clone();
let onopen_callback = Closure::wrap(Box::new(move |_| {
broker_clone
.emit_msg(WSClientMessage::Output(WSClientOutput::Connected))
.err()
.map(|e| log::error!("On_open_callback error: {e:?}"));
}) as Box<dyn FnMut(JsValue)>);
ws.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
onopen_callback.forget();
let mut broker_clone = self.broker.clone();
let onerror_callback = Closure::wrap(Box::new(move |_: ErrorEvent| {
broker_clone
.emit_msg(WSClientMessage::Output(WSClientOutput::Error(
"WS-error".into(),
)))
.err()
.map(|e| log::error!("On_error_callback error: {e:?}"));
}) as Box<dyn FnMut(ErrorEvent)>);
ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
onerror_callback.forget();

broker
let mut broker_clone = self.broker.clone();
let onopen_callback = Closure::wrap(Box::new(move |_| {
broker_clone
.emit_msg(WSClientMessage::Output(WSClientOutput::Connected))
.err()
.map(|e| log::error!("On_open_callback error: {e:?}"));
}) as Box<dyn FnMut(JsValue)>);
ws.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
onopen_callback.forget();
}
}
}

#[async_trait(?Send)]
impl SubsystemHandler<WSClientMessage> for WebSocketClient {
async fn messages(&mut self, msgs: Vec<WSClientMessage>) -> Vec<WSClientMessage> {
if let Some(ws) = self.ws.try_lock() {
for msg in msgs {
if let WSClientMessage::Input(msg_in) = msg {
match msg_in {
WSClientInput::Message(msg) => {
for msg in msgs {
if let WSClientMessage::Input(msg_in) = msg {
match msg_in {
WSClientInput::Message(msg) => {
if let Some(ws) = self.ws.as_mut() {
if ws.ready_state() != WebSocket::OPEN {
log::error!("WebSocket is not open for {msg:?}");
log::debug!("WebSocket is not open for {msg:?}");
if let Err(e) = self.connect_ws().await {
log::warn!("While reconnecting: {e}");
}
return vec![];
}
ws.send_with_str(&msg)
.err()
.map(|e| log::error!("Error sending message: {:?}", e));
}
WSClientInput::Disconnect => {
// ws.disconnect();
}
WSClientInput::Disconnect => {
// ws.disconnect();
}
WSClientInput::Connect => {
if let Err(e) = self.connect_ws().await {
log::warn!("While reconnecting: {e}");
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion flmodules/src/network/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl NetworkBroker {
let msg_node_str = match msg {
WSClientOutput::Message(msg) => msg,
WSClientOutput::Error(e) => {
log::error!("Websocket client error: {e}");
log::warn!("Websocket client error: {e}");
return vec![];
}
_ => return vec![],
Expand Down

0 comments on commit 71f9fa2

Please sign in to comment.