Skip to content

Commit

Permalink
statement-distribution: RFC103 implementation (#5883)
Browse files Browse the repository at this point in the history
Part of #5047
On top of #5679

---------

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
Co-authored-by: GitHub Action <action@github.com>
  • Loading branch information
sandreim and actions-user authored Nov 4, 2024
1 parent 028e61b commit 38cd03c
Show file tree
Hide file tree
Showing 13 changed files with 731 additions and 137 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ fn make_committed_candidate(
persisted_validation_data_hash: persisted_validation_data.hash(),
pov_hash: Hash::repeat_byte(1),
erasure_root: Hash::repeat_byte(1),
signature: test_helpers::dummy_collator_signature(),
signature: test_helpers::zero_collator_signature(),
para_head: para_head.hash(),
validation_code_hash: Hash::repeat_byte(42).into(),
}
Expand Down
1 change: 1 addition & 0 deletions polkadot/node/network/statement-distribution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ polkadot-primitives = { workspace = true, features = ["test"] }
polkadot-primitives-test-helpers = { workspace = true }
rand_chacha = { workspace = true, default-features = true }
polkadot-subsystem-bench = { workspace = true }
rstest = { workspace = true }

[[bench]]
name = "statement-distribution-regression-bench"
Expand Down
5 changes: 1 addition & 4 deletions polkadot/node/network/statement-distribution/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,14 @@ pub enum Error {
#[error("Fetching session info failed {0:?}")]
FetchSessionInfo(RuntimeApiError),

#[error("Fetching availability cores failed {0:?}")]
FetchAvailabilityCores(RuntimeApiError),

#[error("Fetching disabled validators failed {0:?}")]
FetchDisabledValidators(runtime::Error),

#[error("Fetching validator groups failed {0:?}")]
FetchValidatorGroups(RuntimeApiError),

#[error("Fetching claim queue failed {0:?}")]
FetchClaimQueue(runtime::Error),
FetchClaimQueue(RuntimeApiError),

#[error("Attempted to share statement when not a validator or not assigned")]
InvalidShare,
Expand Down
153 changes: 72 additions & 81 deletions polkadot/node/network/statement-distribution/src/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,15 @@ use polkadot_node_subsystem_util::{
backing_implicit_view::View as ImplicitView,
reputation::ReputationAggregator,
runtime::{
fetch_claim_queue, request_min_backing_votes, ClaimQueueSnapshot, ProspectiveParachainsMode,
request_min_backing_votes, request_node_features, ClaimQueueSnapshot,
ProspectiveParachainsMode,
},
};
use polkadot_primitives::{
vstaging::CoreState, AuthorityDiscoveryId, CandidateHash, CompactStatement, CoreIndex,
GroupIndex, GroupRotationInfo, Hash, Id as ParaId, IndexedVec, SessionIndex, SessionInfo,
node_features::FeatureIndex,
vstaging::{transpose_claim_queue, CandidateDescriptorVersion, TransposedClaimQueue},
AuthorityDiscoveryId, CandidateHash, CompactStatement, CoreIndex, GroupIndex,
GroupRotationInfo, Hash, Id as ParaId, IndexedVec, NodeFeatures, SessionIndex, SessionInfo,
SignedStatement, SigningContext, UncheckedSignedStatement, ValidatorId, ValidatorIndex,
};

Expand Down Expand Up @@ -137,6 +140,12 @@ const COST_UNREQUESTED_RESPONSE_STATEMENT: Rep =
Rep::CostMajor("Un-requested Statement In Response");
const COST_INACCURATE_ADVERTISEMENT: Rep =
Rep::CostMajor("Peer advertised a candidate inaccurately");
const COST_UNSUPPORTED_DESCRIPTOR_VERSION: Rep =
Rep::CostMajor("Candidate Descriptor version is not supported");
const COST_INVALID_CORE_INDEX: Rep =
Rep::CostMajor("Candidate Descriptor contains an invalid core index");
const COST_INVALID_SESSION_INDEX: Rep =
Rep::CostMajor("Candidate Descriptor contains an invalid session index");

const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request");
const COST_INVALID_REQUEST_BITFIELD_SIZE: Rep =
Expand All @@ -156,6 +165,7 @@ struct PerRelayParentState {
statement_store: StatementStore,
seconding_limit: usize,
session: SessionIndex,
transposed_cq: TransposedClaimQueue,
groups_per_para: HashMap<ParaId, Vec<GroupIndex>>,
disabled_validators: HashSet<ValidatorIndex>,
}
Expand Down Expand Up @@ -219,10 +229,17 @@ struct PerSessionState {
// getting the topology from the gossip-support subsystem
grid_view: Option<grid::SessionTopologyView>,
local_validator: Option<LocalValidatorIndex>,
// `true` if v2 candidate receipts are allowed by the runtime
allow_v2_descriptors: bool,
}

impl PerSessionState {
fn new(session_info: SessionInfo, keystore: &KeystorePtr, backing_threshold: u32) -> Self {
fn new(
session_info: SessionInfo,
keystore: &KeystorePtr,
backing_threshold: u32,
allow_v2_descriptors: bool,
) -> Self {
let groups = Groups::new(session_info.validator_groups.clone(), backing_threshold);
let mut authority_lookup = HashMap::new();
for (i, ad) in session_info.discovery_keys.iter().cloned().enumerate() {
Expand All @@ -235,7 +252,14 @@ impl PerSessionState {
)
.map(|(_, index)| LocalValidatorIndex::Active(index));

PerSessionState { session_info, groups, authority_lookup, grid_view: None, local_validator }
PerSessionState {
session_info,
groups,
authority_lookup,
grid_view: None,
local_validator,
allow_v2_descriptors,
}
}

fn supply_topology(
Expand Down Expand Up @@ -271,6 +295,11 @@ impl PerSessionState {
fn is_not_validator(&self) -> bool {
self.grid_view.is_some() && self.local_validator.is_none()
}

/// Returns `true` if v2 candidate receipts are enabled
fn candidate_receipt_v2_enabled(&self) -> bool {
self.allow_v2_descriptors
}
}

pub(crate) struct State {
Expand Down Expand Up @@ -615,8 +644,18 @@ pub(crate) async fn handle_active_leaves_update<Context>(

let minimum_backing_votes =
request_min_backing_votes(new_relay_parent, session_index, ctx.sender()).await?;
let mut per_session_state =
PerSessionState::new(session_info, &state.keystore, minimum_backing_votes);
let node_features =
request_node_features(new_relay_parent, session_index, ctx.sender()).await?;
let mut per_session_state = PerSessionState::new(
session_info,
&state.keystore,
minimum_backing_votes,
node_features
.unwrap_or(NodeFeatures::EMPTY)
.get(FeatureIndex::CandidateReceiptV2 as usize)
.map(|b| *b)
.unwrap_or(false),
);
if let Some(topology) = state.unused_topologies.remove(&session_index) {
per_session_state.supply_topology(&topology.topology, topology.local_index);
}
Expand All @@ -642,18 +681,6 @@ pub(crate) async fn handle_active_leaves_update<Context>(
continue
}

// New leaf: fetch info from runtime API and initialize
// `per_relay_parent`.

let availability_cores = polkadot_node_subsystem_util::request_availability_cores(
new_relay_parent,
ctx.sender(),
)
.await
.await
.map_err(JfyiError::RuntimeApiUnavailable)?
.map_err(JfyiError::FetchAvailabilityCores)?;

let group_rotation_info =
polkadot_node_subsystem_util::request_validator_groups(new_relay_parent, ctx.sender())
.await
Expand All @@ -662,37 +689,37 @@ pub(crate) async fn handle_active_leaves_update<Context>(
.map_err(JfyiError::FetchValidatorGroups)?
.1;

let maybe_claim_queue = fetch_claim_queue(ctx.sender(), new_relay_parent)
.await
.unwrap_or_else(|err| {
gum::debug!(target: LOG_TARGET, ?new_relay_parent, ?err, "handle_active_leaves_update: `claim_queue` API not available");
None
});
let claim_queue = ClaimQueueSnapshot(
polkadot_node_subsystem_util::request_claim_queue(new_relay_parent, ctx.sender())
.await
.await
.map_err(JfyiError::RuntimeApiUnavailable)?
.map_err(JfyiError::FetchClaimQueue)?,
);

let local_validator = per_session.local_validator.and_then(|v| {
if let LocalValidatorIndex::Active(idx) = v {
find_active_validator_state(
idx,
&per_session.groups,
&availability_cores,
&group_rotation_info,
&maybe_claim_queue,
&claim_queue,
seconding_limit,
max_candidate_depth,
)
} else {
Some(LocalValidatorState { grid_tracker: GridTracker::default(), active: None })
}
});

let groups_per_para = determine_groups_per_para(
availability_cores,
per_session.groups.all().len(),
group_rotation_info,
&maybe_claim_queue,
max_candidate_depth,
&claim_queue,
)
.await;

let transposed_cq = transpose_claim_queue(claim_queue.0);

state.per_relay_parent.insert(
new_relay_parent,
PerRelayParentState {
Expand All @@ -702,6 +729,7 @@ pub(crate) async fn handle_active_leaves_update<Context>(
session: session_index,
groups_per_para,
disabled_validators,
transposed_cq,
},
);
}
Expand Down Expand Up @@ -741,35 +769,18 @@ pub(crate) async fn handle_active_leaves_update<Context>(
fn find_active_validator_state(
validator_index: ValidatorIndex,
groups: &Groups,
availability_cores: &[CoreState],
group_rotation_info: &GroupRotationInfo,
maybe_claim_queue: &Option<ClaimQueueSnapshot>,
claim_queue: &ClaimQueueSnapshot,
seconding_limit: usize,
max_candidate_depth: usize,
) -> Option<LocalValidatorState> {
if groups.all().is_empty() {
return None
}

let our_group = groups.by_validator_index(validator_index)?;

let core_index = group_rotation_info.core_for_group(our_group, availability_cores.len());
let paras_assigned_to_core = if let Some(claim_queue) = maybe_claim_queue {
claim_queue.iter_claims_for_core(&core_index).copied().collect()
} else {
availability_cores
.get(core_index.0 as usize)
.and_then(|core_state| match core_state {
CoreState::Scheduled(scheduled_core) => Some(scheduled_core.para_id),
CoreState::Occupied(occupied_core) if max_candidate_depth >= 1 => occupied_core
.next_up_on_available
.as_ref()
.map(|scheduled_core| scheduled_core.para_id),
CoreState::Free | CoreState::Occupied(_) => None,
})
.into_iter()
.collect()
};
let core_index = group_rotation_info.core_for_group(our_group, groups.all().len());
let paras_assigned_to_core = claim_queue.iter_claims_for_core(&core_index).copied().collect();
let group_validators = groups.get(our_group)?.to_owned();

Some(LocalValidatorState {
Expand Down Expand Up @@ -2174,39 +2185,16 @@ async fn provide_candidate_to_grid<Context>(

// Utility function to populate per relay parent `ParaId` to `GroupIndex` mappings.
async fn determine_groups_per_para(
availability_cores: Vec<CoreState>,
n_cores: usize,
group_rotation_info: GroupRotationInfo,
maybe_claim_queue: &Option<ClaimQueueSnapshot>,
max_candidate_depth: usize,
claim_queue: &ClaimQueueSnapshot,
) -> HashMap<ParaId, Vec<GroupIndex>> {
let n_cores = availability_cores.len();

// Determine the core indices occupied by each para at the current relay parent. To support
// on-demand parachains we also consider the core indices at next blocks.
let schedule: HashMap<CoreIndex, Vec<ParaId>> = if let Some(claim_queue) = maybe_claim_queue {
claim_queue
.iter_all_claims()
.map(|(core_index, paras)| (*core_index, paras.iter().copied().collect()))
.collect()
} else {
availability_cores
.into_iter()
.enumerate()
.filter_map(|(index, core)| match core {
CoreState::Scheduled(scheduled_core) =>
Some((CoreIndex(index as u32), vec![scheduled_core.para_id])),
CoreState::Occupied(occupied_core) =>
if max_candidate_depth >= 1 {
occupied_core.next_up_on_available.map(|scheduled_core| {
(CoreIndex(index as u32), vec![scheduled_core.para_id])
})
} else {
None
},
CoreState::Free => None,
})
.collect()
};
let schedule: HashMap<CoreIndex, Vec<ParaId>> = claim_queue
.iter_all_claims()
.map(|(core_index, paras)| (*core_index, paras.iter().copied().collect()))
.collect();

let mut groups_per_para = HashMap::new();
// Map from `CoreIndex` to `GroupIndex` and collect as `HashMap`.
Expand Down Expand Up @@ -3106,11 +3094,12 @@ pub(crate) async fn handle_response<Context>(
) {
let &requests::CandidateIdentifier { relay_parent, candidate_hash, group_index } =
response.candidate_identifier();
let peer = *response.requested_peer();

gum::trace!(
target: LOG_TARGET,
?candidate_hash,
peer = ?response.requested_peer(),
?peer,
"Received response",
);

Expand Down Expand Up @@ -3145,6 +3134,8 @@ pub(crate) async fn handle_response<Context>(
expected_groups.iter().any(|g| g == &g_index)
},
disabled_mask,
&relay_parent_state.transposed_cq,
per_session.candidate_receipt_v2_enabled(),
);

for (peer, rep) in res.reputation_changes {
Expand Down
Loading

0 comments on commit 38cd03c

Please sign in to comment.