Skip to content

Commit

Permalink
feat: worker auto-sync
Browse files Browse the repository at this point in the history
  • Loading branch information
Marcel-G committed Dec 22, 2024
1 parent 4a90c97 commit 282584e
Show file tree
Hide file tree
Showing 17 changed files with 267 additions and 75 deletions.
1 change: 1 addition & 0 deletions backend/client/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
target
.db
.env
30 changes: 30 additions & 0 deletions backend/client/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions backend/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ yrs = "0.21"
yrs-kvstore = { path = "../../../yrs-persistence/yrs-kvstore" }
yrs-lmdb = { path = "../../../yrs-persistence/yrs-lmdb" }
lmdb-rs = { version = "0.7" }
cookie = "0.18.1"
dotenv = "0.15.0"



20 changes: 15 additions & 5 deletions backend/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub struct Client {

pub struct ClientOptions {
pub signal_url: String,
pub signal_token: Option<String>,
}

#[derive(Debug)]
Expand All @@ -47,6 +48,7 @@ impl Client {
sigterm: signal(SignalKind::interrupt()).expect("Failed to create SIGTERM signal"),
signal_connection: SignalConnection::new_with_options(SignalOptions {
url: Url::parse(&options.signal_url).expect("Failed to parse signal URL"),
token: options.signal_token,
}),
workspaces: HashMap::new(),
}
Expand Down Expand Up @@ -87,13 +89,19 @@ impl Client {

let message = Message::Publish {
topic: workspace_id,
identity: None,
kind: None,
data: MessageData::Signal { from, to, signal },
};
self.signal_connection.send(message);
}

pub fn join_workspace(&mut self, workspace_id: String) {
let workspace = Workspace::new();
if self.workspaces.contains_key(&workspace_id) {
return;
}
let workspace = Workspace::new(&workspace_id);
self.workspaces.insert(workspace_id.clone(), workspace);

let subscribe = Message::Subscribe {
topics: [workspace_id.clone()].to_vec(),
Expand All @@ -102,13 +110,13 @@ impl Client {

let announce = Message::Publish {
topic: workspace_id.clone(),
identity: None,
kind: None,
data: MessageData::Announce {
from: self.peer_id.to_string(),
},
};
self.signal_connection.send(announce);

self.workspaces.insert(workspace_id.clone(), workspace);
}

fn leave_workspace(&mut self, workspace_id: String) {
Expand All @@ -127,10 +135,12 @@ impl Client {
if from == self.peer_id.to_string() {
return;
}
self.join_workspace(workspace_id.clone());

let workspace = self
.workspaces
.get_mut(&workspace_id)
.expect("Workspace not found");
.expect("workspace not found");

// If we don't already have a peer connection, initiate one.
if let Entry::Vacant(entry) = workspace.peers.entry(from.clone()) {
Expand All @@ -143,7 +153,7 @@ impl Client {
// 1. Work on the signal connection
match self.signal_connection.poll(cx) {
Poll::Ready(Ok(SignalEvent::IncomingMessage(message))) => match message {
Message::Publish { topic, data } => match data {
Message::Publish { topic, data, .. } => match data {
MessageData::Announce { from } => {
self.handle_peer_discovered(topic, from);
continue;
Expand Down
6 changes: 4 additions & 2 deletions backend/client/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::future::poll_fn;

use client::{Client, ClientError, ClientEvent, ClientOptions};
use dotenv::dotenv;

mod client;
pub(crate) mod peer;
Expand All @@ -10,13 +11,14 @@ mod workspace;
#[tokio::main]
async fn main() -> Result<(), ClientError> {
env_logger::init();
dotenv().ok();

let mut client = Client::new_with_options(ClientOptions {
signal_url: "ws://localhost:8000/signaling".into(),
signal_token: std::env::var("JWT").ok(),
});

client.connect();
client.join_workspace("test".into());

match poll_fn(|cx| client.poll(cx)).await {
Err(ClientError::ForceShutdown) => {
Expand All @@ -30,7 +32,7 @@ async fn main() -> Result<(), ClientError> {
Ok(())
}

// API
// API
// - `/auth` ?
// - `/subscribe/{cid}` - jwt { uuid: string, role: string }
// - `/unsubscribe/{cid}` - jwt { uuid: string, role: string }
Expand Down
18 changes: 12 additions & 6 deletions backend/client/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ pub enum PeerConnEvent {
// https://github.com/feross/simple-peer/blob/f1a492d1999ce727fa87193ebdea20ac89c1fc6d/README.md?plain=1#L315
OutboundSignal(Signal),
IncomingMessage(yrs::sync::Message),
Connected
Disconnect,
Connected,
}

impl PeerConnection {
Expand Down Expand Up @@ -171,7 +172,7 @@ impl PeerConnection {
self.rtc
.sdp_api()
.accept_answer(pending_offer.take().ok_or(PeerConnError::SdpError)?, answer)
.expect("Failed to accept answer");
.map_err(|_| PeerConnError::SdpError)?;

for candidate in candidates.drain(..) {
self.rtc.add_remote_candidate(candidate);
Expand All @@ -181,7 +182,7 @@ impl PeerConnection {

Ok(())
}
_ => panic!("Invalid state for accepting answer"),
_ => Err(PeerConnError::SdpError),
}
}

Expand Down Expand Up @@ -372,7 +373,8 @@ impl PeerConnection {
// may return to the connected state.
self.on_connection_closed();

continue;
cx.waker().wake_by_ref();
return Poll::Ready(Ok(PeerConnEvent::Disconnect));
}
RTCEvent::IceConnectionStateChange(IceConnectionState::Connected)
| RTCEvent::IceConnectionStateChange(IceConnectionState::Completed) => {
Expand Down Expand Up @@ -472,11 +474,15 @@ impl PeerConnection {
continue;
}
Signal::SdpAnswer(sdp) => {
self.accept_answer(sdp).expect("Failed to accept answer");
if let Err(error) = self.accept_answer(sdp) {
log::warn!("Failed to accept answer {:?}", error);
};
continue;
}
Signal::SdpOffer(sdp) => {
self.accept_offer(sdp).expect("Failed to accept offer");
if let Err(error) = self.accept_offer(sdp) {
log::warn!("Failed to accept offer {:?}", error);
};
continue;
}
_ => {}
Expand Down
42 changes: 35 additions & 7 deletions backend/client/src/signal/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@ use std::{
task::{Context, Poll, Waker},
};

use cookie::Cookie;
use futures::{future::BoxFuture, FutureExt, SinkExt, StreamExt};
use tokio::net::TcpStream;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tokio_tungstenite::tungstenite::http::StatusCode;
use tokio_tungstenite::tungstenite::protocol::Message as WsMessage;
use tokio_tungstenite::tungstenite::{
client::IntoClientRequest,
http::{header::COOKIE, HeaderValue},
};
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use url::Url;

Expand All @@ -24,6 +28,7 @@ pub struct SignalConnection {

pub struct SignalOptions {
pub url: Url,
pub token: Option<String>,
}
#[derive(Debug)]
pub enum SignalError {
Expand All @@ -50,7 +55,7 @@ impl SignalConnection {
}

pub fn connect(&mut self) {
self.state = State::connect(self.options.url.clone());
self.state = State::connect(self.options.url.clone(), self.options.token.clone());

if let Some(waker) = self.waker.take() {
waker.wake();
Expand Down Expand Up @@ -226,21 +231,44 @@ impl Display for State {
}

impl State {
fn connect(url: Url) -> Self {
Self::Connecting(create_and_connect_websocket(url).boxed())
fn connect(url: Url, token: Option<String>) -> Self {
Self::Connecting(create_and_connect_websocket(url, token).boxed())
}
}

async fn create_and_connect_websocket(url: Url) -> Result<SignalStream, InternalError> {
let request = url
async fn create_and_connect_websocket(
url: Url,
token: Option<String>,
) -> Result<SignalStream, InternalError> {
let mut request = url
.to_string()
.into_client_request()
.map_err(|_| InternalError::InvalidUrl)?;

let (stream, _response) = tokio_tungstenite::connect_async(request)
if let Some(token) = token {
request.headers_mut().insert(
COOKIE,
HeaderValue::from_str(&format!("jwt={}; HttpOnly; Path=/", token))
.map_err(|_| InternalError::InvalidUrl)?,
);
}

let (stream, response) = tokio_tungstenite::connect_async(request)
.await
.map_err(InternalError::WebSocket)?;

let cookies = response
.headers()
.get_all("set-cookie")
.iter()
.filter_map(|header_value| header_value.to_str().ok())
.flat_map(|set_cookie| Cookie::parse(set_cookie).ok())
.collect::<Vec<Cookie>>();

for cookie in cookies {
println!("Cookie Name: {}, Value: {}", cookie.name(), cookie.value());
}

Ok(stream)
}

Expand Down
15 changes: 14 additions & 1 deletion backend/client/src/signal/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,24 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum PeerKind {
#[serde(rename = "worker")]
Worker,
#[serde(rename = "client")]
Client,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum Message {
#[serde(rename = "publish")]
Publish { topic: String, data: MessageData },
Publish {
topic: String,
data: MessageData,
identity: Option<String>,
kind: Option<PeerKind>,
},
#[serde(rename = "subscribe")]
Subscribe { topics: Vec<String> },
#[serde(rename = "unsubscribe")]
Expand Down
Loading

0 comments on commit 282584e

Please sign in to comment.