Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(chainindex): compare events AMT root between Index and chain state for validation #12632

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions chain/index/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ import (
"errors"

"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
ipld "github.com/ipfs/go-ipld-format"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"

amt4 "github.com/filecoin-project/go-amt-ipld/v4"
"github.com/filecoin-project/go-state-types/abi"

bstore "github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/types"
)

Expand Down Expand Up @@ -282,6 +286,32 @@ func (si *SqliteIndexer) verifyIndexedData(ctx context.Context, ts *types.TipSet
return xerrors.Errorf("event entries count mismatch for height %d: chainstore has %d, index has %d", ts.Height(), totalEventEntriesCount, indexedData.nonRevertedEventEntriesCount)
}

// compare the events AMT root between the indexed events and the events in the chain state
for _, emsg := range executedMsgs {
indexedRoot, hasEvents, err := si.amtRootForEvents(ctx, tsKeyCid, emsg.msg.Cid())
if err != nil {
return xerrors.Errorf("failed to generate AMT root for indexed events of message %s at height %d: %w", emsg.msg.Cid(), ts.Height(), err)
}

if !hasEvents && emsg.rct.EventsRoot == nil {
// No events in index and no events in receipt, this is fine
continue
}

if hasEvents && emsg.rct.EventsRoot == nil {
return xerrors.Errorf("index corruption: events found in index for message %s at height %d, but message receipt has no events root", emsg.msg.Cid(), ts.Height())
}

if !hasEvents && emsg.rct.EventsRoot != nil {
return xerrors.Errorf("index corruption: no events found in index for message %s at height %d, but message receipt has events root %s", emsg.msg.Cid(), ts.Height(), emsg.rct.EventsRoot)
}

// Both index and receipt have events, compare the roots
if !indexedRoot.Equals(*emsg.rct.EventsRoot) {
return xerrors.Errorf("index corruption: events AMT root mismatch for message %s at height %d. Index root: %s, Receipt root: %s", emsg.msg.Cid(), ts.Height(), indexedRoot, emsg.rct.EventsRoot)
}
}

return nil
}

