Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

send only required blocks to bootstrap #1866

Merged
merged 2 commits into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 92 additions & 148 deletions consensus/src/block_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,39 +102,24 @@ pub struct ExportActiveBlock {
pub production_events: Vec<(u64, Address, bool)>,
}

impl From<ActiveBlock> for ExportActiveBlock {
fn from(block: ActiveBlock) -> Self {
impl From<&ActiveBlock> for ExportActiveBlock {
fn from(a_block: &ActiveBlock) -> Self {
ExportActiveBlock {
block: block.block,
parents: block.parents,
children: block.children,
dependencies: block.dependencies,
is_final: block.is_final,
block_ledger_changes: block.block_ledger_changes,
roll_updates: block.roll_updates,
production_events: block.production_events,
block: a_block.block.clone(),
parents: a_block.parents.clone(),
children: a_block.children.clone(),
dependencies: a_block.dependencies.clone(),
is_final: a_block.is_final,
block_ledger_changes: a_block.block_ledger_changes.clone(),
roll_updates: a_block.roll_updates.clone(),
production_events: a_block.production_events.clone(),
}
}
}

impl From<Box<ActiveBlock>> for ExportActiveBlock {
fn from(block: Box<ActiveBlock>) -> Self {
ExportActiveBlock {
block: block.block,
parents: block.parents,
children: block.children,
dependencies: block.dependencies,
is_final: block.is_final,
block_ledger_changes: block.block_ledger_changes,
roll_updates: block.roll_updates,
production_events: block.production_events,
}
}
}

impl<'a> TryFrom<ExportActiveBlock> for ActiveBlock {
fn try_from(block: ExportActiveBlock) -> Result<ActiveBlock, ConsensusError> {
let operation_set = block
impl TryFrom<ExportActiveBlock> for ActiveBlock {
fn try_from(a_block: ExportActiveBlock) -> Result<ActiveBlock, ConsensusError> {
let operation_set = a_block
.block
.operations
.iter()
Expand All @@ -145,7 +130,7 @@ impl<'a> TryFrom<ExportActiveBlock> for ActiveBlock {
})
.collect::<Result<_, _>>()?;

let endorsement_ids = block
let endorsement_ids = a_block
.block
.header
.content
Expand All @@ -154,76 +139,30 @@ impl<'a> TryFrom<ExportActiveBlock> for ActiveBlock {
.map(|endo| Ok((endo.compute_endorsement_id()?, endo.content.index)))
.collect::<Result<_, ConsensusError>>()?;

let addresses_to_operations = block.block.involved_addresses(&operation_set)?;
let addresses_to_endorsements = block.block.addresses_to_endorsements(&endorsement_ids)?;
let addresses_to_operations = a_block.block.involved_addresses(&operation_set)?;
let addresses_to_endorsements =
a_block.block.addresses_to_endorsements(&endorsement_ids)?;
Ok(ActiveBlock {
creator_address: Address::from_public_key(&block.block.header.content.creator)?,
block: block.block,
parents: block.parents,
children: block.children,
dependencies: block.dependencies,
descendants: Default::default(),
is_final: block.is_final,
block_ledger_changes: block.block_ledger_changes,
creator_address: Address::from_public_key(&a_block.block.header.content.creator)?,
block: a_block.block,
parents: a_block.parents,
children: a_block.children,
dependencies: a_block.dependencies,
descendants: Default::default(), // will be computed once the full graph is available
is_final: a_block.is_final,
block_ledger_changes: a_block.block_ledger_changes,
operation_set,
endorsement_ids,
addresses_to_operations,
roll_updates: block.roll_updates,
production_events: block.production_events,
roll_updates: a_block.roll_updates,
production_events: a_block.production_events,
addresses_to_endorsements,
})
}

type Error = ConsensusError;
}

impl<'a> TryFrom<ExportActiveBlock> for Box<ActiveBlock> {
fn try_from(block: ExportActiveBlock) -> Result<Box<ActiveBlock>, ConsensusError> {
let operation_set = block
.block
.operations
.iter()
.enumerate()
.map(|(idx, op)| match op.get_operation_id() {
Ok(id) => Ok((id, (idx, op.content.expire_period))),
Err(e) => Err(e),
})
.collect::<Result<_, _>>()?;

let endorsement_ids = block
.block
.header
.content
.endorsements
.iter()
.map(|endo| Ok((endo.compute_endorsement_id()?, endo.content.index)))
.collect::<Result<EndorsementHashMap<u32>, ConsensusError>>()?;

let addresses_to_operations = block.block.involved_addresses(&operation_set)?;
let addresses_to_endorsements = block
.block
.addresses_to_endorsements(&endorsement_ids.clone())?;
Ok(Box::new(ActiveBlock {
creator_address: Address::from_public_key(&block.block.header.content.creator)?,
block: block.block,
parents: block.parents,
children: block.children,
dependencies: block.dependencies,
descendants: Default::default(),
is_final: block.is_final,
block_ledger_changes: block.block_ledger_changes,
operation_set,
endorsement_ids,
addresses_to_operations,
roll_updates: block.roll_updates,
production_events: block.production_events,
addresses_to_endorsements,
}))
}

type Error = ConsensusError;
}

