Skip to content

Commit

Permalink
fix(db)!: replace projections with aggregate pipelines (#233)
Browse files Browse the repository at this point in the history
  • Loading branch information
grtlr committed May 31, 2022
1 parent 165303c commit d7d1643
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 136 deletions.
4 changes: 4 additions & 0 deletions bin/inx-chronicle/src/api/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ impl IntoResponse for ErrorBody {

impl From<ApiError> for ErrorBody {
fn from(err: ApiError) -> Self {
if let ApiError::Internal(e) = &err {
log::error!("Internal API error: {}", e);
}

Self {
status: err.status(),
code: err.code(),
Expand Down
4 changes: 2 additions & 2 deletions bin/inx-chronicle/src/api/stardust/v2/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use axum::response::IntoResponse;
use chronicle::types::{
ledger::LedgerInclusionState,
stardust::{
block::{BlockId, Input, Output, Payload},
block::{BlockId, Input, MilestonePayload, Output, Payload},
milestone::MilestoneTimestamp,
},
tangle::MilestoneIndex,
Expand Down Expand Up @@ -143,7 +143,7 @@ impl_success_response!(TransactionsResponse);
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(transparent)]
pub struct MilestoneResponse {
pub payload: Payload,
pub payload: MilestonePayload,
}

impl_success_response!(MilestoneResponse);
26 changes: 2 additions & 24 deletions bin/inx-chronicle/src/api/stardust/v2/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ use chronicle::{
tangle::MilestoneIndex,
},
};
use futures::TryStreamExt;

use super::responses::*;
use crate::api::{error::ApiError, extractors::Pagination, ApiResult};
use crate::api::{error::ApiError, ApiResult};

pub fn routes() -> Router {
Router::new()
Expand All @@ -27,8 +26,7 @@ pub fn routes() -> Router {
Router::new()
.route("/:block_id", get(block))
.route("/:block_id/raw", get(block_raw))
.route("/:block_id/metadata", get(block_metadata))
.route("/:block_id/children", get(block_children)),
.route("/:block_id/metadata", get(block_metadata)),
)
.nest(
"/outputs",
Expand Down Expand Up @@ -87,26 +85,6 @@ async fn block_metadata(
})
}

async fn block_children(
database: Extension<MongoDb>,
Path(block_id): Path<String>,
Pagination { page_size, page }: Pagination,
) -> ApiResult<BlockChildrenResponse> {
let block_id_dto = BlockId::from_str(&block_id).map_err(ApiError::bad_parse)?;
let blocks = database
.get_block_children(&block_id_dto, page_size, page)
.await?
.try_collect::<Vec<_>>()
.await?;

Ok(BlockChildrenResponse {
block_id,
max_results: page_size,
count: blocks.len(),
children: blocks,
})
}

async fn output(database: Extension<MongoDb>, Path(output_id): Path<String>) -> ApiResult<OutputResponse> {
let output_id = OutputId::from_str(&output_id).map_err(ApiError::bad_parse)?;
let (output, metadata) = database
Expand Down
119 changes: 68 additions & 51 deletions src/db/collections/block.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2022 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use futures::{Stream, StreamExt};
use futures::{Stream, StreamExt, TryStreamExt};
use mongodb::{
bson::{self, doc},
error::Error,
options::{FindOneOptions, FindOptions, UpdateOptions},
options::UpdateOptions,
};
use serde::{Deserialize, Serialize};

Expand All @@ -22,7 +22,6 @@ use crate::{
#[derive(Clone, Debug, Serialize, Deserialize)]
struct BlockDocument {
/// The id of the current block.
#[serde(rename = "_id")]
block_id: BlockId,
/// The block.
block: Block,
Expand Down Expand Up @@ -63,56 +62,65 @@ pub struct TransactionHistoryResult {
impl MongoDb {
/// Get a [`Block`] by its [`BlockId`].
pub async fn get_block(&self, block_id: &BlockId) -> Result<Option<Block>, Error> {
self.0
let block = self
.0
.collection::<Block>(BlockDocument::COLLECTION)
.find_one(
doc! {"_id": block_id},
FindOneOptions::builder().projection(doc! {"block": 1 }).build(),
.aggregate(
vec![
doc! { "$match": { "block_id": block_id } },
doc! { "$replaceRoot": { "newRoot": "$block" } },
],
None,
)
.await
.await?
.try_next()
.await?
.map(bson::from_document)
.transpose()?;

Ok(block)
}

/// Get the raw bytes of a [`Block`] by its [`BlockId`].
pub async fn get_block_raw(&self, block_id: &BlockId) -> Result<Option<Vec<u8>>, Error> {
self.0
.collection::<Vec<u8>>(BlockDocument::COLLECTION)
.find_one(
doc! {"_id": block_id},
FindOneOptions::builder().projection(doc! {"raw": 1 }).build(),
let raw = self
.0
.collection::<Block>(BlockDocument::COLLECTION)
.aggregate(
vec![
doc! { "$match": { "block_id": block_id } },
doc! { "$replaceRoot": { "newRoot": "$raw" } },
],
None,
)
.await
.await?
.try_next()
.await?
.map(bson::from_document)
.transpose()?;

Ok(raw)
}

/// Get the metadata of a [`Block`] by its [`BlockId`].
pub async fn get_block_metadata(&self, block_id: &BlockId) -> Result<Option<BlockMetadata>, Error> {
self.0
.collection::<BlockMetadata>(BlockDocument::COLLECTION)
.find_one(
doc! {"_id": block_id},
FindOneOptions::builder().projection(doc! {"metadata": 1 }).build(),
let block = self
.0
.collection::<Block>(BlockDocument::COLLECTION)
.aggregate(
vec![
doc! { "$match": { "block_id": block_id } },
doc! { "$replaceRoot": { "newRoot": "$metadata" } },
],
None,
)
.await
}
.await?
.try_next()
.await?
.map(bson::from_document)
.transpose()?;

/// Get the children of a [`Block`] as a stream of [`BlockId`]s.
pub async fn get_block_children(
&self,
block_id: &BlockId,
page_size: usize,
page: usize,
) -> Result<impl Stream<Item = Result<BlockId, Error>>, Error> {
self.0
.collection::<BlockId>(BlockDocument::COLLECTION)
.find(
doc! {"block.parents": block_id},
FindOptions::builder()
.skip((page_size * page) as u64)
.sort(doc! {"metadata.referenced_by_milestone_index": -1})
.limit(page_size as i64)
.projection(doc! {"block_id": 1 })
.build(),
)
.await
Ok(block)
}

/// Inserts a [`Block`] together with its associated [`BlockMetadata`].
Expand All @@ -135,7 +143,7 @@ impl MongoDb {
self.0
.collection::<BlockDocument>(BlockDocument::COLLECTION)
.update_one(
doc! { "_id": block_id },
doc! { "block_id": block_id },
doc! { "$set": bson::to_document(&block_document)? },
UpdateOptions::builder().upsert(true).build(),
)
Expand All @@ -146,16 +154,27 @@ impl MongoDb {

/// Finds the [`Block`] that included a transaction by [`TransactionId`].
pub async fn get_block_for_transaction(&self, transaction_id: &TransactionId) -> Result<Option<Block>, Error> {
self.0
let block = self
.0
.collection::<Block>(BlockDocument::COLLECTION)
.find_one(
doc! {
"inclusion_state": LedgerInclusionState::Included,
"block.payload.transaction_id": transaction_id,
},
FindOneOptions::builder().projection(doc! {"block": 1 }).build(),
.aggregate(
vec![
doc! { "$match": {
"$and": [
{ "inclusion_state": LedgerInclusionState::Included },
{ "block.payload.transaction_id": transaction_id },
] } },
doc! { "$replaceRoot": { "newRoot": "$block" } },
],
None,
)
.await
.await?
.try_next()
.await?
.map(bson::from_document)
.transpose()?;

Ok(block)
}

/// Aggregates the transaction history for an address.
Expand Down Expand Up @@ -260,8 +279,6 @@ impl MongoDb {
start_milestone: MilestoneIndex,
end_milestone: MilestoneIndex,
) -> Result<Option<AddressAnalyticsResult>, Error> {
use futures::TryStreamExt;

Ok(self.0.collection::<BlockDocument>(BlockDocument::COLLECTION)
.aggregate(
vec![
Expand Down
Loading

0 comments on commit d7d1643

Please sign in to comment.