
Elastic scaling: use an assumed CoreIndex in candidate-backing #3229

Merged: 24 commits, Feb 22, 2024. The diff below shows the changes from 1 commit.
Commits (24):
- cab6ab9 Switch statement table from ParaId to CoreIndex (sandreim, Feb 1, 2024)
- d2df658 cargo lock (sandreim, Feb 2, 2024)
- 4a8b8d5 add experimental feature (sandreim, Feb 6, 2024)
- 9244632 inject core_index from statements (sandreim, Feb 6, 2024)
- 22e017b temporary provisioner fix (sandreim, Feb 6, 2024)
- 574b06a Merge branch 'master' of github.com:paritytech/polkadot-sdk into sand… (sandreim, Feb 6, 2024)
- fbb7351 cargo lock (sandreim, Feb 6, 2024)
- 6c72918 It was damn hard to fix these tests (sandreim, Feb 12, 2024)
- fc5c109 These tests were easy to fix (sandreim, Feb 12, 2024)
- 33351b4 Fix comment (sandreim, Feb 12, 2024)
- 0d994bf clippy was angry (sandreim, Feb 12, 2024)
- 534c019 A bit refactor and add a test (sandreim, Feb 12, 2024)
- 10d86dd taplo happy (sandreim, Feb 12, 2024)
- 222609c review feedback (sandreim, Feb 14, 2024)
- a02e896 remove log (sandreim, Feb 14, 2024)
- dd34850 more feedback (sandreim, Feb 14, 2024)
- 838a846 Merge remote-tracking branch 'origin/master' into sandreim/backing_mu… (alindima, Feb 19, 2024)
- ad98f18 use next up on available instead of occupied core index (alindima, Feb 19, 2024)
- 606d7c4 ElasticScalingCoreIndex -> ElasticScalingMVP (alindima, Feb 19, 2024)
- c793b89 small nits and typos (alindima, Feb 20, 2024)
- 8398bb1 Merge remote-tracking branch 'origin/master' into sandreim/backing_mu… (alindima, Feb 20, 2024)
- 9f70276 Merge remote-tracking branch 'origin/master' into sandreim/backing_mu… (alindima, Feb 21, 2024)
- 9c3dd5c cache Validator->Group mapping (alindima, Feb 21, 2024)
- af1cd82 use Arc to avoid cloning (alindima, Feb 22, 2024)
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions polkadot/node/core/backing/Cargo.toml
@@ -22,6 +22,7 @@ bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] }
gum = { package = "tracing-gum", path = "../../gum" }
thiserror = { workspace = true }
fatality = "0.0.6"
schnellru = "0.2.1"

[dev-dependencies]
sp-core = { path = "../../../../substrate/primitives/core" }
143 changes: 96 additions & 47 deletions polkadot/node/core/backing/src/lib.rs
@@ -77,6 +77,7 @@ use futures::{
stream::FuturesOrdered,
FutureExt, SinkExt, StreamExt, TryFutureExt,
};
use schnellru::{ByLength, LruMap};

use error::{Error, FatalResult};
use polkadot_node_primitives::{
@@ -107,8 +108,9 @@ use polkadot_primitives::{
vstaging::{node_features::FeatureIndex, NodeFeatures},
BackedCandidate, CandidateCommitments, CandidateHash, CandidateReceipt,
CommittedCandidateReceipt, CoreIndex, CoreState, ExecutorParams, GroupIndex, GroupRotationInfo,
Hash, Id as ParaId, PersistedValidationData, PvfExecKind, SigningContext, ValidationCode,
ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation,
Hash, Id as ParaId, IndexedVec, PersistedValidationData, PvfExecKind, SessionIndex,
SigningContext, ValidationCode, ValidatorId, ValidatorIndex, ValidatorSignature,
ValidityAttestation,
};
use sp_keystore::KeystorePtr;
use statement_table::{
@@ -232,8 +234,8 @@ struct PerRelayParentState {
inject_core_index: bool,
/// The core states for all cores.
cores: Vec<CoreState>,
/// The validator groups at this relay parent.
validator_groups: Vec<Vec<ValidatorIndex>>,
/// The validator index -> group mapping at this relay parent.
validator_to_group: IndexedVec<ValidatorIndex, Option<GroupIndex>>,
Member: Is it really necessary to copy this into the relay-parent state? Can't we just keep the session index here and then fetch a reference from the state directly where we need it?

Contributor: Since this is an LruMap, that makes it a bit harder to reason about whether the session index is still present in the map. And we can't store a mutable reference here and just populate the map when the session is not present, because we'd need to hand out multiple mutable references. Realistically, backing can only happen in the current session, right? So the LruMap doesn't need a capacity larger than 1 (and then it wouldn't even need to be an LruMap). Am I right?

Contributor: With the current async-backing code we cannot start backing a candidate for the next session in advance, but we may want to keep that possibility, which makes the LruMap necessary. What I can do is wrap it in an Arc, because callers don't need to modify it; that would achieve the same thing.

Contributor: Done. I'll merge once CI passes again.
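The caching scheme discussed above — a small LRU map keyed by `SessionIndex`, with `Arc`-wrapped values so per-relay-parent state holds a cheap handle instead of a full copy — can be sketched with a std-only stand-in. The real code uses `schnellru::LruMap`; the `SessionCache` type below is a hypothetical illustration of the semantics, not the polkadot-sdk implementation:

```rust
use std::collections::VecDeque;
use std::sync::Arc;

type SessionIndex = u32;
type GroupIndex = u32;

// Minimal stand-in for an LRU map with `get_or_insert` semantics, keyed by
// session. Values are wrapped in `Arc` so callers can keep a cheap clone of
// the handle instead of copying the whole validator->group mapping.
struct SessionCache {
    capacity: usize,
    entries: VecDeque<(SessionIndex, Arc<Vec<Option<GroupIndex>>>)>,
}

impl SessionCache {
    fn new(capacity: usize) -> Self {
        Self { capacity, entries: VecDeque::new() }
    }

    fn get_or_insert(
        &mut self,
        session: SessionIndex,
        make: impl FnOnce() -> Vec<Option<GroupIndex>>,
    ) -> Arc<Vec<Option<GroupIndex>>> {
        if let Some(pos) = self.entries.iter().position(|(s, _)| *s == session) {
            // Cache hit: move the entry to the front to keep LRU order.
            let entry = self.entries.remove(pos).expect("position is valid");
            self.entries.push_front(entry);
        } else {
            // Cache miss: evict the least recently used session if full,
            // then build the mapping once. A capacity of 2 keeps the current
            // session plus one more, in case backing for the next session
            // ever starts in advance.
            if self.entries.len() == self.capacity {
                self.entries.pop_back();
            }
            self.entries.push_front((session, Arc::new(make())));
        }
        Arc::clone(&self.entries.front().expect("just inserted or moved").1)
    }
}

fn main() {
    let mut cache = SessionCache::new(2);
    let a = cache.get_or_insert(7, || vec![Some(0), None]);
    // A second lookup for the same session must not rebuild the mapping.
    let b = cache.get_or_insert(7, || unreachable!("cached"));
    assert!(Arc::ptr_eq(&a, &b));
    println!("cached sessions: {}", cache.entries.len());
}
```

`get_or_insert` mirrors schnellru's method of the same name: it returns the cached mapping when the session is already present and only runs the builder closure on a miss.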

/// The associated group rotation information.
group_rotation_info: GroupRotationInfo,
Member: Those two are really per-session values. Storing them per relay parent is rather unnecessary, though it seems fine for an interim solution.

}
@@ -287,6 +289,8 @@ struct State {
/// This is guaranteed to have an entry for each candidate with a relay parent in the implicit
/// or explicit view for which a `Seconded` statement has been successfully imported.
per_candidate: HashMap<CandidateHash, PerCandidateState>,
/// Cache the per-session Validator->Group mapping.
validator_to_group_cache: LruMap<SessionIndex, IndexedVec<ValidatorIndex, Option<GroupIndex>>>,
/// A cloneable sender which is dispatched to background candidate validation tasks to inform
/// the main task of the result.
background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
@@ -304,6 +308,7 @@ impl State {
per_leaf: HashMap::default(),
per_relay_parent: HashMap::default(),
per_candidate: HashMap::new(),
validator_to_group_cache: LruMap::new(ByLength::new(2)),
background_validation_tx,
keystore,
}
@@ -986,7 +991,14 @@ async fn handle_active_leaves_update<Context>(

// construct a `PerRelayParent` from the runtime API
// and insert it.
let per = construct_per_relay_parent_state(ctx, maybe_new, &state.keystore, mode).await?;
let per = construct_per_relay_parent_state(
ctx,
maybe_new,
&state.keystore,
&mut state.validator_to_group_cache,
mode,
)
.await?;

if let Some(per) = per {
state.per_relay_parent.insert(maybe_new, per);
@@ -1014,7 +1026,7 @@ macro_rules! try_runtime_api {
}

fn core_index_from_statement(
validator_groups: &[Vec<ValidatorIndex>],
validator_to_group: &IndexedVec<ValidatorIndex, Option<GroupIndex>>,
group_rotation_info: &GroupRotationInfo,
cores: &[CoreState],
statement: &SignedFullStatementWithPVD,
@@ -1024,51 +1036,70 @@ fn core_index_from_statement(

let n_cores = cores.len();

gum::trace!(target: LOG_TARGET, ?group_rotation_info, ?statement, ?validator_groups, n_cores = ?cores.len() , ?candidate_hash, "Extracting core index from statement");
gum::trace!(
target:LOG_TARGET,
?group_rotation_info,
?statement,
?validator_to_group,
n_cores = ?cores.len(),
?candidate_hash,
"Extracting core index from statement"
);

let statement_validator_index = statement.validator_index();
for (group_index, group) in validator_groups.iter().enumerate() {
for validator_index in group {
if *validator_index == statement_validator_index {
// First check if the statement para id matches the core assignment.
let core_index =
group_rotation_info.core_for_group(GroupIndex(group_index as u32), n_cores);

if core_index.0 as usize > n_cores {
gum::warn!(target: LOG_TARGET, ?candidate_hash, ?core_index, n_cores, "Invalid CoreIndex");
return None
}
let Some(Some(group_index)) = validator_to_group.get(statement_validator_index) else {
gum::debug!(
target: LOG_TARGET,
?group_rotation_info,
?statement,
?validator_to_group,
n_cores = ?cores.len() ,
?candidate_hash,
"Invalid validator index: {:?}",
statement_validator_index
);
return None
};

if let StatementWithPVD::Seconded(candidate, _pvd) = statement.payload() {
let candidate_para_id = candidate.descriptor.para_id;
let assigned_para_id = match &cores[core_index.0 as usize] {
CoreState::Free => {
gum::debug!(target: LOG_TARGET, ?candidate_hash, "Invalid CoreIndex, core is not assigned to any para_id");
return None
},
CoreState::Occupied(occupied) => {
if let Some(next) = &occupied.next_up_on_available {
next.para_id
} else {
return None
}
},
CoreState::Scheduled(scheduled) => scheduled.para_id,
};
// First check if the statement para id matches the core assignment.
let core_index = group_rotation_info.core_for_group(*group_index, n_cores);

if assigned_para_id != candidate_para_id {
gum::debug!(target: LOG_TARGET, ?candidate_hash, ?core_index, ?assigned_para_id, ?candidate_para_id, "Invalid CoreIndex, core is assigned to a different para_id");
return None
}
return Some(core_index)
if core_index.0 as usize > n_cores {
gum::warn!(target: LOG_TARGET, ?candidate_hash, ?core_index, n_cores, "Invalid CoreIndex");
return None
}

if let StatementWithPVD::Seconded(candidate, _pvd) = statement.payload() {
let candidate_para_id = candidate.descriptor.para_id;
let assigned_para_id = match &cores[core_index.0 as usize] {
CoreState::Free => {
gum::debug!(target: LOG_TARGET, ?candidate_hash, "Invalid CoreIndex, core is not assigned to any para_id");
return None
},
CoreState::Occupied(occupied) =>
if let Some(next) = &occupied.next_up_on_available {
next.para_id
} else {
return None
},
CoreState::Scheduled(scheduled) => scheduled.para_id,
};

if assigned_para_id != candidate_para_id {
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
?core_index,
?assigned_para_id,
?candidate_para_id,
"Invalid CoreIndex, core is assigned to a different para_id"
);
return None
}
return Some(core_index)
} else {
return Some(core_index)
}

None
}
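Stripped of the runtime types, the new lookup in `core_index_from_statement` is a two-step table lookup: validator index to group index via the cached mapping, then group index to core index via group rotation. Below is a minimal sketch, assuming a simplified round-robin rotation rule in place of `GroupRotationInfo::core_for_group`; all types here are illustrative stand-ins, not the polkadot-primitives definitions:

```rust
type ValidatorIndex = usize;
type GroupIndex = u32;
type CoreIndex = u32;

// Map a statement's validator index to its backing group via the cached
// validator->group vector, then rotate the group onto a core. The rotation
// rule `(group + rotations) % n_cores` is an assumption for illustration.
fn core_for_statement(
    validator_to_group: &[Option<GroupIndex>],
    rotations: u32, // how many group rotations have happened so far
    n_cores: u32,
    statement_validator: ValidatorIndex,
) -> Option<CoreIndex> {
    // An out-of-range or unassigned validator yields no core.
    let group = (*validator_to_group.get(statement_validator)?)?;
    // Groups are assigned to cores round-robin as rotations advance.
    Some((group + rotations) % n_cores)
}

fn main() {
    // Validators 0 and 2 are in group 0, validator 1 in group 1,
    // validator 3 is unassigned.
    let mapping = vec![Some(0), Some(1), Some(0), None];
    assert_eq!(core_for_statement(&mapping, 0, 2, 1), Some(1));
    // After one rotation, group 1 moves to core 0.
    assert_eq!(core_for_statement(&mapping, 1, 2, 1), Some(0));
    // Unassigned validator: no core can be derived.
    assert_eq!(core_for_statement(&mapping, 0, 2, 3), None);
}
```

The `let Some(Some(group_index)) = …` let-else in the diff plays the same role as the double `?` here: both the out-of-bounds case and the unassigned case fall through to `None`.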

/// Load the data necessary to do backing work on top of a relay-parent.
@@ -1077,6 +1108,10 @@ async fn construct_per_relay_parent_state<Context>(
ctx: &mut Context,
relay_parent: Hash,
keystore: &KeystorePtr,
validator_to_group_cache: &mut LruMap<
SessionIndex,
IndexedVec<ValidatorIndex, Option<GroupIndex>>,
>,
mode: ProspectiveParachainsMode,
) -> Result<Option<PerRelayParentState>, Error> {
let parent = relay_parent;
@@ -1172,7 +1207,21 @@
groups.insert(core_index, g.clone());
}
}
gum::debug!(target: LOG_TARGET, ?groups, "TableContext" );
gum::debug!(target: LOG_TARGET, ?groups, "TableContext");

let validator_to_group = validator_to_group_cache
.get_or_insert(session_index, || {
let mut vector = vec![None; validators.len()];

for (group_idx, validator_group) in validator_groups.iter().enumerate() {
for validator in validator_group {
vector[validator.0 as usize] = Some(GroupIndex(group_idx as u32));
}
}

IndexedVec::<_, _>::from(vector)
})
.expect("Just inserted");

let table_context = TableContext { validator, groups, validators, disabled_validators };
let table_config = TableConfig {
@@ -1196,7 +1245,7 @@ async fn construct_per_relay_parent_state<Context>(
minimum_backing_votes,
inject_core_index,
cores,
validator_groups,
validator_to_group: validator_to_group.clone(),
group_rotation_info,
}))
}
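The builder closure passed to `get_or_insert` inverts the per-group `validator_groups` lists into a per-validator lookup table. The same inversion as a standalone sketch, with plain `Vec` and `usize`/`u32` standing in for `IndexedVec`, `ValidatorIndex`, and `GroupIndex`:

```rust
// Turn per-group validator lists (group index -> members) into a
// per-validator group lookup (validator index -> Option<group index>).
fn invert_groups(n_validators: usize, groups: &[Vec<usize>]) -> Vec<Option<u32>> {
    let mut vector = vec![None; n_validators];
    for (group_idx, group) in groups.iter().enumerate() {
        for &validator in group {
            // Each validator belongs to at most one backing group;
            // validators in no group stay `None`.
            vector[validator] = Some(group_idx as u32);
        }
    }
    vector
}

fn main() {
    // Validators 0, 2, 3 and 5 in group 0, validator 1 in group 1,
    // validator 4 unassigned.
    let groups = vec![vec![0, 2, 3, 5], vec![1]];
    let mapping = invert_groups(6, &groups);
    assert_eq!(mapping, vec![Some(0), Some(1), Some(0), Some(0), None, Some(0)]);
}
```

Doing this once per session turns the old nested loop over `validator_groups` into a single indexed lookup per statement.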
@@ -1691,7 +1740,7 @@ async fn import_statement<Context>(
let stmt = primitive_statement_to_table(statement);

let core = core_index_from_statement(
&rp_state.validator_groups,
&rp_state.validator_to_group,
&rp_state.group_rotation_info,
&rp_state.cores,
statement,
13 changes: 10 additions & 3 deletions polkadot/node/core/backing/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub(crate) struct TestState {
validator_public: Vec<ValidatorId>,
validation_data: PersistedValidationData,
validator_groups: (Vec<Vec<ValidatorIndex>>, GroupRotationInfo),
validator_to_group: IndexedVec<ValidatorIndex, Option<GroupIndex>>,
availability_cores: Vec<CoreState>,
head_data: HashMap<ParaId, HeadData>,
signing_context: SigningContext,
@@ -114,6 +115,11 @@ impl Default for TestState {
.into_iter()
.map(|g| g.into_iter().map(ValidatorIndex).collect())
.collect();
let validator_to_group: IndexedVec<_, _> =
vec![Some(0), Some(1), Some(0), Some(0), None, Some(0)]
.into_iter()
.map(|x| x.map(|x| GroupIndex(x)))
.collect();
let group_rotation_info =
GroupRotationInfo { session_start_block: 0, group_rotation_frequency: 100, now: 1 };

@@ -143,6 +149,7 @@ impl Default for TestState {
validators,
validator_public,
validator_groups: (validator_groups, group_rotation_info),
validator_to_group,
availability_cores,
head_data,
validation_data,
@@ -720,7 +727,7 @@ fn extract_core_index_from_statement_works() {
.expect("should be signed");

let core_index_1 = core_index_from_statement(
&test_state.validator_groups.0,
&test_state.validator_to_group,
&test_state.validator_groups.1,
&test_state.availability_cores,
&signed_statement_1,
@@ -730,7 +737,7 @@ fn extract_core_index_from_statement_works() {
assert_eq!(core_index_1, CoreIndex(0));

let core_index_2 = core_index_from_statement(
&test_state.validator_groups.0,
&test_state.validator_to_group,
&test_state.validator_groups.1,
&test_state.availability_cores,
&signed_statement_2,
@@ -740,7 +747,7 @@ fn extract_core_index_from_statement_works() {
assert_eq!(core_index_2, None);

let core_index_3 = core_index_from_statement(
&test_state.validator_groups.0,
&test_state.validator_to_group,
&test_state.validator_groups.1,
&test_state.availability_cores,
&signed_statement_3,