From 0c1febbfb3ac2b176757071940313ceca8310441 Mon Sep 17 00:00:00 2001 From: Austin Abell Date: Mon, 16 Nov 2020 15:19:12 -0500 Subject: [PATCH] Rename BlockSync -> ChainExchange and tweak parameters (#852) * Tweak blocksync window size and yamux max buffer * Switch BlockSync -> ChainExchange and tweak parameters * Update timeout * fix test --- blockchain/chain_sync/src/network_context.rs | 57 ++++++----- blockchain/chain_sync/src/peer_manager.rs | 2 +- blockchain/chain_sync/src/sync.rs | 6 +- blockchain/chain_sync/src/sync_state.rs | 2 +- blockchain/chain_sync/src/sync_worker.rs | 41 ++++---- .../src/sync_worker/full_sync_test.rs | 6 +- ipld/graphsync/src/libp2p/handler.rs | 2 +- node/forest_libp2p/src/behaviour.rs | 65 +++++++------ node/forest_libp2p/src/blocksync/mod.rs | 89 ----------------- .../{blocksync => chain_exchange}/message.rs | 32 +++---- node/forest_libp2p/src/chain_exchange/mod.rs | 26 +++++ .../{blocksync => chain_exchange}/provider.rs | 29 +++--- node/forest_libp2p/src/hello/message.rs | 6 +- node/forest_libp2p/src/hello/mod.rs | 70 +------------- node/forest_libp2p/src/lib.rs | 4 +- node/forest_libp2p/src/rpc/mod.rs | 95 ++++++++++++++++++- node/forest_libp2p/src/service.rs | 48 +++++----- node/forest_libp2p/tests/decode_test.rs | 8 +- utils/test_utils/src/chain_structures.rs | 10 +- 19 files changed, 291 insertions(+), 307 deletions(-) delete mode 100644 node/forest_libp2p/src/blocksync/mod.rs rename node/forest_libp2p/src/{blocksync => chain_exchange}/message.rs (90%) create mode 100644 node/forest_libp2p/src/chain_exchange/mod.rs rename node/forest_libp2p/src/{blocksync => chain_exchange}/provider.rs (90%) diff --git a/blockchain/chain_sync/src/network_context.rs b/blockchain/chain_sync/src/network_context.rs index 626b048ea282..b6a388375082 100644 --- a/blockchain/chain_sync/src/network_context.rs +++ b/blockchain/chain_sync/src/network_context.rs @@ -8,8 +8,9 @@ use blocks::{FullTipset, Tipset, TipsetKeys}; use cid::Cid; use encoding::de::DeserializeOwned; use forest_libp2p::{ - blocksync::{ - BlockSyncRequest, BlockSyncResponse, CompactedMessages, TipsetBundle, BLOCKS, MESSAGES, + chain_exchange::{ + ChainExchangeRequest, ChainExchangeResponse, CompactedMessages, TipsetBundle, BLOCKS, + MESSAGES, }, hello::HelloRequest, NetworkMessage, @@ -23,7 +24,7 @@ use std::sync::Arc; use std::time::{Duration, SystemTime}; /// Timeout for response from an RPC request -const RPC_TIMEOUT: u64 = 20; +const RPC_TIMEOUT: u64 = 5; /// Context used in chain sync to handle network requests pub struct SyncNetworkContext { @@ -71,38 +72,38 @@ where self.peer_manager.clone() } - /// Send a blocksync request for only block headers (ignore messages). + /// Send a chain_exchange request for only block headers (ignore messages). /// If `peer_id` is `None`, requests will be sent to a set of shuffled peers. - pub async fn blocksync_headers( + pub async fn chain_exchange_headers( &self, peer_id: Option, tsk: &TipsetKeys, count: u64, ) -> Result>, String> { - self.handle_blocksync_request(peer_id, tsk, count, BLOCKS) + self.handle_chain_exchange_request(peer_id, tsk, count, BLOCKS) .await } - /// Send a blocksync request for only messages (ignore block headers). + /// Send a chain_exchange request for only messages (ignore block headers). /// If `peer_id` is `None`, requests will be sent to a set of shuffled peers. - pub async fn blocksync_messages( + pub async fn chain_exchange_messages( &self, peer_id: Option, tsk: &TipsetKeys, count: u64, ) -> Result, String> { - self.handle_blocksync_request(peer_id, tsk, count, MESSAGES) + self.handle_chain_exchange_request(peer_id, tsk, count, MESSAGES) .await } - /// Send a blocksync request for a single full tipset (includes messages) + /// Send a chain_exchange request for a single full tipset (includes messages) /// If `peer_id` is `None`, requests will be sent to a set of shuffled peers. - pub async fn blocksync_fts( + pub async fn chain_exchange_fts( &self, peer_id: Option, tsk: &TipsetKeys, ) -> Result { let mut fts = self - .handle_blocksync_request(peer_id, tsk, 1, BLOCKS | MESSAGES) + .handle_chain_exchange_request(peer_id, tsk, 1, BLOCKS | MESSAGES) .await?; if fts.len() != 1 { @@ -152,7 +153,7 @@ where /// Helper function to handle the peer retrieval if no peer supplied as well as the logging /// and updating of the peer info in the `PeerManager`. - async fn handle_blocksync_request( + async fn handle_chain_exchange_request( &self, peer_id: Option, tsk: &TipsetKeys, @@ -162,7 +163,7 @@ where where T: TryFrom, { - let request = BlockSyncRequest { + let request = ChainExchangeRequest { start: tsk.cids().to_vec(), request_len, options, @@ -170,30 +171,36 @@ where let global_pre_time = SystemTime::now(); let bs_res = match peer_id { - Some(id) => self.blocksync_request(id, request).await?.into_result()?, + Some(id) => self + .chain_exchange_request(id, request) + .await? + .into_result()?, None => { let peers = self.peer_manager.top_peers_shuffled().await; let mut res = None; for p in peers.into_iter() { - match self.blocksync_request(p.clone(), request.clone()).await { + match self + .chain_exchange_request(p.clone(), request.clone()) + .await + { Ok(bs_res) => match bs_res.into_result() { Ok(r) => { res = Some(r); break; } Err(e) => { - warn!("Failed blocksync response: {}", e); + warn!("Failed chain_exchange response: {}", e); continue; } }, Err(e) => { - warn!("Failed blocksync request to peer {:?}: {}", p, e); + warn!("Failed chain_exchange request to peer {:?}: {}", p, e); continue; } } } - res.ok_or_else(|| "BlockSync request failed for all top peers".to_string())? + res.ok_or_else(|| "ChainExchange request failed for all top peers".to_string())? } }; @@ -205,19 +212,19 @@ where Ok(bs_res) } - /// Send a blocksync request to the network and await response. - async fn blocksync_request( + /// Send a chain_exchange request to the network and await response. + async fn chain_exchange_request( &self, peer_id: PeerId, - request: BlockSyncRequest, - ) -> Result { - trace!("Sending BlockSync Request {:?}", request); + request: ChainExchangeRequest, + ) -> Result { + trace!("Sending ChainExchange Request {:?}", request); let req_pre_time = SystemTime::now(); let (tx, rx) = oneshot_channel(); self.network_send - .send(NetworkMessage::BlockSyncRequest { + .send(NetworkMessage::ChainExchangeRequest { peer_id: peer_id.clone(), request, response_channel: tx, diff --git a/blockchain/chain_sync/src/peer_manager.rs b/blockchain/chain_sync/src/peer_manager.rs index 93994b75f9d0..27592c01dc1d 100644 --- a/blockchain/chain_sync/src/peer_manager.rs +++ b/blockchain/chain_sync/src/peer_manager.rs @@ -45,7 +45,7 @@ impl PeerInfo { } } -/// Thread safe peer manager which handles peer management for the `BlockSync` protocol. +/// Thread safe peer manager which handles peer management for the `ChainExchange` protocol. #[derive(Default)] pub struct PeerManager { /// Hash set of full peers available diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index 721b2d5d5f3f..f5aaf8eb4292 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -40,14 +40,14 @@ type WorkerState = Arc>>>>; enum ChainSyncState { /// Bootstrapping peers before starting sync. Bootstrap, - /// Syncing chain with BlockSync protocol. + /// Syncing chain with ChainExchange protocol. Initial, /// Following chain with blocks received over gossipsub. Follow, } /// Struct that handles the ChainSync logic. This handles incoming network events such as -/// gossipsub messages, Hello protocol requests, as well as sending and receiving BlockSync +/// gossipsub messages, Hello protocol requests, as well as sending and receiving ChainExchange /// messages to be able to do the initial sync. pub struct ChainSyncer { /// State of general `ChainSync` protocol. @@ -440,7 +440,7 @@ where ) -> Result { let fts = match Self::load_fts(cs, tsk).await { Ok(fts) => fts, - Err(_) => network.blocksync_fts(Some(peer_id), tsk).await?, + Err(_) => network.chain_exchange_fts(Some(peer_id), tsk).await?, }; Ok(fts) diff --git a/blockchain/chain_sync/src/sync_state.rs b/blockchain/chain_sync/src/sync_state.rs index 3e46d8c7f9c8..7a19fcc25b4f 100644 --- a/blockchain/chain_sync/src/sync_state.rs +++ b/blockchain/chain_sync/src/sync_state.rs @@ -8,7 +8,7 @@ use std::fmt; use std::sync::Arc; use std::time::SystemTime; -/// Current state of the ChainSyncer using the BlockSync protocol. +/// Current state of the ChainSyncer using the ChainExchange protocol. #[derive(PartialEq, Debug, Clone, Copy)] pub enum SyncStage { /// Idle state diff --git a/blockchain/chain_sync/src/sync_worker.rs b/blockchain/chain_sync/src/sync_worker.rs index 069bb19d508e..47ecc21aeade 100644 --- a/blockchain/chain_sync/src/sync_worker.rs +++ b/blockchain/chain_sync/src/sync_worker.rs @@ -24,7 +24,7 @@ use fil_types::{ verifier::ProofVerifier, Randomness, ALLOWABLE_CLOCK_DRIFT, BLOCK_DELAY_SECS, BLOCK_GAS_LIMIT, TICKET_RANDOMNESS_LOOKBACK, UPGRADE_SMOKE_HEIGHT, }; -use forest_libp2p::blocksync::TipsetBundle; +use forest_libp2p::chain_exchange::TipsetBundle; use futures::stream::{FuturesUnordered, StreamExt}; use interpreter::price_list_by_epoch; use ipld_blockstore::BlockStore; @@ -40,7 +40,7 @@ use std::marker::PhantomData; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -/// Worker to handle syncing chain with the blocksync protocol. +/// Worker to handle syncing chain with the chain_exchange protocol. pub(crate) struct SyncWorker { /// State of the sync worker. pub state: Arc>, @@ -189,16 +189,16 @@ where } // TODO tweak request window when socket frame is tested - const REQUEST_WINDOW: i64 = 10; + const REQUEST_WINDOW: i64 = 200; let epoch_diff = cur_ts.epoch() - to_epoch; - debug!("BlockSync from: {} to {}", cur_ts.epoch(), to_epoch); + debug!("ChainExchange from: {} to {}", cur_ts.epoch(), to_epoch); let window = min(epoch_diff, REQUEST_WINDOW); - // Load blocks from network using blocksync + // Load blocks from network using chain_exchange // TODO consider altering window size before returning error for failed sync. let tipsets = self .network - .blocksync_headers(None, cur_ts.parents(), window as u64) + .chain_exchange_headers(None, cur_ts.parents(), window as u64) .await?; info!( @@ -278,7 +278,7 @@ where // to have to request all fork length headers at once. let tips = self .network - .blocksync_headers(None, head.parents(), FORK_LENGTH_THRESHOLD) + .chain_exchange_headers(None, head.parents(), FORK_LENGTH_THRESHOLD) .await?; let mut ts = self.chain_store().tipset_from_keys(to.parents()).await?; @@ -302,7 +302,8 @@ where )) } - /// Syncs messages by first checking state for message existence otherwise fetches messages from blocksync + /// Syncs messages by first checking state for message existence otherwise fetches messages from + /// chain exchange. async fn sync_messages_check_state(&self, tipsets: Vec>) -> Result<(), Error> { let mut ts_iter = tipsets.into_iter().rev(); // Currently syncing 1 height at a time, no reason for us to sync more @@ -313,11 +314,11 @@ where let fts = match self.chain_store().fill_tipset(ts) { Ok(fts) => fts, Err(ts) => { - // no full tipset in storage; request messages via blocksync + // no full tipset in storage; request messages via chain_exchange let batch_size = REQUEST_WINDOW; debug!( - "BlockSync message sync tipsets: epoch: {}, len: {}", + "ChainExchange message sync tipsets: epoch: {}, len: {}", ts.epoch(), batch_size ); @@ -325,7 +326,7 @@ where // receive tipset bundle from block sync let compacted_messages = self .network - .blocksync_messages(None, ts.key(), batch_size as u64) + .chain_exchange_messages(None, ts.key(), batch_size as u64) .await?; // Chain current tipset with iterator @@ -354,7 +355,7 @@ where chain::persist_objects(self.state_manager.blockstore(), &m.bls_msgs)?; chain::persist_objects(self.state_manager.blockstore(), &m.secp_msgs)?; } else { - warn!("Blocksync request for messages returned null messages"); + warn!("Chain Exchange request for messages returned null messages"); } } @@ -1013,7 +1014,7 @@ mod tests { use libp2p::PeerId; use std::sync::Arc; use std::time::Duration; - use test_utils::{construct_blocksync_response, construct_dummy_header, construct_tipset}; + use test_utils::{construct_chain_exchange_response, construct_dummy_header, construct_tipset}; fn sync_worker_setup( db: Arc, @@ -1045,12 +1046,12 @@ mod tests { ) } - fn send_blocksync_response(blocksync_message: Receiver) { - let rpc_response = construct_blocksync_response(); + fn send_chain_exchange_response(chain_exchange_message: Receiver) { + let rpc_response = construct_chain_exchange_response(); task::block_on(async { - match blocksync_message.recv().await.unwrap() { - NetworkMessage::BlockSyncRequest { + match chain_exchange_message.recv().await.unwrap() { + NetworkMessage::ChainExchangeRequest { response_channel, .. } => { response_channel.send(rpc_response).unwrap(); @@ -1076,10 +1077,10 @@ mod tests { .update_peer_head(source.clone(), Some(head.clone())) .await; assert_eq!(sw.network.peer_manager().len().await, 1); - // make blocksync request + // make chain_exchange request let return_set = task::spawn(async move { sw.sync_headers_reverse(head, &to).await }); - // send blocksync response to channel - send_blocksync_response(network_receiver); + // send chain_exchange response to channel + send_chain_exchange_response(network_receiver); assert_eq!(return_set.await.unwrap().len(), 4); }); } diff --git a/blockchain/chain_sync/src/sync_worker/full_sync_test.rs b/blockchain/chain_sync/src/sync_worker/full_sync_test.rs index 8aee2389bb12..8274bfc58cb8 100644 --- a/blockchain/chain_sync/src/sync_worker/full_sync_test.rs +++ b/blockchain/chain_sync/src/sync_worker/full_sync_test.rs @@ -10,7 +10,7 @@ use beacon::{DrandBeacon, DrandPublic}; use db::MemoryDB; use fil_types::verifier::FullVerifier; use forest_car::load_car; -use forest_libp2p::{blocksync::make_blocksync_response, NetworkMessage}; +use forest_libp2p::{chain_exchange::make_chain_exchange_response, NetworkMessage}; use genesis::{initialize_genesis, EXPORT_SR_40}; use libp2p::core::PeerId; use state_manager::StateManager; @@ -21,12 +21,12 @@ where { loop { match chan.next().await { - Some(NetworkMessage::BlockSyncRequest { + Some(NetworkMessage::ChainExchangeRequest { request, response_channel, .. }) => response_channel - .send(make_blocksync_response(&db, &request).await) + .send(make_chain_exchange_response(&db, &request).await) .unwrap(), Some(event) => log::warn!("Other request sent to network: {:?}", event), None => break, diff --git a/ipld/graphsync/src/libp2p/handler.rs b/ipld/graphsync/src/libp2p/handler.rs index ffb13c914e5d..1ee0e4e990e9 100644 --- a/ipld/graphsync/src/libp2p/handler.rs +++ b/ipld/graphsync/src/libp2p/handler.rs @@ -19,7 +19,7 @@ use std::task::{Context, Poll}; use std::time::{Duration, Instant}; // TODO move this to config option -const TIMEOUT: u64 = 10; +const TIMEOUT: u64 = 5; /// Handler implementation for GraphSync protocol. pub struct GraphSyncHandler { diff --git a/node/forest_libp2p/src/behaviour.rs b/node/forest_libp2p/src/behaviour.rs index 907cef710db0..6f497a745d53 100644 --- a/node/forest_libp2p/src/behaviour.rs +++ b/node/forest_libp2p/src/behaviour.rs @@ -1,8 +1,8 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use crate::blocksync::{ - BlockSyncCodec, BlockSyncProtocolName, BlockSyncRequest, BlockSyncResponse, +use crate::chain_exchange::{ + ChainExchangeCodec, ChainExchangeProtocolName, ChainExchangeRequest, ChainExchangeResponse, }; use crate::config::Libp2pConfig; use crate::hello::{HelloCodec, HelloProtocolName, HelloRequest, HelloResponse}; @@ -48,7 +48,7 @@ pub struct ForestBehaviour { ping: Ping, identify: Identify, hello: RequestResponse, - blocksync: RequestResponse, + chain_exchange: RequestResponse, kademlia: Toggle>, bitswap: Bitswap, #[behaviour(ignore)] @@ -78,15 +78,15 @@ pub enum ForestBehaviourEvent { request_id: RequestId, response: HelloResponse, }, - BlockSyncRequest { + ChainExchangeRequest { peer: PeerId, - request: BlockSyncRequest, - channel: ResponseChannel, + request: ChainExchangeRequest, + channel: ResponseChannel, }, - BlockSyncResponse { + ChainExchangeResponse { peer: PeerId, request_id: RequestId, - response: BlockSyncResponse, + response: ChainExchangeResponse, }, } @@ -247,39 +247,46 @@ impl NetworkBehaviourEventProcess> +impl NetworkBehaviourEventProcess> for ForestBehaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { + fn inject_event( + &mut self, + event: RequestResponseEvent, + ) { match event { RequestResponseEvent::Message { peer, message } => match message { RequestResponseMessage::Request { request, channel } => { - self.events.push(ForestBehaviourEvent::BlockSyncRequest { - peer, - request, - channel, - }) + self.events + .push(ForestBehaviourEvent::ChainExchangeRequest { + peer, + request, + channel, + }) } RequestResponseMessage::Response { request_id, response, - } => self.events.push(ForestBehaviourEvent::BlockSyncResponse { - peer, - request_id, - response, - }), + } => self + .events + .push(ForestBehaviourEvent::ChainExchangeResponse { + peer, + request_id, + response, + }), }, RequestResponseEvent::OutboundFailure { peer, request_id, error, } => warn!( - "BlockSync outbound error (peer: {:?}) (id: {:?}): {:?}", + "ChainExchange outbound error (peer: {:?}) (id: {:?}): {:?}", peer, request_id, error ), - RequestResponseEvent::InboundFailure { peer, error } => { - warn!("BlockSync inbound error (peer: {:?}): {:?}", peer, error) - } + RequestResponseEvent::InboundFailure { peer, error } => warn!( + "ChainExchange inbound error (peer: {:?}): {:?}", + peer, error + ), } } } @@ -340,7 +347,7 @@ impl ForestBehaviour { }; let hp = std::iter::once((HelloProtocolName, ProtocolSupport::Full)); - let bp = std::iter::once((BlockSyncProtocolName, ProtocolSupport::Full)); + let cp = std::iter::once((ChainExchangeProtocolName, ProtocolSupport::Full)); let mut req_res_config = RequestResponseConfig::default(); req_res_config.set_request_timeout(Duration::from_secs(20)); @@ -361,8 +368,8 @@ impl ForestBehaviour { ), kademlia: kademlia_opt.into(), bitswap, - hello: RequestResponse::new(HelloCodec, hp, req_res_config.clone()), - blocksync: RequestResponse::new(BlockSyncCodec, bp, req_res_config), + hello: RequestResponse::new(HelloCodec::default(), hp, req_res_config.clone()), + chain_exchange: RequestResponse::new(ChainExchangeCodec::default(), cp, req_res_config), events: vec![], peers: Default::default(), } @@ -391,7 +398,9 @@ impl ForestBehaviour { pub fn send_rpc_request(&mut self, peer_id: &PeerId, req: RPCRequest) -> RequestId { match req { RPCRequest::Hello(request) => self.hello.send_request(peer_id, request), - RPCRequest::BlockSync(request) => self.blocksync.send_request(peer_id, request), + RPCRequest::ChainExchange(request) => { + self.chain_exchange.send_request(peer_id, request) + } } } diff --git a/node/forest_libp2p/src/blocksync/mod.rs b/node/forest_libp2p/src/blocksync/mod.rs deleted file mode 100644 index 5fbac2907e12..000000000000 --- a/node/forest_libp2p/src/blocksync/mod.rs +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright 2020 ChainSafe Systems -// SPDX-License-Identifier: Apache-2.0, MIT - -mod message; -mod provider; - -pub use self::message::*; -pub use self::provider::*; -use async_trait::async_trait; -use forest_encoding::{from_slice, to_vec}; -use futures::prelude::*; -use libp2p::core::ProtocolName; -use libp2p_request_response::RequestResponseCodec; -use std::io; - -pub const BLOCKSYNC_PROTOCOL_ID: &[u8] = b"/fil/sync/blk/0.0.1"; - -/// Type to satisfy `ProtocolName` interface for BlockSync RPC. -#[derive(Clone, Debug, PartialEq, Default)] -pub struct BlockSyncProtocolName; - -impl ProtocolName for BlockSyncProtocolName { - fn protocol_name(&self) -> &[u8] { - BLOCKSYNC_PROTOCOL_ID - } -} - -/// BlockSync protocol codec to be used within the RPC service. -#[derive(Debug, Clone, Default)] -pub struct BlockSyncCodec; - -#[async_trait] -impl RequestResponseCodec for BlockSyncCodec { - type Protocol = BlockSyncProtocolName; - type Request = BlockSyncRequest; - type Response = BlockSyncResponse; - - async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let mut buf = Vec::new(); - io.read_to_end(&mut buf).await?; - Ok(from_slice(&buf).map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?) - } - - async fn read_response( - &mut self, - _: &Self::Protocol, - io: &mut T, - ) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let mut buf = Vec::new(); - io.read_to_end(&mut buf).await?; - Ok(from_slice(&buf).map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?) - } - - async fn write_request( - &mut self, - _: &Self::Protocol, - io: &mut T, - req: Self::Request, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - io.write_all( - &to_vec(&req).map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, - ) - .await - } - - async fn write_response( - &mut self, - _: &Self::Protocol, - io: &mut T, - res: Self::Response, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - io.write_all( - &to_vec(&res).map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, - ) - .await - } -} diff --git a/node/forest_libp2p/src/blocksync/message.rs b/node/forest_libp2p/src/chain_exchange/message.rs similarity index 90% rename from node/forest_libp2p/src/blocksync/message.rs rename to node/forest_libp2p/src/chain_exchange/message.rs index 278e763dd7b3..751aea06a339 100644 --- a/node/forest_libp2p/src/blocksync/message.rs +++ b/node/forest_libp2p/src/chain_exchange/message.rs @@ -9,14 +9,14 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::convert::TryFrom; use std::sync::Arc; -/// Blocksync request options +/// ChainExchange request options pub const BLOCKS: u64 = 1; pub const MESSAGES: u64 = 2; pub const BLOCKS_MESSAGES: u64 = 3; /// The payload that gets sent to another node to request for blocks and messages. It get DagCBOR serialized before sending over the wire. #[derive(Clone, Debug, PartialEq, Serialize_tuple, Deserialize_tuple)] -pub struct BlockSyncRequest { +pub struct ChainExchangeRequest { /// The tipset to start sync from pub start: Vec, /// The amount of epochs to sync by @@ -25,7 +25,7 @@ pub struct BlockSyncRequest { pub options: u64, } -impl BlockSyncRequest { +impl ChainExchangeRequest { /// If a request expects blocks to be included in response. pub fn include_blocks(&self) -> bool { self.options == BLOCKS || self.options == BLOCKS_MESSAGES @@ -37,9 +37,9 @@ impl BlockSyncRequest { } } -/// Status codes of a blocksync response. +/// Status codes of a chain_exchange response. #[derive(Clone, Debug, PartialEq)] -pub enum BlockSyncResponseStatus { +pub enum ChainExchangeResponseStatus { /// All is well. Success, /// We could not fetch all blocks requested (but at least we returned @@ -57,12 +57,12 @@ pub enum BlockSyncResponseStatus { Other(i32), } -impl Serialize for BlockSyncResponseStatus { +impl Serialize for ChainExchangeResponseStatus { fn serialize(&self, serializer: S) -> Result where S: Serializer, { - use BlockSyncResponseStatus::*; + use ChainExchangeResponseStatus::*; let code: i32 = match self { Success => 0, PartialResponse => 101, @@ -76,14 +76,14 @@ impl Serialize for BlockSyncResponseStatus { } } -impl<'de> Deserialize<'de> for BlockSyncResponseStatus { +impl<'de> Deserialize<'de> for ChainExchangeResponseStatus { fn deserialize(deserializer: D) -> Result>::Error> where D: Deserializer<'de>, { let code: i32 = Deserialize::deserialize(deserializer)?; - use BlockSyncResponseStatus::*; + use ChainExchangeResponseStatus::*; let status = match code { 0 => Success, 101 => PartialResponse, @@ -97,27 +97,27 @@ impl<'de> Deserialize<'de> for BlockSyncResponseStatus { } } -/// The response to a BlockSync request. +/// The response to a ChainExchange request. #[derive(Clone, Debug, PartialEq, Serialize_tuple, Deserialize_tuple)] -pub struct BlockSyncResponse { +pub struct ChainExchangeResponse { /// Error code - pub status: BlockSyncResponseStatus, + pub status: ChainExchangeResponseStatus, /// Status message indicating failure reason pub message: String, /// The tipsets requested pub chain: Vec, } -impl BlockSyncResponse { - /// Converts blocksync response into result. +impl ChainExchangeResponse { + /// Converts chain_exchange response into result. /// Returns an error if the response status is not `Ok`. /// Tipset bundle is converted into generic return type with `TryFrom` trait impl. pub fn into_result(self) -> Result, String> where T: TryFrom, { - if self.status != BlockSyncResponseStatus::Success - && self.status != BlockSyncResponseStatus::PartialResponse + if self.status != ChainExchangeResponseStatus::Success + && self.status != ChainExchangeResponseStatus::PartialResponse { return Err(format!("Status {:?}: {}", self.status, self.message)); } diff --git a/node/forest_libp2p/src/chain_exchange/mod.rs b/node/forest_libp2p/src/chain_exchange/mod.rs new file mode 100644 index 000000000000..a40f27fc81d7 --- /dev/null +++ b/node/forest_libp2p/src/chain_exchange/mod.rs @@ -0,0 +1,26 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +mod message; +mod provider; + +pub use self::message::*; +pub use self::provider::*; +use super::rpc::CborRequestResponse; +use libp2p::core::ProtocolName; + +pub const CHAIN_XCHG_PROTOCOL_ID: &[u8] = b"/fil/chain/xchg/0.0.1"; + +/// Type to satisfy `ProtocolName` interface for ChainExchange RPC. +#[derive(Clone, Debug, PartialEq, Default)] +pub struct ChainExchangeProtocolName; + +impl ProtocolName for ChainExchangeProtocolName { + fn protocol_name(&self) -> &[u8] { + CHAIN_XCHG_PROTOCOL_ID + } +} + +/// ChainExchange protocol codec to be used within the RPC service. +pub type ChainExchangeCodec = + CborRequestResponse; diff --git a/node/forest_libp2p/src/blocksync/provider.rs b/node/forest_libp2p/src/chain_exchange/provider.rs similarity index 90% rename from node/forest_libp2p/src/blocksync/provider.rs rename to node/forest_libp2p/src/chain_exchange/provider.rs index 86e02a19dd01..ecfd426ad205 100644 --- a/node/forest_libp2p/src/blocksync/provider.rs +++ b/node/forest_libp2p/src/chain_exchange/provider.rs @@ -9,14 +9,15 @@ use log::debug; use std::collections::HashMap; use super::{ - BlockSyncRequest, BlockSyncResponse, BlockSyncResponseStatus, CompactedMessages, TipsetBundle, + ChainExchangeRequest, ChainExchangeResponse, ChainExchangeResponseStatus, CompactedMessages, + TipsetBundle, }; -/// Builds blocksync response out of chain data. -pub async fn make_blocksync_response( +/// Builds chain exchange response out of chain data. +pub async fn make_chain_exchange_response( cs: &ChainStore, - request: &BlockSyncRequest, -) -> BlockSyncResponse + request: &ChainExchangeRequest, +) -> ChainExchangeResponse where DB: BlockStore + Send + Sync + 'static, { @@ -34,9 +35,9 @@ where Err(err) => { debug!("Cannot get tipset from keys: {}", err); - return BlockSyncResponse { + return ChainExchangeResponse { chain: vec![], - status: BlockSyncResponseStatus::InternalError, + status: ChainExchangeResponseStatus::InternalError, message: "Tipset was not found in the database".to_owned(), }; } @@ -48,9 +49,9 @@ where Err(err) => { debug!("Cannot compact messages for tipset: {}", err); - return BlockSyncResponse { + return ChainExchangeResponse { chain: vec![], - status: BlockSyncResponseStatus::InternalError, + status: ChainExchangeResponseStatus::InternalError, message: "Can not fullfil the request".to_owned(), }; } @@ -75,12 +76,12 @@ where let result_chain_length = response_chain.len() as u64; - BlockSyncResponse { + ChainExchangeResponse { chain: response_chain, status: if result_chain_length < request.request_len { - BlockSyncResponseStatus::PartialResponse + ChainExchangeResponseStatus::PartialResponse } else { - BlockSyncResponseStatus::Success + ChainExchangeResponseStatus::Success }, message: "Success".to_owned(), } @@ -169,9 +170,9 @@ mod tests { fn compact_messages_test() { let (cids, db) = populate_db(); - let response = task::block_on(make_blocksync_response( + let response = task::block_on(make_chain_exchange_response( &ChainStore::new(Arc::new(db)), - &BlockSyncRequest { + &ChainExchangeRequest { start: cids, request_len: 2, options: BLOCKS_MESSAGES, diff --git a/node/forest_libp2p/src/hello/message.rs b/node/forest_libp2p/src/hello/message.rs index d03a014d4221..9190a1691bc7 100644 --- a/node/forest_libp2p/src/hello/message.rs +++ b/node/forest_libp2p/src/hello/message.rs @@ -7,7 +7,7 @@ use forest_encoding::tuple::*; use num_bigint::BigInt; /// Hello message https://filecoin-project.github.io/specs/#hello-spec -#[derive(Clone, Debug, PartialEq, Default, Serialize_tuple, Deserialize_tuple)] +#[derive(Clone, Debug, PartialEq, Serialize_tuple, Deserialize_tuple)] pub struct HelloRequest { pub heaviest_tip_set: Vec, pub heaviest_tipset_height: ChainEpoch, @@ -35,7 +35,9 @@ mod tests { fn hello_default_ser() { let orig_msg = HelloRequest { genesis_hash: Cid::new_from_cbor(&[], Identity), - ..Default::default() + heaviest_tipset_weight: Default::default(), + heaviest_tipset_height: Default::default(), + heaviest_tip_set: Default::default(), }; let bz = to_vec(&orig_msg).unwrap(); let msg: HelloRequest = from_slice(&bz).unwrap(); diff --git a/node/forest_libp2p/src/hello/mod.rs b/node/forest_libp2p/src/hello/mod.rs index 8d86196c2bb4..b0fb2fd3f78c 100644 --- a/node/forest_libp2p/src/hello/mod.rs +++ b/node/forest_libp2p/src/hello/mod.rs @@ -4,16 +4,12 @@ mod message; pub use self::message::*; -use async_trait::async_trait; -use forest_encoding::{from_slice, to_vec}; -use futures::prelude::*; +use super::rpc::CborRequestResponse; use libp2p::core::ProtocolName; -use libp2p_request_response::RequestResponseCodec; -use std::io; pub const HELLO_PROTOCOL_ID: &[u8] = b"/fil/hello/1.0.0"; -/// Type to satisfy `ProtocolName` interface for BlockSync RPC. +/// Type to satisfy `ProtocolName` interface for Hello RPC. #[derive(Clone, Debug, PartialEq, Default)] pub struct HelloProtocolName; @@ -24,64 +20,4 @@ impl ProtocolName for HelloProtocolName { } /// Hello protocol codec to be used within the RPC service. -#[derive(Debug, Clone, Default)] -pub struct HelloCodec; - -#[async_trait] -impl RequestResponseCodec for HelloCodec { - type Protocol = HelloProtocolName; - type Request = HelloRequest; - type Response = HelloResponse; - - async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let mut buf = Vec::new(); - io.read_to_end(&mut buf).await?; - Ok(from_slice(&buf).map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?) - } - - async fn read_response( - &mut self, - _: &Self::Protocol, - io: &mut T, - ) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let mut buf = Vec::new(); - io.read_to_end(&mut buf).await?; - Ok(from_slice(&buf).map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?) - } - - async fn write_request( - &mut self, - _: &Self::Protocol, - io: &mut T, - req: Self::Request, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - io.write_all( - &to_vec(&req).map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, - ) - .await - } - - async fn write_response( - &mut self, - _: &Self::Protocol, - io: &mut T, - res: Self::Response, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - io.write_all( - &to_vec(&res).map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, - ) - .await - } -} +pub type HelloCodec = CborRequestResponse; diff --git a/node/forest_libp2p/src/lib.rs b/node/forest_libp2p/src/lib.rs index 89d079c87916..93f17cf913c3 100644 --- a/node/forest_libp2p/src/lib.rs +++ b/node/forest_libp2p/src/lib.rs @@ -7,13 +7,13 @@ extern crate lazy_static; mod behaviour; -pub mod blocksync; +pub mod chain_exchange; mod config; pub mod hello; pub mod rpc; mod service; pub use self::behaviour::*; -pub use self::blocksync::{BlockSyncRequest, MESSAGES}; +pub use self::chain_exchange::{ChainExchangeRequest, MESSAGES}; pub use self::config::*; pub use self::service::*; diff --git a/node/forest_libp2p/src/rpc/mod.rs b/node/forest_libp2p/src/rpc/mod.rs index d3dd2859247b..0d00fa6fc251 100644 --- a/node/forest_libp2p/src/rpc/mod.rs +++ b/node/forest_libp2p/src/rpc/mod.rs @@ -1,20 +1,109 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use crate::blocksync::{BlockSyncRequest, BlockSyncResponse}; +use crate::chain_exchange::{ChainExchangeRequest, ChainExchangeResponse}; use crate::hello::{HelloRequest, HelloResponse}; +use async_trait::async_trait; +use forest_encoding::{from_slice, to_vec}; +use futures::prelude::*; +use libp2p::core::ProtocolName; +use libp2p_request_response::RequestResponseCodec; pub use libp2p_request_response::{RequestId, ResponseChannel}; +use serde::{de::DeserializeOwned, Serialize}; +use std::io; +use std::marker::PhantomData; /// RPCResponse payloads for request/response calls #[derive(Debug, Clone, PartialEq)] pub enum RPCResponse { - BlockSync(BlockSyncResponse), + ChainExchange(ChainExchangeResponse), Hello(HelloResponse), } /// RPCRequest payloads for request/response calls #[derive(Debug, Clone, PartialEq)] pub enum RPCRequest { - BlockSync(BlockSyncRequest), + ChainExchange(ChainExchangeRequest), Hello(HelloRequest), } + +#[derive(Clone)] +pub struct CborRequestResponse { + protocol: PhantomData

, + request: PhantomData, + response: PhantomData, +} + +impl Default for CborRequestResponse { + fn default() -> Self { + Self { + protocol: PhantomData::

::default(), + request: PhantomData::::default(), + response: PhantomData::::default(), + } + } +} + +#[async_trait] +impl RequestResponseCodec for CborRequestResponse +where + P: ProtocolName + Clone + Send + Sync, + RQ: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, + RS: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, +{ + type Protocol = P; + type Request = RQ; + type Response = RS; + + async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut buf = Vec::new(); + io.read_to_end(&mut buf).await?; + Ok(from_slice(&buf).map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?) + } + + async fn read_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut buf = Vec::new(); + io.read_to_end(&mut buf).await?; + Ok(from_slice(&buf).map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?) + } + + async fn write_request( + &mut self, + _: &Self::Protocol, + io: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + io.write_all( + &to_vec(&req).map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, + ) + .await + } + + async fn write_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + res: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + io.write_all( + &to_vec(&res).map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, + ) + .await + } +} diff --git a/node/forest_libp2p/src/service.rs b/node/forest_libp2p/src/service.rs index 1564bde8d9d1..3f376df79948 100644 --- a/node/forest_libp2p/src/service.rs +++ b/node/forest_libp2p/src/service.rs @@ -1,7 +1,9 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use super::blocksync::{make_blocksync_response, BlockSyncRequest, BlockSyncResponse}; +use super::chain_exchange::{ + make_chain_exchange_response, ChainExchangeRequest, ChainExchangeResponse, +}; use super::rpc::RPCRequest; use super::{ForestBehaviour, ForestBehaviourEvent, Libp2pConfig}; use crate::hello::{HelloRequest, HelloResponse}; @@ -57,13 +59,13 @@ pub enum NetworkEvent { request_id: RequestId, response: HelloResponse, }, - BlockSyncRequest { - request: BlockSyncRequest, - channel: ResponseChannel, + ChainExchangeRequest { + request: ChainExchangeRequest, + channel: ResponseChannel, }, - BlockSyncResponse { + ChainExchangeResponse { request_id: RequestId, - response: BlockSyncResponse, + response: ChainExchangeResponse, }, PeerDialed { peer_id: PeerId, @@ -89,10 +91,10 @@ pub enum NetworkMessage { topic: Topic, message: Vec, }, - BlockSyncRequest { + ChainExchangeRequest { peer_id: PeerId, - request: BlockSyncRequest, - response_channel: OneShotSender, + request: ChainExchangeRequest, + response_channel: OneShotSender, }, HelloRequest { peer_id: PeerId, @@ -107,8 +109,8 @@ pub enum NetworkMessage { pub struct Libp2pService { pub swarm: Swarm, cs: Arc>, - /// Keeps track of Blocksync requests to responses - bs_request_table: HashMap>, + /// Keeps track of Chain exchange requests to responses + cx_request_table: HashMap>, network_receiver_in: Receiver, network_sender_in: Sender, network_receiver_out: Receiver, @@ -156,7 +158,7 @@ where Libp2pService { swarm, cs, - bs_request_table: HashMap::new(), + cx_request_table: HashMap::new(), network_receiver_in, network_sender_in, network_receiver_out, @@ -239,19 +241,19 @@ where response, }).await; } - ForestBehaviourEvent::BlockSyncRequest { channel, peer, request } => { - debug!("Received blocksync request (peerId: {:?})", peer); + ForestBehaviourEvent::ChainExchangeRequest { channel, peer, request } => { + debug!("Received chain_exchange request (peerId: {:?})", peer); let db = self.cs.clone(); async { let response = task::spawn(async move { - make_blocksync_response(db.as_ref(), &request).await + make_chain_exchange_response(db.as_ref(), &request).await }).await; let _ = channel.send(response).await; }.await; } - ForestBehaviourEvent::BlockSyncResponse { request_id, response, .. } => { - debug!("Received blocksync response (id: {:?})", request_id); - let tx = self.bs_request_table.remove(&request_id); + ForestBehaviourEvent::ChainExchangeResponse { request_id, response, .. } => { + debug!("Received chain_exchange response (id: {:?})", request_id); + let tx = self.cx_request_table.remove(&request_id); if let Some(tx) = tx { if tx.send(response).is_err() { @@ -312,10 +314,10 @@ where NetworkMessage::HelloRequest { peer_id, request } => { let _ = swarm_stream.get_mut().send_rpc_request(&peer_id, RPCRequest::Hello(request)); } - NetworkMessage::BlockSyncRequest { peer_id, request, response_channel } => { - let id = swarm_stream.get_mut().send_rpc_request(&peer_id, RPCRequest::BlockSync(request)); + NetworkMessage::ChainExchangeRequest { peer_id, request, response_channel } => { + let id = swarm_stream.get_mut().send_rpc_request(&peer_id, RPCRequest::ChainExchange(request)); debug!("Sent BS Request with id: {:?}", id); - self.bs_request_table.insert(id, response_channel); + self.cx_request_table.insert(id, response_channel); } NetworkMessage::BitswapRequest { cid, response_channel } => { if let Err(e) = swarm_stream.get_mut().want_block(cid, 1000) { @@ -364,8 +366,8 @@ pub fn build_transport(local_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox), Er .into_authentic(&local_key) .expect("Noise key generation failed"); let mut yamux_config = yamux::Config::default(); - yamux_config.set_max_buffer_size(1 << 20); - yamux_config.set_receive_window(1 << 20); + yamux_config.set_max_buffer_size(16 * 1024 * 1024); + yamux_config.set_receive_window(16 * 1024 * 1024); transport .upgrade(core::upgrade::Version::V1) .authenticate(noise::NoiseConfig::xx(dh_keys).into_authenticated()) diff --git a/node/forest_libp2p/tests/decode_test.rs b/node/forest_libp2p/tests/decode_test.rs index 8a5563f037a4..35cd3d112373 100644 --- a/node/forest_libp2p/tests/decode_test.rs +++ b/node/forest_libp2p/tests/decode_test.rs @@ -4,8 +4,8 @@ use crypto::{Signature, Signer}; use forest_address::Address; use forest_blocks::{Block, BlockHeader, FullTipset}; -use forest_libp2p::blocksync::{ - BlockSyncResponse, BlockSyncResponseStatus, CompactedMessages, TipsetBundle, +use forest_libp2p::chain_exchange::{ + ChainExchangeResponse, ChainExchangeResponseStatus, CompactedMessages, TipsetBundle, }; use forest_message::{SignedMessage, UnsignedMessage}; use num_bigint::BigInt; @@ -42,9 +42,9 @@ fn convert_single_tipset_bundle() { }), }; - let res = BlockSyncResponse { + let res = ChainExchangeResponse { chain: vec![bundle], - status: BlockSyncResponseStatus::Success, + status: ChainExchangeResponseStatus::Success, message: "".into(), } .into_result::() diff --git a/utils/test_utils/src/chain_structures.rs b/utils/test_utils/src/chain_structures.rs index 065449b1a5e4..8e2c857f30f6 100644 --- a/utils/test_utils/src/chain_structures.rs +++ b/utils/test_utils/src/chain_structures.rs @@ -11,8 +11,8 @@ use chain::TipsetMetadata; use cid::{Cid, Code::Blake2b256}; use crypto::{Signature, Signer, VRFProof}; use encoding::{from_slice, to_vec}; -use forest_libp2p::blocksync::{ - BlockSyncResponse, BlockSyncResponseStatus, CompactedMessages, TipsetBundle, +use forest_libp2p::chain_exchange::{ + ChainExchangeResponse, ChainExchangeResponseStatus, CompactedMessages, TipsetBundle, }; use message::{SignedMessage, UnsignedMessage}; use num_bigint::BigInt; @@ -207,15 +207,15 @@ pub fn construct_dummy_header() -> BlockHeader { } /// Returns a RPCResponse used for testing -pub fn construct_blocksync_response() -> BlockSyncResponse { +pub fn construct_chain_exchange_response() -> ChainExchangeResponse { // construct block sync response - BlockSyncResponse { + ChainExchangeResponse { chain: vec![ construct_tipset_bundle(3, 10), construct_tipset_bundle(2, 10), construct_tipset_bundle(1, 10), ], - status: BlockSyncResponseStatus::Success, + status: ChainExchangeResponseStatus::Success, message: "message".to_owned(), } }