Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ChainSync refactor #693

Merged
merged 14 commits into from
Sep 14, 2020
Merged
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions blockchain/chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ beacon = { path = "../beacon" }
flo_stream = "0.4.0"
address = { package = "forest_address", path = "../../vm/address" }
lazy_static = "1.4"
async-std = "1.6.3"

[dev-dependencies]
multihash = "0.10.0"
Expand Down
57 changes: 34 additions & 23 deletions blockchain/chain/src/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use super::{Error, TipIndex, TipsetMetadata};
use actor::{power::State as PowerState, STORAGE_POWER_ACTOR_ADDR};
use address::Address;
use async_std::sync::RwLock;
use beacon::BeaconEntry;
use blake2b_simd::Params;
use blocks::{Block, BlockHeader, FullTipset, Tipset, TipsetKeys, TxMeta};
Expand Down Expand Up @@ -46,15 +47,17 @@ pub enum HeadChange {
Revert(Arc<Tipset>),
}

/// Generic implementation of the datastore trait and structures
/// Stores chain data such as heaviest tipset and cached tipset info at each epoch.
/// This structure is thread-safe, and all caches are wrapped in an `RwLock` to allow a consistent
/// `ChainStore` to be shared across tasks.
pub struct ChainStore<DB> {
publisher: Publisher<HeadChange>,
publisher: RwLock<Publisher<HeadChange>>,

// key-value datastore
pub db: Arc<DB>,

// Tipset at the head of the best-known chain.
heaviest: Option<Arc<Tipset>>,
heaviest: RwLock<Option<Arc<Tipset>>>,

// tip_index tracks tipsets by epoch/parentset for use by expected consensus.
tip_index: TipIndex,
Expand All @@ -71,44 +74,50 @@ where
.map(Arc::new);
Self {
db,
publisher: Publisher::new(SINK_CAP),
publisher: Publisher::new(SINK_CAP).into(),
tip_index: TipIndex::new(),
heaviest,
heaviest: heaviest.into(),
ec2 marked this conversation as resolved.
Show resolved Hide resolved
}
}

