Skip to content

Commit

Permalink
fix(collector): re-add list of visited messages (#131)
Browse files Browse the repository at this point in the history
* fix(collector): re-add list of `visited` messages

* Duh

* Fmt
  • Loading branch information
grtlr authored May 9, 2022
1 parent 4d20617 commit 02bcdbb
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 49 deletions.
4 changes: 4 additions & 0 deletions bin/inx-chronicle/src/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ impl HandleEvent<Report<Solidifier>> for Collector {

#[cfg(feature = "stardust")]
pub mod stardust {
use std::collections::HashSet;

use chronicle::{
db::model::stardust::{
message::{MessageMetadata, MessageRecord},
Expand All @@ -112,13 +114,15 @@ pub mod stardust {
pub struct MilestoneState {
pub milestone_index: u32,
pub process_queue: VecDeque<dto::MessageId>,
pub visited: HashSet<dto::MessageId>,
}

impl MilestoneState {
pub fn new(milestone_index: u32) -> Self {
Self {
milestone_index,
process_queue: VecDeque::new(),
visited: HashSet::new(),
}
}
}
Expand Down
108 changes: 59 additions & 49 deletions bin/inx-chronicle/src/collector/solidifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,61 +67,71 @@ mod stardust {
) -> Result<(), Self::Error> {
// Process by iterating the queue until we either complete the milestone or fail to find a message
while let Some(message_id) = ms_state.process_queue.front() {
// Try the database
match self.db.get_message(message_id).await? {
Some(message_rec) => {
match message_rec
.metadata
.map(|metadata| metadata.referenced_by_milestone_index)
{
Some(ms_index) => {
log::trace!(
"Message {} is referenced by milestone {}",
message_id.to_hex(),
ms_index
);
// We may have reached a different milestone, in which case there is nothing to
// do for this message
if ms_state.milestone_index == ms_index {
let parents = Vec::from(message_rec.message.parents);
ms_state.process_queue.extend(parents);
// First check if we already processed this message in this run
if ms_state.visited.contains(message_id) {
ms_state.process_queue.pop_front();
} else {
// Try the database
match self.db.get_message(message_id).await? {
Some(message_rec) => {
match message_rec
.metadata
.map(|metadata| metadata.referenced_by_milestone_index)
{
Some(ms_index) => {
log::trace!(
"Message {} is referenced by milestone {}",
message_id.to_hex(),
ms_index
);

// We add the current message to the list of visited messages.
ms_state.visited.insert(message_id.clone());

// We may have reached a different milestone, in which case there is nothing to
// do for this message
if ms_state.milestone_index == ms_index {
let parents = Vec::from(message_rec.message.parents);
ms_state.process_queue.extend(parents);
}
ms_state.process_queue.pop_front();
}
// If the message has not been referenced, we can't proceed
None => {
log::trace!("Requesting metadata for message {}", message_id.to_hex());
// Send the state and everything. If the requester finds the message, it will circle
// back.
cx.addr::<InxWorker>()
.await
.send(InxRequest::get_metadata(
message_id.clone(),
cx.handle().clone(),
ms_state,
))
.map_err(|_| SolidifierError::MissingInxRequester)?;
return Ok(());
}
ms_state.process_queue.pop_front();
}
// If the message has not been referenced, we can't proceed
None => {
log::trace!("Requesting metadata for message {}", message_id.to_hex());
// Send the state and everything. If the requester finds the message, it will circle
// back.
cx.addr::<InxWorker>()
.await
.send(InxRequest::get_metadata(
message_id.clone(),
cx.handle().clone(),
ms_state,
))
.map_err(|_| SolidifierError::MissingInxRequester)?;
return Ok(());
}
}
}
// Otherwise, send a message to the requester
None => {
log::trace!("Requesting message {}", message_id.to_hex());
// Send the state and everything. If the requester finds the message, it will circle
// back.
cx.addr::<InxWorker>()
.await
.send(InxRequest::get_message(
message_id.clone(),
cx.handle().clone(),
ms_state,
))
.map_err(|_| SolidifierError::MissingInxRequester)?;
return Ok(());
// Otherwise, send a message to the requester
None => {
log::trace!("Requesting message {}", message_id.to_hex());
// Send the state and everything. If the requester finds the message, it will circle
// back.
cx.addr::<InxWorker>()
.await
.send(InxRequest::get_message(
message_id.clone(),
cx.handle().clone(),
ms_state,
))
.map_err(|_| SolidifierError::MissingInxRequester)?;
return Ok(());
}
}
}
}

// If we finished all the parents, that means we have a complete milestone
// so we should mark it synced
self.db
Expand Down

0 comments on commit 02bcdbb

Please sign in to comment.