Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop beacon_chain pubkey to index map cache #17

Draft
wants to merge 5 commits into
base: unstable
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 0 additions & 44 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1462,50 +1462,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.state_at_slot(self.slot()?, StateSkipConfig::WithStateRoots)
}

/// Returns the validator index (if any) for the given public key.
///
/// ## Notes
///
/// This query uses the `validator_pubkey_cache` which contains _all_ validators ever seen,
/// even if those validators aren't included in the head state. It is important to remember
/// that just because a validator exists here, it doesn't necessarily exist in all
/// `BeaconStates`.
///
/// ## Errors
///
/// May return an error if acquiring a read-lock on the `validator_pubkey_cache` times out.
pub fn validator_index(&self, pubkey: &PublicKeyBytes) -> Result<Option<usize>, Error> {
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?;

Ok(pubkey_cache.get_index(pubkey))
}

/// Return the validator indices of all public keys fetched from an iterator.
///
/// If any public key doesn't belong to a known validator then an error will be returned.
/// We could consider relaxing this by returning `Vec<Option<usize>>` in future.
pub fn validator_indices<'a>(
&self,
validator_pubkeys: impl Iterator<Item = &'a PublicKeyBytes>,
) -> Result<Vec<u64>, Error> {
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?;

validator_pubkeys
.map(|pubkey| {
pubkey_cache
.get_index(pubkey)
.map(|id| id as u64)
.ok_or(Error::ValidatorPubkeyUnknown(*pubkey))
})
.collect()
}

/// Returns the validator pubkey (if any) for the given validator index.
///
/// ## Notes
Expand Down
3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2083,7 +2083,8 @@ fn get_signature_verifier<'a, T: BeaconChainTypes>(

