Skip to content

Commit

Permalink
feat: add event entry count in validation API
Browse files Browse the repository at this point in the history
  • Loading branch information
akaladarshi committed Sep 24, 2024
1 parent ec8da8a commit 78a5c0a
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 52 deletions.
57 changes: 37 additions & 20 deletions chain/index/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,11 @@ func (si *SqliteIndexer) ChainValidateIndex(ctx context.Context, epoch abi.Chain
}

return &types.IndexValidation{
TipSetKey: expectedTs.Key(),
Height: uint64(expectedTs.Height()),
IndexedMessagesCount: uint64(indexedData.nonRevertedMessageCount),
IndexedEventsCount: uint64(indexedData.nonRevertedEventCount),
TipSetKey: expectedTs.Key(),
Height: uint64(expectedTs.Height()),
IndexedMessagesCount: uint64(indexedData.nonRevertedMessageCount),
IndexedEventsCount: uint64(indexedData.nonRevertedEventCount),
IndexedEventEntriesCount: uint64(indexedData.nonRevertedEventEntriesCount),
}, nil
}

Expand Down Expand Up @@ -159,8 +160,9 @@ func (si *SqliteIndexer) getTipsetCountsAtHeight(ctx context.Context, height abi
}

type indexedTipSetData struct {
nonRevertedMessageCount int
nonRevertedEventCount int
nonRevertedMessageCount int
nonRevertedEventCount int
nonRevertedEventEntriesCount int
}

// getIndexedTipSetData fetches the indexed tipset data for a tipset
Expand All @@ -180,6 +182,10 @@ func (si *SqliteIndexer) getIndexedTipSetData(ctx context.Context, ts *types.Tip
return xerrors.Errorf("failed to query non reverted event count: %w", err)
}

if err = tx.Stmt(si.stmts.getNonRevertedTipsetEventEntriesCountStmt).QueryRowContext(ctx, tsKeyCidBytes).Scan(&data.nonRevertedEventEntriesCount); err != nil {
return xerrors.Errorf("failed to query non reverted event entries count: %w", err)
}

return nil
})

Expand All @@ -200,14 +206,30 @@ func (si *SqliteIndexer) verifyIndexedData(ctx context.Context, ts *types.TipSet
return xerrors.Errorf("failed to get next tipset for height %d: %w", ts.Height(), err)
}

// if non-reverted events exist which means that tipset `ts` has been executed, there should be 0 reverted events in the DB
var hasRevertedEventsInTipset bool
err = si.stmts.hasRevertedEventsInTipsetStmt.QueryRowContext(ctx, tsKeyCid.Bytes()).Scan(&hasRevertedEventsInTipset)
if err != nil {
return xerrors.Errorf("failed to check if there are reverted events in tipset for height %d: %w", ts.Height(), err)
}
if hasRevertedEventsInTipset {
return xerrors.Errorf("index corruption: reverted events found for an executed tipset %s at height %d", tsKeyCid, ts.Height())
}

executedMsgs, err := si.loadExecutedMessages(ctx, ts, executionTs)
if err != nil {
return xerrors.Errorf("failed to load executed messages for height %d: %w", ts.Height(), err)
}

totalEventsCount := 0
var (
totalEventsCount = 0
totalEventEntriesCount = 0
)
for _, emsg := range executedMsgs {
totalEventsCount += len(emsg.evs)
for _, ev := range emsg.evs {
totalEventEntriesCount += len(ev.Entries)
}
}

