Skip to content

Commit

Permalink
[stateless_validation] Load memtrie async on catchup (#10983)
Browse files Browse the repository at this point in the history
**Context**
Issue: #10982
Follow up to: #10820.

Modifies StateSync state machine so that memtrie load happens
asynchronously on catchup.

**Summary**
* Split `chain.set_state_finalize()` into:
  * `create_flat_storage_for_shard()`
  * `schedule_load_memtrie()`
  * actual `set_state_finalize()`
* ^ we need it because creating flat storage and state finalize requires
`chain` which cannot be passed in a message to the separate thread.
* Code to trigger memtrie load in a separate thread, analogously to how
apply state parts is done.
* Modify shard sync stages:
  * `StateDownloadScheduling` --> `StateApplyScheduling`
* Just changed the name as it was confusing. What happens there is
scheduling applying of state parts.
  * `StateDownloadApplying` --> `StateApplyComplete`
* What it actually did before was initializing flat storage and
finalizing state update after state apply from previous stage.
* Now it only initializes flat storage and schedules memtrie loading.
  * `StateDownloadComplete` --> `StateApplyFinalizing`
    * Before it was just deciding what next stage to transit into.
* Now it also contains the finalizing state update logic that was
previously in the previous stage.

Integration tests are to be done as a part of:
#10844.
  • Loading branch information
staffik authored Apr 15, 2024
1 parent 4d506a7 commit 579c53c
Show file tree
Hide file tree
Showing 14 changed files with 261 additions and 108 deletions.
132 changes: 84 additions & 48 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2735,61 +2735,61 @@ impl Chain {
Ok(())
}

pub fn schedule_load_memtrie(
&self,
shard_uid: ShardUId,
sync_hash: CryptoHash,
chunk: &ShardChunk,
load_memtrie_scheduler: &near_async::messaging::Sender<LoadMemtrieRequest>,
) {
load_memtrie_scheduler.send(LoadMemtrieRequest {
runtime_adapter: self.runtime_adapter.clone(),
shard_uid,
prev_state_root: chunk.prev_state_root(),
sync_hash,
});
}

pub fn create_flat_storage_for_shard(
&self,
shard_uid: ShardUId,
chunk: &ShardChunk,
) -> Result<(), Error> {
let flat_storage_manager = self.runtime_adapter.get_flat_storage_manager();
// Flat storage must not exist at this point because leftover keys corrupt its state.
assert!(flat_storage_manager.get_flat_storage_for_shard(shard_uid).is_none());

let flat_head_hash = *chunk.prev_block();
let flat_head_header = self.get_block_header(&flat_head_hash)?;
let flat_head_prev_hash = *flat_head_header.prev_hash();
let flat_head_height = flat_head_header.height();

tracing::debug!(target: "store", ?shard_uid, ?flat_head_hash, flat_head_height, "set_state_finalize - initialized flat storage");

let mut store_update = self.runtime_adapter.store().store_update();
store_helper::set_flat_storage_status(
&mut store_update,
shard_uid,
FlatStorageStatus::Ready(FlatStorageReadyStatus {
flat_head: near_store::flat::BlockInfo {
hash: flat_head_hash,
prev_hash: flat_head_prev_hash,
height: flat_head_height,
},
}),
);
store_update.commit()?;
flat_storage_manager.create_flat_storage_for_shard(shard_uid).unwrap();
Ok(())
}

pub fn set_state_finalize(
&mut self,
shard_id: ShardId,
sync_hash: CryptoHash,
apply_result: Result<(), near_chain_primitives::Error>,
) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "sync", "set_state_finalize").entered();
apply_result?;

let shard_state_header = self.get_state_header(shard_id, sync_hash)?;
let chunk = shard_state_header.cloned_chunk();

let block_hash = chunk.prev_block();

// We synced shard state on top of _previous_ block for chunk in shard state header and applied state parts to
// flat storage. Now we can set flat head to hash of this block and create flat storage.
// If block_hash is equal to default - this means that we're all the way back at genesis.
// So we don't have to add the storage state for shard in such case.
// TODO(8438) - add additional test scenarios for this case.
if *block_hash != CryptoHash::default() {
let block_header = self.get_block_header(block_hash)?;
let epoch_id = block_header.epoch_id();
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, epoch_id)?;

let flat_storage_manager = self.runtime_adapter.get_flat_storage_manager();
// Flat storage must not exist at this point because leftover keys corrupt its state.
assert!(flat_storage_manager.get_flat_storage_for_shard(shard_uid).is_none());

let flat_head_hash = *chunk.prev_block();
let flat_head_header = self.get_block_header(&flat_head_hash)?;
let flat_head_prev_hash = *flat_head_header.prev_hash();
let flat_head_height = flat_head_header.height();

