statement-distribution: fix filtering of statements for elastic parachains #3879

Merged: 19 commits, Apr 3, 2024

Changes from 3 commits
85 changes: 44 additions & 41 deletions polkadot/node/network/statement-distribution/src/v2/mod.rs
@@ -700,9 +700,9 @@ pub(crate) async fn handle_active_leaves_update<Context>(
 		.map_err(JfyiError::FetchClaimQueue)?;

 	let groups_per_para = determine_groups_per_para(
-		&availability_cores,
-		&group_rotation_info,
-		&maybe_claim_queue,
+		availability_cores,
+		group_rotation_info,
+		maybe_claim_queue,
 		max_candidate_depth,
 	);
 	state.per_relay_parent.insert(
@@ -2139,47 +2139,52 @@ async fn provide_candidate_to_grid<Context>(

 // Utility function to populate per relay parent `ParaId` to `GroupIndex` mappings.
 fn determine_groups_per_para(
-	availability_cores: &[CoreState],
-	group_rotation_info: &GroupRotationInfo,
-	maybe_claim_queue: &Option<ClaimQueueSnapshot>,
+	availability_cores: Vec<CoreState>,
+	group_rotation_info: GroupRotationInfo,
+	maybe_claim_queue: Option<ClaimQueueSnapshot>,
 	max_candidate_depth: usize,
 ) -> HashMap<ParaId, Vec<GroupIndex>> {
+	let n_cores = availability_cores.len();

 	// Determine the core indices occupied by each para at the current relay parent. To support
 	// on-demand parachains we also consider the core indices at next block if core has a candidate
 	// pending availability.
-	let para_core_indices = availability_cores.iter().enumerate().filter_map(|(index, core)| {
-		match core {
-			CoreState::Scheduled(scheduled_core) =>
-				Some((scheduled_core.para_id, CoreIndex(index as u32))),
-			CoreState::Occupied(occupied_core) => {
-				if max_candidate_depth >= 1 {
-					// Use claim queue if available, or fallback to `next_up_on_available`
-					let maybe_scheduled_core = match maybe_claim_queue {
-						Some(claim_queue) => {
-							// What's up next on this core ?
-							fetch_next_scheduled_on_core(claim_queue, CoreIndex(index as u32))
-						},
-						None => {
-							// Runtime doesn't support claim queue runtime api. Fallback to
-							// `next_up_on_available`
-							occupied_core.next_up_on_available.clone()
-						},
-					};
-
-					maybe_scheduled_core
-						.map(|scheduled_core| (scheduled_core.para_id, CoreIndex(index as u32)))
-				} else {
-					None
-				}
-			},
-			CoreState::Free => None,
-		}
-	});
+	let para_core_indices: Vec<_> = if let Some(claim_queue) = maybe_claim_queue {
+		claim_queue
+			.keys()
+			.filter_map(|core_index| {
+				let Some(scheduled_core) = fetch_next_scheduled_on_core(&claim_queue, *core_index)
+				else {
+					return None
+				};
+
+				Some((scheduled_core.para_id, *core_index))
+			})
+			.collect()
+	} else {
+		availability_cores
+			.into_iter()
+			.enumerate()
+			.filter_map(|(index, core)| match core {
+				CoreState::Scheduled(scheduled_core) =>
+					Some((scheduled_core.para_id, CoreIndex(index as u32))),
+				CoreState::Occupied(occupied_core) =>
+					if max_candidate_depth >= 1 {
+						occupied_core
+							.next_up_on_available
+							.map(|scheduled_core| (scheduled_core.para_id, CoreIndex(index as u32)))
+					} else {
+						None
+					},
+				CoreState::Free => None,
+			})
+			.collect()
+	};

 	let mut groups_per_para = HashMap::new();
 	// Map from `CoreIndex` to `GroupIndex` and collect as `HashMap`.
 	for (para, core_index) in para_core_indices {
-		let group_index = group_rotation_info.group_for_core(core_index, availability_cores.len());
+		let group_index = group_rotation_info.group_for_core(core_index, n_cores);
 		groups_per_para.entry(para).or_insert_with(Vec::new).push(group_index)
 	}

Review comments on the fetch_next_scheduled_on_core call:

Member: Ok, first (not introduced here): I find the name of this function vastly misleading, as it does not fetch anything. Second, why are we iterating only over the keys of the claim queue and then fetching the element for each key, instead of iterating over key/value pairs to begin with?

Contributor Author: I wanted to reuse the fn, but you are right, it is actually useless in this case. A wrapper for this BTreeMap with a method like iter_claims(depth: BlockNumber) returning an iterator over (core_index, para) would let us look as far into the future as the lookahead param / async backing settings allow.

Contributor Author: @tdimitrov wdyt?
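
For illustration, a rough sketch of the BTreeMap wrapper floated in the thread above. The snapshot's internal shape, the type stand-ins, and the method signature are assumptions for the sketch, not the crate's actual API:

use std::collections::{BTreeMap, VecDeque};

// Hypothetical stand-ins for the real primitives types.
type ParaId = u32;
type BlockNumber = u32;
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct CoreIndex(u32);

// Assumed shape: one FIFO queue of upcoming para claims per core.
struct ClaimQueueSnapshot(BTreeMap<CoreIndex, VecDeque<ParaId>>);

impl ClaimQueueSnapshot {
	// Iterate over (core_index, para) claims at the given lookahead depth,
	// skipping cores whose queue is shorter than `depth`.
	fn iter_claims(&self, depth: BlockNumber) -> impl Iterator<Item = (CoreIndex, ParaId)> + '_ {
		self.0
			.iter()
			.filter_map(move |(core, queue)| queue.get(depth as usize).map(|para| (*core, *para)))
	}
}

With such a wrapper, the filter_map over claim_queue.keys() above would collapse to claim_queue.iter_claims(0).map(|(core_index, para)| (para, core_index)).collect().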

Expand Down Expand Up @@ -3081,13 +3086,11 @@ pub(crate) async fn handle_response<Context>(
 			relay_parent_state.session,
 			|v| per_session.session_info.validators.get(v).map(|x| x.clone()),
 			|para, g_index| {
-				let expected_groups = relay_parent_state.groups_per_para.get(&para);
+				let Some(expected_groups) = relay_parent_state.groups_per_para.get(&para) else {
+					return false
+				};

-				expected_groups.is_some() &&
-					expected_groups
-						.expect("checked is_some(); qed")
-						.iter()
-						.any(|g| g == &g_index)
+				expected_groups.iter().any(|g| g == &g_index)
 			},
 			disabled_mask,
 		);
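
The closure in this hunk is the crux of the fix: with elastic scaling a para can be backed by several groups at once, so the check must test membership in a Vec<GroupIndex> rather than compare against a single group, and the let-else replaces the is_some()/expect pair. A minimal self-contained sketch of the same pattern, with hypothetical stand-in types:

use std::collections::HashMap;

// Hypothetical stand-ins for ParaId and GroupIndex.
type ParaId = u32;
type GroupIndex = u32;

// Returns true only if `group` is one of the groups assigned to `para`.
fn is_group_assigned(
	groups_per_para: &HashMap<ParaId, Vec<GroupIndex>>,
	para: ParaId,
	group: GroupIndex,
) -> bool {
	// Bail out early when the para has no assigned groups at this relay parent.
	let Some(expected_groups) = groups_per_para.get(&para) else {
		return false;
	};
	expected_groups.iter().any(|g| *g == group)
}

fn main() {
	let mut groups_per_para: HashMap<ParaId, Vec<GroupIndex>> = HashMap::new();
	// An elastically-scaled para occupies two cores, hence two backing groups.
	groups_per_para.insert(2000, vec![0, 3]);
	assert!(is_group_assigned(&groups_per_para, 2000, 3));
	assert!(!is_group_assigned(&groups_per_para, 2000, 1));
	assert!(!is_group_assigned(&groups_per_para, 1000, 0));
}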