Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Better error handling in approval-voting (#2603)
Browse files (browse the repository at this point in the history)
* make approval voting resilient to dropped requests

* some more

* skip whole chain if encountering spurious error
  • Loading branch information
rphmeier authored Mar 10, 2021
1 parent 95b2e18 commit c257eaf
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 50 deletions.
100 changes: 62 additions & 38 deletions node/core/approval-voting/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,16 @@ async fn determine_new_blocks(
Ok(ancestry)
}

// Sessions unavailable in state to cache.
#[derive(Debug)]
struct SessionsUnavailable;

async fn load_all_sessions(
ctx: &mut impl SubsystemContext,
block_hash: Hash,
start: SessionIndex,
end_inclusive: SessionIndex,
) -> SubsystemResult<Option<Vec<SessionInfo>>> {
) -> Result<Vec<SessionInfo>, SessionsUnavailable> {
let mut v = Vec::new();
for i in start..=end_inclusive {
let (tx, rx)= oneshot::channel();
Expand All @@ -214,22 +218,17 @@ async fn load_all_sessions(
block_hash,
);

return Ok(None);
return Err(SessionsUnavailable);
}
Ok(Err(e)) => return Err(SubsystemError::with_origin("approval-voting", e)),
Err(e) => return Err(SubsystemError::with_origin("approval-voting", e)),
Ok(Err(_)) | Err(_) => return Err(SessionsUnavailable),
};

v.push(session_info);
}

Ok(Some(v))
Ok(v)
}

// Sessions unavailable in state to cache.
#[derive(Debug)]
struct SessionsUnavailable;

