Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename BlockSync -> ChainExchange and tweak parameters #852

Merged
merged 5 commits into from
Nov 16, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 32 additions & 25 deletions blockchain/chain_sync/src/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use blocks::{FullTipset, Tipset, TipsetKeys};
use cid::Cid;
use encoding::de::DeserializeOwned;
use forest_libp2p::{
blocksync::{
BlockSyncRequest, BlockSyncResponse, CompactedMessages, TipsetBundle, BLOCKS, MESSAGES,
chain_exchange::{
ChainExchangeRequest, ChainExchangeResponse, CompactedMessages, TipsetBundle, BLOCKS,
MESSAGES,
},
hello::HelloRequest,
NetworkMessage,
Expand All @@ -23,7 +24,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};

/// Timeout for response from an RPC request
const RPC_TIMEOUT: u64 = 20;
const RPC_TIMEOUT: u64 = 5;

/// Context used in chain sync to handle network requests
pub struct SyncNetworkContext<DB> {
Expand Down Expand Up @@ -71,38 +72,38 @@ where
self.peer_manager.clone()
}

/// Send a blocksync request for only block headers (ignore messages).
/// Send a chain_exchange request for only block headers (ignore messages).
/// If `peer_id` is `None`, requests will be sent to a set of shuffled peers.
pub async fn blocksync_headers(
pub async fn chain_exchange_headers(
&self,
peer_id: Option<PeerId>,
tsk: &TipsetKeys,
count: u64,
) -> Result<Vec<Tipset>, String> {
self.handle_blocksync_request(peer_id, tsk, count, BLOCKS)
self.handle_chain_exchange_request(peer_id, tsk, count, BLOCKS)
.await
}
/// Send a blocksync request for only messages (ignore block headers).
/// Send a chain_exchange request for only messages (ignore block headers).
/// If `peer_id` is `None`, requests will be sent to a set of shuffled peers.
pub async fn blocksync_messages(
pub async fn chain_exchange_messages(
&self,
peer_id: Option<PeerId>,
tsk: &TipsetKeys,
count: u64,
) -> Result<Vec<CompactedMessages>, String> {
self.handle_blocksync_request(peer_id, tsk, count, MESSAGES)
self.handle_chain_exchange_request(peer_id, tsk, count, MESSAGES)
.await
}

/// Send a blocksync request for a single full tipset (includes messages)
/// Send a chain_exchange request for a single full tipset (includes messages)
/// If `peer_id` is `None`, requests will be sent to a set of shuffled peers.
pub async fn blocksync_fts(
pub async fn chain_exchange_fts(
&self,
peer_id: Option<PeerId>,
tsk: &TipsetKeys,
) -> Result<FullTipset, String> {
let mut fts = self
.handle_blocksync_request(peer_id, tsk, 1, BLOCKS | MESSAGES)
.handle_chain_exchange_request(peer_id, tsk, 1, BLOCKS | MESSAGES)
.await?;

if fts.len() != 1 {
Expand Down Expand Up @@ -152,7 +153,7 @@ where

/// Helper function to handle the peer retrieval if no peer supplied as well as the logging
/// and updating of the peer info in the `PeerManager`.
async fn handle_blocksync_request<T>(
async fn handle_chain_exchange_request<T>(
&self,
peer_id: Option<PeerId>,
tsk: &TipsetKeys,
Expand All @@ -162,38 +163,44 @@ where
where
T: TryFrom<TipsetBundle, Error = String>,
{
let request = BlockSyncRequest {
let request = ChainExchangeRequest {
start: tsk.cids().to_vec(),
request_len,
options,
};

let global_pre_time = SystemTime::now();
let bs_res = match peer_id {
Some(id) => self.blocksync_request(id, request).await?.into_result()?,
Some(id) => self
.chain_exchange_request(id, request)
.await?
.into_result()?,
None => {
let peers = self.peer_manager.top_peers_shuffled().await;
let mut res = None;
for p in peers.into_iter() {
match self.blocksync_request(p.clone(), request.clone()).await {
match self
.chain_exchange_request(p.clone(), request.clone())
.await
{
Ok(bs_res) => match bs_res.into_result() {
Ok(r) => {
res = Some(r);
break;
}
Err(e) => {
warn!("Failed blocksync response: {}", e);
warn!("Failed chain_exchange response: {}", e);
continue;
}
},
Err(e) => {
warn!("Failed blocksync request to peer {:?}: {}", p, e);
warn!("Failed chain_exchange request to peer {:?}: {}", p, e);
continue;
}
}
}

res.ok_or_else(|| "BlockSync request failed for all top peers".to_string())?
res.ok_or_else(|| "ChainExchange request failed for all top peers".to_string())?
}
};

Expand All @@ -205,19 +212,19 @@ where
Ok(bs_res)
}

/// Send a blocksync request to the network and await response.
async fn blocksync_request(
/// Send a chain_exchange request to the network and await response.
async fn chain_exchange_request(
&self,
peer_id: PeerId,
request: BlockSyncRequest,
) -> Result<BlockSyncResponse, String> {
trace!("Sending BlockSync Request {:?}", request);
request: ChainExchangeRequest,
) -> Result<ChainExchangeResponse, String> {
trace!("Sending ChainExchange Request {:?}", request);

let req_pre_time = SystemTime::now();

let (tx, rx) = oneshot_channel();
self.network_send
.send(NetworkMessage::BlockSyncRequest {
.send(NetworkMessage::ChainExchangeRequest {
peer_id: peer_id.clone(),
request,
response_channel: tx,
Expand Down
2 changes: 1 addition & 1 deletion blockchain/chain_sync/src/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl PeerInfo {
}
}

/// Thread safe peer manager which handles peer management for the `BlockSync` protocol.
/// Thread safe peer manager which handles peer management for the `ChainExchange` protocol.
#[derive(Default)]
pub struct PeerManager {
/// Hash set of full peers available
Expand Down
6 changes: 3 additions & 3 deletions blockchain/chain_sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ type WorkerState = Arc<RwLock<Vec<Arc<RwLock<SyncState>>>>>;
enum ChainSyncState {
/// Bootstrapping peers before starting sync.
Bootstrap,
/// Syncing chain with BlockSync protocol.
/// Syncing chain with ChainExchange protocol.
Initial,
/// Following chain with blocks received over gossipsub.
Follow,
}

/// Struct that handles the ChainSync logic. This handles incoming network events such as
/// gossipsub messages, Hello protocol requests, as well as sending and receiving BlockSync
/// gossipsub messages, Hello protocol requests, as well as sending and receiving ChainExchange
/// messages to be able to do the initial sync.
pub struct ChainSyncer<DB, TBeacon, V, M> {
/// State of general `ChainSync` protocol.
Expand Down Expand Up @@ -440,7 +440,7 @@ where
) -> Result<FullTipset, String> {
let fts = match Self::load_fts(cs, tsk) {
Ok(fts) => fts,
Err(_) => network.blocksync_fts(Some(peer_id), tsk).await?,
Err(_) => network.chain_exchange_fts(Some(peer_id), tsk).await?,
};

Ok(fts)
Expand Down
2 changes: 1 addition & 1 deletion blockchain/chain_sync/src/sync_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::fmt;
use std::sync::Arc;
use std::time::SystemTime;

/// Current state of the ChainSyncer using the BlockSync protocol.
/// Current state of the ChainSyncer using the ChainExchange protocol.
#[derive(PartialEq, Debug, Clone, Copy)]
pub enum SyncStage {
/// Idle state
Expand Down
40 changes: 20 additions & 20 deletions blockchain/chain_sync/src/sync_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use fil_types::{
verifier::ProofVerifier, Randomness, ALLOWABLE_CLOCK_DRIFT, BLOCK_DELAY_SECS, BLOCK_GAS_LIMIT,
TICKET_RANDOMNESS_LOOKBACK, UPGRADE_SMOKE_HEIGHT,
};
use forest_libp2p::blocksync::TipsetBundle;
use forest_libp2p::chain_exchange::TipsetBundle;
use futures::stream::{FuturesUnordered, StreamExt};
use interpreter::price_list_by_epoch;
use ipld_blockstore::BlockStore;
Expand All @@ -40,7 +40,7 @@ use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Worker to handle syncing chain with the blocksync protocol.
/// Worker to handle syncing chain with the chain_exchange protocol.
pub(crate) struct SyncWorker<DB, TBeacon, V> {
/// State of the sync worker.
pub state: Arc<RwLock<SyncState>>,
Expand Down Expand Up @@ -188,16 +188,16 @@ where
}

// TODO tweak request window when socket frame is tested
const REQUEST_WINDOW: i64 = 10;
const REQUEST_WINDOW: i64 = 200;
let epoch_diff = cur_ts.epoch() - to_epoch;
debug!("BlockSync from: {} to {}", cur_ts.epoch(), to_epoch);
debug!("ChainExchange from: {} to {}", cur_ts.epoch(), to_epoch);
let window = min(epoch_diff, REQUEST_WINDOW);

// Load blocks from network using blocksync
// Load blocks from network using chain_exchange
// TODO consider altering window size before returning error for failed sync.
let tipsets = self
.network
.blocksync_headers(None, cur_ts.parents(), window as u64)
.chain_exchange_headers(None, cur_ts.parents(), window as u64)
.await?;

info!(
Expand Down Expand Up @@ -277,7 +277,7 @@ where
// to have to request all fork length headers at once.
let tips = self
.network
.blocksync_headers(None, head.parents(), FORK_LENGTH_THRESHOLD)
.chain_exchange_headers(None, head.parents(), FORK_LENGTH_THRESHOLD)
.await?;

let mut ts = self.chain_store().tipset_from_keys(to.parents())?;
Expand All @@ -301,7 +301,7 @@ where
))
}

/// Syncs messages by first checking state for message existence otherwise fetches messages from blocksync
/// Syncs messages by first checking state for message existence otherwise fetches messages from chain_exchange
async fn sync_messages_check_state(&self, tipsets: Vec<Tipset>) -> Result<(), Error> {
let mut ts_iter = tipsets.into_iter().rev();
// Currently syncing 1 height at a time, no reason for us to sync more
Expand All @@ -312,19 +312,19 @@ where
let fts = match self.chain_store().fill_tipset(ts) {
Ok(fts) => fts,
Err(ts) => {
// no full tipset in storage; request messages via blocksync
// no full tipset in storage; request messages via chain_exchange

let batch_size = REQUEST_WINDOW;
debug!(
"BlockSync message sync tipsets: epoch: {}, len: {}",
"ChainExchange message sync tipsets: epoch: {}, len: {}",
ts.epoch(),
batch_size
);

// receive tipset bundle from block sync
let compacted_messages = self
.network
.blocksync_messages(None, ts.key(), batch_size as u64)
.chain_exchange_messages(None, ts.key(), batch_size as u64)
.await?;

// Chain current tipset with iterator
Expand Down Expand Up @@ -353,7 +353,7 @@ where
chain::persist_objects(self.state_manager.blockstore(), &m.bls_msgs)?;
chain::persist_objects(self.state_manager.blockstore(), &m.secp_msgs)?;
} else {
warn!("Blocksync request for messages returned null messages");
warn!("Chain Exchange request for messages returned null messages");
}
}

Expand Down Expand Up @@ -1012,7 +1012,7 @@ mod tests {
use libp2p::PeerId;
use std::sync::Arc;
use std::time::Duration;
use test_utils::{construct_blocksync_response, construct_dummy_header, construct_tipset};
use test_utils::{construct_chain_exchange_response, construct_dummy_header, construct_tipset};

fn sync_worker_setup(
db: Arc<MemoryDB>,
Expand Down Expand Up @@ -1044,12 +1044,12 @@ mod tests {
)
}

fn send_blocksync_response(blocksync_message: Receiver<NetworkMessage>) {
let rpc_response = construct_blocksync_response();
fn send_chain_exchange_response(chain_exchange_message: Receiver<NetworkMessage>) {
let rpc_response = construct_chain_exchange_response();

task::block_on(async {
match blocksync_message.recv().await.unwrap() {
NetworkMessage::BlockSyncRequest {
match chain_exchange_message.recv().await.unwrap() {
NetworkMessage::ChainExchangeRequest {
response_channel, ..
} => {
response_channel.send(rpc_response).unwrap();
Expand All @@ -1075,11 +1075,11 @@ mod tests {
.update_peer_head(source.clone(), Some(head.clone()))
.await;
assert_eq!(sw.network.peer_manager().len().await, 1);
// make blocksync request
// make chain_exchange request
let return_set =
task::spawn(async move { sw.sync_headers_reverse((*head).clone(), &to).await });
// send blocksync response to channel
send_blocksync_response(network_receiver);
// send chain_exchange response to channel
send_chain_exchange_response(network_receiver);
assert_eq!(return_set.await.unwrap().len(), 4);
});
}
Expand Down
6 changes: 3 additions & 3 deletions blockchain/chain_sync/src/sync_worker/full_sync_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,20 @@ use beacon::{DrandBeacon, DrandPublic};
use db::MemoryDB;
use fil_types::verifier::FullVerifier;
use forest_car::load_car;
use forest_libp2p::{blocksync::make_blocksync_response, NetworkMessage};
use forest_libp2p::{chain_exchange::make_chain_exchange_response, NetworkMessage};
use genesis::{initialize_genesis, EXPORT_SR_40};
use libp2p::core::PeerId;
use state_manager::StateManager;

async fn handle_requests<DB: BlockStore>(mut chan: Receiver<NetworkMessage>, db: ChainStore<DB>) {
loop {
match chan.next().await {
Some(NetworkMessage::BlockSyncRequest {
Some(NetworkMessage::ChainExchangeRequest {
request,
response_channel,
..
}) => response_channel
.send(make_blocksync_response(&db, &request))
.send(make_chain_exchange_response(&db, &request))
.unwrap(),
Some(event) => log::warn!("Other request sent to network: {:?}", event),
None => break,
Expand Down
2 changes: 1 addition & 1 deletion ipld/graphsync/src/libp2p/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::task::{Context, Poll};
use std::time::{Duration, Instant};

// TODO move this to config option
const TIMEOUT: u64 = 10;
const TIMEOUT: u64 = 5;

/// Handler implementation for GraphSync protocol.
pub struct GraphSyncHandler {
Expand Down
Loading