Skip to content

Commit

Permalink
feat(db): use insertMany for initial unspent outputs (#566)
Browse files Browse the repository at this point in the history
* Use insert many for initial unspent outputs.

* Separate output and ledger update inserts. Upsert outputs until we are certain we can batch insert them.

* fmt

* Use unordered inserts

* Remove transactions and tune read unspent output performance

* Always insert many

* Separate OutputWithMetadata. Use trystreamext for read unspent outputs. Move ledger update stream. Cleanup.

* docs

* Minor cleanup and suggestions for #566 (#587)

* Increase number of retries to work with large snapshots

* Remove `replSet`s from Docker

* Refactor

* Use tasks again to parallelize. Refactor helper fns to avoid inconsistencies.

Co-authored-by: Alexandcoats <alexandcoats@gmail.com>

Co-authored-by: Jochen Görtler <grtlr@users.noreply.github.com>
  • Loading branch information
Alexandcoats and grtlr authored Aug 24, 2022
1 parent 4c80191 commit 146d5b8
Show file tree
Hide file tree
Showing 14 changed files with 505 additions and 377 deletions.
2 changes: 1 addition & 1 deletion bin/inx-chronicle/config.template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ connect_url = "http://localhost:9029"
connection_retry_interval = "5s"

### Maximum number of tries to establish an INX connection.
connection_retry_count = 10
connection_retry_count = 30

### Milestone at which synchronization should begin. A value of `1` means syncing back until genesis.
sync_start_milestone = 1
Expand Down
330 changes: 138 additions & 192 deletions bin/inx-chronicle/src/stardust_inx/mod.rs

Large diffs are not rendered by default.

118 changes: 118 additions & 0 deletions bin/inx-chronicle/src/stardust_inx/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2022 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use chronicle::types::ledger::{LedgerOutput, LedgerSpent};
use futures::Stream;
use pin_project::pin_project;

use super::{InxError, LedgerUpdateRecord};

#[pin_project]
pub struct LedgerUpdateStream<S> {
#[pin]
inner: S,
#[pin]
record: Option<LedgerUpdateRecord>,
}

impl<S> LedgerUpdateStream<S> {
pub fn new(inner: S) -> Self {
Self { inner, record: None }
}
}

impl<S: Stream<Item = Result<bee_inx::LedgerUpdate, bee_inx::Error>>> Stream for LedgerUpdateStream<S> {
type Item = Result<LedgerUpdateRecord, InxError>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
use std::task::Poll;

use bee_inx::LedgerUpdate;

let mut this = self.project();
Poll::Ready(loop {
if let Poll::Ready(next) = this.inner.as_mut().poll_next(cx) {
if let Some(res) = next {
match res {
Ok(ledger_update) => match ledger_update {
LedgerUpdate::Begin(marker) => {
// We shouldn't already have a record. If we do, that's bad.
if let Some(record) = this.record.as_mut().take() {
break Some(Err(InxError::InvalidLedgerUpdateCount {
received: record.consumed.len() + record.created.len(),
expected: record.consumed.capacity() + record.created.capacity(),
}));
} else {
this.record.set(Some(LedgerUpdateRecord {
milestone_index: marker.milestone_index.into(),
created: Vec::with_capacity(marker.created_count),
consumed: Vec::with_capacity(marker.consumed_count),
}));
}
}
LedgerUpdate::Consumed(consumed) => {
if let Some(mut record) = this.record.as_mut().as_pin_mut() {
match LedgerSpent::try_from(consumed) {
Ok(consumed) => {
record.consumed.push(consumed);
}
Err(e) => {
break Some(Err(e.into()));
}
}
} else {
break Some(Err(InxError::InvalidMilestoneState));
}
}
LedgerUpdate::Created(created) => {
if let Some(mut record) = this.record.as_mut().as_pin_mut() {
match LedgerOutput::try_from(created) {
Ok(created) => {
record.created.push(created);
}
Err(e) => {
break Some(Err(e.into()));
}
}
} else {
break Some(Err(InxError::InvalidMilestoneState));
}
}
LedgerUpdate::End(marker) => {
if let Some(record) = this.record.as_mut().take() {
if record.created.len() != marker.created_count
|| record.consumed.len() != marker.consumed_count
{
break Some(Err(InxError::InvalidLedgerUpdateCount {
received: record.consumed.len() + record.created.len(),
expected: marker.consumed_count + marker.created_count,
}));
}
break Some(Ok(record));
} else {
break Some(Err(InxError::InvalidMilestoneState));
}
}
},
Err(e) => {
break Some(Err(e.into()));
}
}
} else {
// If we were supposed to be in the middle of a milestone, something went wrong.
if let Some(record) = this.record.as_mut().take() {
break Some(Err(InxError::InvalidLedgerUpdateCount {
received: record.consumed.len() + record.created.len(),
expected: record.consumed.capacity() + record.created.capacity(),
}));
} else {
break None;
}
}
}
})
}
}
61 changes: 26 additions & 35 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,44 +1,19 @@
version: '3'
services:

