Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(code/blocksync): Randomize choice of peer to sync from #492

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions code/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions code/crates/actors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ derive-where = { workspace = true }
eyre = { workspace = true }
libp2p = { workspace = true }
ractor = { workspace = true, features = ["async-trait"] }
rand = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
21 changes: 15 additions & 6 deletions code/crates/actors/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use derive_where::derive_where;
use libp2p::request_response::InboundRequestId;
use libp2p::PeerId;
use ractor::{Actor, ActorProcessingErr, ActorRef};
use rand::SeedableRng;
use tokio::task::JoinHandle;

use malachite_blocksync::{self as blocksync, OutboundRequestId};
Expand Down Expand Up @@ -87,7 +88,10 @@ impl Default for Params {
}
}

#[derive_where(Debug)]
pub struct Args<Ctx: Context> {
pub initial_height: Ctx::Height,
}

pub struct State<Ctx: Context> {
/// The state of the blocksync state machine
blocksync: blocksync::State<Ctx>,
Expand Down Expand Up @@ -130,8 +134,11 @@ where
}
}

pub async fn spawn(self) -> Result<(BlockSyncRef<Ctx>, JoinHandle<()>), ractor::SpawnErr> {
Actor::spawn(None, self, ()).await
pub async fn spawn(
self,
initial_height: Ctx::Height,
) -> Result<(BlockSyncRef<Ctx>, JoinHandle<()>), ractor::SpawnErr> {
Actor::spawn(None, self, Args { initial_height }).await
}

async fn process_input(
Expand Down Expand Up @@ -217,12 +224,12 @@ where
{
type Msg = Msg<Ctx>;
type State = State<Ctx>;
type Arguments = ();
type Arguments = Args<Ctx>;

async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
_args: (),
args: Args<Ctx>,
) -> Result<Self::State, ActorProcessingErr> {
let forward = forward(myself.clone(), Some(myself.get_cell()), Msg::GossipEvent).await?;
self.gossip.cast(GossipConsensusMsg::Subscribe(forward))?;
Expand All @@ -233,8 +240,10 @@ where
|| Msg::Tick,
));

let rng = Box::new(rand::rngs::StdRng::from_entropy());

Ok(State {
blocksync: blocksync::State::default(),
blocksync: blocksync::State::new(rng, args.initial_height),
timers: Timers::new(myself.clone()),
inflight: HashMap::new(),
ticker,
Expand Down
3 changes: 2 additions & 1 deletion code/crates/blocksync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ derive-where = { workspace = true }
displaydoc = { workspace = true }
genawaiter = { workspace = true }
libp2p = { workspace = true, features = ["request-response", "cbor"] }
tracing = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
tracing = { workspace = true }

[lints]
workspace = true
76 changes: 47 additions & 29 deletions code/crates/blocksync/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,32 +123,29 @@ where
pub async fn on_status<Ctx>(
co: Co<Ctx>,
state: &mut State<Ctx>,
_metrics: &Metrics,
metrics: &Metrics,
status: Status<Ctx>,
) -> Result<(), Error<Ctx>>
where
Ctx: Context,
{
let peer = status.peer_id;
let peer_height = status.height;
let sync_height = state.sync_height;
let tip_height = state.tip_height;

debug!(%status.peer_id, %status.height, "Received peer status");

state.update_status(status);

if peer_height > tip_height {
info!(%peer_height, %peer, "SYNC REQUIRED: Falling behind");

// If there are no pending requests for the base height yet then ask for a batch of blocks from peer
if !state.pending_requests.contains_key(&sync_height) {
debug!(%sync_height, %peer, "Requesting block from peer");
let peer_height = status.height;

perform!(co, Effect::SendRequest(peer, Request::new(sync_height)));
state.update_status(status);

state.store_pending_request(sync_height, peer);
}
if peer_height > state.tip_height {
info!(
tip.height = %state.tip_height,
sync.height = %state.sync_height,
peer.height = %peer_height,
"SYNC REQUIRED: Falling behind"
);

// We are lagging behind one of our peer at least,
// request sync from any peer already at or above that peer's height.
request_sync(co, state, metrics).await?;
}

Ok(())
Expand Down Expand Up @@ -176,7 +173,7 @@ where
pub async fn on_start_height<Ctx>(
co: Co<Ctx>,
state: &mut State<Ctx>,
_metrics: &Metrics,
metrics: &Metrics,
height: Ctx::Height,
) -> Result<(), Error<Ctx>>
where
Expand All @@ -186,17 +183,9 @@ where

state.sync_height = height;

for (peer, status) in &state.peers {
if status.height > height && !state.has_pending_request(&status.height) {
debug!(%height, peer.height = %status.height, %peer, "Starting new height, requesting block from peer");

perform!(co, Effect::SendRequest(*peer, Request::new(height)));

state.store_pending_request(height, *peer);

break;
}
}
// Check if there is any peer already at or above the height we just started,
// and request sync from that peer in order to catch up.
request_sync(co, state, metrics).await?;

Ok(())
}
Expand Down Expand Up @@ -271,3 +260,32 @@ where

Ok(())
}

/// If there are no pending requests for the sync height,
/// and there is peer at a higher height than our sync height,
/// then sync from that peer.
async fn request_sync<Ctx>(
co: Co<Ctx>,
state: &mut State<Ctx>,
_metrics: &Metrics,
) -> Result<(), Error<Ctx>>
where
Ctx: Context,
{
let sync_height = state.sync_height;

if state.has_pending_request(&sync_height) {
debug!(sync.height = %sync_height, "Already have a pending request for this height");
return Ok(());
}

if let Some(peer) = state.random_peer_at_or_above(sync_height) {
debug!(sync.height = %sync_height, %peer, "Requesting block from peer");

perform!(co, Effect::SendRequest(peer, Request::new(sync_height)));

state.store_pending_request(sync_height, peer);
}

Ok(())
}
23 changes: 21 additions & 2 deletions code/crates/blocksync/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use std::collections::BTreeMap;

use derive_where::derive_where;
use libp2p::PeerId;

use malachite_common::Context;
use rand::seq::IteratorRandom;

use crate::Status;

#[derive_where(Clone, Debug, Default)]
pub struct State<Ctx>
where
Ctx: Context,
{
rng: Box<dyn rand::RngCore + Send>,

/// Height of last decided block
pub tip_height: Ctx::Height,

Expand All @@ -29,10 +30,28 @@ impl<Ctx> State<Ctx>
where
Ctx: Context,
{
pub fn new(rng: Box<dyn rand::RngCore + Send>, tip_height: Ctx::Height) -> Self {
Self {
rng,
tip_height,
sync_height: tip_height,
pending_requests: BTreeMap::new(),
peers: BTreeMap::new(),
}
}

pub fn update_status(&mut self, status: Status<Ctx>) {
self.peers.insert(status.peer_id, status);
}

/// Select at random a peer that that we know is at or above the given height.
pub fn random_peer_at_or_above(&mut self, height: Ctx::Height) -> Option<PeerId> {
self.peers
.iter()
.filter_map(move |(&peer, status)| (status.height >= height).then_some(peer))
.choose_stable(&mut self.rng)
}

pub fn store_pending_request(&mut self, height: Ctx::Height, peer: PeerId) {
self.pending_requests.insert(height, peer);
}
Expand Down
9 changes: 6 additions & 3 deletions code/crates/starknet/app/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ pub async fn spawn_node_actor(
let metrics = Metrics::register(registry);
let address = Address::from_public_key(private_key.public_key());

// Start at height 1-1
let start_height = Height::new(1, 1);

// Spawn mempool and its gossip layer
let gossip_mempool = spawn_gossip_mempool_actor(&cfg, &private_key, registry).await;
let mempool = spawn_mempool_actor(gossip_mempool.clone(), &cfg.mempool, &cfg.test).await;
Expand All @@ -64,11 +67,10 @@ pub async fn spawn_node_actor(
gossip_consensus.clone(),
host.clone(),
&cfg.blocksync,
start_height,
)
.await;

let start_height = Height::new(1, 1);

// Spawn consensus
let consensus = spawn_consensus_actor(
start_height,
Expand Down Expand Up @@ -106,14 +108,15 @@ async fn spawn_block_sync_actor(
gossip_consensus: GossipConsensusRef<MockContext>,
host: HostRef<MockContext>,
config: &BlockSyncConfig,
initial_height: Height,
) -> BlockSyncRef<MockContext> {
let params = BlockSyncParams {
status_update_interval: config.status_update_interval,
request_timeout: config.request_timeout,
};

let block_sync = BlockSync::new(ctx, gossip_consensus, host, params);
block_sync.spawn().await.unwrap().0
block_sync.spawn(initial_height).await.unwrap().0
}

#[allow(clippy::too_many_arguments)]
Expand Down
Loading