Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(code): Extract logic from BlockSync actor and into blocksync crate #487

Merged
merged 5 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions code/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

177 changes: 92 additions & 85 deletions code/crates/actors/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ use derive_where::derive_where;
use libp2p::request_response::InboundRequestId;
use ractor::{Actor, ActorProcessingErr, ActorRef};
use tokio::task::JoinHandle;
use tracing::{debug, error_span, info};

use malachite_blocksync as blocksync;
use malachite_blocksync::{Request, SyncedBlock};
use malachite_common::{Certificate, Context, Proposal};
use malachite_common::{Certificate, Context};

use crate::gossip_consensus::Msg::OutgoingBlockSyncRequest;
use crate::gossip_consensus::{GossipConsensusMsg, GossipConsensusRef, GossipEvent, Status};
use crate::host::{HostMsg, HostRef};
use crate::util::forward::forward;
use crate::util::ticker::ticker;

pub type BlockSyncRef<Ctx> = ActorRef<Msg<Ctx>>;

Expand All @@ -41,7 +41,7 @@ pub enum Msg<Ctx: Context> {
StartHeight(Ctx::Height),

/// Host has a response for the blocks request
DecidedBlock(InboundRequestId, Option<SyncedBlock<Ctx>>),
GotDecidedBlock(Ctx::Height, InboundRequestId, Option<SyncedBlock<Ctx>>),
}

#[derive(Debug)]
Expand Down Expand Up @@ -70,19 +70,78 @@ pub struct BlockSync<Ctx: Context> {
ctx: Ctx,
gossip: GossipConsensusRef<Ctx>,
host: HostRef<Ctx>,
metrics: blocksync::Metrics,
}

impl<Ctx> BlockSync<Ctx>
where
Ctx: Context,
{
pub fn new(ctx: Ctx, gossip: GossipConsensusRef<Ctx>, host: HostRef<Ctx>) -> Self {
Self { ctx, gossip, host }
Self {
ctx,
gossip,
host,
metrics: blocksync::Metrics::default(),
}
}

pub async fn spawn(self) -> Result<(BlockSyncRef<Ctx>, JoinHandle<()>), ractor::SpawnErr> {
Actor::spawn(None, self, Args::default()).await
}

async fn process_input(
&self,
myself: &ActorRef<Msg<Ctx>>,
state: &mut State<Ctx>,
input: blocksync::Input<Ctx>,
) -> Result<(), ActorProcessingErr> {
malachite_blocksync::process!(
input: input,
state: &mut state.blocksync,
metrics: &self.metrics,
with: effect => {
self.handle_effect(myself, effect).await
}
)
}

async fn handle_effect(
&self,
myself: &ActorRef<Msg<Ctx>>,
effect: blocksync::Effect<Ctx>,
) -> Result<blocksync::Resume<Ctx>, ActorProcessingErr> {
use blocksync::Effect;
match effect {
Effect::PublishStatus(height) => {
self.gossip
.cast(GossipConsensusMsg::PublishStatus(Status::new(height)))?;
}

Effect::SendRequest(peer_id, request) => {
self.gossip
.cast(OutgoingBlockSyncRequest(peer_id, request))?;
}

Effect::SendResponse(request_id, response) => {
self.gossip
.cast(GossipConsensusMsg::OutgoingBlockSyncResponse(
request_id, response,
))?;
}

Effect::GetBlock(request_id, height) => {
self.host.call_and_forward(
|reply_to| HostMsg::GetDecidedBlock { height, reply_to },
myself,
move |block| Msg::<Ctx>::GotDecidedBlock(height, request_id, block),
None,
)?;
}
}

Ok(blocksync::Resume::default())
}
}

