Skip to content

Commit

Permalink
Fix #6388, rename sent_hashes field
Browse files Browse the repository at this point in the history
  • Loading branch information
arya2 committed Jul 25, 2023
1 parent b8dd540 commit 6ffccd5
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 17 deletions.
44 changes: 27 additions & 17 deletions zebra-state/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ pub(crate) struct StateService {

/// A set of block hashes that have been sent to the block write task.
/// Hashes of blocks below the finalized tip height are periodically pruned.
non_finalized_block_write_sent_hashes: SentHashes,
block_write_sent_hashes: SentHashes,

/// If an invalid block is sent on `finalized_block_write_sender`
/// or `non_finalized_block_write_sender`,
Expand Down Expand Up @@ -405,7 +405,7 @@ impl StateService {
non_finalized_block_write_sender: Some(non_finalized_block_write_sender),
finalized_block_write_sender: Some(finalized_block_write_sender),
finalized_block_write_last_sent_hash,
non_finalized_block_write_sent_hashes: SentHashes::default(),
block_write_sent_hashes: SentHashes::default(),
invalid_block_write_reset_receiver,
pending_utxos,
last_prune: Instant::now(),
Expand Down Expand Up @@ -468,7 +468,7 @@ impl StateService {
// If we're close to the final checkpoint, make the block's UTXOs available for
// semantic block verification, even when it is in the channel.
if self.is_close_to_final_checkpoint(queued_height) {
self.non_finalized_block_write_sent_hashes
self.block_write_sent_hashes
.add_finalized(&checkpoint_verified)
}

Expand Down Expand Up @@ -647,7 +647,7 @@ impl StateService {
let parent_hash = semantically_verrified.block.header.previous_block_hash;

if self
.non_finalized_block_write_sent_hashes
.block_write_sent_hashes
.contains(&semantically_verrified.hash)
{
let (rsp_tx, rsp_rx) = oneshot::channel();
Expand Down Expand Up @@ -707,6 +707,9 @@ impl StateService {
// Tell the block write task to stop committing checkpoint verified blocks to the finalized state,
// and move on to committing semantically verified blocks to the non-finalized state.
std::mem::drop(self.finalized_block_write_sender.take());
self.prune_by_height();
// Mark `SentHashes` as usable by the `can_fork_chain_at()` method.
self.block_write_sent_hashes.can_fork_chain_at_hashes = true;

// We've finished committing checkpoint verified blocks to finalized state, so drop any repeated queued blocks.
self.clear_finalized_block_queue(
Expand All @@ -726,24 +729,32 @@ impl StateService {
if self.finalized_block_write_sender.is_none() {
// Wait until block commit task is ready to write non-finalized blocks before dequeuing them
self.send_ready_non_finalized_queued(parent_hash);
self.prune_by_height();
}

let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
rsp_rx
}

/// Prunes queued blocks and sent hashes up to the finalized tip height.
///
/// # Panics
///
/// If called when the finalized state has no blocks.
fn prune_by_height(&mut self) {
let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
"Finalized state must have at least one block before committing non-finalized state",
);

self.non_finalized_state_queued_blocks
.prune_by_height(finalized_tip_height);
self.non_finalized_state_queued_blocks
.prune_by_height(finalized_tip_height);

self.non_finalized_block_write_sent_hashes
.prune_by_height(finalized_tip_height);
}

rsp_rx
self.block_write_sent_hashes
.prune_by_height(finalized_tip_height);
}

/// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
self.non_finalized_block_write_sent_hashes.contains(hash)
self.block_write_sent_hashes.can_fork_chain_at(hash)
|| &self.read_service.db.finalized_tip_hash() == hash
}

Expand Down Expand Up @@ -774,8 +785,7 @@ impl StateService {
for queued_child in queued_children {
let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child;

self.non_finalized_block_write_sent_hashes
.add(&queued_child.0);
self.block_write_sent_hashes.add(&queued_child.0);
let send_result = non_finalized_block_write_sender.send(queued_child);

if let Err(SendError(queued)) = send_result {
Expand All @@ -796,7 +806,7 @@ impl StateService {
}
}

self.non_finalized_block_write_sent_hashes.finish_batch();
self.block_write_sent_hashes.finish_batch();
};
}

Expand Down Expand Up @@ -1030,7 +1040,7 @@ impl Service<Request> for StateService {
}

// Check the sent non-finalized blocks
if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) {
if let Some(utxo) = self.block_write_sent_hashes.utxo(&outpoint) {
self.pending_utxos.respond(&outpoint, utxo);

// We're finished, the returned future gets the UTXO from the respond() channel.
Expand Down
11 changes: 11 additions & 0 deletions zebra-state/src/service/queued_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ pub(crate) struct SentHashes {

/// Known UTXOs.
known_utxos: HashMap<transparent::OutPoint, transparent::Utxo>,

/// Whether the hashes in this struct can be used check if the chain can be forked.
/// This is set to false until all checkpoint-verified block hashes have been pruned.
pub(crate) can_fork_chain_at_hashes: bool,
}

impl SentHashes {
Expand Down Expand Up @@ -335,6 +339,8 @@ impl SentHashes {
});

self.sent.shrink_to_fit();
self.known_utxos.shrink_to_fit();
self.bufs.shrink_to_fit();

self.update_metrics_for_cache();
}
Expand All @@ -344,6 +350,11 @@ impl SentHashes {
self.sent.contains_key(hash)
}

/// Returns true if the chain can be forked at the provided hash
pub fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
self.can_fork_chain_at_hashes && self.contains(hash)
}

/// Update sent block metrics after a block is sent.
fn update_metrics_for_block(&self, height: block::Height) {
metrics::counter!("state.memory.sent.block.count", 1);
Expand Down

0 comments on commit 6ffccd5

Please sign in to comment.