mongo1:
mongo:
image: mongo:latest
container_name: mongo1
container_name: mongo
# Warning: We don't keep logs to make development simpler
command: ["--replSet", "my-replica-set", "--bind_ip_all", "--port", "27017", "--quiet", "--logpath", "/dev/null"]
command: ["--quiet", "--logpath", "/dev/null"]
volumes:
- ./data/chronicle/mongo1:/data/db
- ./data/chronicle/mongodb:/data/db
ports:
- 27017:27017
healthcheck:
test: test $$(echo "rs.initiate({_id:'my-replica-set',members:[{_id:0,host:\"mongo1:27017\"},{_id:1,host:\"mongo2:27019\"},{_id:2,host:\"mongo3:27020\"}]}).ok || rs.status().ok" | mongo --port 27017 --quiet) -eq 1
interval: 10s
start_period: 5s

mongo2:
image: mongo:latest
container_name: mongo2
# Warning: We don't keep logs to make development simpler
command: ["--replSet", "my-replica-set", "--bind_ip_all", "--port", "27019", "--quiet", "--logpath", "/dev/null"]
volumes:
- ./data/chronicle/mongo2:/data/db
ports:
- 27019:27019

mongo3:
image: mongo:latest
container_name: mongo3
# Warning: We don't keep logs to make development simpler
command: ["--replSet", "my-replica-set", "--bind_ip_all", "--port", "27020", "--quiet", "--logpath", "/dev/null"]
volumes:
- ./data/chronicle/mongo3:/data/db
ports:
- 27020:27020


inx-chronicle:
container_name: inx-chronicle
depends_on: [mongo1, mongo2, mongo3, hornet]
depends_on: [mongo, hornet]
build:
context: ..
dockerfile: docker/Dockerfile.debug
Expand All @@ -53,7 +28,7 @@ services:
- "--inx"
- "http://hornet:9029"
- "--db"
- "mongodb://mongo1:27017"
- "mongodb://mongo:27017"
- "--config"
- "config.toml"
volumes:
Expand Down Expand Up @@ -86,6 +61,24 @@ services:
- "--inx.enabled=true"
- "--inx.bindAddress=hornet:9029"

################################################################################
# The following services can be enabled by setting the `debug` profile.

mongo-express:
image: mongo-express
depends_on:
- mongo
profiles:
- debug
restart: unless-stopped
ports:
- 8084:8084
environment:
- ME_CONFIG_MONGODB_SERVER=mongo
- ME_CONFIG_MONGODB_PORT=27017
- ME_CONFIG_OPTIONS_READONLY=true
- VCAP_APP_PORT=8084

################################################################################
# The following services can be enabled by setting the `metrics` profile.

