Skip to content

Commit

Permalink
Merge #3310
Browse files Browse the repository at this point in the history
3310: List required active blocks refactoring r=AurelienFT a=Eitu33

* [x] document all added functions
* [x] try in labnet
* [x] make tests compile
* [x] make tests pass 
* [x] add logs allowing easy debugging in case the changes caused problems

Co-authored-by: Thomas Plisson <thomas.plisson@epitech.eu>
  • Loading branch information
bors[bot] and Eitu33 authored Dec 7, 2022
2 parents 1207bfe + d10ca11 commit 6bd8f01
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 111 deletions.
6 changes: 3 additions & 3 deletions massa-bootstrap/src/tests/scenarios.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,12 @@ async fn test_bootstrap_server() {
response_tx,
..
} => {
// send the consensus blocks only on the first call
// give an empty answer for the following ones
// send the consensus blocks at the 4th slot (1 for startup + 3 for safety)
// give an empty answer for any other call
if execution_cursor
== &StreamingStep::Ongoing(Slot {
period: 1,
thread: 0,
thread: 1,
})
{
response_tx
Expand Down
8 changes: 6 additions & 2 deletions massa-consensus-worker/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,12 @@ impl ConsensusController for ConsensusControllerImpl {
let mut final_blocks: Vec<ExportActiveBlock> = Vec::new();
let mut retrieved_ids: PreHashSet<BlockId> = PreHashSet::default();
let read_shared_state = self.shared_state.read();
let required_blocks: PreHashSet<BlockId> =
read_shared_state.list_required_active_blocks()?;
let required_blocks: PreHashSet<BlockId> = match execution_cursor {
StreamingStep::Ongoing(slot) | StreamingStep::Finished(Some(slot)) => {
read_shared_state.list_required_active_blocks(Some(slot))?
}
_ => PreHashSet::default(),
};

let (current_ids, previous_ids, outdated_ids) = match cursor {
StreamingStep::Started => (
Expand Down
2 changes: 2 additions & 0 deletions massa-consensus-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
//! when protocol sends informations to this module.
//!
//! This module doesn't use asynchronous code.
#![feature(deadline_api)]
#![feature(let_chains)]

mod commands;
mod controller;
Expand Down
258 changes: 153 additions & 105 deletions massa-consensus-worker/src/state/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::collections::{HashMap, VecDeque};
use std::{
collections::{HashMap, VecDeque},
vec,
};

use massa_consensus_exports::{
block_graph_export::BlockGraphExport,
Expand All @@ -17,6 +20,7 @@ use massa_models::{
};
use massa_storage::Storage;
use massa_time::MassaTime;
use tracing::debug;

mod graph;
mod process;
Expand Down Expand Up @@ -96,6 +100,18 @@ impl ConsensusState {
}
}

/// Get a full active block
///
/// Returns an error if it was not found
pub fn try_get_full_active_block(
&self,
block_id: &BlockId,
) -> Result<(&ActiveBlock, &Storage), ConsensusError> {
self.get_full_active_block(block_id).ok_or_else(|| {
ConsensusError::ContainerInconsistency(format!("block {} is missing", block_id))
})
}

pub fn get_clique_count(&self) -> usize {
self.max_cliques.len()
}
Expand Down Expand Up @@ -199,125 +215,157 @@ impl ConsensusState {
}
}

pub fn list_required_active_blocks(&self) -> Result<PreHashSet<BlockId>, ConsensusError> {
// list all active blocks
let mut retain_active: PreHashSet<BlockId> =
PreHashSet::<BlockId>::with_capacity(self.active_index.len());

let latest_final_blocks: Vec<BlockId> = self
.latest_final_blocks_periods
.iter()
.map(|(hash, _)| *hash)
.collect();

// retain all non-final active blocks,
// the current "best parents",
// and the dependencies for both.
for block_id in self.active_index.iter() {
if let Some(BlockStatus::Active {
a_block: active_block,
..
}) = self.block_statuses.get(block_id)
{
if !active_block.is_final
|| self.best_parents.iter().any(|(b, _p)| b == block_id)
|| latest_final_blocks.contains(block_id)
{
retain_active.extend(active_block.parents.iter().map(|(p, _)| *p));
retain_active.insert(*block_id);
}
/// list the latest final blocks at the given slot
///
/// exclusively used by `list_required_active_blocks`
fn list_latest_final_blocks_at(
&self,
slot: Slot,
) -> Result<Vec<(BlockId, u64)>, ConsensusError> {
let mut latest: Vec<Option<(BlockId, u64)>> = vec![None; self.config.thread_count as usize];
for id in self.active_index.iter() {
let (block, _storage) = self.try_get_full_active_block(id)?;
if let Some((_, p)) = latest[block.slot.thread as usize] && block.slot.period < p {
continue;
}
if block.is_final && block.slot <= slot {
latest[block.slot.thread as usize] = Some((*id, block.slot.period));
}
}
latest
.into_iter()
.enumerate()
.map(|(thread, opt)| {
opt.ok_or_else(|| {
ConsensusError::ContainerInconsistency(format!(
"could not find latest block for thread {}",
thread
))
})
})
.collect()
}

// retain best parents
retain_active.extend(self.best_parents.iter().map(|(b, _p)| *b));

// retain last final blocks
retain_active.extend(self.latest_final_blocks_periods.iter().map(|(h, _)| *h));

for (thread, id) in latest_final_blocks.iter().enumerate() {
let mut current_block_id = *id;
while let Some((current_block, _)) = self.get_full_active_block(&current_block_id) {
let parent_id = {
if !current_block.parents.is_empty() {
Some(current_block.parents[thread].0)
} else {
None
}
};

// retain block
retain_active.insert(current_block_id);
/// list the earliest blocks of the given block id list
///
/// exclusively used by `list_required_active_blocks`
fn list_earliest_blocks_of(
&self,
block_ids: &PreHashSet<BlockId>,
end_slot: Option<Slot>,
) -> Result<Vec<(BlockId, u64)>, ConsensusError> {
let mut earliest: Vec<Option<(BlockId, u64)>> =
vec![None; self.config.thread_count as usize];
for id in block_ids {
let (block, _storage) = self.try_get_full_active_block(id)?;
if let Some(slot) = end_slot && block.slot > slot {
continue;
}
if let Some((_, p)) = earliest[block.slot.thread as usize] && block.slot.period > p {
continue;
}
earliest[block.slot.thread as usize] = Some((*id, block.slot.period));
}
earliest
.into_iter()
.enumerate()
.map(|(thread, opt)| {
opt.ok_or_else(|| {
ConsensusError::ContainerInconsistency(format!(
"could not find earliest block for thread {}",
thread
))
})
})
.collect()
}

// stop traversing when reaching a block with period number low enough
// so that any of its operations will have their validity period expired at the latest final block in thread
// note: one more is kept because of the way we iterate
if current_block.slot.period
< self.latest_final_blocks_periods[thread]
.1
.saturating_sub(self.config.operation_validity_periods)
{
break;
/// adds to the given container every active block coming after the lower bound
///
/// exclusively used by `list_required_active_blocks`
fn add_active_blocks_after(
&self,
kept_blocks: &mut PreHashSet<BlockId>,
lower_bound: &[(BlockId, u64)],
end_slot: Option<Slot>,
) {
for id in self.active_index.iter() {
if let Some((block, _storage)) = self.get_full_active_block(id) {
if let Some(slot) = end_slot && block.slot > slot {
continue;
}

// if not genesis, traverse parent
match parent_id {
Some(p_id) => current_block_id = p_id,
None => break,
if block.slot.period >= lower_bound[block.slot.thread as usize].1 {
kept_blocks.insert(*id);
}
}
}
}

// grow with parents & fill thread holes twice
for _ in 0..2 {
// retain the parents of the selected blocks
let retain_clone = retain_active.clone();
/// list_required_active_blocks algo:
///
/// if end_slot is None:
/// set effective_latest_finals to be the IDs of the self.latest_final_blocks
/// else
/// set effective_latest_finals to be the IDs of the Active Final blocks that have the highest period in each thread but are before end_slot (included)
///
/// create a kept_blocks list of block IDs to keep
/// initialize it with effective_latest_finals as well as all the active blocks that are after the effective_latest_finals of their thread (included) (but before end_slot (included) if it is Some)
///
/// do the following 2 times:
/// extend kept_blocks with the parents of the current kept_blocks
/// fill holes by adding to kept_blocks all the active block IDs whose slot is after the earliest kept_blocks of their thread (included) (but before end_slot (included) if it is Some)
///
/// return kept_blocks
pub fn list_required_active_blocks(
&self,
end_slot: Option<Slot>,
) -> Result<PreHashSet<BlockId>, ConsensusError> {
// if an end_slot is provided compute the lastest final block for that given slot
// if not use the latest_final_blocks_periods
let effective_latest_finals: Vec<(BlockId, u64)> = if let Some(slot) = end_slot {
self.list_latest_final_blocks_at(slot)?
} else {
self.latest_final_blocks_periods.clone()
};

for retain_h in retain_clone.into_iter() {
retain_active.extend(
self.get_full_active_block(&retain_h)
.ok_or_else(|| ConsensusError::ContainerInconsistency(format!("inconsistency inside block statuses pruning and retaining the parents of the selected blocks - {} is missing", retain_h)))?
.0.parents
.iter()
.map(|(b_id, _p)| *b_id),
)
}
// init kept_blocks using effective_latest_finals
let mut kept_blocks: PreHashSet<BlockId> = effective_latest_finals
.iter()
.map(|(id, _period)| *id)
.collect();

// find earliest kept slots in each thread
let mut earliest_retained_periods: Vec<u64> = self
.latest_final_blocks_periods
.iter()
.map(|(_, p)| *p)
.collect();
for retain_h in retain_active.iter() {
let retain_slot = &self
.get_full_active_block(retain_h)
.ok_or_else(|| ConsensusError::ContainerInconsistency(format!("inconsistency inside block statuses pruning and finding earliest kept slots in each thread - {} is missing", retain_h)))?
.0.slot;
earliest_retained_periods[retain_slot.thread as usize] = std::cmp::min(
earliest_retained_periods[retain_slot.thread as usize],
retain_slot.period,
);
}
// add all the active blocks that are after the effective_latest_finals of their thread
self.add_active_blocks_after(&mut kept_blocks, &effective_latest_finals, end_slot);

// fill up from the latest final block back to the earliest for each thread
for thread in 0..self.config.thread_count {
let mut cursor = self.latest_final_blocks_periods[thread as usize].0; // hash of tha latest final in that thread
while let Some((c_block, _)) = self.get_full_active_block(&cursor) {
if c_block.slot.period < earliest_retained_periods[thread as usize] {
break;
}
retain_active.insert(cursor);
if c_block.parents.is_empty() {
// genesis
break;
}
cursor = c_block.parents[thread as usize].0;
}
// do the following 2 times
for _ in 0..2 {
// extend kept_blocks with the parents of the current kept_blocks
let mut cumulated_parents: PreHashSet<BlockId> = PreHashSet::default();
for id in kept_blocks.iter() {
let parents = self
.try_get_full_active_block(id)?
.0
.parents
.iter()
.map(|(id, _period)| *id);
cumulated_parents.extend(parents);
}
kept_blocks.extend(cumulated_parents);
// add all the active blocks whose slot is after the earliest kept_blocks of their thread
let earliest_blocks = self.list_earliest_blocks_of(&kept_blocks, end_slot)?;
self.add_active_blocks_after(&mut kept_blocks, &earliest_blocks, end_slot);
}

Ok(retain_active)
// check that we have the full blocks for every id we are about to return
for id in kept_blocks.iter() {
self.try_get_full_active_block(id)?;
}

// debug log for an easier diagnostic if needed
debug!("list_required_active_blocks return: {:?}", kept_blocks);

// return kept_blocks
Ok(kept_blocks)
}

pub fn extract_block_graph_part(
Expand Down
2 changes: 1 addition & 1 deletion massa-consensus-worker/src/state/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl ConsensusState {
/// prune active blocks and return final blocks, return discarded final blocks
fn prune_active(&mut self) -> Result<PreHashMap<BlockId, ActiveBlock>, ConsensusError> {
// list required active blocks
let mut retain_active: PreHashSet<BlockId> = self.list_required_active_blocks()?;
let mut retain_active: PreHashSet<BlockId> = self.list_required_active_blocks(None)?;

// retain extra history according to the config
// this is useful to avoid desync on temporary connection loss
Expand Down

0 comments on commit 6bd8f01

Please sign in to comment.