/// Sets heaviest tipset within ChainStore and store its tipset cids under HEAD_KEY
pub async fn set_heaviest_tipset(&mut self, ts: Arc<Tipset>) -> Result<(), Error> {
pub async fn set_heaviest_tipset(&self, ts: Arc<Tipset>) -> Result<(), Error> {
self.db.write(HEAD_KEY, ts.key().marshal_cbor()?)?;
self.heaviest = Some(ts.clone());
self.publisher.publish(HeadChange::Current(ts)).await;
*self.heaviest.write().await = Some(ts.clone());
self.publisher
.write()
.await
.publish(HeadChange::Current(ts))
.await;
Ok(())
}

// subscribing returns a future sink that we can essentially iterate over using future streams
pub fn subscribe(&mut self) -> Subscriber<HeadChange> {
self.publisher.subscribe()
pub async fn subscribe(&self) -> Subscriber<HeadChange> {
self.publisher.write().await.subscribe()
}

/// Sets tip_index tracker
pub fn set_tipset_tracker(&mut self, header: &BlockHeader) -> Result<(), Error> {
let ts: Tipset = Tipset::new(vec![header.clone()])?;
// TODO this is really broken, should not be setting the tipset metadata to a tipset with just
// the one header.
pub async fn set_tipset_tracker(&self, header: &BlockHeader) -> Result<(), Error> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really see the need for the tip_index.rs file anymore. I recall implementing it really early on, thinking it would be useful for tipset data retrieval, but it doesn't look like we are using it anywhere for that purpose. We set the tracker but never make use of it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was planning on leaving it for now because it is out of scope for this PR and the changes were already substantial.

let ts = Arc::new(Tipset::new(vec![header.clone()])?);
let meta = TipsetMetadata {
tipset_state_root: header.state_root().clone(),
tipset_receipts_root: header.message_receipts().clone(),
tipset: ts,
};
self.tip_index.put(&meta);
self.tip_index.put(&meta).await;
Ok(())
}

/// Writes genesis to blockstore
pub fn set_genesis(&self, header: BlockHeader) -> Result<Cid, Error> {
pub fn set_genesis(&self, header: &BlockHeader) -> Result<Cid, Error> {
set_genesis(self.blockstore(), header)
}

/// Writes tipset block headers to data store and updates heaviest tipset
pub async fn put_tipset(&mut self, ts: &Tipset) -> Result<(), Error> {
pub async fn put_tipset(&self, ts: &Tipset) -> Result<(), Error> {
persist_objects(self.blockstore(), ts.blocks())?;
// TODO determine if expanded tipset is required; see https://github.com/filecoin-project/lotus/blob/testnet/3/chain/store/store.go#L236
self.update_heaviest(ts).await?;
Expand All @@ -121,16 +130,18 @@ where
}

/// Loads heaviest tipset from datastore and sets as heaviest in chainstore
pub async fn load_heaviest_tipset(&mut self) -> Result<(), Error> {
pub async fn load_heaviest_tipset(&self) -> Result<(), Error> {
let heaviest_ts = get_heaviest_tipset(self.blockstore())?.ok_or_else(|| {
warn!("No previous chain state found");
Error::Other("No chain state found".to_owned())
})?;

// set as heaviest tipset
let heaviest_ts = Arc::new(heaviest_ts);
self.heaviest = Some(heaviest_ts.clone());
*self.heaviest.write().await = Some(heaviest_ts.clone());
self.publisher
.write()
.await
.publish(HeadChange::Current(heaviest_ts))
.await;
Ok(())
Expand All @@ -142,8 +153,8 @@ where
}

/// Returns heaviest tipset from blockstore
pub fn heaviest_tipset(&self) -> Option<Arc<Tipset>> {
self.heaviest.clone()
pub async fn heaviest_tipset(&self) -> Option<Arc<Tipset>> {
self.heaviest.read().await.clone()
}

/// Returns key-value store instance
Expand Down Expand Up @@ -183,8 +194,8 @@ where
}

/// Determines if provided tipset is heavier than existing known heaviest tipset
async fn update_heaviest(&mut self, ts: &Tipset) -> Result<(), Error> {
match &self.heaviest {
async fn update_heaviest(&self, ts: &Tipset) -> Result<(), Error> {
match self.heaviest.read().await.as_ref() {
Some(heaviest) => {
let new_weight = weight(self.blockstore(), ts)?;
let curr_weight = weight(self.blockstore(), &heaviest)?;
Expand Down Expand Up @@ -266,7 +277,7 @@ where
}
}

fn set_genesis<DB>(db: &DB, header: BlockHeader) -> Result<Cid, Error>
fn set_genesis<DB>(db: &DB, header: &BlockHeader) -> Result<Cid, Error>
where
DB: BlockStore,
{
Expand Down Expand Up @@ -649,7 +660,7 @@ mod tests {
.unwrap();

assert_eq!(cs.genesis().unwrap(), None);
cs.set_genesis(gen_block.clone()).unwrap();
cs.set_genesis(&gen_block).unwrap();
assert_eq!(cs.genesis().unwrap(), Some(gen_block));
}
}
45 changes: 32 additions & 13 deletions blockchain/chain/src/store/tip_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use super::errors::Error;
use async_std::sync::RwLock;
use blocks::{Tipset, TipsetKeys};
use cid::Cid;
use clock::ChainEpoch;
use std::collections::hash_map::{DefaultHasher, HashMap};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

/// TipsetMetadata is the type stored as the value in the TipIndex hashmap. It contains
/// a tipset pointing to blocks, the root cid of the chain's state after
/// applying the messages in this tipset to its parent state, and the cid of the receipts
Expand All @@ -20,7 +23,8 @@ pub struct TipsetMetadata {
pub tipset_receipts_root: Cid,

/// The actual Tipset
pub tipset: Tipset,
// TODO This should not be keeping a tipset with the metadata
pub tipset: Arc<Tipset>,
}

/// Trait to allow metadata to be indexed by multiple types of structs
Expand All @@ -39,47 +43,62 @@ impl Index for TipsetKeys {}
pub struct TipIndex {
// metadata allows lookup of recorded Tipsets and their state roots
// by TipsetKey and Epoch
metadata: HashMap<u64, TipsetMetadata>,
// TODO this should be mapping epoch to a vector of Cids of block headers
metadata: RwLock<HashMap<u64, TipsetMetadata>>,
}

impl TipIndex {
/// Creates new TipIndex with empty metadata
pub fn new() -> Self {
Self {
metadata: HashMap::new(),
metadata: Default::default(),
}
}
/// Adds an entry to TipIndex's metadata
/// After this call the input TipsetMetadata can be looked up by the TipsetKey of
/// the tipset, or the tipset's epoch
pub fn put(&mut self, meta: &TipsetMetadata) {
pub async fn put(&self, meta: &TipsetMetadata) {
// retrieve parent cids to be used as hash map key
let parent_key = meta.tipset.parents();
// retrieve epoch to be used as hash map key
let epoch_key = meta.tipset.epoch();
// insert value by parent_key into hash map
self.metadata.insert(parent_key.hash_key(), meta.clone());
self.metadata
.write()
.await
.insert(parent_key.hash_key(), meta.clone());
// insert value by epoch_key into hash map
self.metadata.insert(epoch_key.hash_key(), meta.clone());
self.metadata
.write()
.await
.insert(epoch_key.hash_key(), meta.clone());
}
/// Returns the tipset given by hashed key
fn get(&self, key: u64) -> Result<TipsetMetadata, Error> {
async fn get(&self, key: u64) -> Result<TipsetMetadata, Error> {
self.metadata
.read()
.await
.get(&key)
.cloned()
.ok_or_else(|| Error::UndefinedKey("invalid metadata key".to_string()))
}

/// Returns the tipset corresponding to the hashed index
pub fn get_tipset<I: Index>(&self, idx: &I) -> Result<Tipset, Error> {
Ok(self.get(idx.hash_key()).map(|r| r.tipset)?)
pub async fn get_tipset<I: Index>(&self, idx: &I) -> Result<Arc<Tipset>, Error> {
Ok(self.get(idx.hash_key()).await.map(|r| r.tipset)?)
}
/// Returns the state root for the tipset corresponding to the index
pub fn get_tipset_state_root<I: Index>(&self, idx: &I) -> Result<Cid, Error> {
Ok(self.get(idx.hash_key()).map(|r| r.tipset_state_root)?)
pub async fn get_tipset_state_root<I: Index>(&self, idx: &I) -> Result<Cid, Error> {
Ok(self
.get(idx.hash_key())
.await
.map(|r| r.tipset_state_root)?)
}
/// Returns the receipt root for the tipset corresponding to the index
pub fn get_tipset_receipts_root<I: Index>(&self, idx: &I) -> Result<Cid, Error> {
Ok(self.get(idx.hash_key()).map(|r| r.tipset_receipts_root)?)
pub async fn get_tipset_receipts_root<I: Index>(&self, idx: &I) -> Result<Cid, Error> {
Ok(self
.get(idx.hash_key())
.await
.map(|r| r.tipset_receipts_root)?)
}
}
2 changes: 2 additions & 0 deletions blockchain/chain_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ log = "0.4.8"
async-std = { version = "1.6.0", features = ["unstable"] }
forest_libp2p = { path = "../../node/forest_libp2p" }
futures = "0.3.5"
futures-util = "0.3.5"
lru = "0.6"
thiserror = "1.0"
num-traits = "0.2"
Expand All @@ -34,6 +35,7 @@ commcid = { path = "../../utils/commcid" }
clock = { path = "../../node/clock" }
serde = { version = "1.0", features = ["derive", "rc"] }
flo_stream = "0.4.0"
rand = "0.7.3"

[dev-dependencies]
test_utils = { version = "0.1.0", path = "../../utils/test_utils/", features = ["test_constructors"] }
Expand Down
21 changes: 10 additions & 11 deletions blockchain/chain_sync/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ impl SyncBucket {
}
/// Returns true if tipset is from same chain
pub fn is_same_chain_as(&self, ts: &Tipset) -> bool {
// TODO Confirm that comparing keys will be sufficient on full tipset impl
self.tips
.iter()
.any(|t| ts.key() == t.key() || ts.key() == t.parents() || ts.parents() == t.key())
Expand All @@ -37,10 +36,6 @@ impl SyncBucket {
self.tips.push(ts);
}
}
/// Returns true if SyncBucket is empty
pub fn is_empty(&self) -> bool {
self.tips.is_empty()
}
}

/// Set of tipset buckets
Expand Down Expand Up @@ -75,13 +70,17 @@ impl SyncBucketSet {

Some(self.buckets.swap_remove(i))
}
/// Returns heaviest tipset from bucket set
pub(crate) fn heaviest(&self) -> Option<Arc<Tipset>> {
self.buckets
.iter()
.filter_map(SyncBucket::heaviest_tipset)
.max_by(|ts1, ts2| ts1.weight().cmp(ts2.weight()))

/// Reports whether `ts` is on the same chain as the tipsets of at least one bucket in this set.
///
/// Each bucket is asked via `SyncBucket::is_same_chain_as`; the scan stops at the first
/// bucket that answers `true`, and an empty set yields `false`.
pub(crate) fn related_to_any(&self, ts: &Tipset) -> bool {
    self.buckets
        .iter()
        .any(|bucket| bucket.is_same_chain_as(ts))
}

/// Returns a vector of SyncBuckets
pub(crate) fn buckets(&self) -> &[SyncBucket] {
&self.buckets
Expand Down
4 changes: 3 additions & 1 deletion blockchain/chain_sync/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

#![recursion_limit = "1024"]

mod bad_block_cache;
mod bucket;
mod errors;
mod network_context;
mod network_handler;
mod peer_manager;
mod sync;
mod sync_state;
mod sync_worker;

// workaround for a compiler bug, see https://github.com/rust-lang/rust/issues/55779
extern crate serde;
Expand Down
Loading