Skip to content

Commit

Permalink
fix(db): avoid deadlocks on the RwLock
Browse files Browse the repository at this point in the history
To fix this (and also make the struct code easier) we changed the RwLock to std instead of tokio.

This makes many functions sync by default and remove the need for the `block_on` part of the reason for the deadlock.

The actual reason is not 100% clear still though.
  • Loading branch information
zizou0x committed Nov 19, 2024
1 parent 62309a6 commit b18c60c
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 158 deletions.
1 change: 0 additions & 1 deletion examples/explorer/data_feed/tycho.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ fn balancer_pool_filter(component: &ComponentWithState) -> bool {
"SiloLinearPoolFactory",
"YearnLinearPoolFactory",
"ComposableStablePoolFactory",
"WeightedPool2TokensFactory",
]
.iter()
.cloned()
Expand Down
278 changes: 131 additions & 147 deletions src/evm/tycho_db.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::{collections::HashMap, sync::Arc};
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};

use revm::{
db::DatabaseRef,
primitives::{AccountInfo, Address, Bytecode, Bytes, B256, U256 as rU256},
};
use thiserror::Error;
use tokio::sync::RwLock;
use tracing::{debug, error, info, instrument, warn};

use crate::evm::{
Expand Down Expand Up @@ -77,68 +79,57 @@ impl PreCachedDB {
}

#[instrument(skip_all)]
pub async fn update(&self, account_updates: Vec<AccountUpdate>, block: Option<BlockHeader>) {
// Block the current thread until the future completes.
self.block_on(async {
// Hold the write lock for the duration of the function so that no other thread can
// write to the storage.
let mut write_guard = self.inner.write().await;

write_guard.block = block;

for update in account_updates {
match update.change {
ChangeType::Update => {
info!(%update.address, "Updating account");

// If the account is not present, the internal storage will handle throwing
// an exception.
write_guard.accounts.update_account(
&update.address,
&StateUpdate {
storage: Some(update.slots.clone()),
balance: update.balance,
},
);
}
ChangeType::Deletion => {
info!(%update.address, "Deleting account");
pub fn update(&self, account_updates: Vec<AccountUpdate>, block: Option<BlockHeader>) {
// Hold the write lock for the duration of the function so that no other thread can
// write to the storage.
let mut write_guard = self.inner.write().unwrap();

write_guard.block = block;

for update in account_updates {
match update.change {
ChangeType::Update => {
info!(%update.address, "Updating account");

// If the account is not present, the internal storage will handle throwing
// an exception.
write_guard.accounts.update_account(
&update.address,
&StateUpdate {
storage: Some(update.slots.clone()),
balance: update.balance,
},
);
}
ChangeType::Deletion => {
info!(%update.address, "Deleting account");

// TODO: Implement deletion.
warn!(%update.address, "Deletion not implemented");
}
ChangeType::Creation => {
info!(%update.address, "Creating account");

// We expect the code and balance to be present.
let code = Bytecode::new_raw(Bytes::from(
update
.code
.clone()
.expect("account code"),
));
let balance = update.balance.expect("account balance");

// Initialize the account.
write_guard.accounts.init_account(
update.address,
AccountInfo::new(balance, 0, code.hash_slow(), code),
Some(update.slots.clone()),
true, /* Flag all accounts in TychoDB mocked to sign that we cannot
* call an RPC provider for an update */
);
}
// TODO: Implement deletion.
warn!(%update.address, "Deletion not implemented");
}
ChangeType::Creation => {
info!(%update.address, "Creating account");

// We expect the code and balance to be present.
let code = Bytecode::new_raw(Bytes::from(
update
.code
.clone()
.expect("account code"),
));
let balance = update.balance.expect("account balance");

// Initialize the account.
write_guard.accounts.init_account(
update.address,
AccountInfo::new(balance, 0, code.hash_slow(), code),
Some(update.slots.clone()),
true, /* Flag all accounts in TychoDB mocked to sign that we cannot
* call an RPC provider for an update */
);
}
}
})
}

/// Executes a future, blocking the current thread until the future completes.
fn block_on<F: core::future::Future>(&self, f: F) -> F::Output {
// If we get here and have to block the current thread, we really
// messed up indexing / filling the storage. In that case this will save us
// at the price of a very high time penalty.
futures::executor::block_on(f)
}
}

/// Retrieves the storage value at the specified index for the given account, if it exists.
Expand All @@ -156,19 +147,14 @@ impl PreCachedDB {
///
/// Returns an `Option` containing a reference to the storage value if it exists, otherwise
/// returns `None`.
async fn get_storage_async(&self, address: &Address, index: &rU256) -> Option<rU256> {
pub fn get_storage(&self, address: &Address, index: &rU256) -> Option<rU256> {
self.inner
.read()
.await
.unwrap()
.accounts
.get_storage(address, index)
}

/// Blocking version of [get_storage_async]
pub fn get_storage(&self, address: &Address, index: &rU256) -> Option<rU256> {
self.block_on(self.get_storage_async(address, index))
}

/// Update the simulation state.
///
/// This method modifies the current state of the simulation by applying the provided updates to
Expand All @@ -183,50 +169,47 @@ impl PreCachedDB {
updates: &HashMap<Address, StateUpdate>,
block: BlockHeader,
) -> HashMap<Address, StateUpdate> {
// Block the current thread until the future completes.
self.block_on(async {
// Hold the write lock for the duration of the function so that no other thread can
// write to the storage.
let mut write_guard = self.inner.write().await;
// Hold the write lock for the duration of the function so that no other thread can
// write to the storage.
let mut write_guard = self.inner.write().unwrap();

let mut revert_updates = HashMap::new();
write_guard.block = Some(block);
let mut revert_updates = HashMap::new();
write_guard.block = Some(block);

for (address, update_info) in updates.iter() {
let mut revert_entry = StateUpdate::default();
for (address, update_info) in updates.iter() {
let mut revert_entry = StateUpdate::default();

if let Some(current_account) = write_guard
.accounts
.get_account_info(address)
{
revert_entry.balance = Some(current_account.balance);
}
if let Some(current_account) = write_guard
.accounts
.get_account_info(address)
{
revert_entry.balance = Some(current_account.balance);
}

if update_info.storage.is_some() {
let mut revert_storage = HashMap::default();
for index in update_info
.storage
.as_ref()
.unwrap()
.keys()
if update_info.storage.is_some() {
let mut revert_storage = HashMap::default();
for index in update_info
.storage
.as_ref()
.unwrap()
.keys()
{
if let Some(s) = write_guard
.accounts
.get_storage(address, index)
{
if let Some(s) = write_guard
.accounts
.get_storage(address, index)
{
revert_storage.insert(*index, s);
}
revert_storage.insert(*index, s);
}
revert_entry.storage = Some(revert_storage);
}
revert_updates.insert(*address, revert_entry);
write_guard
.accounts
.update_account(address, update_info);
revert_entry.storage = Some(revert_storage);
}
revert_updates.insert(*address, revert_entry);
write_guard
.accounts
.update_account(address, update_info);
}

revert_updates
})
revert_updates
}

/// Deprecated in TychoDB
Expand All @@ -236,7 +219,10 @@ impl PreCachedDB {

/// If block is set, returns the number. Otherwise returns None.
pub fn block_number(&self) -> Option<u64> {
self.block_on(async { self.inner.read().await.block })
self.inner
.read()
.unwrap()
.block
.as_ref()
.map(|header| header.number)
}
Expand All @@ -262,13 +248,11 @@ impl EngineDatabaseInterface for PreCachedDB {
permanent_storage: Option<HashMap<rU256, rU256>>,
_mocked: bool,
) {
self.block_on(async {
self.inner
.write()
.await
.accounts
.init_account(address, to_analysed(account), permanent_storage, true)
});
self.inner
.write()
.unwrap()
.accounts
.init_account(address, to_analysed(account), permanent_storage, true)
}
}

Expand All @@ -287,15 +271,13 @@ impl DatabaseRef for PreCachedDB {
/// Returns a `Result` containing the account information or an error if the account is not
/// found.
fn basic_ref(&self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
self.block_on(async {
self.inner
.read()
.await
.accounts
.get_account_info(&address)
.map(|acc| Some(acc.clone()))
.ok_or(PreCachedDBError::MissingAccount(address))
})
self.inner
.read()
.unwrap()
.accounts
.get_account_info(&address)
.map(|acc| Some(acc.clone()))
.ok_or(PreCachedDBError::MissingAccount(address))
}

fn code_by_hash_ref(&self, _code_hash: B256) -> Result<Bytecode, Self::Error> {
Expand All @@ -318,36 +300,34 @@ impl DatabaseRef for PreCachedDB {
/// Returns an error if the storage value is not found.
fn storage_ref(&self, address: Address, index: rU256) -> Result<rU256, Self::Error> {
debug!(%address, %index, "Requested storage of account");
self.block_on(async {
let read_guard = self.inner.read().await;
if let Some(storage_value) = read_guard
let read_guard = self.inner.read().unwrap();
if let Some(storage_value) = read_guard
.accounts
.get_storage(&address, &index)
{
debug!(%address, %index, %storage_value, "Got value locally");
Ok(storage_value)
} else {
// At this point we either don't know this address or we don't have anything at this
if read_guard
.accounts
.get_storage(&address, &index)
.account_present(&address)
{
debug!(%address, %index, %storage_value, "Got value locally");
Ok(storage_value)
// As we only store non-zero values, if the account is present it means this
// slot is zero.
debug!(%address, %index, "Account found, but slot is zero");
Ok(rU256::ZERO)
} else {
// At this point we either don't know this address or we don't have anything at this
if read_guard
.accounts
.account_present(&address)
{
// As we only store non-zero values, if the account is present it means this
// slot is zero.
debug!(%address, %index, "Account found, but slot is zero");
Ok(rU256::ZERO)
} else {
// At this point we know we don't have data for this address.
debug!(%address, %index, "Account not found");
Err(PreCachedDBError::MissingAccount(address))
}
// At this point we know we don't have data for this address.
debug!(%address, %index, "Account not found");
Err(PreCachedDBError::MissingAccount(address))
}
})
}
}

/// If block header is set, returns the hash. Otherwise returns a zero hash.
fn block_hash_ref(&self, _number: u64) -> Result<B256, Self::Error> {
match &self.block_on(async { self.inner.read().await.block }) {
match self.inner.read().unwrap().block {
Some(header) => Ok(B256::from_slice(header.hash.as_bytes())),
None => Ok(B256::default()),
}
Expand Down Expand Up @@ -485,7 +465,10 @@ mod tests {
.unwrap();
assert_eq!(account_info.balance, new_balance);
let block = mock_db
.block_on(async { mock_db.inner.read().await.block })
.inner
.read()
.unwrap()
.block
.expect("block is Some");
assert_eq!(block.number, 1);

Expand Down Expand Up @@ -546,9 +529,7 @@ mod tests {
.naive_utc(),
};

mock_db
.update(vec![account_update], Some(new_block.into()))
.await;
mock_db.update(vec![account_update], Some(new_block.into()));

let account_info = mock_db
.basic_ref(Address::from_str("0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D").unwrap())
Expand All @@ -570,7 +551,10 @@ mod tests {

assert_eq!(
mock_db
.block_on(async { mock_db.inner.read().await.block })
.inner
.read()
.unwrap()
.block
.expect("block is Some")
.number,
1
Expand Down
Loading

0 comments on commit b18c60c

Please sign in to comment.