Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add event entries count in validation API #12506

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions build/openrpc/full.json
Original file line number Diff line number Diff line change
Expand Up @@ -2066,6 +2066,7 @@
"Height": 42,
"IndexedMessagesCount": 42,
"IndexedEventsCount": 42,
"IndexedEventEntriesCount": 42,
"Backfilled": true,
"IsNullRound": true
}
Expand All @@ -2079,6 +2080,10 @@
"title": "number",
"type": "number"
},
"IndexedEventEntriesCount": {
"title": "number",
"type": "number"
},
"IndexedEventsCount": {
"title": "number",
"type": "number"
Expand Down
57 changes: 37 additions & 20 deletions chain/index/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,11 @@ func (si *SqliteIndexer) ChainValidateIndex(ctx context.Context, epoch abi.Chain
}

return &types.IndexValidation{
TipSetKey: expectedTs.Key(),
Height: uint64(expectedTs.Height()),
IndexedMessagesCount: uint64(indexedData.nonRevertedMessageCount),
IndexedEventsCount: uint64(indexedData.nonRevertedEventCount),
TipSetKey: expectedTs.Key(),
Height: uint64(expectedTs.Height()),
IndexedMessagesCount: uint64(indexedData.nonRevertedMessageCount),
IndexedEventsCount: uint64(indexedData.nonRevertedEventCount),
IndexedEventEntriesCount: uint64(indexedData.nonRevertedEventEntriesCount),
}, nil
}

Expand Down Expand Up @@ -159,8 +160,9 @@ func (si *SqliteIndexer) getTipsetCountsAtHeight(ctx context.Context, height abi
}

type indexedTipSetData struct {
nonRevertedMessageCount int
nonRevertedEventCount int
nonRevertedMessageCount int
nonRevertedEventCount int
nonRevertedEventEntriesCount int
}

// getIndexedTipSetData fetches the indexed tipset data for a tipset
Expand All @@ -180,6 +182,10 @@ func (si *SqliteIndexer) getIndexedTipSetData(ctx context.Context, ts *types.Tip
return xerrors.Errorf("failed to query non reverted event count: %w", err)
}

if err = tx.Stmt(si.stmts.getNonRevertedTipsetEventEntriesCountStmt).QueryRowContext(ctx, tsKeyCidBytes).Scan(&data.nonRevertedEventEntriesCount); err != nil {
return xerrors.Errorf("failed to query non reverted event entries count: %w", err)
}

return nil
})

Expand All @@ -200,14 +206,30 @@ func (si *SqliteIndexer) verifyIndexedData(ctx context.Context, ts *types.TipSet
return xerrors.Errorf("failed to get next tipset for height %d: %w", ts.Height(), err)
}

// if non-reverted events exist which means that tipset `ts` has been executed, there should be 0 reverted events in the DB
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aarshkshah1992 I have moved this part of top of the function, because if we have reverted events in the tipset then we can just return without loading all the messages.

var hasRevertedEventsInTipset bool
err = si.stmts.hasRevertedEventsInTipsetStmt.QueryRowContext(ctx, tsKeyCid.Bytes()).Scan(&hasRevertedEventsInTipset)
if err != nil {
return xerrors.Errorf("failed to check if there are reverted events in tipset for height %d: %w", ts.Height(), err)
}
if hasRevertedEventsInTipset {
return xerrors.Errorf("index corruption: reverted events found for an executed tipset %s at height %d", tsKeyCid, ts.Height())
}

executedMsgs, err := si.loadExecutedMessages(ctx, ts, executionTs)
if err != nil {
return xerrors.Errorf("failed to load executed messages for height %d: %w", ts.Height(), err)
}

totalEventsCount := 0
var (
totalEventsCount = 0
totalEventEntriesCount = 0
)
for _, emsg := range executedMsgs {
totalEventsCount += len(emsg.evs)
for _, ev := range emsg.evs {
totalEventEntriesCount += len(ev.Entries)
}
}

