Skip to content

Commit

Permalink
fix(db): Rename message_id to _id (#172)
Browse files Browse the repository at this point in the history
* fix(db): index on `message.id`

* Don't use `.sparse()`

* Pull out `MessageId`

* Move `SyncData` to `responses`

* Reuse `_id` for `MessageId`

* Create basic integration test

* Add option to clear db

* Change milestone `milestone_id` to `_id`

* Fmt

* Remove services from CI
  • Loading branch information
grtlr authored May 20, 2022
1 parent 46f5bcb commit d5da16a
Show file tree
Hide file tree
Showing 14 changed files with 166 additions and 90 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ console-subscriber = { version = "0.1", default-features = false, optional = tru

[dev-dependencies]
bee-test = { package = "bee-test", git = "https://github.com/iotaledger/bee.git", branch = "shimmer-develop", default-features = false }
packable = { version = "0.3.2", default-features = false }

[features]
default = [
Expand Down
16 changes: 12 additions & 4 deletions bin/inx-chronicle/src/api/responses.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// Copyright 2022 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::ops::Deref;
use std::ops::{Deref, Range};

use axum::response::IntoResponse;
use chronicle::{db::model::sync::SyncData, types::ledger::LedgerInclusionState};
use chronicle::types::ledger::LedgerInclusionState;
use derive_more::From;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -34,8 +34,16 @@ pub struct InfoResponse {

impl_success_response!(InfoResponse);

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SyncDataResponse(pub SyncData);
/// An aggregation type that represents the ranges of completed milestones and gaps.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SyncDataResponse {
/// The completed(synced and logged) milestones data
pub completed: Vec<Range<u32>>,
/// Synced milestones data but unlogged
pub synced_but_unlogged: Vec<Range<u32>>,
/// Gaps/missings milestones data
pub gaps: Vec<Range<u32>>,
}

impl_success_response!(SyncDataResponse);

Expand Down
9 changes: 3 additions & 6 deletions bin/inx-chronicle/src/api/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use axum::{handler::Handler, routing::get, Extension, Router};
use chronicle::db::{
model::sync::{SyncData, SyncRecord},
MongoDb,
};
use chronicle::db::{model::sync::SyncRecord, MongoDb};
use futures::TryStreamExt;

use super::{error::ApiError, responses::*, ApiResult};
Expand Down Expand Up @@ -34,7 +31,7 @@ async fn info() -> InfoResponse {

async fn sync(database: Extension<MongoDb>) -> ApiResult<SyncDataResponse> {
let mut res = database.sync_records_sorted().await?;
let mut sync_data = SyncData::default();
let mut sync_data = SyncDataResponse::default();
let mut last_record: Option<SyncRecord> = None;
while let Some(sync_record) = res.try_next().await? {
// Missing records go into gaps
Expand Down Expand Up @@ -80,7 +77,7 @@ async fn sync(database: Extension<MongoDb>) -> ApiResult<SyncDataResponse> {
}
last_record.replace(sync_record);
}
Ok(SyncDataResponse(sync_data))
Ok(sync_data)
}

async fn not_found() -> ApiError {
Expand Down
26 changes: 13 additions & 13 deletions bin/inx-chronicle/src/api/stardust/v2/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ async fn message(database: Extension<MongoDb>, Path(message_id): Path<String>) -
.await?
.ok_or(ApiError::NoResults)?;
Ok(MessageResponse {
protocol_version: rec.message.protocol_version,
parents: rec.message.parents.iter().map(|m| m.to_hex()).collect(),
payload: rec.message.payload,
nonce: rec.message.nonce,
protocol_version: rec.inner.protocol_version,
parents: rec.inner.parents.iter().map(|m| m.to_hex()).collect(),
payload: rec.inner.payload,
nonce: rec.inner.nonce,
})
}

Expand All @@ -84,8 +84,8 @@ async fn message_metadata(
.ok_or(ApiError::NoResults)?;

Ok(MessageMetadataResponse {
message_id: rec.message.id.to_hex(),
parent_message_ids: rec.message.parents.iter().map(|id| id.to_hex()).collect(),
message_id: rec.inner.message_id.to_hex(),
parent_message_ids: rec.inner.parents.iter().map(|id| id.to_hex()).collect(),
is_solid: rec.metadata.as_ref().map(|d| d.is_solid),
referenced_by_milestone_index: rec.metadata.as_ref().map(|d| d.referenced_by_milestone_index),
milestone_index: rec.metadata.as_ref().map(|d| d.milestone_index),
Expand Down Expand Up @@ -118,13 +118,13 @@ async fn message_children(
.map(|rec| {
if expanded {
Record {
id: rec.message.id.to_hex(),
id: rec.inner.message_id.to_hex(),
inclusion_state: rec.metadata.as_ref().map(|d| d.inclusion_state),
milestone_index: rec.metadata.as_ref().map(|d| d.referenced_by_milestone_index),
}
.into()
} else {
rec.message.id.to_hex().into()
rec.inner.message_id.to_hex().into()
}
})
.collect(),
Expand Down Expand Up @@ -215,7 +215,7 @@ async fn output_metadata(
.as_ref()
.map(|ms| (ms.milestone_timestamp.timestamp_millis() / 1000) as u32),
transaction_id_spent: spending_transaction.as_ref().map(|txn| {
if let Some(Payload::Transaction(payload)) = &txn.message.payload {
if let Some(Payload::Transaction(payload)) = &txn.inner.payload {
payload.id.to_hex()
} else {
unreachable!()
Expand All @@ -237,10 +237,10 @@ async fn transaction_included_message(
.ok_or(ApiError::NoResults)?;

Ok(MessageResponse {
protocol_version: rec.message.protocol_version,
parents: rec.message.parents.iter().map(|m| m.to_hex()).collect(),
payload: rec.message.payload,
nonce: rec.message.nonce,
protocol_version: rec.inner.protocol_version,
parents: rec.inner.parents.iter().map(|m| m.to_hex()).collect(),
payload: rec.inner.payload,
nonce: rec.inner.nonce,
})
}

Expand Down
2 changes: 1 addition & 1 deletion bin/inx-chronicle/src/collector/stardust_inx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ impl HandleEvent<MilestoneState> for Solidifier {
// We may have reached a different milestone, in which case there is nothing to
// do for this message
if ms_state.milestone_index == ms_index {
let parents = Vec::from(message_rec.message.parents);
let parents = Vec::from(message_rec.inner.parents);
ms_state.process_queue.extend(parents);
}
ms_state.process_queue.pop_front();
Expand Down
6 changes: 2 additions & 4 deletions src/db/model/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
// Copyright 2022 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

/// Module containing information about the network and state of the node.
pub mod status;

/// Module containing Stardust data models.
#[cfg(feature = "stardust")]
pub mod stardust;

/// Module containing information about the network and state of the node.
pub mod status;
/// Module containing sync models.
pub mod sync;
13 changes: 7 additions & 6 deletions src/db/model/stardust/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use crate::{
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MessageRecord {
/// The message.
pub message: Message,
#[serde(flatten)]
pub inner: Message,
/// The raw bytes of the message.
#[serde(with = "serde_bytes")]
pub raw: Vec<u8>,
Expand All @@ -37,7 +38,7 @@ impl MessageRecord {
/// Creates a new message record.
pub fn new(message: Message, raw: Vec<u8>) -> Self {
Self {
message,
inner: message,
raw,
metadata: None,
}
Expand All @@ -63,7 +64,7 @@ impl TryFrom<(inx::proto::RawMessage, inx::proto::MessageMetadata)> for MessageR
) -> Result<Self, Self::Error> {
let message = bee_message_stardust::Message::try_from(raw_message.clone())?;
Ok(Self {
message: message.into(),
inner: message.into(),
raw: raw_message.data,
metadata: Some(inx::MessageMetadata::try_from(metadata)?.into()),
})
Expand Down Expand Up @@ -105,7 +106,7 @@ impl MongoDb {
pub async fn get_message(&self, message_id: &MessageId) -> Result<Option<MessageRecord>, Error> {
self.0
.collection::<MessageRecord>(MessageRecord::COLLECTION)
.find_one(doc! {"message.id": bson::to_bson(message_id)?}, None)
.find_one(doc! {"_id": bson::to_bson(message_id)?}, None)
.await
}

Expand Down Expand Up @@ -134,7 +135,7 @@ impl MongoDb {
self.0
.collection::<MessageRecord>(MessageRecord::COLLECTION)
.update_one(
doc! { "_id": bson::to_bson(&message_record.message.id)? },
doc! { "_id": bson::to_bson(&message_record.inner.message_id)? },
doc! { "$set": bson::to_document(message_record)? },
UpdateOptions::builder().upsert(true).build(),
)
Expand All @@ -150,7 +151,7 @@ impl MongoDb {
self.0
.collection::<MessageRecord>(MessageRecord::COLLECTION)
.update_one(
doc! { "message.id": bson::to_bson(message_id)? },
doc! { "_id": bson::to_bson(message_id)? },
doc! { "$set": { "metadata": bson::to_document(metadata)? } },
None,
)
Expand Down
7 changes: 4 additions & 3 deletions src/db/model/stardust/milestone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ use crate::{
/// A milestone's metadata.
#[derive(Serialize, Deserialize)]
pub struct MilestoneRecord {
/// The [`MilestoneId`](MilestoneId) of the milestone.
#[serde(rename = "_id")]
pub milestone_id: MilestoneId,
/// The milestone index.
pub milestone_index: u32,
/// The timestamp of the milestone.
pub milestone_timestamp: DateTime,
/// The [`MilestoneId`](MilestoneId) of the milestone.
pub milestone_id: MilestoneId,
/// The milestone's payload.
pub payload: MilestonePayload,
}
Expand Down Expand Up @@ -54,7 +55,7 @@ impl MongoDb {
pub async fn get_milestone_record(&self, id: &MilestoneId) -> Result<Option<MilestoneRecord>, Error> {
self.0
.collection::<MilestoneRecord>(MilestoneRecord::COLLECTION)
.find_one(doc! {"milestone_id": bson::to_bson(id)?}, None)
.find_one(doc! {"_id": bson::to_bson(id)?}, None)
.await
}

Expand Down
13 changes: 0 additions & 13 deletions src/db/model/sync.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright 2022 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;

use futures::stream::Stream;
use mongodb::{
bson::{self, doc},
Expand Down Expand Up @@ -30,17 +28,6 @@ impl SyncRecord {
pub const COLLECTION: &'static str = "sync";
}

/// An aggregation type that represents the ranges of completed milestones and gaps.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SyncData {
/// The completed(synced and logged) milestones data
pub completed: Vec<Range<u32>>,
/// Synced milestones data but unlogged
pub synced_but_unlogged: Vec<Range<u32>>,
/// Gaps/missings milestones data
pub gaps: Vec<Range<u32>>,
}

impl MongoDb {
/// Upserts a [`SyncRecord`] to the database.
pub async fn upsert_sync_record(&self, record: &SyncRecord) -> Result<UpdateResult, Error> {
Expand Down
30 changes: 27 additions & 3 deletions src/db/mongodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! Holds the `MongoDb` type and its config.
use mongodb::{
bson::doc,
bson::{doc, Document},
error::Error,
options::{ClientOptions, Credential},
Client,
Expand All @@ -16,7 +16,7 @@ use serde::{Deserialize, Serialize};
pub struct MongoDb(pub(crate) mongodb::Database);

impl MongoDb {
const NAME: &'static str = "chronicle-test";
const NAME: &'static str = "chronicle";
const DEFAULT_CONNECT_URL: &'static str = "mongodb://localhost:27017";

/// Constructs a [`MongoDb`] by connecting to a MongoDB instance.
Expand All @@ -34,10 +34,26 @@ impl MongoDb {
}

let client = Client::with_options(client_options)?;
let db = client.database(Self::NAME);

let name = match &config.suffix {
Some(suffix) => format!("{}-{}", Self::NAME, suffix),
None => Self::NAME.to_string(),
};
let db = client.database(&name);

Ok(MongoDb(db))
}

/// Clears all the collections from the database.
pub async fn clear(&self) -> Result<(), Error> {
let collections = self.0.list_collection_names(None).await?;

for c in collections {
self.0.collection::<Document>(&c).drop(None).await?;
}

Ok(())
}
}

/// The [`MongoDb`] config.
Expand All @@ -47,6 +63,7 @@ pub struct MongoDbConfig {
pub(crate) connect_url: String,
pub(crate) username: Option<String>,
pub(crate) password: Option<String>,
pub(crate) suffix: Option<String>,
}

impl MongoDbConfig {
Expand All @@ -72,6 +89,12 @@ impl MongoDbConfig {
self.password = Some(password.into());
self
}

/// Sets the suffix.
pub fn with_suffix(mut self, suffix: impl Into<String>) -> Self {
self.suffix = Some(suffix.into());
self
}
}

impl Default for MongoDbConfig {
Expand All @@ -80,6 +103,7 @@ impl Default for MongoDbConfig {
connect_url: MongoDb::DEFAULT_CONNECT_URL.to_string(),
username: None,
password: None,
suffix: None,
}
}
}
39 changes: 39 additions & 0 deletions src/types/stardust/message/message_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2022 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::str::FromStr;

use bee_message_stardust as bee;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Hash, Ord, PartialOrd, Eq)]
#[serde(transparent)]
pub struct MessageId(#[serde(with = "serde_bytes")] pub Box<[u8]>);

impl MessageId {
pub fn to_hex(&self) -> String {
prefix_hex::encode(self.0.as_ref())
}
}

impl From<bee::MessageId> for MessageId {
fn from(value: bee::MessageId) -> Self {
Self(value.to_vec().into_boxed_slice())
}
}

impl TryFrom<MessageId> for bee::MessageId {
type Error = crate::types::error::Error;

fn try_from(value: MessageId) -> Result<Self, Self::Error> {
Ok(bee::MessageId::new(value.0.as_ref().try_into()?))
}
}

impl FromStr for MessageId {
type Err = crate::types::error::ParseError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(bee::MessageId::from_str(s)?.into())
}
}
Loading

0 comments on commit d5da16a

Please sign in to comment.