Skip to content

Commit

Permalink
feat: Add logging & metrics for mempool (#3447)
Browse files Browse the repository at this point in the history
## What ❔

<!-- What are the changes this PR brings about? -->
<!-- Example: This PR adds a PR template to the repo. -->
<!-- (For bigger PRs adding more context is appreciated) -->

## Why ❔

<!-- Why are these changes done? What goal do they contribute to? What
are the principles behind them? -->
<!-- Example: PR templates ensure PR reviewers, observers, and future
iterators are in context about the evolution of repos. -->

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zkstack dev fmt` and `zkstack dev
lint`.
  • Loading branch information
Artemka374 authored Jan 9, 2025
1 parent 7adb612 commit 64d861d
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 2 deletions.
17 changes: 15 additions & 2 deletions core/lib/dal/src/transactions_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1776,7 +1776,7 @@ impl TransactionsDal<'_, '_> {
limit: usize,
) -> DalResult<Vec<(Transaction, TransactionTimeRangeConstraint)>> {
let stashed_addresses: Vec<_> = stashed_accounts.iter().map(Address::as_bytes).collect();
sqlx::query!(
let result = sqlx::query!(
r#"
UPDATE transactions
SET
Expand All @@ -1794,8 +1794,15 @@ impl TransactionsDal<'_, '_> {
.execute(self.storage)
.await?;

tracing::debug!(
"Updated {} transactions for stashed accounts, stashed accounts amount: {}, stashed_accounts: {:?}",
result.rows_affected(),
stashed_addresses.len(),
stashed_accounts.iter().map(|a|format!("{:x}", a)).collect::<Vec<_>>()
);

let purged_addresses: Vec<_> = purged_accounts.iter().map(Address::as_bytes).collect();
sqlx::query!(
let result = sqlx::query!(
r#"
DELETE FROM transactions
WHERE
Expand All @@ -1809,6 +1816,12 @@ impl TransactionsDal<'_, '_> {
.execute(self.storage)
.await?;

tracing::debug!(
"Updated {} transactions for purged accounts, purged accounts amount: {}",
result.rows_affected(),
purged_addresses.len()
);

// Note, that transactions are updated in order of their hashes to avoid deadlocks with other UPDATE queries.
let transactions = sqlx::query_as!(
StorageTransaction,
Expand Down
9 changes: 9 additions & 0 deletions core/lib/mempool/src/mempool_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ impl MempoolStore {
.rfind(|el| el.matches_filter(filter))?
.clone();

let initial_length = self.stashed_accounts.len();

// Stash all observed transactions that don't meet criteria
for stashed_pointer in self
.l2_priority_queue
Expand All @@ -187,6 +189,13 @@ impl MempoolStore {

self.stashed_accounts.push(stashed_pointer.account);
}

tracing::debug!(
"Stashed {} accounts by filter: {:?}",
self.stashed_accounts.len() - initial_length,
filter
);

// insert pointer to the next transaction if it exists
let (transaction, constraint, score) = self
.l2_transactions_per_account
Expand Down
8 changes: 8 additions & 0 deletions core/node/state_keeper/src/mempool_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ impl MempoolFetcher {
let latency = KEEPER_METRICS.mempool_sync.start();
let mut storage = self.pool.connection_tagged("state_keeper").await?;
let mempool_info = self.mempool.get_mempool_info();

KEEPER_METRICS
.mempool_stashed_accounts
.set(mempool_info.stashed_accounts.len());
KEEPER_METRICS
.mempool_purged_accounts
.set(mempool_info.purged_accounts.len());

let protocol_version = storage
.blocks_dal()
.pending_protocol_version()
Expand Down
4 changes: 4 additions & 0 deletions core/node/state_keeper/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ pub struct StateKeeperMetrics {
/// Latency to synchronize the mempool with Postgres.
#[metrics(buckets = Buckets::LATENCIES)]
pub mempool_sync: Histogram<Duration>,
/// Number of stashed accounts in mempool
pub mempool_stashed_accounts: Gauge<usize>,
/// Number of purged accounts in mempool
pub mempool_purged_accounts: Gauge<usize>,
/// Latency of the state keeper waiting for a transaction.
#[metrics(buckets = Buckets::LATENCIES)]
pub waiting_for_tx: Histogram<Duration>,
Expand Down

0 comments on commit 64d861d

Please sign in to comment.