tracing::debug!(target: "store", ?shard_uid, ?flat_head_hash, flat_head_height, "set_state_finalize - initialized flat storage");

let mut store_update = self.runtime_adapter.store().store_update();
store_helper::set_flat_storage_status(
&mut store_update,
shard_uid,
FlatStorageStatus::Ready(FlatStorageReadyStatus {
flat_head: near_store::flat::BlockInfo {
hash: flat_head_hash,
prev_hash: flat_head_prev_hash,
height: flat_head_height,
},
}),
);
store_update.commit()?;
flat_storage_manager.create_flat_storage_for_shard(shard_uid).unwrap();
// Flat storage is ready, load memtrie if it is enabled.
self.runtime_adapter
.get_tries()
.load_mem_trie_on_catchup(&shard_uid, &chunk.prev_state_root())?;
}

let mut height = shard_state_header.chunk_height_included();
let mut chain_update = self.chain_update();
chain_update.set_state_finalize(shard_id, sync_hash, shard_state_header)?;
Expand Down Expand Up @@ -4530,6 +4530,42 @@ pub struct ApplyStatePartsResponse {
pub sync_hash: CryptoHash,
}

// This message is handled by `sync_job_actions.rs::handle_load_memtrie_request()`.
// It is a request for `runtime_adapter` to load in-memory trie for `shard_uid`.
#[derive(actix::Message)]
#[rtype(result = "()")]
pub struct LoadMemtrieRequest {
pub runtime_adapter: Arc<dyn RuntimeAdapter>,
pub shard_uid: ShardUId,
// Required to load memtrie.
pub prev_state_root: StateRoot,
// Needs to be included in a response to the caller for identification purposes.
pub sync_hash: CryptoHash,
}

// Skip `runtime_adapter`, because it's a complex object that has complex logic
// and many fields.
impl Debug for LoadMemtrieRequest {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LoadMemtrieRequest")
.field("runtime_adapter", &"<not shown>")
.field("shard_uid", &self.shard_uid)
.field("prev_state_root", &self.prev_state_root)
.field("sync_hash", &self.sync_hash)
.finish()
}
}

// It is message indicating the result of loading in-memory trie for `shard_id`.
// `sync_hash` is passed around to indicate to which block we were catching up.
#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
pub struct LoadMemtrieResponse {
pub load_result: Result<(), near_chain_primitives::error::Error>,
pub shard_uid: ShardUId,
pub sync_hash: CryptoHash,
}

