Skip to content

Commit

Permalink
fix(code/wal): Fix WAL tests (#653)
Browse files Browse the repository at this point in the history
* Append to the log (via on_.. handlers) before flushing (via Effect:Broadcast)

* Emit more `WalReplay` events

* Fix typo

* Wait one second for the actor to shutdown when killed

* Better logs on error in WAL

* Ensure node starts again even if peer dont resubscribe to GossipSub topic right away

* Link Mempool actor to Host actor

---------

Co-authored-by: Anca Zamfir <zamfiranca@gmail.com>
  • Loading branch information
romac and ancazamfir authored Dec 9, 2024
1 parent 3c1f3dd commit 04ac0a8
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 76 deletions.
8 changes: 8 additions & 0 deletions code/crates/actors/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,9 @@ where
for entry in entries {
match entry {
WalEntry::ConsensusMsg(Vote(vote)) => {
self.tx_event
.send(|| Event::WalReplayConsensus(Vote(vote.clone())));

if let Err(e) = self
.process_input(myself, state, ConsensusInput::Vote(vote))
.await
Expand All @@ -549,6 +552,9 @@ where
}

WalEntry::ConsensusMsg(Proposal(proposal)) => {
self.tx_event
.send(|| Event::WalReplayConsensus(Proposal(proposal.clone())));

if let Err(e) = self
.process_input(myself, state, ConsensusInput::Proposal(proposal))
.await
Expand All @@ -558,6 +564,8 @@ where
}

WalEntry::Timeout(timeout) => {
self.tx_event.send(|| Event::WalReplayTimeout(timeout));

if let Err(e) = self.timeout_elapsed(myself, state, timeout).await {
error!("Error when replaying TimeoutElapsed: {e}");
}
Expand Down
6 changes: 5 additions & 1 deletion code/crates/actors/src/util/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use core::fmt;
use derive_where::derive_where;
use tokio::sync::broadcast;

use malachite_common::{CommitCertificate, Context, Round, ValueOrigin};
use malachite_common::{CommitCertificate, Context, Round, Timeout, ValueOrigin};
use malachite_consensus::{ProposedValue, SignedConsensusMsg, ValueToPropose};

pub type RxEvent<Ctx> = broadcast::Receiver<Event<Ctx>>;
Expand Down Expand Up @@ -44,6 +44,8 @@ pub enum Event<Ctx: Context> {
ReceivedProposedValue(ProposedValue<Ctx>, ValueOrigin),
Decided(CommitCertificate<Ctx>),
WalReplayBegin(Ctx::Height, usize),
WalReplayConsensus(SignedConsensusMsg<Ctx>),
WalReplayTimeout(Timeout),
WalReplayDone(Ctx::Height),
}

Expand All @@ -61,6 +63,8 @@ impl<Ctx: Context> fmt::Display for Event<Ctx> {
Event::WalReplayBegin(height, count) => {
write!(f, "WalReplayBegin({height}, {count})")
}
Event::WalReplayConsensus(msg) => write!(f, "WalReplayConsensus({msg:?})"),
Event::WalReplayTimeout(timeout) => write!(f, "WalReplayTimeout({timeout:?})"),
Event::WalReplayDone(height) => write!(f, "WalReplayDone({height})"),
}
}
Expand Down
20 changes: 14 additions & 6 deletions code/crates/actors/src/wal/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ where
thread::spawn(move || loop {
if let Err(e) = task(moniker.clone(), &mut wal, &codec, &mut rx) {
// Task failed, log the error and continue
error!("WAL error: {e}");
error!("WAL task failed: {e}");
error!("Restarting WAL task");

continue;
}
Expand All @@ -57,7 +58,7 @@ where
while let Some(msg) = rx.blocking_recv() {
match msg {
WalMsg::StartedHeight(height, reply) => {
// FIXME: Ensure this works event with fork_id
// FIXME: Ensure this works even with fork_id
let sequence = height.as_u64();

if sequence == log.sequence() {
Expand Down Expand Up @@ -89,20 +90,27 @@ where

if let Err(e) = &result {
error!("ATTENTION: Failed to append entry to WAL: {e}");
} else {
debug!("Wrote log entry: type = {tpe}, log size = {}", log.len());
}

if reply.send(result).is_err() {
error!("ATTENTION: Failed to send WAL append reply");
}

debug!("Wrote log entry: type = {tpe}, log size = {}", log.len());
}

WalMsg::Flush(reply) => {
let result = log.flush().map_err(Into::into);
reply.send(result).unwrap(); // FIXME

debug!("Flushed WAL to disk");
if let Err(e) = &result {
error!("ATTENTION: Failed to flush WAL to disk: {e}");
} else {
debug!("Flushed WAL to disk");
}

if reply.send(result).is_err() {
error!("ATTENTION: Failed to send WAL flush reply");
}
}

WalMsg::Shutdown => {
Expand Down
31 changes: 15 additions & 16 deletions code/crates/consensus/src/handle/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,6 @@ where

let signed_proposal = state.ctx.sign_proposal(proposal.clone());

// Proposal messages should not be broadcasted if they are implicit,
// instead they should be inferred from the block parts.
if state.params.value_payload.include_proposal() {
perform!(
co,
Effect::Broadcast(SignedConsensusMsg::Proposal(signed_proposal.clone()))
);
}

if signed_proposal.pol_round().is_defined() {
perform!(
co,
Expand All @@ -174,7 +165,18 @@ where
);
}

on_proposal(co, state, metrics, signed_proposal).await
on_proposal(co, state, metrics, signed_proposal.clone()).await?;

// Proposal messages should not be broadcasted if they are implicit,
// instead they should be inferred from the block parts.
if state.params.value_payload.include_proposal() {
perform!(
co,
Effect::Broadcast(SignedConsensusMsg::Proposal(signed_proposal))
);
};

Ok(())
}

DriverOutput::Vote(vote) => {
Expand All @@ -187,13 +189,10 @@ where

let extended_vote = extend_vote(vote, state);
let signed_vote = state.ctx.sign_vote(extended_vote);
on_vote(co, state, metrics, signed_vote.clone()).await?;

perform!(
co,
Effect::Broadcast(SignedConsensusMsg::Vote(signed_vote.clone()))
);

on_vote(co, state, metrics, signed_vote).await
perform!(co, Effect::Broadcast(SignedConsensusMsg::Vote(signed_vote)));
Ok(())
}

DriverOutput::Decide(consensus_round, proposal) => {
Expand Down
4 changes: 2 additions & 2 deletions code/crates/discovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl Discovery {
}

pub fn can_dial(&self) -> bool {
self.handler.can_dial()
self.is_enabled() && self.handler.can_dial()
}

fn should_dial(
Expand Down Expand Up @@ -298,7 +298,7 @@ impl Discovery {
}

pub fn can_request(&self) -> bool {
self.handler.can_request()
self.is_enabled() && self.handler.can_request()
}

fn should_request(
Expand Down
93 changes: 47 additions & 46 deletions code/crates/gossip-consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::time::Duration;
use futures::StreamExt;
use libp2p::metrics::{Metrics, Recorder};
use libp2p::request_response::InboundRequestId;
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::{self, SwarmEvent};
use libp2p::{gossipsub, identify, quic, SwarmBuilder};
use libp2p_broadcast as broadcast;
Expand Down Expand Up @@ -221,12 +222,23 @@ async fn run(
if let Err(e) = swarm.listen_on(config.listen_addr.clone()) {
error!("Error listening on {}: {e}", config.listen_addr);
return;
};
}

for persistent_peer in &config.persistent_peers {
state
.discovery
.add_to_dial_queue(&swarm, ConnectionData::new(None, persistent_peer.clone()));
for peer in &config.persistent_peers {
if config.discovery.enabled {
state
.discovery
.add_to_dial_queue(&swarm, ConnectionData::new(None, peer.clone()));
} else {
let opts = DialOpts::unknown_peer_id()
.address(peer.clone())
.allocate_new_port()
.build();

if let Err(e) = swarm.dial(opts) {
error!("Error dialing persistent peer {peer}: {e}");
}
}
}

if let Err(e) = pubsub::subscribe(&mut swarm, config.pubsub_protocol, Channel::consensus()) {
Expand All @@ -243,7 +255,7 @@ async fn run(
loop {
let result = tokio::select! {
event = swarm.select_next_some() => {
handle_swarm_event(event, &metrics, &mut swarm, &mut state, &tx_event).await
handle_swarm_event(event, &config, &metrics, &mut swarm, &mut state, &tx_event).await
}

Some(connection_data) = state.discovery.rx_dial.recv(), if state.discovery.can_dial() => {
Expand Down Expand Up @@ -334,6 +346,7 @@ async fn handle_ctrl_msg(

async fn handle_swarm_event(
event: SwarmEvent<NetworkEvent>,
config: &Config,
metrics: &Metrics,
swarm: &mut swarm::Swarm<Behaviour>,
state: &mut State,
Expand Down Expand Up @@ -361,9 +374,11 @@ async fn handle_swarm_event(
endpoint,
..
} => {
state
.discovery
.handle_connection(peer_id, connection_id, endpoint);
if config.discovery.enabled {
state
.discovery
.handle_connection(peer_id, connection_id, endpoint);
}
}

SwarmEvent::OutgoingConnectionError {
Expand All @@ -372,7 +387,10 @@ async fn handle_swarm_event(
..
} => {
error!("Error dialing peer: {error}");
state.discovery.handle_failed_connection(connection_id);

if config.discovery.enabled {
state.discovery.handle_failed_connection(connection_id);
}
}

SwarmEvent::ConnectionClosed {
Expand All @@ -386,8 +404,17 @@ async fn handle_swarm_event(
} else {
error!("Connection closed with {peer_id}, reason: unknown");
}
if config.discovery.enabled {
state.discovery.remove_peer(peer_id, connection_id);
}

state.discovery.remove_peer(peer_id, connection_id);
if let Err(e) = tx_event
.send(Event::PeerDisconnected(PeerId::from_libp2p(&peer_id)))
.await
{
error!("Error sending peer disconnected event to handle: {e}");
return ControlFlow::Break(());
}
}

SwarmEvent::Behaviour(NetworkEvent::Identify(identify::Event::Sent {
Expand All @@ -412,9 +439,15 @@ async fn handle_swarm_event(
info.protocol_version
);

state
.discovery
.handle_new_peer(connection_id, peer_id, info)
if config.discovery.enabled {
state
.discovery
.handle_new_peer(connection_id, peer_id, info)
}

let _ = tx_event
.send(Event::PeerConnected(PeerId::from_libp2p(&peer_id)))
.await;
} else {
trace!(
"Peer {peer_id} is using incompatible protocol version: {:?}",
Expand Down Expand Up @@ -476,14 +509,6 @@ async fn handle_gossipsub_event(
}

trace!("Peer {peer_id} subscribed to {topic}");

if let Err(e) = tx_event
.send(Event::PeerConnected(PeerId::from_libp2p(&peer_id)))
.await
{
error!("Error sending peer connected event to handle: {e}");
return ControlFlow::Break(());
}
}

gossipsub::Event::Unsubscribed { peer_id, topic } => {
Expand All @@ -493,14 +518,6 @@ async fn handle_gossipsub_event(
}

trace!("Peer {peer_id} unsubscribed from {topic}");

if let Err(e) = tx_event
.send(Event::PeerDisconnected(PeerId::from_libp2p(&peer_id)))
.await
{
error!("Error sending peer disconnected event to handle: {e}");
return ControlFlow::Break(());
}
}

gossipsub::Event::Message {
Expand Down Expand Up @@ -560,14 +577,6 @@ async fn handle_broadcast_event(
}

trace!("Peer {peer_id} subscribed to {topic:?}");

if let Err(e) = tx_event
.send(Event::PeerConnected(PeerId::from_libp2p(&peer_id)))
.await
{
error!("Error sending peer connected event to handle: {e}");
return ControlFlow::Break(());
}
}

broadcast::Event::Unsubscribed(peer_id, topic) => {
Expand All @@ -577,14 +586,6 @@ async fn handle_broadcast_event(
}

trace!("Peer {peer_id} unsubscribed from {topic:?}");

if let Err(e) = tx_event
.send(Event::PeerDisconnected(PeerId::from_libp2p(&peer_id)))
.await
{
error!("Error sending peer disconnected event to handle: {e}");
return ControlFlow::Break(());
}
}

broadcast::Event::Received(peer_id, topic, message) => {
Expand Down
Loading

0 comments on commit 04ac0a8

Please sign in to comment.