Skip to content

Commit

Permalink
feat(optimistic_block): produce (#12761)
Browse files Browse the repository at this point in the history
We continue the implementation of Optimistic block #10584, by adding the
logic to produce the block as soon as the previous block is done.

If available, the optimistic block will be used in the production of the
block to use the same timestamp.

---------

Co-authored-by: Aleksandr Logunov <the.alex.logunov@gmail.com>
  • Loading branch information
VanBarbascu and Longarithm authored Jan 23, 2025
1 parent cef300f commit 81538fc
Show file tree
Hide file tree
Showing 17 changed files with 325 additions and 82 deletions.
1 change: 1 addition & 0 deletions chain/chain/src/tests/simple_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ fn build_chain_with_orphans() {
CryptoHash::default(),
clock,
None,
None,
);
assert_matches!(chain.process_block_test(&None, block).unwrap_err(), Error::Orphan);
assert_matches!(
Expand Down
189 changes: 157 additions & 32 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use near_primitives::errors::EpochError;
use near_primitives::hash::CryptoHash;
use near_primitives::merkle::{merklize, MerklePath, PartialMerkleTree};
use near_primitives::network::PeerId;
use near_primitives::optimistic_block::OptimisticBlock;
use near_primitives::receipt::Receipt;
use near_primitives::sharding::{
EncodedShardChunk, PartialEncodedChunk, ShardChunk, ShardChunkHeader, StateSyncInfo,
Expand Down Expand Up @@ -198,6 +199,8 @@ pub struct Client {
chunk_distribution_network: Option<ChunkDistributionNetwork>,
/// Upgrade schedule which determines when the client starts voting for new protocol versions.
upgrade_schedule: ProtocolUpgradeVotingSchedule,
/// Produced optimistic block.
last_optimistic_block_produced: Option<OptimisticBlock>,
}

impl AsRef<Client> for Client {
Expand Down Expand Up @@ -399,6 +402,7 @@ impl Client {
partial_witness_adapter,
chunk_distribution_network,
upgrade_schedule,
last_optimistic_block_produced: None,
})
}

Expand Down Expand Up @@ -541,6 +545,137 @@ impl Client {
Ok(true)
}

fn pre_block_production_check(
&self,
prev_header: &BlockHeader,
height: BlockHeight,
validator_signer: &Arc<ValidatorSigner>,
) -> Result<(), Error> {
// Check that we are were called at the block that we are producer for.
let epoch_id =
self.epoch_manager.get_epoch_id_from_prev_block(&prev_header.hash()).unwrap();
let next_block_proposer = self.epoch_manager.get_block_producer(&epoch_id, height)?;

let protocol_version = self
.epoch_manager
.get_epoch_protocol_version(&epoch_id)
.expect("Epoch info should be ready at this point");
if protocol_version > PROTOCOL_VERSION {
panic!("The client protocol version is older than the protocol version of the network. Please update nearcore. Client protocol version:{}, network protocol version {}", PROTOCOL_VERSION, protocol_version);
}

if !self.can_produce_block(
&prev_header,
height,
validator_signer.validator_id(),
&next_block_proposer,
)? {
debug!(target: "client", me=?validator_signer.validator_id(), ?next_block_proposer, "Should reschedule block");
return Err(Error::BlockProducer("Should reschedule".to_string()));
}

let (validator_stake, _) = self.epoch_manager.get_validator_by_account_id(
&epoch_id,
&prev_header.hash(),
&next_block_proposer,
)?;

let validator_pk = validator_stake.take_public_key();
if validator_pk != validator_signer.public_key() {
debug!(target: "client",
local_validator_key = ?validator_signer.public_key(),
?validator_pk,
"Local validator key does not match expected validator key, skipping optimistic block production");
let err = Error::BlockProducer("Local validator key mismatch".to_string());
#[cfg(not(feature = "test_features"))]
return Err(err);
#[cfg(feature = "test_features")]
match self.adv_produce_blocks {
None | Some(AdvProduceBlocksMode::OnlyValid) => return Err(err),
Some(AdvProduceBlocksMode::All) => {}
}
}
Ok(())
}

pub fn is_optimistic_block_done(&self, next_height: BlockHeight) -> bool {
self.last_optimistic_block_produced
.as_ref()
.filter(|ob| ob.inner.block_height == next_height)
.is_some()
}

pub fn save_optimistic_block(&mut self, optimistic_block: &OptimisticBlock) {
if let Some(old_block) = self.last_optimistic_block_produced.as_ref() {
if old_block.inner.block_height == optimistic_block.inner.block_height {
warn!(target: "client",
height=old_block.inner.block_height,
old_previous_hash=?old_block.inner.prev_block_hash,
new_previous_hash=?optimistic_block.inner.prev_block_hash,
"Optimistic block already exists, replacing");
}
}
self.last_optimistic_block_produced = Some(optimistic_block.clone());
}

/// Produce optimistic block for given `height` on top of chain head.
/// Either returns optimistic block or error.
pub fn produce_optimistic_block_on_head(
&mut self,
height: BlockHeight,
) -> Result<Option<OptimisticBlock>, Error> {
let _span =
tracing::debug_span!(target: "client", "produce_optimistic_block_on_head", height)
.entered();

let head = self.chain.head()?;
assert_eq!(
head.epoch_id,
self.epoch_manager.get_epoch_id_from_prev_block(&head.prev_block_hash).unwrap()
);

let prev_hash = head.last_block_hash;
let prev_header = self.chain.get_block_header(&prev_hash)?;

let validator_signer: Arc<ValidatorSigner> =
self.validator_signer.get().ok_or_else(|| {
Error::BlockProducer("Called without block producer info.".to_string())
})?;

if let Err(err) = self.pre_block_production_check(&prev_header, height, &validator_signer) {
debug!(target: "client", height, ?err, "Skipping optimistic block production.");
return Ok(None);
}

debug!(
target: "client",
validator=?validator_signer.validator_id(),
height=height,
prev_height=prev_header.height(),
prev_hash=format_hash(prev_hash),
"Producing optimistic block",
);

#[cfg(feature = "sandbox")]
let sandbox_delta_time = Some(self.sandbox_delta_time());
#[cfg(not(feature = "sandbox"))]
let sandbox_delta_time = None;

// TODO(#10584): Add debug information about the block production in self.block_production_info

let optimistic_block = OptimisticBlock::produce(
&prev_header,
height,
&*validator_signer,
self.clock.clone(),
sandbox_delta_time,
);

metrics::OPTIMISTIC_BLOCK_PRODUCED_TOTAL.inc();

Ok(Some(optimistic_block))
}

/// Produce block if we are block producer for given block `height`.
/// Either returns produced block (not applied) or error.
pub fn produce_block(&mut self, height: BlockHeight) -> Result<Option<Block>, Error> {
Expand Down Expand Up @@ -583,49 +718,38 @@ impl Client {
let validator_signer = self.validator_signer.get().ok_or_else(|| {
Error::BlockProducer("Called without block producer info.".to_string())
})?;

let optimistic_block = self
.last_optimistic_block_produced
.as_ref()
.filter(|ob| {
// Make sure that the optimistic block is produced on the same previous block.
if ob.inner.prev_block_hash == prev_hash {
return true;
}
warn!(target: "client",
height=height,
prev_hash=?prev_hash,
optimistic_block_prev_hash=?ob.inner.prev_block_hash,
"Optimistic block was constructed on different block, discarding it");
false
})
.cloned();
// Check that we are were called at the block that we are producer for.
let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(&prev_hash).unwrap();
let next_block_proposer = self.epoch_manager.get_block_producer(&epoch_id, height)?;

let prev = self.chain.get_block_header(&prev_hash)?;
let prev_height = prev.height();
let prev_epoch_id = *prev.epoch_id();
let prev_next_bp_hash = *prev.next_bp_hash();

// Check and update the doomslug tip here. This guarantees that our endorsement will be in the
// doomslug witness. Have to do it before checking the ability to produce a block.
let _ = self.check_and_update_doomslug_tip()?;

if !self.can_produce_block(
&prev,
height,
validator_signer.validator_id(),
&next_block_proposer,
)? {
debug!(target: "client", me=?validator_signer.validator_id(), ?next_block_proposer, "Should reschedule block");
if let Err(err) = self.pre_block_production_check(&prev, height, &validator_signer) {
debug!(target: "client", height, ?err, "Skipping block production");
return Ok(None);
}
let (validator_stake, _) = self.epoch_manager.get_validator_by_account_id(
&epoch_id,
&prev_hash,
&next_block_proposer,
)?;

let validator_pk = validator_stake.take_public_key();
if validator_pk != validator_signer.public_key() {
debug!(target: "client",
local_validator_key = ?validator_signer.public_key(),
?validator_pk,
"Local validator key does not match expected validator key, skipping block production");
#[cfg(not(feature = "test_features"))]
return Ok(None);
#[cfg(feature = "test_features")]
match self.adv_produce_blocks {
None | Some(AdvProduceBlocksMode::OnlyValid) => return Ok(None),
Some(AdvProduceBlocksMode::All) => {}
}
}
// Check and update the doomslug tip here. This guarantees that our endorsement will be in the
// doomslug witness. Have to do it before checking the ability to produce a block.
let _ = self.check_and_update_doomslug_tip()?;

let new_chunks = self
.chunk_inclusion_tracker
Expand Down Expand Up @@ -812,6 +936,7 @@ impl Client {
block_merkle_root,
self.clock.clone(),
sandbox_delta_time,
optimistic_block,
);

// Update latest known even before returning block out, to prevent race conditions.
Expand Down
92 changes: 67 additions & 25 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1117,42 +1117,58 @@ impl ClientActorInner {
}
}

let prev_block_hash = &head.last_block_hash;
for height in
latest_known.height + 1..=self.client.doomslug.get_largest_height_crossing_threshold()
{
let next_block_producer_account =
self.client.epoch_manager.get_block_producer(&epoch_id, height)?;

if me == next_block_producer_account {
self.client.chunk_inclusion_tracker.prepare_chunk_headers_ready_for_inclusion(
&head.last_block_hash,
&mut self.client.chunk_endorsement_tracker,
)?;
let num_chunks = self
.client
.chunk_inclusion_tracker
.num_chunk_headers_ready_for_inclusion(&epoch_id, &head.last_block_hash);
let shard_ids = self.client.epoch_manager.shard_ids(&epoch_id).unwrap();
let have_all_chunks = head.height == 0 || num_chunks == shard_ids.len();
if me != next_block_producer_account {
continue;
}

if self.client.doomslug.ready_to_produce_block(
height,
have_all_chunks,
log_block_production_info,
) {
self.client
.chunk_inclusion_tracker
.record_endorsement_metrics(&head.last_block_hash, &shard_ids);
if let Err(err) = self.produce_block(height, signer) {
// If there is an error, report it and let it retry on the next loop step.
error!(target: "client", height, "Block production failed: {}", err);
} else {
self.post_block_production();
}
self.client.chunk_inclusion_tracker.prepare_chunk_headers_ready_for_inclusion(
prev_block_hash,
&mut self.client.chunk_endorsement_tracker,
)?;
let num_chunks = self
.client
.chunk_inclusion_tracker
.num_chunk_headers_ready_for_inclusion(&epoch_id, prev_block_hash);
let shard_ids = self.client.epoch_manager.shard_ids(&epoch_id).unwrap();
let have_all_chunks = head.height == 0 || num_chunks == shard_ids.len();

if self.client.doomslug.ready_to_produce_block(
height,
have_all_chunks,
log_block_production_info,
) {
self.client
.chunk_inclusion_tracker
.record_endorsement_metrics(prev_block_hash, &shard_ids);
if let Err(err) = self.produce_block(height, signer) {
// If there is an error, report it and let it retry on the next loop step.
error!(target: "client", height, "Block production failed: {}", err);
} else {
self.post_block_production();
}
}
}

let protocol_version = self.client.epoch_manager.get_epoch_protocol_version(&epoch_id)?;
if !ProtocolFeature::ProduceOptimisticBlock.enabled(protocol_version) {
return Ok(());
}
let optimistic_block_height = self.client.doomslug.get_timer_height();
if me != self.client.epoch_manager.get_block_producer(&epoch_id, optimistic_block_height)? {
return Ok(());
}
if let Err(err) = self.produce_optimistic_block(optimistic_block_height) {
// If there is an error, report it and let it retry.
error!(target: "client", optimistic_block_height, ?err, "Optimistic block production failed!");
}

Ok(())
}

Expand Down Expand Up @@ -1372,6 +1388,32 @@ impl ClientActorInner {
}
}

/// Produce optimistic block if we are block producer for given `next_height` height.
fn produce_optimistic_block(&mut self, next_height: BlockHeight) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "client", "produce_optimistic_block", next_height)
.entered();
// Check if optimistic block is already produced
if self.client.is_optimistic_block_done(next_height) {
return Ok(());
}

let Some(optimistic_block) = self.client.produce_optimistic_block_on_head(next_height)?
else {
return Ok(());
};

/* TODO(#10584): If we produced the optimistic block, send it out before we save it.
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::OptimisticBlock { optimistic_block: block.clone() },
));
*/

// We’ve produced the optimistic block, mark it as done so we don't produce it again.
self.client.save_optimistic_block(&optimistic_block);

Ok(())
}

fn send_chunks_metrics(&mut self, block: &Block) {
let chunks = block.chunks();
for (chunk, &included) in chunks.iter_deprecated().zip(block.header().chunk_mask().iter()) {
Expand Down
8 changes: 8 additions & 0 deletions chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ pub(crate) static BLOCK_PRODUCED_TOTAL: LazyLock<IntCounter> = LazyLock::new(||
.unwrap()
});

pub(crate) static OPTIMISTIC_BLOCK_PRODUCED_TOTAL: LazyLock<IntCounter> = LazyLock::new(|| {
try_create_int_counter(
"near_optimistic_block_produced_total",
"Total number of optimistic blocks produced since starting this node",
)
.unwrap()
});

pub(crate) static CHUNK_PRODUCED_TOTAL: LazyLock<IntCounter> = LazyLock::new(|| {
try_create_int_counter(
"near_chunk_produced_total",
Expand Down
1 change: 1 addition & 0 deletions chain/client/src/sync/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,7 @@ mod test {
block_merkle_tree.root(),
clock.clock(),
None,
None,
);
block_merkle_tree.insert(*block.hash());
chain2.process_block_header(block.header(), &mut Vec::new()).unwrap(); // just to validate
Expand Down
1 change: 1 addition & 0 deletions chain/client/src/test_utils/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ pub fn create_chunk(
block_merkle_tree.root(),
client.clock.clone(),
None,
None,
);
(
ProduceChunkResult {
Expand Down
Loading

0 comments on commit 81538fc

Please sign in to comment.