From 6777cdace3c83f89ff654c51718f01846333ac41 Mon Sep 17 00:00:00 2001 From: Stepan Naumov Date: Tue, 28 Jul 2020 20:44:58 -0400 Subject: [PATCH 1/3] Add Drand Beacon Cache --- Cargo.lock | 1 + blockchain/beacon/src/drand.rs | 37 ++++++++++++++------- blockchain/beacon/src/mock_beacon.rs | 6 +++- blockchain/beacon/tests/drand.rs | 4 +-- blockchain/blocks/Cargo.toml | 1 + blockchain/blocks/src/header/mod.rs | 36 ++++++++++---------- blockchain/chain_sync/src/sync.rs | 6 ++-- blockchain/chain_sync/src/sync/peer_test.rs | 2 +- forest/src/daemon.rs | 2 +- 9 files changed, 58 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c9fad00392d0..45ffb53fc06f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2365,6 +2365,7 @@ dependencies = [ name = "forest_blocks" version = "0.1.0" dependencies = [ + "async-std", "base64 0.12.3", "beacon", "byteorder 1.3.4", diff --git a/blockchain/beacon/src/drand.rs b/blockchain/beacon/src/drand.rs index e8183bd3c391..ed53ab0e16d8 100644 --- a/blockchain/beacon/src/drand.rs +++ b/blockchain/beacon/src/drand.rs @@ -15,6 +15,7 @@ use grpc::ClientStub; use grpc::RequestOptions; use serde::{Deserialize as SerdeDeserialize, Serialize as SerdeSerialize}; use sha2::Digest; +use std::collections::HashMap; use std::convert::TryFrom; use std::error; use std::sync::Arc; @@ -40,7 +41,7 @@ where { /// Verify a new beacon entry against the most recent one before it. fn verify_entry( - &self, + &mut self, curr: &BeaconEntry, prev: &BeaconEntry, ) -> Result>; @@ -59,6 +60,9 @@ pub struct DrandBeacon { drand_gen_time: u64, fil_gen_time: u64, fil_round_time: u64, + + // Keeps track of computed beacon entries. + local_cache: HashMap, } impl DrandBeacon { @@ -95,6 +99,7 @@ impl DrandBeacon { drand_gen_time: group.genesis_time, fil_round_time: interval, fil_gen_time: genesis_ts, + local_cache: HashMap::new(), }) } } @@ -103,7 +108,7 @@ impl DrandBeacon { #[async_trait] impl Beacon for DrandBeacon { fn verify_entry( - &self, + &mut self, curr: &BeaconEntry, prev: &BeaconEntry, ) -> Result> { @@ -123,21 +128,29 @@ impl Beacon for DrandBeacon { // Signature let sig = Signature::from_bytes(curr.data())?; let sig_match = bls_signatures::verify(&sig, &[digest], &[self.pub_key.key()]); - // TODO: Cache this result + + // Cache the result + if sig_match && !self.local_cache.contains_key(&curr.round()) { + self.local_cache.insert(curr.round(), curr.clone()); + } Ok(sig_match) } async fn entry(&self, round: u64) -> Result> { - // TODO: Cache values into a database - let mut req = PublicRandRequest::new(); - req.round = round; - let resp = self - .client - .public_rand(grpc::RequestOptions::new(), req) - .drop_metadata() - .await?; + match self.local_cache.get(&round) { + Some(cached_entry) => Ok(cached_entry.clone()), + None => { + let mut req = PublicRandRequest::new(); + req.round = round; + let resp = self + .client + .public_rand(grpc::RequestOptions::new(), req) + .drop_metadata() + .await?; - Ok(BeaconEntry::new(resp.round, resp.signature)) + Ok(BeaconEntry::new(resp.round, resp.signature)) + } + } } fn max_beacon_round_for_epoch(&self, fil_epoch: ChainEpoch) -> u64 { diff --git a/blockchain/beacon/src/mock_beacon.rs b/blockchain/beacon/src/mock_beacon.rs index 8349eda9fbac..f3264adefef5 100644 --- a/blockchain/beacon/src/mock_beacon.rs +++ b/blockchain/beacon/src/mock_beacon.rs @@ -29,7 +29,11 @@ impl MockBeacon { #[async_trait] impl Beacon for MockBeacon { - fn verify_entry(&self, curr: &BeaconEntry, prev: &BeaconEntry) -> Result> { + fn verify_entry( + &mut self, + curr: &BeaconEntry, + prev: &BeaconEntry, + ) -> Result> { let oe = Self::entry_for_index(prev.round()); Ok(oe.data() == curr.data()) } diff --git a/blockchain/beacon/tests/drand.rs b/blockchain/beacon/tests/drand.rs index d440e65012bf..1fe2437b94a6 100644 --- a/blockchain/beacon/tests/drand.rs +++ b/blockchain/beacon/tests/drand.rs @@ -26,7 +26,7 @@ async fn construct_drand_beacon() { #[ignore] #[async_std::test] async fn ask_and_verify_beacon_entry() { - let beacon = new_beacon().await; + let mut beacon = new_beacon().await; let e2 = beacon.entry(2).await.unwrap(); let e3 = beacon.entry(3).await.unwrap(); @@ -36,7 +36,7 @@ async fn ask_and_verify_beacon_entry() { #[ignore] #[async_std::test] async fn ask_and_verify_beacon_entry_fail() { - let beacon = new_beacon().await; + let mut beacon = new_beacon().await; let e2 = beacon.entry(2).await.unwrap(); let e3 = beacon.entry(3).await.unwrap(); diff --git a/blockchain/blocks/Cargo.toml b/blockchain/blocks/Cargo.toml index 9e92de71dcd4..9087463879a0 100644 --- a/blockchain/blocks/Cargo.toml +++ b/blockchain/blocks/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" features = ["json"] [dependencies] +async-std = { version = "1.6.0", features = ["unstable"] } address = { package = "forest_address", path = "../../vm/address" } beacon = { path = "../beacon" } byteorder = "1.3.4" diff --git a/blockchain/blocks/src/header/mod.rs b/blockchain/blocks/src/header/mod.rs index 83eb63d6ce6c..9fd8ce3bdcf1 100644 --- a/blockchain/blocks/src/header/mod.rs +++ b/blockchain/blocks/src/header/mod.rs @@ -3,6 +3,7 @@ use super::{Error, Ticket, Tipset, TipsetKeys}; use address::Address; +use async_std::sync::RwLock; use beacon::{self, Beacon, BeaconEntry}; use cid::{multihash::Blake2b256, Cid}; use clock::ChainEpoch; @@ -368,10 +369,10 @@ impl BlockHeader { /// Validates if the current header's Beacon entries are valid to ensure randomness was generated correctly pub async fn validate_block_drand( &self, - beacon: Arc, + beacon: Arc>, prev_entry: BeaconEntry, ) -> Result<(), Error> { - let max_round = beacon.max_beacon_round_for_epoch(self.epoch); + let max_round = beacon.read().await.max_beacon_round_for_epoch(self.epoch); if max_round == prev_entry.round() { if !self.beacon_entries.is_empty() { return Err(Error::Validation(format!( @@ -390,21 +391,22 @@ impl BlockHeader { last.round() ))); } - self.beacon_entries.iter().try_fold( - &prev_entry, - |prev, curr| -> Result<&BeaconEntry, Error> { - if !beacon - .verify_entry(curr, &prev) - .map_err(|e| Error::Validation(e.to_string()))? - { - return Err(Error::Validation(format!( - "beacon entry was invalid: curr:{:?}, prev: {:?}", - curr, prev - ))); - } - Ok(curr) - }, - )?; + + let mut prev = &prev_entry; + for curr in &self.beacon_entries { + if !beacon + .write() + .await + .verify_entry(&curr, &prev) + .map_err(|e| Error::Validation(e.to_string()))? + { + return Err(Error::Validation(format!( + "beacon entry was invalid: curr:{:?}, prev: {:?}", + curr, prev + ))); + } + prev = &curr; + } Ok(()) } } diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index 96e93cb3861c..e7d64898edf3 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -56,7 +56,7 @@ pub struct ChainSyncer { state: Arc>, /// Drand randomness beacon - beacon: Arc, + beacon: Arc>, /// manages retrieving and updates state objects state_manager: Arc>, @@ -100,7 +100,7 @@ where { pub fn new( chain_store: ChainStore, - beacon: Arc, + beacon: Arc>, network_send: Sender, network_rx: Receiver, genesis: Tipset, @@ -1017,7 +1017,7 @@ mod tests { let gen = dummy_header(); chain_store.set_genesis(gen.clone()).unwrap(); - let beacon = Arc::new(MockBeacon::new(Duration::from_secs(1))); + let beacon = Arc::new(RwLock::new(MockBeacon::new(Duration::from_secs(1)))); let genesis_ts = Tipset::new(vec![gen]).unwrap(); ( diff --git a/blockchain/chain_sync/src/sync/peer_test.rs b/blockchain/chain_sync/src/sync/peer_test.rs index 23ec5c004afa..f0a41fc576eb 100644 --- a/blockchain/chain_sync/src/sync/peer_test.rs +++ b/blockchain/chain_sync/src/sync/peer_test.rs @@ -30,7 +30,7 @@ fn peer_manager_update() { chain_store.set_genesis(dummy_header.clone()).unwrap(); let genesis_ts = Tipset::new(vec![dummy_header]).unwrap(); - let beacon = Arc::new(MockBeacon::new(Duration::from_secs(1))); + let beacon = Arc::new(RwLock::new(MockBeacon::new(Duration::from_secs(1)))); let cs = ChainSyncer::new( chain_store, beacon, diff --git a/forest/src/daemon.rs b/forest/src/daemon.rs index bce42893bf6f..773162c820cc 100644 --- a/forest/src/daemon.rs +++ b/forest/src/daemon.rs @@ -64,7 +64,7 @@ pub(super) async fn start(config: Config) { // Initialize ChainSyncer let chain_syncer = ChainSyncer::new( chain_store, - Arc::new(beacon), + Arc::new(RwLock::new(beacon)), network_send.clone(), network_rx, genesis, From 3df64ad2ef8003214ab985812feca598a5a1b377 Mon Sep 17 00:00:00 2001 From: Stepan Naumov Date: Wed, 29 Jul 2020 21:22:02 -0400 Subject: [PATCH 2/3] addressing comments --- Cargo.lock | 1 + blockchain/beacon/Cargo.toml | 1 + blockchain/beacon/src/drand.rs | 8 ++++---- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 45ffb53fc06f..b28768d7fa5f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -741,6 +741,7 @@ checksum = "3441f0f7b02788e948e47f457ca01f1d7e6d92c693bc132c22b087d3141c03ff" name = "beacon" version = "0.1.0" dependencies = [ + "ahash 0.4.4", "async-std", "async-trait", "base64 0.12.3", diff --git a/blockchain/beacon/Cargo.toml b/blockchain/beacon/Cargo.toml index 1a9e242a1921..5c75f1deee8b 100644 --- a/blockchain/beacon/Cargo.toml +++ b/blockchain/beacon/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" features = ["json"] [dependencies] +ahash = "0.4" grpc = "0.8" grpc-protobuf = "0.8" protobuf = "2.14.0" diff --git a/blockchain/beacon/src/drand.rs b/blockchain/beacon/src/drand.rs index ed53ab0e16d8..dcb0c289550b 100644 --- a/blockchain/beacon/src/drand.rs +++ b/blockchain/beacon/src/drand.rs @@ -7,6 +7,7 @@ use super::drand_api::api_grpc::PublicClient; use super::drand_api::common::GroupRequest; use super::group::Group; +use ahash::AHashMap; use async_trait::async_trait; use bls_signatures::{PublicKey, Serialize, Signature}; use byteorder::{BigEndian, WriteBytesExt}; @@ -15,7 +16,6 @@ use grpc::ClientStub; use grpc::RequestOptions; use serde::{Deserialize as SerdeDeserialize, Serialize as SerdeSerialize}; use sha2::Digest; -use std::collections::HashMap; use std::convert::TryFrom; use std::error; use std::sync::Arc; @@ -61,8 +61,8 @@ pub struct DrandBeacon { fil_gen_time: u64, fil_round_time: u64, - // Keeps track of computed beacon entries. - local_cache: HashMap, + /// Keeps track of computed beacon entries. + local_cache: AHashMap, } impl DrandBeacon { @@ -99,7 +99,7 @@ impl DrandBeacon { drand_gen_time: group.genesis_time, fil_round_time: interval, fil_gen_time: genesis_ts, - local_cache: HashMap::new(), + local_cache: AHashMap::new(), }) } } From 5100f9e1bae433c5144035d59f652ff781f435e0 Mon Sep 17 00:00:00 2001 From: Stepan Naumov Date: Thu, 30 Jul 2020 10:06:46 -0400 Subject: [PATCH 3/3] wrapping local_cache in rwlock only --- Cargo.lock | 1 - blockchain/beacon/Cargo.toml | 1 + blockchain/beacon/src/drand.rs | 22 ++++++++++++--------- blockchain/beacon/src/mock_beacon.rs | 4 ++-- blockchain/beacon/tests/drand.rs | 8 ++++---- blockchain/blocks/Cargo.toml | 1 - blockchain/blocks/src/header/mod.rs | 8 +++----- blockchain/chain_sync/src/sync.rs | 6 +++--- blockchain/chain_sync/src/sync/peer_test.rs | 2 +- forest/src/daemon.rs | 2 +- 10 files changed, 28 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b28768d7fa5f..5445418c5d95 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2366,7 +2366,6 @@ dependencies = [ name = "forest_blocks" version = "0.1.0" dependencies = [ - "async-std", "base64 0.12.3", "beacon", "byteorder 1.3.4", diff --git a/blockchain/beacon/Cargo.toml b/blockchain/beacon/Cargo.toml index 5c75f1deee8b..dba4f833f5d1 100644 --- a/blockchain/beacon/Cargo.toml +++ b/blockchain/beacon/Cargo.toml @@ -9,6 +9,7 @@ features = ["json"] [dependencies] ahash = "0.4" +async-std = { version = "1.6.0", features = ["unstable"] } grpc = "0.8" grpc-protobuf = "0.8" protobuf = "2.14.0" diff --git a/blockchain/beacon/src/drand.rs b/blockchain/beacon/src/drand.rs index dcb0c289550b..a8b48931910f 100644 --- a/blockchain/beacon/src/drand.rs +++ b/blockchain/beacon/src/drand.rs @@ -8,6 +8,7 @@ use super::drand_api::common::GroupRequest; use super::group::Group; use ahash::AHashMap; +use async_std::sync::RwLock; use async_trait::async_trait; use bls_signatures::{PublicKey, Serialize, Signature}; use byteorder::{BigEndian, WriteBytesExt}; @@ -40,8 +41,8 @@ where Self: Sized, { /// Verify a new beacon entry against the most recent one before it. - fn verify_entry( - &mut self, + async fn verify_entry( + &self, curr: &BeaconEntry, prev: &BeaconEntry, ) -> Result>; @@ -62,7 +63,7 @@ pub struct DrandBeacon { fil_round_time: u64, /// Keeps track of computed beacon entries. - local_cache: AHashMap, + local_cache: RwLock>, } impl DrandBeacon { @@ -99,7 +100,7 @@ impl DrandBeacon { drand_gen_time: group.genesis_time, fil_round_time: interval, fil_gen_time: genesis_ts, - local_cache: AHashMap::new(), + local_cache: Default::default(), }) } } @@ -107,8 +108,8 @@ impl DrandBeacon { /// Use this to source randomness and to verify Drand beacon entries. #[async_trait] impl Beacon for DrandBeacon { - fn verify_entry( - &mut self, + async fn verify_entry( + &self, curr: &BeaconEntry, prev: &BeaconEntry, ) -> Result> { @@ -130,14 +131,17 @@ impl Beacon for DrandBeacon { let sig_match = bls_signatures::verify(&sig, &[digest], &[self.pub_key.key()]); // Cache the result - if sig_match && !self.local_cache.contains_key(&curr.round()) { - self.local_cache.insert(curr.round(), curr.clone()); + if sig_match && !self.local_cache.read().await.contains_key(&curr.round()) { + self.local_cache + .write() + .await + .insert(curr.round(), curr.clone()); } Ok(sig_match) } async fn entry(&self, round: u64) -> Result> { - match self.local_cache.get(&round) { + match self.local_cache.read().await.get(&round) { Some(cached_entry) => Ok(cached_entry.clone()), None => { let mut req = PublicRandRequest::new(); diff --git a/blockchain/beacon/src/mock_beacon.rs b/blockchain/beacon/src/mock_beacon.rs index f3264adefef5..9a42cd93e926 100644 --- a/blockchain/beacon/src/mock_beacon.rs +++ b/blockchain/beacon/src/mock_beacon.rs @@ -29,8 +29,8 @@ impl MockBeacon { #[async_trait] impl Beacon for MockBeacon { - fn verify_entry( - &mut self, + async fn verify_entry( + &self, curr: &BeaconEntry, prev: &BeaconEntry, ) -> Result> { diff --git a/blockchain/beacon/tests/drand.rs b/blockchain/beacon/tests/drand.rs index 1fe2437b94a6..9c7915c4e4ee 100644 --- a/blockchain/beacon/tests/drand.rs +++ b/blockchain/beacon/tests/drand.rs @@ -26,19 +26,19 @@ async fn construct_drand_beacon() { #[ignore] #[async_std::test] async fn ask_and_verify_beacon_entry() { - let mut beacon = new_beacon().await; + let beacon = new_beacon().await; let e2 = beacon.entry(2).await.unwrap(); let e3 = beacon.entry(3).await.unwrap(); - assert!(beacon.verify_entry(&e3, &e2).unwrap()); + assert!(beacon.verify_entry(&e3, &e2).await.unwrap()); } #[ignore] #[async_std::test] async fn ask_and_verify_beacon_entry_fail() { - let mut beacon = new_beacon().await; + let beacon = new_beacon().await; let e2 = beacon.entry(2).await.unwrap(); let e3 = beacon.entry(3).await.unwrap(); - assert!(!beacon.verify_entry(&e2, &e3).unwrap()); + assert!(!beacon.verify_entry(&e2, &e3).await.unwrap()); } diff --git a/blockchain/blocks/Cargo.toml b/blockchain/blocks/Cargo.toml index 9087463879a0..9e92de71dcd4 100644 --- a/blockchain/blocks/Cargo.toml +++ b/blockchain/blocks/Cargo.toml @@ -8,7 +8,6 @@ edition = "2018" features = ["json"] [dependencies] -async-std = { version = "1.6.0", features = ["unstable"] } address = { package = "forest_address", path = "../../vm/address" } beacon = { path = "../beacon" } byteorder = "1.3.4" diff --git a/blockchain/blocks/src/header/mod.rs b/blockchain/blocks/src/header/mod.rs index 9fd8ce3bdcf1..1ed6f4fb205a 100644 --- a/blockchain/blocks/src/header/mod.rs +++ b/blockchain/blocks/src/header/mod.rs @@ -3,7 +3,6 @@ use super::{Error, Ticket, Tipset, TipsetKeys}; use address::Address; -use async_std::sync::RwLock; use beacon::{self, Beacon, BeaconEntry}; use cid::{multihash::Blake2b256, Cid}; use clock::ChainEpoch; @@ -369,10 +368,10 @@ impl BlockHeader { /// Validates if the current header's Beacon entries are valid to ensure randomness was generated correctly pub async fn validate_block_drand( &self, - beacon: Arc>, + beacon: Arc, prev_entry: BeaconEntry, ) -> Result<(), Error> { - let max_round = beacon.read().await.max_beacon_round_for_epoch(self.epoch); + let max_round = beacon.max_beacon_round_for_epoch(self.epoch); if max_round == prev_entry.round() { if !self.beacon_entries.is_empty() { return Err(Error::Validation(format!( @@ -395,9 +394,8 @@ impl BlockHeader { let mut prev = &prev_entry; for curr in &self.beacon_entries { if !beacon - .write() - .await .verify_entry(&curr, &prev) + .await .map_err(|e| Error::Validation(e.to_string()))? { return Err(Error::Validation(format!( diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index e7d64898edf3..96e93cb3861c 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -56,7 +56,7 @@ pub struct ChainSyncer { state: Arc>, /// Drand randomness beacon - beacon: Arc>, + beacon: Arc, /// manages retrieving and updates state objects state_manager: Arc>, @@ -100,7 +100,7 @@ where { pub fn new( chain_store: ChainStore, - beacon: Arc>, + beacon: Arc, network_send: Sender, network_rx: Receiver, genesis: Tipset, @@ -1017,7 +1017,7 @@ mod tests { let gen = dummy_header(); chain_store.set_genesis(gen.clone()).unwrap(); - let beacon = Arc::new(RwLock::new(MockBeacon::new(Duration::from_secs(1)))); + let beacon = Arc::new(MockBeacon::new(Duration::from_secs(1))); let genesis_ts = Tipset::new(vec![gen]).unwrap(); ( diff --git a/blockchain/chain_sync/src/sync/peer_test.rs b/blockchain/chain_sync/src/sync/peer_test.rs index f0a41fc576eb..23ec5c004afa 100644 --- a/blockchain/chain_sync/src/sync/peer_test.rs +++ b/blockchain/chain_sync/src/sync/peer_test.rs @@ -30,7 +30,7 @@ fn peer_manager_update() { chain_store.set_genesis(dummy_header.clone()).unwrap(); let genesis_ts = Tipset::new(vec![dummy_header]).unwrap(); - let beacon = Arc::new(RwLock::new(MockBeacon::new(Duration::from_secs(1)))); + let beacon = Arc::new(MockBeacon::new(Duration::from_secs(1))); let cs = ChainSyncer::new( chain_store, beacon, diff --git a/forest/src/daemon.rs b/forest/src/daemon.rs index 773162c820cc..bce42893bf6f 100644 --- a/forest/src/daemon.rs +++ b/forest/src/daemon.rs @@ -64,7 +64,7 @@ pub(super) async fn start(config: Config) { // Initialize ChainSyncer let chain_syncer = ChainSyncer::new( chain_store, - Arc::new(RwLock::new(beacon)), + Arc::new(beacon), network_send.clone(), network_rx, genesis,