Expand Down Expand Up @@ -335,3 +365,77 @@ func (si *SqliteIndexer) getNextTipset(ctx context.Context, ts *types.TipSet) (*
func makeBackfillRequiredErr(height abi.ChainEpoch) error {
return xerrors.Errorf("missing tipset at height %d in the chain index, set backfill flag to true to fix", height)
}

// amtRootForEvents generates the events AMT root CID for a given message's events, and returns
// whether the message has events and a fatal error if one occurred.
func (si *SqliteIndexer) amtRootForEvents(
ctx context.Context,
tsKeyCid cid.Cid,
msgCid cid.Cid,
) (cid.Cid, bool, error) {
events := make([]cbg.CBORMarshaler, 0)

err := withTx(ctx, si.db, func(tx *sql.Tx) error {
rows, err := tx.Stmt(si.stmts.getEventIdAndEmitterIdStmt).QueryContext(ctx, tsKeyCid.Bytes(), msgCid.Bytes())
if err != nil {
return xerrors.Errorf("failed to query events: %w", err)
}
defer func() {
_ = rows.Close()
}()

for rows.Next() {
var eventId int
var actorId int64
if err := rows.Scan(&eventId, &actorId); err != nil {
return xerrors.Errorf("failed to scan row: %w", err)
}

event := types.Event{
Emitter: abi.ActorID(actorId),
Entries: make([]types.EventEntry, 0),
}

rows2, err := tx.Stmt(si.stmts.getEventEntriesStmt).QueryContext(ctx, eventId)
if err != nil {
return xerrors.Errorf("failed to query event entries: %w", err)
}
defer func() {
_ = rows2.Close()
}()

for rows2.Next() {
var flags []byte
var key string
var codec uint64
var value []byte
if err := rows2.Scan(&flags, &key, &codec, &value); err != nil {
return xerrors.Errorf("failed to scan row: %w", err)
}
entry := types.EventEntry{
Flags: flags[0],
Key: key,
Codec: codec,
Value: value,
}
event.Entries = append(event.Entries, entry)
}

events = append(events, &event)
}

return nil
})

if err != nil {
return cid.Undef, false, xerrors.Errorf("failed to retrieve events for message %s in tipset %s: %w", msgCid, tsKeyCid, err)
}

// construct the AMT from our slice to an in-memory IPLD store just so we can get the root,
// we don't need the blocks themselves
root, err := amt4.FromArray(ctx, cbor.NewCborStore(bstore.NewMemory()), events, amt4.UseTreeBitWidth(types.EventAMTBitwidth))
if err != nil {
return cid.Undef, false, xerrors.Errorf("failed to create AMT: %w", err)
}
return root, len(events) > 0, nil
}
23 changes: 23 additions & 0 deletions chain/index/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,13 @@ func TestBackfillMissingEpoch(t *testing.T) {
fakeMsg := fakeMessage(randomIDAddr(t, rng), randomIDAddr(t, rng))
fakeEvent := fakeEvent(1, []kv{{k: "test", v: []byte("value")}, {k: "test2", v: []byte("value2")}}, nil)

ec := randomCid(t, rng)
executedMsg := executedMessage{
msg: fakeMsg,
evs: []types.Event{*fakeEvent},
rct: types.MessageReceipt{
EventsRoot: &ec,
},
}

cs.SetMessagesForTipset(missingTs, []types.ChainMsg{fakeMsg})
Expand All @@ -214,7 +218,26 @@ func TestBackfillMissingEpoch(t *testing.T) {
require.Equal(t, uint64(2), result.IndexedEventEntriesCount)

// Verify that the epoch is now indexed
// fails as the events root dont match
verificationResult, err := si.ChainValidateIndex(ctx, missingEpoch, false)
require.ErrorContains(t, err, "events AMT root mismatch")
require.Nil(t, verificationResult)

tsKeyCid, err := missingTs.Key().Cid()
require.NoError(t, err)

root, b, err := si.amtRootForEvents(ctx, tsKeyCid, fakeMsg.Cid())
require.NoError(t, err)
require.True(t, b)
executedMsg.rct.EventsRoot = &root
si.setExecutedMessagesLoaderFunc(func(ctx context.Context, cs ChainStore, msgTs, rctTs *types.TipSet) ([]executedMessage, error) {
if msgTs.Height() == missingTs.Height() {
return []executedMessage{executedMsg}, nil
}
return nil, nil
})

verificationResult, err = si.ChainValidateIndex(ctx, missingEpoch, false)
require.NoError(t, err)
require.NotNil(t, verificationResult)
require.False(t, verificationResult.Backfilled)
Expand Down
2 changes: 2 additions & 0 deletions chain/index/ddls.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,5 +105,7 @@ func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string {
&ps.getMsgIdForMsgCidAndTipsetStmt: "SELECT id FROM tipset_message WHERE tipset_key_cid = ? AND message_cid = ? AND reverted = 0 LIMIT 1",
&ps.insertEventStmt: "INSERT INTO event (message_id, event_index, emitter_id, emitter_addr, reverted) VALUES (?, ?, ?, ?, ?)",
&ps.insertEventEntryStmt: "INSERT INTO event_entry (event_id, indexed, flags, key, codec, value) VALUES (?, ?, ?, ?, ?, ?)",
&ps.getEventEntriesStmt: "SELECT flags, key, codec, value FROM event_entry WHERE event_id=? ORDER BY _rowid_ ASC",
&ps.getEventIdAndEmitterIdStmt: "SELECT e.id, e.emitter_id FROM event e JOIN tipset_message tm ON e.message_id = tm.id WHERE tm.tipset_key_cid = ? AND tm.message_cid = ? ORDER BY e.event_index ASC",
}
}
123 changes: 123 additions & 0 deletions chain/index/ddls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,129 @@ func TestGetNonRevertedTipsetCountStmts(t *testing.T) {
verifyNonRevertedMessageCount(t, s, []byte(tipsetKeyCid1), 0)
}

func TestGetEventIdAndEmitterIdStmtAndGetEventEntriesStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
require.NoError(t, err)

// Insert a tipset message
tsKeyCid := []byte("test_tipset_key")
msgCid := []byte("test_message_cid")
messageID := insertTipsetMessage(t, s, tipsetMessage{
tipsetKeyCid: tsKeyCid,
height: 1,
reverted: false,
messageCid: msgCid,
messageIndex: 0,
})

// Insert events
event1ID := insertEvent(t, s, event{
messageID: messageID,
eventIndex: 0,
emitterId: 1,
emitterAddr: []byte("emitter_addr_1"),
reverted: false,
})
event2ID := insertEvent(t, s, event{
messageID: messageID,
eventIndex: 1,
emitterId: 2,
emitterAddr: []byte("emitter_addr_2"),
reverted: false,
})

// Insert event entries
insertEventEntry(t, s, eventEntry{
eventID: event1ID,
indexed: true,
flags: []byte{0x01},
key: "key1",
codec: 1,
value: []byte("value1"),
})
insertEventEntry(t, s, eventEntry{
eventID: event1ID,
indexed: false,
flags: []byte{0x00},
key: "key2",
codec: 2,
value: []byte("value2"),
})
insertEventEntry(t, s, eventEntry{
eventID: event2ID,
indexed: true,
flags: []byte{0x01},
key: "key3",
codec: 3,
value: []byte("value3"),
})

// Test getEventIdAndEmitterIdStmt
rows, err := s.stmts.getEventIdAndEmitterIdStmt.Query(tsKeyCid, msgCid)
require.NoError(t, err)
defer func() {
_ = rows.Close()
}()
var eventIDs []int64
var emitterIDs []uint64
for rows.Next() {
var eventID int64
var emitterID uint64
err := rows.Scan(&eventID, &emitterID)
require.NoError(t, err)
eventIDs = append(eventIDs, eventID)
emitterIDs = append(emitterIDs, emitterID)
}
require.NoError(t, rows.Err())
require.Equal(t, []int64{event1ID, event2ID}, eventIDs)
require.Equal(t, []uint64{1, 2}, emitterIDs)

// Test getEventEntriesStmt for event1
rows, err = s.stmts.getEventEntriesStmt.Query(event1ID)
require.NoError(t, err)
defer func() {
_ = rows.Close()
}()

var entries []eventEntry
for rows.Next() {
var entry eventEntry
err := rows.Scan(&entry.flags, &entry.key, &entry.codec, &entry.value)
require.NoError(t, err)
entries = append(entries, entry)
}
require.NoError(t, rows.Err())
require.Len(t, entries, 2)
require.Equal(t, []byte{0x01}, entries[0].flags)
require.Equal(t, "key1", entries[0].key)
require.Equal(t, 1, entries[0].codec)
require.Equal(t, []byte("value1"), entries[0].value)
require.Equal(t, []byte{0x00}, entries[1].flags)
require.Equal(t, "key2", entries[1].key)
require.Equal(t, 2, entries[1].codec)
require.Equal(t, []byte("value2"), entries[1].value)

// Test getEventEntriesStmt for event2
rows, err = s.stmts.getEventEntriesStmt.Query(event2ID)
require.NoError(t, err)
defer func() {
_ = rows.Close()
}()

entries = nil
for rows.Next() {
var entry eventEntry
err := rows.Scan(&entry.flags, &entry.key, &entry.codec, &entry.value)
require.NoError(t, err)
entries = append(entries, entry)
}
require.NoError(t, rows.Err())
require.Len(t, entries, 1)
require.Equal(t, []byte{0x01}, entries[0].flags)
require.Equal(t, "key3", entries[0].key)
require.Equal(t, 3, entries[0].codec)
require.Equal(t, []byte("value3"), entries[0].value)
}
func TestUpdateTipsetToNonRevertedStmt(t *testing.T) {
s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0)
require.NoError(t, err)
Expand Down
2 changes: 2 additions & 0 deletions chain/index/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type preparedStatements struct {
getMsgIdForMsgCidAndTipsetStmt *sql.Stmt
insertEventStmt *sql.Stmt
insertEventEntryStmt *sql.Stmt
getEventIdAndEmitterIdStmt *sql.Stmt
getEventEntriesStmt *sql.Stmt

hasNullRoundAtHeightStmt *sql.Stmt
getNonRevertedTipsetAtHeightStmt *sql.Stmt
Expand Down
Loading