// When inspecting a new import notification, updates the session info cache to match
// the session of the imported block.
//
Expand All @@ -242,7 +241,7 @@ async fn cache_session_info_for_head(
session_window: &mut RollingSessionWindow,
block_hash: Hash,
block_header: &Header,
) -> SubsystemResult<Result<(), SessionsUnavailable>> {
) -> Result<(), SessionsUnavailable> {
let session_index = {
let (s_tx, s_rx) = oneshot::channel();

Expand All @@ -254,9 +253,9 @@ async fn cache_session_info_for_head(
RuntimeApiRequest::SessionIndexForChild(s_tx),
).into()).await;

match s_rx.await? {
Ok(s) => s,
Err(e) => return Err(SubsystemError::with_origin("approval-voting", e)),
match s_rx.await {
Ok(Ok(s)) => s,
Ok(Err(_)) | Err(_) => return Err(SessionsUnavailable),
}
};

Expand All @@ -271,17 +270,17 @@ async fn cache_session_info_for_head(
window_start, session_index,
);

match load_all_sessions(ctx, block_hash, window_start, session_index).await? {
None => {
match load_all_sessions(ctx, block_hash, window_start, session_index).await {
Err(SessionsUnavailable) => {
tracing::warn!(
target: LOG_TARGET,
"Could not load sessions {}..={} from block {:?} in session {}",
window_start, session_index, block_hash, session_index,
);

return Ok(Err(SessionsUnavailable));
return Err(SessionsUnavailable);
},
Some(s) => {
Ok(s) => {
session_window.earliest_session = Some(window_start);
session_window.session_info = s;
}
Expand All @@ -291,7 +290,7 @@ async fn cache_session_info_for_head(
let latest = session_window.latest_session().expect("latest always exists if earliest does; qed");

// Either cached or ancient.
if session_index <= latest { return Ok(Ok(())) }
if session_index <= latest { return Ok(()) }

let old_window_end = latest;

Expand All @@ -311,17 +310,17 @@ async fn cache_session_info_for_head(
latest + 1
};

match load_all_sessions(ctx, block_hash, fresh_start, session_index).await? {
None => {
match load_all_sessions(ctx, block_hash, fresh_start, session_index).await {
Err(SessionsUnavailable) => {
tracing::warn!(
target: LOG_TARGET,
"Could not load sessions {}..={} from block {:?} in session {}",
latest + 1, session_index, block_hash, session_index,
);

return Ok(Err(SessionsUnavailable));
return Err(SessionsUnavailable);
}
Some(s) => {
Ok(s) => {
let outdated = std::cmp::min(overlap_start as usize, session_window.session_info.len());
session_window.session_info.drain(..outdated);
session_window.session_info.extend(s);
Expand All @@ -335,7 +334,7 @@ async fn cache_session_info_for_head(
}
}

Ok(Ok(()))
Ok(())
}

struct ImportedBlockInfo {
Expand Down Expand Up @@ -539,7 +538,13 @@ pub(crate) async fn handle_new_head(

match h_rx.await? {
Err(e) => {
return Err(SubsystemError::with_origin("approval-voting", e));
tracing::debug!(
target: LOG_TARGET,
"Chain API subsystem temporarily unreachable {}",
e,
);

return Ok(Vec::new());
}
Ok(None) => {
tracing::warn!(target: LOG_TARGET, "Missing header for new head {}", head);
Expand All @@ -555,7 +560,7 @@ pub(crate) async fn handle_new_head(
&mut state.session_window,
head,
&header,
).await?
).await
{
tracing::warn!(
target: LOG_TARGET,
Expand All @@ -582,24 +587,43 @@ pub(crate) async fn handle_new_head(
let mut imported_candidates = Vec::with_capacity(new_blocks.len());

// `determine_new_blocks` gives us a vec in backwards order. We want to move forwards.
for (block_hash, block_header) in new_blocks.into_iter().rev() {
let env = ImportedBlockInfoEnv {
session_window: &state.session_window,
assignment_criteria: &*state.assignment_criteria,
keystore: &state.keystore,
};
let imported_blocks_and_info = {
let mut imported_blocks_and_info = Vec::with_capacity(new_blocks.len());
for (block_hash, block_header) in new_blocks.into_iter().rev() {
let env = ImportedBlockInfoEnv {
session_window: &state.session_window,
assignment_criteria: &*state.assignment_criteria,
keystore: &state.keystore,
};

match imported_block_info(ctx, env, block_hash, &block_header).await? {
Some(i) => imported_blocks_and_info.push((block_hash, block_header, i)),
None => {
// Such errors are likely spurious, but this prevents us from getting gaps
// in the approval-db.
tracing::warn!(
target: LOG_TARGET,
"Unable to gather info about imported block {:?}. Skipping chain.",
(block_hash, block_header.number),
);

return Ok(Vec::new());
},
};
}

imported_blocks_and_info
};

for (block_hash, block_header, imported_block_info) in imported_blocks_and_info {
let ImportedBlockInfo {
included_candidates,
session_index,
assignments,
n_validators,
relay_vrf_story,
slot,
} = match imported_block_info(ctx, env, block_hash, &block_header).await? {
Some(i) => i,
None => continue,
};
} = imported_block_info;

let session_info = state.session_window.session_info(session_index)
.expect("imported_block_info requires session to be available; qed");
Expand Down Expand Up @@ -1772,7 +1796,7 @@ mod tests {
&mut window,
hash,
&header,
).await.unwrap().unwrap();
).await.unwrap();

assert_eq!(window.earliest_session, Some(expected_start_session));
assert_eq!(
Expand Down Expand Up @@ -1953,7 +1977,7 @@ mod tests {
&mut window,
hash,
&header,
).await.unwrap();
).await;

assert_matches!(res, Err(SessionsUnavailable));
})
Expand Down Expand Up @@ -2020,7 +2044,7 @@ mod tests {
&mut window,
hash,
&header,
).await.unwrap().unwrap();
).await.unwrap();

assert_eq!(window.earliest_session, Some(session));
assert_eq!(
Expand Down
31 changes: 19 additions & 12 deletions node/core/approval-voting/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,10 +665,10 @@ async fn handle_approved_ancestor(

ctx.send_message(ChainApiMessage::BlockNumber(target, tx).into()).await;

match rx.await? {
Ok(Some(n)) => n,
Ok(None) => return Ok(None),
Err(_) => return Ok(None),
match rx.await {
Ok(Ok(Some(n))) => n,
Ok(Ok(None)) => return Ok(None),
Ok(Err(_)) | Err(_) => return Ok(None),
}
};

Expand All @@ -689,9 +689,9 @@ async fn handle_approved_ancestor(
response_channel: tx,
}.into()).await;

match rx.await? {
Ok(a) => a,
Err(_) => return Ok(None),
match rx.await {
Ok(Ok(a)) => a,
Err(_) | Ok(Err(_)) => return Ok(None),
}
} else {
Vec::new()
Expand Down Expand Up @@ -1406,11 +1406,18 @@ async fn launch_approval(
ChainApiMessage::BlockNumber(candidate.descriptor.relay_parent, context_num_tx).into()
).await;

let in_context_number = match context_num_rx.await?
.map_err(|e| SubsystemError::with_origin("chain-api", e))?
{
Some(n) => n,
None => return Ok(()),
let in_context_number = match context_num_rx.await {
Ok(Ok(Some(n))) => n,
Ok(Ok(None)) | Ok(Err(_)) | Err(_) => {
tracing::warn!(
target: LOG_TARGET,
"Could not launch approval work for candidate {:?}: Number of block {} unknown",
(candidate_hash, candidate.descriptor.para_id),
candidate.descriptor.relay_parent,
);

return Ok(());
}
};

ctx.send_message(
Expand Down

0 comments on commit c257eaf

Please sign in to comment.