Expand All @@ -111,14 +104,14 @@ services:
profiles:
- metrics
depends_on:
- mongo1
- mongo
container_name: mongodb-exporter
restart: unless-stopped
user: "65532"
ports:
- '9216:9261'
command:
- '--mongodb.uri=mongodb://mongo1:27017'
- '--mongodb.uri=mongodb://mongo:27017'
- '--mongodb.direct-connect=true'
- '--web.listen-address=:9216'
- '--log.level=info'
Expand All @@ -139,5 +132,3 @@ services:
- ./assets/grafana/:/etc/grafana/provisioning/
environment:
- GF_INSTALL_PLUGINS=yesoreyeram-infinity-datasource


4 changes: 1 addition & 3 deletions docker/prepare_docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ fi
# Prepare Hornet's data directory.
mkdir -p ${DIR}/data/hornet
mkdir -p ${DIR}/data/chronicle
mkdir -p ${DIR}/data/chronicle/mongo1
mkdir -p ${DIR}/data/chronicle/mongo2
mkdir -p ${DIR}/data/chronicle/mongo3
mkdir -p ${DIR}/data/chronicle/mongo
mkdir -p ${DIR}/data/grafana
mkdir -p ${DIR}/data/prometheus
if [[ "$OSTYPE" != "darwin"* ]]; then
Expand Down
45 changes: 16 additions & 29 deletions src/db/collections/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ use futures::{Stream, StreamExt, TryStreamExt};
use mongodb::{
bson::{self, doc},
error::Error,
options::IndexOptions,
ClientSession, IndexModel,
options::{IndexOptions, InsertManyOptions},
IndexModel,
};
use serde::{Deserialize, Serialize};
use tracing::instrument;

use super::PayloadKind;
use super::{PayloadKind, INSERT_BATCH_SIZE};
use crate::{
db::MongoDb,
types::{
Expand Down Expand Up @@ -210,41 +210,28 @@ impl MongoDb {
#[instrument(skip_all, err, level = "trace")]
pub async fn insert_blocks_with_metadata(
&self,
session: &mut ClientSession,
blocks_with_metadata: impl IntoIterator<Item = (Block, Vec<u8>, BlockMetadata)>,
) -> Result<(), Error> {
let blocks_with_metadata = blocks_with_metadata
.into_iter()
.map(|(block, raw, metadata)| BlockDocument { block, raw, metadata })
.collect::<Vec<_>>();
if !blocks_with_metadata.is_empty() {
self.insert_treasury_payloads(
session,
blocks_with_metadata.iter().filter_map(|block_document| {
if block_document.metadata.inclusion_state == LedgerInclusionState::Included {
if let Some(Payload::TreasuryTransaction(payload)) = &block_document.block.payload {
return Some((block_document.metadata.referenced_by_milestone_index, payload.as_ref()));
}
}
None
}),
)
.await?;
let blocks_with_metadata = blocks_with_metadata
.into_iter()
.map(|block_document| {
let block_id = block_document.metadata.block_id;
let mut doc = bson::to_document(&block_document)?;
doc.insert("_id", block_id.to_hex());
Result::<_, Error>::Ok(doc)
})
.collect::<Result<Vec<_>, _>>()?;
self.insert_treasury_payloads(blocks_with_metadata.iter().filter_map(|block_document| {
if block_document.metadata.inclusion_state == LedgerInclusionState::Included {
if let Some(Payload::TreasuryTransaction(payload)) = &block_document.block.payload {
return Some((block_document.metadata.referenced_by_milestone_index, payload.as_ref()));
}
}
None
}))
.await?;

self.db
.collection::<bson::Document>(BlockDocument::COLLECTION)
.insert_many_with_session(blocks_with_metadata, None, session)
for batch in blocks_with_metadata.chunks(INSERT_BATCH_SIZE) {
self.collection::<BlockDocument>(BlockDocument::COLLECTION)
.insert_many_ignore_duplicates(batch, InsertManyOptions::builder().ordered(false).build())
.await?;
}

Ok(())
}

Expand Down
Loading

0 comments on commit 146d5b8

Please sign in to comment.