From 3411459d4896e75222c59ada61ed1a37e8ef670b Mon Sep 17 00:00:00 2001 From: Austin Abell Date: Mon, 14 Sep 2020 15:14:48 -0400 Subject: [PATCH] ChainSync refactor (#693) * Switch chain store to threadsafe * Update genesis to arc reference * wip refactoring chain sync to workers in async tasks * Update network event handling and remove NetworkHandler * Update tipset scheduling logic * Update peer retrieval to take a random sample of available peers * Cleanup and enabling all existing tests * fix worker task spawn * Add TODO for emit event ignoring and change to error log * oops * Update comment * Fix typo --- Cargo.lock | 4 + blockchain/chain/Cargo.toml | 1 + blockchain/chain/src/store/chain_store.rs | 57 +- blockchain/chain/src/store/tip_index.rs | 45 +- blockchain/chain_sync/Cargo.toml | 2 + blockchain/chain_sync/src/bucket.rs | 23 +- blockchain/chain_sync/src/lib.rs | 4 +- blockchain/chain_sync/src/network_context.rs | 34 +- blockchain/chain_sync/src/network_handler.rs | 53 - blockchain/chain_sync/src/peer_manager.rs | 14 +- blockchain/chain_sync/src/sync.rs | 1099 ++++-------------- blockchain/chain_sync/src/sync/peer_test.rs | 27 +- blockchain/chain_sync/src/sync_state.rs | 7 + blockchain/chain_sync/src/sync_worker.rs | 866 ++++++++++++++ blockchain/message_pool/Cargo.toml | 1 + blockchain/message_pool/src/msgpool.rs | 37 +- forest/src/cli/genesis.rs | 2 +- forest/src/daemon.rs | 12 +- ipld/graphsync/src/message/mod.rs | 2 +- node/forest_libp2p/src/behaviour.rs | 1 + node/forest_libp2p/src/service.rs | 23 +- node/rpc/src/lib.rs | 2 +- node/rpc/src/sync_api.rs | 47 +- utils/test_utils/src/chain_structures.rs | 23 +- vm/message/src/unsigned_message.rs | 2 +- 25 files changed, 1319 insertions(+), 1069 deletions(-) delete mode 100644 blockchain/chain_sync/src/network_handler.rs create mode 100644 blockchain/chain_sync/src/sync_worker.rs diff --git a/Cargo.lock b/Cargo.lock index 75671a20af8f..35ba63d356ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1230,6 +1230,7 
@@ name = "chain" version = "0.1.0" dependencies = [ "actor", + "async-std", "beacon", "blake2b_simd", "byteorder 1.3.4", @@ -1279,12 +1280,14 @@ dependencies = [ "forest_message", "forest_vm", "futures 0.3.5", + "futures-util", "ipld_amt", "ipld_blockstore", "libp2p", "log", "lru", "num-traits 0.2.12", + "rand 0.7.3", "serde", "state_manager", "state_tree", @@ -4318,6 +4321,7 @@ name = "message_pool" version = "0.1.0" dependencies = [ "async-std", + "async-trait", "blake2b_simd", "chain", "db", diff --git a/blockchain/chain/Cargo.toml b/blockchain/chain/Cargo.toml index 63a43e47f622..e7314ff82d64 100644 --- a/blockchain/chain/Cargo.toml +++ b/blockchain/chain/Cargo.toml @@ -27,6 +27,7 @@ beacon = { path = "../beacon" } flo_stream = "0.4.0" address = { package = "forest_address", path = "../../vm/address" } lazy_static = "1.4" +async-std = "1.6.3" [dev-dependencies] multihash = "0.10.0" diff --git a/blockchain/chain/src/store/chain_store.rs b/blockchain/chain/src/store/chain_store.rs index f663d3ecf1bb..23467d83bc96 100644 --- a/blockchain/chain/src/store/chain_store.rs +++ b/blockchain/chain/src/store/chain_store.rs @@ -4,6 +4,7 @@ use super::{Error, TipIndex, TipsetMetadata}; use actor::{power::State as PowerState, STORAGE_POWER_ACTOR_ADDR}; use address::Address; +use async_std::sync::RwLock; use beacon::{BeaconEntry, IGNORE_DRAND_VAR}; use blake2b_simd::Params; use blocks::{Block, BlockHeader, FullTipset, Tipset, TipsetKeys, TxMeta}; @@ -46,15 +47,17 @@ pub enum HeadChange { Revert(Arc), } -/// Generic implementation of the datastore trait and structures +/// Stores chain data such as heaviest tipset and cached tipset info at each epoch. +/// This structure is threadsafe, and all caches are wrapped in a mutex to allow a consistent +/// `ChainStore` to be shared across tasks. pub struct ChainStore { - publisher: Publisher, + publisher: RwLock>, // key-value datastore pub db: Arc, // Tipset at the head of the best-known chain. 
- heaviest: Option>, + heaviest: RwLock>>, // tip_index tracks tipsets by epoch/parentset for use by expected consensus. tip_index: TipIndex, @@ -71,44 +74,50 @@ where .map(Arc::new); Self { db, - publisher: Publisher::new(SINK_CAP), + publisher: Publisher::new(SINK_CAP).into(), tip_index: TipIndex::new(), - heaviest, + heaviest: heaviest.into(), } } /// Sets heaviest tipset within ChainStore and store its tipset cids under HEAD_KEY - pub async fn set_heaviest_tipset(&mut self, ts: Arc) -> Result<(), Error> { + pub async fn set_heaviest_tipset(&self, ts: Arc) -> Result<(), Error> { self.db.write(HEAD_KEY, ts.key().marshal_cbor()?)?; - self.heaviest = Some(ts.clone()); - self.publisher.publish(HeadChange::Current(ts)).await; + *self.heaviest.write().await = Some(ts.clone()); + self.publisher + .write() + .await + .publish(HeadChange::Current(ts)) + .await; Ok(()) } // subscribing returns a future sink that we can essentially iterate over using future streams - pub fn subscribe(&mut self) -> Subscriber { - self.publisher.subscribe() + pub async fn subscribe(&self) -> Subscriber { + self.publisher.write().await.subscribe() } /// Sets tip_index tracker - pub fn set_tipset_tracker(&mut self, header: &BlockHeader) -> Result<(), Error> { - let ts: Tipset = Tipset::new(vec![header.clone()])?; + // TODO this is really broken, should not be setting the tipset metadata to a tipset with just + // the one header. 
+ pub async fn set_tipset_tracker(&self, header: &BlockHeader) -> Result<(), Error> { + let ts = Arc::new(Tipset::new(vec![header.clone()])?); let meta = TipsetMetadata { tipset_state_root: header.state_root().clone(), tipset_receipts_root: header.message_receipts().clone(), tipset: ts, }; - self.tip_index.put(&meta); + self.tip_index.put(&meta).await; Ok(()) } /// Writes genesis to blockstore - pub fn set_genesis(&self, header: BlockHeader) -> Result { + pub fn set_genesis(&self, header: &BlockHeader) -> Result { set_genesis(self.blockstore(), header) } /// Writes tipset block headers to data store and updates heaviest tipset - pub async fn put_tipset(&mut self, ts: &Tipset) -> Result<(), Error> { + pub async fn put_tipset(&self, ts: &Tipset) -> Result<(), Error> { persist_objects(self.blockstore(), ts.blocks())?; // TODO determine if expanded tipset is required; see https://github.com/filecoin-project/lotus/blob/testnet/3/chain/store/store.go#L236 self.update_heaviest(ts).await?; @@ -121,7 +130,7 @@ where } /// Loads heaviest tipset from datastore and sets as heaviest in chainstore - pub async fn load_heaviest_tipset(&mut self) -> Result<(), Error> { + pub async fn load_heaviest_tipset(&self) -> Result<(), Error> { let heaviest_ts = get_heaviest_tipset(self.blockstore())?.ok_or_else(|| { warn!("No previous chain state found"); Error::Other("No chain state found".to_owned()) @@ -129,8 +138,10 @@ where // set as heaviest tipset let heaviest_ts = Arc::new(heaviest_ts); - self.heaviest = Some(heaviest_ts.clone()); + *self.heaviest.write().await = Some(heaviest_ts.clone()); self.publisher + .write() + .await .publish(HeadChange::Current(heaviest_ts)) .await; Ok(()) @@ -142,8 +153,8 @@ where } /// Returns heaviest tipset from blockstore - pub fn heaviest_tipset(&self) -> Option> { - self.heaviest.clone() + pub async fn heaviest_tipset(&self) -> Option> { + self.heaviest.read().await.clone() } /// Returns key-value store instance @@ -183,8 +194,8 @@ where } /// 
Determines if provided tipset is heavier than existing known heaviest tipset - async fn update_heaviest(&mut self, ts: &Tipset) -> Result<(), Error> { - match &self.heaviest { + async fn update_heaviest(&self, ts: &Tipset) -> Result<(), Error> { + match self.heaviest.read().await.as_ref() { Some(heaviest) => { let new_weight = weight(self.blockstore(), ts)?; let curr_weight = weight(self.blockstore(), &heaviest)?; @@ -266,7 +277,7 @@ where } } -fn set_genesis(db: &DB, header: BlockHeader) -> Result +fn set_genesis(db: &DB, header: &BlockHeader) -> Result where DB: BlockStore, { @@ -657,7 +668,7 @@ mod tests { .unwrap(); assert_eq!(cs.genesis().unwrap(), None); - cs.set_genesis(gen_block.clone()).unwrap(); + cs.set_genesis(&gen_block).unwrap(); assert_eq!(cs.genesis().unwrap(), Some(gen_block)); } } diff --git a/blockchain/chain/src/store/tip_index.rs b/blockchain/chain/src/store/tip_index.rs index 3631c011c8eb..1b46697d8dc0 100644 --- a/blockchain/chain/src/store/tip_index.rs +++ b/blockchain/chain/src/store/tip_index.rs @@ -2,11 +2,14 @@ // SPDX-License-Identifier: Apache-2.0, MIT use super::errors::Error; +use async_std::sync::RwLock; use blocks::{Tipset, TipsetKeys}; use cid::Cid; use clock::ChainEpoch; use std::collections::hash_map::{DefaultHasher, HashMap}; use std::hash::{Hash, Hasher}; +use std::sync::Arc; + /// TipsetMetadata is the type stored as the value in the TipIndex hashmap. 
It contains /// a tipset pointing to blocks, the root cid of the chain's state after /// applying the messages in this tipset to it's parent state, and the cid of the receipts @@ -20,7 +23,8 @@ pub struct TipsetMetadata { pub tipset_receipts_root: Cid, /// The actual Tipset - pub tipset: Tipset, + // TODO This should not be keeping a tipset with the metadata + pub tipset: Arc, } /// Trait to allow metadata to be indexed by multiple types of structs @@ -39,47 +43,62 @@ impl Index for TipsetKeys {} pub struct TipIndex { // metadata allows lookup of recorded Tipsets and their state roots // by TipsetKey and Epoch - metadata: HashMap, + // TODO this should be mapping epoch to a vector of Cids of block headers + metadata: RwLock>, } impl TipIndex { /// Creates new TipIndex with empty metadata pub fn new() -> Self { Self { - metadata: HashMap::new(), + metadata: Default::default(), } } /// Adds an entry to TipIndex's metadata /// After this call the input TipsetMetadata can be looked up by the TipsetKey of /// the tipset, or the tipset's epoch - pub fn put(&mut self, meta: &TipsetMetadata) { + pub async fn put(&self, meta: &TipsetMetadata) { // retrieve parent cids to be used as hash map key let parent_key = meta.tipset.parents(); // retrieve epoch to be used as hash map key let epoch_key = meta.tipset.epoch(); // insert value by parent_key into hash map - self.metadata.insert(parent_key.hash_key(), meta.clone()); + self.metadata + .write() + .await + .insert(parent_key.hash_key(), meta.clone()); // insert value by epoch_key into hash map - self.metadata.insert(epoch_key.hash_key(), meta.clone()); + self.metadata + .write() + .await + .insert(epoch_key.hash_key(), meta.clone()); } /// Returns the tipset given by hashed key - fn get(&self, key: u64) -> Result { + async fn get(&self, key: u64) -> Result { self.metadata + .read() + .await .get(&key) .cloned() .ok_or_else(|| Error::UndefinedKey("invalid metadata key".to_string())) } /// Returns the tipset corresponding to 
the hashed index - pub fn get_tipset(&self, idx: &I) -> Result { - Ok(self.get(idx.hash_key()).map(|r| r.tipset)?) + pub async fn get_tipset(&self, idx: &I) -> Result, Error> { + Ok(self.get(idx.hash_key()).await.map(|r| r.tipset)?) } /// Returns the state root for the tipset corresponding to the index - pub fn get_tipset_state_root(&self, idx: &I) -> Result { - Ok(self.get(idx.hash_key()).map(|r| r.tipset_state_root)?) + pub async fn get_tipset_state_root(&self, idx: &I) -> Result { + Ok(self + .get(idx.hash_key()) + .await + .map(|r| r.tipset_state_root)?) } /// Returns the receipt root for the tipset corresponding to the index - pub fn get_tipset_receipts_root(&self, idx: &I) -> Result { - Ok(self.get(idx.hash_key()).map(|r| r.tipset_receipts_root)?) + pub async fn get_tipset_receipts_root(&self, idx: &I) -> Result { + Ok(self + .get(idx.hash_key()) + .await + .map(|r| r.tipset_receipts_root)?) } } diff --git a/blockchain/chain_sync/Cargo.toml b/blockchain/chain_sync/Cargo.toml index 40831ac21d5f..2d1287558ed9 100644 --- a/blockchain/chain_sync/Cargo.toml +++ b/blockchain/chain_sync/Cargo.toml @@ -25,6 +25,7 @@ log = "0.4.8" async-std = { version = "1.6.0", features = ["unstable"] } forest_libp2p = { path = "../../node/forest_libp2p" } futures = "0.3.5" +futures-util = "0.3.5" lru = "0.6" thiserror = "1.0" num-traits = "0.2" @@ -34,6 +35,7 @@ commcid = { path = "../../utils/commcid" } clock = { path = "../../node/clock" } serde = { version = "1.0", features = ["derive", "rc"] } flo_stream = "0.4.0" +rand = "0.7.3" [dev-dependencies] test_utils = { version = "0.1.0", path = "../../utils/test_utils/", features = ["test_constructors"] } diff --git a/blockchain/chain_sync/src/bucket.rs b/blockchain/chain_sync/src/bucket.rs index d8d014893e87..3f2487a8b5b3 100644 --- a/blockchain/chain_sync/src/bucket.rs +++ b/blockchain/chain_sync/src/bucket.rs @@ -26,7 +26,6 @@ impl SyncBucket { } /// Returns true if tipset is from same chain pub fn is_same_chain_as(&self, ts: 
&Tipset) -> bool { - // TODO Confirm that comparing keys will be sufficient on full tipset impl self.tips .iter() .any(|t| ts.key() == t.key() || ts.key() == t.parents() || ts.parents() == t.key()) @@ -37,10 +36,6 @@ impl SyncBucket { self.tips.push(ts); } } - /// Returns true if SyncBucket is empty - pub fn is_empty(&self) -> bool { - self.tips.is_empty() - } } /// Set of tipset buckets @@ -75,13 +70,17 @@ impl SyncBucketSet { Some(self.buckets.swap_remove(i)) } - /// Returns heaviest tipset from bucket set - pub(crate) fn heaviest(&self) -> Option> { - self.buckets - .iter() - .filter_map(SyncBucket::heaviest_tipset) - .max_by(|ts1, ts2| ts1.weight().cmp(ts2.weight())) + + /// Returns true if tipset is related to any tipset in the bucket set. + pub(crate) fn related_to_any(&self, ts: &Tipset) -> bool { + for b in self.buckets.iter() { + if b.is_same_chain_as(ts) { + return true; + } + } + false } + /// Returns a vector of SyncBuckets pub(crate) fn buckets(&self) -> &[SyncBucket] { &self.buckets @@ -147,7 +146,7 @@ mod tests { assert_eq!( set.buckets.len(), 2, - "Inserting seperate tipset should create new bucket" + "Inserting separate tipset should create new bucket" ); assert_eq!(set.buckets[1].tips.len(), 1); diff --git a/blockchain/chain_sync/src/lib.rs b/blockchain/chain_sync/src/lib.rs index c74b2b919d64..33d676fd0655 100644 --- a/blockchain/chain_sync/src/lib.rs +++ b/blockchain/chain_sync/src/lib.rs @@ -1,14 +1,16 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +#![recursion_limit = "1024"] + mod bad_block_cache; mod bucket; mod errors; mod network_context; -mod network_handler; mod peer_manager; mod sync; mod sync_state; +mod sync_worker; // workaround for a compiler bug, see https://github.com/rust-lang/rust/issues/55779 extern crate serde; diff --git a/blockchain/chain_sync/src/network_context.rs b/blockchain/chain_sync/src/network_context.rs index cc536f5f401d..1a676fee30b6 100644 --- 
a/blockchain/chain_sync/src/network_context.rs +++ b/blockchain/chain_sync/src/network_context.rs @@ -4,11 +4,10 @@ use async_std::future; use async_std::sync::Sender; use blocks::{FullTipset, Tipset, TipsetKeys}; -use flo_stream::Subscriber; use forest_libp2p::{ blocksync::{BlockSyncRequest, BlockSyncResponse, BLOCKS, MESSAGES}, hello::HelloRequest, - NetworkEvent, NetworkMessage, + NetworkMessage, }; use futures::channel::oneshot::channel as oneshot_channel; use libp2p::core::PeerId; @@ -19,25 +18,20 @@ use std::time::Duration; const RPC_TIMEOUT: u64 = 20; /// Context used in chain sync to handle network requests +#[derive(Clone)] pub struct SyncNetworkContext { /// Channel to send network messages through p2p service network_send: Sender, - - /// Receiver channel for network events - pub receiver: Subscriber, } impl SyncNetworkContext { - pub fn new(network_send: Sender, receiver: Subscriber) -> Self { - Self { - network_send, - receiver, - } + pub fn new(network_send: Sender) -> Self { + Self { network_send } } /// Send a blocksync request for only block headers (ignore messages) pub async fn blocksync_headers( - &mut self, + &self, peer_id: PeerId, tsk: &TipsetKeys, count: u64, @@ -58,7 +52,7 @@ impl SyncNetworkContext { } /// Send a blocksync request for a single full tipset (includes messages) pub async fn blocksync_fts( - &mut self, + &self, peer_id: PeerId, tsk: &TipsetKeys, ) -> Result { @@ -73,15 +67,19 @@ impl SyncNetworkContext { ) .await?; - let fts = bs_res.into_result()?; - fts.get(0) - .cloned() - .ok_or(format!("No full tipset found for cid: {:?}", tsk)) + let mut fts = bs_res.into_result()?; + if fts.len() != 1 { + return Err(format!( + "Full tipset request returned {} tipsets", + fts.len() + )); + } + Ok(fts.remove(0)) } /// Send a blocksync request to the network and await response pub async fn blocksync_request( - &mut self, + &self, peer_id: PeerId, request: BlockSyncRequest, ) -> Result { @@ -104,7 +102,7 @@ impl SyncNetworkContext { } /// 
Send a hello request to the network (does not await response) - pub async fn hello_request(&mut self, peer_id: PeerId, request: HelloRequest) { + pub async fn hello_request(&self, peer_id: PeerId, request: HelloRequest) { trace!("Sending Hello Message {:?}", request); // TODO update to await response when we want to handle the latency self.network_send diff --git a/blockchain/chain_sync/src/network_handler.rs b/blockchain/chain_sync/src/network_handler.rs deleted file mode 100644 index 52e4b2d6d001..000000000000 --- a/blockchain/chain_sync/src/network_handler.rs +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2020 ChainSafe Systems -// SPDX-License-Identifier: Apache-2.0, MIT - -use super::peer_manager::PeerManager; -use async_std::prelude::*; -use async_std::sync::Receiver; -use async_std::task; -use flo_stream::{MessagePublisher, Publisher}; -use forest_libp2p::hello::HelloResponse; -use forest_libp2p::NetworkEvent; -use log::trace; -use std::sync::Arc; - -/// Handles network events from channel and splits based on request -pub(crate) struct NetworkHandler { - event_send: Publisher, - receiver: Receiver, -} - -impl NetworkHandler { - pub(crate) fn new( - receiver: Receiver, - event_send: Publisher, - ) -> Self { - Self { - receiver, - event_send, - } - } - - pub(crate) fn spawn(&self, peer_manager: Arc) { - let mut receiver = self.receiver.clone(); - let mut event_send = self.event_send.republish(); - task::spawn(async move { - while let Some(event) = receiver.next().await { - // Update peer on this thread before sending hello - if let NetworkEvent::HelloRequest { channel, .. 
} = &event { - // TODO should probably add peer with their tipset/ not handled seperately - channel - .clone() - .send(HelloResponse { - arrival: 100, - sent: 101, - }) - .await; - peer_manager.add_peer(channel.peer.clone(), None).await; - } - event_send.publish(event).await - } - }); - trace!("Spawned network handler"); - } -} diff --git a/blockchain/chain_sync/src/peer_manager.rs b/blockchain/chain_sync/src/peer_manager.rs index 72a0354c165b..59136eefa5b7 100644 --- a/blockchain/chain_sync/src/peer_manager.rs +++ b/blockchain/chain_sync/src/peer_manager.rs @@ -5,10 +5,11 @@ use async_std::sync::RwLock; use blocks::Tipset; use libp2p::core::PeerId; use log::debug; +use rand::seq::SliceRandom; use std::collections::HashMap; use std::sync::Arc; -/// Thread safe peer manager +/// Thread safe peer manager which handles peer management for the `BlockSync` protocol. #[derive(Default)] pub struct PeerManager { // TODO potentially separate or expand to handle blocksync peers/ peers that haven't sent hello @@ -30,13 +31,10 @@ impl PeerManager { /// Retrieves a cloned PeerId to be used to send network request pub async fn get_peer(&self) -> Option { - // TODO replace this with a shuffled or more random sample - self.full_peers - .read() - .await - .iter() - .next() - .map(|(k, _)| k.clone()) + // TODO this should prioritize peers with greater success rate and take a random sample + // of the top `x` peers + let peer_vec: Vec = self.full_peers.read().await.keys().cloned().collect(); + peer_vec.choose(&mut rand::thread_rng()).cloned() } /// Retrieves all tipsets from current peer set diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index c655f801f6b8..3499c68c1839 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -6,54 +6,52 @@ mod peer_test; use super::bad_block_cache::BadBlockCache; use super::bucket::{SyncBucket, SyncBucketSet}; -use super::network_handler::NetworkHandler; use 
super::peer_manager::PeerManager; -use super::sync_state::{SyncStage, SyncState}; +use super::sync_state::SyncState; +use super::sync_worker::SyncWorker; use super::{Error, SyncNetworkContext}; -use address::{Address, Protocol}; use amt::Amt; -use async_std::sync::{Receiver, RwLock, Sender}; -use async_std::task; -use beacon::{Beacon, BeaconEntry, IGNORE_DRAND_VAR}; -use blocks::{Block, BlockHeader, FullTipset, Tipset, TipsetKeys, TxMeta}; -use chain::{persist_objects, ChainStore}; +use async_std::sync::{channel, Receiver, RwLock, Sender}; +use async_std::task::{self, JoinHandle}; +use beacon::Beacon; +use blocks::{Block, FullTipset, Tipset, TipsetKeys, TxMeta}; +use chain::ChainStore; use cid::{multihash::Blake2b256, Cid}; -use commcid::cid_to_replica_commitment_v1; -use core::time::Duration; -use crypto::verify_bls_aggregate; -use crypto::DomainSeparationTag; use encoding::{Cbor, Error as EncodingError}; -use fil_types::SectorInfo; -use filecoin_proofs_api::{post::verify_winning_post, ProverId, PublicReplicaInfo, SectorId}; -use flo_stream::{MessagePublisher, Publisher}; -use forest_libp2p::{ - hello::HelloRequest, BlockSyncRequest, NetworkEvent, NetworkMessage, MESSAGES, -}; -use futures::{ - executor::block_on, - stream::{FuturesUnordered, StreamExt}, -}; +use forest_libp2p::{hello::HelloRequest, NetworkEvent, NetworkMessage}; +use futures::select; +use futures::stream::StreamExt; use ipld_blockstore::BlockStore; use libp2p::core::PeerId; -use log::error; -use log::{debug, info, warn}; -use message::{Message, SignedMessage, UnsignedMessage}; -use num_traits::Zero; -use state_manager::{utils, StateManager}; -use state_tree::StateTree; -use std::cmp::min; -use std::collections::{BTreeMap, HashMap}; -use std::convert::{TryFrom, TryInto}; +use log::{debug, warn}; +use message::{SignedMessage, UnsignedMessage}; +use state_manager::StateManager; use std::sync::Arc; -use vm::TokenAmount; + +// TODO revisit this type, necessary for two sets of Arc> because each state 
is +// on separate thread and needs to be mutated independently, but the vec needs to be read +// on the RPC API thread and mutated on this thread. +type WorkerState = Arc>>>>; + +#[derive(Debug, PartialEq)] +enum ChainSyncState { + /// Bootstrapping peers before starting sync. + Bootstrap, + /// Syncing chain with BlockSync protocol. + Initial, + /// Following chain with blocks received over gossipsub. + _Follow, +} /// Struct that handles the ChainSync logic. This handles incoming network events such as /// gossipsub messages, Hello protocol requests, as well as sending and receiving BlockSync /// messages to be able to do the initial sync. pub struct ChainSyncer { - /// Syncing state of chain sync - // TODO should be a vector once syncing done async and ideally not wrap each state in mutex. - state: Arc>, + /// State of general `ChainSync` protocol. + state: ChainSyncState, + + /// Syncing state of chain sync workers. + worker_state: WorkerState, /// Drand randomness beacon beacon: Arc, @@ -63,69 +61,63 @@ pub struct ChainSyncer { /// Bucket queue for incoming tipsets sync_queue: SyncBucketSet, + /// Represents tipsets related to ones already being synced to avoid duplicate work. + active_sync_tipsets: SyncBucketSet, - /// Represents next tipset to be synced - next_sync_target: SyncBucket, + /// Represents next tipset to be synced. + next_sync_target: Option, /// access and store tipsets / blocks / messages - chain_store: ChainStore, + chain_store: Arc>, /// Context to be able to send requests to p2p network network: SyncNetworkContext, /// the known genesis tipset - genesis: Tipset, + genesis: Arc, /// Bad blocks cache, updates based on invalid state transitions. 
/// Will mark any invalid blocks and all childen as bad in this bounded cache bad_blocks: Arc, /// incoming network events to be handled by syncer - net_handler: NetworkHandler, + net_handler: Receiver, /// Peer manager to handle full peers to send ChainSync requests to peer_manager: Arc, } -/// Message data used to ensure valid state transition -struct MsgMetaData { - balance: TokenAmount, - sequence: u64, -} - -impl ChainSyncer +impl ChainSyncer where - TBeacon: Beacon + Send, + TBeacon: Beacon + Sync + Send + 'static, DB: BlockStore + Sync + Send + 'static, { pub fn new( - chain_store: ChainStore, + chain_store: Arc>, state_manager: Arc>, beacon: Arc, network_send: Sender, network_rx: Receiver, - genesis: Tipset, + genesis: Arc, ) -> Result { - // Split incoming channel to handle blocksync requests - let mut event_send = Publisher::new(30); - let network = SyncNetworkContext::new(network_send, event_send.subscribe()); + let network = SyncNetworkContext::new(network_send); let peer_manager = Arc::new(PeerManager::default()); - let net_handler = NetworkHandler::new(network_rx, event_send); - Ok(Self { - state: Arc::new(RwLock::new(SyncState::default())), + state: ChainSyncState::Bootstrap, + worker_state: Default::default(), beacon, state_manager, chain_store, network, genesis, bad_blocks: Arc::new(BadBlockCache::default()), - net_handler, + net_handler: network_rx, peer_manager, sync_queue: SyncBucketSet::default(), - next_sync_target: SyncBucket::default(), + active_sync_tipsets: SyncBucketSet::default(), + next_sync_target: None, }) } @@ -134,307 +126,227 @@ where self.bad_blocks.clone() } - /// Returns the atomic reference to the syncing state. - pub fn sync_state_cloned(&self) -> Arc> { - self.state.clone() + /// Returns a cloned `Arc` of the sync worker state. + pub fn sync_state_cloned(&self) -> WorkerState { + self.worker_state.clone() } /// Spawns a network handler and begins the syncing process. 
- pub async fn start(mut self) -> Result<(), Error> { - self.net_handler.spawn(Arc::clone(&self.peer_manager)); - - while let Some(event) = self.network.receiver.next().await { - match event { - NetworkEvent::HelloRequest { request, channel } => { - let source = channel.peer.clone(); - debug!( - "Message inbound, heaviest tipset cid: {:?}", - request.heaviest_tip_set - ); - match self - .fetch_tipset(source.clone(), &TipsetKeys::new(request.heaviest_tip_set)) - .await - { - Ok(fts) => { - if let Err(e) = self.inform_new_head(source.clone(), &fts).await { - warn!("Failed to sync with provided tipset: {}", e); - }; - } - Err(e) => { - warn!("Failed to fetch full tipset from peer ({}): {}", source, e); - } - } - } - NetworkEvent::PeerDialed { peer_id } => { - let heaviest = self.chain_store.heaviest_tipset().unwrap(); - self.network - .hello_request( - peer_id, - HelloRequest { - heaviest_tip_set: heaviest.cids().to_vec(), - heaviest_tipset_height: heaviest.epoch(), - heaviest_tipset_weight: heaviest.weight().clone(), - genesis_hash: self.genesis.blocks()[0].cid().clone(), - }, - ) - .await - } - _ => (), - } - } - Ok(()) - } - - /// Performs syncing process - async fn sync(&mut self, head: Arc) -> Result<(), Error> { - // Bootstrap peers before syncing - // TODO increase bootstrap peer count before syncing - const MIN_PEERS: usize = 1; - loop { - let peer_count = self.peer_manager.len().await; - if peer_count < MIN_PEERS { - debug!("bootstrapping peers, have {}", peer_count); - task::sleep(Duration::from_secs(2)).await; - } else { - break; - } + pub async fn start(mut self, num_workers: usize) { + let (worker_tx, worker_rx) = channel(5); + for _ in 0..num_workers { + self.spawn_worker(worker_rx.clone()).await; } - // Get heaviest tipset from storage to sync toward - let heaviest = self.chain_store.heaviest_tipset().unwrap(); + // Channels to handle fetching hello tipsets in separate task and return tipset. 
+ let (new_ts_tx, new_ts_rx) = channel(10); - info!("Starting block sync..."); + let mut fused_handler = self.net_handler.clone().fuse(); + let mut fused_inform_channel = new_ts_rx.fuse(); - // Sync headers from network from head to heaviest from storage - self.state - .write() - .await - .init(heaviest.clone(), head.clone()); - let tipsets = match self - .sync_headers_reverse(head.as_ref().clone(), &heaviest) - .await - { - Ok(ts) => ts, - Err(e) => { - self.state.write().await.error(e.to_string()); - return Err(e); + loop { + // TODO would be ideal if this is a future attached to the select + if worker_tx.is_empty() { + if let Some(tar) = self.next_sync_target.take() { + if let Some(ts) = tar.heaviest_tipset() { + self.active_sync_tipsets.insert(ts.clone()); + worker_tx.send(ts).await; + } + } } - }; - - // Persist header chain pulled from network - self.set_stage(SyncStage::PersistHeaders).await; - let headers: Vec<&BlockHeader> = tipsets.iter().map(|t| t.blocks()).flatten().collect(); - if let Err(e) = persist_objects(self.chain_store.blockstore(), &headers) { - self.state.write().await.error(e.to_string()); - return Err(e.into()); - } - // Sync and validate messages from fetched tipsets - self.set_stage(SyncStage::Messages).await; - if let Err(e) = self.sync_messages_check_state(&tipsets).await { - self.state.write().await.error(e.to_string()); - return Err(e); - } - self.set_stage(SyncStage::Complete).await; - - // At this point the head is synced and the head can be set as the heaviest. 
- self.chain_store.put_tipset(head.as_ref()).await?; - - Ok(()) - } - - /// Syncs messages by first checking state for message existence otherwise fetches messages from blocksync - async fn sync_messages_check_state(&mut self, ts: &[Tipset]) -> Result<(), Error> { - // see https://github.com/filecoin-project/lotus/blob/master/build/params_shared.go#L109 for request window size - const REQUEST_WINDOW: i64 = 1; - // TODO refactor type handling - // set i to the length of provided tipsets - let mut i: i64 = i64::try_from(ts.len())? - 1; - - while i >= 0 { - // check storage first to see if we have full tipset - let fts = match self.chain_store.fill_tipsets(ts[i as usize].clone()) { - Ok(fts) => fts, - Err(_) => { - // no full tipset in storage; request messages via blocksync - - // retrieve peerId used for blocksync request - if let Some(peer_id) = self.peer_manager.get_peer().await { - let mut batch_size = REQUEST_WINDOW; - if i < batch_size { - batch_size = i; - } - - // set params for blocksync request - let idx = i - batch_size; - let next = &ts[idx as usize]; - let req_len = batch_size + 1; + select! { + network_event = fused_handler.next() => match network_event { + Some(NetworkEvent::HelloRequest { request, channel }) => { + let source = channel.peer.clone(); debug!( - "BlockSync message sync tipsets: epoch: {}, len: {}", - next.epoch(), - req_len + "Message inbound, heaviest tipset cid: {:?}", + request.heaviest_tip_set ); - // receive tipset bundle from block sync - let mut ts_bundle = match self - .network - .blocksync_request( + let new_ts_tx_cloned = new_ts_tx.clone(); + let cs_cloned = self.chain_store.clone(); + let net_cloned = self.network.clone(); + // TODO determine if tasks started to fetch and load tipsets should be + // limited. Currently no cap on this. 
+ task::spawn(async { + Self::fetch_and_inform_tipset( + cs_cloned, + net_cloned, + source, + TipsetKeys::new(request.heaviest_tip_set), + new_ts_tx_cloned, + ) + .await; + }); + } + Some(NetworkEvent::PeerDialed { peer_id }) => { + let heaviest = self.chain_store.heaviest_tipset().await.unwrap(); + self.network + .hello_request( peer_id, - BlockSyncRequest { - start: next.cids().to_vec(), - request_len: req_len as u64, - options: MESSAGES, + HelloRequest { + heaviest_tip_set: heaviest.cids().to_vec(), + heaviest_tipset_height: heaviest.epoch(), + heaviest_tipset_weight: heaviest.weight().clone(), + genesis_hash: self.genesis.blocks()[0].cid().clone(), }, ) .await - { - Ok(k) => k, - Err(e) => { - warn!("BlockSyncRequest for message failed: {}", e); - continue; - } - } - .chain; - let mut ts_r = ts[(idx) as usize..(idx + 1 + req_len) as usize].to_vec(); - // since the bundle only has messages, we have to put the headers in them - for b in ts_bundle.iter_mut() { - let t = ts_r.pop().unwrap(); - b.blocks = t.blocks().to_vec(); - } - for b in ts_bundle { - // construct full tipsets from fetched messages - let fts: FullTipset = (&b).try_into().map_err(Error::Other)?; - - // validate tipset and messages - let curr_epoch = fts.epoch(); - self.validate_tipset(fts).await?; - self.state.write().await.set_epoch(curr_epoch); - - // store messages - if let Some(m) = b.messages { - self.chain_store.put_messages(&m.bls_msgs)?; - self.chain_store.put_messages(&m.secp_msgs)?; - } else { - warn!("Blocksync request for messages returned null messages"); - } + } + // All other network events are being ignored currently + _ => (), + None => break, + }, + inform_head_event = fused_inform_channel.next() => match inform_head_event { + Some((peer, new_head)) => { + if let Err(e) = self.inform_new_head(peer.clone(), &new_head).await { + warn!("failed to inform new head from peer {}", peer); } } - i -= REQUEST_WINDOW; - continue; + None => break, } - }; - // full tipset found in storage; 
validate and continue - let curr_epoch = fts.epoch(); - self.validate_tipset(fts).await?; - self.state.write().await.set_epoch(curr_epoch); - i -= 1; - continue; + } } + } - Ok(()) + /// Fetches a tipset from store or network, then passes the tipset back through the channel + /// to inform of the new head. + async fn fetch_and_inform_tipset( + cs: Arc>, + network: SyncNetworkContext, + peer_id: PeerId, + tsk: TipsetKeys, + channel: Sender<(PeerId, FullTipset)>, + ) { + match Self::fetch_full_tipset(cs.as_ref(), &network, peer_id.clone(), &tsk).await { + Ok(fts) => { + channel.send((peer_id, fts)).await; + } + Err(e) => { + debug!("Failed to fetch full tipset from peer ({}): {}", peer_id, e); + } + } + } + + /// Spawns a new sync worker and pushes the state to the `ChainSyncer` + async fn spawn_worker(&mut self, channel: Receiver>) -> JoinHandle<()> { + let state = Arc::new(RwLock::new(SyncState::default())); + + // push state to managed states in Syncer. + self.worker_state.write().await.push(state.clone()); + SyncWorker { + state, + beacon: self.beacon.clone(), + state_manager: self.state_manager.clone(), + chain_store: self.chain_store.clone(), + network: self.network.clone(), + genesis: self.genesis.clone(), + bad_blocks: self.bad_blocks.clone(), + peer_manager: self.peer_manager.clone(), + } + .spawn(channel) + .await } /// informs the syncer about a new potential tipset /// This should be called when connecting to new peers, and additionally /// when receiving new blocks from the network - pub async fn inform_new_head(&mut self, peer: PeerId, fts: &FullTipset) -> Result<(), Error> { + pub async fn inform_new_head(&mut self, peer: PeerId, ts: &FullTipset) -> Result<(), Error> { // check if full block is nil and if so return error - if fts.blocks().is_empty() { + if ts.blocks().is_empty() { return Err(Error::NoBlocks); } // TODO: Check if tipset has height that is too far ahead to be possible - for block in fts.blocks() { + for block in ts.blocks() { if let 
Some(bad) = self.bad_blocks.peek(block.cid()).await { warn!("Bad block detected, cid: {:?}", bad); return Err(Error::Other("Block marked as bad".to_string())); } - // validate message data - self.validate_msg_meta(block)?; } - // TODO: Publish LocalIncoming blocks // compare target_weight to heaviest weight stored; ignore otherwise - let best_weight = match self.chain_store.heaviest_tipset() { - Some(ts) => ts.weight().clone(), - None => Zero::zero(), - }; - let target_weight = fts.weight(); - - if target_weight.gt(&best_weight) { - self.set_peer_head(peer, Arc::new(fts.to_tipset())).await?; + let candidate_ts = self + .chain_store + .heaviest_tipset() + .await + .map(|heaviest| ts.weight() >= heaviest.weight()) + .unwrap_or(true); + if candidate_ts { + // Check message meta after all other checks (expensive) + for block in ts.blocks() { + self.validate_msg_meta(block)?; + } + self.set_peer_head(peer, Arc::new(ts.to_tipset())).await; } - // incoming tipset from miners does not appear to be better than our best chain, ignoring for now + Ok(()) } - async fn set_peer_head(&mut self, peer: PeerId, ts: Arc) -> Result<(), Error> { + async fn set_peer_head(&mut self, peer: PeerId, ts: Arc) { self.peer_manager .add_peer(peer, Some(Arc::clone(&ts))) .await; // Only update target on initial sync - if self.state.read().await.stage() == SyncStage::Headers { + if self.state == ChainSyncState::Bootstrap { if let Some(best_target) = self.select_sync_target().await { - // TODO revisit this if using for full node, shouldn't start syncing on first update - self.sync(best_target).await?; - return Ok(()); + self.schedule_tipset(best_target).await; + self.state = ChainSyncState::Initial; + return; } } - self.schedule_tipset(ts).await?; - - Ok(()) + self.schedule_tipset(ts).await; } - /// Retrieves the heaviest tipset in the sync queue; considered best target head - async fn select_sync_target(&mut self) -> Option> { + /// Selects max sync target from current peer set + async fn 
select_sync_target(&self) -> Option> { // Retrieve all peer heads from peer manager - let mut heads = self.peer_manager.get_peer_heads().await; - heads.sort_by_key(|h| h.epoch()); + let heads = self.peer_manager.get_peer_heads().await; + heads.iter().max_by_key(|h| h.epoch()).cloned() + } - // insert tipsets into sync queue - for tip in heads { - self.sync_queue.insert(tip); - } + /// Schedules a new tipset to be handled by the sync manager + async fn schedule_tipset(&mut self, tipset: Arc) { + debug!("Scheduling incoming tipset to sync: {:?}", tipset.cids()); + + let mut related_to_active = false; + for act_state in self.worker_state.read().await.iter() { + if let Some(target) = act_state.read().await.target() { + if target == &tipset { + return; + } - if self.sync_queue.buckets().len() > 1 { - warn!("Caution, multiple distinct chains seen during head selections"); + if tipset.parents() == target.key() { + related_to_active = true; + } + } } - // return heaviest tipset in queue - self.sync_queue.heaviest() - } - /// Schedules a new tipset to be handled by the sync manager - async fn schedule_tipset(&mut self, tipset: Arc) -> Result<(), Error> { - info!("Scheduling incoming tipset to sync: {:?}", tipset.cids()); - - // check sync status if indicates tipsets are ready to be synced - // TODO revisit this, seems wrong - if self.state.read().await.stage() == SyncStage::Complete { - // send tipsets to be synced - self.sync(tipset).await?; - return Ok(()); + // Check if related to active tipset buckets. 
+ if !related_to_active && self.active_sync_tipsets.related_to_any(tipset.as_ref()) { + related_to_active = true; } - // TODO check for related tipsets + if related_to_active { + self.active_sync_tipsets.insert(tipset); + return; + } // if next_sync_target is from same chain as incoming tipset add it to be synced next - if self.next_sync_target.is_same_chain_as(&tipset) { - self.next_sync_target.add(tipset); + if let Some(tar) = &mut self.next_sync_target { + if tar.is_same_chain_as(&tipset) { + tar.add(tipset); + } } else { // add incoming tipset to queue to by synced later self.sync_queue.insert(tipset); - // update next sync target if empty - if self.next_sync_target.is_empty() { + // update next sync target if none + if self.next_sync_target.is_none() { if let Some(target_bucket) = self.sync_queue.pop() { - self.next_sync_target = target_bucket; - if let Some(best_target) = self.next_sync_target.heaviest_tipset() { - // send heaviest tipset from sync target to be synced - self.sync(best_target).await?; - return Ok(()); - } + self.next_sync_target = Some(target_bucket); } } } - Ok(()) } /// Validates message root from header matches message root generated from the /// bls and secp messages contained in the passed in block and stores them in a key-value store @@ -454,29 +366,30 @@ where Ok(()) } - /// Returns FullTipset from store if TipsetKeys exist in key-value store otherwise requests FullTipset - /// from block sync - async fn fetch_tipset( - &mut self, + /// Returns `FullTipset` from store if `TipsetKeys` exist in key-value store otherwise requests + /// `FullTipset` from block sync + async fn fetch_full_tipset( + cs: &ChainStore, + network: &SyncNetworkContext, peer_id: PeerId, tsk: &TipsetKeys, ) -> Result { - let fts = match self.load_fts(tsk) { + let fts = match Self::load_fts(cs, tsk) { Ok(fts) => fts, - _ => return self.network.blocksync_fts(peer_id, tsk).await, + Err(_) => network.blocksync_fts(peer_id, tsk).await?, }; Ok(fts) } + /// Returns a 
reconstructed FullTipset from store if keys exist - fn load_fts(&self, keys: &TipsetKeys) -> Result { + fn load_fts(cs: &ChainStore, keys: &TipsetKeys) -> Result { let mut blocks = Vec::new(); // retrieve tipset from store based on passed in TipsetKeys - let ts = self.chain_store.tipset_from_keys(keys)?; + let ts = cs.tipset_from_keys(keys)?; for header in ts.blocks() { // retrieve bls and secp messages from specified BlockHeader - let (bls_msgs, secp_msgs) = - chain::block_messages(self.chain_store.blockstore(), &header)?; + let (bls_msgs, secp_msgs) = chain::block_messages(cs.blockstore(), &header)?; // construct a full block let full_block = Block { @@ -491,525 +404,6 @@ where let fts = FullTipset::new(blocks)?; Ok(fts) } - // Block message validation checks - fn check_block_msgs( - state_manager: Arc>, - block: Block, - tip: Tipset, - ) -> Result<(), Error> { - // do the initial loop here - // Check Block Message and Signatures in them - let mut pub_keys = Vec::new(); - let mut cids = Vec::new(); - for m in block.bls_msgs() { - let pk = StateManager::get_bls_public_key( - &state_manager.get_block_store(), - m.from(), - tip.parent_state(), - )?; - pub_keys.push(pk); - cids.push(m.cid()?.to_bytes()); - } - - if let Some(sig) = block.header().bls_aggregate() { - if !verify_bls_aggregate( - cids.iter() - .map(|x| x.as_slice()) - .collect::>() - .as_slice(), - pub_keys - .iter() - .map(|x| &x[..]) - .collect::>() - .as_slice(), - &sig, - ) { - return Err(Error::Validation(format!( - "Bls aggregate signature {:?} was invalid: {:?}", - sig, cids - ))); - } - } else { - return Err(Error::Validation( - "No bls signature included in the block header".to_owned(), - )); - } - - // check msgs for validity - fn check_msg( - msg: &M, - msg_meta_data: &mut HashMap, - tree: &StateTree, - ) -> Result<(), Error> - where - M: Message, - { - let updated_state: MsgMetaData = match msg_meta_data.get(msg.from()) { - // address is present begin validity checks - Some(MsgMetaData { 
sequence, balance }) => { - // sequence equality check - if *sequence != msg.sequence() { - return Err(Error::Validation("Sequences are not equal".to_owned())); - } - - // sufficient funds check - if *balance < msg.required_funds() { - return Err(Error::Validation( - "Insufficient funds for message execution".to_owned(), - )); - } - // update balance and increment sequence by 1 - MsgMetaData { - balance: balance - &msg.required_funds(), - sequence: sequence + 1, - } - } - // MsgMetaData not found with provided address key, insert sequence and balance with address as key - None => { - let actor = tree - .get_actor(msg.from()) - .map_err(|e| Error::Other(e.to_string()))? - .ok_or_else(|| { - Error::Other("Could not retrieve actor from state tree".to_owned()) - })?; - - MsgMetaData { - sequence: actor.sequence, - balance: actor.balance, - } - } - }; - // update hash map with updated state - msg_meta_data.insert(*msg.from(), updated_state); - Ok(()) - } - let mut msg_meta_data: HashMap = HashMap::default(); - let db = state_manager.get_block_store(); - let (state_root, _) = block_on(state_manager.tipset_state(&tip)) - .map_err(|e| Error::Validation(format!("Could not update state: {}", e)))?; - let tree = StateTree::new_from_root(db.as_ref(), &state_root).map_err(|_| { - Error::Validation("Could not load from new state root in state manager".to_owned()) - })?; - // loop through bls messages and check msg validity - for m in block.bls_msgs() { - check_msg(m, &mut msg_meta_data, &tree)?; - } - // loop through secp messages and check msg validity and signature - for m in block.secp_msgs() { - check_msg(m, &mut msg_meta_data, &tree)?; - // signature validation - m.signature() - .verify(&m.cid()?.to_bytes(), m.from()) - .map_err(|e| Error::Validation(format!("Message signature invalid: {}", e)))?; - } - // validate message root from header matches message root - let sm_root = compute_msg_meta(db.as_ref(), block.bls_msgs(), block.secp_msgs())?; - if block.header().messages() 
!= &sm_root { - return Err(Error::InvalidRoots); - } - - Ok(()) - } - - /// Validates block semantically according to https://github.com/filecoin-project/specs/blob/6ab401c0b92efb6420c6e198ec387cf56dc86057/validation.md - async fn validate(&self, block: &Block) -> Result<(), Error> { - debug!( - "Validating block at epoch: {} with weight: {}", - block.header().epoch(), - block.header().weight() - ); - let mut error_vec: Vec = Vec::new(); - let mut validations = FuturesUnordered::new(); - let header = block.header(); - - // check if block has been signed - if header.signature().is_none() { - error_vec.push("Signature is nil in header".to_owned()); - } - - let parent_tipset = self.chain_store.tipset_from_keys(header.parents())?; - - // time stamp checks - if let Err(err) = header.validate_timestamps(&parent_tipset) { - error_vec.push(err.to_string()); - } - - let b = block.clone(); - - let parent_clone = parent_tipset.clone(); - // check messages to ensure valid state transitions - let sm = self.state_manager.clone(); - let x = task::spawn_blocking(move || Self::check_block_msgs(sm, b, parent_clone)); - validations.push(x); - - // block signature check - let (state_root, _) = self - .state_manager - .tipset_state(&parent_tipset) - .await - .map_err(|e| Error::Validation(format!("Could not update state: {}", e.to_string())))?; - let work_addr_result = self - .state_manager - .get_miner_work_addr(&state_root, header.miner_address()); - - // temp header needs to live long enough in static context returned by task::spawn - let signature = block.header().signature().clone(); - let cid_bytes = block.header().to_signing_bytes()?; - match work_addr_result { - Ok(_) => validations.push(task::spawn_blocking(move || { - signature - .ok_or_else(|| { - Error::Blockchain(blocks::Error::InvalidSignature( - "Signature is nil in header".to_owned(), - )) - })? 
- .verify(&cid_bytes, &work_addr_result.unwrap()) - .map_err(|e| Error::Blockchain(blocks::Error::InvalidSignature(e))) - })), - Err(err) => error_vec.push(err.to_string()), - } - - // base fee check - let base_fee = chain::compute_base_fee(self.chain_store.db.as_ref(), &parent_tipset) - .map_err(|e| { - Error::Validation(format!("Could not compute base fee: {}", e.to_string())) - })?; - if &base_fee != block.header().parent_base_fee() { - error_vec.push(format!( - "base fee doesnt match: {} (header), {} (computed)", - block.header().parent_base_fee(), - base_fee - )); - } - - let slash = self - .state_manager - .is_miner_slashed(header.miner_address(), &parent_tipset.parent_state()) - .unwrap_or_else(|err| { - error_vec.push(err.to_string()); - false - }); - if slash { - error_vec.push("Received block was from slashed or invalid miner".to_owned()) - } - - let prev_beacon = chain::latest_beacon_entry( - self.chain_store.blockstore(), - &self.chain_store.tipset_from_keys(header.parents())?, - )?; - - if std::env::var(IGNORE_DRAND_VAR) == Ok("1".to_owned()) { - header - .validate_block_drand(Arc::clone(&self.beacon), prev_beacon) - .await?; - } - - let power_result = self - .state_manager - .get_power(&parent_tipset.parent_state(), header.miner_address()); - // ticket winner check - match power_result { - Ok((_c_pow, _net_pow)) => { - // TODO this doesn't seem to be checked currently - // if !header.is_ticket_winner(c_pow, net_pow) { - // error_vec.push("Miner created a block but was not a winner".to_owned()) - // } - } - Err(err) => error_vec.push(err.to_string()), - } - - // TODO verify_ticket_vrf - - // collect the errors from the async validations - while let Some(result) = validations.next().await { - if result.is_err() { - error_vec.push(result.err().unwrap().to_string()); - } - } - // combine vec of error strings and return Validation error with this resultant string - if !error_vec.is_empty() { - let error_string = error_vec.join(", "); - return 
Err(Error::Validation(error_string)); - } - - Ok(()) - } - /// validates tipsets and adds header data to tipset tracker - async fn validate_tipset(&mut self, fts: FullTipset) -> Result<(), Error> { - if fts.to_tipset() == self.genesis { - debug!("Skipping tipset validation for genesis"); - return Ok(()); - } - - for b in fts.blocks() { - if let Err(e) = self.validate(&b).await { - self.bad_blocks.put(b.cid().clone(), e.to_string()).await; - return Err(Error::Other(format!( - "Invalid blocks detected: {}", - e.to_string() - ))); - } - self.chain_store.set_tipset_tracker(b.header())?; - } - info!("Successfully validated tipset at epoch: {}", fts.epoch()); - Ok(()) - } - - pub async fn verify_winning_post_proof( - &self, - block: BlockHeader, - prev_entry: BeaconEntry, - lbst: Cid, - ) -> Result<(), Error> { - let marshal_miner_work_addr = block.miner_address().marshal_cbor()?; - let rbase = block.beacon_entries().iter().last().unwrap_or(&prev_entry); - let rand = chain::draw_randomness( - rbase.data(), - DomainSeparationTag::WinningPoStChallengeSeed, - block.epoch(), - &marshal_miner_work_addr, - ) - .map_err(|err| { - Error::Validation(format!( - "failed to get randomness for verifying winningPost proof: {:}", - err - )) - })?; - if block.miner_address().protocol() != Protocol::ID { - return Err(Error::Validation(format!( - "failed to get ID from miner address {:}", - block.miner_address() - ))); - }; - let sectors = utils::get_sectors_for_winning_post( - &self.state_manager, - &lbst, - &block.miner_address(), - &rand, - )?; - - let proofs = block - .win_post_proof() - .iter() - .fold(Vec::new(), |mut proof, p| { - proof.extend_from_slice(&p.proof_bytes); - proof - }); - - let replicas = sectors - .iter() - .map::, _>(|sector_info: &SectorInfo| { - let commr = - cid_to_replica_commitment_v1(§or_info.sealed_cid).map_err(|err| { - Error::Validation(format!("failed to get replica commitment: {:}", err)) - })?; - let replica = PublicReplicaInfo::new( - sector_info - 
.proof - .registered_winning_post_proof() - .map_err(|err| Error::Validation(format!("Invalid proof code: {:}", err)))? - .try_into() - .map_err(|err| { - Error::Validation(format!("failed to get registered proof: {:}", err)) - })?, - commr, - ); - Ok((SectorId::from(sector_info.sector_number), replica)) - }) - .collect::, _>>()?; - - let mut prover_id = ProverId::default(); - let prover_bytes = block.miner_address().to_bytes(); - prover_id[..prover_bytes.len()].copy_from_slice(&prover_bytes); - if !verify_winning_post(&rand, &proofs, &replicas, prover_id) - .map_err(|err| Error::Validation(format!("failed to verify election post: {:}", err)))? - { - error!("invalid winning post ({:?}; {:?})", rand, sectors); - Err(Error::Validation("Winning post was invalid".to_string())) - } else { - Ok(()) - } - } - - /// Syncs chain data and persists it to blockstore - async fn sync_headers_reverse( - &mut self, - head: Tipset, - to: &Tipset, - ) -> Result, Error> { - info!("Syncing headers from: {:?}", head.key()); - self.state.write().await.set_epoch(to.epoch()); - - let mut accepted_blocks: Vec = Vec::new(); - - let sync_len = head.epoch() - to.epoch(); - if !sync_len.is_positive() { - return Err(Error::Other( - "Target tipset must be after heaviest".to_string(), - )); - } - let mut return_set = Vec::with_capacity(sync_len as usize); - return_set.push(head); - - let to_epoch = to.blocks().get(0).expect("Tipset cannot be empty").epoch(); - - // Loop until most recent tipset height is less than to tipset height - 'sync: while let Some(cur_ts) = return_set.last() { - // Check if parent cids exist in bad block caches - self.validate_tipset_against_cache(cur_ts.parents(), &accepted_blocks) - .await?; - - if cur_ts.epoch() <= to_epoch { - // Current tipset is less than epoch of tipset syncing toward - break; - } - - // Try to load parent tipset from local storage - if let Ok(ts) = self.chain_store.tipset_from_keys(cur_ts.parents()) { - // Add blocks in tipset to accepted chain and 
push the tipset to return set - accepted_blocks.extend_from_slice(ts.cids()); - return_set.push(ts); - continue; - } - - // TODO tweak request window when socket frame is tested - const REQUEST_WINDOW: i64 = 30; - let epoch_diff = cur_ts.epoch() - to_epoch; - debug!("BlockSync from: {} to {}", cur_ts.epoch(), to_epoch); - let window = min(epoch_diff, REQUEST_WINDOW); - - let peer_id = self.get_peer().await; - - // Load blocks from network using blocksync - let tipsets: Vec = match self - .network - .blocksync_headers(peer_id.clone(), cur_ts.parents(), window as u64) - .await - { - Ok(ts) => ts, - Err(e) => { - warn!("Failed blocksync request to peer {:?}: {}", peer_id, e); - self.peer_manager.remove_peer(&peer_id).await; - continue; - } - }; - info!( - "Got tipsets: Height: {}, Len: {}", - tipsets[0].epoch(), - tipsets.len() - ); - - // Loop through each tipset received from network - for ts in tipsets { - if ts.epoch() < to_epoch { - // Break out of sync loop if epoch lower than to tipset - // This should not be hit if response from server is correct - break 'sync; - } - // Check Cids of blocks against bad block cache - self.validate_tipset_against_cache(&ts.key(), &accepted_blocks) - .await?; - - accepted_blocks.extend_from_slice(ts.cids()); - self.state.write().await.set_epoch(ts.epoch()); - // Add tipset to vector of tipsets to return - return_set.push(ts); - } - } - - let last_ts = return_set - .last() - .ok_or_else(|| Error::Other("Return set should contain a tipset".to_owned()))?; - - // Check if local chain was fork - if last_ts.key() != to.key() { - info!("Local chain was fork. 
Syncing fork..."); - if last_ts.parents() == to.parents() { - // block received part of same tipset as best block - // This removes need to sync fork - return Ok(return_set); - } - // add fork into return set - let fork = self.sync_fork(&last_ts, &to).await?; - info!("Fork Synced"); - return_set.extend(fork); - } - info!("Sync Header reverse complete"); - Ok(return_set) - } - /// checks to see if tipset is included in bad clocks cache - async fn validate_tipset_against_cache( - &mut self, - ts: &TipsetKeys, - accepted_blocks: &[Cid], - ) -> Result<(), Error> { - for cid in ts.cids() { - if let Some(reason) = self.bad_blocks.get(cid).await { - for bh in accepted_blocks { - self.bad_blocks - .put(bh.clone(), format!("chain contained {}", cid)) - .await; - } - - return Err(Error::Other(format!( - "Chain contained block marked as bad: {}, {}", - cid, reason - ))); - } - } - Ok(()) - } - /// fork detected, collect tipsets to be included in return_set sync_headers_reverse - async fn sync_fork(&mut self, head: &Tipset, to: &Tipset) -> Result, Error> { - let peer_id = self.get_peer().await; - // TODO move to shared parameter (from actors crate most likely) - const FORK_LENGTH_THRESHOLD: u64 = 500; - - // Load blocks from network using blocksync - let tips: Vec = self - .network - .blocksync_headers(peer_id, head.parents(), FORK_LENGTH_THRESHOLD) - .await - .map_err(|_| Error::Other("Could not retrieve tipset".to_string()))?; - - let mut ts = self.chain_store.tipset_from_keys(to.parents())?; - - for i in 0..tips.len() { - while ts.epoch() > tips[i].epoch() { - if ts.epoch() == 0 { - return Err(Error::Other( - "Synced chain forked at genesis, refusing to sync".to_string(), - )); - } - ts = self.chain_store.tipset_from_keys(ts.parents())?; - } - if ts == tips[i] { - return Ok(tips[0..=i].to_vec()); - } - } - - Err(Error::Other( - "Fork longer than threshold finality of 500".to_string(), - )) - } - - /// Sets the managed sync status - pub async fn set_stage(&mut self, 
new_stage: SyncStage) { - debug!("Sync stage set to: {}", new_stage); - self.state.write().await.set_stage(new_stage); - } - - async fn get_peer(&self) -> PeerId { - while self.peer_manager.is_empty().await { - warn!("No valid peers to sync, waiting for other nodes"); - task::sleep(Duration::from_secs(5)).await; - } - - self.peer_manager - .get_peer() - .await - .expect("Peer set is not empty here") - } } /// Returns message root CID from bls and secp message contained in the param Block @@ -1049,12 +443,12 @@ mod tests { use async_std::sync::channel; use async_std::sync::Sender; use beacon::MockBeacon; - use blocks::BlockHeader; use db::MemoryDB; use forest_libp2p::NetworkEvent; use state_manager::StateManager; use std::sync::Arc; - use test_utils::{construct_blocksync_response, construct_messages, construct_tipset}; + use std::time::Duration; + use test_utils::{construct_dummy_header, construct_messages}; fn chain_syncer_setup( db: Arc, @@ -1063,17 +457,17 @@ mod tests { Sender, Receiver, ) { - let chain_store = ChainStore::new(db.clone()); + let chain_store = Arc::new(ChainStore::new(db.clone())); let (local_sender, test_receiver) = channel(20); let (event_sender, event_receiver) = channel(20); - let gen = dummy_header(); - chain_store.set_genesis(gen.clone()).unwrap(); + let gen = construct_dummy_header(); + chain_store.set_genesis(&gen).unwrap(); let beacon = Arc::new(MockBeacon::new(Duration::from_secs(1))); - let genesis_ts = Tipset::new(vec![gen]).unwrap(); + let genesis_ts = Arc::new(Tipset::new(vec![gen]).unwrap()); ( ChainSyncer::new( chain_store, @@ -1089,32 +483,6 @@ mod tests { ) } - fn send_blocksync_response(blocksync_message: Receiver) { - let rpc_response = construct_blocksync_response(); - - task::block_on(async { - match blocksync_message.recv().await.unwrap() { - NetworkMessage::BlockSyncRequest { - peer_id: _, - request: _, - response_channel, - } => { - response_channel.send(rpc_response).unwrap(); - } - _ => unreachable!(), - } - }); - } - - 
fn dummy_header() -> BlockHeader { - BlockHeader::builder() - .miner_address(Address::new_id(1000)) - .messages(Cid::new_from_cbor(&[1, 2, 3], Blake2b256)) - .message_receipts(Cid::new_from_cbor(&[1, 2, 3], Blake2b256)) - .state_root(Cid::new_from_cbor(&[1, 2, 3], Blake2b256)) - .build() - .unwrap() - } #[test] fn chainsync_constructor() { let db = Arc::new(MemoryDB::default()); @@ -1124,29 +492,6 @@ mod tests { let _chain_syncer = chain_syncer_setup(db); } - #[test] - fn sync_headers_reverse_given_tipsets_test() { - let db = Arc::new(MemoryDB::default()); - let (mut cs, _event_sender, network_receiver) = chain_syncer_setup(db); - - cs.net_handler.spawn(Arc::clone(&cs.peer_manager)); - - // params for sync_headers_reverse - let source = PeerId::random(); - let head = construct_tipset(4, 10); - let to = construct_tipset(1, 10); - - task::block_on(async move { - cs.peer_manager.add_peer(source.clone(), None).await; - assert_eq!(cs.peer_manager.len().await, 1); - // make blocksync request - let return_set = task::spawn(async move { cs.sync_headers_reverse(head, &to).await }); - // send blocksync response to channel - send_blocksync_response(network_receiver); - assert_eq!(return_set.await.unwrap().len(), 4); - }); - } - #[test] fn compute_msg_meta_given_msgs_test() { let db = Arc::new(MemoryDB::default()); diff --git a/blockchain/chain_sync/src/sync/peer_test.rs b/blockchain/chain_sync/src/sync/peer_test.rs index ee3f77745fc5..eed0fc533c42 100644 --- a/blockchain/chain_sync/src/sync/peer_test.rs +++ b/blockchain/chain_sync/src/sync/peer_test.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0, MIT use super::*; +use address::Address; use async_std::sync::channel; use async_std::task; use beacon::MockBeacon; @@ -16,21 +17,23 @@ use std::time::Duration; fn peer_manager_update() { let db = Arc::new(MemoryDB::default()); - let chain_store = ChainStore::new(db.clone()); + let chain_store = Arc::new(ChainStore::new(db.clone())); let (local_sender, _test_receiver) = 
channel(20); let (event_sender, event_receiver) = channel(20); + let msg_root = compute_msg_meta(chain_store.blockstore(), &[], &[]).unwrap(); + let dummy_header = BlockHeader::builder() .miner_address(Address::new_id(1000)) - .messages(Cid::new_from_cbor(&[1, 2, 3], Blake2b256)) + .messages(msg_root) .message_receipts(Cid::new_from_cbor(&[1, 2, 3], Blake2b256)) .state_root(Cid::new_from_cbor(&[1, 2, 3], Blake2b256)) - .build() + .build_and_validate() .unwrap(); - chain_store.set_genesis(dummy_header.clone()).unwrap(); + let gen_hash = chain_store.set_genesis(&dummy_header).unwrap(); - let genesis_ts = Tipset::new(vec![dummy_header]).unwrap(); + let genesis_ts = Arc::new(Tipset::new(vec![dummy_header]).unwrap()); let beacon = Arc::new(MockBeacon::new(Duration::from_secs(1))); let state_manager = Arc::new(StateManager::new(db)); let cs = ChainSyncer::new( @@ -39,24 +42,30 @@ fn peer_manager_update() { beacon, local_sender, event_receiver, - genesis_ts, + genesis_ts.clone(), ) .unwrap(); let peer_manager = Arc::clone(&cs.peer_manager); task::spawn(async { - cs.start().await.unwrap(); + cs.start(0).await; }); let source = PeerId::random(); let source_clone = source.clone(); let (sender, _) = channel(1); + let gen_cloned = genesis_ts.clone(); task::block_on(async { event_sender .send(NetworkEvent::HelloRequest { - request: HelloRequest::default(), + request: HelloRequest { + heaviest_tip_set: gen_cloned.key().cids().to_vec(), + heaviest_tipset_height: gen_cloned.epoch(), + heaviest_tipset_weight: gen_cloned.weight().clone(), + genesis_hash: gen_hash, + }, channel: ResponseChannel { peer: source, sender, @@ -65,7 +74,7 @@ fn peer_manager_update() { .await; // Would be ideal to not have to sleep here and have it deterministic - task::sleep(Duration::from_millis(50)).await; + task::sleep(Duration::from_millis(1000)).await; assert_eq!(peer_manager.len().await, 1); assert_eq!(peer_manager.get_peer().await, Some(source_clone)); diff --git 
a/blockchain/chain_sync/src/sync_state.rs b/blockchain/chain_sync/src/sync_state.rs index c90689aaf18d..3e46d8c7f9c8 100644 --- a/blockchain/chain_sync/src/sync_state.rs +++ b/blockchain/chain_sync/src/sync_state.rs @@ -11,6 +11,8 @@ use std::time::SystemTime; /// Current state of the ChainSyncer using the BlockSync protocol. #[derive(PartialEq, Debug, Clone, Copy)] pub enum SyncStage { + /// Idle state + Idle, /// Syncing headers from the heaviest tipset to genesis. Headers, /// Persisting headers on chain from heaviest to genesis. @@ -32,6 +34,7 @@ impl Default for SyncStage { impl fmt::Display for SyncStage { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { + SyncStage::Idle => write!(f, "idle worker"), SyncStage::Headers => write!(f, "header sync"), SyncStage::PersistHeaders => write!(f, "persisting headers"), SyncStage::Messages => write!(f, "message sync"), @@ -79,6 +82,10 @@ impl SyncState { self.stage } + pub fn target(&self) -> &Option> { + &self.target + } + /// Sets the sync stage for the syncing state. If setting to complete, sets end timer to now. 
pub fn set_stage(&mut self, stage: SyncStage) { if let SyncStage::Complete = stage { diff --git a/blockchain/chain_sync/src/sync_worker.rs b/blockchain/chain_sync/src/sync_worker.rs new file mode 100644 index 000000000000..047b62c1b3d2 --- /dev/null +++ b/blockchain/chain_sync/src/sync_worker.rs @@ -0,0 +1,866 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use super::bad_block_cache::BadBlockCache; +use super::peer_manager::PeerManager; +use super::sync_state::{SyncStage, SyncState}; +use super::{Error, SyncNetworkContext}; +use address::{Address, Protocol}; +use amt::Amt; +use async_std::sync::{Receiver, RwLock}; +use async_std::task::{self, JoinHandle}; +use beacon::{Beacon, BeaconEntry, IGNORE_DRAND_VAR}; +use blocks::{Block, BlockHeader, FullTipset, Tipset, TipsetKeys, TxMeta}; +use chain::{persist_objects, ChainStore}; +use cid::{multihash::Blake2b256, Cid}; +use commcid::cid_to_replica_commitment_v1; +use core::time::Duration; +use crypto::verify_bls_aggregate; +use crypto::DomainSeparationTag; +use encoding::{Cbor, Error as EncodingError}; +use fil_types::SectorInfo; +use filecoin_proofs_api::{post::verify_winning_post, ProverId, PublicReplicaInfo, SectorId}; +use forest_libp2p::{BlockSyncRequest, MESSAGES}; +use futures::{ + executor::block_on, + stream::{FuturesUnordered, StreamExt}, +}; +use ipld_blockstore::BlockStore; +use libp2p::core::PeerId; +use log::{debug, error, info, warn}; +use message::{Message, SignedMessage, UnsignedMessage}; +use state_manager::{utils, StateManager}; +use state_tree::StateTree; +use std::cmp::min; +use std::collections::{BTreeMap, HashMap}; +use std::convert::{TryFrom, TryInto}; +use std::sync::Arc; +use vm::TokenAmount; + +/// Message data used to ensure valid state transition +struct MsgMetaData { + balance: TokenAmount, + sequence: u64, +} + +/// Worker to handle syncing chain with the blocksync protocol. +pub(crate) struct SyncWorker { + /// State of the sync worker. 
+ pub state: Arc>, + + /// Drand randomness beacon. + pub beacon: Arc, + + /// manages retrieving and updates state objects. + pub state_manager: Arc>, + + /// access and store tipsets / blocks / messages. + pub chain_store: Arc>, + + /// Context to be able to send requests to p2p network. + pub network: SyncNetworkContext, + + /// The known genesis tipset. + pub genesis: Arc, + + /// Bad blocks cache, updates based on invalid state transitions. + /// Will mark any invalid blocks and all childen as bad in this bounded cache. + pub bad_blocks: Arc, + + /// Peer manager to handle full peers to send ChainSync requests to. + pub peer_manager: Arc, +} + +impl SyncWorker +where + TBeacon: Beacon + Sync + Send + 'static, + DB: BlockStore + Sync + Send + 'static, +{ + pub async fn spawn(self, mut inbound_channel: Receiver>) -> JoinHandle<()> { + task::spawn(async move { + while let Some(ts) = inbound_channel.next().await { + if let Err(e) = self.sync(ts).await { + let err = e.to_string(); + warn!("failed to sync tipset: {}", &err); + self.state.write().await.error(err); + } + } + }) + } + + /// Performs syncing process + pub async fn sync(&self, head: Arc) -> Result<(), Error> { + // Bootstrap peers before syncing + // TODO increase bootstrap peer count before syncing + const MIN_PEERS: usize = 1; + loop { + let peer_count = self.peer_manager.len().await; + if peer_count < MIN_PEERS { + debug!("bootstrapping peers, have {}", peer_count); + task::sleep(Duration::from_secs(2)).await; + } else { + break; + } + } + + // Get heaviest tipset from storage to sync toward + let heaviest = self.chain_store.heaviest_tipset().await.unwrap(); + + info!("Starting block sync..."); + + // Sync headers from network from head to heaviest from storage + self.state + .write() + .await + .init(heaviest.clone(), head.clone()); + let tipsets = match self + .sync_headers_reverse(head.as_ref().clone(), &heaviest) + .await + { + Ok(ts) => ts, + Err(e) => { + 
self.state.write().await.error(e.to_string()); + return Err(e); + } + }; + + // Persist header chain pulled from network + self.set_stage(SyncStage::PersistHeaders).await; + let headers: Vec<&BlockHeader> = tipsets.iter().map(|t| t.blocks()).flatten().collect(); + if let Err(e) = persist_objects(self.chain_store.blockstore(), &headers) { + self.state.write().await.error(e.to_string()); + return Err(e.into()); + } + // Sync and validate messages from fetched tipsets + self.set_stage(SyncStage::Messages).await; + if let Err(e) = self.sync_messages_check_state(&tipsets).await { + self.state.write().await.error(e.to_string()); + return Err(e); + } + self.set_stage(SyncStage::Complete).await; + + // At this point the head is synced and the head can be set as the heaviest. + self.chain_store.put_tipset(head.as_ref()).await?; + + Ok(()) + } + + /// Sets the managed sync status + pub async fn set_stage(&self, new_stage: SyncStage) { + debug!("Sync stage set to: {}", new_stage); + self.state.write().await.set_stage(new_stage); + } + + async fn get_peer(&self) -> PeerId { + while self.peer_manager.is_empty().await { + warn!("No valid peers to sync, waiting for other nodes"); + task::sleep(Duration::from_secs(5)).await; + } + + self.peer_manager + .get_peer() + .await + .expect("Peer set is not empty here") + } + + /// Syncs chain data and persists it to blockstore + async fn sync_headers_reverse(&self, head: Tipset, to: &Tipset) -> Result, Error> { + info!("Syncing headers from: {:?}", head.key()); + self.state.write().await.set_epoch(to.epoch()); + + let mut accepted_blocks: Vec = Vec::new(); + + let sync_len = head.epoch() - to.epoch(); + if !sync_len.is_positive() { + return Err(Error::Other( + "Target tipset must be after heaviest".to_string(), + )); + } + let mut return_set = Vec::with_capacity(sync_len as usize); + return_set.push(head); + + let to_epoch = to.blocks().get(0).expect("Tipset cannot be empty").epoch(); + + // Loop until most recent tipset height is less 
than to tipset height + 'sync: while let Some(cur_ts) = return_set.last() { + // Check if parent cids exist in bad block caches + self.validate_tipset_against_cache(cur_ts.parents(), &accepted_blocks) + .await?; + + if cur_ts.epoch() <= to_epoch { + // Current tipset is less than epoch of tipset syncing toward + break; + } + + // Try to load parent tipset from local storage + if let Ok(ts) = self.chain_store.tipset_from_keys(cur_ts.parents()) { + // Add blocks in tipset to accepted chain and push the tipset to return set + accepted_blocks.extend_from_slice(ts.cids()); + return_set.push(ts); + continue; + } + + // TODO tweak request window when socket frame is tested + const REQUEST_WINDOW: i64 = 10; + let epoch_diff = cur_ts.epoch() - to_epoch; + debug!("BlockSync from: {} to {}", cur_ts.epoch(), to_epoch); + let window = min(epoch_diff, REQUEST_WINDOW); + + let peer_id = self.get_peer().await; + + // Load blocks from network using blocksync + let tipsets: Vec = match self + .network + .blocksync_headers(peer_id.clone(), cur_ts.parents(), window as u64) + .await + { + Ok(ts) => ts, + Err(e) => { + warn!("Failed blocksync request to peer {:?}: {}", peer_id, e); + self.peer_manager.remove_peer(&peer_id).await; + continue; + } + }; + info!( + "Got tipsets: Height: {}, Len: {}", + tipsets[0].epoch(), + tipsets.len() + ); + + // Loop through each tipset received from network + for ts in tipsets { + if ts.epoch() < to_epoch { + // Break out of sync loop if epoch lower than to tipset + // This should not be hit if response from server is correct + break 'sync; + } + // Check Cids of blocks against bad block cache + self.validate_tipset_against_cache(&ts.key(), &accepted_blocks) + .await?; + + accepted_blocks.extend_from_slice(ts.cids()); + self.state.write().await.set_epoch(ts.epoch()); + // Add tipset to vector of tipsets to return + return_set.push(ts); + } + } + + let last_ts = return_set + .last() + .ok_or_else(|| Error::Other("Return set should contain a 
tipset".to_owned()))?; + + // Check if local chain was fork + if last_ts.key() != to.key() { + info!("Local chain was fork. Syncing fork..."); + if last_ts.parents() == to.parents() { + // block received part of same tipset as best block + // This removes need to sync fork + return Ok(return_set); + } + // add fork into return set + let fork = self.sync_fork(&last_ts, &to).await?; + info!("Fork Synced"); + return_set.extend(fork); + } + info!("Sync Header reverse complete"); + Ok(return_set) + } + + /// checks to see if tipset is included in bad clocks cache + async fn validate_tipset_against_cache( + &self, + ts: &TipsetKeys, + accepted_blocks: &[Cid], + ) -> Result<(), Error> { + for cid in ts.cids() { + if let Some(reason) = self.bad_blocks.get(cid).await { + for bh in accepted_blocks { + self.bad_blocks + .put(bh.clone(), format!("chain contained {}", cid)) + .await; + } + + return Err(Error::Other(format!( + "Chain contained block marked as bad: {}, {}", + cid, reason + ))); + } + } + Ok(()) + } + + /// fork detected, collect tipsets to be included in return_set sync_headers_reverse + async fn sync_fork(&self, head: &Tipset, to: &Tipset) -> Result, Error> { + let peer_id = self.get_peer().await; + // TODO move to shared parameter (from actors crate most likely) + const FORK_LENGTH_THRESHOLD: u64 = 500; + + // Load blocks from network using blocksync + let tips: Vec = self + .network + .blocksync_headers(peer_id, head.parents(), FORK_LENGTH_THRESHOLD) + .await + .map_err(|_| Error::Other("Could not retrieve tipset".to_string()))?; + + let mut ts = self.chain_store.tipset_from_keys(to.parents())?; + + for i in 0..tips.len() { + while ts.epoch() > tips[i].epoch() { + if ts.epoch() == 0 { + return Err(Error::Other( + "Synced chain forked at genesis, refusing to sync".to_string(), + )); + } + ts = self.chain_store.tipset_from_keys(ts.parents())?; + } + if ts == tips[i] { + return Ok(tips[0..=i].to_vec()); + } + } + + Err(Error::Other( + "Fork longer than threshold 
finality of 500".to_string(), + )) + } + + /// Syncs messages by first checking state for message existence otherwise fetches messages from blocksync + async fn sync_messages_check_state(&self, ts: &[Tipset]) -> Result<(), Error> { + // see https://github.com/filecoin-project/lotus/blob/master/build/params_shared.go#L109 for request window size + const REQUEST_WINDOW: i64 = 1; + // TODO refactor type handling + // set i to the length of provided tipsets + let mut i: i64 = i64::try_from(ts.len())? - 1; + + while i >= 0 { + // check storage first to see if we have full tipset + let fts = match self.chain_store.fill_tipsets(ts[i as usize].clone()) { + Ok(fts) => fts, + Err(_) => { + // no full tipset in storage; request messages via blocksync + + // retrieve peerId used for blocksync request + if let Some(peer_id) = self.peer_manager.get_peer().await { + let mut batch_size = REQUEST_WINDOW; + if i < batch_size { + batch_size = i; + } + + // set params for blocksync request + let idx = i - batch_size; + let next = &ts[idx as usize]; + let req_len = batch_size + 1; + debug!( + "BlockSync message sync tipsets: epoch: {}, len: {}", + next.epoch(), + req_len + ); + // receive tipset bundle from block sync + let mut ts_bundle = match self + .network + .blocksync_request( + peer_id, + BlockSyncRequest { + start: next.cids().to_vec(), + request_len: req_len as u64, + options: MESSAGES, + }, + ) + .await + { + Ok(k) => k, + Err(e) => { + warn!("BlockSyncRequest for message failed: {}", e); + continue; + } + } + .chain; + let mut ts_r = ts[(idx) as usize..(idx + 1 + req_len) as usize].to_vec(); + // since the bundle only has messages, we have to put the headers in them + for b in ts_bundle.iter_mut() { + let t = ts_r.pop().unwrap(); + b.blocks = t.blocks().to_vec(); + } + for b in ts_bundle { + // construct full tipsets from fetched messages + let fts: FullTipset = (&b).try_into().map_err(Error::Other)?; + + // validate tipset and messages + let curr_epoch = fts.epoch(); 
+ self.validate_tipset(fts).await?; + self.state.write().await.set_epoch(curr_epoch); + + // store messages + if let Some(m) = b.messages { + self.chain_store.put_messages(&m.bls_msgs)?; + self.chain_store.put_messages(&m.secp_msgs)?; + } else { + warn!("Blocksync request for messages returned null messages"); + } + } + } + i -= REQUEST_WINDOW; + continue; + } + }; + // full tipset found in storage; validate and continue + let curr_epoch = fts.epoch(); + self.validate_tipset(fts).await?; + self.state.write().await.set_epoch(curr_epoch); + i -= 1; + continue; + } + + Ok(()) + } + + /// validates tipsets and adds header data to tipset tracker + async fn validate_tipset(&self, fts: FullTipset) -> Result<(), Error> { + if &fts.to_tipset() == self.genesis.as_ref() { + debug!("Skipping tipset validation for genesis"); + return Ok(()); + } + + for b in fts.blocks() { + if let Err(e) = self.validate(&b).await { + self.bad_blocks.put(b.cid().clone(), e.to_string()).await; + return Err(Error::Other(format!( + "Invalid blocks detected: {}", + e.to_string() + ))); + } + self.chain_store.set_tipset_tracker(b.header()).await?; + } + info!("Successfully validated tipset at epoch: {}", fts.epoch()); + Ok(()) + } + + /// Validates block semantically according to https://github.com/filecoin-project/specs/blob/6ab401c0b92efb6420c6e198ec387cf56dc86057/validation.md + async fn validate(&self, block: &Block) -> Result<(), Error> { + debug!( + "Validating block at epoch: {} with weight: {}", + block.header().epoch(), + block.header().weight() + ); + let mut error_vec: Vec = Vec::new(); + let mut validations = FuturesUnordered::new(); + let header = block.header(); + + // check if block has been signed + if header.signature().is_none() { + error_vec.push("Signature is nil in header".to_owned()); + } + + let parent_tipset = self.chain_store.tipset_from_keys(header.parents())?; + + // time stamp checks + if let Err(err) = header.validate_timestamps(&parent_tipset) { + 
error_vec.push(err.to_string()); + } + + let b = block.clone(); + + let parent_clone = parent_tipset.clone(); + // check messages to ensure valid state transitions + let sm = self.state_manager.clone(); + let x = task::spawn_blocking(move || Self::check_block_msgs(sm, b, parent_clone)); + validations.push(x); + + // block signature check + let (state_root, _) = self + .state_manager + .tipset_state(&parent_tipset) + .await + .map_err(|e| Error::Validation(format!("Could not update state: {}", e.to_string())))?; + let work_addr_result = self + .state_manager + .get_miner_work_addr(&state_root, header.miner_address()); + + // temp header needs to live long enough in static context returned by task::spawn + let signature = block.header().signature().clone(); + let cid_bytes = block.header().to_signing_bytes()?; + match work_addr_result { + Ok(_) => validations.push(task::spawn_blocking(move || { + signature + .ok_or_else(|| { + Error::Blockchain(blocks::Error::InvalidSignature( + "Signature is nil in header".to_owned(), + )) + })? 
+ .verify(&cid_bytes, &work_addr_result.unwrap()) + .map_err(|e| Error::Blockchain(blocks::Error::InvalidSignature(e))) + })), + Err(err) => error_vec.push(err.to_string()), + } + + // base fee check + let base_fee = chain::compute_base_fee(self.chain_store.db.as_ref(), &parent_tipset) + .map_err(|e| { + Error::Validation(format!("Could not compute base fee: {}", e.to_string())) + })?; + if &base_fee != block.header().parent_base_fee() { + error_vec.push(format!( + "base fee doesnt match: {} (header), {} (computed)", + block.header().parent_base_fee(), + base_fee + )); + } + + let slash = self + .state_manager + .is_miner_slashed(header.miner_address(), &parent_tipset.parent_state()) + .unwrap_or_else(|err| { + error_vec.push(err.to_string()); + false + }); + if slash { + error_vec.push("Received block was from slashed or invalid miner".to_owned()) + } + + let prev_beacon = chain::latest_beacon_entry( + self.chain_store.blockstore(), + &self.chain_store.tipset_from_keys(header.parents())?, + )?; + + if std::env::var(IGNORE_DRAND_VAR) == Ok("1".to_owned()) { + header + .validate_block_drand(Arc::clone(&self.beacon), prev_beacon) + .await?; + } + + let power_result = self + .state_manager + .get_power(&parent_tipset.parent_state(), header.miner_address()); + // ticket winner check + match power_result { + Ok((_c_pow, _net_pow)) => { + // TODO this doesn't seem to be checked currently + // if !header.is_ticket_winner(c_pow, net_pow) { + // error_vec.push("Miner created a block but was not a winner".to_owned()) + // } + } + Err(err) => error_vec.push(err.to_string()), + } + + // TODO verify_ticket_vrf + + // collect the errors from the async validations + while let Some(result) = validations.next().await { + if result.is_err() { + error_vec.push(result.err().unwrap().to_string()); + } + } + // combine vec of error strings and return Validation error with this resultant string + if !error_vec.is_empty() { + let error_string = error_vec.join(", "); + return 
Err(Error::Validation(error_string)); + } + + Ok(()) + } + + // Block message validation checks + fn check_block_msgs( + state_manager: Arc>, + block: Block, + tip: Tipset, + ) -> Result<(), Error> { + // do the initial loop here + // Check Block Message and Signatures in them + let mut pub_keys = Vec::new(); + let mut cids = Vec::new(); + for m in block.bls_msgs() { + let pk = StateManager::get_bls_public_key( + &state_manager.get_block_store(), + m.from(), + tip.parent_state(), + )?; + pub_keys.push(pk); + cids.push(m.cid()?.to_bytes()); + } + + if let Some(sig) = block.header().bls_aggregate() { + if !verify_bls_aggregate( + cids.iter() + .map(|x| x.as_slice()) + .collect::>() + .as_slice(), + pub_keys + .iter() + .map(|x| &x[..]) + .collect::>() + .as_slice(), + &sig, + ) { + return Err(Error::Validation(format!( + "Bls aggregate signature {:?} was invalid: {:?}", + sig, cids + ))); + } + } else { + return Err(Error::Validation( + "No bls signature included in the block header".to_owned(), + )); + } + + // check msgs for validity + fn check_msg( + msg: &M, + msg_meta_data: &mut HashMap, + tree: &StateTree, + ) -> Result<(), Error> + where + M: Message, + { + let updated_state: MsgMetaData = match msg_meta_data.get(msg.from()) { + // address is present begin validity checks + Some(MsgMetaData { sequence, balance }) => { + // sequence equality check + if *sequence != msg.sequence() { + return Err(Error::Validation("Sequences are not equal".to_owned())); + } + + // sufficient funds check + if *balance < msg.required_funds() { + return Err(Error::Validation( + "Insufficient funds for message execution".to_owned(), + )); + } + // update balance and increment sequence by 1 + MsgMetaData { + balance: balance - &msg.required_funds(), + sequence: sequence + 1, + } + } + // MsgMetaData not found with provided address key, insert sequence and balance with address as key + None => { + let actor = tree + .get_actor(msg.from()) + .map_err(|e| Error::Other(e.to_string()))? 
+ .ok_or_else(|| { + Error::Other("Could not retrieve actor from state tree".to_owned()) + })?; + + MsgMetaData { + sequence: actor.sequence, + balance: actor.balance, + } + } + }; + // update hash map with updated state + msg_meta_data.insert(*msg.from(), updated_state); + Ok(()) + } + let mut msg_meta_data: HashMap = HashMap::default(); + let db = state_manager.get_block_store(); + let (state_root, _) = block_on(state_manager.tipset_state(&tip)) + .map_err(|e| Error::Validation(format!("Could not update state: {}", e)))?; + let tree = StateTree::new_from_root(db.as_ref(), &state_root).map_err(|_| { + Error::Validation("Could not load from new state root in state manager".to_owned()) + })?; + // loop through bls messages and check msg validity + for m in block.bls_msgs() { + check_msg(m, &mut msg_meta_data, &tree)?; + } + // loop through secp messages and check msg validity and signature + for m in block.secp_msgs() { + check_msg(m, &mut msg_meta_data, &tree)?; + // signature validation + m.signature() + .verify(&m.cid()?.to_bytes(), m.from()) + .map_err(|e| Error::Validation(format!("Message signature invalid: {}", e)))?; + } + // validate message root from header matches message root + let sm_root = compute_msg_meta(db.as_ref(), block.bls_msgs(), block.secp_msgs())?; + if block.header().messages() != &sm_root { + return Err(Error::InvalidRoots); + } + + Ok(()) + } + + // TODO PoSt verifications are unused currently + async fn _verify_winning_post_proof( + &self, + block: BlockHeader, + prev_entry: BeaconEntry, + lbst: Cid, + ) -> Result<(), Error> { + let marshal_miner_work_addr = block.miner_address().marshal_cbor()?; + let rbase = block.beacon_entries().iter().last().unwrap_or(&prev_entry); + let rand = chain::draw_randomness( + rbase.data(), + DomainSeparationTag::WinningPoStChallengeSeed, + block.epoch(), + &marshal_miner_work_addr, + ) + .map_err(|err| { + Error::Validation(format!( + "failed to get randomness for verifying winningPost proof: {:}", + err + 
)) + })?; + if block.miner_address().protocol() != Protocol::ID { + return Err(Error::Validation(format!( + "failed to get ID from miner address {:}", + block.miner_address() + ))); + }; + let sectors = utils::get_sectors_for_winning_post( + &self.state_manager, + &lbst, + &block.miner_address(), + &rand, + )?; + + let proofs = block + .win_post_proof() + .iter() + .fold(Vec::new(), |mut proof, p| { + proof.extend_from_slice(&p.proof_bytes); + proof + }); + + let replicas = sectors + .iter() + .map::, _>(|sector_info: &SectorInfo| { + let commr = + cid_to_replica_commitment_v1(§or_info.sealed_cid).map_err(|err| { + Error::Validation(format!("failed to get replica commitment: {:}", err)) + })?; + let replica = PublicReplicaInfo::new( + sector_info + .proof + .registered_winning_post_proof() + .map_err(|err| Error::Validation(format!("Invalid proof code: {:}", err)))? + .try_into() + .map_err(|err| { + Error::Validation(format!("failed to get registered proof: {:}", err)) + })?, + commr, + ); + Ok((SectorId::from(sector_info.sector_number), replica)) + }) + .collect::, _>>()?; + + let mut prover_id = ProverId::default(); + let prover_bytes = block.miner_address().to_bytes(); + prover_id[..prover_bytes.len()].copy_from_slice(&prover_bytes); + if !verify_winning_post(&rand, &proofs, &replicas, prover_id) + .map_err(|err| Error::Validation(format!("failed to verify election post: {:}", err)))? 
+ { + error!("invalid winning post ({:?}; {:?})", rand, sectors); + Err(Error::Validation("Winning post was invalid".to_string())) + } else { + Ok(()) + } + } +} + +/// Returns message root CID from bls and secp message contained in the param Block +fn compute_msg_meta( + blockstore: &DB, + bls_msgs: &[UnsignedMessage], + secp_msgs: &[SignedMessage], +) -> Result { + // collect bls and secp cids + let bls_cids = cids_from_messages(bls_msgs)?; + let secp_cids = cids_from_messages(secp_msgs)?; + + // generate Amt and batch set message values + let bls_root = Amt::new_from_slice(blockstore, &bls_cids)?; + let secp_root = Amt::new_from_slice(blockstore, &secp_cids)?; + + let meta = TxMeta { + bls_message_root: bls_root, + secp_message_root: secp_root, + }; + + // store message roots and receive meta_root cid + let meta_root = blockstore + .put(&meta, Blake2b256) + .map_err(|e| Error::Other(e.to_string()))?; + + Ok(meta_root) +} + +fn cids_from_messages(messages: &[T]) -> Result, EncodingError> { + messages.iter().map(Cbor::cid).collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use async_std::sync::channel; + use beacon::MockBeacon; + use db::MemoryDB; + use forest_libp2p::NetworkMessage; + use std::sync::Arc; + use std::time::Duration; + use test_utils::{construct_blocksync_response, construct_dummy_header, construct_tipset}; + + fn sync_worker_setup( + db: Arc, + ) -> (SyncWorker, Receiver) { + let chain_store = Arc::new(ChainStore::new(db.clone())); + + let (local_sender, test_receiver) = channel(20); + + let gen = construct_dummy_header(); + chain_store.set_genesis(&gen).unwrap(); + + let beacon = Arc::new(MockBeacon::new(Duration::from_secs(1))); + + let genesis_ts = Arc::new(Tipset::new(vec![gen]).unwrap()); + ( + SyncWorker { + state: Default::default(), + beacon, + state_manager: Arc::new(StateManager::new(db)), + chain_store, + network: SyncNetworkContext::new(local_sender), + genesis: genesis_ts, + bad_blocks: Default::default(), + peer_manager: 
Default::default(), + }, + test_receiver, + ) + } + + fn send_blocksync_response(blocksync_message: Receiver) { + let rpc_response = construct_blocksync_response(); + + task::block_on(async { + match blocksync_message.recv().await.unwrap() { + NetworkMessage::BlockSyncRequest { + peer_id: _, + request: _, + response_channel, + } => { + response_channel.send(rpc_response).unwrap(); + } + _ => unreachable!(), + } + }); + } + + #[test] + fn sync_headers_reverse_given_tipsets_test() { + let db = Arc::new(MemoryDB::default()); + let (cs, network_receiver) = sync_worker_setup(db); + + // params for sync_headers_reverse + let source = PeerId::random(); + let head = construct_tipset(4, 10); + let to = construct_tipset(1, 10); + + task::block_on(async move { + cs.peer_manager.add_peer(source.clone(), None).await; + assert_eq!(cs.peer_manager.len().await, 1); + // make blocksync request + let return_set = task::spawn(async move { cs.sync_headers_reverse(head, &to).await }); + // send blocksync response to channel + send_blocksync_response(network_receiver); + assert_eq!(return_set.await.unwrap().len(), 4); + }); + } +} diff --git a/blockchain/message_pool/Cargo.toml b/blockchain/message_pool/Cargo.toml index 87117021cf5a..4980dbdd9964 100644 --- a/blockchain/message_pool/Cargo.toml +++ b/blockchain/message_pool/Cargo.toml @@ -26,6 +26,7 @@ libsecp256k1 = "0.3.4" blake2b_simd = "0.5.10" log = "0.4.8" async-std = "1.6.0" +async-trait = "0.1" state_manager = { path = "../state_manager" } [dev-dependencies] diff --git a/blockchain/message_pool/src/msgpool.rs b/blockchain/message_pool/src/msgpool.rs index 76eff2a50eea..a47a6cf3bebd 100644 --- a/blockchain/message_pool/src/msgpool.rs +++ b/blockchain/message_pool/src/msgpool.rs @@ -5,6 +5,7 @@ use super::errors::Error; use address::Address; use async_std::sync::{Arc, RwLock}; use async_std::task; +use async_trait::async_trait; use blocks::{BlockHeader, Tipset, TipsetKeys}; use blockstore::BlockStore; use chain::{ChainStore, 
HeadChange}; @@ -73,11 +74,12 @@ impl MsgSet { /// Provider Trait. This trait will be used by the messagepool to interact with some medium in order to do /// the operations that are listed below that are required for the messagepool. +#[async_trait] pub trait Provider { /// Update Mpool's cur_tipset whenever there is a chnge to the provider - fn subscribe_head_changes(&mut self) -> Subscriber; + async fn subscribe_head_changes(&mut self) -> Subscriber; /// Get the heaviest Tipset in the provider - fn get_heaviest_tipset(&mut self) -> Option; + async fn get_heaviest_tipset(&mut self) -> Option; /// Add a message to the MpoolProvider, return either Cid or Error depending on successful put fn put_message(&self, msg: &SignedMessage) -> Result; /// Return state actor for given address given the tipset that the a temp StateTree will be rooted @@ -114,16 +116,17 @@ where } } +#[async_trait] impl Provider for MpoolProvider where - DB: BlockStore, + DB: BlockStore + Send + Sync, { - fn subscribe_head_changes(&mut self) -> Subscriber { - self.cs.subscribe() + async fn subscribe_head_changes(&mut self) -> Subscriber { + self.cs.subscribe().await } - fn get_heaviest_tipset(&mut self) -> Option { - let ts = self.cs.heaviest_tipset()?; + async fn get_heaviest_tipset(&mut self) -> Option { + let ts = self.cs.heaviest_tipset().await?; Some(ts.as_ref().clone()) } @@ -182,15 +185,16 @@ where } } +#[async_trait] impl Provider for MpoolRpcProvider where - DB: BlockStore, + DB: BlockStore + Send + Sync, { - fn subscribe_head_changes(&mut self) -> Subscriber { + async fn subscribe_head_changes(&mut self) -> Subscriber { self.subscriber.clone() } - fn get_heaviest_tipset(&mut self) -> Option { + async fn get_heaviest_tipset(&mut self) -> Option { chain::get_heaviest_tipset(self.sm.get_block_store_ref()) .ok() .unwrap_or(None) @@ -262,9 +266,9 @@ where let local_addrs = Arc::new(RwLock::new(Vec::new())); // LruCache sizes have been taken from the lotus implementation let pending = 
Arc::new(RwLock::new(HashMap::new())); - let tipset = Arc::new(RwLock::new(api.get_heaviest_tipset().ok_or_else(|| { - Error::Other("No ts in api to set as cur_tipset".to_owned()) - })?)); + let tipset = Arc::new(RwLock::new(api.get_heaviest_tipset().await.ok_or_else( + || Error::Other("No ts in api to set as cur_tipset".to_owned()), + )?)); let bls_sig_cache = Arc::new(RwLock::new(LruCache::new(40000))); let sig_val_cache = Arc::new(RwLock::new(LruCache::new(32000))); let api_mutex = Arc::new(RwLock::new(api)); @@ -285,7 +289,7 @@ where mp.load_local().await?; - let mut subscriber = mp.api.write().await.subscribe_head_changes(); + let mut subscriber = mp.api.write().await.subscribe_head_changes().await; let api = mp.api.clone(); let bls_sig_cache = mp.bls_sig_cache.clone(); @@ -777,12 +781,13 @@ pub mod test_provider { } } + #[async_trait] impl Provider for TestApi { - fn subscribe_head_changes(&mut self) -> Subscriber { + async fn subscribe_head_changes(&mut self) -> Subscriber { self.publisher.subscribe() } - fn get_heaviest_tipset(&mut self) -> Option { + async fn get_heaviest_tipset(&mut self) -> Option { Tipset::new(vec![create_header(1, b"", b"")]).ok() } diff --git a/forest/src/cli/genesis.rs b/forest/src/cli/genesis.rs index dc5c92f0f1a1..09f7a13e08c4 100644 --- a/forest/src/cli/genesis.rs +++ b/forest/src/cli/genesis.rs @@ -79,7 +79,7 @@ where Ok(genesis_block) } else { debug!("Initialize ChainSyncer with new genesis from config"); - chain_store.set_genesis(genesis_block.clone())?; + chain_store.set_genesis(&genesis_block)?; async_std::task::block_on( chain_store.set_heaviest_tipset(Arc::new(Tipset::new(vec![genesis_block.clone()])?)), )?; diff --git a/forest/src/daemon.rs b/forest/src/daemon.rs index f540c5385144..37f4a3d98178 100644 --- a/forest/src/daemon.rs +++ b/forest/src/daemon.rs @@ -19,6 +19,10 @@ use std::sync::Arc; use utils::write_to_file; use wallet::PersistentKeyStore; +/// Number of tasks spawned for sync workers. 
+// TODO benchmark and/or add this as a config option. +const WORKER_TASKS: usize = 3; + /// Starts daemon process pub(super) async fn start(config: Config) { info!("Starting Forest daemon"); @@ -64,7 +68,7 @@ pub(super) async fn start(config: Config) { let state_manager = Arc::new(StateManager::new(Arc::clone(&db))); // Initialize mpool - let subscriber = chain_store.subscribe(); + let subscriber = chain_store.subscribe().await; let provider = MpoolRpcProvider::new(subscriber, Arc::clone(&state_manager)); let mpool = Arc::new( MessagePool::new(provider, network_name.clone()) @@ -86,18 +90,18 @@ pub(super) async fn start(config: Config) { // Initialize ChainSyncer let chain_syncer = ChainSyncer::new( - chain_store, + Arc::new(chain_store), Arc::clone(&state_manager), Arc::new(beacon), network_send.clone(), network_rx, - genesis, + Arc::new(genesis), ) .unwrap(); let bad_blocks = chain_syncer.bad_blocks_cloned(); let sync_state = chain_syncer.sync_state_cloned(); let sync_task = task::spawn(async { - chain_syncer.start().await.unwrap(); + chain_syncer.start(WORKER_TASKS).await; }); // Start services diff --git a/ipld/graphsync/src/message/mod.rs b/ipld/graphsync/src/message/mod.rs index 3b5f9ed1f0d4..b12173b21286 100644 --- a/ipld/graphsync/src/message/mod.rs +++ b/ipld/graphsync/src/message/mod.rs @@ -119,7 +119,7 @@ impl GraphSyncMessage { self.responses.insert(response.id, response); } /// Add block to message. - // TODO revisit block format, should be fine to be kept seperate, but may need to merge. + // TODO revisit block format, should be fine to be kept separate, but may need to merge. 
pub fn insert_block(&mut self, cid: Cid, block: Vec) { self.blocks.insert(cid, block); } diff --git a/node/forest_libp2p/src/behaviour.rs b/node/forest_libp2p/src/behaviour.rs index 590b87f2e1e5..4b9d31cd4d51 100644 --- a/node/forest_libp2p/src/behaviour.rs +++ b/node/forest_libp2p/src/behaviour.rs @@ -345,6 +345,7 @@ impl ForestBehaviour { let mut req_res_config = RequestResponseConfig::default(); req_res_config.set_request_timeout(Duration::from_secs(20)); + req_res_config.set_connection_keep_alive(Duration::from_secs(20)); ForestBehaviour { gossipsub: Gossipsub::new(MessageAuthenticity::Author(local_peer_id), gossipsub_config), diff --git a/node/forest_libp2p/src/service.rs b/node/forest_libp2p/src/service.rs index 0c17d4520b49..5c86fab75c2e 100644 --- a/node/forest_libp2p/src/service.rs +++ b/node/forest_libp2p/src/service.rs @@ -21,7 +21,7 @@ use libp2p::{ mplex, noise, yamux, PeerId, Swarm, Transport, }; use libp2p_request_response::{RequestId, ResponseChannel}; -use log::{debug, info, trace, warn}; +use log::{debug, error, info, trace, warn}; use std::collections::HashMap; use std::io::{Error, ErrorKind}; use std::sync::Arc; @@ -129,7 +129,7 @@ where } let (network_sender_in, network_receiver_in) = channel(30); - let (network_sender_out, network_receiver_out) = channel(30); + let (network_sender_out, network_receiver_out) = channel(50); Libp2pService { swarm, @@ -154,7 +154,7 @@ where Some(event) => match event { ForestBehaviourEvent::PeerDialed(peer_id) => { debug!("Peer dialed, {:?}", peer_id); - self.network_sender_out.send(NetworkEvent::PeerDialed{ + emit_event(&self.network_sender_out, NetworkEvent::PeerDialed { peer_id }).await; } @@ -167,7 +167,7 @@ where message, } => { trace!("Got a Gossip Message from {:?}", source); - self.network_sender_out.send(NetworkEvent::PubsubMessage { + emit_event(&self.network_sender_out, NetworkEvent::PubsubMessage { source, topics, message @@ -175,14 +175,14 @@ where } ForestBehaviourEvent::HelloRequest { request, 
channel, .. } => { debug!("Received hello request: {:?}", request); - self.network_sender_out.send(NetworkEvent::HelloRequest { + emit_event(&self.network_sender_out, NetworkEvent::HelloRequest { request, channel, }).await; } ForestBehaviourEvent::HelloResponse { request_id, response, .. } => { debug!("Received hello response (id: {:?})", request_id); - self.network_sender_out.send(NetworkEvent::HelloResponse { + emit_event(&self.network_sender_out, NetworkEvent::HelloResponse { request_id, response, }).await; @@ -217,7 +217,7 @@ where } else { trace!("saved bitswap block with cid {:?}", cid); } - self.network_sender_out.send(NetworkEvent::BitswapBlock{cid}).await; + emit_event(&self.network_sender_out, NetworkEvent::BitswapBlock{cid}).await; } Err(e) => { warn!("failed to save bitswap block: {:?}", e.to_string()); @@ -276,6 +276,15 @@ where self.network_receiver_out.clone() } } +async fn emit_event(sender: &Sender, event: NetworkEvent) { + if !sender.is_full() { + sender.send(event).await + } else { + // TODO this would be better to keep the events that would be ignored in some sort of buffer + // so that they can be pulled off and sent through the channel when there is available space + error!("network sender channel was full, ignoring event"); + } +} /// Builds the transport stack that LibP2P will communicate over pub fn build_transport(local_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox), Error> { diff --git a/node/rpc/src/lib.rs b/node/rpc/src/lib.rs index dab2e5d3dce8..1ce85b2d44fa 100644 --- a/node/rpc/src/lib.rs +++ b/node/rpc/src/lib.rs @@ -30,7 +30,7 @@ where pub keystore: Arc>, pub mpool: Arc>>, pub bad_blocks: Arc, - pub sync_state: Arc>, + pub sync_state: Arc>>>>, pub network_send: Sender, pub network_name: String, } diff --git a/node/rpc/src/sync_api.rs b/node/rpc/src/sync_api.rs index be0d45c773c1..2738698b9e5b 100644 --- a/node/rpc/src/sync_api.rs +++ b/node/rpc/src/sync_api.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0, MIT use 
crate::RpcState; +use async_std::sync::RwLock; use blocks::gossip_block::json::GossipBlockJson; use blockstore::BlockStore; use chain_sync::SyncState; @@ -10,6 +11,7 @@ use encoding::Cbor; use forest_libp2p::{NetworkMessage, Topic, PUBSUB_BLOCK_STR}; use jsonrpc_v2::{Data, Error as JsonRpcError, Params}; use serde::Serialize; +use std::sync::Arc; use wallet::KeyStore; #[derive(Serialize)] @@ -49,6 +51,14 @@ where // TODO SyncIncomingBlocks (requires websockets) +async fn clone_state(states: &RwLock>>>) -> Vec { + let mut ret = Vec::new(); + for s in states.read().await.iter() { + ret.push(s.read().await.clone()); + } + ret +} + /// Returns the current status of the ChainSync process. pub(crate) async fn sync_state( data: Data>, @@ -57,10 +67,8 @@ where DB: BlockStore + Send + Sync + 'static, KS: KeyStore + Send + Sync + 'static, { - let state = data.sync_state.read().await.clone(); - Ok(RPCSyncState { - active_syncs: vec![state], - }) + let active_syncs = clone_state(data.sync_state.as_ref()).await; + Ok(RPCSyncState { active_syncs }) } /// Submits block to be sent through gossipsub. 
@@ -102,7 +110,7 @@ mod tests { const TEST_NET_NAME: &str = "test"; - fn state_setup() -> ( + async fn state_setup() -> ( Arc>, Receiver, ) { @@ -111,11 +119,11 @@ mod tests { let state_manager = Arc::new(StateManager::new(db.clone())); let pool = task::block_on(async { - let mut cs = ChainStore::new(db.clone()); + let cs = ChainStore::new(db.clone()); let bz = hex::decode("904300e80781586082cb7477a801f55c1f2ea5e5d1167661feea60a39f697e1099af132682b81cc5047beacf5b6e80d5f52b9fd90323fb8510a5396416dd076c13c85619e176558582744053a3faef6764829aa02132a1571a76aabdc498a638ea0054d3bb57f41d82015860812d2396cc4592cdf7f829374b01ffd03c5469a4b0a9acc5ccc642797aa0a5498b97b28d90820fedc6f79ff0a6005f5c15dbaca3b8a45720af7ed53000555667207a0ccb50073cd24510995abd4c4e45c1e9e114905018b2da9454190499941e818201582012dd0a6a7d0e222a97926da03adb5a7768d31cc7c5c2bd6828e14a7d25fa3a608182004b76616c69642070726f6f6681d82a5827000171a0e4022030f89a8b0373ad69079dbcbc5addfe9b34dce932189786e50d3eb432ede3ba9c43000f0001d82a5827000171a0e4022052238c7d15c100c1b9ebf849541810c9e3c2d86e826512c6c416d2318fcd496dd82a5827000171a0e40220e5658b3d18cd06e1db9015b4b0ec55c123a24d5be1ea24d83938c5b8397b4f2fd82a5827000171a0e4022018d351341c302a21786b585708c9873565a0d07c42521d4aaf52da3ff6f2e461586102c000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001a5f2c5439586102b5cd48724dce0fec8799d77fd6c5113276e7f470c8391faa0b5a6033a3eaf357d635705c36abe10309d73592727289680515afd9d424793ba4796b052682d21b03c5c8a37d94827fecc59cdc5750e198fdf20dee012f4d627c6665132298ab95004500053724e0").unwrap(); let header = BlockHeader::unmarshal_cbor(&bz).unwrap(); let ts = Tipset::new(vec![header]).unwrap(); - let subscriber = cs.subscribe(); + let subscriber = cs.subscribe().await; let db = cs.db.clone(); let tsk = ts.key().cids.clone(); cs.set_heaviest_tipset(Arc::new(ts)).await.unwrap(); @@ -131,11 +139,11 @@ mod tests { 
}); let state = Arc::new(RpcState { - state_manager: state_manager, + state_manager, keystore: Arc::new(RwLock::new(wallet::MemKeyStore::new())), mpool: Arc::new(pool), bad_blocks: Default::default(), - sync_state: Default::default(), + sync_state: Arc::new(RwLock::new(vec![Default::default()])), network_send, network_name: TEST_NET_NAME.to_owned(), }); @@ -144,7 +152,7 @@ mod tests { #[async_std::test] async fn set_check_bad() { - let (state, _) = state_setup(); + let (state, _) = state_setup().await; let cid: CidJson = from_str(r#"{"/":"bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"}"#) @@ -166,24 +174,27 @@ mod tests { #[async_std::test] async fn sync_state_test() { - let (state, _) = state_setup(); + let (state, _) = state_setup().await; - let cloned_state = state.sync_state.clone(); + let st_copy = state.sync_state.clone(); match sync_state(Data(state.clone())).await { // TODO this will probably have to be updated when sync state is updated - Ok(ret) => assert_eq!(ret.active_syncs, vec![cloned_state.read().await.clone()]), + Ok(ret) => assert_eq!(ret.active_syncs, clone_state(st_copy.as_ref()).await), Err(e) => panic!(e), } // update cloned state - cloned_state.write().await.set_stage(SyncStage::Messages); - cloned_state.write().await.set_epoch(4); + st_copy.read().await[0] + .write() + .await + .set_stage(SyncStage::Messages); + st_copy.read().await[0].write().await.set_epoch(4); match sync_state(Data(state.clone())).await { Ok(ret) => { - assert_ne!(ret.active_syncs, vec![Default::default()]); - assert_eq!(ret.active_syncs, vec![cloned_state.read().await.clone()]); + assert_ne!(ret.active_syncs, vec![]); + assert_eq!(ret.active_syncs, clone_state(st_copy.as_ref()).await); } Err(e) => panic!(e), } @@ -191,7 +202,7 @@ mod tests { #[async_std::test] async fn sync_submit_test() { - let (state, mut rx) = state_setup(); + let (state, mut rx) = state_setup().await; let block_json: GossipBlockJson = 
from_str(r#"{"Header":{"Miner":"t01234","Ticket":{"VRFProof":"Ynl0ZSBhcnJheQ=="},"ElectionProof":{"WinCount":0,"VRFProof":"Ynl0ZSBhcnJheQ=="},"BeaconEntries":null,"WinPoStProof":null,"Parents":null,"ParentWeight":"0","Height":10101,"ParentStateRoot":{"/":"bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"},"ParentMessageReceipts":{"/":"bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"},"Messages":{"/":"bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"},"BLSAggregate":{"Type":2,"Data":"Ynl0ZSBhcnJheQ=="},"Timestamp":42,"BlockSig":{"Type":2,"Data":"Ynl0ZSBhcnJheQ=="},"ForkSignaling":42,"ParentBaseFee":"1"},"BlsMessages":null,"SecpkMessages":null}"#).unwrap(); diff --git a/utils/test_utils/src/chain_structures.rs b/utils/test_utils/src/chain_structures.rs index 09d93e24386a..274752bfc074 100644 --- a/utils/test_utils/src/chain_structures.rs +++ b/utils/test_utils/src/chain_structures.rs @@ -15,6 +15,7 @@ use forest_libp2p::blocksync::{BlockSyncResponse, CompactedMessages, TipsetBundl use message::{SignedMessage, UnsignedMessage}; use num_bigint::BigInt; use std::error::Error; +use std::sync::Arc; /// Defines a TipsetKey used in testing pub fn template_key(data: &[u8]) -> Cid { @@ -60,7 +61,7 @@ pub fn construct_keys() -> Vec { } /// Returns a vec of block headers to be used for testing purposes -pub fn construct_header(epoch: i64, weight: u64) -> Vec { +pub fn construct_headers(epoch: i64, weight: u64) -> Vec { let data0: Vec = vec![1, 4, 3, 6, 7, 1, 2]; let data1: Vec = vec![1, 4, 3, 6, 1, 1, 2, 2, 4, 5, 3, 12, 2, 1]; let data2: Vec = vec![1, 4, 3, 6, 1, 1, 2, 2, 4, 5, 3, 12, 2]; @@ -111,7 +112,7 @@ pub fn construct_epost_proof() -> EPostProof { pub fn construct_block() -> Block { const EPOCH: i64 = 1; const WEIGHT: u64 = 10; - let headers = construct_header(EPOCH, WEIGHT); + let headers = construct_headers(EPOCH, WEIGHT); let (bls_messages, secp_messages) = construct_messages(); Block { @@ -123,14 +124,14 @@ pub fn 
construct_block() -> Block { /// Returns a tipset used for testing pub fn construct_tipset(epoch: i64, weight: u64) -> Tipset { - Tipset::new(construct_header(epoch, weight)).unwrap() + Tipset::new(construct_headers(epoch, weight)).unwrap() } /// Returns a full tipset used for testing pub fn construct_full_tipset() -> FullTipset { const EPOCH: i64 = 1; const WEIGHT: u64 = 10; - let headers = construct_header(EPOCH, WEIGHT); + let headers = construct_headers(EPOCH, WEIGHT); let mut blocks: Vec = Vec::with_capacity(headers.len()); let (bls_messages, secp_messages) = construct_messages(); @@ -151,7 +152,7 @@ pub fn construct_tipset_metadata() -> TipsetMetadata { TipsetMetadata { tipset_state_root: tip_set.blocks()[0].state_root().clone(), tipset_receipts_root: tip_set.blocks()[0].message_receipts().clone(), - tipset: tip_set, + tipset: Arc::new(tip_set), } } @@ -178,7 +179,7 @@ pub fn construct_messages() -> (UnsignedMessage, SignedMessage) { /// Returns a TipsetBundle used for testing pub fn construct_tipset_bundle(epoch: i64, weight: u64) -> TipsetBundle { - let headers = construct_header(epoch, weight); + let headers = construct_headers(epoch, weight); let (bls, secp) = construct_messages(); let includes: Vec> = (0..headers.len()).map(|_| vec![]).collect(); @@ -193,6 +194,16 @@ pub fn construct_tipset_bundle(epoch: i64, weight: u64) -> TipsetBundle { } } +pub fn construct_dummy_header() -> BlockHeader { + BlockHeader::builder() + .miner_address(Address::new_id(1000)) + .messages(Cid::new_from_cbor(&[1, 2, 3], Blake2b256)) + .message_receipts(Cid::new_from_cbor(&[1, 2, 3], Blake2b256)) + .state_root(Cid::new_from_cbor(&[1, 2, 3], Blake2b256)) + .build_and_validate() + .unwrap() +} + /// Returns a RPCResponse used for testing pub fn construct_blocksync_response() -> BlockSyncResponse { // construct block sync response diff --git a/vm/message/src/unsigned_message.rs b/vm/message/src/unsigned_message.rs index a8efe5414756..d6a75283d596 100644 --- 
a/vm/message/src/unsigned_message.rs +++ b/vm/message/src/unsigned_message.rs @@ -30,7 +30,7 @@ use vm::{MethodNum, Serialized, TokenAmount}; /// .build() /// .unwrap(); /// -/// // Commands can be chained, or built seperately +/// // Commands can be chained, or built separately /// let mut message_builder = UnsignedMessage::builder(); /// message_builder.sequence(1); /// message_builder.from(Address::new_id(0));