Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change endorsements #3004

Merged
merged 8 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions massa-consensus-exports/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ pub enum ConsensusCommand {
/// response channel
response_tx: oneshot::Sender<Option<BlockId>>,
},
/// Get a block at a given slot in a blockclique
GetLatestBlockcliqueBlockAtSlot {
/// wanted slot
slot: Slot,
/// response channel
response_tx: oneshot::Sender<BlockId>,
},
/// Get the best parents and their period
GetBestParents {
/// response channel
Expand Down
20 changes: 20 additions & 0 deletions massa-consensus-exports/src/consensus_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,26 @@ impl ConsensusCommandSender {
})
}

/// get latest block id of a slot in a blockclique
pub fn get_latest_blockclique_block_at_slot(
&self,
slot: Slot,
) -> Result<BlockId, ConsensusError> {
let (response_tx, response_rx) = oneshot::channel();
self.0
.blocking_send(ConsensusCommand::GetLatestBlockcliqueBlockAtSlot { slot, response_tx })
.map_err(|_| {
ConsensusError::SendChannelError(
"send error consensus command get_blockclique_block_at_slot".into(),
)
})?;
response_rx.blocking_recv().map_err(|_| {
ConsensusError::ReceiveChannelError(
"consensus command get_blockclique_block_at_slot response read error".to_string(),
)
})
}