impl SerializeCompact for ExportActiveBlock {
fn to_bytes_compact(&self) -> Result<Vec<u8>, models::ModelsError> {
let mut res: Vec<u8> = Vec::new();
Expand Down Expand Up @@ -685,38 +624,6 @@ pub struct BootstrapableGraph {
pub ledger: LedgerSubset,
}

impl<'a> TryFrom<&'a BlockGraph> for BootstrapableGraph {
type Error = ConsensusError;
fn try_from(block_graph: &'a BlockGraph) -> Result<Self, Self::Error> {
let mut active_blocks = BlockHashMap::default();
for (hash, status) in block_graph.block_statuses.iter() {
match status {
BlockStatus::Active(block) => {
active_blocks.insert(*hash, block.clone());
}
_ => continue,
}
}

Ok(BootstrapableGraph {
active_blocks: active_blocks
.into_iter()
.map(|(hash, block)| (hash, block.into()))
.collect(),
best_parents: block_graph.best_parents.clone(),
latest_final_blocks_periods: block_graph.latest_final_blocks_periods.clone(),
gi_head: block_graph
.gi_head
.clone()
.into_iter()
.map(|(hash, incomp)| (hash, incomp.into_iter().collect()))
.collect(),
max_cliques: block_graph.max_cliques.clone(),
ledger: LedgerSubset::try_from(&block_graph.ledger)?,
})
}
}

impl SerializeCompact for BootstrapableGraph {
fn to_bytes_compact(&self) -> Result<Vec<u8>, models::ModelsError> {
let mut res: Vec<u8> = Vec::new();
Expand Down Expand Up @@ -1079,25 +986,21 @@ impl BlockGraph {
cfg,
sequence_counter: 0,
genesis_hashes: genesis_block_ids,
active_index: boot_graph.active_blocks.keys().copied().collect(),
block_statuses: boot_graph
.active_blocks
.iter()
.map(|(hash, block)| {
Ok((*hash, BlockStatus::Active(block.clone().try_into()?)))
.into_iter()
.map(|(b_id, block)| {
Ok((b_id, BlockStatus::Active(Box::new(block.try_into()?))))
})
.collect::<Result<_, ConsensusError>>()?,
incoming_index: Default::default(),
waiting_for_slot_index: Default::default(),
waiting_for_dependencies_index: Default::default(),
active_index: boot_graph.active_blocks.keys().copied().collect(),
discarded_index: Default::default(),
latest_final_blocks_periods: boot_graph.latest_final_blocks_periods,
best_parents: boot_graph.best_parents,
gi_head: boot_graph
.gi_head
.into_iter()
.map(|(h, v)| (h, v.into_iter().collect()))
.collect(),
gi_head: boot_graph.gi_head,
max_cliques: boot_graph.max_cliques,
to_propagate: Default::default(),
attack_attempts: Default::default(),
Expand Down Expand Up @@ -1164,6 +1067,34 @@ impl BlockGraph {
}
}

pub fn export_bootstrap_graph(&self) -> Result<BootstrapableGraph, ConsensusError> {
let required_active_blocks = self.list_required_active_blocks()?;
let mut active_blocks: BlockHashMap<ExportActiveBlock> =
BlockHashMap::with_capacity_and_hasher(
required_active_blocks.len(),
BuildHHasher::default(),
);
for b_id in required_active_blocks {
if let Some(BlockStatus::Active(a_block)) = self.block_statuses.get(&b_id) {
active_blocks.insert(b_id, (&**a_block).into());
} else {
return Err(ConsensusError::ContainerInconsistency(format!(
"block {} was expected to be active but wasn't on bootstrap graph export",
b_id
)));
}
}

Ok(BootstrapableGraph {
active_blocks,
best_parents: self.best_parents.clone(),
latest_final_blocks_periods: self.latest_final_blocks_periods.clone(),
gi_head: self.gi_head.clone(),
max_cliques: self.max_cliques.clone(),
ledger: LedgerSubset::try_from(&self.ledger)?,
})
}

pub fn block_state_try_apply_op(
&self,
state_accu: &mut BlockStateAccumulator,
Expand Down Expand Up @@ -3680,12 +3611,12 @@ impl BlockGraph {
Ok(())
}

// prune active blocks and return final blocks, return discarded final blocks
fn prune_active(&mut self) -> Result<BlockHashMap<ActiveBlock>, ConsensusError> {
fn list_required_active_blocks(&self) -> Result<BlockHashSet, ConsensusError> {
// list all active blocks
let active_blocks: BlockHashSet = self.active_index.clone();
let mut retain_active: BlockHashSet =
BlockHashSet::with_capacity_and_hasher(active_blocks.len(), BuildHHasher::default());
let mut retain_active: BlockHashSet = BlockHashSet::with_capacity_and_hasher(
self.active_index.len(),
BuildHHasher::default(),
);

let latest_final_blocks: Vec<BlockId> = self
.latest_final_blocks_periods
Expand All @@ -3696,7 +3627,7 @@ impl BlockGraph {
// retain all non-final active blocks,
// the current "best parents",
// and the dependencies for both.
for block_id in active_blocks.iter() {
for block_id in self.active_index.iter() {
if let Some(BlockStatus::Active(active_block)) = self.block_statuses.get(block_id) {
if !active_block.is_final
|| self.best_parents.iter().any(|(b, _p)| b == block_id)
Expand Down Expand Up @@ -3793,6 +3724,14 @@ impl BlockGraph {
}
}

Ok(retain_active)
}

// prune active blocks and return final blocks, return discarded final blocks
fn prune_active(&mut self) -> Result<BlockHashMap<ActiveBlock>, ConsensusError> {
// list required active blocks
let mut retain_active = self.list_required_active_blocks()?;

// retain extra history according to the config
// this is useful to avoid desync on temporary connection loss
for a_block in self.active_index.iter() {
Expand All @@ -3809,11 +3748,16 @@ impl BlockGraph {

// remove unused final active blocks
let mut discarded_finals: BlockHashMap<ActiveBlock> = BlockHashMap::default();
for discard_active_h in active_blocks.difference(&retain_active) {
let to_remove: Vec<BlockId> = self
.active_index
.difference(&retain_active)
.copied()
.collect();
for discard_active_h in to_remove {
let discarded_active = if let Some(BlockStatus::Active(discarded_active)) =
self.block_statuses.remove(discard_active_h)
self.block_statuses.remove(&discard_active_h)
{
self.active_index.remove(discard_active_h);
self.active_index.remove(&discard_active_h);
discarded_active
} else {
return Err(ConsensusError::ContainerInconsistency(format!("inconsistency inside block statuses pruning and removing unused final active blocks - {} is missing", discard_active_h)));
Expand All @@ -3826,23 +3770,23 @@ impl BlockGraph {
{
active_block.children
[discarded_active.block.header.content.slot.thread as usize]
.remove(discard_active_h);
.remove(&discard_active_h);
}
}

massa_trace!("consensus.block_graph.prune_active", {"hash": discard_active_h, "reason": DiscardReason::Final});
// mark as final
self.block_statuses.insert(
*discard_active_h,
discard_active_h,
BlockStatus::Discarded {
header: discarded_active.block.header.clone(),
reason: DiscardReason::Final,
sequence_number: BlockGraph::new_sequence_number(&mut self.sequence_counter),
},
);
self.discarded_index.insert(*discard_active_h);
self.discarded_index.insert(discard_active_h);

discarded_finals.insert(*discard_active_h, *discarded_active);
discarded_finals.insert(discard_active_h, *discarded_active);
}

Ok(discarded_finals)
Expand Down Expand Up @@ -4596,12 +4540,12 @@ mod tests {
active_blocks: vec![
(hash_genesist0, export_genesist0),
(hash_genesist1, export_genesist1),
(get_dummy_block_id("blockp1t0"), blockp1t0.into()),
(get_dummy_block_id("blockp1t1"), blockp1t1.into()),
(get_dummy_block_id("blockp2t0"), blockp2t0.into()),
(get_dummy_block_id("blockp2t1"), blockp2t1.into()),
(get_dummy_block_id("blockp3t0"), blockp3t0.into()),
(get_dummy_block_id("blockp3t1"), blockp3t1.into()),
(get_dummy_block_id("blockp1t0"), (&blockp1t0).into()),
(get_dummy_block_id("blockp1t1"), (&blockp1t1).into()),
(get_dummy_block_id("blockp2t0"), (&blockp2t0).into()),
(get_dummy_block_id("blockp2t1"), (&blockp2t1).into()),
(get_dummy_block_id("blockp3t0"), (&blockp3t0).into()),
(get_dummy_block_id("blockp3t1"), (&blockp3t1).into()),
]
.into_iter()
.collect(),
Expand Down
7 changes: 2 additions & 5 deletions consensus/src/consensus_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use models::{
use pool::PoolCommandSender;
use protocol_exports::{ProtocolCommandSender, ProtocolEvent, ProtocolEventReceiver};
use signature::{derive_public_key, PrivateKey, PublicKey};
use std::{cmp::max, collections::HashSet, collections::VecDeque, convert::TryFrom};
use std::{cmp::max, collections::HashSet, collections::VecDeque};
use time::UTime;
use tokio::{
sync::{mpsc, mpsc::error::SendTimeoutError, oneshot},
Expand Down Expand Up @@ -827,10 +827,7 @@ impl ConsensusWorker {
"consensus.consensus_worker.process_consensus_command.get_bootstrap_state",
{}
);
let resp = (
self.pos.export(),
BootstrapableGraph::try_from(&self.block_db)?,
);
let resp = (self.pos.export(), self.block_db.export_bootstrap_graph()?);
if response_tx.send(resp).is_err() {
warn!("consensus: could not send GetBootstrapState answer");
}
Expand Down