Skip to content

Commit

Permalink
Small cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
romac committed Oct 16, 2024
1 parent 7779510 commit 1e36445
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 62 deletions.
117 changes: 64 additions & 53 deletions code/crates/actors/src/block_sync.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use std::collections::BTreeMap;
use std::marker::PhantomData;
use std::time::Duration;

use async_trait::async_trait;
use bytes::Bytes;
use derive_where::derive_where;
use libp2p::request_response::InboundRequestId;
use malachite_blocksync::{Request, SyncedBlock};
use ractor::{Actor, ActorProcessingErr, ActorRef};
use tokio::task::JoinHandle;
use tracing::{debug, info};
use tracing::{debug, error_span, info};

use malachite_blocksync::{Request, SyncedBlock};
use malachite_common::{Certificate, Context, Proposal};
use malachite_gossip_consensus::PeerId;

Expand All @@ -30,34 +29,38 @@ pub struct RawDecidedBlock<Ctx: Context> {

#[derive_where(Clone, Debug)]
pub enum Msg<Ctx: Context> {
/// Internal tick
Tick,

/// Receive an even from gossip layer
GossipEvent(GossipEvent<Ctx>),

// Consensus has decided on a value
/// Consensus has decided on a value
Decided { height: Ctx::Height },

// Consensus has started a new height
/// Consensus has started a new height
StartHeight { height: Ctx::Height },

// Host has a response for the block request
/// Host has a response for the block request
DecidedBlock(InboundRequestId, Option<SyncedBlock<Ctx>>),
}

// TODO: Move to blocksync crate
#[derive_where(Clone, Debug, Default)]
struct BlockSyncState<Ctx>
where
Ctx: Context,
{
// Height of last decided block
/// Height of last decided block
tip_height: Ctx::Height,

// Height currently syncing.
/// Height currently syncing.
sync_height: Ctx::Height,

// Requests for these heights have been sent out to peers.
/// Requests for these heights have been sent out to peers.
pending_requests: BTreeMap<Ctx::Height, PeerId>,

// The set of peers we are connected to in order to get blocks and certificates.
/// The set of peers we are connected to in order to get blocks and certificates.
peers: BTreeMap<PeerId, Ctx::Height>,
}

Expand All @@ -68,14 +71,18 @@ where
pub fn store_peer_height(&mut self, peer: PeerId, height: Ctx::Height) {
self.peers.insert(peer, height);
}

pub fn store_pending_request(&mut self, height: Ctx::Height, peer: PeerId) {
self.pending_requests.insert(height, peer);
}

pub fn remove_pending_request(&mut self, height: Ctx::Height) {
self.pending_requests.remove(&height);
}
}

pub const DEFAULT_STATUS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);