let decompressor = move |pk_bytes| {
// Map compressed pubkey to validator index.
let validator_index = validator_pubkey_cache.get_index(pk_bytes)?;
// TODO(lion): How to ensure the cache is updated here?
let validator_index = state.get_validator_index_readonly_unchecked(pk_bytes)?;
// Map validator index to pubkey (respecting guard on unknown validators).
get_pubkey(validator_index)
};
Expand Down
39 changes: 26 additions & 13 deletions beacon_node/beacon_chain/src/sync_committee_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,12 +376,22 @@ impl<T: BeaconChainTypes> VerifiedSyncContribution<T> {
.filter_map(|(pubkey, bit)| bit.then_some(pubkey))
.collect::<Vec<_>>();

let participant_indices = {
// TODO(lion): okay to get head state here?
let head_state = &chain.head_snapshot().beacon_state;

participant_pubkeys
.iter()
.map(|pubkey| {
head_state
.get_validator_index_readonly(pubkey)?
.ok_or(Error::UnknownValidatorPubkey(*pubkey))
})
.collect::<Result<Vec<_>, Error>>()?
};

// Ensure that all signatures are valid.
if !verify_signed_aggregate_signatures(
chain,
&signed_aggregate,
participant_pubkeys.as_slice(),
)? {
if !verify_signed_aggregate_signatures(chain, &signed_aggregate, &participant_indices)? {
return Err(Error::InvalidSignature);
}

Expand Down Expand Up @@ -617,7 +627,7 @@ pub fn verify_propagation_slot_range<S: SlotClock, U: SlotData>(
pub fn verify_signed_aggregate_signatures<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
signed_aggregate: &SignedContributionAndProof<T::EthSpec>,
participant_pubkeys: &[PublicKeyBytes],
participant_indexes: &[usize],
) -> Result<bool, Error> {
let pubkey_cache = chain
.validator_pubkey_cache
Expand Down Expand Up @@ -651,12 +661,8 @@ pub fn verify_signed_aggregate_signatures<T: BeaconChainTypes>(
)
.map_err(BeaconChainError::SignatureSetError)?,
sync_committee_contribution_signature_set_from_pubkeys::<T::EthSpec, _>(
|validator_index| {
pubkey_cache
.get_pubkey_from_pubkey_bytes(validator_index)
.map(Cow::Borrowed)
},
participant_pubkeys,
|validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed),
participant_indexes,
&signed_aggregate.message.contribution.signature,
signed_aggregate
.message
Expand All @@ -683,13 +689,20 @@ pub fn verify_sync_committee_message<T: BeaconChainTypes>(
let signature_setup_timer =
metrics::start_timer(&metrics::SYNC_MESSAGE_PROCESSING_SIGNATURE_SETUP_TIMES);

let index = chain
.head_snapshot()
.beacon_state
.pubkey_cache()
.get(pubkey_bytes)
.ok_or(Error::UnknownValidatorPubkey(*pubkey_bytes))?;

let pubkey_cache = chain
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout)?;

let pubkey = pubkey_cache
.get_pubkey_from_pubkey_bytes(pubkey_bytes)
.get(index)
.map(Cow::Borrowed)
.ok_or(Error::UnknownValidatorPubkey(*pubkey_bytes))?;

Expand Down
43 changes: 27 additions & 16 deletions beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,16 @@ where
}

pub fn get_current_state(&self) -> BeaconState<E> {
self.chain.head_beacon_state_cloned()
self.chain
.head_snapshot()
.beacon_state
.clone_with(CloneConfig {
committee_caches: true,
pubkey_cache: true,
exit_cache: false,
tree_hash_cache: false,
progressive_balances_cache: false,
})
}

pub fn get_timestamp_at_slot(&self) -> u64 {
Expand Down Expand Up @@ -1163,7 +1172,7 @@ where
/// A list of sync messages for the given state.
pub fn make_sync_committee_messages(
&self,
state: &BeaconState<E>,
state: &mut BeaconState<E>,
head_block_root: Hash256,
message_slot: Slot,
relative_sync_committee: RelativeSyncCommittee,
Expand All @@ -1182,6 +1191,8 @@ where
.spec
.fork_at_epoch(message_slot.epoch(E::slots_per_epoch()));

state.update_pubkey_cache().unwrap();

sync_committee
.pubkeys
.as_ref()
Expand All @@ -1191,11 +1202,10 @@ where
.iter()
.enumerate()
.map(|(subcommittee_position, pubkey)| {
let validator_index = self
.chain
.validator_index(pubkey)
.expect("should find validator index")
.expect("pubkey should exist in the beacon chain");
let validator_index = state
.get_validator_index_readonly(pubkey)
.expect("pubkey cache not updated")
.expect("should find validator index");

let sync_message = SyncCommitteeMessage::new::<E>(
message_slot,
Expand Down Expand Up @@ -1379,14 +1389,16 @@ where

pub fn make_sync_contributions(
&self,
state: &BeaconState<E>,
state: &mut BeaconState<E>,
block_hash: Hash256,
slot: Slot,
relative_sync_committee: RelativeSyncCommittee,
) -> HarnessSyncContributions<E> {
let sync_messages =
self.make_sync_committee_messages(state, block_hash, slot, relative_sync_committee);

state.update_pubkey_cache().unwrap();

let sync_contributions: Vec<Option<SignedContributionAndProof<E>>> = sync_messages
.iter()
.enumerate()
Expand All @@ -1403,11 +1415,10 @@ where
.unwrap()
.iter()
.find_map(|pubkey| {
let validator_index = self
.chain
.validator_index(pubkey)
.expect("should find validator index")
.expect("pubkey should exist in the beacon chain");
let validator_index = state
.get_validator_index_readonly(pubkey)
.expect("pubkey cache not updated")
.expect("should find validator index");

let selection_proof = SyncSelectionProof::new::<E>(
slot,
Expand Down Expand Up @@ -2001,7 +2012,7 @@ where

pub fn sync_committee_sign_block(
&self,
state: &BeaconState<E>,
state: &mut BeaconState<E>,
block_hash: Hash256,
slot: Slot,
relative_sync_committee: RelativeSyncCommittee,
Expand Down Expand Up @@ -2036,14 +2047,14 @@ where
validators: &[usize],
sync_committee_strategy: SyncCommitteeStrategy,
) -> Result<(SignedBeaconBlockHash, BeaconState<E>), BlockError<E>> {
let (block_hash, block, state) = self.add_block_at_slot(slot, state).await?;
let (block_hash, block, mut state) = self.add_block_at_slot(slot, state).await?;
self.attest_block(&state, state_root, block_hash, &block.0, validators);

if sync_committee_strategy == SyncCommitteeStrategy::AllValidators
&& state.current_sync_committee().is_ok()
{
self.sync_committee_sign_block(
&state,
&mut state,
block_hash.into(),
slot,
if (slot + 1).epoch(E::slots_per_epoch())
Expand Down
Loading
Loading