if totalEventsCount != indexedData.nonRevertedEventCount {
Expand All @@ -219,14 +241,8 @@ func (si *SqliteIndexer) verifyIndexedData(ctx context.Context, ts *types.TipSet
return xerrors.Errorf("message count mismatch for height %d: chainstore has %d, index has %d", ts.Height(), totalExecutedMsgCount, indexedData.nonRevertedMessageCount)
}

// if non-reverted events exist which means that tipset `ts` has been executed, there should be 0 reverted events in the DB
var hasRevertedEventsInTipset bool
err = si.stmts.hasRevertedEventsInTipsetStmt.QueryRowContext(ctx, tsKeyCid.Bytes()).Scan(&hasRevertedEventsInTipset)
if err != nil {
return xerrors.Errorf("failed to check if there are reverted events in tipset for height %d: %w", ts.Height(), err)
}
if hasRevertedEventsInTipset {
return xerrors.Errorf("index corruption: reverted events found for an executed tipset %s at height %d", tsKeyCid, ts.Height())
if indexedData.nonRevertedEventEntriesCount != totalEventEntriesCount {
return xerrors.Errorf("event entries count mismatch for height %d: chainstore has %d, index has %d", ts.Height(), totalEventEntriesCount, indexedData.nonRevertedEventEntriesCount)
}

return nil
Expand Down Expand Up @@ -269,11 +285,12 @@ func (si *SqliteIndexer) backfillMissingTipset(ctx context.Context, ts *types.Ti
}

return &types.IndexValidation{
TipSetKey: ts.Key(),
Height: uint64(ts.Height()),
Backfilled: true,
IndexedMessagesCount: uint64(indexedData.nonRevertedMessageCount),
IndexedEventsCount: uint64(indexedData.nonRevertedEventCount),
TipSetKey: ts.Key(),
Height: uint64(ts.Height()),
Backfilled: true,
IndexedMessagesCount: uint64(indexedData.nonRevertedMessageCount),
IndexedEventsCount: uint64(indexedData.nonRevertedEventCount),
IndexedEventEntriesCount: uint64(indexedData.nonRevertedEventEntriesCount),
}, nil
}

Expand Down
51 changes: 26 additions & 25 deletions chain/index/ddls.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,30 +59,31 @@ var ddls = []string{
// the preparedStatements struct.
func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string {
return map[**sql.Stmt]string{
&ps.getNonRevertedMsgInfoStmt: "SELECT tipset_key_cid, height FROM tipset_message WHERE message_cid = ? AND reverted = 0 LIMIT 1",
&ps.getMsgCidFromEthHashStmt: "SELECT message_cid FROM eth_tx_hash WHERE tx_hash = ? LIMIT 1",
&ps.insertEthTxHashStmt: "INSERT INTO eth_tx_hash (tx_hash, message_cid) VALUES (?, ?) ON CONFLICT (tx_hash) DO UPDATE SET inserted_at = CURRENT_TIMESTAMP",
&ps.insertTipsetMessageStmt: "INSERT INTO tipset_message (tipset_key_cid, height, reverted, message_cid, message_index) VALUES (?, ?, ?, ?, ?) ON CONFLICT (tipset_key_cid, message_cid) DO UPDATE SET reverted = 0",
&ps.hasTipsetStmt: "SELECT EXISTS(SELECT 1 FROM tipset_message WHERE tipset_key_cid = ?)",
&ps.updateTipsetToNonRevertedStmt: "UPDATE tipset_message SET reverted = 0 WHERE tipset_key_cid = ?",
&ps.updateTipsetToRevertedStmt: "UPDATE tipset_message SET reverted = 1 WHERE tipset_key_cid = ?",
&ps.removeTipsetsBeforeHeightStmt: "DELETE FROM tipset_message WHERE height < ?",
&ps.removeEthHashesOlderThanStmt: "DELETE FROM eth_tx_hash WHERE inserted_at < datetime('now', ?)",
&ps.updateTipsetsToRevertedFromHeightStmt: "UPDATE tipset_message SET reverted = 1 WHERE height >= ?",
&ps.updateEventsToRevertedFromHeightStmt: "UPDATE event SET reverted = 1 WHERE message_id IN (SELECT message_id FROM tipset_message WHERE height >= ?)",
&ps.isIndexEmptyStmt: "SELECT NOT EXISTS(SELECT 1 FROM tipset_message LIMIT 1)",
&ps.getMinNonRevertedHeightStmt: "SELECT MIN(height) FROM tipset_message WHERE reverted = 0",
&ps.hasNonRevertedTipsetStmt: "SELECT EXISTS(SELECT 1 FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)",
&ps.updateEventsToRevertedStmt: "UPDATE event SET reverted = 1 WHERE message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ?)",
&ps.updateEventsToNonRevertedStmt: "UPDATE event SET reverted = 0 WHERE message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ?)",
&ps.getMsgIdForMsgCidAndTipsetStmt: "SELECT message_id FROM tipset_message WHERE tipset_key_cid = ? AND message_cid = ? AND reverted = 0",
&ps.insertEventStmt: "INSERT INTO event (message_id, event_index, emitter_addr, reverted) VALUES (?, ?, ?, ?) ON CONFLICT (message_id, event_index) DO UPDATE SET reverted = 0",
&ps.insertEventEntryStmt: "INSERT INTO event_entry (event_id, indexed, flags, key, codec, value) VALUES (?, ?, ?, ?, ?, ?)",
&ps.hasNullRoundAtHeightStmt: "SELECT NOT EXISTS(SELECT 1 FROM tipset_message WHERE height = ?)",
&ps.getNonRevertedTipsetAtHeightStmt: "SELECT tipset_key_cid FROM tipset_message WHERE height = ? AND reverted = 0 LIMIT 1",
&ps.countTipsetsAtHeightStmt: "SELECT COUNT(CASE WHEN reverted = 1 THEN 1 END) AS reverted_count, COUNT(CASE WHEN reverted = 0 THEN 1 END) AS non_reverted_count FROM (SELECT tipset_key_cid, MAX(reverted) AS reverted FROM tipset_message WHERE height = ? GROUP BY tipset_key_cid) AS unique_tipsets",
&ps.getNonRevertedTipsetMessageCountStmt: "SELECT COUNT(*) FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0 AND message_cid IS NOT NULL",
&ps.getNonRevertedTipsetEventCountStmt: "SELECT COUNT(*) FROM event WHERE reverted = 0 AND message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)",
&ps.hasRevertedEventsInTipsetStmt: "SELECT EXISTS(SELECT 1 FROM event WHERE reverted = 1 AND message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ?))",
&ps.getNonRevertedMsgInfoStmt: "SELECT tipset_key_cid, height FROM tipset_message WHERE message_cid = ? AND reverted = 0 LIMIT 1",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akaladarshi Why is the diff so big here ? Makes it hard to review the change. Please can you fix this ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only change we should see is the new getNonRevertedTipsetEventEntriesCountStmt query.

Copy link
Contributor Author

@akaladarshi akaladarshi Sep 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aarshkshah1992 I think that's because getNonRevertedTipsetEventEntriesCountStmt variable name is bigger than all other variables, so go fmt shifted everything.

Copy link
Contributor

@aarshkshah1992 aarshkshah1992 Sep 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akaladarshi

Please can you put the getNonRevertedTipsetEventEntriesCountStmt statement at the bottom rather than inserting it before hasRevertedEventsInTipsetStmt and re-fmt ? I think that will reduce the diff.

Copy link
Contributor Author

@akaladarshi akaladarshi Sep 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aarshkshah1992 I don't think that will work as well, formatting happens based on the largest variable size. Otherwise it will look odd.

Anyways, I have move towards the end but result is the same.

&ps.getMsgCidFromEthHashStmt: "SELECT message_cid FROM eth_tx_hash WHERE tx_hash = ? LIMIT 1",
&ps.insertEthTxHashStmt: "INSERT INTO eth_tx_hash (tx_hash, message_cid) VALUES (?, ?) ON CONFLICT (tx_hash) DO UPDATE SET inserted_at = CURRENT_TIMESTAMP",
&ps.insertTipsetMessageStmt: "INSERT INTO tipset_message (tipset_key_cid, height, reverted, message_cid, message_index) VALUES (?, ?, ?, ?, ?) ON CONFLICT (tipset_key_cid, message_cid) DO UPDATE SET reverted = 0",
&ps.hasTipsetStmt: "SELECT EXISTS(SELECT 1 FROM tipset_message WHERE tipset_key_cid = ?)",
&ps.updateTipsetToNonRevertedStmt: "UPDATE tipset_message SET reverted = 0 WHERE tipset_key_cid = ?",
&ps.updateTipsetToRevertedStmt: "UPDATE tipset_message SET reverted = 1 WHERE tipset_key_cid = ?",
&ps.removeTipsetsBeforeHeightStmt: "DELETE FROM tipset_message WHERE height < ?",
&ps.removeEthHashesOlderThanStmt: "DELETE FROM eth_tx_hash WHERE inserted_at < datetime('now', ?)",
&ps.updateTipsetsToRevertedFromHeightStmt: "UPDATE tipset_message SET reverted = 1 WHERE height >= ?",
&ps.updateEventsToRevertedFromHeightStmt: "UPDATE event SET reverted = 1 WHERE message_id IN (SELECT message_id FROM tipset_message WHERE height >= ?)",
&ps.isIndexEmptyStmt: "SELECT NOT EXISTS(SELECT 1 FROM tipset_message LIMIT 1)",
&ps.getMinNonRevertedHeightStmt: "SELECT MIN(height) FROM tipset_message WHERE reverted = 0",
&ps.hasNonRevertedTipsetStmt: "SELECT EXISTS(SELECT 1 FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)",
&ps.updateEventsToRevertedStmt: "UPDATE event SET reverted = 1 WHERE message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ?)",
&ps.updateEventsToNonRevertedStmt: "UPDATE event SET reverted = 0 WHERE message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ?)",
&ps.getMsgIdForMsgCidAndTipsetStmt: "SELECT message_id FROM tipset_message WHERE tipset_key_cid = ? AND message_cid = ? AND reverted = 0",
&ps.insertEventStmt: "INSERT INTO event (message_id, event_index, emitter_addr, reverted) VALUES (?, ?, ?, ?) ON CONFLICT (message_id, event_index) DO UPDATE SET reverted = 0",
&ps.insertEventEntryStmt: "INSERT INTO event_entry (event_id, indexed, flags, key, codec, value) VALUES (?, ?, ?, ?, ?, ?)",
&ps.hasNullRoundAtHeightStmt: "SELECT NOT EXISTS(SELECT 1 FROM tipset_message WHERE height = ?)",
&ps.getNonRevertedTipsetAtHeightStmt: "SELECT tipset_key_cid FROM tipset_message WHERE height = ? AND reverted = 0 LIMIT 1",
&ps.countTipsetsAtHeightStmt: "SELECT COUNT(CASE WHEN reverted = 1 THEN 1 END) AS reverted_count, COUNT(CASE WHEN reverted = 0 THEN 1 END) AS non_reverted_count FROM (SELECT tipset_key_cid, MAX(reverted) AS reverted FROM tipset_message WHERE height = ? GROUP BY tipset_key_cid) AS unique_tipsets",
&ps.getNonRevertedTipsetMessageCountStmt: "SELECT COUNT(*) FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0 AND message_cid IS NOT NULL",
&ps.getNonRevertedTipsetEventCountStmt: "SELECT COUNT(*) FROM event WHERE reverted = 0 AND message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)",
&ps.hasRevertedEventsInTipsetStmt: "SELECT EXISTS(SELECT 1 FROM event WHERE reverted = 1 AND message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ?))",
&ps.getNonRevertedTipsetEventEntriesCountStmt: "SELECT COUNT(ee.event_id) AS entry_count FROM event_entry ee JOIN event e ON ee.event_id = e.event_id JOIN tipset_message tm ON e.message_id = tm.message_id WHERE tm.tipset_key_cid = ? AND tm.reverted = 0",
}
}
7 changes: 4 additions & 3 deletions chain/index/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ type preparedStatements struct {
getNonRevertedTipsetAtHeightStmt *sql.Stmt
countTipsetsAtHeightStmt *sql.Stmt

getNonRevertedTipsetMessageCountStmt *sql.Stmt
getNonRevertedTipsetEventCountStmt *sql.Stmt
hasRevertedEventsInTipsetStmt *sql.Stmt
getNonRevertedTipsetMessageCountStmt *sql.Stmt
getNonRevertedTipsetEventCountStmt *sql.Stmt
getNonRevertedTipsetEventEntriesCountStmt *sql.Stmt
hasRevertedEventsInTipsetStmt *sql.Stmt
}

type SqliteIndexer struct {
Expand Down
6 changes: 4 additions & 2 deletions chain/types/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ type IndexValidation struct {
TipSetKey TipSetKey
// Height is the epoch height at which the validation is performed.
Height uint64
// IndexedMessagesCount indicates the number of indexed messages for the canonical tipset at this epoch.
// IndexedMessagesCount is the number of indexed messages for the canonical tipset at this epoch.
IndexedMessagesCount uint64
// IndexedEventsCount signifies the number of indexed events for the canonical tipset at this epoch.
// IndexedEventsCount is the number of indexed events for the canonical tipset at this epoch.
IndexedEventsCount uint64
// IndexedEventEntriesCount is the number of indexed event entries for the canonical tipset at this epoch.
IndexedEventEntriesCount uint64
// Backfilled denotes whether missing data was successfully backfilled into the index during validation.
Backfilled bool
// IsNullRound indicates if the epoch corresponds to a null round and therefore does not have any indexed messages or events.
Expand Down
1 change: 1 addition & 0 deletions documentation/en/api-v1-unstable-methods.md
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,7 @@ Response:
"Height": 42,
"IndexedMessagesCount": 42,
"IndexedEventsCount": 42,
"IndexedEventEntriesCount": 42,
"Backfilled": true,
"IsNullRound": true
}
Expand Down
5 changes: 5 additions & 0 deletions itests/eth_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ func TestEthGetLogsBasic(t *testing.T) {

totalMessageCount := 0
totalEventCount := 0
totalEventEntriesCount := 0
messages, err := client.ChainGetMessagesInTipset(ctx, ts.Key())
require.NoError(err)
totalMessageCount = len(messages)
Expand All @@ -555,6 +556,9 @@ func TestEthGetLogsBasic(t *testing.T) {
events, err := client.ChainGetEvents(ctx, *receipt.Receipt.EventsRoot)
require.NoError(err)
totalEventCount += len(events)
for _, event := range events {
totalEventEntriesCount += len(event.Entries)
}
}
}
t.Logf("tipset %d: %d messages, %d events", height, totalMessageCount, totalEventCount)
Expand All @@ -566,6 +570,7 @@ func TestEthGetLogsBasic(t *testing.T) {
require.EqualValues(height, iv.Height)
require.EqualValues(totalMessageCount, iv.IndexedMessagesCount)
require.EqualValues(totalEventCount, iv.IndexedEventsCount)
require.EqualValues(totalEventEntriesCount, iv.IndexedEventEntriesCount)
require.False(iv.Backfilled)
}
}
Expand Down
Loading