Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track event indexing progress #3075

Merged
merged 10 commits into from
Oct 23, 2024
25 changes: 25 additions & 0 deletions crates/autopilot/src/boundary/events/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,26 @@
use {
anyhow::{Context, Result},
sqlx::PgPool,
};

pub mod settlement;

pub async fn write_last_block_to_db(db: &PgPool, last_block: u64, index_name: &str) -> Result<()> {
let mut ex = db.acquire().await?;
database::last_processed_blocks::update(
&mut ex,
index_name,
i64::try_from(last_block).context("new value of counter is not i64")?,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better to store in database u64 type for the last block instead of converting it to i64?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately postgres doesn't support u64 natively. The alternative would be to use a bignumber type or sth like that.

)
.await?;
Ok(())
}

/// Reads the block number of the most recently indexed block for the given
/// index from the `last_processed_blocks` table.
///
/// Returns 0 when no entry has been stored for `index_name` yet; errors if
/// the stored value is negative or the query fails.
pub async fn read_last_block_from_db(db: &PgPool, index_name: &str) -> Result<u64> {
    let mut ex = db.acquire().await?;
    let stored = database::last_processed_blocks::fetch(&mut ex, index_name).await?;
    u64::try_from(stored.unwrap_or_default()).context("last block is not u64")
}
11 changes: 9 additions & 2 deletions crates/autopilot/src/boundary/events/settlement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,18 @@ impl Indexer {
}
}

/// This name is used to store the latest processed block for indexing
/// settlement events in the `last_processed_blocks` table.
/// NOTE: changing this value orphans the previously stored progress row,
/// since reads and writes key on this exact string.
const INDEX_NAME: &str = "settlements";

