Skip to content

Commit

Permalink
Merge pull request #3004 from massalabs/change_endorsements
Browse files Browse the repository at this point in the history
Change endorsements
  • Loading branch information
damip authored Sep 13, 2022
2 parents 9129f8f + fdcf6b2 commit 12c7e42
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 72 deletions.
7 changes: 7 additions & 0 deletions massa-consensus-exports/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ pub enum ConsensusCommand {
/// response channel
response_tx: oneshot::Sender<Option<BlockId>>,
},
/// Get a block at a given slot in a blockclique
GetLatestBlockcliqueBlockAtSlot {
/// wanted slot
slot: Slot,
/// response channel
response_tx: oneshot::Sender<BlockId>,
},
/// Get the best parents and their period
GetBestParents {
/// response channel
Expand Down
20 changes: 20 additions & 0 deletions massa-consensus-exports/src/consensus_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,26 @@ impl ConsensusCommandSender {
})
}

/// get latest block id of a slot in a blockclique
pub fn get_latest_blockclique_block_at_slot(
&self,
slot: Slot,
) -> Result<BlockId, ConsensusError> {
let (response_tx, response_rx) = oneshot::channel();
self.0
.blocking_send(ConsensusCommand::GetLatestBlockcliqueBlockAtSlot { slot, response_tx })
.map_err(|_| {
ConsensusError::SendChannelError(
"send error consensus command get_blockclique_block_at_slot".into(),
)
})?;
response_rx.blocking_recv().map_err(|_| {
ConsensusError::ReceiveChannelError(
"consensus command get_blockclique_block_at_slot response read error".to_string(),
)
})
}

