
Keep PendingComponents in da_checker during import_block #5845

Merged: 5 commits, Jun 3, 2024
Changes from 4 commits
14 changes: 14 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -3328,6 +3328,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"payload_verification_handle",
)
.await??;

// Remove block components from the da_checker AFTER completing block import. Then we can
// assert the following invariant:
// > A valid unfinalized block is either in fork-choice or the da_checker.
//
// If we removed the block as soon as it became available, there would be a window during
// `import_block` where the block is in neither location. Consumers of the da_checker can
// tolerate the extended time a block may remain in the da_checker.
//
// If `import_block` errors (it can only fail with internal errors), the pending components
// will be pruned during data_availability_checker maintenance as finality advances.
self.data_availability_checker
.remove_pending_components(block_root);

Ok(AvailabilityProcessingStatus::Imported(block_root))
}

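The ordering argument in the comment above can be sketched with a toy model (hypothetical types, not Lighthouse's actual `BeaconChain`): because the da_checker entry is only removed after the block has been added to fork choice, there is no instant at which a valid unfinalized block is in neither structure.

```rust
use std::collections::HashSet;

// Toy model of the invariant this PR enforces: a valid unfinalized block
// is always in at least one of fork-choice or the da_checker.
struct Node {
    fork_choice: HashSet<u64>, // block roots known to fork choice
    da_checker: HashSet<u64>,  // pending components held by the da_checker
}

impl Node {
    fn import_block(&mut self, block_root: u64) {
        // The block is already in the da_checker when import starts.
        assert!(self.da_checker.contains(&block_root));

        // ...import work happens here; the invariant holds throughout
        // because the da_checker entry has not been touched yet...
        self.fork_choice.insert(block_root);

        // Only AFTER the block is in fork choice do we drop it from the
        // da_checker, so there is no window where it is nowhere.
        self.da_checker.remove(&block_root);
    }

    fn invariant_holds(&self, block_root: u64) -> bool {
        self.fork_choice.contains(&block_root) || self.da_checker.contains(&block_root)
    }
}

fn main() {
    let mut node = Node {
        fork_choice: HashSet::new(),
        da_checker: HashSet::from([42]),
    };
    assert!(node.invariant_holds(42)); // satisfied via the da_checker
    node.import_block(42);
    assert!(node.invariant_holds(42)); // now satisfied via fork choice
}
```

Removing the entry before the fork-choice insert would make the final assertion fail mid-import, which is exactly the window the PR closes.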
5 changes: 5 additions & 0 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -164,6 +164,11 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.put_pending_executed_block(executed_block)
}

pub fn remove_pending_components(&self, block_root: Hash256) {
self.availability_cache
.remove_pending_components(block_root)
}

/// Verifies kzg commitments for an RpcBlock, returns a `MaybeAvailableBlock` that may
/// include the fully available block.
///
@@ -475,18 +475,18 @@ impl<T: BeaconChainTypes> Critical<T> {
Ok(())
}

/// Removes and returns the pending_components corresponding to
/// the `block_root` or `None` if it does not exist
pub fn pop_pending_components(
/// Returns the pending_components corresponding to the `block_root` or `None` if it does not
/// exist
pub fn get_pending_components(
&mut self,
block_root: Hash256,
store: &OverflowStore<T>,
) -> Result<Option<PendingComponents<T::EthSpec>>, AvailabilityCheckError> {
match self.in_memory.pop_entry(&block_root) {
Some((_, pending_components)) => Ok(Some(pending_components)),
match self.in_memory.get(&block_root) {
A Member commented:
I think there might be a rare edge case here where:

  1. we load the PendingComponents from memory
  2. we update it and try to put it back, however in_memory is at capacity and it goes to the disk.
  3. next time we want to update it, it loads from memory which would give us the outdated component

In this case I think it would just delay availability, as we would do another lookup, right? I think we might be able to avoid this altogether if we check whether the components are already in memory before we attempt to use the overflow store?

Another one that's probably less of an issue:

  1. We load the PendingComponents, not found in memory, so it returned from store
  2. We update it and put it back to memory, now we have two versions one in memory and one on disk
  3. Now if the in memory one somehow gets pruned first, then we end up loading an old component from disk.

I think this is unlikely under normal network conditions as the LRU cache's default capacity is 1024.
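The memory-first lookup the reviewer suggests can be sketched as follows (a simplified toy, not Lighthouse's actual cache types): if the in-memory cache is always consulted before the overflow store, a stale on-disk copy can never shadow a fresher in-memory one.

```rust
use std::collections::HashMap;

// Toy sketch of a memory-first lookup order: the in-memory cache is the
// source of truth, and the overflow (disk) store is only consulted when
// the key is absent from memory.
fn get_pending<'a>(
    in_memory: &'a HashMap<u64, String>,
    on_disk: &'a HashMap<u64, String>,
    block_root: u64,
) -> Option<&'a String> {
    // Memory first: if a component was updated and kept in memory, any
    // disk copy is by definition the older version.
    in_memory
        .get(&block_root)
        .or_else(|| on_disk.get(&block_root))
}

fn main() {
    let mut in_memory = HashMap::new();
    let mut on_disk = HashMap::new();
    // Simulate the duplicated state from the second scenario: a stale
    // copy on disk and a fresher copy in memory under the same root.
    on_disk.insert(1, "stale: block only".to_string());
    in_memory.insert(1, "fresh: block + blobs".to_string());
    // The memory-first order returns the fresh copy; the stale disk
    // entry is left for the maintenance task to prune.
    assert_eq!(get_pending(&in_memory, &on_disk, 1).unwrap(), "fresh: block + blobs");
}
```

This only addresses read ordering; the scenario where the in-memory copy is pruned before the disk copy would still need the maintenance logic to prefer dropping disk entries first.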

Some(pending_components) => Ok(Some(pending_components.clone())),
None => {
// not in memory, is it in the store?
if self.store_keys.remove(&block_root) {
if self.store_keys.contains(&block_root) {
// We don't need to remove the data from the store as we have removed it from
// `store_keys` so we won't go looking for it on disk. The maintenance thread
// will remove it from disk the next time it runs.
@@ -498,6 +498,21 @@ impl<T: BeaconChainTypes> Critical<T> {
}
}

/// Removes and returns the pending_components corresponding to
/// the `block_root` or `None` if it does not exist
Comment on lines +501 to +502
Suggested change:
- /// Removes and returns the pending_components corresponding to
- /// the `block_root` or `None` if it does not exist
+ /// Removes the `pending_components` corresponding to the `block_root`.

pub fn remove_pending_components(&mut self, block_root: Hash256) {
match self.in_memory.pop_entry(&block_root) {
Some { .. } => {}
None => {
// not in memory, is it in the store?
// We don't need to remove the data from the store as we have removed it from
// `store_keys` so we won't go looking for it on disk. The maintenance thread
// will remove it from disk the next time it runs.
self.store_keys.remove(&block_root);
}
}
}
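The removal path above has two cases, which can be modeled with a small sketch (toy types, not the real `Critical`/`OverflowStore`): if the entry is in memory it is popped directly; otherwise only the `store_keys` index is cleared, and the on-disk bytes are left for the maintenance task to reclaim lazily.

```rust
use std::collections::{HashMap, HashSet};

// Toy model of `remove_pending_components`: drop the in-memory entry if
// present, otherwise just forget the disk key so lookups stop finding
// it; the data itself is garbage-collected by maintenance later.
struct Cache {
    in_memory: HashMap<u64, Vec<u8>>,
    store_keys: HashSet<u64>, // keys whose data currently lives on disk
}

impl Cache {
    fn remove_pending_components(&mut self, block_root: u64) {
        if self.in_memory.remove(&block_root).is_none() {
            // Not in memory: removing the key from `store_keys` means we
            // will never go looking for it on disk again, so the disk
            // bytes can be reclaimed whenever maintenance next runs.
            self.store_keys.remove(&block_root);
        }
    }
}

fn main() {
    let mut cache = Cache {
        in_memory: HashMap::from([(1, vec![0u8])]),
        store_keys: HashSet::from([2]),
    };
    cache.remove_pending_components(1); // in-memory case
    cache.remove_pending_components(2); // disk-index case
    assert!(cache.in_memory.is_empty());
    assert!(cache.store_keys.is_empty());
}
```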

/// Returns the number of pending component entries in memory.
pub fn num_blocks(&self) -> usize {
self.in_memory.len()
@@ -600,13 +615,18 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {

// Grab existing entry or create a new entry.
let mut pending_components = write_lock
.pop_pending_components(block_root, &self.overflow_store)?
.get_pending_components(block_root, &self.overflow_store)?
.unwrap_or_else(|| PendingComponents::empty(block_root));

// Merge in the blobs.
pending_components.merge_blobs(fixed_blobs);

if pending_components.is_available() {
write_lock.put_pending_components(
block_root,
pending_components.clone(),
&self.overflow_store,
)?;
Comment on lines +625 to +629
@jimmygchen (Member) commented on May 29, 2024:
We're cloning pending components for a second time here and I'm not sure if it's necessary.

If the pending component is newly created in this function, it won't immediately become available. So this means the pending components would always be in the overflow_store if they become available here, right?

Reply (Member):
Oh never mind, we do need this as we're mutating a clone!

@ethDreamer (Member) commented on May 30, 2024:
You don't need to modify pop_pending_components to get_pending_components. Instead you should keep it as is. Because the write lock is held the whole time, there is no difference between:

lock = get_write_lock()
item = cache.remove()
item.mutate()
cache.insert(item)
drop(lock)

and

lock = get_write_lock()
item = cache.get()
item.mutate()
cache.update(item)
drop(lock)

You can just re-insert the item even when the components are complete (just like you did above) and then remove them with the same pop_pending_components method.
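ethDreamer's equivalence argument can be demonstrated with a small sketch (a toy `Mutex<HashMap>` cache, not Lighthouse's actual lock or LRU types): while a single write lock is held, "pop, mutate, re-insert" and "get, clone, mutate, update" are observationally identical, because no other thread can see the intermediate state.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Strategy 1: remove the entry, mutate it, then put it back.
fn pop_mutate_insert(cache: &Mutex<HashMap<u64, Vec<u32>>>, key: u64, blob: u32) {
    let mut guard = cache.lock().unwrap(); // write lock held throughout
    let mut item = guard.remove(&key).unwrap_or_default();
    item.push(blob);
    guard.insert(key, item);
} // lock released here

// Strategy 2: clone the entry in place, mutate the clone, then overwrite.
fn get_mutate_update(cache: &Mutex<HashMap<u64, Vec<u32>>>, key: u64, blob: u32) {
    let mut guard = cache.lock().unwrap(); // write lock held throughout
    let mut item = guard.get(&key).cloned().unwrap_or_default();
    item.push(blob);
    guard.insert(key, item);
}

fn main() {
    let a = Mutex::new(HashMap::new());
    let b = Mutex::new(HashMap::new());
    pop_mutate_insert(&a, 9, 1);
    pop_mutate_insert(&a, 9, 2);
    get_mutate_update(&b, 9, 1);
    get_mutate_update(&b, 9, 2);
    // Both strategies leave the cache in the same final state, so an
    // outside observer cannot distinguish them.
    assert_eq!(
        a.lock().unwrap().get(&9).cloned(),
        b.lock().unwrap().get(&9).cloned()
    );
}
```

The practical difference is only cost: strategy 2 pays for an extra clone, which is part of why the reviewer suggests keeping `pop_pending_components` as-is.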

// No need to hold the write lock anymore
drop(write_lock);
pending_components.make_available(|diet_block| {
@@ -638,14 +658,19 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {

// Grab existing entry or create a new entry.
let mut pending_components = write_lock
.pop_pending_components(block_root, &self.overflow_store)?
.get_pending_components(block_root, &self.overflow_store)?
.unwrap_or_else(|| PendingComponents::empty(block_root));

// Merge in the block.
pending_components.merge_block(diet_executed_block);

// Check if we have all components and entire set is consistent.
if pending_components.is_available() {
write_lock.put_pending_components(
block_root,
pending_components.clone(),
&self.overflow_store,
)?;
// No need to hold the write lock anymore
drop(write_lock);
pending_components.make_available(|diet_block| {
@@ -661,6 +686,10 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
}
}

pub fn remove_pending_components(&self, block_root: Hash256) {
self.critical.write().remove_pending_components(block_root);
}

/// write all in memory objects to disk
pub fn write_all_to_disk(&self) -> Result<(), AvailabilityCheckError> {
let maintenance_lock = self.maintenance_lock.lock();