Skip to content

Commit

Permalink
feat: chain indexer todos [skip changelog] (#12462)
Browse files Browse the repository at this point in the history
* feat: finish todos of validation api

* feat: add indexed data verification with chain store

* feat: address comments and finish TODO

* fix: build issue

* address comments

* fix: ci issue
  • Loading branch information
akaladarshi authored Sep 17, 2024
1 parent 0f4c627 commit 1b5fed3
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 46 deletions.
4 changes: 0 additions & 4 deletions build/openrpc/full.json
Original file line number Diff line number Diff line change
Expand Up @@ -2059,7 +2059,6 @@
"Height": 42,
"TotalMessages": 42,
"TotalEvents": 42,
"EventsReverted": true,
"Backfilled": true
}
],
Expand All @@ -2068,9 +2067,6 @@
"Backfilled": {
"type": "boolean"
},
"EventsReverted": {
"type": "boolean"
},
"Height": {
"title": "number",
"type": "number"
Expand Down
116 changes: 109 additions & 7 deletions chain/index/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ func (si *SqliteIndexer) ChainValidateIndex(ctx context.Context, epoch abi.Chain
if !si.started {
return nil, xerrors.Errorf("ChainValidateIndex can only be called after the indexer has been started")
}

if si.isClosed() {
return nil, xerrors.Errorf("ChainValidateIndex can only be called before the indexer has been closed")
}

si.writerLk.Lock()
defer si.writerLk.Unlock()

Expand Down Expand Up @@ -126,17 +131,107 @@ func (si *SqliteIndexer) ChainValidateIndex(ctx context.Context, epoch abi.Chain
return nil, xerrors.Errorf("failed to cast tipset key cid: %w", err)
}

if indexedTsKeyCid != expectedTsKeyCid {
if !indexedTsKeyCid.Equals(expectedTsKeyCid) {
return nil, xerrors.Errorf("index corruption: non-reverted tipset at height %d has key %s, but canonical chain has %s", epoch, indexedTsKeyCid, expectedTsKeyCid)
}

// indexedTsKeyCid and expectedTsKeyCid are the same, so we can use `expectedTs` to fetch the indexed data
indexedData, err := si.getIndexedTipSetData(ctx, expectedTs)
if err != nil {
return nil, xerrors.Errorf("failed to get tipset message and event counts at height %d: %w", expectedTs.Height(), err)
}

if indexedData == nil {
return nil, xerrors.Errorf("invalid indexed data for tipset at height %d", expectedTs.Height())
}

if err = si.verifyIndexedData(ctx, expectedTs, indexedData); err != nil {
return nil, xerrors.Errorf("failed to verify indexed data at height %d: %w", expectedTs.Height(), err)
}

return &types.IndexValidation{
TipsetKey: expectedTs.Key().String(),
Height: uint64(expectedTs.Height()),
// TODO Other fields
TipsetKey: expectedTs.Key().String(),
Height: uint64(expectedTs.Height()),
NonRevertedMessageCount: uint64(indexedData.nonRevertedMessageCount),
NonRevertedEventsCount: uint64(indexedData.nonRevertedEventCount),
}, nil
}

type indexedTipSetData struct {
nonRevertedMessageCount int
nonRevertedEventCount int
}

// getIndexedTipSetData fetches the indexed tipset data for a tipset
func (si *SqliteIndexer) getIndexedTipSetData(ctx context.Context, ts *types.TipSet) (*indexedTipSetData, error) {
tsKeyCidBytes, err := toTipsetKeyCidBytes(ts)
if err != nil {
return nil, xerrors.Errorf("failed to get tipset key cid: %w", err)
}

var data indexedTipSetData
err = withTx(ctx, si.db, func(tx *sql.Tx) error {
if err = tx.Stmt(si.stmts.getNonRevertedTipsetMessageCountStmt).QueryRowContext(ctx, tsKeyCidBytes).Scan(&data.nonRevertedMessageCount); err != nil {
return xerrors.Errorf("failed to query non reverted message count: %w", err)
}

if err = tx.Stmt(si.stmts.getNonRevertedTipsetEventCountStmt).QueryRowContext(ctx, tsKeyCidBytes).Scan(&data.nonRevertedEventCount); err != nil {
return xerrors.Errorf("failed to query non reverted event count: %w", err)
}

return nil
})

return &data, err
}

// verifyIndexedData verifies that the indexed data for a tipset is correct
// by comparing the number of messages and events in the chainstore to the number of messages and events indexed.
// NOTE: Events are loaded from the executed messages of the tipset at the next epoch (ts.Height() + 1).
func (si *SqliteIndexer) verifyIndexedData(ctx context.Context, ts *types.TipSet, indexedData *indexedTipSetData) (err error) {
tsKeyCid, err := ts.Key().Cid()
if err != nil {
return xerrors.Errorf("failed to get tipset key cid: %w", err)
}

// get the tipset where the messages of `ts` will be executed (deferred execution)
executionTs, err := si.cs.GetTipsetByHeight(ctx, ts.Height()+1, nil, false)
if err != nil {
return xerrors.Errorf("failed to get tipset by height: %w", err)
}

eParentTsKeyCid, err := executionTs.Parents().Cid()
if err != nil {
return xerrors.Errorf("failed to get execution tipset parent key cid: %w", err)
}

// the parent tipset of the execution tipset should be the same as the indexed tipset (`ts` should be the parent of `executionTs`)
if !eParentTsKeyCid.Equals(tsKeyCid) {
return xerrors.Errorf("execution tipset parent key mismatch: chainstore has %s, index has %s", eParentTsKeyCid, tsKeyCid)
}

executedMsgs, err := si.loadExecutedMessages(ctx, ts, executionTs)
if err != nil {
return xerrors.Errorf("failed to load executed messages: %w", err)
}

totalEventsCount := 0
for _, emsg := range executedMsgs {
totalEventsCount += len(emsg.evs)
}

if totalEventsCount != indexedData.nonRevertedEventCount {
return xerrors.Errorf("tipset event count mismatch: chainstore has %d, index has %d", totalEventsCount, indexedData.nonRevertedEventCount)
}

totalExecutedMsgCount := len(executedMsgs)
if totalExecutedMsgCount != indexedData.nonRevertedMessageCount {
return xerrors.Errorf("tipset executed message count mismatch: chainstore has %d, index has %d", totalExecutedMsgCount, indexedData.nonRevertedMessageCount)
}

return nil
}

func (si *SqliteIndexer) backfillMissingTipset(ctx context.Context, ts *types.TipSet, backfill bool) (*types.IndexValidation, error) {
if !backfill {
return nil, xerrors.Errorf("missing tipset at height %d in the chain index, set backfill to true to backfill", ts.Height())
Expand All @@ -159,9 +254,16 @@ func (si *SqliteIndexer) backfillMissingTipset(ctx context.Context, ts *types.Ti
return nil, xerrors.Errorf("error applying tipset: %w", err)
}

indexedData, err := si.getIndexedTipSetData(ctx, ts)
if err != nil {
return nil, xerrors.Errorf("failed to get tipset message and event counts at height %d: %w", ts.Height(), err)
}

return &types.IndexValidation{
TipsetKey: ts.Key().String(),
Height: uint64(ts.Height()),
Backfilled: true,
TipsetKey: ts.Key().String(),
Height: uint64(ts.Height()),
Backfilled: true,
NonRevertedMessageCount: uint64(indexedData.nonRevertedMessageCount),
NonRevertedEventsCount: uint64(indexedData.nonRevertedEventCount),
}, nil
}
2 changes: 2 additions & 0 deletions chain/index/ddls.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,7 @@ func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string {
&ps.hasNullRoundAtHeightStmt: "SELECT NOT EXISTS(SELECT 1 FROM tipset_message WHERE height = ?)",
&ps.getNonRevertedTipsetAtHeightStmt: "SELECT tipset_key_cid FROM tipset_message WHERE height = ? AND reverted = 0",
&ps.countTipsetsAtHeightStmt: "SELECT COUNT(CASE WHEN reverted = 1 THEN 1 END) AS reverted_count, COUNT(CASE WHEN reverted = 0 THEN 1 END) AS non_reverted_count FROM (SELECT tipset_key_cid, MAX(reverted) AS reverted FROM tipset_message WHERE height = ? GROUP BY tipset_key_cid) AS unique_tipsets",
&ps.getNonRevertedTipsetMessageCountStmt: "SELECT COUNT(*) FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0",
&ps.getNonRevertedTipsetEventCountStmt: "SELECT COUNT(*) FROM event WHERE message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)",
}
}
5 changes: 1 addition & 4 deletions chain/index/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,9 @@ func (si *SqliteIndexer) gcLoop() {
defer cleanupTicker.Stop()

for si.ctx.Err() == nil {
si.closeLk.RLock()
if si.closed {
si.closeLk.RUnlock()
if si.isClosed() {
return
}
si.closeLk.RUnlock()

select {
case <-cleanupTicker.C:
Expand Down
27 changes: 14 additions & 13 deletions chain/index/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type preparedStatements struct {
hasNullRoundAtHeightStmt *sql.Stmt
getNonRevertedTipsetAtHeightStmt *sql.Stmt
countTipsetsAtHeightStmt *sql.Stmt

getNonRevertedTipsetMessageCountStmt *sql.Stmt
getNonRevertedTipsetEventCountStmt *sql.Stmt
}

type SqliteIndexer struct {
Expand Down Expand Up @@ -173,12 +176,9 @@ func (si *SqliteIndexer) initStatements() error {
}

func (si *SqliteIndexer) IndexEthTxHash(ctx context.Context, txHash ethtypes.EthHash, msgCid cid.Cid) error {
si.closeLk.RLock()
if si.closed {
si.closeLk.RUnlock()
if si.isClosed() {
return ErrClosed
}
si.closeLk.RUnlock()

return withTx(ctx, si.db, func(tx *sql.Tx) error {
return si.indexEthTxHash(ctx, tx, txHash, msgCid)
Expand All @@ -199,12 +199,10 @@ func (si *SqliteIndexer) IndexSignedMessage(ctx context.Context, msg *types.Sign
if msg.Signature.Type != crypto.SigTypeDelegated {
return nil
}
si.closeLk.RLock()
if si.closed {
si.closeLk.RUnlock()

if si.isClosed() {
return ErrClosed
}
si.closeLk.RUnlock()

return withTx(ctx, si.db, func(tx *sql.Tx) error {
return si.indexSignedMessage(ctx, tx, msg)
Expand All @@ -227,7 +225,7 @@ func (si *SqliteIndexer) indexSignedMessage(ctx context.Context, tx *sql.Tx, msg

func (si *SqliteIndexer) Apply(ctx context.Context, from, to *types.TipSet) error {
si.closeLk.RLock()
if si.closed {
if si.isClosed() {
si.closeLk.RUnlock()
return ErrClosed
}
Expand Down Expand Up @@ -346,12 +344,9 @@ func (si *SqliteIndexer) restoreTipsetIfExists(ctx context.Context, tx *sql.Tx,
}

func (si *SqliteIndexer) Revert(ctx context.Context, from, to *types.TipSet) error {
si.closeLk.RLock()
if si.closed {
si.closeLk.RUnlock()
if si.isClosed() {
return ErrClosed
}
si.closeLk.RUnlock()

si.writerLk.Lock()
defer si.writerLk.Unlock()
Expand Down Expand Up @@ -398,3 +393,9 @@ func (si *SqliteIndexer) Revert(ctx context.Context, from, to *types.TipSet) err

return nil
}

func (si *SqliteIndexer) isClosed() bool {
si.closeLk.RLock()
defer si.closeLk.RUnlock()
return si.closed
}
10 changes: 2 additions & 8 deletions chain/index/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@ import (
const headIndexedWaitTimeout = 5 * time.Second

func (si *SqliteIndexer) GetCidFromHash(ctx context.Context, txHash ethtypes.EthHash) (cid.Cid, error) {
si.closeLk.RLock()
if si.closed {
si.closeLk.RUnlock()
if si.isClosed() {
return cid.Undef, ErrClosed
}
si.closeLk.RUnlock()

var msgCidBytes []byte

Expand All @@ -44,12 +41,9 @@ func (si *SqliteIndexer) queryMsgCidFromEthHash(ctx context.Context, txHash etht
}

func (si *SqliteIndexer) GetMsgInfo(ctx context.Context, messageCid cid.Cid) (*MsgInfo, error) {
si.closeLk.RLock()
if si.closed {
si.closeLk.RUnlock()
if si.isClosed() {
return nil, ErrClosed
}
si.closeLk.RUnlock()

var tipsetKeyCidBytes []byte
var height int64
Expand Down
5 changes: 1 addition & 4 deletions chain/index/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,9 @@ func (si *SqliteIndexer) ReconcileWithChain(ctx context.Context, head *types.Tip
log.Warn("chain indexer is not storing events during reconciliation; please ensure this is intentional")
}

si.closeLk.RLock()
if si.closed {
si.closeLk.RUnlock()
if si.isClosed() {
return ErrClosed
}
si.closeLk.RUnlock()

if head == nil {
return nil
Expand Down
7 changes: 3 additions & 4 deletions chain/types/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ type IndexValidation struct {
TipsetKey string
Height uint64

TotalMessages uint64
TotalEvents uint64
EventsReverted bool
Backfilled bool
NonRevertedMessageCount uint64
NonRevertedEventsCount uint64
Backfilled bool
}
19 changes: 17 additions & 2 deletions itests/eth_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,16 +525,31 @@ func TestEthGetLogsBasic(t *testing.T) {

AssertEthLogs(t, rctLogs, expected, received)

iv, err := client.ChainValidateIndex(ctx, abi.ChainEpoch(0), false)
epoch := uint64(0)
iv, err := client.ChainValidateIndex(ctx, abi.ChainEpoch(epoch), false)
require.NoError(err)
require.NotNil(iv)

fmt.Printf("index validation: %v\n", iv)

iv, err = client.ChainValidateIndex(ctx, abi.ChainEpoch(22), false)
// Add assertions for IndexValidation fields
require.NotEmpty(t, iv.TipsetKey, "TipsetKey should not be empty")
require.Equal(t, epoch, iv.Height, "Height should be 0")
require.GreaterOrEqual(t, iv.NonRevertedMessageCount, uint64(0), "NonRevertedMessageCount should be non-negative") // TODO: change according to actual number of messages in the tipset
require.GreaterOrEqual(t, iv.NonRevertedEventsCount, uint64(0), "NonRevertedEventsCount should be non-negative") // TODO: change according to actual number of messages in the tipset
require.False(iv.Backfilled, "Backfilled should be flase")

epoch = 22
iv, err = client.ChainValidateIndex(ctx, abi.ChainEpoch(epoch), false)
require.NoError(err)
require.NotNil(iv)
fmt.Printf("index validation: %v\n", iv)

require.NotEmpty(t, iv.TipsetKey, "TipsetKey should not be empty")
require.Equal(t, epoch, iv.Height, "Height should be 22")
require.GreaterOrEqual(t, iv.NonRevertedMessageCount, uint64(0), "NonRevertedMessageCount be non-negative") // TODO: change according to actual number of messages in the tipset
require.GreaterOrEqual(t, iv.NonRevertedEventsCount, uint64(0), "NonRevertedEventsCount be non-negative") // TODO: change according to actual number of messages in the tipset
require.True(iv.Backfilled, "Backfilled should be false")
}

func TestEthSubscribeLogsNoTopicSpec(t *testing.T) {
Expand Down

0 comments on commit 1b5fed3

Please sign in to comment.