Skip to content

Commit

Permalink
feat(inx): switch to stream-based updates (#524)
Browse files Browse the repository at this point in the history
* feat(inx): switch to stream-based updates

* Use a stream map to hold ledger update state

* Validate the begin marker better

Co-authored-by: Alexandcoats <alexandcoats@gmail.com>
  • Loading branch information
grtlr and Alexandcoats authored Aug 4, 2022
1 parent 2befce8 commit 8ded3c0
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 32 deletions.
20 changes: 8 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,22 +67,22 @@ tower-http = { version = "0.3", default-features = false, features = ["cors", "c
zeroize = { version = "1.5", default-features = false, features = ["std"], optional = true }

# INX
bee-inx = { version = "1.0.0-beta.2", default-features = false, optional = true }
tonic = { version = "0.7.2", default-features = false, optional = true }
bee-inx = { git = "https://github.com/grtlr/bee", branch = "feat/inx/bump-inx", version = "1.0.0-beta.3", default-features = false, optional = true }
tonic = { version = "0.7", default-features = false, optional = true }

# Metrics
bee-metrics = { git = "https://github.com/iotaledger/bee", branch = "mainnet-develop-0.4", default-features = false, features = ["sync"], optional = true }

# Stardust types
bee-api-types-stardust = { package = "bee-api-types", version = "1.0.0-beta.4", default-features = false, optional = true }
bee-block-stardust = { package = "bee-block", version = "1.0.0-beta.4", default-features = false, features = [ "dto", "std", "serde", ], optional = true }
bee-api-types-stardust = { package = "bee-api-types", git = "https://github.com/grtlr/bee", branch = "feat/inx/bump-inx", version = "1.0.0-beta.4", default-features = false, optional = true }
bee-block-stardust = { package = "bee-block", git = "https://github.com/grtlr/bee", branch = "feat/inx/bump-inx", version = "1.0.0-beta.5", default-features = false, features = [ "dto", "std", "serde", ], optional = true }

# Tokio Console
console-subscriber = { version = "0.1", default-features = false, optional = true }


[dev-dependencies]
bee-block-stardust = { package = "bee-block", version = "1.0.0-beta.4", default-features = false, features = [ "dto", "rand", "std", "serde", ] }
bee-block-stardust = { package = "bee-block", git = "https://github.com/grtlr/bee", branch = "feat/inx/bump-inx", version = "1.0.0-beta.5", default-features = false, features = [ "dto", "rand", "std", "serde", ] }
packable = { version = "0.5", default-features = false }

[features]
Expand Down
4 changes: 4 additions & 0 deletions bin/inx-chronicle/src/stardust_inx/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ pub enum InxError {
ConnectionError,
#[error("expected INX address with format `http://<address>:<port>`, but found `{0}`")]
InvalidAddress(String),
#[error("wrong number of ledger updates: `{received}` but expected `{expected}`")]
InvalidLedgerUpdateCount { received: usize, expected: usize },
#[error("invalid milestone state")]
InvalidMilestoneState,
#[error("missing milestone id for milestone index `{0}`")]
MissingMilestoneInfo(MilestoneIndex),
#[error("MongoDB error: {0}")]
Expand Down
140 changes: 125 additions & 15 deletions bin/inx-chronicle/src/stardust_inx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@ use bee_inx::client::Inx;
use chronicle::{
db::MongoDb,
runtime::{Actor, ActorContext, HandleEvent},
types::{ledger::MilestoneIndexTimestamp, tangle::ProtocolParameters},
types::{
ledger::{MilestoneIndexTimestamp, OutputWithMetadata},
tangle::{MilestoneIndex, ProtocolParameters},
},
};
pub use config::InxConfig;
pub use error::InxError;
use futures::{StreamExt, TryStreamExt};
use futures::{Stream, StreamExt, TryStreamExt};
use pin_project::pin_project;
use tokio::time::Instant;

pub struct InxWorker {
Expand Down Expand Up @@ -147,7 +151,8 @@ impl Actor for InxWorker {

session.commit_transaction().await?;

let ledger_update_stream = inx.listen_to_ledger_updates((start_index.0..).into()).await?;
let ledger_update_stream =
LedgerUpdateStream::new(inx.listen_to_ledger_updates((start_index.0..).into()).await?);

cx.add_stream(ledger_update_stream);

Expand All @@ -159,12 +164,123 @@ impl Actor for InxWorker {
}
}

#[derive(Debug)]
pub struct LedgerUpdateRecord {
milestone_index: MilestoneIndex,
outputs: Vec<OutputWithMetadata>,
}

#[pin_project]
pub struct LedgerUpdateStream<S> {
#[pin]
inner: S,
#[pin]
record: Option<LedgerUpdateRecord>,
}

impl<S> LedgerUpdateStream<S> {
fn new(inner: S) -> Self {
Self { inner, record: None }
}
}

impl<S: Stream<Item = Result<bee_inx::LedgerUpdate, bee_inx::Error>>> Stream for LedgerUpdateStream<S> {
type Item = Result<LedgerUpdateRecord, InxError>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
use std::task::Poll;

use bee_inx::LedgerUpdate;

let this = self.project();
if let Poll::Ready(next) = this.inner.poll_next(cx) {
if let Some(res) = next {
match res {
Ok(ledger_update) => match ledger_update {
LedgerUpdate::Begin(marker) => {
// We shouldn't already have a record. If we do, that's bad.
let record = this.record.get_mut();
if let Some(record) = record.take() {
return Poll::Ready(Some(Err(InxError::InvalidLedgerUpdateCount {
received: record.outputs.len(),
expected: record.outputs.capacity(),
})));
} else {
*record = Some(LedgerUpdateRecord {
milestone_index: marker.milestone_index.into(),
outputs: Vec::with_capacity(marker.created_count + marker.consumed_count),
});
}
}
LedgerUpdate::Consumed(consumed) => {
if let Some(record) = this.record.get_mut() {
match OutputWithMetadata::try_from(consumed) {
Ok(consumed) => {
record.outputs.push(consumed);
}
Err(e) => {
return Poll::Ready(Some(Err(e.into())));
}
}
} else {
return Poll::Ready(Some(Err(InxError::InvalidMilestoneState)));
}
}
LedgerUpdate::Created(created) => {
if let Some(record) = this.record.get_mut() {
match OutputWithMetadata::try_from(created) {
Ok(created) => {
record.outputs.push(created);
}
Err(e) => {
return Poll::Ready(Some(Err(e.into())));
}
}
} else {
return Poll::Ready(Some(Err(InxError::InvalidMilestoneState)));
}
}
LedgerUpdate::End(marker) => {
if let Some(record) = this.record.get_mut().take() {
if record.outputs.len() != marker.consumed_count + marker.created_count {
return Poll::Ready(Some(Err(InxError::InvalidLedgerUpdateCount {
received: record.outputs.len(),
expected: marker.consumed_count + marker.created_count,
})));
}
return Poll::Ready(Some(Ok(record)));
} else {
return Poll::Ready(Some(Err(InxError::InvalidMilestoneState)));
}
}
},
Err(e) => {
return Poll::Ready(Some(Err(e.into())));
}
}
} else {
// If we were supposed to be in the middle of a milestone, something went wrong.
if let Some(record) = this.record.get_mut().take() {
return Poll::Ready(Some(Err(InxError::InvalidLedgerUpdateCount {
received: record.outputs.len(),
expected: record.outputs.capacity(),
})));
}
}
}
Poll::Pending
}
}

#[async_trait]
impl HandleEvent<Result<bee_inx::LedgerUpdate, bee_inx::Error>> for InxWorker {
impl HandleEvent<Result<LedgerUpdateRecord, InxError>> for InxWorker {
async fn handle_event(
&mut self,
_cx: &mut ActorContext<Self>,
ledger_update_result: Result<bee_inx::LedgerUpdate, bee_inx::Error>,
ledger_update_result: Result<LedgerUpdateRecord, InxError>,
inx: &mut Self::State,
) -> Result<(), Self::Error> {
log::trace!("Received ledger update event {:#?}", ledger_update_result);
Expand All @@ -174,27 +290,21 @@ impl HandleEvent<Result<bee_inx::LedgerUpdate, bee_inx::Error>> for InxWorker {

let ledger_update = ledger_update_result?;

let output_updates = Vec::from(ledger_update.created)
.into_iter()
.map(TryInto::try_into)
.chain(Vec::from(ledger_update.consumed).into_iter().map(TryInto::try_into))
.collect::<Result<Vec<_>, _>>()?;

let mut session = self.db.start_transaction(None).await?;

self.db
.insert_ledger_updates(&mut session, output_updates.into_iter())
.insert_ledger_updates(&mut session, ledger_update.outputs.into_iter())
.await?;

let milestone = inx.read_milestone(ledger_update.milestone_index.into()).await?;
let milestone = inx.read_milestone(ledger_update.milestone_index.0.into()).await?;
let parameters: ProtocolParameters = inx
.read_protocol_parameters(ledger_update.milestone_index.into())
.read_protocol_parameters(ledger_update.milestone_index.0.into())
.await?
.inner()?
.into();

self.db
.update_latest_protocol_parameters(&mut session, ledger_update.milestone_index.into(), parameters)
.update_latest_protocol_parameters(&mut session, ledger_update.milestone_index, parameters)
.await?;

log::trace!("Received milestone: `{:?}`", milestone);
Expand Down

0 comments on commit 8ded3c0

Please sign in to comment.