Skip to content

Commit

Permalink
store: cache recent blocks in memory
Browse files Browse the repository at this point in the history
  • Loading branch information
neysofu committed Nov 30, 2022
1 parent c565978 commit 328c9bc
Showing 1 changed file with 79 additions and 10 deletions.
89 changes: 79 additions & 10 deletions store/postgres/src/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1344,7 +1344,7 @@ pub struct ChainStore {
genesis_block_ptr: BlockPtr,
status: ChainStatus,
chain_head_update_sender: ChainHeadUpdateSender,
block_cache: TimedCache<&'static str, BlockPtr>,
recent_blocks_cache: RecentBlocksCache,
}

impl ChainStore {
Expand All @@ -1363,7 +1363,7 @@ impl ChainStore {
genesis_block_ptr: BlockPtr::new(net_identifier.genesis_block_hash.clone(), 0),
status,
chain_head_update_sender,
block_cache: TimedCache::new(Duration::from_secs(5)),
recent_blocks_cache: RecentBlocksCache::default(),
}
}

Expand Down Expand Up @@ -1597,7 +1597,7 @@ impl ChainStoreTrait for ChainStore {
})
.and_then(|opt: Option<BlockPtr>| opt)
.map(|head| {
self.block_cache.set("head", Arc::new(head.clone()));
self.recent_blocks_cache.set_chain_head(head.clone());
head
})
})
Expand All @@ -1607,8 +1607,8 @@ impl ChainStoreTrait for ChainStore {
}

async fn cached_head_ptr(self: Arc<Self>) -> Result<Option<BlockPtr>, Error> {
match self.block_cache.get("head") {
Some(head) => Ok(Some(head.as_ref().clone())),
match self.recent_blocks_cache.chain_head() {
Some(head) => Ok(Some(head)),
None => self.chain_head_ptr().await,
}
}
Expand Down Expand Up @@ -1645,6 +1645,7 @@ impl ChainStoreTrait for ChainStore {

//this will send an update via postgres, channel: chain_head_updates
self.chain_head_update_sender.send(&hash, number)?;
self.recent_blocks_cache.set_chain_head(ptr);

pool.with_conn(move |conn, _| {
conn.transaction(|| -> Result<(), StoreError> {
Expand Down Expand Up @@ -1686,15 +1687,27 @@ impl ChainStoreTrait for ChainStore {
block_ptr.hash_hex()
);

Ok(self
.cheap_clone()
// Check the local cache first.
if let Some((_, block_data)) = self.recent_blocks_cache.block(offset).as_deref() {
return Ok(block_data.clone());
}

let block_ptr_clone = block_ptr.clone();
let chain_store = self.cheap_clone();
let block_data = self
.pool
.with_conn(move |conn, _| {
self.storage
.ancestor_block(conn, block_ptr, offset)
chain_store
.storage
.ancestor_block(conn, block_ptr_clone, offset)
.map_err(|e| CancelableError::from(StoreError::from(e)))
})
.await?)
.await?;

self.recent_blocks_cache
.set_block(offset, &block_ptr, block_data.as_ref());

Ok(block_data)
}

fn cleanup_cached_blocks(
Expand Down Expand Up @@ -1813,6 +1826,62 @@ impl ChainStoreTrait for ChainStore {
}
}

type OffsetFromChainHead = BlockNumber;

/// We cache the most recent blocks in memory to avoid overloading the
/// database with unnecessary queries close to the chain head. We invalidate
/// blocks:
/// - After 5 seconds.
/// - Whenever the chain head advances.
pub struct RecentBlocksCache {
blocks: TimedCache<OffsetFromChainHead, (BlockPtr, Option<json::Value>)>,
}

impl RecentBlocksCache {
const CAPACITY: OffsetFromChainHead = 10;
const TTL: Duration = Duration::from_secs(5);

pub fn chain_head(&self) -> Option<BlockPtr> {
self.blocks.get(&0).map(|p| p.0.clone())
}

pub fn set_chain_head(&self, chain_head: BlockPtr) {
// We invalidate the entire cache when the chain head changes.
self.blocks.clear();
self.blocks.set(0, Arc::new((chain_head, None)));
}

pub fn block(
&self,
offset: OffsetFromChainHead,
) -> Option<Arc<(BlockPtr, Option<json::Value>)>> {
self.blocks.get(&offset)
}

/// Inserts this block into the cache, if close enough to the chain
/// head. If not, it's a no-op.
pub fn set_block(
&self,
offset: OffsetFromChainHead,
block: &BlockPtr,
data: Option<&json::Value>,
) {
if offset >= Self::CAPACITY as _ {
return;
}
self.blocks
.set(offset, Arc::new((block.clone(), data.cloned())));
}
}

impl Default for RecentBlocksCache {
fn default() -> Self {
RecentBlocksCache {
blocks: TimedCache::new(Self::TTL),
}
}
}

fn try_parse_timestamp(ts: Option<String>) -> Result<Option<u64>, StoreError> {
let ts = match ts {
Some(str) => str,
Expand Down

0 comments on commit 328c9bc

Please sign in to comment.