#[derive(actix::Message)]
#[rtype(result = "()")]
pub struct BlockCatchUpRequest {
Expand Down
18 changes: 9 additions & 9 deletions chain/client-primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ impl Clone for DownloadStatus {
pub enum ShardSyncStatus {
StateDownloadHeader,
StateDownloadParts,
StateDownloadScheduling,
StateDownloadApplying,
StateDownloadComplete,
StateApplyScheduling,
StateApplyComplete,
StateApplyFinalizing,
ReshardingScheduling,
ReshardingApplying,
StateSyncDone,
Expand All @@ -105,9 +105,9 @@ impl ShardSyncStatus {
match self {
ShardSyncStatus::StateDownloadHeader => 0,
ShardSyncStatus::StateDownloadParts => 1,
ShardSyncStatus::StateDownloadScheduling => 2,
ShardSyncStatus::StateDownloadApplying => 3,
ShardSyncStatus::StateDownloadComplete => 4,
ShardSyncStatus::StateApplyScheduling => 2,
ShardSyncStatus::StateApplyComplete => 3,
ShardSyncStatus::StateApplyFinalizing => 4,
ShardSyncStatus::ReshardingScheduling => 5,
ShardSyncStatus::ReshardingApplying => 6,
ShardSyncStatus::StateSyncDone => 7,
Expand All @@ -129,9 +129,9 @@ impl ToString for ShardSyncStatus {
match self {
ShardSyncStatus::StateDownloadHeader => "header".to_string(),
ShardSyncStatus::StateDownloadParts => "parts".to_string(),
ShardSyncStatus::StateDownloadScheduling => "scheduling".to_string(),
ShardSyncStatus::StateDownloadApplying => "applying".to_string(),
ShardSyncStatus::StateDownloadComplete => "download complete".to_string(),
ShardSyncStatus::StateApplyScheduling => "apply scheduling".to_string(),
ShardSyncStatus::StateApplyComplete => "apply complete".to_string(),
ShardSyncStatus::StateApplyFinalizing => "apply finalizing".to_string(),
ShardSyncStatus::ReshardingScheduling => "resharding scheduling".to_string(),
ShardSyncStatus::ReshardingApplying => "resharding applying".to_string(),
ShardSyncStatus::StateSyncDone => "done".to_string(),
Expand Down
4 changes: 3 additions & 1 deletion chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use near_async::futures::{AsyncComputationSpawner, FutureSpawner};
use near_async::messaging::IntoSender;
use near_async::messaging::{CanSend, Sender};
use near_async::time::{Clock, Duration, Instant};
use near_chain::chain::VerifyBlockHashAndSignatureResult;
use near_chain::chain::{
ApplyStatePartsRequest, BlockCatchUpRequest, BlockMissingChunks, BlocksCatchUpState,
LoadMemtrieRequest, VerifyBlockHashAndSignatureResult,
};
use near_chain::flat_storage_creator::FlatStorageCreator;
use near_chain::orphan::OrphanMissingChunks;
Expand Down Expand Up @@ -2327,6 +2327,7 @@ impl Client {
&mut self,
highest_height_peers: &[HighestHeightPeerInfo],
state_parts_task_scheduler: &Sender<ApplyStatePartsRequest>,
load_memtrie_scheduler: &Sender<LoadMemtrieRequest>,
block_catch_up_task_scheduler: &Sender<BlockCatchUpRequest>,
resharding_scheduler: &Sender<ReshardingRequest>,
apply_chunks_done_callback: DoneApplyChunkCallback,
Expand Down Expand Up @@ -2418,6 +2419,7 @@ impl Client {
highest_height_peers,
tracking_shards,
state_parts_task_scheduler,
load_memtrie_scheduler,
resharding_scheduler,
state_parts_future_spawner,
use_colour,
Expand Down
24 changes: 24 additions & 0 deletions chain/client/src/client_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use near_async::time::{Duration, Instant};
use near_async::{MultiSend, MultiSendMessage, MultiSenderFrom};
use near_chain::chain::{
ApplyStatePartsRequest, ApplyStatePartsResponse, BlockCatchUpRequest, BlockCatchUpResponse,
LoadMemtrieRequest, LoadMemtrieResponse,
};
use near_chain::resharding::{ReshardingRequest, ReshardingResponse};
use near_chain::test_utils::format_hash;
Expand Down Expand Up @@ -89,6 +90,7 @@ pub struct ClientSenderForClient {
#[multi_send_message_derive(Debug)]
pub struct SyncJobsSenderForClient {
pub apply_state_parts: Sender<ApplyStatePartsRequest>,
pub load_memtrie: Sender<LoadMemtrieRequest>,
pub block_catch_up: Sender<BlockCatchUpRequest>,
pub resharding: Sender<ReshardingRequest>,
}
Expand Down Expand Up @@ -1394,6 +1396,7 @@ impl ClientActions {
if let Err(err) = self.client.run_catchup(
&self.network_info.highest_height_peers,
&self.sync_jobs_sender.apply_state_parts,
&self.sync_jobs_sender.load_memtrie,
&self.sync_jobs_sender.block_catch_up,
&self.sync_jobs_sender.resharding,
self.get_apply_chunks_done_callback(),
Expand Down Expand Up @@ -1620,6 +1623,7 @@ impl ClientActions {
&self.network_info.highest_height_peers,
shards_to_sync,
&self.sync_jobs_sender.apply_state_parts,
&self.sync_jobs_sender.load_memtrie,
&self.sync_jobs_sender.resharding,
self.state_parts_future_spawner.as_ref(),
use_colour,
Expand Down Expand Up @@ -1782,6 +1786,26 @@ impl ClientActionHandler<ReshardingResponse> for ClientActions {
}
}

impl ClientActionHandler<LoadMemtrieResponse> for ClientActions {
type Result = ();

// The memtrie was loaded as a part of catchup or state-sync,
// (see https://github.com/near/nearcore/blob/master/docs/architecture/how/sync.md#basics).
// Here we save the result of loading memtrie to the appropriate place,
// depending on whether it was catch-up or state sync.
#[perf]
fn handle(&mut self, msg: LoadMemtrieResponse) -> Self::Result {
tracing::debug!(target: "client", ?msg);
if let Some((sync, _, _)) = self.client.catchup_state_syncs.get_mut(&msg.sync_hash) {
// We are doing catchup
sync.set_load_memtrie_result(msg.shard_uid, msg.load_result);
} else {
// We are doing state sync
self.client.state_sync.set_load_memtrie_result(msg.shard_uid, msg.load_result);
}
}
}

impl ClientActionHandler<ShardsManagerResponse> for ClientActions {
type Result = ();

Expand Down
Loading

0 comments on commit 579c53c

Please sign in to comment.