Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

make it easier to dbg stalls #3351

Merged
merged 4 commits into from
Jul 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions node/core/parachains-inherent/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,12 @@ impl ParachainsInherentDataProvider {
receiver.await.map_err(|_| Error::ClosedChannelAwaitingActivation)?.map_err(Error::Subsystem)?;

let (sender, receiver) = futures::channel::oneshot::channel();
overseer.send_msg(AllMessages::Provisioner(
ProvisionerMessage::RequestInherentData(parent, sender),
)).await;
overseer.send_msg(
AllMessages::Provisioner(
ProvisionerMessage::RequestInherentData(parent, sender),
),
std::any::type_name::<Self>(),
).await;

receiver.await.map_err(|_| Error::ClosedChannelAwaitingInherentData)
};
Expand Down
94 changes: 63 additions & 31 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,10 @@ impl From<FinalityNotification<Block>> for BlockInfo {
enum Event {
BlockImported(BlockInfo),
BlockFinalized(BlockInfo),
MsgToSubsystem(AllMessages),
MsgToSubsystem {
msg: AllMessages,
origin: &'static str,
},
ExternalRequest(ExternalRequest),
Stop,
}
Expand Down Expand Up @@ -452,8 +455,16 @@ impl OverseerHandler {
}

/// Send some message to one of the `Subsystem`s.
pub async fn send_msg(&mut self, msg: impl Into<AllMessages>) {
self.send_and_log_error(Event::MsgToSubsystem(msg.into())).await
pub async fn send_msg(&mut self, msg: impl Into<AllMessages>, origin: &'static str) {
self.send_and_log_error(Event::MsgToSubsystem {
msg: msg.into(),
origin,
}).await
}

/// Same as `send_msg`, but with no origin. Used for tests.
pub async fn send_msg_anon(&mut self, msg: impl Into<AllMessages>) {
self.send_msg(msg, "").await
}

/// Inform the `Overseer` that some block was finalized.
Expand Down Expand Up @@ -801,7 +812,8 @@ pub struct OverseerSubsystemSender {
#[async_trait::async_trait]
impl SubsystemSender for OverseerSubsystemSender {
async fn send_message(&mut self, msg: AllMessages) {
self.channels.send_and_log_error(self.signals_received.load(), msg).await;
let needed_signals = self.signals_received.load();
self.channels.send_and_log_error(needed_signals, msg).await;
}

async fn send_messages<T>(&mut self, msgs: T)
Expand Down Expand Up @@ -891,12 +903,18 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
loop {
// If we have a message pending an overseer signal, we only poll for signals
// in the meantime.
let signals_received = self.signals_received.load();
if let Some((needs_signals_received, msg)) = self.pending_incoming.take() {
if needs_signals_received <= self.signals_received.load() {
if needs_signals_received <= signals_received {
return Ok(FromOverseer::Communication { msg });
} else {
self.pending_incoming = Some((needs_signals_received, msg));

tracing::debug!(
target: LOG_TARGET,
subsystem = std::any::type_name::<M>(),
diff = needs_signals_received - signals_received,
"waiting for a signal",
);
// wait for next signal.
let signal = self.signals.next().await
.ok_or(SubsystemError::Context(
Expand All @@ -911,7 +929,6 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {

let mut await_message = self.messages.next();
let mut await_signal = self.signals.next();
let signals_received = self.signals_received.load();
let pending_incoming = &mut self.pending_incoming;

// Otherwise, wait for the next signal or incoming message.
Expand Down Expand Up @@ -989,7 +1006,7 @@ impl<M> OverseenSubsystem<M> {
/// Send a message to the wrapped subsystem.
///
/// If the inner `instance` is `None`, nothing is happening.
async fn send_message(&mut self, msg: M) -> SubsystemResult<()> {
async fn send_message(&mut self, msg: M, origin: &'static str) -> SubsystemResult<()> {
ordian marked this conversation as resolved.
Show resolved Hide resolved
const MESSAGE_TIMEOUT: Duration = Duration::from_secs(10);

if let Some(ref mut instance) = self.instance {
Expand All @@ -999,7 +1016,12 @@ impl<M> OverseenSubsystem<M> {
}).timeout(MESSAGE_TIMEOUT).await
{
None => {
tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name);
tracing::error!(
target: LOG_TARGET,
%origin,
"Subsystem {} appears unresponsive.",
instance.name,
);
Err(SubsystemError::SubsystemStalled(instance.name))
}
Some(res) => res.map_err(Into::into),
Expand All @@ -1016,9 +1038,15 @@ impl<M> OverseenSubsystem<M> {
const SIGNAL_TIMEOUT: Duration = Duration::from_secs(10);

if let Some(ref mut instance) = self.instance {
match instance.tx_signal.send(signal).timeout(SIGNAL_TIMEOUT).await {
match instance.tx_signal.send(signal.clone()).timeout(SIGNAL_TIMEOUT).await {
None => {
tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name);
tracing::error!(
target: LOG_TARGET,
?signal,
received = instance.signals_received,
"Subsystem {} appears unresponsive.",
instance.name,
);
Err(SubsystemError::SubsystemStalled(instance.name))
}
Some(res) => {
Expand Down Expand Up @@ -1903,8 +1931,8 @@ where
select! {
msg = self.events_rx.select_next_some() => {
match msg {
Event::MsgToSubsystem(msg) => {
self.route_message(msg.into()).await?;
Event::MsgToSubsystem { msg, origin } => {
self.route_message(msg.into(), origin).await?;
}
Event::Stop => {
self.stop().await;
Expand Down Expand Up @@ -2028,59 +2056,63 @@ where
Ok(())
}

async fn route_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
async fn route_message(
&mut self,
msg: AllMessages,
origin: &'static str,
) -> SubsystemResult<()> {
self.metrics.on_message_relayed();
match msg {
AllMessages::CandidateValidation(msg) => {
self.subsystems.candidate_validation.send_message(msg).await?;
self.subsystems.candidate_validation.send_message(msg, origin).await?;
},
AllMessages::CandidateBacking(msg) => {
self.subsystems.candidate_backing.send_message(msg).await?;
self.subsystems.candidate_backing.send_message(msg, origin).await?;
},
AllMessages::StatementDistribution(msg) => {
self.subsystems.statement_distribution.send_message(msg).await?;
self.subsystems.statement_distribution.send_message(msg, origin).await?;
},
AllMessages::AvailabilityDistribution(msg) => {
self.subsystems.availability_distribution.send_message(msg).await?;
self.subsystems.availability_distribution.send_message(msg, origin).await?;
},
AllMessages::AvailabilityRecovery(msg) => {
self.subsystems.availability_recovery.send_message(msg).await?;
self.subsystems.availability_recovery.send_message(msg, origin).await?;
},
AllMessages::BitfieldDistribution(msg) => {
self.subsystems.bitfield_distribution.send_message(msg).await?;
self.subsystems.bitfield_distribution.send_message(msg, origin).await?;
},
AllMessages::BitfieldSigning(msg) => {
self.subsystems.bitfield_signing.send_message(msg).await?;
self.subsystems.bitfield_signing.send_message(msg, origin).await?;
},
AllMessages::Provisioner(msg) => {
self.subsystems.provisioner.send_message(msg).await?;
self.subsystems.provisioner.send_message(msg, origin).await?;
},
AllMessages::RuntimeApi(msg) => {
self.subsystems.runtime_api.send_message(msg).await?;
self.subsystems.runtime_api.send_message(msg, origin).await?;
},
AllMessages::AvailabilityStore(msg) => {
self.subsystems.availability_store.send_message(msg).await?;
self.subsystems.availability_store.send_message(msg, origin).await?;
},
AllMessages::NetworkBridge(msg) => {
self.subsystems.network_bridge.send_message(msg).await?;
self.subsystems.network_bridge.send_message(msg, origin).await?;
},
AllMessages::ChainApi(msg) => {
self.subsystems.chain_api.send_message(msg).await?;
self.subsystems.chain_api.send_message(msg, origin).await?;
},
AllMessages::CollationGeneration(msg) => {
self.subsystems.collation_generation.send_message(msg).await?;
self.subsystems.collation_generation.send_message(msg, origin).await?;
},
AllMessages::CollatorProtocol(msg) => {
self.subsystems.collator_protocol.send_message(msg).await?;
self.subsystems.collator_protocol.send_message(msg, origin).await?;
},
AllMessages::ApprovalDistribution(msg) => {
self.subsystems.approval_distribution.send_message(msg).await?;
self.subsystems.approval_distribution.send_message(msg, origin).await?;
},
AllMessages::ApprovalVoting(msg) => {
self.subsystems.approval_voting.send_message(msg).await?;
self.subsystems.approval_voting.send_message(msg, origin).await?;
},
AllMessages::GossipSupport(msg) => {
self.subsystems.gossip_support.send_message(msg).await?;
self.subsystems.gossip_support.send_message(msg, origin).await?;
},
AllMessages::DisputeCoordinator(_) => {}
AllMessages::DisputeParticipation(_) => {}
Expand Down
34 changes: 17 additions & 17 deletions node/overseer/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ fn overseer_metrics_work() {

handler.block_imported(second_block).await;
handler.block_imported(third_block).await;
handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
handler.send_msg_anon(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
handler.stop().await;

select! {
Expand Down Expand Up @@ -984,22 +984,22 @@ fn overseer_all_subsystems_receive_signals_and_messages() {

// send a msg to each subsystem
// except for BitfieldSigning and GossipSupport as the messages are not instantiable
handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
handler.send_msg(AllMessages::CandidateBacking(test_candidate_backing_msg())).await;
handler.send_msg(AllMessages::CollationGeneration(test_collator_generation_msg())).await;
handler.send_msg(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await;
handler.send_msg(AllMessages::StatementDistribution(test_statement_distribution_msg())).await;
handler.send_msg(AllMessages::AvailabilityRecovery(test_availability_recovery_msg())).await;
// handler.send_msg(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await;
// handler.send_msg(AllMessages::GossipSupport(test_bitfield_signing_msg())).await;
handler.send_msg(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await;
handler.send_msg(AllMessages::Provisioner(test_provisioner_msg())).await;
handler.send_msg(AllMessages::RuntimeApi(test_runtime_api_msg())).await;
handler.send_msg(AllMessages::AvailabilityStore(test_availability_store_msg())).await;
handler.send_msg(AllMessages::NetworkBridge(test_network_bridge_msg())).await;
handler.send_msg(AllMessages::ChainApi(test_chain_api_msg())).await;
handler.send_msg(AllMessages::ApprovalDistribution(test_approval_distribution_msg())).await;
handler.send_msg(AllMessages::ApprovalVoting(test_approval_voting_msg())).await;
handler.send_msg_anon(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
handler.send_msg_anon(AllMessages::CandidateBacking(test_candidate_backing_msg())).await;
handler.send_msg_anon(AllMessages::CollationGeneration(test_collator_generation_msg())).await;
handler.send_msg_anon(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await;
handler.send_msg_anon(AllMessages::StatementDistribution(test_statement_distribution_msg())).await;
handler.send_msg_anon(AllMessages::AvailabilityRecovery(test_availability_recovery_msg())).await;
// handler.send_msg_anon(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await;
// handler.send_msg_anon(AllMessages::GossipSupport(test_bitfield_signing_msg())).await;
handler.send_msg_anon(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await;
handler.send_msg_anon(AllMessages::Provisioner(test_provisioner_msg())).await;
handler.send_msg_anon(AllMessages::RuntimeApi(test_runtime_api_msg())).await;
handler.send_msg_anon(AllMessages::AvailabilityStore(test_availability_store_msg())).await;
handler.send_msg_anon(AllMessages::NetworkBridge(test_network_bridge_msg())).await;
handler.send_msg_anon(AllMessages::ChainApi(test_chain_api_msg())).await;
handler.send_msg_anon(AllMessages::ApprovalDistribution(test_approval_distribution_msg())).await;
handler.send_msg_anon(AllMessages::ApprovalVoting(test_approval_voting_msg())).await;

// Wait until all subsystems have received. Otherwise the messages might race against
// the conclude signal.
Expand Down
13 changes: 8 additions & 5 deletions node/service/src/grandpa_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,14 @@ impl<B> grandpa::VotingRule<PolkadotBlock, B> for ApprovalCheckingVotingRule
Box::pin(async move {
let (tx, rx) = oneshot::channel();
let approval_checking_subsystem_vote = {
overseer.send_msg(ApprovalVotingMessage::ApprovedAncestor(
best_hash,
base_number,
tx,
)).await;
overseer.send_msg(
ApprovalVotingMessage::ApprovedAncestor(
best_hash,
base_number,
tx,
),
std::any::type_name::<Self>(),
).await;

rx.await.ok().and_then(|v| v)
};
Expand Down
23 changes: 16 additions & 7 deletions node/service/src/relay_chain_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,10 @@ impl<B> SelectChain<PolkadotBlock> for SelectRelayChain<B>

self.overseer
.clone()
.send_msg(ChainSelectionMessage::Leaves(tx)).await;
.send_msg(
ChainSelectionMessage::Leaves(tx),
std::any::type_name::<Self>(),
).await;

rx.await
.map_err(Error::OverseerDisconnected)
Expand Down Expand Up @@ -264,7 +267,10 @@ impl<B> SelectChain<PolkadotBlock> for SelectRelayChain<B>

let subchain_head = {
let (tx, rx) = oneshot::channel();
overseer.send_msg(ChainSelectionMessage::BestLeafContaining(target_hash, tx)).await;
overseer.send_msg(
ChainSelectionMessage::BestLeafContaining(target_hash, tx),
std::any::type_name::<Self>(),
).await;

let best = rx.await
.map_err(Error::OverseerDisconnected)
Expand Down Expand Up @@ -318,11 +324,14 @@ impl<B> SelectChain<PolkadotBlock> for SelectRelayChain<B>
let (subchain_head, subchain_number) = {

let (tx, rx) = oneshot::channel();
overseer.send_msg(ApprovalVotingMessage::ApprovedAncestor(
subchain_head,
target_number,
tx,
)).await;
overseer.send_msg(
ApprovalVotingMessage::ApprovedAncestor(
subchain_head,
target_number,
tx,
),
std::any::type_name::<Self>(),
).await;

match rx.await
.map_err(Error::OverseerDisconnected)
Expand Down
2 changes: 1 addition & 1 deletion node/subsystem-test-helpers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ mod tests {

spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed());

block_on(handler.send_msg(CollatorProtocolMessage::CollateOn(Default::default())));
block_on(handler.send_msg_anon(CollatorProtocolMessage::CollateOn(Default::default())));
assert!(matches!(block_on(rx.into_future()).0.unwrap(), CollatorProtocolMessage::CollateOn(_)));
}
}
4 changes: 2 additions & 2 deletions node/test/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,11 +348,11 @@ impl PolkadotTestNode {
};

self.overseer_handler
.send_msg(CollationGenerationMessage::Initialize(config))
.send_msg(CollationGenerationMessage::Initialize(config), "Collator")
.await;

self.overseer_handler
.send_msg(CollatorProtocolMessage::CollateOn(para_id))
.send_msg(CollatorProtocolMessage::CollateOn(para_id), "Collator")
.await;
}
}
Expand Down
4 changes: 2 additions & 2 deletions parachain/test-parachains/adder/collator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ fn main() -> Result<()> {
para_id,
};
overseer_handler
.send_msg(CollationGenerationMessage::Initialize(config))
.send_msg(CollationGenerationMessage::Initialize(config), "Collator")
.await;

overseer_handler
.send_msg(CollatorProtocolMessage::CollateOn(para_id))
.send_msg(CollatorProtocolMessage::CollateOn(para_id), "Collator")
.await;

Ok(full_node.task_manager)
Expand Down