Skip to content

Commit

Permalink
Apply block updates to split shards (#4847)
Browse files Browse the repository at this point in the history
This PR makes block updates and catchups also update split states for the next epoch.

The algorithm works like follows. 
There are two possibilities:
1) States for next epoch are not ready when a block is processed. In this case, `apply_chunks_preprocessing` will be called first with `ApplyChunksMode::NotCaughtUp` and then `ApplyChunksMode::CatchingUp`. With `NotCaughtUp`, if a shard will be split
into multiple shards in the next epoch and the validator cares about one of the split shards, the validator stores the state changes in the database through a new column `ConsolidatedStateChanges`. Later, when catching up blocks, `apply_chunks_preprocessing` with `CatchingUp` will read the stored state changes and process them.
Note: we cannot use the existing stored `state_changes` or `trie_changes` for updating split states. `trie_changes` are updates on trie nodes and trie structure of the old and new states are different.  The existing `state_changes` do not include updates on internal states such as postponed receipts, delayed receipts, etc..
2) States for next epoch are ready. In this case, `apply_chunks_preprocessing` is only called once with `ApplyChunksMode::CaughtUp`. `apply_transactions` can update the states for shard in this epoch and the split shards in next epoch together
  • Loading branch information
mzhangmzz authored Sep 22, 2021
1 parent d956875 commit 8f90b92
Show file tree
Hide file tree
Showing 22 changed files with 1,050 additions and 286 deletions.
277 changes: 209 additions & 68 deletions chain/chain/src/chain.rs

Large diffs are not rendered by default.