#[derive(Debug)]
pub struct Args {
pub status_update_interval: Duration,
Expand All @@ -84,7 +91,7 @@ pub struct Args {
impl Default for Args {
fn default() -> Self {
Self {
status_update_interval: Duration::from_secs(10),
status_update_interval: DEFAULT_STATUS_UPDATE_INTERVAL,
}
}
}
Expand All @@ -94,26 +101,21 @@ pub struct State<Ctx: Context> {
/// The state of the blocksync state machine
blocksync: BlockSyncState<Ctx>,
ticker: JoinHandle<()>,
marker: PhantomData<Ctx>,
}

#[allow(dead_code)]
pub struct BlockSync<Ctx: Context> {
ctx: Ctx,
gossip_consensus: GossipConsensusRef<Ctx>,
gossip: GossipConsensusRef<Ctx>,
host: HostRef<Ctx>,
}

impl<Ctx> BlockSync<Ctx>
where
Ctx: Context,
{
pub fn new(ctx: Ctx, gossip_consensus: GossipConsensusRef<Ctx>, host: HostRef<Ctx>) -> Self {
Self {
ctx,
gossip_consensus,
host,
}
pub fn new(ctx: Ctx, gossip: GossipConsensusRef<Ctx>, host: HostRef<Ctx>) -> Self {
Self { ctx, gossip, host }
}

pub async fn spawn(self) -> Result<(BlockSyncRef<Ctx>, JoinHandle<()>), ractor::SpawnErr> {
Expand All @@ -137,8 +139,7 @@ where
) -> Result<Self::State, ActorProcessingErr> {
let forward = forward(myself.clone(), Some(myself.get_cell()), Msg::GossipEvent).await?;

self.gossip_consensus
.cast(GossipConsensusMsg::Subscribe(forward))?;
self.gossip.cast(GossipConsensusMsg::Subscribe(forward))?;

let ticker = tokio::spawn(async move {
loop {
Expand All @@ -153,7 +154,6 @@ where
Ok(State {
blocksync: BlockSyncState::default(),
ticker,
marker: PhantomData,
})
}

Expand All @@ -170,32 +170,46 @@ where
msg: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
#[allow(clippy::single_match)]
match msg {
Msg::GossipEvent(GossipEvent::Status(peer, ref status)) => {
Msg::Tick => {
let status = Status {
height: state.blocksync.tip_height,
};

self.gossip
.cast(GossipConsensusMsg::PublishStatus(status))?;
}

Msg::GossipEvent(GossipEvent::Status(peer, status)) => {
let peer_height = status.height;
debug!("Received Status {:?} from peer {peer}", status);
state.blocksync.store_peer_height(peer, status.height);
if status.height > state.blocksync.tip_height {
info!(
"SYNC REQUIRED: falling behind {peer} at {}, my height {}",
status.height, state.blocksync.tip_height
);
let height = state.blocksync.sync_height;
let sync_height = state.blocksync.sync_height;
let tip_height = state.blocksync.tip_height;

let _span =
error_span!("status", %peer, %peer_height, %sync_height, %tip_height).entered();

debug!("Received peer status");

state.blocksync.store_peer_height(peer, peer_height);

if peer_height > tip_height {
info!("SYNC REQUIRED: Falling behind {peer} at {peer_height}");

// If there are no pending requests then ask for block from peer
if !state.blocksync.pending_requests.contains_key(&height) {
debug!(
"Requesting block {height} from {peer:?} that is at {peer_height:?}"
);
self.gossip_consensus
.cast(OutgoingBlockSyncRequest(peer, Request { height }))?;
state.blocksync.store_pending_request(height, peer);
if !state.blocksync.pending_requests.contains_key(&sync_height) {
debug!("Requesting block {sync_height} from {peer} at {peer_height}");

self.gossip
.cast(OutgoingBlockSyncRequest(peer, Request::new(sync_height)))?;

state.blocksync.store_pending_request(sync_height, peer);
}
}
}

Msg::GossipEvent(GossipEvent::BlockSyncRequest(request_id, request)) => {
debug!("Received request for block height {}", request.height);

// Retrieve the block for request.height
self.host.call_and_forward(
|reply| HostMsg::DecidedBlock {
Expand All @@ -212,42 +226,39 @@ where

Msg::Decided { height, .. } => {
debug!("Decided height {height}");

state.blocksync.tip_height = height;
state.blocksync.remove_pending_request(height);
}

Msg::StartHeight { height } => {
state.blocksync.sync_height = height;
debug!("Starting new height {height}");
for (peer, peer_height) in state.blocksync.peers.iter() {
if *peer_height > height {

state.blocksync.sync_height = height;

for (peer, &peer_height) in &state.blocksync.peers {
if peer_height > height {
debug!(
"Starting new height {height}, requesting the block from {peer:?} that is at {peer_height:?}"
);
self.gossip_consensus

self.gossip
.cast(OutgoingBlockSyncRequest(*peer, Request { height }))?;

state.blocksync.store_pending_request(height, *peer);

break;
}
}
}

Msg::Tick => {
let status = Status {
height: state.blocksync.tip_height,
};

self.gossip_consensus
.cast(GossipConsensusMsg::PublishStatus(status))?;
}

Msg::DecidedBlock(request_id, Some(decided_block)) => {
debug!(
"Received decided block for {}",
decided_block.proposal.height()
);
self.gossip_consensus

self.gossip
.cast(GossipConsensusMsg::OutgoingBlockSyncResponse(
request_id,
decided_block,
Expand Down
18 changes: 9 additions & 9 deletions code/crates/actors/src/gossip_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use libp2p::identity::Keypair;
use libp2p::request_response::{InboundRequestId, OutboundRequestId};
use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort};
use tokio::task::JoinHandle;
use tracing::{debug, error, error_span, Instrument};
use tracing::{error, error_span, trace, Instrument};

use crate::gossip_consensus::GossipEvent::{BlockSyncRequest, BlockSyncResponse};
use malachite_blocksync as blocksync;
Expand Down Expand Up @@ -209,7 +209,7 @@ where
},

Msg::PublishProposalPart(msg) => {
debug!(
trace!(
stream_id = %msg.stream_id,
sequence = %msg.sequence,
"Broadcasting proposal part"
Expand Down Expand Up @@ -294,12 +294,12 @@ where
}
};

// debug!(
// %from,
// stream_id = %msg.stream_id,
// sequence = %msg.sequence,
// "Received proposal part"
// );
trace!(
%from,
stream_id = %msg.stream_id,
sequence = %msg.sequence,
"Received proposal part"
);

self.publish(GossipEvent::ProposalPart(from, msg), subscribers);
}
Expand All @@ -318,7 +318,7 @@ where
return Ok(());
}

debug!(%from, height = %status.height, "Received status");
trace!(%from, height = %status.height, "Received status");

self.publish(
GossipEvent::Status(status.peer_id, Status::new(status.height)),
Expand Down
6 changes: 6 additions & 0 deletions code/crates/blocksync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ pub struct Request<Ctx: Context> {
pub height: Ctx::Height,
}

impl<Ctx: Context> Request<Ctx> {
pub fn new(height: Ctx::Height) -> Self {
Self { height }
}
}

#[derive_where(Clone, Debug, PartialEq, Eq)]
pub struct SyncedBlock<Ctx: Context> {
pub proposal: SignedProposal<Ctx>,
Expand Down

0 comments on commit 1e36445

Please sign in to comment.