Skip to content

Commit

Permalink
feat(api): add ledger/updates/by-milestone endpoint (#326)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex6323 authored Jun 28, 2022
1 parent d4e0408 commit dbef5f1
Show file tree
Hide file tree
Showing 9 changed files with 221 additions and 68 deletions.
24 changes: 12 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bin/inx-chronicle/src/api/stardust/core/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ async fn collect_utxo_changes(database: &MongoDb, milestone_index: MilestoneInde
let mut created_outputs = Vec::new();
let mut consumed_outputs = Vec::new();

let mut updates = database.get_ledger_updates_at_index(milestone_index).await?;
let mut updates = database.stream_ledger_updates_for_index(milestone_index).await?;
while let Some(update) = updates.try_next().await? {
if update.is_spent {
consumed_outputs.push(update.output_id.to_hex());
Expand Down
73 changes: 55 additions & 18 deletions bin/inx-chronicle/src/api/stardust/history/extractors.rs
Original file line number Diff line number Diff line change
@@ -1,59 +1,96 @@
// Copyright 2022 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::str::FromStr;

use async_trait::async_trait;
use axum::extract::{FromRequest, Query};
use chronicle::types::stardust::block::OutputId;
use regex::Regex;
use serde::Deserialize;

use crate::api::{error::ParseError, ApiError};

const HISTORY_PAGING_REGEX: &str = r"^([0-9]+)\.(0x(?:[0-9a-fA-F]{2})+)\.([0-9]+)$";
const DEFAULT_PAGE_SIZE: usize = 100;

#[derive(Clone)]
pub struct HistoryPagination {
pub struct HistoryByAddressPagination {
pub page_size: usize,
pub start_milestone_index: Option<u32>,
pub start_output_id: Option<OutputId>,
}

#[derive(Clone, Deserialize, Default)]
#[serde(default)]
pub struct HistoryPaginationQuery {
pub struct HistoryByAddressPaginationQuery {
pub page_size: Option<usize>,
pub start_milestone_index: Option<u32>,
pub cursor: Option<String>,
}

#[async_trait]
impl<B: Send> FromRequest<B> for HistoryPagination {
impl<B: Send> FromRequest<B> for HistoryByAddressPagination {
type Rejection = ApiError;

async fn from_request(req: &mut axum::extract::RequestParts<B>) -> Result<Self, Self::Rejection> {
let Query(HistoryPaginationQuery {
let Query(HistoryByAddressPaginationQuery {
mut page_size,
mut start_milestone_index,
cursor,
}) = Query::<HistoryPaginationQuery>::from_request(req)
}) = Query::<HistoryByAddressPaginationQuery>::from_request(req)
.await
.map_err(ApiError::QueryError)?;
let mut start_output_id = None;
if let Some(cursor) = cursor {
// Unwrap: Infallable as long as the regex is valid
let regex = Regex::new(HISTORY_PAGING_REGEX).unwrap();
let captures = regex.captures(&cursor).ok_or(ParseError::BadPagingState)?;
start_milestone_index.replace(captures.get(1).unwrap().as_str().parse().map_err(ApiError::bad_parse)?);
start_output_id
.replace(OutputId::from_str(captures.get(2).unwrap().as_str()).map_err(ApiError::bad_parse)?);
page_size.replace(captures.get(3).unwrap().as_str().parse().map_err(ApiError::bad_parse)?);
let parts = cursor.split('.').collect::<Vec<_>>();
if parts.len() != 3 {
return Err(ApiError::bad_parse(ParseError::BadPagingState));
} else {
start_milestone_index.replace(parts[0].parse().map_err(ApiError::bad_parse)?);
start_output_id.replace(parts[1].parse().map_err(ApiError::bad_parse)?);
page_size.replace(parts[2].parse().map_err(ApiError::bad_parse)?);
}
}
Ok(HistoryPagination {
page_size: page_size.unwrap_or(100),
Ok(HistoryByAddressPagination {
page_size: page_size.unwrap_or(DEFAULT_PAGE_SIZE),
start_milestone_index,
start_output_id,
})
}
}

#[derive(Clone)]
pub struct HistoryByMilestonePagination {
pub page_size: usize,
pub start_output_id: Option<OutputId>,
}

#[derive(Clone, Deserialize, Default)]
#[serde(default)]
pub struct HistoryByMilestonePaginationQuery {
pub page_size: Option<usize>,
pub cursor: Option<String>,
}

#[async_trait]
impl<B: Send> FromRequest<B> for HistoryByMilestonePagination {
type Rejection = ApiError;

async fn from_request(req: &mut axum::extract::RequestParts<B>) -> Result<Self, Self::Rejection> {
let Query(HistoryByMilestonePaginationQuery { mut page_size, cursor }) =
Query::<HistoryByMilestonePaginationQuery>::from_request(req)
.await
.map_err(ApiError::QueryError)?;
let mut start_output_id = None;
if let Some(cursor) = cursor {
let parts = cursor.split('.').collect::<Vec<_>>();
if parts.len() != 2 {
return Err(ApiError::bad_parse(ParseError::BadPagingState));
} else {
start_output_id.replace(parts[0].parse().map_err(ApiError::bad_parse)?);
page_size.replace(parts[1].parse().map_err(ApiError::bad_parse)?);
}
}
Ok(HistoryByMilestonePagination {
page_size: page_size.unwrap_or(DEFAULT_PAGE_SIZE),
start_output_id,
})
}
}
31 changes: 26 additions & 5 deletions bin/inx-chronicle/src/api/stardust/history/responses.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
// Copyright 2022 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use chronicle::types::{stardust::milestone::MilestoneTimestamp, tangle::MilestoneIndex};
use chronicle::types::{
stardust::{block::Address, milestone::MilestoneTimestamp},
tangle::MilestoneIndex,
};
use serde::{Deserialize, Serialize};

use crate::api::responses::impl_success_response;

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TransactionHistoryResponse {
pub struct TransactionsPerAddressResponse {
pub address: String,
pub items: Vec<Transfer>,
pub items: Vec<TransferByAddress>,
pub cursor: Option<String>,
}

impl_success_response!(TransactionHistoryResponse);
impl_success_response!(TransactionsPerAddressResponse);

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Transfer {
pub struct TransferByAddress {
#[serde(rename = "outputId")]
pub output_id: String,
#[serde(rename = "isSpent")]
Expand All @@ -26,3 +29,21 @@ pub struct Transfer {
#[serde(rename = "milestoneTimestamp")]
pub milestone_timestamp: MilestoneTimestamp,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TransactionsPerMilestoneResponse {
pub index: MilestoneIndex,
pub items: Vec<TransferByMilestone>,
pub cursor: Option<String>,
}

impl_success_response!(TransactionsPerMilestoneResponse);

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TransferByMilestone {
pub address: Address,
#[serde(rename = "outputId")]
pub output_id: String,
#[serde(rename = "isSpent")]
pub is_spent: bool,
}
82 changes: 64 additions & 18 deletions bin/inx-chronicle/src/api/stardust/history/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,40 @@ use std::str::FromStr;
use axum::{extract::Path, routing::get, Extension, Router};
use chronicle::{
db::{collections::SortOrder, MongoDb},
types::stardust::block::Address,
types::{stardust::block::Address, tangle::MilestoneIndex},
};
use futures::{StreamExt, TryStreamExt};

use super::{
extractors::HistoryPagination,
responses::{TransactionHistoryResponse, Transfer},
extractors::{HistoryByAddressPagination, HistoryByMilestonePagination},
responses::{
TransactionsPerAddressResponse, TransactionsPerMilestoneResponse, TransferByAddress, TransferByMilestone,
},
};
use crate::api::{routes::sync, ApiError, ApiResult};

pub fn routes() -> Router {
Router::new().route("/gaps", get(sync)).nest(
"/ledger",
Router::new().route("/updates/by-address/:address", get(transaction_history)),
"/ledger/updates",
Router::new()
.route("/by-address/:address", get(transactions_by_address_history))
.route("/by-milestone/:milestone_id", get(transactions_by_milestone_history)),
)
}

async fn transaction_history(
async fn transactions_by_address_history(
database: Extension<MongoDb>,
Path(address): Path<String>,
HistoryPagination {
HistoryByAddressPagination {
page_size,
start_milestone_index,
start_output_id,
}: HistoryPagination,
) -> ApiResult<TransactionHistoryResponse> {
}: HistoryByAddressPagination,
) -> ApiResult<TransactionsPerAddressResponse> {
let address_dto = Address::from_str(&address).map_err(ApiError::bad_parse)?;

let mut records_iter = database
.get_ledger_updates(
let mut record_stream = database
.stream_ledger_updates_for_address(
&address_dto,
// Get one extra record so that we can create the cursor.
page_size + 1,
Expand All @@ -47,17 +51,18 @@ async fn transaction_history(
.await?;

// Take all of the requested records first
let records = records_iter.by_ref().take(page_size).try_collect::<Vec<_>>().await?;
let records = record_stream.by_ref().take(page_size).try_collect::<Vec<_>>().await?;

// If any record is left, use it to make the cursor
let cursor = records_iter
let cursor = record_stream
.try_next()
.await?
.map(|doc| format!("{}.{}.{}", doc.at.milestone_index, doc.output_id.to_hex(), page_size));
.map(|rec| format!("{}.{}.{}", rec.at.milestone_index, rec.output_id.to_hex(), page_size));

let transactions = records
let transfers = records
.into_iter()
.map(|rec| {
Ok(Transfer {
Ok(TransferByAddress {
output_id: rec.output_id.to_hex(),
is_spent: rec.is_spent,
milestone_index: rec.at.milestone_index,
Expand All @@ -66,9 +71,50 @@ async fn transaction_history(
})
.collect::<Result<_, ApiError>>()?;

Ok(TransactionHistoryResponse {
items: transactions,
Ok(TransactionsPerAddressResponse {
address,
items: transfers,
cursor,
})
}

async fn transactions_by_milestone_history(
database: Extension<MongoDb>,
Path(milestone_index): Path<String>,
HistoryByMilestonePagination {
page_size,
start_output_id,
}: HistoryByMilestonePagination,
) -> ApiResult<TransactionsPerMilestoneResponse> {
let milestone_index = MilestoneIndex::from_str(&milestone_index).map_err(ApiError::bad_parse)?;

let mut record_stream = database
.stream_ledger_updates_at_index_paginated(milestone_index, page_size + 1, start_output_id, SortOrder::Newest)
.await?;

// Take all of the requested records first
let records = record_stream.by_ref().take(page_size).try_collect::<Vec<_>>().await?;

// If any record is left, use it to make the paging state
let cursor = record_stream
.try_next()
.await?
.map(|rec| format!("{}.{}", rec.output_id.to_hex(), page_size));

let transfers = records
.into_iter()
.map(|rec| {
Ok(TransferByMilestone {
address: rec.address,
output_id: rec.output_id.to_hex(),
is_spent: rec.is_spent,
})
})
.collect::<Result<_, ApiError>>()?;

Ok(TransactionsPerMilestoneResponse {
index: milestone_index,
items: transfers,
cursor,
})
}
Loading

0 comments on commit dbef5f1

Please sign in to comment.