#[async_trait]
Expand All @@ -100,18 +159,11 @@ where
args: Args,
) -> Result<Self::State, ActorProcessingErr> {
let forward = forward(myself.clone(), Some(myself.get_cell()), Msg::GossipEvent).await?;

self.gossip.cast(GossipConsensusMsg::Subscribe(forward))?;

let ticker = tokio::spawn(async move {
loop {
tokio::time::sleep(args.status_update_interval).await;

if let Err(e) = myself.cast(Msg::Tick) {
tracing::error!(?e, "Failed to send tick message");
}
}
});
let ticker = tokio::spawn(ticker(args.status_update_interval, myself.clone(), || {
Msg::Tick
}));

Ok(State {
blocksync: blocksync::State::default(),
Expand All @@ -120,7 +172,6 @@ where
}

// TODO:
// - move to blocksync crate
// - proper FSM
// - timeout requests
// - multiple requests for next few heights
Expand All @@ -134,94 +185,50 @@ where
) -> Result<(), ActorProcessingErr> {
match msg {
Msg::Tick => {
let status = Status {
height: state.blocksync.tip_height,
};

self.gossip
.cast(GossipConsensusMsg::PublishStatus(status))?;
self.process_input(&myself, state, blocksync::Input::Tick)
.await?;
}

Msg::GossipEvent(GossipEvent::Status(peer, status)) => {
let peer_height = status.height;
let sync_height = state.blocksync.sync_height;
let tip_height = state.blocksync.tip_height;

let _span = error_span!("status", %sync_height, %tip_height).entered();

debug!(%peer_height, %peer, "Received peer status");

state.blocksync.store_peer_height(peer, peer_height);

if peer_height > tip_height {
info!(%peer_height, %peer, "SYNC REQUIRED: Falling behind");

// If there are no pending requests for the base height yet then ask for a batch of blocks from peer
if !state.blocksync.pending_requests.contains_key(&sync_height) {
debug!(%sync_height, %peer, "Requesting block from peer");

self.gossip
.cast(OutgoingBlockSyncRequest(peer, Request::new(sync_height)))?;
Msg::GossipEvent(GossipEvent::Status(peer_id, status)) => {
let status = blocksync::Status {
peer_id,
height: status.height,
};

state.blocksync.store_pending_request(sync_height, peer);
}
}
self.process_input(&myself, state, blocksync::Input::Status(status))
.await?;
}

Msg::GossipEvent(GossipEvent::BlockSyncRequest(
request_id,
from,
blocksync::Request { height },
)) => {
debug!(%height, "Received request for block");

// Retrieve the blocks for the requested heights
self.host.call_and_forward(
|reply_to| HostMsg::GetDecidedBlock { height, reply_to },
self.process_input(
&myself,
move |block| Msg::<Ctx>::DecidedBlock(request_id, block),
None,
)?;
state,
blocksync::Input::Request(request_id, from, Request::new(height)),
)
.await?;
}

Msg::Decided(height) => {
debug!(%height, "Decided height");

state.blocksync.tip_height = height;
state.blocksync.remove_pending_request(height);
self.process_input(&myself, state, blocksync::Input::Decided(height))
.await?;
}

Msg::StartHeight(height) => {
debug!(%height, "Starting new height");

state.blocksync.sync_height = height;

for (peer, &peer_height) in &state.blocksync.peers {
if peer_height > height {
debug!(%height, %peer_height, %peer, "Starting new height, requesting block");

self.gossip
.cast(OutgoingBlockSyncRequest(*peer, Request::new(height)))?;

state.blocksync.store_pending_request(height, *peer);

break;
}
}
self.process_input(&myself, state, blocksync::Input::StartHeight(height))
.await?;
}

Msg::DecidedBlock(request_id, decided_block) => {
match &decided_block {
None => debug!("Received empty response"),
Some(block) => {
debug!(height = %block.proposal.height(), "Received decided block")
}
}

self.gossip
.cast(GossipConsensusMsg::OutgoingBlockSyncResponse(
request_id,
blocksync::Response::new(decided_block),
))?;
Msg::GotDecidedBlock(height, request_id, block) => {
self.process_input(
&myself,
state,
blocksync::Input::GotBlock(request_id, height, block),
)
.await?;
}

_ => {}
Expand Down
2 changes: 1 addition & 1 deletion code/crates/actors/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ where
}
}

#[tracing::instrument(skip(self, myself))]
#[tracing::instrument(skip_all, fields(height = %height, round = %round))]
fn get_value(
&self,
myself: &ActorRef<Msg<Ctx>>,
Expand Down
8 changes: 5 additions & 3 deletions code/crates/actors/src/gossip_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ pub enum GossipEvent<Ctx: Context> {
Proposal(PeerId, SignedProposal<Ctx>),
ProposalPart(PeerId, StreamMessage<Ctx::ProposalPart>),
Status(PeerId, Status<Ctx>),
BlockSyncRequest(InboundRequestId, Request<Ctx>), // received a block request
BlockSyncResponse(OutboundRequestId, Response<Ctx>), // received a block response
BlockSyncRequest(InboundRequestId, PeerId, Request<Ctx>), // received a block request
BlockSyncResponse(OutboundRequestId, Response<Ctx>), // received a block response
}

pub enum State<Ctx: Context> {
Expand Down Expand Up @@ -339,8 +339,9 @@ where
return Ok(());
}
};

self.publish(
GossipEvent::BlockSyncRequest(request_id, request),
GossipEvent::BlockSyncRequest(request_id, peer, request),
subscribers,
);
}
Expand All @@ -353,6 +354,7 @@ where
return Ok(());
}
};

self.publish(
GossipEvent::BlockSyncResponse(request_id, response),
subscribers,
Expand Down
1 change: 1 addition & 0 deletions code/crates/actors/src/util/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod codec;
pub mod forward;
pub mod streaming;
pub mod ticker;
pub mod timers;
17 changes: 17 additions & 0 deletions code/crates/actors/src/util/ticker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use std::time::Duration;

use ractor::message::Message;
use ractor::ActorRef;

pub async fn ticker<Msg>(interval: Duration, target: ActorRef<Msg>, msg: impl Fn() -> Msg)
where
Msg: Message,
{
loop {
tokio::time::sleep(interval).await;

if let Err(e) = target.cast(msg()) {
tracing::error!(?e, ?target, "Failed to send tick message");
}
}
}
2 changes: 2 additions & 0 deletions code/crates/blocksync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ malachite-common = { workspace = true }
bytes = { workspace = true, features = ["serde"] }
derive-where = { workspace = true }
displaydoc = { workspace = true }
genawaiter = { workspace = true }
libp2p = { workspace = true, features = ["request-response", "cbor"] }
tracing = { workspace = true }
serde = { workspace = true }

[lints]
Expand Down
8 changes: 8 additions & 0 deletions code/crates/blocksync/src/co.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use genawaiter::sync as gen;
use genawaiter::GeneratorState;

use crate::{Effect, Error, Resume};

pub type Gen<Ctx, F> = gen::Gen<Effect<Ctx>, Resume<Ctx>, F>;
pub type Co<Ctx> = gen::Co<Effect<Ctx>, Resume<Ctx>>;
pub type CoState<Ctx> = GeneratorState<Effect<Ctx>, Result<(), Error<Ctx>>>;
Loading
Loading