Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jaden/chainsync/asyncverification #419

Merged
merged 16 commits into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 94 additions & 41 deletions blockchain/chain_sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,19 @@ use super::peer_manager::PeerManager;
use super::{Error, SyncNetworkContext};
use address::Address;
use amt::Amt;
use async_std::prelude::*;
use async_std::sync::{channel, Receiver, Sender};
use async_std::task;
use blocks::{Block, FullTipset, Tipset, TipsetKeys, TxMeta};
use chain::ChainStore;
use cid::{multihash::Blake2b256, Cid};
use core::time::Duration;
use crypto::verify_bls_aggregate;
use db::MemoryDB;
use encoding::{Cbor, Error as EncodingError};
use forest_libp2p::{
hello::HelloMessage, BlockSyncRequest, NetworkEvent, NetworkMessage, MESSAGES,
};
use futures::stream::{FuturesUnordered, StreamExt};
use ipld_blockstore::BlockStore;
use libp2p::core::PeerId;
use log::{debug, info, warn};
Expand Down Expand Up @@ -96,7 +97,7 @@ struct MsgMetaData {

impl<DB> ChainSyncer<DB>
where
DB: BlockStore,
DB: BlockStore + Sync + Send + 'static,
{
pub fn new(
chain_store: ChainStore<DB>,
Expand Down Expand Up @@ -133,7 +134,7 @@ where

impl<DB> ChainSyncer<DB>
where
DB: BlockStore,
DB: BlockStore + Sync + Send + 'static,
{
flodesi marked this conversation as resolved.
Show resolved Hide resolved
pub async fn start(mut self) -> Result<(), Error> {
self.net_handler.spawn(Arc::clone(&self.peer_manager));
Expand Down Expand Up @@ -316,7 +317,7 @@ where
}

// validate message root from header matches message root
let sm_root = self.compute_msg_data(&bls_msgs, &secp_msgs)?;
let sm_root = Self::compute_msg_data(&bls_msgs, &secp_msgs)?;
if header.messages() != &sm_root {
return Err(Error::InvalidRoots);
}
Expand Down Expand Up @@ -436,7 +437,7 @@ where
/// Validates message root from header matches message root generated from the
/// bls and secp messages contained in the passed in block and stores them in a key-value store
fn validate_msg_data(&self, block: &Block) -> Result<(), Error> {
let sm_root = self.compute_msg_data(block.bls_msgs(), block.secp_msgs())?;
let sm_root = Self::compute_msg_data(block.bls_msgs(), block.secp_msgs())?;
if block.header().messages() != &sm_root {
return Err(Error::InvalidRoots);
}
Expand All @@ -448,26 +449,24 @@ where
}
/// Returns message root CID from bls and secp message contained in the param Block
fn compute_msg_data(
&self,
bls_msgs: &[UnsignedMessage],
secp_msgs: &[SignedMessage],
) -> Result<Cid, Error> {
let temp_store = MemoryDB::default();
// collect bls and secp cids
let bls_cids = cids_from_messages(bls_msgs)?;
let secp_cids = cids_from_messages(secp_msgs)?;
// generate Amt and batch set message values
let bls_root = Amt::new_from_slice(self.chain_store.blockstore(), &bls_cids)?;
let secp_root = Amt::new_from_slice(self.chain_store.blockstore(), &secp_cids)?;
let bls_root = Amt::new_from_slice(&temp_store, &bls_cids)?;
let secp_root = Amt::new_from_slice(&temp_store, &secp_cids)?;

let meta = TxMeta {
bls_message_root: bls_root,
secp_message_root: secp_root,
};
// TODO this should be memoryDB for temp storage
// store message roots and receive meta_root
let meta_root = self
.chain_store
.blockstore()
let meta_root = temp_store
.put(&meta, Blake2b256)
.map_err(|e| Error::Other(e.to_string()))?;
austinabell marked this conversation as resolved.
Show resolved Hide resolved

Expand Down Expand Up @@ -510,16 +509,12 @@ where
Ok(fts)
}
// Block message validation checks
fn check_block_msgs(&self, block: Block, tip: &Tipset) -> Result<(), Error> {
let mut pub_keys = Vec::new();
let mut cids = Vec::new();
for m in block.bls_msgs() {
let pk = self
.state_manager
.get_bls_public_key(m.from(), tip.parent_state())?;
pub_keys.push(pk);
cids.push(m.cid()?.to_bytes());
}
fn check_block_msgs(
db: Arc<DB>,
pub_keys: Vec<Vec<u8>>,
cids: Vec<Vec<u8>>,
block: Block,
) -> Result<(), Error> {
if let Some(sig) = block.header().bls_aggregate() {
if !verify_bls_aggregate(
cids.iter()
Expand Down Expand Up @@ -593,7 +588,7 @@ where
let mut msg_meta_data: HashMap<Address, MsgMetaData> = HashMap::default();
// TODO retrieve tipset state and load state tree
// temporary
let tree = StateTree::new(self.chain_store.db.as_ref());
let tree = StateTree::new(db.as_ref());
// loop through bls messages and check msg validity
for m in block.bls_msgs() {
check_msg(m, &mut msg_meta_data, &tree)?;
Expand All @@ -607,7 +602,7 @@ where
.map_err(|e| Error::Validation(format!("Message signature invalid: {}", e)))?;
}
// validate message root from header matches message root
let sm_root = self.compute_msg_data(block.bls_msgs(), block.secp_msgs())?;
let sm_root = Self::compute_msg_data(block.bls_msgs(), block.secp_msgs())?;
if block.header().messages() != &sm_root {
return Err(Error::InvalidRoots);
}
Expand All @@ -617,48 +612,106 @@ where

/// Validates block semantically according to https://github.com/filecoin-project/specs/blob/6ab401c0b92efb6420c6e198ec387cf56dc86057/validation.md
async fn validate(&self, block: &Block) -> Result<(), Error> {
let mut error_vec: Vec<String> = Vec::new();
let mut validations = FuturesUnordered::new();

let header = block.header();

// check if block has been signed
if header.signature().is_none() {
return Err(Error::Validation("Signature is nil in header".to_owned()));
error_vec.push("Signature is nil in header".to_owned());
}

let parent_tipset = self.chain_store.tipset_from_keys(header.parents())?;

// time stamp checks
header.validate_timestamps(&parent_tipset)?;
let time_stamp_check = header.validate_timestamps(&parent_tipset);
if time_stamp_check.is_err() {
error_vec.push(time_stamp_check.err().unwrap().to_string());
}
flodesi marked this conversation as resolved.
Show resolved Hide resolved

// check messages to ensure valid state transitions
flodesi marked this conversation as resolved.
Show resolved Hide resolved
self.check_block_msgs(block.clone(), &parent_tipset)?;
let b = block.clone();

// Check Block Message and Signatures in them
let mut pub_keys = Vec::new();
let mut cids = Vec::new();
for m in block.bls_msgs() {
let pk = self
.state_manager
.get_bls_public_key(m.from(), parent_tipset.parent_state())?;
pub_keys.push(pk);
cids.push(m.cid()?.to_bytes());
}
flodesi marked this conversation as resolved.
Show resolved Hide resolved
let db = Arc::clone(&self.chain_store.db);
let x = task::spawn_blocking(move || Self::check_block_msgs(db, pub_keys, cids, b));
validations.push(x);

// TODO use computed state_root instead of parent_tipset.parent_state()
let work_addr = self
.state_manager
.get_miner_work_addr(&parent_tipset.parent_state(), header.miner_address())?;

// block signature check
header.check_block_signature(&work_addr)?;
let work_addr_result = self
.state_manager
.get_miner_work_addr(&parent_tipset.parent_state(), header.miner_address());
match work_addr_result {
Ok(work_addr) => {
let temp_header = header.clone();
let block_sig_task = task::spawn_blocking(move || {
temp_header
.check_block_signature(&work_addr)
.map_err(|err| Error::Blockchain(err))
});
validations.push(block_sig_task)
}
Err(err) => error_vec.push(err.to_string()),
}

let slash = self
.state_manager
.is_miner_slashed(header.miner_address(), &parent_tipset.parent_state())?;
.is_miner_slashed(header.miner_address(), &parent_tipset.parent_state())
.unwrap_or_else(|err| {
error_vec.push(err.to_string());
false
});
if slash {
return Err(Error::Validation(
"Received block was from slashed or invalid miner".to_owned(),
));
error_vec.push("Received block was from slashed or invalid miner".to_owned())
}

let (c_pow, net_pow) = self
let power_result = self
.state_manager
.get_power(&parent_tipset.parent_state(), header.miner_address())?;
.get_power(&parent_tipset.parent_state(), header.miner_address());
// ticket winner check
if !header.is_ticket_winner(c_pow, net_pow) {
return Err(Error::Validation(
"Miner created a block but was not a winner".to_owned(),
));
match power_result {
Ok(pow_tuple) => {
let (c_pow, net_pow) = pow_tuple;
if !header.is_ticket_winner(c_pow, net_pow) {
error_vec.push("Miner created a block but was not a winner".to_owned())
}
}
Err(err) => error_vec.push(err.to_string()),
}

// TODO verify_ticket_vrf

// collect the errors from the async validations
loop {
match validations.next().await {
Some(result) => {
if result.is_err() {
error_vec.push(result.err().unwrap().to_string());
}
}
None => {
break;
}
}
}
flodesi marked this conversation as resolved.
Show resolved Hide resolved
// combine vec of error strings and return Validation error with this resultant string
if !error_vec.is_empty() {
let error_string = error_vec.join(", ");
return Err(Error::Validation(error_string.to_owned()));
}

Ok(())
}
/// validates tipsets and adds header data to tipset tracker
Expand Down Expand Up @@ -947,7 +1000,7 @@ mod tests {
Cid::from_raw_cid("bafy2bzacecujyfvb74s7xxnlajidxpgcpk6abyatk62dlhgq6gcob3iixhgom")
.unwrap();

let root = cs.compute_msg_data(&[bls], &[secp]).unwrap();
let root = ChainSyncer::<MemoryDB>::compute_msg_data(&[bls], &[secp]).unwrap();
austinabell marked this conversation as resolved.
Show resolved Hide resolved
assert_eq!(root, expected_root);
}
}
10 changes: 9 additions & 1 deletion blockchain/state_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ where
Ok(addr)
}
/// Returns specified actor's claimed power and total network power as a tuple
pub fn get_power(&self, state_cid: &Cid, addr: &Address) -> Result<(BigUint, BigUint), Error> {
pub fn get_power<'a>(
&'a self,
state_cid: &Cid,
addr: &Address,
) -> Result<(BigUint, BigUint), Error> {
flodesi marked this conversation as resolved.
Show resolved Hide resolved
let ps: power::State = self.load_actor_state(&*STORAGE_POWER_ACTOR_ADDR, state_cid)?;

if let Some(claim) = ps.get_claim(self.bs.as_ref(), addr)? {
Expand Down Expand Up @@ -129,4 +133,8 @@ where
}
Ok(kaddr.payload_bytes())
}

pub fn get_db(&self) -> Arc<DB> {
Arc::clone(&self.bs)
}
austinabell marked this conversation as resolved.
Show resolved Hide resolved
}