Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(torii): retrieve events and store only relevant txns #1631

Merged
merged 14 commits into from
Mar 20, 2024
139 changes: 74 additions & 65 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
use anyhow::Result;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{
BlockId, BlockWithTxs, Event, InvokeTransaction, MaybePendingBlockWithTxs,
MaybePendingTransactionReceipt, Transaction, TransactionReceipt,
BlockId, EmittedEvent, Event, EventFilter, MaybePendingTransactionReceipt, Transaction,
TransactionReceipt,
};
use starknet::core::utils::get_selector_from_name;
use starknet::providers::Provider;
Expand Down Expand Up @@ -56,7 +56,7 @@
data: Vec<String>,
}

impl<'db, P: Provider + Sync> Engine<'db, P> {
impl<'db, P: Provider + Sync + Send> Engine<'db, P> {
pub fn new(
world: WorldContractReader<P>,
db: &'db mut Sql,
Expand Down Expand Up @@ -119,67 +119,66 @@
Ok(latest_block_number)
}

pub async fn sync_range(&mut self, mut from: u64, to: u64) -> Result<()> {
pub async fn sync_range(&mut self, from: u64, to: u64) -> Result<()> {
Larkooo marked this conversation as resolved.
Show resolved Hide resolved
// Process all blocks from current to latest.
while from <= to {
let block_with_txs = match self.provider.get_block_with_txs(BlockId::Number(from)).await
{
Ok(block_with_txs) => block_with_txs,
Err(e) => {
error!("getting block: {}", e);
continue;
}
};
let get_event = |token: Option<String>| {
Larkooo marked this conversation as resolved.
Show resolved Hide resolved
self.provider.get_events(
EventFilter {
from_block: Some(BlockId::Number(from)),
to_block: Some(BlockId::Number(to)),
address: Some(self.world.address),
keys: None,
},
token,
1000,
)
};

// send the current block number
if let Some(ref block_tx) = self.block_tx {
block_tx.send(from).await.expect("failed to send block number to gRPC server");
}
// handle next events pages
let mut events_pages = vec![];

match self.process(block_with_txs).await {
Ok(_) => {
self.db.set_head(from);
self.db.execute().await?;
from += 1;
}
Err(e) => {
error!("processing block: {}", e);
continue;
}
events_pages.push(get_event(None).await?);
while let Some(token) = &events_pages.last().unwrap().continuation_token {
events_pages.push(get_event(Some(token.clone())).await?);

Check warning on line 142 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L142

Added line #L142 was not covered by tests
Larkooo marked this conversation as resolved.
Show resolved Hide resolved
}

let mut last_block: u64 = 0;
for events_page in events_pages {
for event in events_page.events {
self.process(event, &mut last_block).await?;
}
}

self.db.execute().await?;

Ok(())
}

async fn process(&mut self, block: MaybePendingBlockWithTxs) -> Result<()> {
let block: BlockWithTxs = match block {
MaybePendingBlockWithTxs::Block(block) => block,
_ => return Ok(()),
async fn process(&mut self, event: EmittedEvent, last_block: &mut u64) -> Result<()> {
let block_number = match event.block_number {
Some(block_number) => block_number,
None => {
error!("event without block number");
return Ok(());

Check warning on line 162 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L161-L162

Added lines #L161 - L162 were not covered by tests
}
};

Self::process_block(self, &block).await?;
if block_number > *last_block {
*last_block = block_number;

for (tx_idx, transaction) in block.clone().transactions.iter().enumerate() {
let transaction_hash = match transaction {
Transaction::Invoke(invoke_transaction) => {
if let InvokeTransaction::V1(invoke_transaction) = invoke_transaction {
invoke_transaction.transaction_hash
} else {
continue;
}
}
Transaction::L1Handler(l1_handler_transaction) => {
l1_handler_transaction.transaction_hash
}
_ => continue,
};
if let Some(ref block_tx) = self.block_tx {
block_tx.send(block_number).await?;

Check warning on line 170 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L170

Added line #L170 was not covered by tests
}

self.process_transaction_and_receipt(transaction_hash, transaction, &block, tx_idx)
.await?;
Self::process_block(self, block_number, event.block_hash.unwrap()).await?;
info!(target: "torii_core::engine", block = %block_number, "Processed block");

Check warning on line 174 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L174

Added line #L174 was not covered by tests
Larkooo marked this conversation as resolved.
Show resolved Hide resolved

self.db.set_head(block_number);
}

info!("processed block: {}", block.block_number);
let transaction = self.provider.get_transaction_by_hash(event.transaction_hash).await?;
self.process_transaction_and_receipt(event.transaction_hash, &transaction, block_number)
.await?;

Ok(())
}
Expand All @@ -188,8 +187,7 @@
&mut self,
transaction_hash: FieldElement,
transaction: &Transaction,
block: &BlockWithTxs,
tx_idx: usize,
block_number: u64,
) -> Result<()> {
let receipt = match self.provider.get_transaction_receipt(transaction_hash).await {
Ok(receipt) => match receipt {
Expand Down Expand Up @@ -222,45 +220,49 @@

world_event = true;
let event_id =
format!("0x{:064x}:0x{:04x}:0x{:04x}", block.block_number, tx_idx, event_idx);
format!("{:#064x}:{:#x}:{:#04x}", block_number, transaction_hash, event_idx);

Self::process_event(self, block, &receipt, &event_id, event).await?;
Self::process_event(self, block_number, &receipt, &event_id, event).await?;
}

if world_event {
let transaction_id = format!("0x{:064x}:0x{:04x}", block.block_number, tx_idx);

Self::process_transaction(self, block, &receipt, &transaction_id, transaction)
.await?;
Self::process_transaction(
self,
block_number,
&receipt,
transaction_hash,
transaction,
)
.await?;

Check warning on line 236 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L236

Added line #L236 was not covered by tests
}
}

Ok(())
}

async fn process_block(&mut self, block: &BlockWithTxs) -> Result<()> {
async fn process_block(&mut self, block_number: u64, block_hash: FieldElement) -> Result<()> {
for processor in &self.processors.block {
processor.process(self.db, self.provider.as_ref(), block).await?;
processor.process(self.db, self.provider.as_ref(), block_number, block_hash).await?;

Check warning on line 245 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L245

Added line #L245 was not covered by tests
}
Ok(())
}

async fn process_transaction(
&mut self,
block: &BlockWithTxs,
block_number: u64,
transaction_receipt: &TransactionReceipt,
transaction_id: &str,
transaction_hash: FieldElement,
transaction: &Transaction,
) -> Result<()> {
for processor in &self.processors.transaction {
processor
.process(
self.db,
self.provider.as_ref(),
block,
block_number,

Check warning on line 262 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L262

Added line #L262 was not covered by tests
transaction_receipt,
transaction_hash,

Check warning on line 264 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L264

Added line #L264 was not covered by tests
transaction,
transaction_id,
)
.await?
}
Expand All @@ -270,7 +272,7 @@

async fn process_event(
&mut self,
block: &BlockWithTxs,
block_number: u64,
transaction_receipt: &TransactionReceipt,
event_id: &str,
event: &Event,
Expand All @@ -288,7 +290,14 @@
&& processor.validate(event)
{
processor
.process(&self.world, self.db, block, transaction_receipt, event_id, event)
.process(
&self.world,
self.db,
block_number,
transaction_receipt,
event_id,
event,
)
.await?;
} else {
let unprocessed_event = UnprocessedEvent {
Expand Down
4 changes: 2 additions & 2 deletions crates/torii/core/src/processors/metadata_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use dojo_world::contracts::world::WorldContractReader;
use dojo_world::metadata::{Uri, WorldMetadata};
use reqwest::Client;
use starknet::core::types::{BlockWithTxs, Event, TransactionReceipt};
use starknet::core::types::{Event, TransactionReceipt};
use starknet::core::utils::parse_cairo_short_string;
use starknet::providers::Provider;
use starknet_crypto::FieldElement;
Expand Down Expand Up @@ -48,7 +48,7 @@
&self,
_world: &WorldContractReader<P>,
db: &mut Sql,
_block: &BlockWithTxs,
_block_number: u64,

Check warning on line 51 in crates/torii/core/src/processors/metadata_update.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/processors/metadata_update.rs#L51

Added line #L51 was not covered by tests
_transaction_receipt: &TransactionReceipt,
_event_id: &str,
event: &Event,
Expand Down
17 changes: 12 additions & 5 deletions crates/torii/core/src/processors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use anyhow::{Error, Result};
use async_trait::async_trait;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{BlockWithTxs, Event, Transaction, TransactionReceipt};
use starknet::core::types::{Event, Transaction, TransactionReceipt};
use starknet::providers::Provider;
use starknet_crypto::FieldElement;

use crate::sql::Sql;

Expand Down Expand Up @@ -33,7 +34,7 @@ where
&self,
world: &WorldContractReader<P>,
db: &mut Sql,
block: &BlockWithTxs,
block_number: u64,
transaction_receipt: &TransactionReceipt,
event_id: &str,
event: &Event,
Expand All @@ -43,7 +44,13 @@ where
#[async_trait]
pub trait BlockProcessor<P: Provider + Sync> {
fn get_block_number(&self) -> String;
async fn process(&self, db: &mut Sql, provider: &P, block: &BlockWithTxs) -> Result<(), Error>;
async fn process(
&self,
db: &mut Sql,
provider: &P,
block_number: u64,
block_hash: FieldElement,
) -> Result<(), Error>;
}

#[async_trait]
Expand All @@ -52,9 +59,9 @@ pub trait TransactionProcessor<P: Provider + Sync> {
&self,
db: &mut Sql,
provider: &P,
block: &BlockWithTxs,
block_number: u64,
transaction_receipt: &TransactionReceipt,
transaction_hash: FieldElement,
transaction: &Transaction,
transaction_id: &str,
) -> Result<(), Error>;
}
4 changes: 2 additions & 2 deletions crates/torii/core/src/processors/register_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{Error, Ok, Result};
use async_trait::async_trait;
use dojo_world::contracts::model::ModelReader;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{BlockWithTxs, Event, TransactionReceipt};
use starknet::core::types::{Event, TransactionReceipt};
use starknet::core::utils::parse_cairo_short_string;
use starknet::providers::Provider;
use tracing::{debug, info};
Expand Down Expand Up @@ -38,7 +38,7 @@ where
&self,
world: &WorldContractReader<P>,
db: &mut Sql,
_block: &BlockWithTxs,
_block_number: u64,
_transaction_receipt: &TransactionReceipt,
_event_id: &str,
event: &Event,
Expand Down
4 changes: 2 additions & 2 deletions crates/torii/core/src/processors/store_del_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{Error, Ok, Result};
use async_trait::async_trait;
use dojo_world::contracts::model::ModelReader;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{BlockWithTxs, Event, TransactionReceipt};
use starknet::core::types::{Event, TransactionReceipt};
use starknet::core::utils::parse_cairo_short_string;
use starknet::providers::Provider;
use tracing::info;
Expand Down Expand Up @@ -39,7 +39,7 @@ where
&self,
_world: &WorldContractReader<P>,
db: &mut Sql,
_block: &BlockWithTxs,
_block_number: u64,
_transaction_receipt: &TransactionReceipt,
_event_id: &str,
event: &Event,
Expand Down
4 changes: 2 additions & 2 deletions crates/torii/core/src/processors/store_set_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{Error, Ok, Result};
use async_trait::async_trait;
use dojo_world::contracts::model::ModelReader;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{BlockWithTxs, Event, TransactionReceipt};
use starknet::core::types::{Event, TransactionReceipt};
use starknet::core::utils::parse_cairo_short_string;
use starknet::providers::Provider;
use tracing::info;
Expand Down Expand Up @@ -39,7 +39,7 @@ where
&self,
_world: &WorldContractReader<P>,
db: &mut Sql,
_block: &BlockWithTxs,
_block_number: u64,
_transaction_receipt: &TransactionReceipt,
event_id: &str,
event: &Event,
Expand Down
10 changes: 6 additions & 4 deletions crates/torii/core/src/processors/store_transaction.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use anyhow::{Error, Ok, Result};
use async_trait::async_trait;
use starknet::core::types::{BlockWithTxs, Transaction, TransactionReceipt};
use starknet::core::types::{Transaction, TransactionReceipt};
use starknet::providers::Provider;
use starknet_crypto::FieldElement;

use super::TransactionProcessor;
use crate::sql::Sql;
Expand All @@ -15,12 +16,13 @@
&self,
db: &mut Sql,
_provider: &P,
_block: &BlockWithTxs,
block_number: u64,

Check warning on line 19 in crates/torii/core/src/processors/store_transaction.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/processors/store_transaction.rs#L19

Added line #L19 was not covered by tests
_receipt: &TransactionReceipt,
transaction_hash: FieldElement,

Check warning on line 21 in crates/torii/core/src/processors/store_transaction.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/processors/store_transaction.rs#L21

Added line #L21 was not covered by tests
transaction: &Transaction,
transaction_id: &str,
) -> Result<(), Error> {
db.store_transaction(transaction, transaction_id);
let transaction_id = format!("{:#064x}:{:#x}", block_number, transaction_hash);
db.store_transaction(transaction, &transaction_id);

Check warning on line 25 in crates/torii/core/src/processors/store_transaction.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/processors/store_transaction.rs#L24-L25

Added lines #L24 - L25 were not covered by tests

Ok(())
}
Expand Down
Loading