Skip to content

Commit

Permalink
chore(db)!: combine milestone index and timestamp (#476)
Browse files Browse the repository at this point in the history
* Combine index and timestamp

* clippy
  • Loading branch information
Alexandcoats authored Jul 26, 2022
1 parent 0ad9ba0 commit 8470cae
Showing 1 changed file with 55 additions and 90 deletions.
145 changes: 55 additions & 90 deletions src/db/collections/milestone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,10 @@ use crate::{
/// A milestone's metadata.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct MilestoneDocument {
/// The milestone index.
milestone_index: MilestoneIndex,
/// The milestone index and timestamp.
at: MilestoneIndexTimestamp,
/// The [`MilestoneId`](MilestoneId) of the milestone.
milestone_id: MilestoneId,
/// The timestamp of the milestone.
milestone_timestamp: MilestoneTimestamp,
/// The milestone's payload.
payload: MilestonePayload,
/// The milestone's sync status.
Expand All @@ -44,16 +42,6 @@ impl MilestoneDocument {
const COLLECTION: &'static str = "stardust_milestones";
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[allow(missing_docs)]
pub(crate) struct MilestoneRecord {
pub(crate) milestone_index: MilestoneIndex,
pub(crate) milestone_id: MilestoneId,
pub(crate) milestone_timestamp: MilestoneTimestamp,
pub(crate) payload: MilestonePayload,
pub(crate) is_synced: bool,
}

/// An aggregation type that represents the ranges of completed milestones and gaps.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SyncData {
Expand All @@ -71,7 +59,7 @@ impl MongoDb {
collection
.create_index(
IndexModel::builder()
.keys(doc! { "milestone_index": 1 })
.keys(doc! { "at.milestone_index": 1 })
.options(
IndexOptions::builder()
.unique(true)
Expand All @@ -86,7 +74,7 @@ impl MongoDb {
collection
.create_index(
IndexModel::builder()
.keys(doc! { "milestone_timestamp": 1 })
.keys(doc! { "at.milestone_timestamp": 1 })
.options(
IndexOptions::builder()
.unique(true)
Expand Down Expand Up @@ -147,7 +135,7 @@ impl MongoDb {
.collection::<MilestonePayload>(MilestoneDocument::COLLECTION)
.aggregate(
vec![
doc! { "$match": { "milestone_index": index } },
doc! { "$match": { "at.milestone_index": index } },
doc! { "$replaceWith": "$payload" },
],
None,
Expand All @@ -163,19 +151,12 @@ impl MongoDb {

/// Gets the timestamp of a milestone by the [`MilestoneIndex`].
pub async fn get_milestone_timestamp(&self, index: MilestoneIndex) -> Result<Option<MilestoneTimestamp>, Error> {
#[derive(Deserialize)]
struct TimestampResult {
milestone_timestamp: MilestoneTimestamp,
}

let timestamp = self
.0
.collection::<TimestampResult>(MilestoneDocument::COLLECTION)
.collection::<MilestoneIndexTimestamp>(MilestoneDocument::COLLECTION)
.find_one(
doc! { "milestone_index": index },
FindOneOptions::builder()
.projection(doc! { "milestone_timestamp": 1 })
.build(),
doc! { "at.milestone_index": index },
FindOneOptions::builder().projection(doc! { "at": 1 }).build(),
)
.await?
.map(|ts| ts.milestone_timestamp);
Expand All @@ -192,9 +173,11 @@ impl MongoDb {
payload: MilestonePayload,
) -> Result<(), Error> {
let milestone_document = MilestoneDocument {
at: MilestoneIndexTimestamp {
milestone_index,
milestone_timestamp,
},
milestone_id,
milestone_index,
milestone_timestamp,
payload,
is_synced: Default::default(),
};
Expand All @@ -205,7 +188,7 @@ impl MongoDb {
self.0
.collection::<MilestoneDocument>(MilestoneDocument::COLLECTION)
.update_one(
doc! { "milestone_index": milestone_index },
doc! { "at.milestone_index": milestone_index },
doc! { "$set": doc },
UpdateOptions::builder().upsert(true).build(),
)
Expand All @@ -219,53 +202,45 @@ impl MongoDb {
&self,
start_timestamp: MilestoneTimestamp,
) -> Result<Option<MilestoneIndexTimestamp>, Error> {
Ok(self
.0
.collection::<MilestoneDocument>(MilestoneDocument::COLLECTION)
self.0
.collection::<MilestoneIndexTimestamp>(MilestoneDocument::COLLECTION)
.find(
doc! {
"milestone_timestamp": { "$gte": start_timestamp },
"at.milestone_timestamp": { "$gte": start_timestamp },
"is_synced": true
},
FindOptions::builder()
.sort(doc! {"milestone_index": 1})
.sort(doc! { "at.milestone_index": 1 })
.limit(1)
.projection(doc! { "at": 1 })
.build(),
)
.await?
.try_next()
.await?
.map(|d| MilestoneIndexTimestamp {
milestone_index: d.milestone_index,
milestone_timestamp: d.milestone_timestamp,
}))
.await
}

/// Find the end milestone.
pub async fn find_last_milestone(
&self,
end_timestamp: MilestoneTimestamp,
) -> Result<Option<MilestoneIndexTimestamp>, Error> {
Ok(self
.0
.collection::<MilestoneDocument>(MilestoneDocument::COLLECTION)
self.0
.collection::<MilestoneIndexTimestamp>(MilestoneDocument::COLLECTION)
.find(
doc! {
"milestone_timestamp": { "$lte": end_timestamp },
"at.milestone_timestamp": { "$lte": end_timestamp },
"is_synced": true
},
FindOptions::builder()
.sort(doc! {"milestone_index": -1})
.sort(doc! { "at.milestone_index": -1 })
.limit(1)
.projection(doc! { "at": 1 })
.build(),
)
.await?
.try_next()
.await?
.map(|d| MilestoneIndexTimestamp {
milestone_index: d.milestone_index,
milestone_timestamp: d.milestone_timestamp,
}))
.await
}

/// Find the latest milestone inserted.
Expand All @@ -275,8 +250,9 @@ impl MongoDb {
.find(
doc! { "is_synced": true },
FindOptions::builder()
.sort(doc! {"milestone_index": -1})
.sort(doc! { "at.milestone_index": -1 })
.limit(1)
.projection(doc! { "at": 1 })
.build(),
)
.await?
Expand All @@ -289,10 +265,8 @@ impl MongoDb {
self.0
.collection::<MilestoneDocument>(MilestoneDocument::COLLECTION)
.update_one(
doc! { "milestone_index": index },
doc! { "$set": {
"is_synced": true,
}},
doc! { "at.milestone_index": index },
doc! { "$set": { "is_synced": true } },
UpdateOptions::builder().upsert(true).build(),
)
.await?;
Expand All @@ -305,25 +279,21 @@ impl MongoDb {
&self,
range: RangeInclusive<MilestoneIndex>,
) -> Result<impl Stream<Item = Result<MilestoneIndex, Error>>, Error> {
#[derive(Deserialize)]
struct SyncEntry {
milestone_index: MilestoneIndex,
}

self.0
.collection::<SyncEntry>(MilestoneDocument::COLLECTION)
Ok(self
.0
.collection::<MilestoneIndexTimestamp>(MilestoneDocument::COLLECTION)
.find(
doc! {
"milestone_index": { "$gte": *range.start(), "$lte": *range.end() },
"is_synced": { "$eq": true }
"at.milestone_index": { "$gte": *range.start(), "$lte": *range.end() },
"is_synced": true
},
FindOptions::builder()
.sort(doc! {"milestone_index": 1u32})
.projection(doc! {"milestone_index": 1u32})
.sort(doc! { "at.milestone_index": 1 })
.projection(doc! { "at": 1 })
.build(),
)
.await
.map(|c| c.map_ok(|e| e.milestone_index))
.await?
.map_ok(|ts| ts.milestone_index))
}

/// Retrieves a [`SyncData`] structure that contains the completed and gaps ranges.
Expand Down Expand Up @@ -365,23 +335,18 @@ impl MongoDb {

/// Retrieves gaps in the milestones collection.
pub async fn get_gaps(&self) -> Result<Vec<RangeInclusive<MilestoneIndex>>, Error> {
#[derive(Deserialize)]
struct SyncEntry {
milestone_index: MilestoneIndex,
}

let mut synced_ms = self
.0
.collection::<SyncEntry>(MilestoneDocument::COLLECTION)
.collection::<MilestoneIndexTimestamp>(MilestoneDocument::COLLECTION)
.find(
doc! { "is_synced": true },
FindOptions::builder()
.sort(doc! {"milestone_index": 1})
.projection(doc! {"milestone_index": 1})
.sort(doc! { "at.milestone_index": 1 })
.projection(doc! { "at": 1 })
.build(),
)
.await
.map(|c| c.map_ok(|e| e.milestone_index))?;
.await?
.map_ok(|e| e.milestone_index);

let mut gaps = Vec::new();
let mut last_record: Option<MilestoneIndex> = None;
Expand All @@ -405,7 +370,7 @@ impl MongoDb {
#[derive(Deserialize)]
struct ReceiptAtIndex {
receipt: MilestoneOption,
at: MilestoneIndex,
index: MilestoneIndex,
}

Ok(self
Expand All @@ -415,20 +380,20 @@ impl MongoDb {
vec![
doc! { "$unwind": "payload.essence.options"},
doc! { "$match": {
"options.receipt.migrated_at": { "$exists": true },
"payload.essence.options.receipt.migrated_at": { "$exists": true },
} },
doc! { "$sort": { "at.milestone_index": 1 } },
doc! { "$replaceWith": {
"receipt": "options.receipt" ,
"at": "$milestone_index" ,
"index": "$at.milestone_index" ,
} },
doc! { "$sort": { "at": 1 } },
],
None,
)
.await?
.map(|doc| {
let ReceiptAtIndex { receipt, at } = bson::from_document::<ReceiptAtIndex>(doc?)?;
Ok((receipt, at))
let ReceiptAtIndex { receipt, index } = bson::from_document::<ReceiptAtIndex>(doc?)?;
Ok((receipt, index))
}))
}

Expand All @@ -441,7 +406,7 @@ impl MongoDb {
#[derive(Deserialize)]
struct ReceiptAtIndex {
receipt: MilestoneOption,
at: MilestoneIndex,
index: MilestoneIndex,
}

Ok(self
Expand All @@ -451,20 +416,20 @@ impl MongoDb {
vec![
doc! { "$unwind": "payload.essence.options"},
doc! { "$match": {
"options.receipt.migrated_at": { "$and": [ { "$exists": true }, { "$eq": migrated_at } ] },
"payload.essence.options.receipt.migrated_at": { "$and": [ { "$exists": true }, { "$eq": migrated_at } ] },
} },
doc! { "$sort": { "at.milestone_index": 1 } },
doc! { "$replaceWith": {
"receipt": "options.receipt" ,
"at": "$milestone_index" ,
"index": "$at.milestone_index" ,
} },
doc! { "$sort": { "at": 1 } },
],
None,
)
.await?
.map(|doc| {
let ReceiptAtIndex { receipt, at } = bson::from_document::<ReceiptAtIndex>(doc?)?;
Ok((receipt, at))
let ReceiptAtIndex { receipt, index } = bson::from_document::<ReceiptAtIndex>(doc?)?;
Ok((receipt, index))
}))
}
}

0 comments on commit 8470cae

Please sign in to comment.