Skip to content

Commit

Permalink
fix compile errors
Browse files Browse the repository at this point in the history
  • Loading branch information
staffik committed Apr 8, 2024
1 parent 8b8762b commit bfede3c
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 22 deletions.
6 changes: 4 additions & 2 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2734,7 +2734,7 @@ impl Chain {
Ok(())
}

pub fn schedule_state_finalize(
pub fn schedule_load_memtrie(
&self,
shard_id: ShardId,
sync_hash: CryptoHash,
Expand All @@ -2744,7 +2744,6 @@ impl Chain {
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &epoch_id)?;

let shard_state_header = self.get_state_header(shard_id, sync_hash)?;
let state_root = shard_state_header.chunk_prev_state_root();
let chunk = shard_state_header.cloned_chunk();

let block_hash = chunk.prev_block();
Expand Down Expand Up @@ -2790,6 +2789,7 @@ impl Chain {
runtime_adapter: self.runtime_adapter.clone(),
shard_uid,
prev_state_root: chunk.prev_state_root(),
sync_hash,
});

Ok(())
Expand Down Expand Up @@ -4543,6 +4543,7 @@ pub struct LoadMemtrieRequest {
pub runtime_adapter: Arc<dyn RuntimeAdapter>,
pub shard_uid: ShardUId,
pub prev_state_root: StateRoot,
pub sync_hash: CryptoHash,
}

// Skip `runtime_adapter` and `epoch_manager`, because these are complex object that have complex logic
Expand All @@ -4553,6 +4554,7 @@ impl Debug for LoadMemtrieRequest {
.field("runtime_adapter", &"<not shown>")
.field("shard_uid", &self.shard_uid)
.field("prev_state_root", &self.prev_state_root)
.field("sync_hash", &self.sync_hash)
.finish()
}
}
Expand Down
1 change: 0 additions & 1 deletion chain/client-primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ impl ToString for ShardSyncStatus {
ShardSyncStatus::StateDownloadParts => "parts".to_string(),
ShardSyncStatus::StateDownloadScheduling => "scheduling".to_string(),
ShardSyncStatus::LoadMemtrieScheduling => "memtrie".to_string(),
ShardSyncStatus::StateDownloadApplying => "applying".to_string(),
ShardSyncStatus::StateDownloadComplete => "download complete".to_string(),
ShardSyncStatus::ReshardingScheduling => "resharding scheduling".to_string(),
ShardSyncStatus::ReshardingApplying => "resharding applying".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2419,7 +2419,7 @@ impl Client {
highest_height_peers,
tracking_shards,
state_parts_task_scheduler,
state_finalize_scheduler,
load_memtrie_scheduler,
resharding_scheduler,
state_parts_future_spawner,
use_colour,
Expand Down
21 changes: 18 additions & 3 deletions chain/client/src/client_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use near_async::time::{Duration, Instant};
use near_async::{MultiSend, MultiSendMessage, MultiSenderFrom};
use near_chain::chain::{
ApplyStatePartsRequest, ApplyStatePartsResponse, BlockCatchUpRequest, BlockCatchUpResponse,
LoadMemtrieRequest,
LoadMemtrieRequest, LoadMemtrieResponse,
};
use near_chain::resharding::{ReshardingRequest, ReshardingResponse};
use near_chain::test_utils::format_hash;
Expand Down Expand Up @@ -1395,7 +1395,7 @@ impl ClientActions {
if let Err(err) = self.client.run_catchup(
&self.network_info.highest_height_peers,
&self.sync_jobs_sender.apply_state_parts,
&self.sync_jobs_sender.state_finalize,
&self.sync_jobs_sender.load_memtrie,
&self.sync_jobs_sender.block_catch_up,
&self.sync_jobs_sender.resharding,
self.get_apply_chunks_done_callback(),
Expand Down Expand Up @@ -1622,7 +1622,7 @@ impl ClientActions {
&self.network_info.highest_height_peers,
shards_to_sync,
&self.sync_jobs_sender.apply_state_parts,
&self.sync_jobs_sender.state_finalize,
&self.sync_jobs_sender.load_memtrie,
&self.sync_jobs_sender.resharding,
self.state_parts_future_spawner.as_ref(),
use_colour,
Expand Down Expand Up @@ -1785,6 +1785,21 @@ impl ClientActionHandler<ReshardingResponse> for ClientActions {
}
}

impl ClientActionHandler<LoadMemtrieResponse> for ClientActions {
type Result = ();

#[perf]
fn handle(&mut self, msg: LoadMemtrieResponse) -> Self::Result {
tracing::debug!(target: "client", ?msg);
if let Some((sync, _, _)) = self.client.catchup_state_syncs.get_mut(&msg.sync_hash) {
// We are doing catchup
sync.set_load_memtrie_result(msg.shard_id, msg.load_result);
} else {
self.client.state_sync.set_load_memtrie_result(msg.shard_id, msg.load_result);
}
}
}

impl ClientActionHandler<ShardsManagerResponse> for ClientActions {
type Result = ();

Expand Down
31 changes: 20 additions & 11 deletions chain/client/src/sync/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ pub struct StateSync {
state_parts_apply_results: HashMap<ShardId, Result<(), near_chain_primitives::error::Error>>,

/// Maps shard_id to result of finalizing the result of applying downloaded state.
state_finalize_results: HashMap<ShardId, Result<(), near_chain_primitives::error::Error>>,
load_memtrie_results: HashMap<ShardId, Result<(), near_chain_primitives::error::Error>>,

/// Maps shard_id to result of splitting state for resharding.
resharding_state_roots:
Expand Down Expand Up @@ -218,7 +218,7 @@ impl StateSync {
network_adapter,
timeout,
state_parts_apply_results: HashMap::new(),
state_finalize_results: HashMap::new(),
load_memtrie_results: HashMap::new(),
resharding_state_roots: HashMap::new(),
state_parts_mpsc_rx: rx,
state_parts_mpsc_tx: tx,
Expand Down Expand Up @@ -286,8 +286,8 @@ impl StateSync {
download_timeout = res.0;
run_shard_state_download = res.1;
}
ShardSyncStatus::StateApplyScheduling => {
self.sync_shards_apply_parts_scheduling_status(
ShardSyncStatus::StateDownloadScheduling => {
self.sync_shards_download_scheduling_status(
shard_id,
shard_sync_download,
sync_hash,
Expand All @@ -302,7 +302,7 @@ impl StateSync {
shard_sync_download,
sync_hash,
chain,
state_finalize_scheduler,
load_memtrie_scheduler,
)?;
}
ShardSyncStatus::StateDownloadComplete => {
Expand Down Expand Up @@ -464,6 +464,15 @@ impl StateSync {
self.resharding_state_roots.insert(shard_id, result);
}

// Called by the client actor, when it finished loading memtrie.
pub fn set_load_memtrie_result(
&mut self,
shard_id: ShardId,
result: Result<(), near_chain::Error>,
) {
self.load_memtrie_results.insert(shard_id, result);
}

/// Find the hash of the first block on the same epoch (and chain) of block with hash `sync_hash`.
pub fn get_epoch_start_sync_hash(
chain: &Chain,
Expand Down Expand Up @@ -779,7 +788,7 @@ impl StateSync {
tracking_shards,
now,
state_parts_task_scheduler,
state_finalize_scheduler,
load_memtrie_scheduler,
resharding_scheduler,
state_parts_future_spawner,
use_colour,
Expand Down Expand Up @@ -962,13 +971,13 @@ impl StateSync {
if parts_done {
*shard_sync_download = ShardSyncDownload {
downloads: vec![],
status: ShardSyncStatus::StateApplyScheduling,
status: ShardSyncStatus::StateDownloadScheduling,
};
}
(download_timeout, run_shard_state_download)
}

fn sync_shards_apply_parts_scheduling_status(
fn sync_shards_download_scheduling_status(
&mut self,
shard_id: ShardId,
shard_sync_download: &mut ShardSyncDownload,
Expand Down Expand Up @@ -1006,7 +1015,7 @@ impl StateSync {
Ok(())
}

fn sync_shards_state_finalize_scheduling_status(
fn sync_shards_load_memtrie_scheduling_status(
&mut self,
shard_id: ShardId,
shard_sync_download: &mut ShardSyncDownload,
Expand All @@ -1018,7 +1027,7 @@ impl StateSync {
// (these are set via callback from ClientActor - both for sync and catchup).
if let Some(result) = self.state_parts_apply_results.remove(&shard_id) {
result?;
chain.schedule_state_finalize(shard_id, sync_hash, load_memtrie_scheduler)?;
chain.schedule_load_memtrie(shard_id, sync_hash, load_memtrie_scheduler)?;
*shard_sync_download = ShardSyncDownload {
downloads: vec![],
status: ShardSyncStatus::StateDownloadComplete,
Expand All @@ -1038,7 +1047,7 @@ impl StateSync {
) -> Result<bool, near_chain::Error> {
// Keep waiting until our shard is on the list of results
// (these are set via callback from ClientActor - both for sync and catchup).
if let Some(result) = self.state_finalize_results.remove(&shard_id) {
if let Some(result) = self.load_memtrie_results.remove(&shard_id) {
match result {
Ok(()) => {
// If the shard layout is changing in this epoch - we have to apply it right now.
Expand Down
8 changes: 5 additions & 3 deletions chain/client/src/sync_jobs_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ use near_chain::chain::{
};
use near_chain::resharding::{ReshardingRequest, ReshardingResponse};
use near_chain::Chain;
use near_primitives::hash::CryptoHash;
use near_primitives::state_part::PartId;
use near_primitives::state_sync::StatePartKey;
use near_primitives::types::ShardId;
use near_store::flat::{store_helper, FlatStorageReadyStatus, FlatStorageStatus};
use near_store::DBCol;

#[derive(Clone, MultiSend, MultiSenderFrom, MultiSendMessage)]
Expand All @@ -21,6 +19,7 @@ pub struct ClientSenderForSyncJobs {
apply_state_parts_response: Sender<ApplyStatePartsResponse>,
block_catch_up_response: Sender<BlockCatchUpResponse>,
resharding_response: Sender<ReshardingResponse>,
load_memtrie_response: Sender<LoadMemtrieResponse>,
}

#[derive(Clone, MultiSend, MultiSenderFrom, MultiSendMessage)]
Expand Down Expand Up @@ -83,7 +82,10 @@ impl SyncJobsActions {
}

pub fn handle_load_memtrie_request(&mut self, msg: LoadMemtrieRequest) {
let result = msg.runtime_adapter.load_mem_trie_on_catchup(&msg.shard_uid, &msg.prev_state_root);
let result = msg
.runtime_adapter
.load_mem_trie_on_catchup(&msg.shard_uid, &msg.prev_state_root)
.map_err(|error| error.into());
self.client_sender.send(LoadMemtrieResponse {
load_result: result,
shard_id: msg.shard_uid.shard_id(),
Expand Down
16 changes: 15 additions & 1 deletion chain/client/src/sync_jobs_actor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::sync_jobs_actions::SyncJobsActions;
use near_async::futures::ActixFutureSpawner;
use near_chain::chain::{ApplyStatePartsRequest, BlockCatchUpRequest};
use near_chain::chain::{ApplyStatePartsRequest, BlockCatchUpRequest, LoadMemtrieRequest};
use near_chain::resharding::ReshardingRequest;
use near_o11y::{handler_debug_span, WithSpanContext};
use near_performance_metrics_macros::perf;
Expand All @@ -17,6 +17,20 @@ impl actix::Actor for SyncJobsActor {
type Context = actix::Context<Self>;
}

impl actix::Handler<WithSpanContext<LoadMemtrieRequest>> for SyncJobsActor {
type Result = ();

#[perf]
fn handle(
&mut self,
msg: WithSpanContext<LoadMemtrieRequest>,
_: &mut Self::Context,
) -> Self::Result {
let (_span, msg) = handler_debug_span!(target: "client", msg);
self.actions.handle_load_memtrie_request(msg);
}
}

impl actix::Handler<WithSpanContext<ApplyStatePartsRequest>> for SyncJobsActor {
type Result = ();

Expand Down
1 change: 1 addition & 0 deletions chain/client/src/test_utils/client_actions_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub fn forward_client_messages_from_sync_jobs_to_client_actions(
}
ClientSenderForSyncJobsMessage::_block_catch_up_response(msg) => client_actions.handle(msg),
ClientSenderForSyncJobsMessage::_resharding_response(msg) => client_actions.handle(msg),
ClientSenderForSyncJobsMessage::_load_memtrie_response(msg) => client_actions.handle(msg),
})
}

Expand Down
3 changes: 3 additions & 0 deletions chain/client/src/test_utils/sync_jobs_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,8 @@ pub fn forward_sync_jobs_messages_from_client_to_sync_jobs_actions(
SyncJobsSenderForClientMessage::_resharding(msg) => {
sync_jobs_actions.handle_resharding_request(msg, &future_spawner);
}
SyncJobsSenderForClientMessage::_load_memtrie(msg) => {
sync_jobs_actions.handle_load_memtrie_request(msg);
}
})
}

0 comments on commit bfede3c

Please sign in to comment.