/// get current consensus stats
pub async fn get_stats(&self) -> Result<ConsensusStats, ConsensusError> {
let (response_tx, response_rx) = oneshot::channel();
Expand Down
17 changes: 11 additions & 6 deletions massa-consensus-worker/src/consensus_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,6 @@ impl ConsensusWorker {

/// this function is called around every slot tick
/// it checks for cycle increment
/// creates block and endorsement if a staking address has been drawn
/// it signals the new slot to other components
/// detects desynchronization
/// produce quite more logs than actual stuff
async fn slot_tick(&mut self, next_slot_timer: &mut std::pin::Pin<&mut Sleep>) -> Result<()> {
Expand Down Expand Up @@ -432,6 +430,15 @@ impl ConsensusWorker {
}
Ok(())
}
ConsensusCommand::GetLatestBlockcliqueBlockAtSlot { slot, response_tx } => {
let res = self.block_db.get_latest_blockclique_block_at_slot(&slot);
if response_tx.send(res).is_err() {
warn!(
"consensus: could not send get latest block clique block at slot response"
);
}
Ok(())
}
ConsensusCommand::SendBlock {
block_id,
slot,
Expand Down Expand Up @@ -545,8 +552,7 @@ impl ConsensusWorker {
}

/// call me if the block database changed
/// Processing of final blocks, pruning and producing endorsement.
/// Please refactor me
/// Processing of final blocks, pruning.
///
/// 1. propagate blocks
/// 2. Notify of attack attempts
Expand All @@ -558,8 +564,7 @@ impl ConsensusWorker {
/// 8. Notify PoS of final blocks
/// 9. notify protocol of block wish list
/// 10. note new latest final periods (prune graph if changed)
/// 11. Produce endorsements
/// 12. add stale blocks to stats
/// 11. add stale blocks to stats
async fn block_db_changed(&mut self) -> Result<()> {
massa_trace!("consensus.consensus_worker.block_db_changed", {});

Expand Down
11 changes: 6 additions & 5 deletions massa-factory-worker/src/block_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,14 @@ impl BlockFactoryWorker {

// get the parent in the same thread, with its period
// will not panic because the thread is validated before the call
let (same_thread_parent_id, same_thread_parent_period) = parents[slot.thread as usize];
let (same_thread_parent_id, _) = parents[slot.thread as usize];

// gather endorsements
let (endorsements_ids, endo_storage) = self.channels.pool.get_block_endorsements(
&same_thread_parent_id,
&Slot::new(same_thread_parent_period, slot.thread),
);
let (endorsements_ids, endo_storage) = self
.channels
.pool
.get_block_endorsements(&same_thread_parent_id, &slot);

//TODO: Do we want ot populate only with endorsement id in the future ?
let endorsements: Vec<WrappedEndorsement> = {
let endo_read = endo_storage.read_endorsements();
Expand Down
52 changes: 28 additions & 24 deletions massa-factory-worker/src/endorsement_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,14 @@ impl EndorsementFactoryWorker {
/// Extra safety against double-production caused by clock adjustments (this is the role of the previous_slot parameter).
fn get_next_slot(&self, previous_slot: Option<Slot>) -> (Slot, Instant) {
// get delayed time
let shifted_now = MassaTime::now(self.cfg.clock_compensation_millis)
.expect("could not get current time")
.saturating_sub(self.half_t0);
let now =
MassaTime::now(self.cfg.clock_compensation_millis).expect("could not get current time");

// if it's the first computed slot, add a time shift to prevent double-production on node restart with clock skew
let base_time = if previous_slot.is_none() {
shifted_now.saturating_add(self.cfg.initial_delay)
now.saturating_add(self.cfg.initial_delay)
} else {
shifted_now
now
};

// get closest slot according to the current absolute time
Expand All @@ -88,6 +87,11 @@ impl EndorsementFactoryWorker {
}
}

// prevent triggering on period-zero slots
if next_slot.period == 0 {
next_slot = Slot::new(1, 0);
}

// get the timestamp of the target slot
let next_instant = get_block_slot_timestamp(
self.cfg.thread_count,
Expand All @@ -96,7 +100,7 @@ impl EndorsementFactoryWorker {
next_slot,
)
.expect("could not get block slot timestamp")
.saturating_add(self.half_t0)
.saturating_sub(self.half_t0)
.estimate_instant(self.cfg.clock_compensation_millis)
.expect("could not estimate block slot instant");

Expand All @@ -118,7 +122,7 @@ impl EndorsementFactoryWorker {
}
}

/// Process a slot: produce a block at that slot if one of the managed keys is drawn.
/// Process a slot: produce an endorsement at that slot if one of the managed keys is drawn.
fn process_slot(&mut self, slot: Slot) {
// get endorsement producer addresses for that slot
let producer_addrs = match self.channels.selector.get_selection(slot) {
Expand Down Expand Up @@ -156,23 +160,23 @@ impl EndorsementFactoryWorker {
}

// get consensus block ID for that slot
let endorsed_block: BlockId =
match self.channels.consensus.get_blockclique_block_at_slot(slot) {
// error getting block ID at target slot
Err(_) => {
warn!(
"could not get blockclique block to create endorsement targeting slot {}",
slot
);
return;
}

// the target slot is a miss: ignore
Ok(None) => return,

// there is a block a the target slot
Ok(Some(b_id)) => b_id,
};
let endorsed_block: BlockId = match self
.channels
.consensus
.get_latest_blockclique_block_at_slot(slot)
{
// error getting block ID at target slot
Err(_) => {
warn!(
"could not get latest blockclique block to create endorsement to be included at slot {}",
slot
);
return;
}

// latest block found
Ok(b_id) => b_id,
};

// produce endorsements
let mut endorsements: Vec<WrappedEndorsement> = Vec::with_capacity(producers_indices.len());
Expand Down
67 changes: 44 additions & 23 deletions massa-graph/src/block_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1452,7 +1452,7 @@ impl BlockGraph {
.0;

// check endorsements
match self.check_endorsements(header, parent_in_own_thread)? {
match self.check_endorsements(header)? {
EndorsementsCheckOutcome::Proceed => {}
EndorsementsCheckOutcome::Discard(reason) => {
return Ok(HeaderCheckOutcome::Discard(reason))
Expand Down Expand Up @@ -1567,16 +1567,9 @@ impl BlockGraph {
/// check endorsements:
/// * endorser was selected for that (slot, index)
/// * endorsed slot is `parent_in_own_thread` slot
fn check_endorsements(
&self,
header: &WrappedHeader,
parent_in_own_thread: &ActiveBlock,
) -> Result<EndorsementsCheckOutcome> {
fn check_endorsements(&self, header: &WrappedHeader) -> Result<EndorsementsCheckOutcome> {
// check endorsements
let endorsement_draws = match self
.selector_controller
.get_selection(parent_in_own_thread.slot)
{
let endorsement_draws = match self.selector_controller.get_selection(header.content.slot) {
Ok(sel) => sel.endorsements,
Err(_) => return Ok(EndorsementsCheckOutcome::WaitForSlot),
};
Expand All @@ -1591,22 +1584,12 @@ impl BlockGraph {
),
)));
}
// check that the endorsement slot matches the endorsed block
if endorsement.content.slot != parent_in_own_thread.slot {
return Ok(EndorsementsCheckOutcome::Discard(DiscardReason::Invalid(
format!("endorsement targets a block with wrong slot. Block's parent: {}, endorsement: {}",
parent_in_own_thread.slot, endorsement.content.slot),
)));
}

// note that the following aspects are checked in protocol
// * PoS draws
// * signature
// * intra block endorsement reuse
// * intra block index reuse
// * slot in the same thread as block's slot
// * slot is before the block's slot
// * the endorsed block is the parent in the same thread
// * index reuse
// * slot matching the block's
// * the endorsed block is the containing block's parent
}

Ok(EndorsementsCheckOutcome::Proceed)
Expand Down Expand Up @@ -2678,6 +2661,44 @@ impl BlockGraph {
})
}

/// get the latest blockclique (or final) block ID at a given slot, if any
pub fn get_latest_blockclique_block_at_slot(&self, slot: &Slot) -> BlockId {
let (mut best_block_id, mut best_block_period) = self
.latest_final_blocks_periods
.get(slot.thread as usize)
.unwrap_or_else(|| panic!("unexpected not found latest final block period"));

self.max_cliques
.iter()
.find(|c| c.is_blockclique)
.expect("expected one clique to be the blockclique")
.block_ids
.iter()
.for_each(|id| match self.block_statuses.get(id) {
Some(BlockStatus::Active {
a_block,
storage: _,
}) => {
if a_block.is_final {
panic!(
"unexpected final block on getting latest blockclique block at slot"
);
}
if a_block.slot.thread == slot.thread
&& a_block.slot.period < slot.period
&& a_block.slot.period > best_block_period
{
best_block_period = a_block.slot.period;
best_block_id = *id;
}
}
_ => {
panic!("expected to find only active block but found another status")
}
});
best_block_id
}

/// Clones all stored final blocks, not only the still-useful ones
/// This is used when initializing Execution from Consensus.
/// Since the Execution bootstrap snapshot is older than the Consensus snapshot,
Expand Down
2 changes: 1 addition & 1 deletion massa-pool-exports/src/controller_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub trait PoolController: Send + Sync {
fn get_block_endorsements(
&self,
target_block: &BlockId,
target_slot: &Slot,
slot: &Slot,
) -> (Vec<Option<EndorsementId>>, Storage);

/// Get the number of endorsements in the pool
Expand Down
18 changes: 9 additions & 9 deletions massa-pool-worker/src/endorsement_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ pub struct EndorsementPool {
/// endorsements indexed by slot, index and block ID
endorsements_indexed: HashMap<(Slot, u32, BlockId), EndorsementId>,

/// endorsements sorted by increasing target slot for pruning
/// indexed by thread, then BTreeMap<(target_slot, index, target_block), endorsement_id>
/// endorsements sorted by increasing inclusion slot for pruning
/// indexed by thread, then BTreeMap<(inclusion_slot, index, target_block), endorsement_id>
endorsements_sorted: Vec<BTreeMap<(Slot, u32, BlockId), EndorsementId>>,

/// storage
Expand Down Expand Up @@ -54,16 +54,16 @@ impl EndorsementPool {
// update internal final CS period counter
self.last_cs_final_periods = final_cs_periods.to_vec();

// remove old endorsements
// remove all endorsements whose periods <= last_cs_final_periods[endorsement.thread]
let mut removed: PreHashSet<EndorsementId> = Default::default();
for thread in 0..self.config.thread_count {
while let Some((&(target_slot, index, block_id), &endo_id)) =
while let Some((&(inclusion_slot, index, block_id), &endo_id)) =
self.endorsements_sorted[thread as usize].first_key_value()
{
if target_slot.period < self.last_cs_final_periods[thread as usize] {
if inclusion_slot.period <= self.last_cs_final_periods[thread as usize] {
self.endorsements_sorted[thread as usize].pop_first();
self.endorsements_indexed
.remove(&(target_slot, index, block_id))
.remove(&(inclusion_slot, index, block_id))
.expect("endorsement should be in endorsements_indexed at this point");
removed.insert(endo_id);
} else {
Expand Down Expand Up @@ -148,17 +148,17 @@ impl EndorsementPool {
/// get endorsements for block creation
pub fn get_block_endorsements(
&self,
target_slot: &Slot,
slot: &Slot, // slot of the block that will contain the endorsement
target_block: &BlockId,
) -> (Vec<Option<EndorsementId>>, Storage) {
// init list of selected operation IDs
// init list of selected endorsement IDs
let mut endo_ids = Vec::with_capacity(self.config.max_block_endorsement_count as usize);

// gather endorsements
for index in 0..self.config.max_block_endorsement_count {
endo_ids.push(
self.endorsements_indexed
.get(&(*target_slot, index, *target_block))
.get(&(*slot, index, *target_block))
.copied(),
);
}
Expand Down
4 changes: 3 additions & 1 deletion massa-pos-exports/src/controller_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ pub trait SelectorController: Send + Sync {
///
/// Only used in tests for post-bootstrap selection matching.
#[cfg(feature = "testing")]
fn get_entire_selection(&self) -> VecDeque<(u64, HashMap<Slot, Selection>)>;
fn get_entire_selection(&self) -> VecDeque<(u64, HashMap<Slot, Selection>)> {
unimplemented!("mock implementation only")
}
}

/// Allow cloning `Box<dyn SelectorController>`
Expand Down
4 changes: 1 addition & 3 deletions massa-protocol-worker/src/protocol_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -812,9 +812,7 @@ impl ProtocolWorker {
return Ok(None);
}
// check slot
if (endorsement.content.slot.thread != header.content.slot.thread)
|| (endorsement.content.slot >= header.content.slot)
{
if endorsement.content.slot != header.content.slot {
massa_trace!("protocol.protocol_worker.check_header.err_endorsement_invalid_slot", { "header": header, "endorsement": endorsement});
return Ok(None);
}
Expand Down

0 comments on commit 12c7e42

Please sign in to comment.