#[async_trait::async_trait]
impl EventStoring<contracts::gpv2_settlement::Event> for Indexer {
/// Returns the last block processed for settlement events, as stored in
/// the `last_processed_blocks` table (0 if no entry exists yet).
async fn last_event_block(&self) -> Result<u64> {
    super::read_last_block_from_db(&self.db.pool, INDEX_NAME).await
}

/// Stores `latest_block` in the `last_processed_blocks` table so settlement
/// indexing can resume from there after a restart.
async fn persist_last_processed_block(&mut self, latest_block: u64) -> Result<()> {
    super::write_last_block_to_db(&self.db.pool, latest_block, INDEX_NAME).await
}

async fn replace_events(
Expand Down
12 changes: 9 additions & 3 deletions crates/autopilot/src/database/ethflow_events/event_storing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,18 @@ fn get_refunds(events: Vec<ethcontract::Event<EthFlowEvent>>) -> Result<Vec<Refu

type EthFlowEvent = contracts::cowswap_eth_flow::Event;

/// This name is used to store the latest processed block for indexing
/// ethflow refund events in the `last_processed_blocks` table.
const INDEX_NAME: &str = "ethflow_refunds";

#[async_trait::async_trait]
impl EventStoring<EthFlowEvent> for Postgres {
/// Reads the last indexed ethflow refund block from the
/// `last_processed_blocks` table (0 if no entry exists yet).
async fn last_event_block(&self) -> Result<u64> {
    crate::boundary::events::read_last_block_from_db(&self.pool, INDEX_NAME).await
}

/// Stores `last_block` in the `last_processed_blocks` table so refund
/// indexing can resume from there after a restart.
async fn persist_last_processed_block(&mut self, last_block: u64) -> Result<()> {
    crate::boundary::events::write_last_block_to_db(&self.pool, last_block, INDEX_NAME).await
}

async fn append_events(&mut self, events: Vec<ethcontract::Event<EthFlowEvent>>) -> Result<()> {
Expand Down
13 changes: 0 additions & 13 deletions crates/autopilot/src/database/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use {
},
ethcontract::{Event as EthContractEvent, EventMetadata},
number::conversions::u256_to_big_decimal,
sqlx::PgConnection,
std::convert::TryInto,
};

Expand All @@ -43,18 +42,6 @@ pub fn contract_to_db_events(
.collect::<Result<Vec<_>>>()
}

/// Returns the highest block number seen across the event tables (trades,
/// settlements, invalidations, presignature_events), instrumented with a
/// query-duration metric.
pub async fn last_event_block(connection: &mut PgConnection) -> Result<u64> {
    // Records query duration; the timer stops when `_timer` is dropped.
    let _timer = super::Metrics::get()
        .database_queries
        .with_label_values(&["last_event_block"])
        .start_timer();

    let block_number = database::events::last_block(connection)
        .await
        .context("block_number_of_most_recent_event failed")?;
    // i64 -> u64 conversion; fails only for a negative block number.
    block_number.try_into().context("block number is negative")
}

pub async fn append_events(
transaction: &mut PgTransaction<'_>,
events: Vec<EthContractEvent<ContractEvent>>,
Expand Down
34 changes: 21 additions & 13 deletions crates/autopilot/src/database/onchain_order_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,31 @@ where
) -> EventRow;
}

/// This name is used to store the latest processed block for indexing
/// onchain order events in the `last_processed_blocks` table.
const INDEX_NAME: &str = "onchain_orders";

#[async_trait::async_trait]
impl<T: Sync + Send + Clone, W: Sync + Send + Clone> EventStoring<ContractEvent>
for OnchainOrderParser<T, W>
{
/// Reads the last processed onchain-order block from the
/// `last_processed_blocks` table (0 if no entry exists yet).
async fn last_event_block(&self) -> Result<u64> {
    // Records query duration; the timer stops when `_timer` is dropped.
    let _timer = DatabaseMetrics::get()
        .database_queries
        .with_label_values(&["read_last_block_onchain_orders"])
        .start_timer();
    crate::boundary::events::read_last_block_from_db(&self.db.pool, INDEX_NAME).await
}

/// Stores `latest_block` in the `last_processed_blocks` table so
/// onchain-order indexing can resume from there after a restart.
async fn persist_last_processed_block(&mut self, latest_block: u64) -> Result<()> {
    // Records query duration; the timer stops when `_timer` is dropped.
    let _timer = DatabaseMetrics::get()
        .database_queries
        .with_label_values(&["update_last_block_onchain_orders"])
        .start_timer();
    crate::boundary::events::write_last_block_to_db(&self.db.pool, latest_block, INDEX_NAME)
        .await
}

async fn replace_events(
&mut self,
events: Vec<EthContractEvent<ContractEvent>>,
Expand Down Expand Up @@ -183,19 +204,6 @@ impl<T: Sync + Send + Clone, W: Sync + Send + Clone> EventStoring<ContractEvent>

Ok(())
}

async fn last_event_block(&self) -> Result<u64> {
let _timer = DatabaseMetrics::get()
.database_queries
.with_label_values(&["last_event_block"])
.start_timer();

let mut con = self.db.pool.acquire().await?;
let block_number = database::onchain_broadcasted_orders::last_block(&mut con)
.await
.context("block_number_of_most_recent_event failed")?;
block_number.try_into().context("block number is negative")
}
}

impl<T: Send + Sync + Clone, W: Send + Sync> OnchainOrderParser<T, W> {
Expand Down
5 changes: 5 additions & 0 deletions crates/cow-amm/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,9 @@ impl EventStoring<CowAmmEvent> for Storage {
.unwrap_or(self.0.start_of_index);
Ok(last_block)
}

async fn persist_last_processed_block(&mut self, _new_value: u64) -> anyhow::Result<()> {
    // Storage is only in-memory so we don't need to persist anything here;
    // after a restart indexing falls back to `start_of_index` (see
    // `last_event_block` above).
    Ok(())
}
}
98 changes: 0 additions & 98 deletions crates/database/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,6 @@ pub struct EventIndex {
pub log_index: i64,
}

/// Returns the highest block number recorded across the trades, settlements,
/// invalidations and presignature_events tables; 0 when all are empty
/// (COALESCE turns empty-table MAX from NULL into 0).
pub async fn last_block(ex: &mut PgConnection) -> Result<i64, sqlx::Error> {
    const QUERY: &str = "\
SELECT GREATEST( (SELECT COALESCE(MAX(block_number), 0) FROM trades), (SELECT \
COALESCE(MAX(block_number), 0) FROM settlements), (SELECT \
COALESCE(MAX(block_number), 0) FROM invalidations), (SELECT \
COALESCE(MAX(block_number), 0) FROM presignature_events));";
    sqlx::query_scalar(QUERY).fetch_one(ex).await
}

pub async fn delete(
ex: &mut PgTransaction<'_>,
delete_from_block_number: u64,
Expand Down Expand Up @@ -171,92 +162,3 @@ async fn insert_presignature(
.await?;
Ok(())
}

#[cfg(test)]
mod tests {
    use {super::*, sqlx::Connection};

    // Exercises `last_block` across all four event tables plus `delete`.
    // Requires a live Postgres instance, hence `#[ignore]`.
    #[tokio::test]
    #[ignore]
    async fn postgres_events() {
        let mut db = PgConnection::connect("postgresql://").await.unwrap();
        // Run inside a transaction so the test leaves no traces behind.
        let mut db = db.begin().await.unwrap();
        crate::clear_DANGER_(&mut db).await.unwrap();

        // Empty tables report block 0.
        assert_eq!(last_block(&mut db).await.unwrap(), 0);

        let mut event_index = EventIndex {
            block_number: 1,
            log_index: 0,
        };
        append(
            &mut db,
            &[(event_index, Event::Invalidation(Default::default()))],
        )
        .await
        .unwrap();
        assert_eq!(last_block(&mut db).await.unwrap(), 1);

        event_index.block_number = 2;
        append(&mut db, &[(event_index, Event::Trade(Default::default()))])
            .await
            .unwrap();
        assert_eq!(last_block(&mut db).await.unwrap(), 2);

        event_index.block_number = 3;
        append(
            &mut db,
            &[(event_index, Event::PreSignature(Default::default()))],
        )
        .await
        .unwrap();
        assert_eq!(last_block(&mut db).await.unwrap(), 3);

        event_index.block_number = 4;
        append(
            &mut db,
            &[(event_index, Event::Settlement(Default::default()))],
        )
        .await
        .unwrap();
        assert_eq!(last_block(&mut db).await.unwrap(), 4);

        // Deleting from a block above the latest event is a no-op.
        delete(&mut db, 5).await.unwrap();
        assert_eq!(last_block(&mut db).await.unwrap(), 4);

        // Deleting from block 3 drops events at blocks 3 and 4.
        delete(&mut db, 3).await.unwrap();
        assert_eq!(last_block(&mut db).await.unwrap(), 2);

        // Deleting from block 0 clears everything.
        delete(&mut db, 0).await.unwrap();
        assert_eq!(last_block(&mut db).await.unwrap(), 0);
    }

    // Inserting the same (block_number, log_index) pair twice must be
    // silently ignored rather than fail or duplicate rows.
    #[tokio::test]
    #[ignore]
    async fn postgres_repeated_event_insert_ignored() {
        let mut db = PgConnection::connect("postgresql://").await.unwrap();
        let mut db = db.begin().await.unwrap();
        crate::clear_DANGER_(&mut db).await.unwrap();
        // Helper pinning block_number to 2 so repeated inserts collide.
        async fn append(con: &mut PgTransaction<'_>, log_index: i64, event: Event) {
            super::append(
                con,
                &[(
                    EventIndex {
                        block_number: 2,
                        log_index,
                    },
                    event,
                )],
            )
            .await
            .unwrap()
        }
        for _ in 0..2 {
            append(&mut db, 0, Event::Trade(Default::default())).await;
            append(&mut db, 1, Event::Invalidation(Default::default())).await;
            append(&mut db, 2, Event::Settlement(Default::default())).await;
            append(&mut db, 3, Event::PreSignature(Default::default())).await;
        }
        assert_eq!(last_block(&mut db).await.unwrap(), 2);
    }
}
52 changes: 52 additions & 0 deletions crates/database/src/last_processed_blocks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use sqlx::{Executor, PgConnection};

/// Upserts the last processed block for the given index into the
/// `last_processed_blocks` table: inserts a new row, or overwrites the
/// stored block number when one already exists for `index`.
pub async fn update(
    ex: &mut PgConnection,
    index: &str,
    last_processed_block: i64,
) -> Result<(), sqlx::Error> {
    const QUERY: &str = r#"
INSERT INTO last_processed_blocks (index, block_number)
VALUES ($1, $2)
ON CONFLICT (index)
DO UPDATE SET block_number = EXCLUDED.block_number;
    "#;

    let query = sqlx::query(QUERY).bind(index).bind(last_processed_block);
    ex.execute(query).await.map(|_| ())
}

/// Looks up the last processed block stored for `index`; returns `None`
/// when no value has ever been written for that index.
pub async fn fetch(ex: &mut PgConnection, index: &str) -> Result<Option<i64>, sqlx::Error> {
    const QUERY: &str = r#"
SELECT block_number
FROM last_processed_blocks
WHERE index = $1;
    "#;

    let query = sqlx::query_scalar(QUERY).bind(index);
    query.fetch_optional(ex).await
}

#[cfg(test)]
mod tests {
    use {super::*, sqlx::Connection};

    // Round-trips a last-processed-block value through `update`/`fetch`.
    // Requires a live Postgres instance, hence `#[ignore]`.
    #[tokio::test]
    #[ignore]
    async fn postgres_last_processed_block_roundtrip() {
        let mut db = PgConnection::connect("postgresql://").await.unwrap();
        // Run inside a transaction so the test leaves no traces behind.
        let mut db = db.begin().await.unwrap();
        crate::clear_DANGER_(&mut db).await.unwrap();

        // No entry stored yet.
        assert_eq!(fetch(&mut db, "test").await.unwrap(), None);

        // First write inserts a new row ...
        update(&mut db, "test", 42).await.unwrap();
        assert_eq!(fetch(&mut db, "test").await.unwrap(), Some(42));

        // ... and a second write for the same index overwrites it.
        update(&mut db, "test", 43).await.unwrap();
        assert_eq!(fetch(&mut db, "test").await.unwrap(), Some(43));
    }
}
2 changes: 2 additions & 0 deletions crates/database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod ethflow_orders;
pub mod events;
pub mod fee_policies;
pub mod jit_orders;
pub mod last_processed_blocks;
MartinquaXD marked this conversation as resolved.
Show resolved Hide resolved
pub mod onchain_broadcasted_orders;
pub mod onchain_invalidations;
pub mod order_events;
Expand Down Expand Up @@ -52,6 +53,7 @@ pub const TABLES: &[&str] = &[
"orders",
"trades",
"invalidations",
"last_processed_blocks",
"quotes",
"settlements",
"presignature_events",
Expand Down
16 changes: 15 additions & 1 deletion crates/shared/src/event_handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,13 @@ pub trait EventStoring<T>: Send + Sync {
/// * `events` the contract events to be appended by the implementer
async fn append_events(&mut self, events: Vec<EthcontractEvent<T>>) -> Result<()>;

/// Fetches the last processed block to know where to resume indexing after
/// a restart.
async fn last_event_block(&self) -> Result<u64>;

/// Stores the last processed block to know where to resume indexing after a
/// restart.
async fn persist_last_processed_block(&mut self, last_block: u64) -> Result<()>;
}

pub trait EventRetrieving {
Expand Down Expand Up @@ -282,9 +288,12 @@ where
if let Some(range) = event_range.history_range {
self.update_events_from_old_blocks(range).await?;
}
if !event_range.latest_blocks.is_empty() {
if let Some(last_block) = event_range.latest_blocks.last() {
self.update_events_from_latest_blocks(&event_range.latest_blocks, event_range.is_reorg)
.await?;
self.store_mut()
.persist_last_processed_block(last_block.0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd move this call inside `update_last_handled_blocks`, since that function is where we update the local cache, so keeping the local cache and the DB aligned in one place makes sense.

.await?;
}
Ok(())
}
Expand Down Expand Up @@ -652,6 +661,11 @@ mod tests {
.map(|event| event.meta.clone().unwrap().block_number)
.unwrap_or_default())
}

async fn persist_last_processed_block(&mut self, _last_block: u64) -> Result<()> {
    // Nothing to do here since `last_event_block` looks up last stored event,
    // so progress is implicitly recoverable from the stored events themselves.
    Ok(())
}
}

#[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ where
/// Trait-method shim delegating to the type's inherent `last_event_block`
/// (defined outside this view — presumably an in-memory lookup; confirm
/// against the surrounding impl).
async fn last_event_block(&self) -> Result<u64> {
    Ok(self.last_event_block())
}

async fn persist_last_processed_block(&mut self, _block: u64) -> Result<()> {
    // Storage is only in-memory so we don't need to persist anything here;
    // `last_event_block` serves the value from memory instead.
    Ok(())
}
}

#[cfg(test)]
Expand Down
Loading
Loading