66 changes: 65 additions & 1 deletion chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use near_primitives::trie_key::{trie_key_parsers, TrieKey};
use near_primitives::types::chunk_extra::ChunkExtra;
use near_primitives::types::{
AccountId, BlockExtra, BlockHeight, EpochId, GCCount, NumBlocks, ShardId, StateChanges,
StateChangesExt, StateChangesKinds, StateChangesKindsExt, StateChangesRequest,
StateChangesExt, StateChangesForSplitStates, StateChangesKinds, StateChangesKindsExt,
StateChangesRequest,
};
use near_primitives::utils::{get_block_shard_id, index_to_bytes, to_timestamp};
use near_primitives::views::LightClientBlockView;
Expand All @@ -51,6 +52,7 @@ use near_store::{

use crate::byzantine_assert;
use crate::types::{Block, BlockHeader, LatestKnown};
use near_store::db::DBCol::ColStateChangesForSplitStates;

/// lru cache size
#[cfg(not(feature = "no_cache"))]
Expand Down Expand Up @@ -426,6 +428,18 @@ impl ChainStore {
.collect()
}

pub fn get_state_changes_for_split_states(
&self,
block_hash: &CryptoHash,
shard_id: ShardId,
) -> Result<StateChangesForSplitStates, Error> {
let key = &get_block_shard_id(block_hash, shard_id);
option_to_not_found(
self.store.get_ser::<StateChangesForSplitStates>(ColStateChangesForSplitStates, key),
&format!("CONSOLIDATED STATE CHANGES: {}:{}", block_hash, shard_id),
)
}

pub fn get_outgoing_receipts_for_shard(
&mut self,
prev_block_hash: CryptoHash,
Expand Down Expand Up @@ -1150,6 +1164,9 @@ pub struct ChainStoreUpdate<'a> {
final_head: Option<Tip>,
largest_target_height: Option<BlockHeight>,
trie_changes: Vec<WrappedTrieChanges>,
// All state changes made by a chunk, this is only used for splitting states
add_state_changes_for_split_states: HashMap<(CryptoHash, ShardId), StateChangesForSplitStates>,
remove_state_changes_for_split_states: HashSet<(CryptoHash, ShardId)>,
add_blocks_to_catchup: Vec<(CryptoHash, CryptoHash)>,
// A pair (prev_hash, hash) to be removed from blocks to catchup
remove_blocks_to_catchup: Vec<(CryptoHash, CryptoHash)>,
Expand All @@ -1174,6 +1191,8 @@ impl<'a> ChainStoreUpdate<'a> {
final_head: None,
largest_target_height: None,
trie_changes: vec![],
add_state_changes_for_split_states: HashMap::new(),
remove_state_changes_for_split_states: HashSet::new(),
add_blocks_to_catchup: vec![],
remove_blocks_to_catchup: vec![],
remove_prev_blocks_to_catchup: vec![],
Expand Down Expand Up @@ -1583,6 +1602,14 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
}

impl<'a> ChainStoreUpdate<'a> {
pub fn get_state_changes_for_split_states(
&self,
block_hash: &CryptoHash,
shard_id: ShardId,
) -> Result<StateChangesForSplitStates, Error> {
self.chain_store.get_state_changes_for_split_states(block_hash, shard_id)
}

/// Update both header and block body head.
pub fn save_head(&mut self, t: &Tip) -> Result<(), Error> {
self.save_body_head(t)?;
Expand Down Expand Up @@ -1853,6 +1880,29 @@ impl<'a> ChainStoreUpdate<'a> {
self.trie_changes.push(trie_changes);
}

pub fn add_state_changes_for_split_states(
&mut self,
block_hash: CryptoHash,
shard_id: ShardId,
state_changes: StateChangesForSplitStates,
) {
let prev =
self.add_state_changes_for_split_states.insert((block_hash, shard_id), state_changes);
// We should not save state changes for the same chunk twice
assert!(prev.is_none());
}

pub fn remove_state_changes_for_split_states(
&mut self,
block_hash: CryptoHash,
shard_id: ShardId,
) {
// We should not remove state changes for the same chunk twice
let value_not_present =
self.remove_state_changes_for_split_states.insert((block_hash, shard_id));
assert!(value_not_present);
}

pub fn add_block_to_catchup(&mut self, prev_hash: CryptoHash, block_hash: CryptoHash) {
self.add_blocks_to_catchup.push((prev_hash, block_hash));
}
Expand Down Expand Up @@ -2408,6 +2458,7 @@ impl<'a> ChainStoreUpdate<'a> {
| DBCol::ColEpochValidatorInfo
| DBCol::ColBlockOrdinal
| DBCol::_ColTransactionRefCount
| DBCol::ColStateChangesForSplitStates
| DBCol::ColCachedContractCode => {
unreachable!();
}
Expand Down Expand Up @@ -2641,6 +2692,19 @@ impl<'a> ChainStoreUpdate<'a> {
.wrapped_into(&mut store_update)
.map_err(|err| ErrorKind::Other(err.to_string()))?;
}
for ((block_hash, shard_id), state_changes) in
self.add_state_changes_for_split_states.drain()
{
store_update.set_ser(
ColStateChangesForSplitStates,
&get_block_shard_id(&block_hash, shard_id),
&state_changes,
)?;
}
for (block_hash, shard_id) in self.remove_state_changes_for_split_states.drain() {
store_update
.delete(ColStateChangesForSplitStates, &get_block_shard_id(&block_hash, shard_id));
}

let mut affected_catchup_blocks = HashSet::new();
for (prev_hash, hash) in self.remove_blocks_to_catchup.drain(..) {
Expand Down
17 changes: 15 additions & 2 deletions chain/chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use near_primitives::transaction::{
use near_primitives::types::validator_stake::{ValidatorStake, ValidatorStakeIter};
use near_primitives::types::{
AccountId, ApprovalStake, Balance, BlockHeight, EpochId, Gas, Nonce, NumBlocks, NumShards,
ShardId, StateRoot, StateRootNode,
ShardId, StateChangesForSplitStates, StateRoot, StateRootNode,
};
use near_primitives::validator_signer::InMemoryValidatorSigner;
use near_primitives::version::{ProtocolVersion, PROTOCOL_VERSION};
Expand All @@ -50,7 +50,8 @@ use near_store::{
use crate::chain::{Chain, NUM_EPOCHS_TO_KEEP_STORE_DATA};
use crate::store::ChainStoreAccess;
use crate::types::{
ApplyTransactionResult, BlockHeaderInfo, ChainGenesis, ValidatorInfoIdentifier,
ApplySplitStateResult, ApplyTransactionResult, BlockHeaderInfo, ChainGenesis,
ValidatorInfoIdentifier,
};
#[cfg(feature = "protocol_feature_block_header_v3")]
use crate::Doomslug;
Expand Down Expand Up @@ -626,6 +627,7 @@ impl RuntimeAdapter for KeyValueRuntime {
&self,
shard_id: ShardId,
state_root: &StateRoot,
_state_roots: Option<HashMap<ShardUId, StateRoot>>,
_height: BlockHeight,
_block_timestamp: u64,
_prev_block_hash: &CryptoHash,
Expand Down Expand Up @@ -815,6 +817,7 @@ impl RuntimeAdapter for KeyValueRuntime {
total_gas_burnt: 0,
total_balance_burnt: 0,
proof: None,
apply_split_state_result_or_state_changes: None,
})
}

Expand Down Expand Up @@ -1169,6 +1172,16 @@ impl RuntimeAdapter for KeyValueRuntime {
Ok(false)
}

fn apply_update_to_split_states(
&self,
_block_hash: &CryptoHash,
_state_roots: HashMap<ShardUId, StateRoot>,
_next_shard_layout: &ShardLayout,
_state_changes: StateChangesForSplitStates,
) -> Result<Vec<ApplySplitStateResult>, Error> {
Ok(vec![])
}

fn build_state_for_split_shards(
&self,
_shard_uid: ShardUId,
Expand Down
41 changes: 40 additions & 1 deletion chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use near_primitives::transaction::{ExecutionOutcomeWithId, SignedTransaction};
use near_primitives::types::validator_stake::{ValidatorStake, ValidatorStakeIter};
use near_primitives::types::{
AccountId, ApprovalStake, Balance, BlockHeight, BlockHeightDelta, EpochId, Gas, MerkleHash,
NumBlocks, ShardId, StateRoot, StateRootNode,
NumBlocks, ShardId, StateChangesForSplitStates, StateRoot, StateRootNode,
};
use near_primitives::version::{
ProtocolVersion, MIN_GAS_PRICE_NEP_92, MIN_GAS_PRICE_NEP_92_FIX, MIN_PROTOCOL_VERSION_NEP_92,
Expand Down Expand Up @@ -79,6 +79,22 @@ pub struct AcceptedBlock {
pub provenance: Provenance,
}

pub struct ApplySplitStateResult {
pub shard_uid: ShardUId,
pub trie_changes: WrappedTrieChanges,
pub new_root: StateRoot,
}

// This struct captures two cases
// when apply transactions, split states may or may not be ready
// if it's ready, apply transactions also apply updates to split states and this enum will be
// ApplySplitStateResults
// otherwise, it simply returns the state changes needed to be applied to split states
pub enum ApplySplitStateResultOrStateChanges {
ApplySplitStateResults(Vec<ApplySplitStateResult>),
StateChangesForSplitStates(StateChangesForSplitStates),
}

pub struct ApplyTransactionResult {
pub trie_changes: WrappedTrieChanges,
pub new_root: StateRoot,
Expand All @@ -88,6 +104,7 @@ pub struct ApplyTransactionResult {
pub total_gas_burnt: Gas,
pub total_balance_burnt: Balance,
pub proof: Option<PartialStorage>,
pub apply_split_state_result_or_state_changes: Option<ApplySplitStateResultOrStateChanges>,
}

impl ApplyTransactionResult {
Expand Down Expand Up @@ -529,11 +546,23 @@ pub trait RuntimeAdapter: Send + Sync {
) -> Result<StoreUpdate, Error>;

/// Apply transactions to given state root and return store update and new state root.
/// `split_state_roots` is only used when shards will change next epoch and we are building
/// states for the next epoch.
/// When shards will change next epoch,
/// if `split_state_roots` is not None, that means states for the split shards are ready
/// `apply_transations` will update these states and return apply results for these states
/// through `ApplyTransactionResult.apply_split_state_result_or_state_changes`
/// otherwise, `apply_transactions` will generate state changes needed to be applied to split
/// states and return them through
/// `ApplyTransactionResult.apply_split_state_result_or_state_changes`.
/// The caller(chain) can store it in the database, waiting to be processed after state
/// catchup is finished
/// Also returns transaction result for each transaction and new receipts.
fn apply_transactions(
&self,
shard_id: ShardId,
state_root: &StateRoot,
split_state_roots: Option<HashMap<ShardUId, StateRoot>>,
height: BlockHeight,
block_timestamp: u64,
prev_block_hash: &CryptoHash,
Expand All @@ -552,6 +581,7 @@ pub trait RuntimeAdapter: Send + Sync {
self.apply_transactions_with_optional_storage_proof(
shard_id,
state_root,
split_state_roots,
height,
block_timestamp,
prev_block_hash,
Expand All @@ -574,6 +604,7 @@ pub trait RuntimeAdapter: Send + Sync {
&self,
shard_id: ShardId,
state_root: &StateRoot,
split_state_roots: Option<HashMap<ShardUId, StateRoot>>,
height: BlockHeight,
block_timestamp: u64,
prev_block_hash: &CryptoHash,
Expand Down Expand Up @@ -650,6 +681,14 @@ pub trait RuntimeAdapter: Send + Sync {
data: &Vec<u8>,
) -> bool;

fn apply_update_to_split_states(
&self,
block_hash: &CryptoHash,
state_roots: HashMap<ShardUId, StateRoot>,
next_shard_layout: &ShardLayout,
state_changes: StateChangesForSplitStates,
) -> Result<Vec<ApplySplitStateResult>, Error>;

fn build_state_for_split_shards(
&self,
shard_uid: ShardUId,
Expand Down
1 change: 1 addition & 0 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1613,6 +1613,7 @@ impl Client {
debug!(target: "catchup", "need to split states for shards {:?}", new_shard_sync);
new_shard_sync
} else {
debug!(target: "catchup", "do not need to split states for shards");
HashMap::new()
}
};
Expand Down
69 changes: 16 additions & 53 deletions chain/epoch_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -787,15 +787,16 @@ impl EpochManager {
shard_id: ShardId,
) -> Result<bool, EpochError> {
let next_epoch_id = self.get_next_epoch_id_from_prev_block(parent_hash)?;
if let Some(split_shards) =
self.get_split_shards_if_shards_will_change(parent_hash, vec![shard_id])?
{
// we can safely unwrap here because split_shards will always has `shard_id` in there
for next_shard_id in split_shards.get(&shard_id).unwrap() {
if self.will_shard_layout_change(parent_hash)? {
let shard_layout = self.get_shard_layout(&next_epoch_id)?;
let split_shards = shard_layout
.get_split_shards(shard_id)
.expect("all shard layouts expect the first one must have a split map");
for next_shard_id in split_shards {
if self.cares_about_shard_in_epoch(
next_epoch_id.clone(),
account_id,
*next_shard_id,
next_shard_id.shard_id(),
)? {
return Ok(true);
}
Expand Down Expand Up @@ -1265,41 +1266,23 @@ impl EpochManager {
}

#[cfg(not(feature = "protocol_feature_simple_nightshade"))]
pub fn get_split_shards_if_shards_will_change(
pub fn will_shard_layout_change(
&mut self,
_parent_hash: &CryptoHash,
_shards: Vec<ShardId>,
) -> Result<Option<HashMap<ShardId, Vec<ShardId>>>, EpochError> {
Ok(None)
) -> Result<bool, EpochError> {
Ok(false)
}

#[cfg(feature = "protocol_feature_simple_nightshade")]
pub fn get_split_shards_if_shards_will_change(
pub fn will_shard_layout_change(
&mut self,
parent_hash: &CryptoHash,
shards: Vec<ShardId>,
) -> Result<Option<HashMap<ShardId, Vec<ShardId>>>, EpochError> {
) -> Result<bool, EpochError> {
let epoch_id = self.get_epoch_id_from_prev_block(parent_hash)?;
let next_epoch_id = self.get_next_epoch_id_from_prev_block(parent_hash)?;
let shard_layout = self.get_shard_layout(&epoch_id)?.clone();
let next_shard_layout = self.get_shard_layout(&next_epoch_id)?.clone();
if shard_layout == next_shard_layout {
Ok(None)
} else {
let split_shards: Result<Vec<Vec<ShardId>>, String> = shards
.iter()
.map(|shard_id| {
next_shard_layout.get_split_shards(*shard_id).cloned().ok_or(format!(
"cannot find split shards for shard {} in layout {:?}, next_shard_layout: {:?}, epoch_id: {:?}, next_epoch_id: {:?}",
*shard_id, shard_layout, next_shard_layout, epoch_id, next_epoch_id,
))
})
.collect();
let split_shards = split_shards.map_err(|s| EpochError::ShardingError(s))?;
let split_shards: HashMap<_, _> =
shards.into_iter().zip(split_shards.into_iter()).collect();
Ok(Some(split_shards))
}
Ok(shard_layout != next_shard_layout)
}

pub fn get_epoch_info(&mut self, epoch_id: &EpochId) -> Result<&EpochInfo, EpochError> {
Expand Down Expand Up @@ -3849,31 +3832,11 @@ mod tests {

// Check split shards
// h[5] is the first block of epoch epochs[1] and shard layout will change at epochs[2]
assert_eq!(
epoch_manager.get_split_shards_if_shards_will_change(&h[3], vec![]).unwrap(),
None
);
assert_eq!(
epoch_manager.get_split_shards_if_shards_will_change(&h[6], vec![]).unwrap(),
None
);
assert_eq!(epoch_manager.will_shard_layout_change(&h[3]).unwrap(), false);
for i in 4..=5 {
assert_eq!(
epoch_manager
.get_split_shards_if_shards_will_change(&h[i], vec![])
.unwrap()
.unwrap(),
HashMap::new(),
);
assert_eq!(
epoch_manager
.get_split_shards_if_shards_will_change(&h[i], vec![0])
.unwrap()
.unwrap(),
vec![(0, vec![0, 1, 2, 3])].into_iter().collect::<HashMap<_, _>>(),
);
assert!(epoch_manager.get_split_shards_if_shards_will_change(&h[i], vec![1]).is_err());
assert_eq!(epoch_manager.will_shard_layout_change(&h[i]).unwrap(), true);
}
assert_eq!(epoch_manager.will_shard_layout_change(&h[6]).unwrap(), false);

let account2 = "test2".parse().unwrap();
// check that even though "test2" does not track shard 0 in epochs[2], it still cares about shard 0 at epochs[1] because
Expand Down
Loading

0 comments on commit 8f90b92

Please sign in to comment.