Skip to content

Commit

Permalink
feat(db)!: separate database collections into individual types (#626) (
Browse files Browse the repository at this point in the history
…#650)

* feat(db): separate database collections into individual types (#626)

* Separate database collections into individual types.

* Create ext traits to simplify usage.

* Use custom thread safe iterator for chunking.

* Write chunks tests and cleanup code.

* refactor(db): move collection helpers and add docs

* Rename trait

* Minor improvements

* Fmt

Co-authored-by: Alexandcoats <alexandcoats@gmail.com>
  • Loading branch information
grtlr and Alexandcoats authored Sep 13, 2022
1 parent e1e4dc8 commit 5d5499d
Show file tree
Hide file tree
Showing 22 changed files with 1,635 additions and 1,079 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ console-subscriber = { version = "0.1", default-features = false, optional = tru

[dev-dependencies]
packable = { version = "=0.6.1", default-features = false }
rand = { version = "0.8", default-features = false, features = [ "std" ] }

[features]
default = [
Expand Down
11 changes: 9 additions & 2 deletions src/bin/inx-chronicle/api/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use axum::{
routing::{get, post},
Extension, Json, Router,
};
use chronicle::{db::MongoDb, types::stardust::milestone::MilestoneTimestamp};
use chronicle::{
db::{collections::MilestoneCollection, MongoDb},
types::stardust::milestone::MilestoneTimestamp,
};
use hyper::StatusCode;
use serde::Deserialize;
use time::{Duration, OffsetDateTime};
Expand Down Expand Up @@ -70,7 +73,11 @@ fn is_new_enough(timestamp: MilestoneTimestamp) -> bool {
pub async fn is_healthy(database: &MongoDb) -> Result<bool, ApiError> {
#[cfg(feature = "stardust")]
{
let newest = match database.get_newest_milestone().await? {
let newest = match database
.collection::<MilestoneCollection>()
.get_newest_milestone()
.await?
{
Some(last) => last,
None => return Ok(false),
};
Expand Down
32 changes: 26 additions & 6 deletions src/bin/inx-chronicle/api/stardust/analytics/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use axum::{routing::get, Extension, Router};
use bee_api_types_stardust::responses::RentStructureResponse;
use chronicle::{
db::{
collections::{OutputKind, PayloadKind},
collections::{BlockCollection, OutputCollection, OutputKind, PayloadKind, ProtocolUpdateCollection},
MongoDb,
},
types::stardust::block::{
Expand Down Expand Up @@ -69,7 +69,10 @@ async fn address_activity_analytics(
database: Extension<MongoDb>,
MilestoneRange { start_index, end_index }: MilestoneRange,
) -> ApiResult<AddressAnalyticsResponse> {
let res = database.get_address_analytics(start_index, end_index).await?;
let res = database
.collection::<OutputCollection>()
.get_address_analytics(start_index, end_index)
.await?;

Ok(AddressAnalyticsResponse {
total_active_addresses: res.total_active_addresses.to_string(),
Expand All @@ -82,7 +85,10 @@ async fn block_activity_analytics<B: PayloadKind>(
database: Extension<MongoDb>,
MilestoneRange { start_index, end_index }: MilestoneRange,
) -> ApiResult<BlockAnalyticsResponse> {
let res = database.get_block_analytics::<B>(start_index, end_index).await?;
let res = database
.collection::<BlockCollection>()
.get_block_analytics::<B>(start_index, end_index)
.await?;

Ok(BlockAnalyticsResponse {
count: res.count.to_string(),
Expand All @@ -93,7 +99,10 @@ async fn output_activity_analytics<O: OutputKind>(
database: Extension<MongoDb>,
MilestoneRange { start_index, end_index }: MilestoneRange,
) -> ApiResult<OutputAnalyticsResponse> {
let res = database.get_output_analytics::<O>(start_index, end_index).await?;
let res = database
.collection::<OutputCollection>()
.get_output_analytics::<O>(start_index, end_index)
.await?;

Ok(OutputAnalyticsResponse {
count: res.count.to_string(),
Expand All @@ -106,6 +115,7 @@ async fn unspent_output_ledger_analytics<O: OutputKind>(
LedgerIndex { ledger_index }: LedgerIndex,
) -> ApiResult<OutputAnalyticsResponse> {
let res = database
.collection::<OutputCollection>()
.get_unspent_output_analytics::<O>(ledger_index)
.await?
.ok_or(ApiError::NoResults)?;
Expand All @@ -121,6 +131,7 @@ async fn storage_deposit_ledger_analytics(
LedgerIndex { ledger_index }: LedgerIndex,
) -> ApiResult<StorageDepositAnalyticsResponse> {
let res = database
.collection::<OutputCollection>()
.get_storage_deposit_analytics(ledger_index)
.await?
.ok_or(ApiError::NoResults)?;
Expand All @@ -145,7 +156,10 @@ async fn nft_activity_analytics(
database: Extension<MongoDb>,
MilestoneRange { start_index, end_index }: MilestoneRange,
) -> ApiResult<OutputDiffAnalyticsResponse> {
let res = database.get_nft_output_analytics(start_index, end_index).await?;
let res = database
.collection::<OutputCollection>()
.get_nft_output_analytics(start_index, end_index)
.await?;

Ok(OutputDiffAnalyticsResponse {
created_count: res.created_count.to_string(),
Expand All @@ -158,7 +172,10 @@ async fn native_token_activity_analytics(
database: Extension<MongoDb>,
MilestoneRange { start_index, end_index }: MilestoneRange,
) -> ApiResult<OutputDiffAnalyticsResponse> {
let res = database.get_foundry_output_analytics(start_index, end_index).await?;
let res = database
.collection::<OutputCollection>()
.get_foundry_output_analytics(start_index, end_index)
.await?;

Ok(OutputDiffAnalyticsResponse {
created_count: res.created_count.to_string(),
Expand All @@ -172,11 +189,13 @@ async fn richest_addresses_ledger_analytics(
RichestAddressesQuery { top, ledger_index }: RichestAddressesQuery,
) -> ApiResult<RichestAddressesResponse> {
let res = database
.collection::<OutputCollection>()
.get_richest_addresses(ledger_index, top)
.await?
.ok_or(ApiError::NoResults)?;

let hrp = database
.collection::<ProtocolUpdateCollection>()
.get_protocol_parameters_for_ledger_index(res.ledger_index)
.await?
.ok_or(InternalApiError::CorruptState("no protocol parameters"))?
Expand All @@ -201,6 +220,7 @@ async fn token_distribution_ledger_analytics(
LedgerIndex { ledger_index }: LedgerIndex,
) -> ApiResult<TokenDistributionResponse> {
let res = database
.collection::<OutputCollection>()
.get_token_distribution(ledger_index)
.await?
.ok_or(ApiError::NoResults)?;
Expand Down
66 changes: 47 additions & 19 deletions src/bin/inx-chronicle/api/stardust/core/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ use bee_block_stardust::{
};
use chronicle::{
db::{
collections::{OutputMetadataResult, OutputWithMetadataResult, UtxoChangesResult},
collections::{
BlockCollection, MilestoneCollection, OutputCollection, OutputMetadataResult, OutputWithMetadataResult,
ProtocolUpdateCollection, TreasuryCollection, UtxoChangesResult,
},
MongoDb,
},
types::{
Expand Down Expand Up @@ -101,6 +104,7 @@ pub fn routes() -> Router {

pub async fn info(database: Extension<MongoDb>) -> ApiResult<InfoResponse> {
let protocol = database
.collection::<ProtocolUpdateCollection>()
.get_latest_protocol_parameters()
.await?
.ok_or(ApiError::Internal(InternalApiError::CorruptState(
Expand All @@ -113,26 +117,27 @@ pub async fn info(database: Extension<MongoDb>) -> ApiResult<InfoResponse> {
false
});

let newest_milestone =
database
.get_newest_milestone()
.await?
.ok_or(ApiError::Internal(InternalApiError::CorruptState(
"no milestone in the database",
)))?;
let oldest_milestone =
database
.get_oldest_milestone()
.await?
.ok_or(ApiError::Internal(InternalApiError::CorruptState(
"no milestone in the database",
)))?;
let newest_milestone = database
.collection::<MilestoneCollection>()
.get_newest_milestone()
.await?
.ok_or(ApiError::Internal(InternalApiError::CorruptState(
"no milestone in the database",
)))?;
let oldest_milestone = database
.collection::<MilestoneCollection>()
.get_oldest_milestone()
.await?
.ok_or(ApiError::Internal(InternalApiError::CorruptState(
"no milestone in the database",
)))?;

let latest_milestone = LatestMilestoneResponse {
index: newest_milestone.milestone_index.0,
timestamp: newest_milestone.milestone_timestamp.0,
milestone_id: bee_block_stardust::payload::milestone::MilestoneId::from(
database
.collection::<MilestoneCollection>()
.get_milestone_id(newest_milestone.milestone_index)
.await?
.ok_or(ApiError::Internal(InternalApiError::CorruptState(
Expand Down Expand Up @@ -183,12 +188,20 @@ async fn block(
if let Some(value) = headers.get(axum::http::header::ACCEPT) {
if value.eq(&*BYTE_CONTENT_HEADER) {
return Ok(BlockResponse::Raw(
database.get_block_raw(&block_id).await?.ok_or(ApiError::NoResults)?,
database
.collection::<BlockCollection>()
.get_block_raw(&block_id)
.await?
.ok_or(ApiError::NoResults)?,
));
}
}

let block = database.get_block(&block_id).await?.ok_or(ApiError::NoResults)?;
let block = database
.collection::<BlockCollection>()
.get_block(&block_id)
.await?
.ok_or(ApiError::NoResults)?;
Ok(BlockResponse::Json(BlockDto::try_from(block)?))
}

Expand All @@ -198,6 +211,7 @@ async fn block_metadata(
) -> ApiResult<BlockMetadataResponse> {
let block_id = BlockId::from_str(&block_id_str).map_err(ApiError::bad_parse)?;
let metadata = database
.collection::<BlockCollection>()
.get_block_metadata(&block_id)
.await?
.ok_or(ApiError::NoResults)?;
Expand Down Expand Up @@ -243,6 +257,7 @@ fn create_output_metadata_response(metadata: OutputMetadataResult) -> OutputMeta
async fn output(database: Extension<MongoDb>, Path(output_id): Path<String>) -> ApiResult<OutputResponse> {
let output_id = OutputId::from_str(&output_id).map_err(ApiError::bad_parse)?;
let OutputWithMetadataResult { output, metadata } = database
.collection::<OutputCollection>()
.get_output_with_metadata(&output_id)
.await?
.ok_or(ApiError::NoResults)?;
Expand All @@ -261,6 +276,7 @@ async fn output_metadata(
) -> ApiResult<OutputMetadataResponse> {
let output_id = OutputId::from_str(&output_id).map_err(ApiError::bad_parse)?;
let metadata = database
.collection::<OutputCollection>()
.get_output_metadata(&output_id)
.await?
.ok_or(ApiError::NoResults)?;
Expand All @@ -274,6 +290,7 @@ async fn transaction_included_block(
) -> ApiResult<BlockResponse> {
let transaction_id = TransactionId::from_str(&transaction_id).map_err(ApiError::bad_parse)?;
let block = database
.collection::<BlockCollection>()
.get_block_for_transaction(&transaction_id)
.await?
.ok_or(ApiError::NoResults)?;
Expand All @@ -282,7 +299,10 @@ async fn transaction_included_block(
}

async fn receipts(database: Extension<MongoDb>) -> ApiResult<ReceiptsResponse> {
let mut receipts_at = database.stream_all_receipts().await?;
let mut receipts_at = database
.collection::<MilestoneCollection>()
.stream_all_receipts()
.await?;
let mut receipts = Vec::new();
while let Some((receipt, at)) = receipts_at.try_next().await? {
let receipt: &bee_block_stardust::payload::milestone::MilestoneOption = &receipt.try_into()?;
Expand All @@ -301,7 +321,10 @@ async fn receipts(database: Extension<MongoDb>) -> ApiResult<ReceiptsResponse> {
}

async fn receipts_migrated_at(database: Extension<MongoDb>, Path(index): Path<u32>) -> ApiResult<ReceiptsResponse> {
let mut receipts_at = database.stream_receipts_migrated_at(index.into()).await?;
let mut receipts_at = database
.collection::<MilestoneCollection>()
.stream_receipts_migrated_at(index.into())
.await?;
let mut receipts = Vec::new();
while let Some((receipt, at)) = receipts_at.try_next().await? {
let receipt: &bee_block_stardust::payload::milestone::MilestoneOption = &receipt.try_into()?;
Expand All @@ -321,6 +344,7 @@ async fn receipts_migrated_at(database: Extension<MongoDb>, Path(index): Path<u3

async fn treasury(database: Extension<MongoDb>) -> ApiResult<TreasuryResponse> {
database
.collection::<TreasuryCollection>()
.get_latest_treasury()
.await?
.ok_or(ApiError::NoResults)
Expand All @@ -337,6 +361,7 @@ async fn milestone(
) -> ApiResult<MilestoneResponse> {
let milestone_id = MilestoneId::from_str(&milestone_id).map_err(ApiError::bad_parse)?;
let milestone_payload = database
.collection::<MilestoneCollection>()
.get_milestone_payload_by_id(&milestone_id)
.await?
.ok_or(ApiError::NoResults)?;
Expand All @@ -359,6 +384,7 @@ async fn milestone_by_index(
headers: HeaderMap,
) -> ApiResult<MilestoneResponse> {
let milestone_payload = database
.collection::<MilestoneCollection>()
.get_milestone_payload(index)
.await?
.ok_or(ApiError::NoResults)?;
Expand All @@ -381,6 +407,7 @@ async fn utxo_changes(
) -> ApiResult<UtxoChangesResponse> {
let milestone_id = MilestoneId::from_str(&milestone_id).map_err(ApiError::bad_parse)?;
let milestone_index = database
.collection::<MilestoneCollection>()
.get_milestone_payload_by_id(&milestone_id)
.await?
.ok_or(ApiError::NoResults)?
Expand All @@ -401,6 +428,7 @@ async fn collect_utxo_changes(database: &MongoDb, milestone_index: MilestoneInde
created_outputs,
consumed_outputs,
} = database
.collection::<OutputCollection>()
.get_utxo_changes(milestone_index)
.await?
.ok_or(ApiError::NoResults)?;
Expand Down
10 changes: 9 additions & 1 deletion src/bin/inx-chronicle/api/stardust/explorer/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use std::str::FromStr;

use axum::{extract::Path, routing::get, Extension, Router};
use chronicle::{
db::MongoDb,
db::{
collections::{BlockCollection, LedgerUpdateCollection, MilestoneCollection, OutputCollection},
MongoDb,
},
types::stardust::block::{payload::milestone::MilestoneId, Address, BlockId},
};
use futures::{StreamExt, TryStreamExt};
Expand Down Expand Up @@ -45,6 +48,7 @@ async fn ledger_updates_by_address(
let address_dto = Address::from_str(&address).map_err(ApiError::bad_parse)?;

let mut record_stream = database
.collection::<LedgerUpdateCollection>()
.stream_ledger_updates_by_address(
&address_dto,
// Get one extra record so that we can create the cursor.
Expand Down Expand Up @@ -84,13 +88,15 @@ async fn ledger_updates_by_milestone(
let milestone_id = MilestoneId::from_str(&milestone_id).map_err(ApiError::bad_parse)?;

let milestone_index = database
.collection::<MilestoneCollection>()
.get_milestone_payload_by_id(&milestone_id)
.await?
.ok_or(ApiError::NotFound)?
.essence
.index;

let mut record_stream = database
.collection::<LedgerUpdateCollection>()
.stream_ledger_updates_by_milestone(milestone_index, page_size + 1, cursor)
.await?;

Expand Down Expand Up @@ -122,6 +128,7 @@ async fn ledger_updates_by_milestone(
async fn balance(database: Extension<MongoDb>, Path(address): Path<String>) -> ApiResult<BalanceResponse> {
let address = Address::from_str(&address).map_err(ApiError::bad_parse)?;
let res = database
.collection::<OutputCollection>()
.get_address_balance(address)
.await?
.ok_or(ApiError::NoResults)?;
Expand All @@ -140,6 +147,7 @@ async fn block_children(
) -> ApiResult<BlockChildrenResponse> {
let block_id = BlockId::from_str(&block_id).map_err(ApiError::bad_parse)?;
let mut block_children = database
.collection::<BlockCollection>()
.get_block_children(&block_id, page_size, page)
.await
.map_err(|_| ApiError::NoResults)?;
Expand Down
6 changes: 5 additions & 1 deletion src/bin/inx-chronicle/api/stardust/indexer/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use std::str::FromStr;
use axum::{extract::Path, routing::get, Extension, Router};
use chronicle::{
db::{
collections::{AliasOutputsQuery, BasicOutputsQuery, FoundryOutputsQuery, IndexedId, NftOutputsQuery},
collections::{
AliasOutputsQuery, BasicOutputsQuery, FoundryOutputsQuery, IndexedId, NftOutputsQuery, OutputCollection,
},
MongoDb,
},
types::stardust::block::output::{AliasId, FoundryId, NftId},
Expand Down Expand Up @@ -52,6 +54,7 @@ where
{
let id = ID::from_str(&id).map_err(ApiError::bad_parse)?;
let res = database
.collection::<OutputCollection>()
.get_indexed_output_by_id(id)
.await?
.ok_or(ApiError::NoResults)?;
Expand All @@ -76,6 +79,7 @@ where
bson::Document: From<Q>,
{
let res = database
.collection::<OutputCollection>()
.get_indexed_outputs(
query,
// Get one extra record so that we can create the cursor.
Expand Down
Loading

0 comments on commit 5d5499d

Please sign in to comment.