Skip to content

Commit

Permalink
feat(relay): persist and expose messages through grpc (#1526)
Browse files Browse the repository at this point in the history
* feat(libp2p): start working on persistence & grpc

* feat: add relay proto and db migration

* feat(libp2p): relaystorage for message persistence

* feat(libp2p): relay subscription service

* refactor: start implementing models/ty for libp2p

* refactor(torii-core): generic set ty query

* feat: use models for libp2p messages

* feat: rpcs for subscring and retrieving messages

* refactor(torii-core): arc rwlock for sql database & work on messages for models

* refactor: tests & write message to db

* refactor(grpc): grpc service & subscription

* refactor: use entitiy type and models for messages

* chore: fmt & clippy

* refactor: unused async

* chore: world descriptor

* refactor: unused deps & fmt

* chore: remove post types

* chore: unused libp2p deps

* chore: non wasm deps

* chore

* chore: fmt

* fix: test

* refactor: check if entity already exists for libp2p messages

* chore: fmt & clippy

* feat(libp2p): start working on identity and signatures for messages

* refactor(libp2p): use array for signature type and check its ocmponents

* refactor: decouple logic

* refactor(libpo2p): extract identity & signature from validate message

* feat(libp2p): eip 712 typed data

* feat(libp2p): start workling on typed data structure snip-12

* refactor: change name

* feat: handle type encoding

* feat: typed data message encoding

* refactor: rework in progress for using snip-12

* feat(libp2p): testing typed data with mocks & tests

* fix: typed data passing test for enums

* feat: test for presets and refactor type primitives

* refactor: string parsing for felts

* fix: dont include type hash on children objects

* fix: json ordering issue with serde_json

* fix: typed data for shortstring

* fix: message encoding - passes all tests

* feat: start supporting enums

* feat: handle enum variant types

* chore: rebase and fix db rwlock

* refactor: encode all object types & start fixing enums

* fix: enum test

* fix: preset types. full compatibility with starknet js

* refactor: more optimized approach for preset types

* feat(libp2p): valid message signature & identity

* refactor: wip identity and verifying message

* refactor: get entity model identity & check

* wip: start working on encoding typed data to Ty

* refactor: implement ty types for encode typed data

* refactor: parse object to ty function

* refactor: set entity from relay w verification

* feat: sign last signature and check signed last sig against last sig in db

* chore: fmt

* fix: torii grpc

* refactor: sqlx onlyu server side

* chore: clippy

* chore: use char

* refactor: sql reference

* chore: fmt & clippy

* refactor: typed data payload

* fix: clippy
  • Loading branch information
Larkooo authored Mar 15, 2024
1 parent cfdd170 commit 953e5e5
Show file tree
Hide file tree
Showing 20 changed files with 1,480 additions and 237 deletions.
44 changes: 27 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 11 additions & 10 deletions bin/torii/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ async fn main() -> anyhow::Result<()> {
// Get world address
let world = WorldContractReader::new(args.world_address, &provider);

let mut db = Sql::new(pool.clone(), args.world_address).await?;
let db = Sql::new(pool.clone(), args.world_address).await?;
let processors = Processors {
event: vec![
Box::new(RegisterModelProcessor),
Expand All @@ -161,7 +161,7 @@ async fn main() -> anyhow::Result<()> {

let mut engine = Engine::new(
world,
&mut db,
db.clone(),
&provider,
processors,
EngineConfig { start_block: args.start_block, ..Default::default() },
Expand All @@ -179,6 +179,15 @@ async fn main() -> anyhow::Result<()> {
)
.await?;

let mut libp2p_relay_server = torii_relay::server::Relay::new(
db,
args.relay_port,
args.relay_webrtc_port,
args.relay_local_key_path,
args.relay_cert_path,
)
.expect("Failed to start libp2p relay server");

let proxy_server = Arc::new(Proxy::new(args.addr, args.allowed_origins, Some(grpc_addr), None));

let graphql_server = spawn_rebuilding_graphql_server(
Expand All @@ -188,14 +197,6 @@ async fn main() -> anyhow::Result<()> {
proxy_server.clone(),
);

let mut libp2p_relay_server = torii_relay::server::Relay::new(
args.relay_port,
args.relay_webrtc_port,
args.relay_local_key_path,
args.relay_cert_path,
)
.expect("Failed to start libp2p relay server");

let endpoint = format!("http://{}", args.addr);
let gql_endpoint = format!("{}/graphql", endpoint);
let encoded: String =
Expand Down
32 changes: 3 additions & 29 deletions crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ use dojo_types::packing::unpack;
use dojo_types::schema::Ty;
use dojo_types::WorldMetadata;
use dojo_world::contracts::WorldContractReader;
use futures::channel::mpsc::UnboundedReceiver;
use futures_util::lock::Mutex;
use parking_lot::{RwLock, RwLockReadGuard};
use starknet::core::utils::cairo_short_string_to_felt;
use starknet::providers::jsonrpc::HttpTransport;
Expand All @@ -22,7 +20,7 @@ use torii_grpc::client::{EntityUpdateStreaming, ModelDiffsStreaming};
use torii_grpc::proto::world::RetrieveEntitiesResponse;
use torii_grpc::types::schema::Entity;
use torii_grpc::types::{KeysClause, Query};
use torii_relay::client::{EventLoop, Message};
use torii_relay::types::Message;

use crate::client::error::{Error, ParseError};
use crate::client::storage::ModelStorage;
Expand Down Expand Up @@ -106,41 +104,17 @@ impl Client {
self.relay_client.command_sender.wait_for_relay().await.map_err(Error::RelayClient)
}

/// Subscribes to a topic.
/// Returns true if the topic was subscribed to.
/// Returns false if the topic was already subscribed to.
pub async fn subscribe_topic(&mut self, topic: String) -> Result<bool, Error> {
self.relay_client.command_sender.subscribe(topic).await.map_err(Error::RelayClient)
}

/// Unsubscribes from a topic.
/// Returns true if the topic was subscribed to.
pub async fn unsubscribe_topic(&mut self, topic: String) -> Result<bool, Error> {
self.relay_client.command_sender.unsubscribe(topic).await.map_err(Error::RelayClient)
}

/// Publishes a message to a topic.
/// Returns the message id.
pub async fn publish_message(&mut self, topic: &str, message: &[u8]) -> Result<Vec<u8>, Error> {
pub async fn publish_message(&mut self, message: Message) -> Result<Vec<u8>, Error> {
self.relay_client
.command_sender
.publish(topic.to_string(), message.to_vec())
.publish(message)
.await
.map_err(Error::RelayClient)
.map(|m| m.0)
}

/// Returns the event loop of the relay client.
/// Which can then be used to run the relay client
pub fn relay_client_runner(&self) -> Arc<Mutex<EventLoop>> {
self.relay_client.event_loop.clone()
}

/// Returns the message receiver of the relay client.
pub fn relay_client_stream(&self) -> Arc<Mutex<UnboundedReceiver<Message>>> {
self.relay_client.message_receiver.clone()
}

/// Returns a read lock on the World metadata that the client is connected to.
pub fn metadata(&self) -> RwLockReadGuard<'_, WorldMetadata> {
self.metadata.read()
Expand Down
14 changes: 7 additions & 7 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ impl Default for EngineConfig {
}
}

pub struct Engine<'db, P: Provider + Sync> {
pub struct Engine<P: Provider + Sync> {
world: WorldContractReader<P>,
db: &'db mut Sql,
db: Sql,
provider: Box<P>,
processors: Processors<P>,
config: EngineConfig,
Expand All @@ -56,10 +56,10 @@ struct UnprocessedEvent {
data: Vec<String>,
}

impl<'db, P: Provider + Sync> Engine<'db, P> {
impl<P: Provider + Sync> Engine<P> {
pub fn new(
world: WorldContractReader<P>,
db: &'db mut Sql,
db: Sql,
provider: P,
processors: Processors<P>,
config: EngineConfig,
Expand Down Expand Up @@ -240,7 +240,7 @@ impl<'db, P: Provider + Sync> Engine<'db, P> {

async fn process_block(&mut self, block: &BlockWithTxs) -> Result<()> {
for processor in &self.processors.block {
processor.process(self.db, self.provider.as_ref(), block).await?;
processor.process(&mut self.db, self.provider.as_ref(), block).await?;
}
Ok(())
}
Expand All @@ -255,7 +255,7 @@ impl<'db, P: Provider + Sync> Engine<'db, P> {
for processor in &self.processors.transaction {
processor
.process(
self.db,
&mut self.db,
self.provider.as_ref(),
block,
transaction_receipt,
Expand Down Expand Up @@ -288,7 +288,7 @@ impl<'db, P: Provider + Sync> Engine<'db, P> {
&& processor.validate(event)
{
processor
.process(&self.world, self.db, block, transaction_receipt, event_id, event)
.process(&self.world, &mut self.db, block, transaction_receipt, event_id, event)
.await?;
} else {
let unprocessed_event = UnprocessedEvent {
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ mod test;
#[derive(Debug, Clone)]
pub struct Sql {
world_address: FieldElement,
pool: Pool<Sqlite>,
pub pool: Pool<Sqlite>,
query_queue: QueryQueue,
}

Expand Down
6 changes: 3 additions & 3 deletions crates/torii/core/src/sql_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ use crate::sql::Sql;

pub async fn bootstrap_engine<P>(
world: WorldContractReader<P>,
db: &mut Sql,
db: Sql,
provider: P,
migration: MigrationStrategy,
sequencer: TestSequencer,
) -> Result<Engine<'_, P>, Box<dyn std::error::Error>>
) -> Result<Engine<P>, Box<dyn std::error::Error>>
where
P: Provider + Send + Sync,
{
Expand Down Expand Up @@ -72,7 +72,7 @@ async fn test_load_from_remote() {
let world = WorldContractReader::new(migration.world_address().unwrap(), &provider);

let mut db = Sql::new(pool.clone(), migration.world_address().unwrap()).await.unwrap();
let _ = bootstrap_engine(world, &mut db, &provider, migration, sequencer).await;
let _ = bootstrap_engine(world, db.clone(), &provider, migration, sequencer).await;

let models = sqlx::query("SELECT * FROM models").fetch_all(&pool).await.unwrap();
assert_eq!(models.len(), 2);
Expand Down
4 changes: 2 additions & 2 deletions crates/torii/graphql/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ pub async fn spinup_types_test() -> Result<SqlitePool> {
let target_path = format!("{}/target/dev", base_path);
let migration = prepare_migration(base_path.into(), target_path.into()).unwrap();
let config = build_test_config("../types-test/Scarb.toml").unwrap();
let mut db = Sql::new(pool.clone(), migration.world_address().unwrap()).await.unwrap();
let db = Sql::new(pool.clone(), migration.world_address().unwrap()).await.unwrap();

let sequencer =
TestSequencer::start(SequencerConfig::default(), get_default_test_starknet_config()).await;
Expand Down Expand Up @@ -316,7 +316,7 @@ pub async fn spinup_types_test() -> Result<SqlitePool> {
let (shutdown_tx, _) = broadcast::channel(1);
let mut engine = Engine::new(
world,
&mut db,
db,
&provider,
Processors {
event: vec![
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/grpc/src/server/subscriptions/model_diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl StateDiffManager {
&self,
reqs: Vec<ModelDiffRequest>,
) -> Result<Receiver<Result<proto::world::SubscribeModelsResponse, tonic::Status>>, Error> {
let id = rand::thread_rng().gen::<usize>();
let id: usize = rand::thread_rng().gen::<usize>();

let (sender, receiver) = channel(1);

Expand Down
Loading

0 comments on commit 953e5e5

Please sign in to comment.