Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use troupe crate #202

Open
wants to merge 11 commits into
base: development
Choose a base branch
from
1 change: 1 addition & 0 deletions squire_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ tower-http = { version = "0.5", features = ["cors"] }
derive_more = "0.99.17"
sorted-vec = "0.8.3"
fxhash = "=0.2.1"
troupe = "0.1.0"

[dev-dependencies]
# In-house
Expand Down
16 changes: 11 additions & 5 deletions squire_core/src/state/accounts.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::HashMap, future::Future, hash::Hasher};

use async_trait::async_trait;
use axum::response::{IntoResponse, Response};
use derive_more::From;
use futures::{FutureExt, StreamExt};
Expand All @@ -12,11 +13,12 @@ use mongodb::{
};
use serde::{Deserialize, Serialize};
use squire_sdk::{
actor::*,
api::{Credentials, RegForm},
model::{accounts::SquireAccount, identifiers::SquireAccountId},
};
use tokio::sync::oneshot::Sender as OneshotSender;
use tracing::Level;
use troupe::{prelude::*, sink::permanent::Tracker};

pub struct LoginError;

Expand All @@ -28,7 +30,7 @@ impl IntoResponse for LoginError {

#[derive(Debug, Clone)]
pub struct AccountStoreHandle {
client: ActorClient<AccountStore>,
client: SinkClient<Permanent, AccountCommand>,
}

fn salt_and_hash(password: &str, username: &str) -> u32 {
Expand All @@ -41,7 +43,7 @@ fn salt_and_hash(password: &str, username: &str) -> u32 {

impl AccountStoreHandle {
pub fn new(db: Database) -> Self {
let client = ActorClient::builder(AccountStore::new(db)).launch();
let client = ActorBuilder::new(AccountStore::new(db)).launch();
Self { client }
}

Expand Down Expand Up @@ -79,7 +81,11 @@ pub struct AccountStore {

#[async_trait]
impl ActorState for AccountStore {
type Permanence = Permanent;
type ActorType = SinkActor;

type Message = AccountCommand;
type Output = ();

async fn start_up(&mut self, _scheduler: &mut Scheduler<Self>) {
let db = self.db.clone();
Expand Down Expand Up @@ -131,7 +137,7 @@ impl AccountStore {
let account = SquireAccount::new(username, display_name);
let digest = account.id;
let user = DbUser { account, cred };
scheduler.process(self.db.persist_account(user.clone()));
scheduler.manage_future(self.db.persist_account(user.clone()));
self.credentials.insert(cred, digest);
self.users.insert(digest, user);
digest
Expand All @@ -150,7 +156,7 @@ impl AccountStore {
fn delete_account(&mut self, id: SquireAccountId, scheduler: &mut Scheduler<Self>) -> bool {
self.credentials.retain(|_, a_id| id != *a_id);
if let Some(user) = self.users.remove(&id) {
scheduler.process(self.db.remove_account(user));
scheduler.manage_future(self.db.remove_account(user));
true
} else {
false
Expand Down
13 changes: 12 additions & 1 deletion squire_core/src/state/boilerplate.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::Debug;

use super::SessionCommand;
use super::{AccountCommand, SessionCommand};

impl Debug for SessionCommand {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand All @@ -16,3 +16,14 @@ impl Debug for SessionCommand {
}
}
}

impl Debug for AccountCommand {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessary for this PR, but I wonder if we could use the derive_more crates macros to generate this.

fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Create(value, _) => write!(f, "Create({value:?})"),
Self::Authenticate(value, _) => write!(f, "Authenticate({value:?})"),
Self::Get(value, _) => write!(f, "Get({value:?})"),
Self::Delete(value, _) => write!(f, "Delete({value:?})"),
}
}
}
10 changes: 5 additions & 5 deletions squire_core/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use async_trait::async_trait;
use axum::extract::ws::WebSocket;
use mongodb::{options::ClientOptions, Client as DbClient, Database};
use squire_sdk::{
actor::{ActorBuilder, ActorClient},
api::*,
model::{
accounts::SquireAccount,
Expand All @@ -17,6 +16,7 @@ use squire_sdk::{
},
sync::TournamentManager,
};
use troupe::prelude::{ActorBuilder, Permanent, SinkClient};

mod accounts;
mod boilerplate;
Expand Down Expand Up @@ -100,7 +100,7 @@ impl AppStateBuilder<Uri, DbName> {
.database(self.get_db_name());
let tourn_coll = Arc::from(self.get_tournament_collection_name());
let tourn_db = TournDb::new(db_conn.clone(), tourn_coll);
let tournaments = ActorClient::builder(TournPersister::new(tourn_db.clone())).launch();
let tournaments = ActorBuilder::new(TournPersister::new(tourn_db.clone())).launch();
let gatherings = ActorBuilder::new(GatheringHall::new(tournaments.clone())).launch();
AppState {
sessions: SessionStoreHandle::new(db_conn.clone()),
Expand All @@ -125,7 +125,7 @@ impl AppStateBuilder<Database, ()> {
pub fn build(self) -> AppState {
let tourn_coll: Arc<str> = Arc::from(self.get_tournament_collection_name());
let tourn_db = TournDb::new(self.db_conn.clone(), tourn_coll);
let tourns = ActorClient::builder(TournPersister::new(tourn_db.clone())).launch();
let tourns = ActorBuilder::new(TournPersister::new(tourn_db.clone())).launch();
let gatherings = ActorBuilder::new(GatheringHall::new(tourns.clone())).launch();
AppState {
Copy link
Contributor Author

@orblivion orblivion Dec 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So just to be sure of my reasoning here: it looks like sessions, accounts, and gatherings are clients attached the app state, so those handlers should be Permanent. I think that tourns stays attached to gatherings, which means that that should also be Permanent.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. All of those should be permanent. The only actors that should be transient are the Gatherings, which are internally managed by the GatheringHall actor.

sessions: SessionStoreHandle::new(self.db_conn.clone()),
Expand Down Expand Up @@ -154,7 +154,7 @@ pub struct AppState {
tourn_db: TournDb,
sessions: SessionStoreHandle,
accounts: AccountStoreHandle,
gatherings: ActorClient<GatheringHall<TournPersister>>,
gatherings: SinkClient<Permanent, GatheringHallMessage>,
}

impl AppState {
Expand Down Expand Up @@ -215,7 +215,7 @@ impl ServerState for AppState {
self.tourn_db.persist_tourn(tourn).await
}

async fn handle_new_onlooker(&self, id: TournamentId, user: SessionWatcher, ws: WebSocket) {
async fn handle_new_onlooker(&self, id: TournamentId, user: SessionWatcher, ws: WebSocket) -> bool {
println!("Passing connection request off to gathering hall...");
self.gatherings
.send(GatheringHallMessage::NewConnection(id, user, ws))
Expand Down
23 changes: 14 additions & 9 deletions squire_core/src/state/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
//! should be termianted (or at least downgraded to be that of a guest) once it expires. Reauth
//! should reset the timer in the task that manages a tournament's WS connections.

use async_trait::async_trait;
use std::{
collections::hash_map::HashMap,
time::{Duration, Instant},
Expand All @@ -55,7 +56,6 @@ use mongodb::{
use rand::{rngs::StdRng, RngCore, SeedableRng};
use serde::{Deserialize, Serialize};
use squire_sdk::{
actor::*,
api::SessionToken,
model::identifiers::SquireAccountId,
server::session::{AnyUser, SquireSession},
Expand All @@ -65,6 +65,7 @@ use tokio::sync::{
watch::{channel, Receiver as Watcher, Sender as Broadcaster},
};
use tracing::Level;
use troupe::{prelude::*, sink::permanent::Tracker};

#[derive(From)]
pub enum SessionCommand {
Expand Down Expand Up @@ -110,7 +111,11 @@ struct Session {

#[async_trait]
impl ActorState for SessionStore {
type Permanence = Permanent;
type ActorType = SinkActor;

type Message = SessionCommand;
type Output = ();

async fn start_up(&mut self, scheduler: &mut Scheduler<Self>) {
self.db.clone().load_all_sessions(self).await;
Expand Down Expand Up @@ -163,7 +168,7 @@ impl SessionStore {
self.sessions.insert(token.clone(), session.clone());
let db = self.db.clone();
let db_session = session.clone();
scheduler.process(async move { db.persist_session(db_session).await });
scheduler.manage_future(async move { db.persist_session(db_session).await });
session
}

Expand All @@ -173,7 +178,7 @@ impl SessionStore {
self.sessions.insert(token.clone(), session.clone());
let db = self.db.clone();
let db_session = session.clone();
scheduler.process(async move { db.persist_session(db_session).await });
scheduler.manage_future(async move { db.persist_session(db_session).await });
session
}

Expand Down Expand Up @@ -214,7 +219,7 @@ impl SessionStore {
AnyUser::Guest(token) | AnyUser::Active(token) => {
if let Some(session) = self.remove_session(&token) {
let db = self.db.clone();
scheduler.process(async move { db.remove_session(session).await });
scheduler.manage_future(async move { db.remove_session(session).await });
true
} else {
false
Expand All @@ -223,7 +228,7 @@ impl SessionStore {
AnyUser::ExpiredGuest(token) | AnyUser::Expired(token) => {
if let Some(session) = self.remove_session(&token) {
let db = self.db.clone();
scheduler.process(async move { db.remove_expired_session(session).await });
scheduler.manage_future(async move { db.remove_expired_session(session).await });
true
} else {
false
Expand All @@ -243,14 +248,14 @@ impl SessionStore {
SessionCommand::Revoke(token.clone()),
);
let db = self.db.clone();
scheduler.process(async move { db.expire_session(session).await });
scheduler.manage_future(async move { db.expire_session(session).await });
}
}

fn revoke_session(&mut self, scheduler: &mut Scheduler<Self>, token: &SessionToken) {
if let Some(session) = self.remove_session(token) {
let db = self.db.clone();
scheduler.process(async move { db.remove_expired_session(session).await });
scheduler.manage_future(async move { db.remove_expired_session(session).await });
}
}

Expand Down Expand Up @@ -378,12 +383,12 @@ async fn delete_session(table: Collection<Session>, session: Session) -> bool {

#[derive(Debug, Clone)]
pub struct SessionStoreHandle {
client: ActorClient<SessionStore>,
client: SinkClient<Permanent, SessionCommand>,
}

impl SessionStoreHandle {
pub fn new(db: Database) -> Self {
let client = ActorClient::builder(SessionStore::new(db)).launch();
let client = ActorBuilder::new(SessionStore::new(db)).launch();
Self { client }
}

Expand Down
8 changes: 7 additions & 1 deletion squire_core/src/state/tournaments.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use std::{ops::Range, sync::Arc};

use async_trait::async_trait;
use futures::StreamExt;
use mongodb::{
bson::{doc, spec::BinarySubtype, Binary, Document},
options::{FindOptions, Hint, UpdateModifications, UpdateOptions},
Collection, Database,
};
use squire_sdk::{
actor::*, api::TournamentSummary, model::tournament::TournamentId,
api::TournamentSummary, model::tournament::TournamentId,
server::gathering::PersistMessage, sync::TournamentManager,
};
use tracing::Level;
use troupe::prelude::{ActorState, Permanent, Scheduler, SinkActor};

#[derive(Debug, Clone)]
pub struct TournDb {
Expand All @@ -24,7 +26,11 @@ pub struct TournPersister {

#[async_trait]
impl ActorState for TournPersister {
type Permanence = Permanent;
type ActorType = SinkActor;

type Message = PersistMessage;
type Output = ();

async fn process(&mut self, _scheduler: &mut Scheduler<Self>, msg: Self::Message) {
match msg {
Expand Down
1 change: 0 additions & 1 deletion squire_core/src/tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use http::{
header::{CONTENT_TYPE, SET_COOKIE},
Method,
};
use hyper::Body;
use serde::{de::DeserializeOwned, Serialize};
use squire_sdk::api::Url;
use tower::{Service, ServiceExt};
Expand Down
2 changes: 2 additions & 0 deletions squire_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ headers = { version = "0.4", optional = true }
# To be moved
hashbag = { version = "0.1.11", features = ["serde"] }
derive_more = "0.99.17"
hyper = "1.0"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The history of this one is a bit weird. Within the history of development, we see hyper = "0.14" added and removed.

Should this line be here in this PR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ya, the history is a bit weird, but this line is fine. This change is needed regardless.

troupe = "0.1.0"

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen-futures = { version = "0.4.37" }
Expand Down
Loading