store: cache recent blocks in memory
neysofu committed Nov 30, 2022
1 parent c565978 commit 2a1f6e7
189 changes: 163 additions & 26 deletions store/postgres/src/chain_store.rs
@@ -3,6 +3,7 @@
use diesel::prelude::*;
use diesel::r2d2::{ConnectionManager, PooledConnection};
use diesel::sql_types::Text;
use diesel::{insert_into, update};
use graph::parking_lot::RwLock;

use std::{
collections::HashMap,
@@ -23,6 +24,7 @@
use graph::prelude::{
use graph::util::timed_cache::TimedCache;
use graph::{constraint_violation, ensure};

use self::recent_blocks_cache::RecentBlocksCache;
use crate::{
block_store::ChainStatus, chain_head_listener::ChainHeadUpdateSender,
connection_pool::ConnectionPool,
@@ -71,6 +73,7 @@ mod data {
};
use std::fmt;
use std::iter::FromIterator;
use std::str::FromStr;
use std::{convert::TryFrom, io::Write};

use crate::transaction_receipt::RawTransactionReceipt;
@@ -795,8 +798,8 @@ mod data {
conn: &PgConnection,
block_ptr: BlockPtr,
offset: BlockNumber,
) -> Result<Option<json::Value>, Error> {
let data = match self {
) -> Result<Option<(json::Value, BlockPtr)>, Error> {
let data_and_hash = match self {
Storage::Shared => {
const ANCESTOR_SQL: &str = "
with recursive ancestors(block_hash, block_offset) as (
@@ -821,12 +824,13 @@

match hash {
None => None,
Some(hash) => Some(
Some(hash) => Some((
b::table
.filter(b::hash.eq(hash.hash))
.filter(b::hash.eq(&hash.hash))
.select(b::data)
.first::<json::Value>(conn)?,
),
BlockHash::from_str(&hash.hash)?,
)),
}
}
Storage::Private(Schema { blocks, .. }) => {
@@ -854,13 +858,14 @@
.optional()?;
match hash {
None => None,
Some(hash) => Some(
Some(hash) => Some((
blocks
.table()
.filter(blocks.hash().eq(hash.hash))
.filter(blocks.hash().eq(&hash.hash))
.select(blocks.data())
.first::<json::Value>(conn)?,
),
BlockHash::from(hash.hash),
)),
}
}
};
@@ -871,15 +876,20 @@
// has a 'block' entry
//
// see also 7736e440-4c6b-11ec-8c4d-b42e99f52061
let data = {
let data_and_ptr = {
use graph::prelude::serde_json::json;

data.map(|data| match data.get("block") {
Some(_) => data,
None => json!({ "block": data, "transaction_receipts": [] }),
data_and_hash.map(|(data, hash)| {
(
match data.get("block") {
Some(_) => data,
None => json!({ "block": data, "transaction_receipts": [] }),
},
BlockPtr::new(hash, block_ptr.number - offset),
)
})
};
Ok(data)
Ok(data_and_ptr)
}
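
The hunk above now returns the ancestor's `BlockPtr` alongside its JSON payload, and keeps normalizing legacy rows that stored the block JSON directly rather than under a `block` key. A minimal sketch of that normalization step, assuming only the `serde_json` crate (the helper name is illustrative, not part of this commit):

```rust
use serde_json::{json, Value};

// Hypothetical helper mirroring the normalization above: legacy rows store
// the block JSON directly, newer rows wrap it under a `block` key.
fn normalize_block_payload(data: Value) -> Value {
    match data.get("block") {
        // Already in the new shape.
        Some(_) => data,
        // Legacy shape: the value *is* the block, so wrap it and add an
        // empty receipts list.
        None => json!({ "block": data, "transaction_receipts": [] }),
    }
}

fn main() {
    let legacy = json!({ "number": "0x2a", "hash": "0xabc" });
    let normalized = normalize_block_payload(legacy);
    assert!(normalized.get("block").is_some());
    assert_eq!(normalized["transaction_receipts"], json!([]));
}
```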

pub(super) fn delete_blocks_before(
@@ -1344,7 +1354,7 @@ pub struct ChainStore {
genesis_block_ptr: BlockPtr,
status: ChainStatus,
chain_head_update_sender: ChainHeadUpdateSender,
block_cache: TimedCache<&'static str, BlockPtr>,
recent_blocks_cache: RecentBlocksCache,
}

impl ChainStore {
@@ -1363,7 +1373,7 @@ impl ChainStore {
genesis_block_ptr: BlockPtr::new(net_identifier.genesis_block_hash.clone(), 0),
status,
chain_head_update_sender,
block_cache: TimedCache::new(Duration::from_secs(5)),
recent_blocks_cache: RecentBlocksCache::default(),
}
}

@@ -1597,7 +1607,7 @@ impl ChainStoreTrait for ChainStore {
})
.and_then(|opt: Option<BlockPtr>| opt)
.map(|head| {
self.block_cache.set("head", Arc::new(head.clone()));
self.recent_blocks_cache.set_chain_head(head.clone());
head
})
})
@@ -1607,8 +1617,8 @@
}

async fn cached_head_ptr(self: Arc<Self>) -> Result<Option<BlockPtr>, Error> {
match self.block_cache.get("head") {
Some(head) => Ok(Some(head.as_ref().clone())),
match self.recent_blocks_cache.chain_head_ptr() {
Some(head) => Ok(Some(head)),
None => self.chain_head_ptr().await,
}
}
@@ -1645,6 +1655,7 @@

// This will send an update via Postgres on the `chain_head_updates` channel.
self.chain_head_update_sender.send(&hash, number)?;
self.recent_blocks_cache.set_chain_head(ptr);

pool.with_conn(move |conn, _| {
conn.transaction(|| -> Result<(), StoreError> {
@@ -1676,25 +1687,41 @@

async fn ancestor_block(
self: Arc<Self>,
block_ptr: BlockPtr,
ancestor_ptr: BlockPtr,
offset: BlockNumber,
) -> Result<Option<json::Value>, Error> {
ensure!(
block_ptr.number >= offset,
ancestor_ptr.number >= offset,
"block offset {} for block `{}` points to before genesis block",
offset,
block_ptr.hash_hex()
ancestor_ptr.hash_hex()
);

Ok(self
.cheap_clone()
// Check the local cache first.
if let Some(data) = self.recent_blocks_cache.get_block(&ancestor_ptr, offset) {
return Ok(data.1.clone());
}

let ancestor_ptr_clone = ancestor_ptr.clone();
let chain_store = self.cheap_clone();
let block_data = self
.pool
.with_conn(move |conn, _| {
self.storage
.ancestor_block(conn, block_ptr, offset)
chain_store
.storage
.ancestor_block(conn, ancestor_ptr_clone, offset)
.map_err(|e| CancelableError::from(StoreError::from(e)))
})
.await?)
.await?;

if let Some((data, ptr)) = block_data {
// Update the local cache.
self.recent_blocks_cache.set_block(ancestor_ptr, ptr, &data);

Ok(Some(data))
} else {
Ok(None)
}
}
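
The rewritten `ancestor_block` is a classic cache-aside read: consult `RecentBlocksCache`, fall back to Postgres on a miss, then populate the cache with the result. Below is a self-contained sketch of the same pattern, with a plain `HashMap` standing in for the cache and a stub for the database round trip (all names are illustrative, none are part of this commit):

```rust
use std::collections::HashMap;
use std::sync::RwLock;

struct Store {
    cache: RwLock<HashMap<i32, String>>,
}

impl Store {
    // Stand-in for the Postgres query that runs inside `with_conn`.
    fn load_from_db(&self, number: i32) -> Option<String> {
        Some(format!("block #{number}"))
    }

    fn block(&self, number: i32) -> Option<String> {
        // 1. Check the local cache first.
        if let Some(hit) = self.cache.read().unwrap().get(&number) {
            return Some(hit.clone());
        }
        // 2. Miss: go to the database.
        let value = self.load_from_db(number)?;
        // 3. Update the local cache for later readers.
        self.cache.write().unwrap().insert(number, value.clone());
        Some(value)
    }
}

fn main() {
    let store = Store { cache: RwLock::new(HashMap::new()) };
    assert_eq!(store.block(42).as_deref(), Some("block #42")); // miss, then fill
    assert_eq!(store.block(42).as_deref(), Some("block #42")); // served from cache
}
```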

fn cleanup_cached_blocks(
Expand All @@ -1709,6 +1736,8 @@ impl ChainStoreTrait for ChainStore {
block: i32,
}

self.recent_blocks_cache.clear();

// Remove all blocks from the cache that are behind the slowest
// subgraph's head block, but retain the genesis block. We stay
// behind the slowest subgraph so that we do not interfere with its
@@ -1813,6 +1842,114 @@
}
}

mod recent_blocks_cache {
use super::*;

type OffsetFromChainHead = BlockNumber;
type Inner = TimedCache<OffsetFromChainHead, (BlockPtr, Option<json::Value>)>;

/// We cache the most recent blocks in memory to avoid overloading the
/// database with unnecessary queries close to the chain head. We invalidate
/// blocks:
/// - After 5 seconds.
/// - Whenever the chain head advances.
pub struct RecentBlocksCache {
// We protect everything with a global `RwLock` to avoid races between
// readers and writers. Unfortunately, it makes everything uglier than it
// should be.
blocks: RwLock<Inner>,
}

impl RecentBlocksCache {
const CAPACITY: OffsetFromChainHead = 10;
const TTL: Duration = Duration::from_secs(5);

pub fn set_chain_head(&self, chain_head: BlockPtr) {
// We invalidate the entire cache when the chain head changes.
let blocks = self.blocks.write();
blocks.clear();
blocks.set(0, Arc::new((chain_head, None)));
}

pub fn chain_head_ptr(&self) -> Option<BlockPtr> {
self.blocks.read().get(&0).map(|p| p.0.clone())
}

pub fn clear(&self) {
self.blocks.write().clear();
}

fn chain_head_number_inner(blocks: &Inner) -> Option<BlockNumber> {
blocks.get(&0).map(|p| p.0.number)
}

pub fn get_block(
&self,
ancestor: &BlockPtr,
offset: BlockNumber,
) -> Option<Arc<(BlockPtr, Option<json::Value>)>> {
Self::get_block_inner(&*self.blocks.read(), ancestor, offset)
}

fn get_block_inner(
blocks: &Inner,
ancestor: &BlockPtr,
offset: BlockNumber,
) -> Option<Arc<(BlockPtr, Option<json::Value>)>> {
// Check if the ancestor itself is in the cache.
let ancestor_offset_from_head =
Self::chain_head_number_inner(blocks)? - ancestor.number;
match blocks.get(&ancestor_offset_from_head).map(|p| p.0.clone()) {
None => return None,
Some(ptr) if ptr.hash != ancestor.hash => return None,
_ => {}
}

let offset_from_head = ancestor_offset_from_head + offset;
blocks.get(&offset_from_head)
}

/// Inserts this block into the cache, if close enough to the chain
/// head. If not, it's a no-op.
pub fn set_block(&self, ancestor: BlockPtr, ptr: BlockPtr, data: &json::Value) {
Self::set_block_inner(&mut *self.blocks.write(), ancestor, ptr, data)
}

fn set_block_inner(
blocks: &mut Inner,
ancestor: BlockPtr,
ptr: BlockPtr,
data: &json::Value,
) {
// We must have a chain head to cache blocks.
let chain_head_number = if let Some(n) = Self::chain_head_number_inner(blocks) {
n
} else {
return;
};

let offset_from_head = chain_head_number - ptr.number;
// We only cache blocks that are close to the chain head.
if offset_from_head >= Self::CAPACITY as _ {
return;
}
// We only cache blocks if their ancestor is in the cache (chain head excluded).
if Self::get_block_inner(blocks, &ancestor, 0).is_none() {
return;
};

blocks.set(offset_from_head, Arc::new((ptr, Some(data.clone()))));
}
}

impl Default for RecentBlocksCache {
fn default() -> Self {
Self {
blocks: RwLock::new(TimedCache::new(Self::TTL)),
}
}
}
}
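
Because entries are keyed by their offset from the chain head rather than by hash or number, advancing the head would silently re-map every key; clearing the whole cache in `set_chain_head` sidesteps that. A hedged, test-style sketch of the intended behavior — the `ptr` helper, its hash construction, and the import paths are assumptions for illustration, not code from this commit:

```rust
#[cfg(test)]
mod recent_blocks_cache_tests {
    use super::recent_blocks_cache::RecentBlocksCache;
    use graph::prelude::{serde_json::json, BlockPtr};

    // Illustrative helper: a pointer whose hash just encodes its number.
    fn ptr(n: i32) -> BlockPtr {
        BlockPtr::new(vec![n as u8; 32].into(), n)
    }

    #[test]
    fn head_change_invalidates_everything() {
        let cache = RecentBlocksCache::default();
        let head = ptr(100);
        cache.set_chain_head(head.clone()); // cached at offset 0, no block data
        assert_eq!(cache.chain_head_ptr(), Some(head.clone()));

        // Cache block 99 (offset 1 from the head); `head` is its ancestor.
        cache.set_block(head.clone(), ptr(99), &json!({ "block": {} }));
        assert!(cache.get_block(&head, 1).is_some());

        // Advancing the head clears every entry, so the lookup now misses.
        cache.set_chain_head(ptr(101));
        assert!(cache.get_block(&head, 1).is_none());
    }
}
```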

fn try_parse_timestamp(ts: Option<String>) -> Result<Option<u64>, StoreError> {
let ts = match ts {
Some(str) => str,