if totalEventsCount != indexedData.nonRevertedEventCount {
Expand All @@ -219,14 +241,8 @@ func (si *SqliteIndexer) verifyIndexedData(ctx context.Context, ts *types.TipSet
return xerrors.Errorf("message count mismatch for height %d: chainstore has %d, index has %d", ts.Height(), totalExecutedMsgCount, indexedData.nonRevertedMessageCount)
}

// if non-reverted events exist which means that tipset `ts` has been executed, there should be 0 reverted events in the DB
var hasRevertedEventsInTipset bool
err = si.stmts.hasRevertedEventsInTipsetStmt.QueryRowContext(ctx, tsKeyCid.Bytes()).Scan(&hasRevertedEventsInTipset)
if err != nil {
return xerrors.Errorf("failed to check if there are reverted events in tipset for height %d: %w", ts.Height(), err)
}
if hasRevertedEventsInTipset {
return xerrors.Errorf("index corruption: reverted events found for an executed tipset %s at height %d", tsKeyCid, ts.Height())
if indexedData.nonRevertedEventEntriesCount != totalEventEntriesCount {
return xerrors.Errorf("event entries count mismatch for height %d: chainstore has %d, index has %d", ts.Height(), totalEventEntriesCount, indexedData.nonRevertedEventEntriesCount)
}

return nil
Expand Down Expand Up @@ -269,11 +285,12 @@ func (si *SqliteIndexer) backfillMissingTipset(ctx context.Context, ts *types.Ti
}

return &types.IndexValidation{
TipSetKey: ts.Key(),
Height: uint64(ts.Height()),
Backfilled: true,
IndexedMessagesCount: uint64(indexedData.nonRevertedMessageCount),
IndexedEventsCount: uint64(indexedData.nonRevertedEventCount),
TipSetKey: ts.Key(),
Height: uint64(ts.Height()),
Backfilled: true,
IndexedMessagesCount: uint64(indexedData.nonRevertedMessageCount),
IndexedEventsCount: uint64(indexedData.nonRevertedEventCount),
IndexedEventEntriesCount: uint64(indexedData.nonRevertedEventEntriesCount),
}, nil
}

Expand Down
53 changes: 28 additions & 25 deletions chain/index/ddls.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,37 +52,40 @@ var ddls = []string{
`CREATE INDEX IF NOT EXISTS idx_height ON tipset_message (height)`,

`CREATE INDEX IF NOT EXISTS event_entry_event_id ON event_entry(event_id)`,

`CREATE INDEX IF NOT EXISTS idx_tipset_key_reverted_message_id ON tipset_message (tipset_key_cid, reverted, message_id)`,
}

// preparedStatementMapping returns a map of fields of the preparedStatements struct to the SQL
// query that should be prepared for that field. This is used to prepare all the statements in
// the preparedStatements struct.
func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string {
return map[**sql.Stmt]string{
&ps.getNonRevertedMsgInfoStmt: "SELECT tipset_key_cid, height FROM tipset_message WHERE message_cid = ? AND reverted = 0 LIMIT 1",
&ps.getMsgCidFromEthHashStmt: "SELECT message_cid FROM eth_tx_hash WHERE tx_hash = ? LIMIT 1",
&ps.insertEthTxHashStmt: "INSERT INTO eth_tx_hash (tx_hash, message_cid) VALUES (?, ?) ON CONFLICT (tx_hash) DO UPDATE SET inserted_at = CURRENT_TIMESTAMP",
&ps.insertTipsetMessageStmt: "INSERT INTO tipset_message (tipset_key_cid, height, reverted, message_cid, message_index) VALUES (?, ?, ?, ?, ?) ON CONFLICT (tipset_key_cid, message_cid) DO UPDATE SET reverted = 0",
&ps.hasTipsetStmt: "SELECT EXISTS(SELECT 1 FROM tipset_message WHERE tipset_key_cid = ?)",
&ps.updateTipsetToNonRevertedStmt: "UPDATE tipset_message SET reverted = 0 WHERE tipset_key_cid = ?",
&ps.updateTipsetToRevertedStmt: "UPDATE tipset_message SET reverted = 1 WHERE tipset_key_cid = ?",
&ps.removeTipsetsBeforeHeightStmt: "DELETE FROM tipset_message WHERE height < ?",
&ps.removeEthHashesOlderThanStmt: "DELETE FROM eth_tx_hash WHERE inserted_at < datetime('now', ?)",
&ps.updateTipsetsToRevertedFromHeightStmt: "UPDATE tipset_message SET reverted = 1 WHERE height >= ?",
&ps.updateEventsToRevertedFromHeightStmt: "UPDATE event SET reverted = 1 WHERE message_id IN (SELECT message_id FROM tipset_message WHERE height >= ?)",
&ps.isIndexEmptyStmt: "SELECT NOT EXISTS(SELECT 1 FROM tipset_message LIMIT 1)",
&ps.getMinNonRevertedHeightStmt: "SELECT MIN(height) FROM tipset_message WHERE reverted = 0",
&ps.hasNonRevertedTipsetStmt: "SELECT EXISTS(SELECT 1 FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)",
&ps.updateEventsToRevertedStmt: "UPDATE event SET reverted = 1 WHERE message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ?)",
&ps.updateEventsToNonRevertedStmt: "UPDATE event SET reverted = 0 WHERE message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ?)",
&ps.getMsgIdForMsgCidAndTipsetStmt: "SELECT message_id FROM tipset_message WHERE tipset_key_cid = ? AND message_cid = ? AND reverted = 0",
&ps.insertEventStmt: "INSERT INTO event (message_id, event_index, emitter_addr, reverted) VALUES (?, ?, ?, ?) ON CONFLICT (message_id, event_index) DO UPDATE SET reverted = 0",
&ps.insertEventEntryStmt: "INSERT INTO event_entry (event_id, indexed, flags, key, codec, value) VALUES (?, ?, ?, ?, ?, ?)",
&ps.hasNullRoundAtHeightStmt: "SELECT NOT EXISTS(SELECT 1 FROM tipset_message WHERE height = ?)",
&ps.getNonRevertedTipsetAtHeightStmt: "SELECT tipset_key_cid FROM tipset_message WHERE height = ? AND reverted = 0 LIMIT 1",
&ps.countTipsetsAtHeightStmt: "SELECT COUNT(CASE WHEN reverted = 1 THEN 1 END) AS reverted_count, COUNT(CASE WHEN reverted = 0 THEN 1 END) AS non_reverted_count FROM (SELECT tipset_key_cid, MAX(reverted) AS reverted FROM tipset_message WHERE height = ? GROUP BY tipset_key_cid) AS unique_tipsets",
&ps.getNonRevertedTipsetMessageCountStmt: "SELECT COUNT(*) FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0 AND message_cid IS NOT NULL",
&ps.getNonRevertedTipsetEventCountStmt: "SELECT COUNT(*) FROM event WHERE reverted = 0 AND message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)",
&ps.hasRevertedEventsInTipsetStmt: "SELECT EXISTS(SELECT 1 FROM event WHERE reverted = 1 AND message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ?))",
&ps.getNonRevertedMsgInfoStmt: "SELECT tipset_key_cid, height FROM tipset_message WHERE message_cid = ? AND reverted = 0 LIMIT 1",
&ps.getMsgCidFromEthHashStmt: "SELECT message_cid FROM eth_tx_hash WHERE tx_hash = ? LIMIT 1",
&ps.insertEthTxHashStmt: "INSERT INTO eth_tx_hash (tx_hash, message_cid) VALUES (?, ?) ON CONFLICT (tx_hash) DO UPDATE SET inserted_at = CURRENT_TIMESTAMP",
&ps.insertTipsetMessageStmt: "INSERT INTO tipset_message (tipset_key_cid, height, reverted, message_cid, message_index) VALUES (?, ?, ?, ?, ?) ON CONFLICT (tipset_key_cid, message_cid) DO UPDATE SET reverted = 0",
&ps.hasTipsetStmt: "SELECT EXISTS(SELECT 1 FROM tipset_message WHERE tipset_key_cid = ?)",
&ps.updateTipsetToNonRevertedStmt: "UPDATE tipset_message SET reverted = 0 WHERE tipset_key_cid = ?",
&ps.updateTipsetToRevertedStmt: "UPDATE tipset_message SET reverted = 1 WHERE tipset_key_cid = ?",
&ps.removeTipsetsBeforeHeightStmt: "DELETE FROM tipset_message WHERE height < ?",
&ps.removeEthHashesOlderThanStmt: "DELETE FROM eth_tx_hash WHERE inserted_at < datetime('now', ?)",
&ps.updateTipsetsToRevertedFromHeightStmt: "UPDATE tipset_message SET reverted = 1 WHERE height >= ?",
&ps.updateEventsToRevertedFromHeightStmt: "UPDATE event SET reverted = 1 WHERE message_id IN (SELECT message_id FROM tipset_message WHERE height >= ?)",
&ps.isIndexEmptyStmt: "SELECT NOT EXISTS(SELECT 1 FROM tipset_message LIMIT 1)",
&ps.getMinNonRevertedHeightStmt: "SELECT MIN(height) FROM tipset_message WHERE reverted = 0",
&ps.hasNonRevertedTipsetStmt: "SELECT EXISTS(SELECT 1 FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)",
&ps.updateEventsToRevertedStmt: "UPDATE event SET reverted = 1 WHERE message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ?)",
&ps.updateEventsToNonRevertedStmt: "UPDATE event SET reverted = 0 WHERE message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ?)",
&ps.getMsgIdForMsgCidAndTipsetStmt: "SELECT message_id FROM tipset_message WHERE tipset_key_cid = ? AND message_cid = ? AND reverted = 0",
&ps.insertEventStmt: "INSERT INTO event (message_id, event_index, emitter_addr, reverted) VALUES (?, ?, ?, ?) ON CONFLICT (message_id, event_index) DO UPDATE SET reverted = 0",
&ps.insertEventEntryStmt: "INSERT INTO event_entry (event_id, indexed, flags, key, codec, value) VALUES (?, ?, ?, ?, ?, ?)",
&ps.hasNullRoundAtHeightStmt: "SELECT NOT EXISTS(SELECT 1 FROM tipset_message WHERE height = ?)",
&ps.getNonRevertedTipsetAtHeightStmt: "SELECT tipset_key_cid FROM tipset_message WHERE height = ? AND reverted = 0 LIMIT 1",
&ps.countTipsetsAtHeightStmt: "SELECT COUNT(CASE WHEN reverted = 1 THEN 1 END) AS reverted_count, COUNT(CASE WHEN reverted = 0 THEN 1 END) AS non_reverted_count FROM (SELECT tipset_key_cid, MAX(reverted) AS reverted FROM tipset_message WHERE height = ? GROUP BY tipset_key_cid) AS unique_tipsets",
&ps.getNonRevertedTipsetMessageCountStmt: "SELECT COUNT(*) FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0 AND message_cid IS NOT NULL",
&ps.getNonRevertedTipsetEventCountStmt: "SELECT COUNT(*) FROM event WHERE reverted = 0 AND message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)",
&ps.getNonRevertedTipsetEventEntriesCountStmt: "SELECT COUNT(ee.event_id) AS event_entry_count FROM tipset_message AS t INNER JOIN event AS ev ON t.message_id = ev.message_id INNER JOIN event_entry AS ee ON ev.event_id = ee.event_id WHERE t.tipset_key_cid = ? AND t.reverted = 0",
&ps.hasRevertedEventsInTipsetStmt: "SELECT EXISTS(SELECT 1 FROM event WHERE reverted = 1 AND message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ?))",
}
}
7 changes: 4 additions & 3 deletions chain/index/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ type preparedStatements struct {
getNonRevertedTipsetAtHeightStmt *sql.Stmt
countTipsetsAtHeightStmt *sql.Stmt

getNonRevertedTipsetMessageCountStmt *sql.Stmt
getNonRevertedTipsetEventCountStmt *sql.Stmt
hasRevertedEventsInTipsetStmt *sql.Stmt
getNonRevertedTipsetMessageCountStmt *sql.Stmt
getNonRevertedTipsetEventCountStmt *sql.Stmt
getNonRevertedTipsetEventEntriesCountStmt *sql.Stmt
hasRevertedEventsInTipsetStmt *sql.Stmt
}

type SqliteIndexer struct {
Expand Down
10 changes: 6 additions & 4 deletions chain/types/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ type IndexValidation struct {
TipSetKey TipSetKey
Height uint64

IndexedMessagesCount uint64
IndexedEventsCount uint64
Backfilled bool
IsNullRound bool
IndexedMessagesCount uint64
IndexedEventsCount uint64
IndexedEventEntriesCount uint64

Backfilled bool
IsNullRound bool
}

0 comments on commit 78a5c0a

Please sign in to comment.