/// get current consensus stats
pub async fn get_stats(&self) -> Result<ConsensusStats, ConsensusError> {
let (response_tx, response_rx) = oneshot::channel();
Expand Down
17 changes: 11 additions & 6 deletions massa-consensus-worker/src/consensus_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,6 @@ impl ConsensusWorker {

/// this function is called around every slot tick
/// it checks for cycle increment
/// creates block and endorsement if a staking address has been drawn
/// it signals the new slot to other components
/// detects desynchronization
/// produce quite more logs than actual stuff
async fn slot_tick(&mut self, next_slot_timer: &mut std::pin::Pin<&mut Sleep>) -> Result<()> {
Expand Down Expand Up @@ -432,6 +430,15 @@ impl ConsensusWorker {
}
Ok(())
}
ConsensusCommand::GetLatestBlockcliqueBlockAtSlot { slot, response_tx } => {
let res = self.block_db.get_latest_blockclique_block_at_slot(&slot);
if response_tx.send(res).is_err() {
warn!(
"consensus: could not send get latest block clique block at slot response"
);
}
Ok(())
}
ConsensusCommand::SendBlock {
block_id,
slot,
Expand Down Expand Up @@ -545,8 +552,7 @@ impl ConsensusWorker {
}

/// call me if the block database changed
/// Processing of final blocks, pruning and producing endorsement.
/// Please refactor me
/// Processing of final blocks, pruning.
///
/// 1. propagate blocks
/// 2. Notify of attack attempts
Expand All @@ -558,8 +564,7 @@ impl ConsensusWorker {
/// 8. Notify PoS of final blocks
/// 9. notify protocol of block wish list
/// 10. note new latest final periods (prune graph if changed)
/// 11. Produce endorsements
/// 12. add stale blocks to stats
/// 11. add stale blocks to stats
async fn block_db_changed(&mut self) -> Result<()> {
massa_trace!("consensus.consensus_worker.block_db_changed", {});

Expand Down
11 changes: 6 additions & 5 deletions massa-factory-worker/src/block_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,14 @@ impl BlockFactoryWorker {

// get the parent in the same thread, with its period
// will not panic because the thread is validated before the call
let (same_thread_parent_id, same_thread_parent_period) = parents[slot.thread as usize];
let (same_thread_parent_id, _) = parents[slot.thread as usize];

// gather endorsements
let (endorsements_ids, endo_storage) = self.channels.pool.get_block_endorsements(
&same_thread_parent_id,
&Slot::new(same_thread_parent_period, slot.thread),
);
let (endorsements_ids, endo_storage) = self
.channels
.pool
.get_block_endorsements(&same_thread_parent_id, &slot);

//TODO: Do we want ot populate only with endorsement id in the future ?
let endorsements: Vec<WrappedEndorsement> = {
let endo_read = endo_storage.read_endorsements();
Expand Down
52 changes: 28 additions & 24 deletions massa-factory-worker/src/endorsement_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,14 @@ impl EndorsementFactoryWorker {
/// Extra safety against double-production caused by clock adjustments (this is the role of the previous_slot parameter).
fn get_next_slot(&self, previous_slot: Option<Slot>) -> (Slot, Instant) {
// get delayed time
let shifted_now = MassaTime::now(self.cfg.clock_compensation_millis)
.expect("could not get current time")
.saturating_sub(self.half_t0);
let now =
MassaTime::now(self.cfg.clock_compensation_millis).expect("could not get current time");

// if it's the first computed slot, add a time shift to prevent double-production on node restart with clock skew
let base_time = if previous_slot.is_none() {
shifted_now.saturating_add(self.cfg.initial_delay)
now.saturating_add(self.cfg.initial_delay)
} else {
shifted_now
now
};

// get closest slot according to the current absolute time
Expand All @@ -88,6 +87,11 @@ impl EndorsementFactoryWorker {
}
}

// prevent triggering on period-zero slots
if next_slot.period == 0 {
next_slot = Slot::new(1, 0);
}

// get the timestamp of the target slot
let next_instant = get_block_slot_timestamp(
self.cfg.thread_count,
Expand All @@ -96,7 +100,7 @@ impl EndorsementFactoryWorker {
next_slot,
)
.expect("could not get block slot timestamp")
.saturating_add(self.half_t0)
.saturating_sub(self.half_t0)
.estimate_instant(self.cfg.clock_compensation_millis)
.expect("could not estimate block slot instant");

Expand All @@ -118,7 +122,7 @@ impl EndorsementFactoryWorker {
}
}

/// Process a slot: produce a block at that slot if one of the managed keys is drawn.
/// Process a slot: produce an endorsement at that slot if one of the managed keys is drawn.
fn process_slot(&mut self, slot: Slot) {
// get endorsement producer addresses for that slot
let producer_addrs = match self.channels.selector.get_selection(slot) {
Expand Down Expand Up @@ -156,23 +160,23 @@ impl EndorsementFactoryWorker {
}

// get consensus block ID for that slot
let endorsed_block: BlockId =
match self.channels.consensus.get_blockclique_block_at_slot(slot) {
// error getting block ID at target slot
Err(_) => {
warn!(
"could not get blockclique block to create endorsement targeting slot {}",
slot
);
return;
}

// the target slot is a miss: ignore
Ok(None) => return,

// there is a block a the target slot
Ok(Some(b_id)) => b_id,
};
let endorsed_block: BlockId = match self
.channels
.consensus
.get_latest_blockclique_block_at_slot(slot)
{
// error getting block ID at target slot
Err(_) => {
warn!(
"could not get latest blockclique block to create endorsement to be included at slot {}",
slot
);
return;
}

// latest block found
Ok(b_id) => b_id,
};

// produce endorsements
let mut endorsements: Vec<WrappedEndorsement> = Vec::with_capacity(producers_indices.len());
Expand Down
67 changes: 44 additions & 23 deletions massa-graph/src/block_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1452,7 +1452,7 @@ impl BlockGraph {
.0;

// check endorsements
match self.check_endorsements(header, parent_in_own_thread)? {
match self.check_endorsements(header)? {
EndorsementsCheckOutcome::Proceed => {}
EndorsementsCheckOutcome::Discard(reason) => {
return Ok(HeaderCheckOutcome::Discard(reason))
Expand Down Expand Up @@ -1567,16 +1567,9 @@ impl BlockGraph {
/// check endorsements:
/// * endorser was selected for that (slot, index)
/// * endorsed slot is `parent_in_own_thread` slot
fn check_endorsements(
&self,
header: &WrappedHeader,
parent_in_own_thread: &ActiveBlock,
) -> Result<EndorsementsCheckOutcome> {
fn check_endorsements(&self, header: &WrappedHeader) -> Result<EndorsementsCheckOutcome> {
// check endorsements
let endorsement_draws = match self
.selector_controller
.get_selection(parent_in_own_thread.slot)
{
let endorsement_draws = match self.selector_controller.get_selection(header.content.slot) {
Ok(sel) => sel.endorsements,
Err(_) => return Ok(EndorsementsCheckOutcome::WaitForSlot),
};
Expand All @@ -1591,22 +1584,12 @@ impl BlockGraph {
),
)));
}
// check that the endorsement slot matches the endorsed block
if endorsement.content.slot != parent_in_own_thread.slot {
return Ok(EndorsementsCheckOutcome::Discard(DiscardReason::Invalid(
format!("endorsement targets a block with wrong slot. Block's parent: {}, endorsement: {}",
parent_in_own_thread.slot, endorsement.content.slot),
)));
}

// note that the following aspects are checked in protocol
// * PoS draws
// * signature
// * intra block endorsement reuse
// * intra block index reuse
// * slot in the same thread as block's slot
// * slot is before the block's slot
// * the endorsed block is the parent in the same thread
// * index reuse
// * slot matching the block's
// * the endorsed block is the containing block's parent
}

Ok(EndorsementsCheckOutcome::Proceed)
Expand Down Expand Up @@ -2678,6 +2661,44 @@ impl BlockGraph {
})
}

/// get the latest blockclique (or final) block ID at a given slot, if any
pub fn get_latest_blockclique_block_at_slot(&self, slot: &Slot) -> BlockId {
let (mut best_block_id, mut best_block_period) = self
.latest_final_blocks_periods
.get(slot.thread as usize)
.unwrap_or_else(|| panic!("unexpected not found latest final block period"));

self.max_cliques
.iter()
.find(|c| c.is_blockclique)
.expect("expected one clique to be the blockclique")
.block_ids
.iter()
.for_each(|id| match self.block_statuses.get(id) {
Some(BlockStatus::Active {
a_block,
storage: _,
}) => {
if a_block.is_final {
panic!(
"unexpected final block on getting latest blockclique block at slot"
);
}
if a_block.slot.thread == slot.thread
&& a_block.slot.period < slot.period
&& a_block.slot.period > best_block_period
{
best_block_period = a_block.slot.period;
best_block_id = *id;
}
}
_ => {
panic!("expected to find only active block but found another status")
}
});
best_block_id
}
adrien-zinger marked this conversation as resolved.
Show resolved Hide resolved

/// Clones all stored final blocks, not only the still-useful ones
/// This is used when initializing Execution from Consensus.
/// Since the Execution bootstrap snapshot is older than the Consensus snapshot,
Expand Down
2 changes: 1 addition & 1 deletion massa-pool-exports/src/controller_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub trait PoolController: Send + Sync {
fn get_block_endorsements(
&self,
target_block: &BlockId,
target_slot: &Slot,
slot: &Slot,
) -> (Vec<Option<EndorsementId>>, Storage);

/// Get the number of endorsements in the pool
Expand Down
18 changes: 9 additions & 9 deletions massa-pool-worker/src/endorsement_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ pub struct EndorsementPool {
/// endorsements indexed by slot, index and block ID
endorsements_indexed: HashMap<(Slot, u32, BlockId), EndorsementId>,

/// endorsements sorted by increasing target slot for pruning
/// indexed by thread, then BTreeMap<(target_slot, index, target_block), endorsement_id>
/// endorsements sorted by increasing inclusion slot for pruning
/// indexed by thread, then BTreeMap<(inclusion_slot, index, target_block), endorsement_id>
endorsements_sorted: Vec<BTreeMap<(Slot, u32, BlockId), EndorsementId>>,

/// storage
Expand Down Expand Up @@ -54,16 +54,16 @@ impl EndorsementPool {
// update internal final CS period counter
self.last_cs_final_periods = final_cs_periods.to_vec();

// remove old endorsements
// remove all endorsements whose periods <= last_cs_final_periods[endorsement.thread]
let mut removed: PreHashSet<EndorsementId> = Default::default();
for thread in 0..self.config.thread_count {
while let Some((&(target_slot, index, block_id), &endo_id)) =
while let Some((&(inclusion_slot, index, block_id), &endo_id)) =
self.endorsements_sorted[thread as usize].first_key_value()
{
if target_slot.period < self.last_cs_final_periods[thread as usize] {
if inclusion_slot.period <= self.last_cs_final_periods[thread as usize] {
self.endorsements_sorted[thread as usize].pop_first();
self.endorsements_indexed
.remove(&(target_slot, index, block_id))
.remove(&(inclusion_slot, index, block_id))
.expect("endorsement should be in endorsements_indexed at this point");
removed.insert(endo_id);
} else {
Expand Down Expand Up @@ -148,17 +148,17 @@ impl EndorsementPool {
/// get endorsements for block creation
pub fn get_block_endorsements(
&self,
target_slot: &Slot,
slot: &Slot, // slot of the block that will contain the endorsement
target_block: &BlockId,
) -> (Vec<Option<EndorsementId>>, Storage) {
// init list of selected operation IDs
// init list of selected endorsement IDs
let mut endo_ids = Vec::with_capacity(self.config.max_block_endorsement_count as usize);

// gather endorsements
for index in 0..self.config.max_block_endorsement_count {
endo_ids.push(
self.endorsements_indexed
.get(&(*target_slot, index, *target_block))
.get(&(*slot, index, *target_block))
.copied(),
);
}
Expand Down
4 changes: 3 additions & 1 deletion massa-pos-exports/src/controller_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ pub trait SelectorController: Send + Sync {
///
/// Only used in tests for post-bootstrap selection matching.
#[cfg(feature = "testing")]
fn get_entire_selection(&self) -> VecDeque<(u64, HashMap<Slot, Selection>)>;
fn get_entire_selection(&self) -> VecDeque<(u64, HashMap<Slot, Selection>)> {
unimplemented!("mock implementation only")
}
}

/// Allow cloning `Box<dyn SelectorController>`
Expand Down
4 changes: 1 addition & 3 deletions massa-protocol-worker/src/protocol_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -812,9 +812,7 @@ impl ProtocolWorker {
return Ok(None);
}
// check slot
if (endorsement.content.slot.thread != header.content.slot.thread)
|| (endorsement.content.slot >= header.content.slot)
{
if endorsement.content.slot != header.content.slot {
massa_trace!("protocol.protocol_worker.check_header.err_endorsement_invalid_slot", { "header": header, "endorsement": endorsement});
return Ok(None);
}
Expand Down