Skip to content

Commit

Permalink
Rename BlockSync -> ChainExchange and tweak parameters (#852)
Browse files Browse the repository at this point in the history
* Tweak blocksync window size and yamux max buffer

* Switch BlockSync -> ChainExchange and tweak parameters

* Update timeout

* fix test
  • Loading branch information
austinabell authored Nov 16, 2020
1 parent 876b988 commit 0c1febb
Show file tree
Hide file tree
Showing 19 changed files with 291 additions and 307 deletions.
57 changes: 32 additions & 25 deletions blockchain/chain_sync/src/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use blocks::{FullTipset, Tipset, TipsetKeys};
use cid::Cid;
use encoding::de::DeserializeOwned;
use forest_libp2p::{
blocksync::{
BlockSyncRequest, BlockSyncResponse, CompactedMessages, TipsetBundle, BLOCKS, MESSAGES,
chain_exchange::{
ChainExchangeRequest, ChainExchangeResponse, CompactedMessages, TipsetBundle, BLOCKS,
MESSAGES,
},
hello::HelloRequest,
NetworkMessage,
Expand All @@ -23,7 +24,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};

/// Timeout for response from an RPC request
const RPC_TIMEOUT: u64 = 20;
const RPC_TIMEOUT: u64 = 5;

/// Context used in chain sync to handle network requests
pub struct SyncNetworkContext<DB> {
Expand Down Expand Up @@ -71,38 +72,38 @@ where
self.peer_manager.clone()
}

/// Send a blocksync request for only block headers (ignore messages).
/// Send a chain_exchange request for only block headers (ignore messages).
/// If `peer_id` is `None`, requests will be sent to a set of shuffled peers.
pub async fn blocksync_headers(
pub async fn chain_exchange_headers(
&self,
peer_id: Option<PeerId>,
tsk: &TipsetKeys,
count: u64,
) -> Result<Vec<Arc<Tipset>>, String> {
self.handle_blocksync_request(peer_id, tsk, count, BLOCKS)
self.handle_chain_exchange_request(peer_id, tsk, count, BLOCKS)
.await
}
/// Send a blocksync request for only messages (ignore block headers).
/// Send a chain_exchange request for only messages (ignore block headers).
/// If `peer_id` is `None`, requests will be sent to a set of shuffled peers.
pub async fn blocksync_messages(
pub async fn chain_exchange_messages(
&self,
peer_id: Option<PeerId>,
tsk: &TipsetKeys,
count: u64,
) -> Result<Vec<CompactedMessages>, String> {
self.handle_blocksync_request(peer_id, tsk, count, MESSAGES)
self.handle_chain_exchange_request(peer_id, tsk, count, MESSAGES)
.await
}

/// Send a blocksync request for a single full tipset (includes messages)
/// Send a chain_exchange request for a single full tipset (includes messages)
/// If `peer_id` is `None`, requests will be sent to a set of shuffled peers.
pub async fn blocksync_fts(
pub async fn chain_exchange_fts(
&self,
peer_id: Option<PeerId>,
tsk: &TipsetKeys,
) -> Result<FullTipset, String> {
let mut fts = self
.handle_blocksync_request(peer_id, tsk, 1, BLOCKS | MESSAGES)
.handle_chain_exchange_request(peer_id, tsk, 1, BLOCKS | MESSAGES)
.await?;

if fts.len() != 1 {
Expand Down Expand Up @@ -152,7 +153,7 @@ where

/// Helper function to handle the peer retrieval if no peer supplied as well as the logging
/// and updating of the peer info in the `PeerManager`.
async fn handle_blocksync_request<T>(
async fn handle_chain_exchange_request<T>(
&self,
peer_id: Option<PeerId>,
tsk: &TipsetKeys,
Expand All @@ -162,38 +163,44 @@ where
where
T: TryFrom<TipsetBundle, Error = String>,
{
let request = BlockSyncRequest {
let request = ChainExchangeRequest {
start: tsk.cids().to_vec(),
request_len,
options,
};

let global_pre_time = SystemTime::now();
let bs_res = match peer_id {
Some(id) => self.blocksync_request(id, request).await?.into_result()?,
Some(id) => self
.chain_exchange_request(id, request)
.await?
.into_result()?,
None => {
let peers = self.peer_manager.top_peers_shuffled().await;
let mut res = None;
for p in peers.into_iter() {
match self.blocksync_request(p.clone(), request.clone()).await {
match self
.chain_exchange_request(p.clone(), request.clone())
.await
{
Ok(bs_res) => match bs_res.into_result() {
Ok(r) => {
res = Some(r);
break;
}
Err(e) => {
warn!("Failed blocksync response: {}", e);
warn!("Failed chain_exchange response: {}", e);
continue;
}
},
Err(e) => {
warn!("Failed blocksync request to peer {:?}: {}", p, e);
warn!("Failed chain_exchange request to peer {:?}: {}", p, e);
continue;
}
}
}

res.ok_or_else(|| "BlockSync request failed for all top peers".to_string())?
res.ok_or_else(|| "ChainExchange request failed for all top peers".to_string())?
}
};

Expand All @@ -205,19 +212,19 @@ where
Ok(bs_res)
}

/// Send a blocksync request to the network and await response.
async fn blocksync_request(
/// Send a chain_exchange request to the network and await response.
async fn chain_exchange_request(
&self,
peer_id: PeerId,
request: BlockSyncRequest,
) -> Result<BlockSyncResponse, String> {
trace!("Sending BlockSync Request {:?}", request);
request: ChainExchangeRequest,
) -> Result<ChainExchangeResponse, String> {
trace!("Sending ChainExchange Request {:?}", request);

let req_pre_time = SystemTime::now();

let (tx, rx) = oneshot_channel();
self.network_send
.send(NetworkMessage::BlockSyncRequest {
.send(NetworkMessage::ChainExchangeRequest {
peer_id: peer_id.clone(),
request,
response_channel: tx,
Expand Down
2 changes: 1 addition & 1 deletion blockchain/chain_sync/src/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl PeerInfo {
}
}

/// Thread safe peer manager which handles peer management for the `BlockSync` protocol.
/// Thread safe peer manager which handles peer management for the `ChainExchange` protocol.
#[derive(Default)]
pub struct PeerManager {
/// Hash set of full peers available
Expand Down
6 changes: 3 additions & 3 deletions blockchain/chain_sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ type WorkerState = Arc<RwLock<Vec<Arc<RwLock<SyncState>>>>>;
enum ChainSyncState {
/// Bootstrapping peers before starting sync.
Bootstrap,
/// Syncing chain with BlockSync protocol.
/// Syncing chain with ChainExchange protocol.
Initial,
/// Following chain with blocks received over gossipsub.
Follow,
}

/// Struct that handles the ChainSync logic. This handles incoming network events such as
/// gossipsub messages, Hello protocol requests, as well as sending and receiving BlockSync
/// gossipsub messages, Hello protocol requests, as well as sending and receiving ChainExchange
/// messages to be able to do the initial sync.
pub struct ChainSyncer<DB, TBeacon, V, M> {
/// State of general `ChainSync` protocol.
Expand Down Expand Up @@ -440,7 +440,7 @@ where
) -> Result<FullTipset, String> {
let fts = match Self::load_fts(cs, tsk).await {
Ok(fts) => fts,
Err(_) => network.blocksync_fts(Some(peer_id), tsk).await?,
Err(_) => network.chain_exchange_fts(Some(peer_id), tsk).await?,
};

Ok(fts)
Expand Down
2 changes: 1 addition & 1 deletion blockchain/chain_sync/src/sync_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::fmt;
use std::sync::Arc;
use std::time::SystemTime;

/// Current state of the ChainSyncer using the BlockSync protocol.
/// Current state of the ChainSyncer using the ChainExchange protocol.
#[derive(PartialEq, Debug, Clone, Copy)]
pub enum SyncStage {
/// Idle state
Expand Down
41 changes: 21 additions & 20 deletions blockchain/chain_sync/src/sync_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use fil_types::{
verifier::ProofVerifier, Randomness, ALLOWABLE_CLOCK_DRIFT, BLOCK_DELAY_SECS, BLOCK_GAS_LIMIT,
TICKET_RANDOMNESS_LOOKBACK, UPGRADE_SMOKE_HEIGHT,
};
use forest_libp2p::blocksync::TipsetBundle;
use forest_libp2p::chain_exchange::TipsetBundle;
use futures::stream::{FuturesUnordered, StreamExt};
use interpreter::price_list_by_epoch;
use ipld_blockstore::BlockStore;
Expand All @@ -40,7 +40,7 @@ use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Worker to handle syncing chain with the blocksync protocol.
/// Worker to handle syncing chain with the chain_exchange protocol.
pub(crate) struct SyncWorker<DB, TBeacon, V> {
/// State of the sync worker.
pub state: Arc<RwLock<SyncState>>,
Expand Down Expand Up @@ -189,16 +189,16 @@ where
}

// TODO tweak request window when socket frame is tested
const REQUEST_WINDOW: i64 = 10;
const REQUEST_WINDOW: i64 = 200;
let epoch_diff = cur_ts.epoch() - to_epoch;
debug!("BlockSync from: {} to {}", cur_ts.epoch(), to_epoch);
debug!("ChainExchange from: {} to {}", cur_ts.epoch(), to_epoch);
let window = min(epoch_diff, REQUEST_WINDOW);

// Load blocks from network using blocksync
// Load blocks from network using chain_exchange
// TODO consider altering window size before returning error for failed sync.
let tipsets = self
.network
.blocksync_headers(None, cur_ts.parents(), window as u64)
.chain_exchange_headers(None, cur_ts.parents(), window as u64)
.await?;

info!(
Expand Down Expand Up @@ -278,7 +278,7 @@ where
// to have to request all fork length headers at once.
let tips = self
.network
.blocksync_headers(None, head.parents(), FORK_LENGTH_THRESHOLD)
.chain_exchange_headers(None, head.parents(), FORK_LENGTH_THRESHOLD)
.await?;

let mut ts = self.chain_store().tipset_from_keys(to.parents()).await?;
Expand All @@ -302,7 +302,8 @@ where
))
}

/// Syncs messages by first checking state for message existence otherwise fetches messages from blocksync
/// Syncs messages by first checking state for message existence otherwise fetches messages from
/// chain exchange.
async fn sync_messages_check_state(&self, tipsets: Vec<Arc<Tipset>>) -> Result<(), Error> {
let mut ts_iter = tipsets.into_iter().rev();
// Currently syncing 1 height at a time, no reason for us to sync more
Expand All @@ -313,19 +314,19 @@ where
let fts = match self.chain_store().fill_tipset(ts) {
Ok(fts) => fts,
Err(ts) => {
// no full tipset in storage; request messages via blocksync
// no full tipset in storage; request messages via chain_exchange

let batch_size = REQUEST_WINDOW;
debug!(
"BlockSync message sync tipsets: epoch: {}, len: {}",
"ChainExchange message sync tipsets: epoch: {}, len: {}",
ts.epoch(),
batch_size
);

// receive tipset bundle from block sync
let compacted_messages = self
.network
.blocksync_messages(None, ts.key(), batch_size as u64)
.chain_exchange_messages(None, ts.key(), batch_size as u64)
.await?;

// Chain current tipset with iterator
Expand Down Expand Up @@ -354,7 +355,7 @@ where
chain::persist_objects(self.state_manager.blockstore(), &m.bls_msgs)?;
chain::persist_objects(self.state_manager.blockstore(), &m.secp_msgs)?;
} else {
warn!("Blocksync request for messages returned null messages");
warn!("Chain Exchange request for messages returned null messages");
}
}

Expand Down Expand Up @@ -1013,7 +1014,7 @@ mod tests {
use libp2p::PeerId;
use std::sync::Arc;
use std::time::Duration;
use test_utils::{construct_blocksync_response, construct_dummy_header, construct_tipset};
use test_utils::{construct_chain_exchange_response, construct_dummy_header, construct_tipset};

fn sync_worker_setup(
db: Arc<MemoryDB>,
Expand Down Expand Up @@ -1045,12 +1046,12 @@ mod tests {
)
}

fn send_blocksync_response(blocksync_message: Receiver<NetworkMessage>) {
let rpc_response = construct_blocksync_response();
fn send_chain_exchange_response(chain_exchange_message: Receiver<NetworkMessage>) {
let rpc_response = construct_chain_exchange_response();

task::block_on(async {
match blocksync_message.recv().await.unwrap() {
NetworkMessage::BlockSyncRequest {
match chain_exchange_message.recv().await.unwrap() {
NetworkMessage::ChainExchangeRequest {
response_channel, ..
} => {
response_channel.send(rpc_response).unwrap();
Expand All @@ -1076,10 +1077,10 @@ mod tests {
.update_peer_head(source.clone(), Some(head.clone()))
.await;
assert_eq!(sw.network.peer_manager().len().await, 1);
// make blocksync request
// make chain_exchange request
let return_set = task::spawn(async move { sw.sync_headers_reverse(head, &to).await });
// send blocksync response to channel
send_blocksync_response(network_receiver);
// send chain_exchange response to channel
send_chain_exchange_response(network_receiver);
assert_eq!(return_set.await.unwrap().len(), 4);
});
}
Expand Down
6 changes: 3 additions & 3 deletions blockchain/chain_sync/src/sync_worker/full_sync_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use beacon::{DrandBeacon, DrandPublic};
use db::MemoryDB;
use fil_types::verifier::FullVerifier;
use forest_car::load_car;
use forest_libp2p::{blocksync::make_blocksync_response, NetworkMessage};
use forest_libp2p::{chain_exchange::make_chain_exchange_response, NetworkMessage};
use genesis::{initialize_genesis, EXPORT_SR_40};
use libp2p::core::PeerId;
use state_manager::StateManager;
Expand All @@ -21,12 +21,12 @@ where
{
loop {
match chan.next().await {
Some(NetworkMessage::BlockSyncRequest {
Some(NetworkMessage::ChainExchangeRequest {
request,
response_channel,
..
}) => response_channel
.send(make_blocksync_response(&db, &request).await)
.send(make_chain_exchange_response(&db, &request).await)
.unwrap(),
Some(event) => log::warn!("Other request sent to network: {:?}", event),
None => break,
Expand Down
2 changes: 1 addition & 1 deletion ipld/graphsync/src/libp2p/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::task::{Context, Poll};
use std::time::{Duration, Instant};

// TODO move this to config option
const TIMEOUT: u64 = 10;
const TIMEOUT: u64 = 5;

/// Handler implementation for GraphSync protocol.
pub struct GraphSyncHandler {
Expand Down
Loading

0 comments on commit 0c1febb

Please sign in to comment.