SyncBucket cleanup (#407)
* Minor bucket improvements

* Remove unused Ord implementations of several structures

* Derive PartialEq and Hash for Cid in order to avoid allocating

* Remove unnecessary mut
timvermeulen authored May 12, 2020
1 parent ef7aafd commit f47b757
Showing 8 changed files with 37 additions and 83 deletions.
13 changes: 0 additions & 13 deletions blockchain/blocks/src/header.rs
@@ -18,7 +18,6 @@ use num_bigint::{
BigUint,
};
use sha2::Digest;
use std::cmp::Ordering;
use std::fmt;
use std::time::{SystemTime, UNIX_EPOCH};
use vm::PoStProof;
@@ -202,18 +201,6 @@ impl<'de> Deserialize<'de> for BlockHeader {
}
}

impl PartialOrd for BlockHeader {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
self.cached_bytes.partial_cmp(&other.cached_bytes)
}
}

impl Ord for BlockHeader {
fn cmp(&self, other: &Self) -> Ordering {
self.cached_bytes.cmp(&other.cached_bytes)
}
}

impl BlockHeader {
/// Generates a BlockHeader builder as a constructor
pub fn builder() -> BlockHeaderBuilder {
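The removed `PartialOrd`/`Ord` impls ordered `BlockHeader` by its cached serialized bytes and had no remaining callers. Should such an ordering ever be needed again, it can be expressed locally at the call site rather than as a crate-wide impl; a minimal sketch with a hypothetical stand-in type:

```rust
// Hypothetical stand-in for BlockHeader: sort by the cached bytes only where
// a sort is actually needed, instead of implementing Ord for the whole type.
struct Header {
    cached_bytes: Vec<u8>,
}

fn sort_by_cached_bytes(headers: &mut [Header]) {
    headers.sort_by(|a, b| a.cached_bytes.cmp(&b.cached_bytes));
}

fn main() {
    let mut headers = vec![
        Header { cached_bytes: vec![2, 0] },
        Header { cached_bytes: vec![1, 9] },
    ];
    sort_by_cached_bytes(&mut headers);
    assert_eq!(headers[0].cached_bytes, vec![1, 9]);
}
```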
4 changes: 2 additions & 2 deletions blockchain/blocks/src/tipset.rs
@@ -17,7 +17,7 @@ use serde::Deserialize;
/// A set of CIDs forming a unique key for a Tipset.
/// Equal keys will have equivalent iteration order, but note that the CIDs are *not* maintained in
/// the same order as the canonical iteration order of blocks in a tipset (which is by ticket)
#[derive(Clone, Debug, PartialEq, Eq, Hash, Default, Ord, PartialOrd)]
#[derive(Clone, Debug, PartialEq, Eq, Hash, Default)]
pub struct TipsetKeys {
pub cids: Vec<Cid>,
}
@@ -56,7 +56,7 @@ impl Cbor for TipsetKeys {}

/// An immutable set of blocks at the same height with the same parent set.
/// Blocks in a tipset are canonically ordered by ticket size.
#[derive(Clone, PartialEq, Debug, PartialOrd, Ord, Eq)]
#[derive(Clone, PartialEq, Debug, Eq)]
pub struct Tipset {
blocks: Vec<BlockHeader>,
key: TipsetKeys,
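With the ordering impls gone, `TipsetKeys` keeps only the derived equality and hashing, which compare the underlying `Vec` element by element — consistent with the doc comment that equal keys share an iteration order. A small sketch with stand-in types (not the real `Cid`):

```rust
// Derived PartialEq/Hash on a Vec-backed key are order-sensitive: the same
// CIDs in a different order produce an unequal key.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Default)]
struct Keys {
    cids: Vec<u64>, // stand-in for Vec<Cid>
}

fn main() {
    let a = Keys { cids: vec![1, 2] };
    let b = Keys { cids: vec![1, 2] };
    let c = Keys { cids: vec![2, 1] };
    assert_eq!(a, b);
    assert_ne!(a, c); // same elements, different order
}
```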
66 changes: 29 additions & 37 deletions blockchain/chain_sync/src/bucket.rs
@@ -2,10 +2,11 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use blocks::Tipset;
use num_bigint::BigUint;
use std::sync::Arc;

/// SyncBucket defines a bucket of tipsets to sync
#[derive(Clone, Default, PartialEq, PartialOrd, Ord, Eq)]
#[derive(Clone, Default, PartialEq, Eq)]
pub struct SyncBucket {
tips: Vec<Arc<Tipset>>,
}
@@ -15,25 +16,20 @@ impl SyncBucket {
fn new(tips: Vec<Arc<Tipset>>) -> SyncBucket {
Self { tips }
}
/// Returns the weight of the heaviest tipset
fn max_weight(&self) -> Option<&BigUint> {
self.tips.iter().map(|ts| ts.weight()).max()
}
/// Returns the tipset with the max weight
pub fn heaviest_tipset(&self) -> Option<Arc<Tipset>> {
if self.tips.is_empty() {
return None;
}

// return max value pointer
self.tips.iter().max_by_key(|a| a.weight()).cloned()
}
/// Returns true if tipset is from same chain
pub fn same_chain_as(&mut self, ts: &Tipset) -> bool {
for t in self.tips.iter_mut() {
// TODO Confirm that comparing keys will be sufficient on full tipset impl
if ts.key() == t.key() || ts.key() == t.parents() || ts.parents() == t.key() {
return true;
}
}

false
pub fn is_same_chain_as(&self, ts: &Tipset) -> bool {
// TODO Confirm that comparing keys will be sufficient on full tipset impl
self.tips
.iter()
.any(|t| ts.key() == t.key() || ts.key() == t.parents() || ts.parents() == t.key())
}
/// Adds tipset to vector to be included in the bucket
pub fn add(&mut self, ts: Arc<Tipset>) {
@@ -56,39 +52,35 @@ pub(crate) struct SyncBucketSet {
impl SyncBucketSet {
/// Inserts a tipset into a bucket
pub(crate) fn insert(&mut self, tipset: Arc<Tipset>) {
for b in self.buckets.iter_mut() {
if b.same_chain_as(&tipset) {
b.add(tipset);
return;
}
if let Some(b) = self
.buckets
.iter_mut()
.find(|b| b.is_same_chain_as(&tipset))
{
b.add(tipset);
} else {
self.buckets.push(SyncBucket::new(vec![tipset]))
}
self.buckets.push(SyncBucket::new(vec![tipset]))
}
/// Removes the SyncBucket with heaviest weighted Tipset from SyncBucketSet
pub(crate) fn pop(&mut self) -> Option<SyncBucket> {
if let Some((i, _)) = self
let (i, _) = self
.buckets()
.iter()
.enumerate()
.max_by_key(|(_, b)| b.heaviest_tipset())
{
let ts = self.buckets.remove(i);
Some(ts)
} else {
None
}
.map(|(i, b)| (i, b.max_weight()))
.max_by(|(_, w1), (_, w2)| w1.cmp(w2))?;
// we can't use `max_by_key` here because the weight is a reference,
// see https://github.com/rust-lang/rust/issues/34162

Some(self.buckets.swap_remove(i))
}
/// Returns heaviest tipset from bucket set
pub(crate) fn heaviest(&self) -> Option<Arc<Tipset>> {
// Transform max values from each bucket into a Vec
let vals: Vec<Arc<Tipset>> = self
.buckets
self.buckets
.iter()
.filter_map(|b| b.heaviest_tipset())
.collect();

// Return the heaviest tipset bucket
vals.iter().max_by_key(|b| b.weight()).cloned()
.filter_map(SyncBucket::heaviest_tipset)
.max_by(|ts1, ts2| ts1.weight().cmp(ts2.weight()))
}
/// Returns a vector of SyncBuckets
pub(crate) fn buckets(&self) -> &[SyncBucket] {
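The reworked `pop` materializes each bucket's `max_weight()` and compares the results with `max_by`; as the in-code comment notes, `max_by_key` can't be used here because the key is a reference borrowed from the element (rust-lang/rust#34162). `swap_remove` is presumably preferred over `remove` because bucket order doesn't matter and it avoids shifting the remaining elements. A self-contained sketch of the same pattern with stand-in types:

```rust
struct Bucket {
    weights: Vec<u64>,
}

impl Bucket {
    // Returns a reference borrowed from `self`, like SyncBucket::max_weight.
    fn max_weight(&self) -> Option<&u64> {
        self.weights.iter().max()
    }
}

fn heaviest_index(buckets: &[Bucket]) -> Option<usize> {
    buckets
        .iter()
        .enumerate()
        // Materialize the borrowed keys first, then compare them with max_by.
        .map(|(i, b)| (i, b.max_weight()))
        .max_by(|(_, w1), (_, w2)| w1.cmp(w2))
        .map(|(i, _)| i)
}

fn main() {
    let buckets = vec![
        Bucket { weights: vec![1, 5, 3] },
        Bucket { weights: vec![2, 9] },
    ];
    assert_eq!(heaviest_index(&buckets), Some(1));
}
```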
4 changes: 2 additions & 2 deletions blockchain/chain_sync/src/network_context.rs
@@ -113,7 +113,7 @@ impl SyncNetworkContext {
}

/// Send a hello request to the network (does not await response)
pub async fn hello_request(&mut self, peer_id: PeerId, request: HelloMessage) {
pub async fn hello_request(&self, peer_id: PeerId, request: HelloMessage) {
trace!("Sending Hello Message {:?}", request);
// TODO update to await response when we want to handle the latency
self.send_rpc_event(peer_id, RPCEvent::Request(0, RPCRequest::Hello(request)))
@@ -146,7 +146,7 @@ impl SyncNetworkContext {
}

/// Handles sending the base event to the network service
async fn send_rpc_event(&mut self, peer_id: PeerId, event: RPCEvent) {
async fn send_rpc_event(&self, peer_id: PeerId, event: RPCEvent) {
self.network_send
.send(NetworkMessage::RPC { peer_id, event })
.await
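Both request helpers only forward a message over the context's channel, so exclusive access isn't required and `&self` is enough; callers no longer need a mutable borrow of the whole context. A minimal sketch of the same idea using `std::sync::mpsc` (not the actual network types):

```rust
use std::sync::mpsc::{channel, Sender};

struct Ctx {
    tx: Sender<String>,
}

impl Ctx {
    // Sending on the channel only needs a shared reference, so &self suffices
    // and several call sites can hold &Ctx at the same time.
    fn notify(&self, msg: &str) {
        self.tx.send(msg.to_string()).expect("receiver dropped");
    }
}

fn main() {
    let (tx, rx) = channel();
    let ctx = Ctx { tx };
    ctx.notify("hello");
    assert_eq!(rx.recv().unwrap(), "hello");
}
```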
2 changes: 1 addition & 1 deletion blockchain/chain_sync/src/sync.rs
@@ -414,7 +414,7 @@ where
// TODO check for related tipsets

// if next_sync_target is from same chain as incoming tipset add it to be synced next
if !self.next_sync_target.is_empty() && self.next_sync_target.same_chain_as(&tipset) {
if self.next_sync_target.is_same_chain_as(&tipset) {
self.next_sync_target.add(tipset);
} else {
// add incoming tipset to queue to by synced later
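The explicit `is_empty` guard could be dropped because `is_same_chain_as` is built on `Iterator::any`, which returns `false` for an empty bucket, so the combined check behaves the same. For instance:

```rust
fn main() {
    let tips: Vec<u32> = Vec::new();
    // `any` over an empty iterator is false, so no separate emptiness check
    // is needed before asking whether a tipset extends the bucket's chain.
    assert!(!tips.iter().any(|&t| t == 7));
}
```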
2 changes: 1 addition & 1 deletion ipld/cid/src/codec.rs
@@ -5,7 +5,7 @@ use crate::Error;

macro_rules! build_codec_enum {
{$( $val:expr => $var:ident, )*} => {
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub enum Codec {
$( $var, )*
}
27 changes: 1 addition & 26 deletions ipld/cid/src/lib.rs
@@ -12,7 +12,6 @@ pub use self::version::Version;
use integer_encoding::{VarIntReader, VarIntWriter};
pub use multihash;
use multihash::{Code, Identity, Multihash, MultihashDigest};
use std::cmp::Ordering;
use std::convert::TryInto;
use std::fmt;
use std::io::Cursor;
@@ -43,7 +42,7 @@ pub struct Prefix {
}

/// Representation of a IPLD CID.
#[derive(Eq, Clone)]
#[derive(PartialEq, Eq, Hash, Clone)]
pub struct Cid {
pub version: Version,
pub codec: Codec,
@@ -203,30 +202,6 @@ impl Cid {
}
}

impl std::hash::Hash for Cid {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.to_bytes().hash(state);
}
}

impl PartialEq for Cid {
fn eq(&self, other: &Self) -> bool {
self.to_bytes() == other.to_bytes()
}
}

impl PartialOrd for Cid {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
self.to_bytes().partial_cmp(&other.to_bytes())
}
}

impl Ord for Cid {
fn cmp(&self, other: &Self) -> Ordering {
self.to_bytes().cmp(&other.to_bytes())
}
}

impl fmt::Display for Cid {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let encoded = match self.version {
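The hand-written `Hash`/`PartialEq` (and the removed ordering impls) went through `to_bytes()`, allocating a fresh `Vec<u8>` on every comparison or hash; the derived impls walk the fields in place. This is also why `Codec` and `Version` below gain a `Hash` derive: every field of `Cid` must itself implement `Hash`. The derived behavior matches the old byte-based one as long as the encoding is a canonical function of the same fields. A rough sketch with a hypothetical stand-in type:

```rust
use std::collections::HashSet;

// Hypothetical stand-in for Cid: derived PartialEq/Eq/Hash compare and hash
// the fields directly, with no intermediate serialization to bytes.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
struct MyCid {
    version: u8,   // stand-in for Version (now also derives Hash)
    codec: u8,     // stand-in for Codec (now also derives Hash)
    hash: Vec<u8>, // stand-in for the multihash bytes
}

fn main() {
    let a = MyCid { version: 1, codec: 0x71, hash: vec![0xAB; 32] };
    let b = a.clone();

    let mut seen = HashSet::new();
    seen.insert(a);
    // Equal values hash identically, so the clone is found without either
    // value being serialized to a byte buffer first.
    assert!(seen.contains(&b));
}
```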
2 changes: 1 addition & 1 deletion ipld/cid/src/version.rs
@@ -4,7 +4,7 @@
use crate::Error;

/// Cid protocol version
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub enum Version {
V0,
V1,
