diff --git a/chain/index/api.go b/chain/index/api.go index c9aaa5fa90e..72c1aaa8e15 100644 --- a/chain/index/api.go +++ b/chain/index/api.go @@ -223,7 +223,8 @@ func (si *SqliteIndexer) verifyIndexedData(ctx context.Context, ts *types.TipSet return xerrors.Errorf("failed to get next tipset for height %d: %w", ts.Height(), err) } - // if non-reverted events exist which means that tipset `ts` has been executed, there should be 0 reverted events in the DB + // given that `ts` is on the canonical chain and `executionTs` is the next tipset in the chain + // `ts` can not have reverted events var hasRevertedEventsInTipset bool err = si.stmts.hasRevertedEventsInTipsetStmt.QueryRowContext(ctx, tsKeyCid.Bytes()).Scan(&hasRevertedEventsInTipset) if err != nil { @@ -233,7 +234,7 @@ func (si *SqliteIndexer) verifyIndexedData(ctx context.Context, ts *types.TipSet return xerrors.Errorf("index corruption: reverted events found for an executed tipset %s at height %d", tsKeyCid, ts.Height()) } - executedMsgs, err := si.loadExecutedMessages(ctx, ts, executionTs) + executedMsgs, err := si.eventLoaderFunc(ctx, si.cs, ts, executionTs) if err != nil { return xerrors.Errorf("failed to load executed messages for height %d: %w", ts.Height(), err) } diff --git a/chain/index/api_test.go b/chain/index/api_test.go new file mode 100644 index 00000000000..44ac9aae6a0 --- /dev/null +++ b/chain/index/api_test.go @@ -0,0 +1,483 @@ +package index + +import ( + "context" + pseudo "math/rand" + "testing" + "time" + + "github.com/ipfs/go-cid" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/chain/types" +) + +func TestValidateIsNullRound(t *testing.T) { + ctx := context.Background() + rng := pseudo.New(pseudo.NewSource(time.Now().UnixNano())) + headHeight := abi.ChainEpoch(100) + + tests := []struct { + name string + epoch abi.ChainEpoch + setupFunc func(*SqliteIndexer) + expectedResult bool + expectError bool + errorContains string + }{ + { + name: "happy path - null round", + epoch: 50, + expectedResult: true, + }, + { + name: "failure - non-null round", + epoch: 50, + setupFunc: func(si *SqliteIndexer) { + insertTipsetMessage(t, si, tipsetMessage{ + tipsetKeyCid: randomCid(t, rng).Bytes(), + height: 50, + reverted: false, + messageCid: randomCid(t, rng).Bytes(), + messageIndex: 0, + }) + }, + expectError: true, + errorContains: "index corruption", + }, + { + name: "edge case - epoch 0", + epoch: 0, + expectedResult: true, + }, + { + name: "edge case - epoch above head", + epoch: headHeight + 1, + expectedResult: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + si, _, _ := setupWithHeadIndexed(t, headHeight, rng) + defer func() { _ = si.Close() }() + + if tt.setupFunc != nil { + tt.setupFunc(si) + } + + res, err := si.validateIsNullRound(ctx, tt.epoch) + + if tt.expectError { + require.Error(t, err) + if tt.errorContains != "" { + require.Contains(t, err.Error(), tt.errorContains) + } + } else { + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, tt.expectedResult, res.IsNullRound) + require.Equal(t, uint64(tt.epoch), res.Height) + } + }) + } +} + +func TestFailureHeadHeight(t *testing.T) { + ctx := context.Background() + rng := pseudo.New(pseudo.NewSource(time.Now().UnixNano())) + headHeight := abi.ChainEpoch(100) + + si, head, _ := setupWithHeadIndexed(t, headHeight, rng) + defer func() { _ = si.Close() }() + _ = si.Start() + + _, err := si.ChainValidateIndex(ctx, head.Height(), false) + require.Error(t, err) + require.Contains(t, err.Error(), "cannot validate index at epoch") +} + +func TestBackfillNullRound(t *testing.T) { + ctx := context.Background() + rng := pseudo.New(pseudo.NewSource(time.Now().UnixNano())) + headHeight := abi.ChainEpoch(100) + + si, _, cs := setupWithHeadIndexed(t, headHeight, rng) + defer func() { _ = si.Close() }() + _ = si.Start() + + nullRoundEpoch := abi.ChainEpoch(50) + nonNullRoundEpoch := abi.ChainEpoch(51) + + // Create a tipset with a height different from the requested epoch + nonNullTs := fakeTipSet(t, rng, nonNullRoundEpoch, []cid.Cid{}) + + // Set up the chainstore to return the non-null tipset for the null round epoch + cs.SetTipsetByHeightAndKey(nullRoundEpoch, nonNullTs.Key(), nonNullTs) + + // Attempt to validate the null round epoch + result, err := si.ChainValidateIndex(ctx, nullRoundEpoch, true) + require.NoError(t, err) + require.NotNil(t, result) + require.False(t, result.Backfilled) + require.True(t, result.IsNullRound) +} + +func TestBackfillReturnsError(t *testing.T) { + ctx := context.Background() + rng := pseudo.New(pseudo.NewSource(time.Now().UnixNano())) + headHeight := abi.ChainEpoch(100) + + si, _, cs := setupWithHeadIndexed(t, headHeight, rng) + defer func() { _ = si.Close() }() + _ = si.Start() + + missingEpoch := abi.ChainEpoch(50) + + // Create a tipset for the missing epoch, but don't index it + missingTs := fakeTipSet(t, rng, missingEpoch, []cid.Cid{}) + cs.SetTipsetByHeightAndKey(missingEpoch, missingTs.Key(), missingTs) + + // Attempt to validate the missing epoch with backfill flag set to false + _, err := si.ChainValidateIndex(ctx, missingEpoch, false) + require.Error(t, err) + require.Contains(t, err.Error(), "missing tipset at height 50 in the chain index") +} + +func TestBackfillMissingEpoch(t *testing.T) { + ctx := context.Background() + rng := pseudo.New(pseudo.NewSource(time.Now().UnixNano())) + headHeight := abi.ChainEpoch(100) + + si, _, cs := setupWithHeadIndexed(t, headHeight, rng) + defer func() { _ = si.Close() }() + _ = si.Start() + + // Initialize address resolver + si.SetIdToRobustAddrFunc(func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) { + idAddr, err := address.NewIDAddress(uint64(emitter)) + if err != nil { + return address.Undef, false + } + return idAddr, true + }) + + missingEpoch := abi.ChainEpoch(50) + + parentTs := fakeTipSet(t, rng, missingEpoch-1, []cid.Cid{}) + cs.SetTipsetByHeightAndKey(missingEpoch-1, parentTs.Key(), parentTs) + + missingTs := fakeTipSet(t, rng, missingEpoch, parentTs.Cids()) + cs.SetTipsetByHeightAndKey(missingEpoch, missingTs.Key(), missingTs) + + executionTs := fakeTipSet(t, rng, missingEpoch+1, missingTs.Key().Cids()) + cs.SetTipsetByHeightAndKey(missingEpoch+1, executionTs.Key(), executionTs) + + // Create fake messages and events + fakeMsg := fakeMessage(randomIDAddr(t, rng), randomIDAddr(t, rng)) + fakeEvent := fakeEvent(1, []kv{{k: "test", v: []byte("value")}, {k: "test2", v: []byte("value2")}}, nil) + + executedMsg := executedMessage{ + msg: fakeMsg, + evs: []types.Event{*fakeEvent}, + } + + cs.SetMessagesForTipset(missingTs, []types.ChainMsg{fakeMsg}) + si.SetEventLoaderFunc(func(ctx context.Context, cs ChainStore, msgTs, rctTs *types.TipSet) ([]executedMessage, error) { + if msgTs.Height() == missingTs.Height() { + return []executedMessage{executedMsg}, nil + } + return nil, nil + }) + + // Attempt to validate and backfill the missing epoch + result, err := si.ChainValidateIndex(ctx, missingEpoch, true) + require.NoError(t, err) + require.NotNil(t, result) + require.True(t, result.Backfilled) + require.Equal(t, uint64(missingEpoch), result.Height) + require.Equal(t, uint64(1), result.IndexedMessagesCount) + require.Equal(t, uint64(1), result.IndexedEventsCount) + require.Equal(t, uint64(2), result.IndexedEventEntriesCount) + + // Verify that the epoch is now indexed + verificationResult, err := si.ChainValidateIndex(ctx, missingEpoch, false) + require.NoError(t, err) + require.NotNil(t, verificationResult) + require.False(t, verificationResult.Backfilled) + require.Equal(t, result.IndexedMessagesCount, verificationResult.IndexedMessagesCount) + require.Equal(t, result.IndexedEventsCount, verificationResult.IndexedEventsCount) + require.Equal(t, result.IndexedEventEntriesCount, verificationResult.IndexedEventEntriesCount) +} + +func TestIndexCorruption(t *testing.T) { + ctx := context.Background() + rng := pseudo.New(pseudo.NewSource(time.Now().UnixNano())) + headHeight := abi.ChainEpoch(100) + + tests := []struct { + name string + setupFunc func(*testing.T, *SqliteIndexer, *dummyChainStore) + epoch abi.ChainEpoch + errorContains string + }{ + { + name: "only reverted tipsets", + setupFunc: func(t *testing.T, si *SqliteIndexer, cs *dummyChainStore) { + epoch := abi.ChainEpoch(50) + ts := fakeTipSet(t, rng, epoch, []cid.Cid{}) + cs.SetTipsetByHeightAndKey(epoch, ts.Key(), ts) + keyBz, err := ts.Key().Cid() + require.NoError(t, err) + + insertTipsetMessage(t, si, tipsetMessage{ + tipsetKeyCid: keyBz.Bytes(), + height: uint64(epoch), + reverted: true, + messageCid: randomCid(t, rng).Bytes(), + messageIndex: 0, + }) + }, + epoch: 50, + errorContains: "index corruption: height 50 only has reverted tipsets", + }, + { + name: "multiple non-reverted tipsets", + setupFunc: func(t *testing.T, si *SqliteIndexer, cs *dummyChainStore) { + epoch := abi.ChainEpoch(50) + ts1 := fakeTipSet(t, rng, epoch, []cid.Cid{}) + ts2 := fakeTipSet(t, rng, epoch, []cid.Cid{}) + cs.SetTipsetByHeightAndKey(epoch, ts1.Key(), ts1) + + t1Bz, err := toTipsetKeyCidBytes(ts1) + require.NoError(t, err) + t2Bz, err := toTipsetKeyCidBytes(ts2) + require.NoError(t, err) + + insertTipsetMessage(t, si, tipsetMessage{ + tipsetKeyCid: t1Bz, + height: uint64(epoch), + reverted: false, + messageCid: randomCid(t, rng).Bytes(), + messageIndex: 0, + }) + insertTipsetMessage(t, si, tipsetMessage{ + tipsetKeyCid: t2Bz, + height: uint64(epoch), + reverted: false, + messageCid: randomCid(t, rng).Bytes(), + messageIndex: 0, + }) + }, + epoch: 50, + errorContains: "index corruption: height 50 has multiple non-reverted tipsets", + }, + { + name: "tipset key mismatch", + setupFunc: func(_ *testing.T, si *SqliteIndexer, cs *dummyChainStore) { + epoch := abi.ChainEpoch(50) + ts1 := fakeTipSet(t, rng, epoch, []cid.Cid{}) + ts2 := fakeTipSet(t, rng, epoch, []cid.Cid{}) + cs.SetTipsetByHeightAndKey(epoch, ts1.Key(), ts1) + insertTipsetMessage(t, si, tipsetMessage{ + tipsetKeyCid: ts2.Key().Cids()[0].Bytes(), + height: uint64(epoch), + reverted: false, + messageCid: randomCid(t, rng).Bytes(), + messageIndex: 0, + }) + }, + epoch: 50, + errorContains: "index corruption: indexed tipset at height 50 has key", + }, + { + name: "reverted events for executed tipset", + setupFunc: func(_ *testing.T, si *SqliteIndexer, cs *dummyChainStore) { + epoch := abi.ChainEpoch(50) + ts := fakeTipSet(t, rng, epoch, []cid.Cid{}) + cs.SetTipsetByHeightAndKey(epoch, ts.Key(), ts) + keyBz, err := ts.Key().Cid() + require.NoError(t, err) + + messageID := insertTipsetMessage(t, si, tipsetMessage{ + tipsetKeyCid: keyBz.Bytes(), + height: uint64(epoch), + reverted: false, + messageCid: randomCid(t, rng).Bytes(), + messageIndex: 0, + }) + insertEvent(t, si, event{ + messageID: messageID, + eventIndex: 0, + emitterAddr: randomIDAddr(t, rng).Bytes(), + reverted: true, + }) + cs.SetTipsetByHeightAndKey(epoch+1, fakeTipSet(t, rng, epoch+1, ts.Key().Cids()).Key(), fakeTipSet(t, rng, epoch+1, ts.Key().Cids())) + }, + epoch: 50, + errorContains: "index corruption: reverted events found for an executed tipset", + }, + { + name: "message count mismatch", + setupFunc: func(_ *testing.T, si *SqliteIndexer, cs *dummyChainStore) { + epoch := abi.ChainEpoch(50) + ts := fakeTipSet(t, rng, epoch, []cid.Cid{}) + cs.SetTipsetByHeightAndKey(epoch, ts.Key(), ts) + keyBz, err := ts.Key().Cid() + require.NoError(t, err) + + // Insert two messages in the index + insertTipsetMessage(t, si, tipsetMessage{ + tipsetKeyCid: keyBz.Bytes(), + height: uint64(epoch), + reverted: false, + messageCid: randomCid(t, rng).Bytes(), + messageIndex: 0, + }) + insertTipsetMessage(t, si, tipsetMessage{ + tipsetKeyCid: keyBz.Bytes(), + height: uint64(epoch), + reverted: false, + messageCid: randomCid(t, rng).Bytes(), + messageIndex: 1, + }) + + // Setup dummy event loader + si.SetEventLoaderFunc(func(ctx context.Context, cs ChainStore, msgTs, rctTs *types.TipSet) ([]executedMessage, error) { + return []executedMessage{{msg: fakeMessage(randomIDAddr(t, rng), randomIDAddr(t, rng))}}, nil + }) + + // Set up the next tipset for event execution + nextTs := fakeTipSet(t, rng, epoch+1, ts.Key().Cids()) + cs.SetTipsetByHeightAndKey(epoch+1, nextTs.Key(), nextTs) + }, + epoch: 50, + errorContains: "failed to verify indexed data at height 50: message count mismatch for height 50: chainstore has 1, index has 2", + }, + { + name: "event count mismatch", + setupFunc: func(_ *testing.T, si *SqliteIndexer, cs *dummyChainStore) { + epoch := abi.ChainEpoch(50) + ts := fakeTipSet(t, rng, epoch, []cid.Cid{}) + cs.SetTipsetByHeightAndKey(epoch, ts.Key(), ts) + keyBz, err := ts.Key().Cid() + require.NoError(t, err) + + // Insert one message in the index + messageID := insertTipsetMessage(t, si, tipsetMessage{ + tipsetKeyCid: keyBz.Bytes(), + height: uint64(epoch), + reverted: false, + messageCid: randomCid(t, rng).Bytes(), + messageIndex: 0, + }) + + // Insert two events for the message + insertEvent(t, si, event{ + messageID: messageID, + eventIndex: 0, + emitterAddr: randomIDAddr(t, rng).Bytes(), + reverted: false, + }) + insertEvent(t, si, event{ + messageID: messageID, + eventIndex: 1, + emitterAddr: randomIDAddr(t, rng).Bytes(), + reverted: false, + }) + + // Setup dummy event loader to return only one event + si.SetEventLoaderFunc(func(ctx context.Context, cs ChainStore, msgTs, rctTs *types.TipSet) ([]executedMessage, error) { + return []executedMessage{ + { + msg: fakeMessage(randomIDAddr(t, rng), randomIDAddr(t, rng)), + evs: []types.Event{*fakeEvent(1, []kv{{k: "test", v: []byte("value")}}, nil)}, + }, + }, nil + }) + + // Set up the next tipset for event execution + nextTs := fakeTipSet(t, rng, epoch+1, ts.Key().Cids()) + cs.SetTipsetByHeightAndKey(epoch+1, nextTs.Key(), nextTs) + }, + epoch: 50, + errorContains: "failed to verify indexed data at height 50: event count mismatch for height 50: chainstore has 1, index has 2", + }, + { + name: "event entries count mismatch", + setupFunc: func(_ *testing.T, si *SqliteIndexer, cs *dummyChainStore) { + epoch := abi.ChainEpoch(50) + ts := fakeTipSet(t, rng, epoch, []cid.Cid{}) + cs.SetTipsetByHeightAndKey(epoch, ts.Key(), ts) + keyBz, err := ts.Key().Cid() + require.NoError(t, err) + + // Insert one message in the index + messageID := insertTipsetMessage(t, si, tipsetMessage{ + tipsetKeyCid: keyBz.Bytes(), + height: uint64(epoch), + reverted: false, + messageCid: randomCid(t, rng).Bytes(), + messageIndex: 0, + }) + + // Insert one event with two entries for the message + eventID := insertEvent(t, si, event{ + messageID: messageID, + eventIndex: 0, + emitterAddr: randomIDAddr(t, rng).Bytes(), + reverted: false, + }) + insertEventEntry(t, si, eventEntry{ + eventID: eventID, + indexed: true, + flags: []byte{0x01}, + key: "key1", + codec: 1, + value: []byte("value1"), + }) + insertEventEntry(t, si, eventEntry{ + eventID: eventID, + indexed: true, + flags: []byte{0x00}, + key: "key2", + codec: 2, + value: []byte("value2"), + }) + + // Setup dummy event loader to return one event with only one entry + si.SetEventLoaderFunc(func(ctx context.Context, cs ChainStore, msgTs, rctTs *types.TipSet) ([]executedMessage, error) { + return []executedMessage{ + { + msg: fakeMessage(randomIDAddr(t, rng), randomIDAddr(t, rng)), + evs: []types.Event{*fakeEvent(1, []kv{{k: "key1", v: []byte("value1")}}, nil)}, + }, + }, nil + }) + + // Set up the next tipset for event execution + nextTs := fakeTipSet(t, rng, epoch+1, ts.Key().Cids()) + cs.SetTipsetByHeightAndKey(epoch+1, nextTs.Key(), nextTs) + }, + epoch: 50, + errorContains: "failed to verify indexed data at height 50: event entries count mismatch for height 50: chainstore has 1, index has 2", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + si, _, cs := setupWithHeadIndexed(t, headHeight, rng) + defer func() { _ = si.Close() }() + _ = si.Start() + + tt.setupFunc(t, si, cs) + + _, err := si.ChainValidateIndex(ctx, tt.epoch, false) + require.Error(t, err) + require.Contains(t, err.Error(), tt.errorContains) + }) + } +} diff --git a/chain/index/ddls_test.go b/chain/index/ddls_test.go new file mode 100644 index 00000000000..69b09f975c8 --- /dev/null +++ b/chain/index/ddls_test.go @@ -0,0 +1,708 @@ +package index + +import ( + "database/sql" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestHasRevertedEventsInTipsetStmt(t *testing.T) { + s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + require.NoError(t, err) + + // running on empty DB should return false + verifyHasRevertedEventsInTipsetStmt(t, s, []byte("test_tipset_key"), false) + + // Insert tipset with a reverted event + ts := tipsetMessage{ + tipsetKeyCid: []byte("test_tipset_key"), + height: 1, + reverted: false, + messageCid: []byte("test_message_cid"), + messageIndex: 0, + } + messageID := insertTipsetMessage(t, s, ts) + + insertEvent(t, s, event{ + messageID: messageID, + eventIndex: 0, + emitterAddr: []byte("test_emitter_addr"), + reverted: true, + }) + + // Verify `hasRevertedEventsInTipset` returns true + verifyHasRevertedEventsInTipsetStmt(t, s, []byte("test_tipset_key"), true) + + // change event to non-reverted + updateEventsToNonReverted(t, s, []byte("test_tipset_key")) + + // Verify `hasRevertedEventsInTipset` returns false + verifyHasRevertedEventsInTipsetStmt(t, s, []byte("test_tipset_key"), false) +} + +func TestGetNonRevertedTipsetCountStmts(t *testing.T) { + s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + require.NoError(t, err) + + // running on empty DB should return 0 + verifyNonRevertedEventEntriesCount(t, s, []byte("test_tipset_key"), 0) + verifyNonRevertedEventCount(t, s, []byte("test_tipset_key"), 0) + verifyNonRevertedMessageCount(t, s, []byte("test_tipset_key"), 0) + + // Insert non-reverted tipset + messageID := insertTipsetMessage(t, s, tipsetMessage{ + tipsetKeyCid: []byte("test_tipset_key"), + height: 1, + reverted: false, + messageCid: []byte("test_message_cid"), + messageIndex: 0, + }) + + // Insert event + eventID1 := insertEvent(t, s, event{ + messageID: messageID, + eventIndex: 0, + emitterAddr: []byte("test_emitter_addr"), + reverted: false, + }) + eventID2 := insertEvent(t, s, event{ + messageID: messageID, + eventIndex: 1, + emitterAddr: []byte("test_emitter_addr"), + reverted: false, + }) + + // Insert event entry + insertEventEntry(t, s, eventEntry{ + eventID: eventID1, + indexed: true, + flags: []byte("test_flags"), + key: "test_key", + codec: 1, + value: []byte("test_value"), + }) + insertEventEntry(t, s, eventEntry{ + eventID: eventID2, + indexed: true, + flags: []byte("test_flags2"), + key: "test_key2", + codec: 2, + value: []byte("test_value2"), + }) + + // verify 2 event entries + verifyNonRevertedEventEntriesCount(t, s, []byte("test_tipset_key"), 2) + + // Verify event count + verifyNonRevertedEventCount(t, s, []byte("test_tipset_key"), 2) + + // verify message count is 1 + verifyNonRevertedMessageCount(t, s, []byte("test_tipset_key"), 1) + + // mark tipset as reverted + revertTipset(t, s, []byte("test_tipset_key")) + + // Verify `getNonRevertedTipsetEventEntriesCountStmt` returns 0 + verifyNonRevertedEventEntriesCount(t, s, []byte("test_tipset_key"), 0) + + // verify event count is 0 + verifyNonRevertedEventCount(t, s, []byte("test_tipset_key"), 0) + + // verify message count is 0 + verifyNonRevertedMessageCount(t, s, []byte("test_tipset_key"), 0) +} + +func TestUpdateTipsetToNonRevertedStmt(t *testing.T) { + s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + require.NoError(t, err) + + // insert a reverted tipset + ts := tipsetMessage{ + tipsetKeyCid: []byte("test_tipset_key"), + height: 1, + reverted: true, + messageCid: []byte("test_message_cid"), + messageIndex: 0, + } + + // Insert tipset + messageId := insertTipsetMessage(t, s, ts) + + res, err := s.stmts.updateTipsetToNonRevertedStmt.Exec([]byte("test_tipset_key")) + require.NoError(t, err) + + rowsAffected, err := res.RowsAffected() + require.NoError(t, err) + require.Equal(t, int64(1), rowsAffected) + + // verify the tipset is not reverted + ts.reverted = false + verifyTipsetMessage(t, s, messageId, ts) +} + +func TestHasNullRoundAtHeightStmt(t *testing.T) { + s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + require.NoError(t, err) + + // running on empty DB should return true + verifyHasNullRoundAtHeightStmt(t, s, 1, true) + verifyHasNullRoundAtHeightStmt(t, s, 0, true) + + // insert tipset + insertTipsetMessage(t, s, tipsetMessage{ + tipsetKeyCid: []byte("test_tipset_key"), + height: 1, + reverted: false, + messageCid: []byte("test_message_cid"), + messageIndex: 0, + }) + + // verify not a null round + verifyHasNullRoundAtHeightStmt(t, s, 1, false) +} + +func TestHasTipsetStmt(t *testing.T) { + s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + require.NoError(t, err) + + // running on empty DB should return false + verifyHasTipsetStmt(t, s, []byte("test_tipset_key"), false) + + // insert tipset + insertTipsetMessage(t, s, tipsetMessage{ + tipsetKeyCid: []byte("test_tipset_key"), + height: 1, + reverted: false, + messageCid: []byte("test_message_cid"), + messageIndex: 0, + }) + + // verify tipset exists + verifyHasTipsetStmt(t, s, []byte("test_tipset_key"), true) + + // verify non-existent tipset + verifyHasTipsetStmt(t, s, []byte("non_existent_tipset_key"), false) +} + +func TestUpdateEventsToRevertedStmt(t *testing.T) { + s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + require.NoError(t, err) + + // Insert a non-reverted tipset + messageID := insertTipsetMessage(t, s, tipsetMessage{ + tipsetKeyCid: []byte("test_tipset_key"), + height: 1, + reverted: false, + messageCid: []byte("test_message_cid"), + messageIndex: 0, + }) + + // Insert non-reverted events + insertEvent(t, s, event{ + messageID: messageID, + eventIndex: 0, + emitterAddr: []byte("test_emitter_addr"), + reverted: false, + }) + insertEvent(t, s, event{ + messageID: messageID, + eventIndex: 1, + emitterAddr: []byte("test_emitter_addr"), + reverted: false, + }) + + // Verify events are not reverted + var count int + err = s.db.QueryRow("SELECT COUNT(*) FROM event WHERE reverted = 0 AND message_id = ?", messageID).Scan(&count) + require.NoError(t, err) + require.Equal(t, 2, count) + + // Execute updateEventsToRevertedStmt + _, err = s.stmts.updateEventsToRevertedStmt.Exec([]byte("test_tipset_key")) + require.NoError(t, err) + + // Verify events are now reverted + err = s.db.QueryRow("SELECT COUNT(*) FROM event WHERE reverted = 1 AND message_id = ?", messageID).Scan(&count) + require.NoError(t, err) + require.Equal(t, 2, count) + + // Verify no non-reverted events remain + err = s.db.QueryRow("SELECT COUNT(*) FROM event WHERE reverted = 0 AND message_id = ?", messageID).Scan(&count) + require.NoError(t, err) + require.Equal(t, 0, count) +} + +func TestCountTipsetsAtHeightStmt(t *testing.T) { + s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + require.NoError(t, err) + + // Test empty DB + verifyCountTipsetsAtHeightStmt(t, s, 1, 0, 0) + + // Test 0,1 case + insertTipsetMessage(t, s, tipsetMessage{ + tipsetKeyCid: []byte("test_tipset_key_1"), + height: 1, + reverted: false, + messageCid: []byte("test_message_cid_1"), + messageIndex: 0, + }) + verifyCountTipsetsAtHeightStmt(t, s, 1, 0, 1) + + // Test 0,2 case + insertTipsetMessage(t, s, tipsetMessage{ + tipsetKeyCid: []byte("test_tipset_key_2"), + height: 1, + reverted: false, + messageCid: []byte("test_message_cid_2"), + messageIndex: 0, + }) + verifyCountTipsetsAtHeightStmt(t, s, 1, 0, 2) + + // Test 1,2 case + insertTipsetMessage(t, s, tipsetMessage{ + tipsetKeyCid: []byte("test_tipset_key_3"), + height: 1, + reverted: true, + messageCid: []byte("test_message_cid_3"), + messageIndex: 0, + }) + verifyCountTipsetsAtHeightStmt(t, s, 1, 1, 2) + + // Test 2,2 case + insertTipsetMessage(t, s, tipsetMessage{ + tipsetKeyCid: []byte("test_tipset_key_4"), + height: 1, + reverted: true, + messageCid: []byte("test_message_cid_4"), + messageIndex: 0, + }) + verifyCountTipsetsAtHeightStmt(t, s, 1, 2, 2) +} + +func TestNonRevertedTipsetAtHeightStmt(t *testing.T) { + s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + require.NoError(t, err) + + // Test empty DB + var et []byte + err = s.stmts.getNonRevertedTipsetAtHeightStmt.QueryRow(10).Scan(&et) + require.Equal(t, sql.ErrNoRows, err) + + // Insert non-reverted tipset + insertTipsetMessage(t, s, tipsetMessage{ + tipsetKeyCid: []byte("test_tipset_key_1"), + height: 10, + reverted: false, + messageCid: []byte("test_message_cid_1"), + messageIndex: 0, + }) + + // Insert reverted tipset at same height + insertTipsetMessage(t, s, tipsetMessage{ + tipsetKeyCid: []byte("test_tipset_key_2"), + height: 10, + reverted: true, + messageCid: []byte("test_message_cid_2"), + messageIndex: 0, + }) + + // Verify getNonRevertedTipsetAtHeightStmt returns the non-reverted tipset + var tipsetKeyCid []byte + err = s.stmts.getNonRevertedTipsetAtHeightStmt.QueryRow(10).Scan(&tipsetKeyCid) + require.NoError(t, err) + require.Equal(t, []byte("test_tipset_key_1"), tipsetKeyCid) + + // Insert another non-reverted tipset at a different height + insertTipsetMessage(t, s, tipsetMessage{ + tipsetKeyCid: []byte("test_tipset_key_3"), + height: 20, + reverted: false, + messageCid: []byte("test_message_cid_3"), + messageIndex: 0, + }) + + // Verify getNonRevertedTipsetAtHeightStmt returns the correct tipset for the new height + err = s.stmts.getNonRevertedTipsetAtHeightStmt.QueryRow(20).Scan(&tipsetKeyCid) + require.NoError(t, err) + require.Equal(t, []byte("test_tipset_key_3"), tipsetKeyCid) + + // Test with a height that has no tipset + err = s.stmts.getNonRevertedTipsetAtHeightStmt.QueryRow(30).Scan(&tipsetKeyCid) + require.Equal(t, sql.ErrNoRows, err) + + // Revert all tipsets at height 10 + _, err = s.db.Exec("UPDATE tipset_message SET reverted = 1 WHERE height = 10") + require.NoError(t, err) + + // Verify getNonRevertedTipsetAtHeightStmt returns no rows for the reverted height + err = s.stmts.getNonRevertedTipsetAtHeightStmt.QueryRow(10).Scan(&tipsetKeyCid) + require.Equal(t, sql.ErrNoRows, err) +} + +func TestMinNonRevertedHeightStmt(t *testing.T) { + s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + require.NoError(t, err) + + // Test empty DB + var minHeight sql.NullInt64 + err = s.stmts.getMinNonRevertedHeightStmt.QueryRow().Scan(&minHeight) + require.NoError(t, err) + require.False(t, minHeight.Valid) + + // Insert non-reverted tipsets + insertTipsetMessage(t, s, tipsetMessage{ + tipsetKeyCid: []byte("test_tipset_key_1"), + height: 10, + reverted: false, + messageCid: []byte("test_message_cid_1"), + messageIndex: 0, + }) + insertTipsetMessage(t, s, tipsetMessage{ + tipsetKeyCid: []byte("test_tipset_key_2"), + height: 20, + reverted: false, + messageCid: []byte("test_message_cid_2"), + messageIndex: 0, + }) + + // Verify minimum non-reverted height + verifyMinNonRevertedHeightStmt(t, s, 10) + + // Insert reverted tipset with lower height + insertTipsetMessage(t, s, tipsetMessage{ + tipsetKeyCid: []byte("test_tipset_key_4"), + height: 5, + reverted: true, + messageCid: []byte("test_message_cid_4"), + messageIndex: 0, + }) + + // Verify minimum non-reverted height hasn't changed + verifyMinNonRevertedHeightStmt(t, s, 10) + + // Revert all tipsets + _, err = s.db.Exec("UPDATE tipset_message SET reverted = 1") + require.NoError(t, err) + + // Verify no minimum non-reverted height + err = s.stmts.getMinNonRevertedHeightStmt.QueryRow().Scan(&minHeight) + require.NoError(t, err) + require.False(t, minHeight.Valid) +} + +func verifyMinNonRevertedHeightStmt(t *testing.T, s *SqliteIndexer, expectedMinHeight int64) { + var minHeight sql.NullInt64 + err := s.stmts.getMinNonRevertedHeightStmt.QueryRow().Scan(&minHeight) + require.NoError(t, err) + require.True(t, minHeight.Valid) + require.Equal(t, expectedMinHeight, minHeight.Int64) +} + +func TestGetMsgIdForMsgCidAndTipsetStmt(t *testing.T) { + s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + require.NoError(t, err) + + // Insert a non-reverted tipset + tipsetKeyCid := []byte("test_tipset_key") + messageCid := []byte("test_message_cid") + insertTipsetMessage(t, s, tipsetMessage{ + tipsetKeyCid: tipsetKeyCid, + height: 1, + reverted: false, + messageCid: messageCid, + messageIndex: 0, + }) + + // Verify getMsgIdForMsgCidAndTipset returns the correct message ID + var messageID int64 + err = s.stmts.getMsgIdForMsgCidAndTipsetStmt.QueryRow(tipsetKeyCid, messageCid).Scan(&messageID) + require.NoError(t, err) + require.Equal(t, int64(1), messageID) + + // Test with non-existent message CID + nonExistentMessageCid := []byte("non_existent_message_cid") + err = s.stmts.getMsgIdForMsgCidAndTipsetStmt.QueryRow(tipsetKeyCid, nonExistentMessageCid).Scan(&messageID) + require.Equal(t, sql.ErrNoRows, err) + + // Test with non-existent tipset key + nonExistentTipsetKeyCid := []byte("non_existent_tipset_key") + err = s.stmts.getMsgIdForMsgCidAndTipsetStmt.QueryRow(nonExistentTipsetKeyCid, messageCid).Scan(&messageID) + require.Equal(t, sql.ErrNoRows, err) + + // Insert a reverted tipset + revertedTipsetKeyCid := []byte("reverted_tipset_key") + insertTipsetMessage(t, s, tipsetMessage{ + tipsetKeyCid: revertedTipsetKeyCid, + height: 2, + reverted: true, + messageCid: messageCid, + messageIndex: 0, + }) + + // Verify getMsgIdForMsgCidAndTipset doesn't return the message ID for a reverted tipset + err = s.stmts.getMsgIdForMsgCidAndTipsetStmt.QueryRow(revertedTipsetKeyCid, messageCid).Scan(&messageID) + require.Equal(t, sql.ErrNoRows, err) +} + +func TestForeignKeyCascadeDelete(t *testing.T) { + s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + require.NoError(t, err) + + // Insert a tipset + messageID := insertTipsetMessage(t, s, tipsetMessage{ + tipsetKeyCid: []byte("test_tipset_key"), + height: 1, + reverted: false, + messageCid: []byte("test_message_cid"), + messageIndex: 0, + }) + + // Insert an event for the tipset + eventID := insertEvent(t, s, event{ + messageID: messageID, + eventIndex: 0, + emitterAddr: []byte("test_emitter_addr"), + reverted: false, + }) + + // Insert an event entry for the event + insertEventEntry(t, s, eventEntry{ + eventID: eventID, + indexed: true, + flags: []byte("test_flags"), + key: "test_key", + codec: 1, + value: []byte("test_value"), + }) + + // Delete the tipset + res, err := s.db.Exec("DELETE FROM tipset_message WHERE tipset_key_cid = ?", []byte("test_tipset_key")) + require.NoError(t, err) + rowsAffected, err := res.RowsAffected() + require.NoError(t, err) + require.Equal(t, int64(1), rowsAffected) + + // verify event is deleted + verifyEventAbsent(t, s, eventID) + verifyEventEntryAbsent(t, s, eventID) +} + +func TestInsertTipsetMessage(t *testing.T) { + s, err := NewSqliteIndexer(":memory:", nil, 0, false, 0) + require.NoError(t, err) + + ts := tipsetMessage{ + tipsetKeyCid: []byte("test_tipset_key"), + height: 1, + reverted: false, + messageCid: []byte("test_message_cid"), + messageIndex: 0, + } + + // Insert a tipset + messageID := insertTipsetMessage(t, s, ts) + + // revert the tipset + revertTipset(t, s, []byte("test_tipset_key")) + ts.reverted = true + verifyTipsetMessage(t, s, messageID, ts) + + // inserting with the same (tipset, message) should overwrite the reverted flag + res, err := s.stmts.insertTipsetMessageStmt.Exec(ts.tipsetKeyCid, ts.height, true, ts.messageCid, ts.messageIndex) + require.NoError(t, err) + + rowsAffected, err := res.RowsAffected() + require.NoError(t, err) + require.Equal(t, int64(1), rowsAffected) + + ts.reverted = false + verifyTipsetMessage(t, s, messageID, ts) +} + +type tipsetMessage struct { + tipsetKeyCid []byte + height uint64 + reverted bool + messageCid []byte + messageIndex int64 +} + +type event struct { + eventIndex uint64 + emitterAddr []byte + reverted bool + messageID int64 +} + +type eventEntry struct { + eventID int64 + indexed bool + flags []byte + key string + codec int + value []byte +} + +func updateEventsToNonReverted(t *testing.T, s *SqliteIndexer, tsKeyCid []byte) { + res, err := s.stmts.updateEventsToNonRevertedStmt.Exec(tsKeyCid) + require.NoError(t, err) + + rowsAffected, err := res.RowsAffected() + require.NoError(t, err) + require.Equal(t, int64(1), rowsAffected) + + // read all events for this tipset and verify they are not reverted using a COUNT query + var count int + err = s.db.QueryRow("SELECT COUNT(*) FROM event e JOIN tipset_message tm ON e.message_id = tm.message_id WHERE tm.tipset_key_cid = ? AND e.reverted = 1", tsKeyCid).Scan(&count) + require.NoError(t, err) + require.Equal(t, 0, count, "Expected no reverted events for this tipset") +} + +func revertTipset(t *testing.T, s *SqliteIndexer, tipsetKeyCid []byte) { + res, err := s.stmts.updateTipsetToRevertedStmt.Exec(tipsetKeyCid) + require.NoError(t, err) + + rowsAffected, err := res.RowsAffected() + require.NoError(t, err) + require.Equal(t, int64(1), rowsAffected) + + var reverted bool + err = s.db.QueryRow("SELECT reverted FROM tipset_message WHERE tipset_key_cid = ?", tipsetKeyCid).Scan(&reverted) + require.NoError(t, err) + require.True(t, reverted) +} + +func verifyTipsetMessage(t *testing.T, s *SqliteIndexer, messageID int64, expectedTipsetMessage tipsetMessage) { + var tipsetKeyCid []byte + var height uint64 + var reverted bool + var messageCid []byte + var messageIndex int64 + err := s.db.QueryRow("SELECT tipset_key_cid, height, reverted, message_cid, message_index FROM tipset_message WHERE message_id = ?", messageID).Scan(&tipsetKeyCid, &height, &reverted, &messageCid, &messageIndex) + require.NoError(t, err) + require.Equal(t, expectedTipsetMessage.tipsetKeyCid, tipsetKeyCid) + require.Equal(t, expectedTipsetMessage.height, height) + require.Equal(t, expectedTipsetMessage.reverted, reverted) + require.Equal(t, expectedTipsetMessage.messageCid, messageCid) + require.Equal(t, expectedTipsetMessage.messageIndex, messageIndex) +} + +func verifyEventEntryAbsent(t *testing.T, s *SqliteIndexer, eventID int64) { + err := s.db.QueryRow("SELECT event_id FROM event_entry WHERE event_id = ?", eventID).Scan(&eventID) + require.Equal(t, sql.ErrNoRows, err) +} + +func verifyEventAbsent(t *testing.T, s *SqliteIndexer, eventID int64) { + var eventIndex uint64 + err := s.db.QueryRow("SELECT event_index FROM event WHERE event_id = ?", eventID).Scan(&eventIndex) + require.Equal(t, sql.ErrNoRows, err) +} + +func verifyEvent(t *testing.T, s *SqliteIndexer, eventID int64, expectedEvent event) { + var eventIndex uint64 + var emitterAddr []byte + var reverted bool + var messageID int64 + err := s.db.QueryRow("SELECT event_index, emitter_addr, reverted, message_id FROM event WHERE event_id = ?", eventID).Scan(&eventIndex, &emitterAddr, &reverted, &messageID) + require.NoError(t, err) + require.Equal(t, expectedEvent.eventIndex, eventIndex) + require.Equal(t, expectedEvent.emitterAddr, emitterAddr) + require.Equal(t, expectedEvent.reverted, reverted) + require.Equal(t, expectedEvent.messageID, messageID) +} + +func verifyCountTipsetsAtHeightStmt(t *testing.T, s *SqliteIndexer, height uint64, expectedRevertedCount, expectedNonRevertedCount int) { + var revertedCount, nonRevertedCount int + err := s.stmts.countTipsetsAtHeightStmt.QueryRow(height).Scan(&revertedCount, &nonRevertedCount) + require.NoError(t, err) + require.Equal(t, expectedRevertedCount, revertedCount) + require.Equal(t, expectedNonRevertedCount, nonRevertedCount) +} + +func verifyHasTipsetStmt(t *testing.T, s *SqliteIndexer, tipsetKeyCid []byte, expectedHas bool) { + var has bool + err := s.stmts.hasTipsetStmt.QueryRow(tipsetKeyCid).Scan(&has) + require.NoError(t, err) + require.Equal(t, expectedHas, has) +} + +func verifyHasRevertedEventsInTipsetStmt(t *testing.T, s *SqliteIndexer, tipsetKeyCid []byte, expectedHas bool) { + var hasRevertedEventsInTipset bool + err := s.stmts.hasRevertedEventsInTipsetStmt.QueryRow(tipsetKeyCid).Scan(&hasRevertedEventsInTipset) + require.NoError(t, err) + require.Equal(t, expectedHas, hasRevertedEventsInTipset) +} + +func verifyHasNullRoundAtHeightStmt(t *testing.T, s *SqliteIndexer, height uint64, expectedHasNullRound bool) { + var hasNullRound bool + err := s.stmts.hasNullRoundAtHeightStmt.QueryRow(height).Scan(&hasNullRound) + require.NoError(t, err) + require.Equal(t, expectedHasNullRound, hasNullRound) +} + +func verifyNonRevertedMessageCount(t *testing.T, s *SqliteIndexer, tipsetKeyCid []byte, expectedCount int) { + var count int + err := s.stmts.getNonRevertedTipsetMessageCountStmt.QueryRow(tipsetKeyCid).Scan(&count) + require.NoError(t, err) + require.Equal(t, expectedCount, count) +} + +func verifyNonRevertedEventCount(t *testing.T, s *SqliteIndexer, tipsetKeyCid []byte, expectedCount int) { + var count int + err := s.stmts.getNonRevertedTipsetEventCountStmt.QueryRow(tipsetKeyCid).Scan(&count) + require.NoError(t, err) + require.Equal(t, expectedCount, count) +} + +func verifyNonRevertedEventEntriesCount(t *testing.T, s *SqliteIndexer, tipsetKeyCid []byte, expectedCount int) { + var count int + err := s.stmts.getNonRevertedTipsetEventEntriesCountStmt.QueryRow(tipsetKeyCid).Scan(&count) + require.NoError(t, err) + require.Equal(t, expectedCount, count) +} + +func insertTipsetMessage(t *testing.T, s *SqliteIndexer, ts tipsetMessage) int64 { + res, err := s.stmts.insertTipsetMessageStmt.Exec(ts.tipsetKeyCid, ts.height, ts.reverted, ts.messageCid, ts.messageIndex) + require.NoError(t, err) + + rowsAffected, err := res.RowsAffected() + require.NoError(t, err) + require.Equal(t, int64(1), rowsAffected) + + messageID, err := res.LastInsertId() + require.NoError(t, err) + require.NotEqual(t, int64(0), messageID) + + // read back the message to verify it was inserted correctly + verifyTipsetMessage(t, s, messageID, ts) + + return messageID +} + +func insertEvent(t *testing.T, s *SqliteIndexer, e event) int64 { + res, err := s.stmts.insertEventStmt.Exec(e.messageID, e.eventIndex, e.emitterAddr, e.reverted) + require.NoError(t, err) + + rowsAffected, err := res.RowsAffected() + require.NoError(t, err) + require.Equal(t, int64(1), rowsAffected) + + eventID, err := res.LastInsertId() + require.NoError(t, err) + require.NotEqual(t, int64(0), eventID) + + verifyEvent(t, s, eventID, e) + + return eventID +} + +func insertEventEntry(t *testing.T, s *SqliteIndexer, ee eventEntry) { + res, err := s.stmts.insertEventEntryStmt.Exec(ee.eventID, ee.indexed, ee.flags, ee.key, ee.codec, ee.value) + require.NoError(t, err) + + rowsAffected, err := res.RowsAffected() + require.NoError(t, err) + require.Equal(t, int64(1), rowsAffected) +} diff --git a/chain/index/events.go b/chain/index/events.go index d3badeeeff8..31259bcf7d4 100644 --- a/chain/index/events.go +++ b/chain/index/events.go @@ -33,6 +33,9 @@ func (si *SqliteIndexer) indexEvents(ctx context.Context, tx *sql.Tx, msgTs *typ if si.idToRobustAddrFunc == nil { return xerrors.Errorf("indexer can not index events without an address resolver") } + if si.eventLoaderFunc == nil { + return xerrors.Errorf("indexer can not index events without an event loader") + } // check if we have an event indexed for any message in the `msgTs` tipset -> if so, there's nothig to do here // this makes event inserts idempotent @@ -59,7 +62,7 @@ func (si *SqliteIndexer) indexEvents(ctx context.Context, tx *sql.Tx, msgTs *typ return nil } - ems, err := si.loadExecutedMessages(ctx, msgTs, executionTs) + ems, err := si.eventLoaderFunc(ctx, si.cs, msgTs, executionTs) if err != nil { return xerrors.Errorf("failed to load executed messages: %w", err) } @@ -124,13 +127,20 @@ func (si *SqliteIndexer) indexEvents(ctx context.Context, tx *sql.Tx, msgTs *typ return nil } -func (si *SqliteIndexer) loadExecutedMessages(ctx context.Context, msgTs, rctTs *types.TipSet) ([]executedMessage, error) { - msgs, err := si.cs.MessagesForTipset(ctx, msgTs) +func MakeLoadExecutedMessages(recomputeTipSetStateFunc recomputeTipSetStateFunc) func(ctx context.Context, + cs ChainStore, msgTs, rctTs *types.TipSet) ([]executedMessage, error) { + return func(ctx context.Context, cs ChainStore, msgTs, rctTs *types.TipSet) ([]executedMessage, error) { + return loadExecutedMessages(ctx, cs, recomputeTipSetStateFunc, msgTs, rctTs) + } +} + +func loadExecutedMessages(ctx context.Context, cs ChainStore, recomputeTipSetStateFunc recomputeTipSetStateFunc, msgTs, rctTs *types.TipSet) ([]executedMessage, error) { + msgs, err := cs.MessagesForTipset(ctx, msgTs) if err != nil { return nil, xerrors.Errorf("failed to get messages for tipset: %w", err) } - st := si.cs.ActorStore(ctx) + st := cs.ActorStore(ctx) receiptsArr, err := blockadt.AsArray(st, rctTs.Blocks()[0].ParentMessageReceipts) if err != nil { @@ -163,12 +173,12 @@ func (si *SqliteIndexer) loadExecutedMessages(ctx context.Context, msgTs, rctTs eventsArr, err := amt4.LoadAMT(ctx, st, *rct.EventsRoot, amt4.UseTreeBitWidth(types.EventAMTBitwidth)) if err != nil { - if si.recomputeTipSetStateFunc == nil { + if recomputeTipSetStateFunc == nil { return nil, xerrors.Errorf("failed to load events amt for message %s: %w", ems[i].msg.Cid(), err) } log.Warnf("failed to load events amt for message %s: %s; recomputing tipset state to regenerate events", ems[i].msg.Cid(), err) - if err := si.recomputeTipSetStateFunc(ctx, msgTs); err != nil { + if err := recomputeTipSetStateFunc(ctx, msgTs); err != nil { return nil, xerrors.Errorf("failed to recompute missing events; failed to recompute tipset state: %w", err) } @@ -253,6 +263,9 @@ func (si *SqliteIndexer) getTipsetKeyCidByHeight(ctx context.Context, height abi if err != nil { return nil, xerrors.Errorf("failed to get tipset by height: %w", err) } + if ts == nil { + return nil, xerrors.Errorf("tipset is nil for height: %d", height) + } if ts.Height() != height { // this means that this is a null round diff --git a/chain/index/events_test.go b/chain/index/events_test.go new file mode 100644 index 00000000000..d165efe74a0 --- /dev/null +++ b/chain/index/events_test.go @@ -0,0 +1,238 @@ +package index + +import ( + "context" + "database/sql" + "errors" + pseudo "math/rand" + "testing" + "time" + + "github.com/ipfs/go-cid" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/chain/types" +) + +func TestGetEventsForFilterNoEvents(t *testing.T) { + ctx := context.Background() + rng := pseudo.New(pseudo.NewSource(time.Now().UnixNano())) + + headHeight := abi.ChainEpoch(60) + si, _, cs := setupWithHeadIndexed(t, headHeight, rng) + defer func() { _ = si.Close() }() + + // Create a fake tipset at height 1 + fakeTipSet1 := fakeTipSet(t, rng, 1, nil) + + // Set the dummy chainstore to return this tipset for height 1 + cs.SetTipsetByHeightAndKey(1, fakeTipSet1.Key(), fakeTipSet1) // empty DB + + // tipset is not indexed + f := &EventFilter{ + MinHeight: 1, + MaxHeight: 1, + } + ces, err := si.GetEventsForFilter(ctx, f, false) + require.True(t, errors.Is(err, ErrNotFound)) + require.Equal(t, 0, len(ces)) + + tsCid, err := fakeTipSet1.Key().Cid() + require.NoError(t, err) + f = &EventFilter{ + TipsetCid: tsCid, + } + + ces, err = si.GetEventsForFilter(ctx, f, false) + require.True(t, errors.Is(err, ErrNotFound)) + require.Equal(t, 0, len(ces)) + + // tipset is indexed but has no events + err = withTx(ctx, si.db, func(tx *sql.Tx) error { + return si.indexTipset(ctx, tx, fakeTipSet1) + }) + require.NoError(t, err) + + ces, err = si.GetEventsForFilter(ctx, f, false) + require.NoError(t, err) + require.Equal(t, 0, len(ces)) + + f = &EventFilter{ + TipsetCid: tsCid, + } + ces, err = si.GetEventsForFilter(ctx, f, false) + require.NoError(t, err) + require.Equal(t, 0, len(ces)) + + // search for a range that is absent + f = &EventFilter{ + MinHeight: 100, + MaxHeight: 200, + } + ces, err = si.GetEventsForFilter(ctx, f, false) + require.NoError(t, err) + require.Equal(t, 0, len(ces)) +} + +func TestGetEventsForFilterWithEvents(t *testing.T) { + ctx := context.Background() + rng := pseudo.New(pseudo.NewSource(time.Now().UnixNano())) + headHeight := abi.ChainEpoch(60) + si, _, cs := setupWithHeadIndexed(t, headHeight, rng) + defer func() { _ = si.Close() }() + + ev1 := fakeEvent( + abi.ActorID(1), + []kv{ + {k: "type", v: []byte("approval")}, + {k: "signer", v: []byte("addr1")}, + }, + []kv{ + {k: "amount", v: []byte("2988181")}, + }, + ) + + ev2 := fakeEvent( + abi.ActorID(2), + []kv{ + {k: "type", v: []byte("approval")}, + {k: "signer", v: []byte("addr2")}, + }, + []kv{ + {k: "amount", v: []byte("2988181")}, + }, + ) + + events := []types.Event{*ev1, *ev2} + + fm := fakeMessage(address.TestAddress, address.TestAddress) + em1 := executedMessage{ + msg: fm, + evs: events, + } + + si.SetIdToRobustAddrFunc(func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) { + idAddr, err := address.NewIDAddress(uint64(emitter)) + if err != nil { + return address.Undef, false + } + + return idAddr, true + }) + + si.SetEventLoaderFunc(func(ctx context.Context, cs ChainStore, msgTs, rctTs *types.TipSet) ([]executedMessage, error) { + return []executedMessage{em1}, nil + }) + + // Create a fake tipset at height 1 + fakeTipSet1 := fakeTipSet(t, rng, 1, nil) + fakeTipSet2 := fakeTipSet(t, rng, 2, nil) + + // Set the dummy chainstore to return this tipset for height 1 + cs.SetTipsetByHeightAndKey(1, fakeTipSet1.Key(), fakeTipSet1) // empty DB + cs.SetTipsetByHeightAndKey(2, fakeTipSet2.Key(), fakeTipSet2) // empty DB + + cs.SetMessagesForTipset(fakeTipSet1, []types.ChainMsg{fm}) + + // index tipset and events + require.NoError(t, si.Apply(ctx, fakeTipSet1, fakeTipSet2)) + + // fetch it based on height -> works + f := &EventFilter{ + MinHeight: 1, + MaxHeight: 1, + } + ces, err := si.GetEventsForFilter(ctx, f, false) + require.NoError(t, err) + require.Equal(t, 2, len(ces)) + + // fetch it based on cid -> works + tsCid1, err := fakeTipSet1.Key().Cid() + require.NoError(t, err) + + tsCid2, err := fakeTipSet2.Key().Cid() + require.NoError(t, err) + + f = &EventFilter{ + TipsetCid: tsCid1, + } + ces, err = si.GetEventsForFilter(ctx, f, false) + require.NoError(t, err) + require.Equal(t, 2, len(ces)) + + require.Equal(t, ev1.Entries, ces[0].Entries) + require.Equal(t, ev2.Entries, ces[1].Entries) + + // mark fakeTipSet2 as reverted so events for fakeTipSet1 are reverted + require.NoError(t, si.Revert(ctx, fakeTipSet2, fakeTipSet1)) + + var reverted bool + err = si.db.QueryRow("SELECT reverted FROM tipset_message WHERE tipset_key_cid = ?", tsCid2.Bytes()).Scan(&reverted) + require.NoError(t, err) + require.True(t, reverted) + + var reverted2 bool + err = si.db.QueryRow("SELECT reverted FROM tipset_message WHERE tipset_key_cid = ?", tsCid1.Bytes()).Scan(&reverted2) + require.NoError(t, err) + require.False(t, reverted2) + + // fetching events fails if excludeReverted is true + f = &EventFilter{ + TipsetCid: tsCid1, + } + ces, err = si.GetEventsForFilter(ctx, f, true) + require.NoError(t, err) + require.Equal(t, 0, len(ces)) + + // works if excludeReverted is false + ces, err = si.GetEventsForFilter(ctx, f, false) + require.NoError(t, err) + require.Equal(t, 2, len(ces)) +} + +func fakeMessage(to, from address.Address) *types.Message { + return &types.Message{ + To: to, + From: from, + Nonce: 197, + Method: 1, + Params: []byte("some random bytes"), + GasLimit: 126723, + GasPremium: types.NewInt(4), + GasFeeCap: types.NewInt(120), + } +} + +func fakeEvent(emitter abi.ActorID, indexed []kv, unindexed []kv) *types.Event { + ev := &types.Event{ + Emitter: emitter, + } + + for _, in := range indexed { + ev.Entries = append(ev.Entries, types.EventEntry{ + Flags: 0x01, + Key: in.k, + Codec: cid.Raw, + Value: in.v, + }) + } + + for _, in := range unindexed { + ev.Entries = append(ev.Entries, types.EventEntry{ + Flags: 0x00, + Key: in.k, + Codec: cid.Raw, + Value: in.v, + }) + } + + return ev +} + +type kv struct { + k string + v []byte +} diff --git a/chain/index/gc_test.go b/chain/index/gc_test.go new file mode 100644 index 00000000000..2be23240c91 --- /dev/null +++ b/chain/index/gc_test.go @@ -0,0 +1,67 @@ +package index + +import ( + "context" + pseudo "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestGC(t *testing.T) { + ctx := context.Background() + rng := pseudo.New(pseudo.NewSource(time.Now().UnixNano())) + + // head at height 60 + // insert tipsets at heigh 1,10,50. + // retention epochs is 20 + si, _, _ := setupWithHeadIndexed(t, 60, rng) + si.gcRetentionEpochs = 20 + defer func() { _ = si.Close() }() + + tsCid1 := randomCid(t, rng) + tsCid10 := randomCid(t, rng) + tsCid50 := randomCid(t, rng) + + insertTipsetMessage(t, si, tipsetMessage{ + tipsetKeyCid: tsCid1.Bytes(), + height: 1, + reverted: false, + messageCid: randomCid(t, rng).Bytes(), + messageIndex: 0, + }) + + insertTipsetMessage(t, si, tipsetMessage{ + tipsetKeyCid: tsCid10.Bytes(), + height: 10, + reverted: false, + messageCid: randomCid(t, rng).Bytes(), + messageIndex: 0, + }) + + insertTipsetMessage(t, si, tipsetMessage{ + tipsetKeyCid: tsCid50.Bytes(), + height: 50, + reverted: false, + messageCid: randomCid(t, rng).Bytes(), + messageIndex: 0, + }) + + si.gc(ctx) + + // tipset at height 1 and 10 should be removed + var count int + err := si.db.QueryRow("SELECT COUNT(*) FROM tipset_message WHERE height = 1").Scan(&count) + require.NoError(t, err) + require.Equal(t, 0, count) + + err = si.db.QueryRow("SELECT COUNT(*) FROM tipset_message WHERE height = 10").Scan(&count) + require.NoError(t, err) + require.Equal(t, 0, count) + + // tipset at height 50 should not be removed + err = si.db.QueryRow("SELECT COUNT(*) FROM tipset_message WHERE height = 50").Scan(&count) + require.NoError(t, err) + require.Equal(t, 1, count) +} diff --git a/chain/index/helpers.go b/chain/index/helpers.go index 6c5294a5ba7..07bf970569f 100644 --- a/chain/index/helpers.go +++ b/chain/index/helpers.go @@ -3,6 +3,7 @@ package index import ( "context" "database/sql" + "errors" "os" ipld "github.com/ipfs/go-ipld-format" @@ -80,6 +81,9 @@ func PopulateFromSnapshot(ctx context.Context, path string, cs ChainStore) error } func toTipsetKeyCidBytes(ts *types.TipSet) ([]byte, error) { + if ts == nil { + return nil, errors.New("failed to get tipset key cid: tipset is nil") + } tsKeyCid, err := ts.Key().Cid() if err != nil { return nil, xerrors.Errorf("failed to get tipset key cid: %w", err) diff --git a/chain/index/indexer.go b/chain/index/indexer.go index 3607ce1126e..d537ab18fdd 100644 --- a/chain/index/indexer.go +++ b/chain/index/indexer.go @@ -22,6 +22,7 @@ var _ Indexer = (*SqliteIndexer)(nil) // IdToRobustAddrFunc is a function type that resolves an actor ID to a robust address type IdToRobustAddrFunc func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) +type eventLoaderFunc func(ctx context.Context, cs ChainStore, msgTs, rctTs *types.TipSet) ([]executedMessage, error) type recomputeTipSetStateFunc func(ctx context.Context, ts *types.TipSet) error type preparedStatements struct { @@ -63,8 +64,8 @@ type SqliteIndexer struct { db *sql.DB cs ChainStore - idToRobustAddrFunc IdToRobustAddrFunc - recomputeTipSetStateFunc recomputeTipSetStateFunc + idToRobustAddrFunc IdToRobustAddrFunc + eventLoaderFunc eventLoaderFunc stmts *preparedStatements @@ -144,8 +145,8 @@ func (si *SqliteIndexer) SetIdToRobustAddrFunc(idToRobustAddrFunc IdToRobustAddr si.idToRobustAddrFunc = idToRobustAddrFunc } -func (si *SqliteIndexer) SetRecomputeTipSetStateFunc(recomputeTipSetStateFunc recomputeTipSetStateFunc) { - si.recomputeTipSetStateFunc = recomputeTipSetStateFunc +func (si *SqliteIndexer) SetEventLoaderFunc(eventLoaderFunc eventLoaderFunc) { + si.eventLoaderFunc = eventLoaderFunc } func (si *SqliteIndexer) Close() error { diff --git a/chain/index/indexer_test.go b/chain/index/indexer_test.go new file mode 100644 index 00000000000..4a752772a09 --- /dev/null +++ b/chain/index/indexer_test.go @@ -0,0 +1,54 @@ +package index + +import ( + "context" + "database/sql" + pseudo "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestRestoreTipsetIfExists(t *testing.T) { + ctx := context.Background() + rng := pseudo.New(pseudo.NewSource(time.Now().UnixNano())) + si, _, _ := setupWithHeadIndexed(t, 10, rng) + + tsKeyCid := randomCid(t, rng) + tsKeyCidBytes := tsKeyCid.Bytes() + + err := withTx(ctx, si.db, func(tx *sql.Tx) error { + // tipset does not exist + exists, err := si.restoreTipsetIfExists(ctx, tx, tsKeyCidBytes) + require.NoError(t, err) + require.False(t, exists) + + // insert reverted tipset + _, err = tx.Stmt(si.stmts.insertTipsetMessageStmt).Exec(tsKeyCidBytes, 1, 1, randomCid(t, rng).Bytes(), 0) + require.NoError(t, err) + + // tipset exists and is NOT reverted + exists, err = si.restoreTipsetIfExists(ctx, tx, tsKeyCidBytes) + require.NoError(t, err) + require.True(t, exists) + + // Verify that the tipset is not reverted + var reverted bool + err = tx.QueryRow("SELECT reverted FROM tipset_message WHERE tipset_key_cid = ?", tsKeyCidBytes).Scan(&reverted) + require.NoError(t, err) + require.False(t, reverted, "Tipset should not be reverted") + + return nil + }) + require.NoError(t, err) + + exists, err := si.isTipsetIndexed(ctx, tsKeyCidBytes) + require.NoError(t, err) + require.True(t, exists) + + fc := randomCid(t, rng) + exists, err = si.isTipsetIndexed(ctx, fc.Bytes()) + require.NoError(t, err) + require.False(t, exists) +} diff --git a/chain/index/interface.go b/chain/index/interface.go index 1dd84e7f5d1..76a3831a019 100644 --- a/chain/index/interface.go +++ b/chain/index/interface.go @@ -56,7 +56,8 @@ type Indexer interface { IndexEthTxHash(ctx context.Context, txHash ethtypes.EthHash, c cid.Cid) error SetIdToRobustAddrFunc(idToRobustAddrFunc IdToRobustAddrFunc) - SetRecomputeTipSetStateFunc(recomputeTipSetStateFunc recomputeTipSetStateFunc) + SetEventLoaderFunc(eventLoaderFunc eventLoaderFunc) + Apply(ctx context.Context, from, to *types.TipSet) error Revert(ctx context.Context, from, to *types.TipSet) error diff --git a/chain/index/read_test.go b/chain/index/read_test.go new file mode 100644 index 00000000000..c7a6797d0bf --- /dev/null +++ b/chain/index/read_test.go @@ -0,0 +1,303 @@ +package index + +import ( + "context" + "errors" + pseudo "math/rand" + "sync" + "testing" + "time" + + "github.com/ipfs/go-cid" + mh "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/crypto" + + "github.com/filecoin-project/lotus/chain/actors/adt" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/types/ethtypes" +) + +func TestGetCidFromHash(t *testing.T) { + rng := pseudo.New(pseudo.NewSource(time.Now().UnixNano())) + ctx := context.Background() + + s, _, _ := setupWithHeadIndexed(t, 10, rng) + + ethTxHash := ethtypes.EthHash([32]byte{1}) + msgCid := randomCid(t, rng) + + // read from empty db -> ErrNotFound + c, err := s.GetCidFromHash(ctx, ethTxHash) + require.Error(t, err) + require.True(t, errors.Is(err, ErrNotFound)) + require.EqualValues(t, cid.Undef, c) + + // insert and read + insertEthTxHash(t, s, ethTxHash, msgCid) + c, err = s.GetCidFromHash(ctx, ethTxHash) + require.NoError(t, err) + require.EqualValues(t, msgCid, c) + + // look up some other hash -> fails + c, err = s.GetCidFromHash(ctx, ethtypes.EthHash([32]byte{2})) + require.Error(t, err) + require.True(t, errors.Is(err, ErrNotFound)) + require.EqualValues(t, cid.Undef, c) +} + +func TestGetMsgInfo(t *testing.T) { + ctx := context.Background() + rng := pseudo.New(pseudo.NewSource(time.Now().UnixNano())) + s, _, _ := setupWithHeadIndexed(t, 10, rng) + + msgCid := randomCid(t, rng) + + // read from empty db -> ErrNotFound + mi, err := s.GetMsgInfo(ctx, msgCid) + require.Error(t, err) + require.True(t, errors.Is(err, ErrNotFound)) + require.Nil(t, mi) + + msgCidBytes := msgCid.Bytes() + tsKeyCid := randomCid(t, rng) + // insert and read + insertTipsetMessage(t, s, tipsetMessage{ + tipsetKeyCid: tsKeyCid.Bytes(), + height: uint64(1), + reverted: false, + messageCid: msgCidBytes, + messageIndex: 1, + }) + mi, err = s.GetMsgInfo(ctx, msgCid) + require.NoError(t, err) + require.Equal(t, msgCid, mi.Message) + require.Equal(t, tsKeyCid, mi.TipSet) + require.Equal(t, abi.ChainEpoch(1), mi.Epoch) +} + +func setupWithHeadIndexed(t *testing.T, headHeight abi.ChainEpoch, rng *pseudo.Rand) (*SqliteIndexer, *types.TipSet, *dummyChainStore) { + head := fakeTipSet(t, rng, headHeight, []cid.Cid{}) + d := newDummyChainStore() + d.SetHeaviestTipSet(head) + + s, err := NewSqliteIndexer(":memory:", d, 0, false, 0) + require.NoError(t, err) + insertHead(t, s, head, headHeight) + + return s, head, d +} + +func insertHead(t *testing.T, s *SqliteIndexer, head *types.TipSet, height abi.ChainEpoch) { + headKeyBytes, err := toTipsetKeyCidBytes(head) + require.NoError(t, err) + + insertTipsetMessage(t, s, tipsetMessage{ + tipsetKeyCid: headKeyBytes, + height: uint64(height), + reverted: false, + messageCid: nil, + messageIndex: -1, + }) +} + +func insertEthTxHash(t *testing.T, s *SqliteIndexer, ethTxHash ethtypes.EthHash, messageCid cid.Cid) { + msgCidBytes := messageCid.Bytes() + + res, err := s.stmts.insertEthTxHashStmt.Exec(ethTxHash.String(), msgCidBytes) + require.NoError(t, err) + rowsAffected, err := res.RowsAffected() + require.NoError(t, err) + require.Equal(t, int64(1), rowsAffected) +} + +type dummyChainStore struct { + mu sync.RWMutex + + heightToTipSet map[abi.ChainEpoch]*types.TipSet + messagesForTipset map[*types.TipSet][]types.ChainMsg + keyToTipSet map[types.TipSetKey]*types.TipSet + + heaviestTipSet *types.TipSet + tipSetByCid func(ctx context.Context, tsKeyCid cid.Cid) (*types.TipSet, error) + messagesForBlock func(ctx context.Context, b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) + actorStore func(ctx context.Context) adt.Store +} + +func newDummyChainStore() *dummyChainStore { + return &dummyChainStore{ + heightToTipSet: make(map[abi.ChainEpoch]*types.TipSet), + messagesForTipset: make(map[*types.TipSet][]types.ChainMsg), + keyToTipSet: make(map[types.TipSetKey]*types.TipSet), + } +} + +func (d *dummyChainStore) MessagesForTipset(ctx context.Context, ts *types.TipSet) ([]types.ChainMsg, error) { + d.mu.RLock() + defer d.mu.RUnlock() + + msgs, ok := d.messagesForTipset[ts] + if !ok { + return nil, nil + } + return msgs, nil +} + +func (d *dummyChainStore) GetHeaviestTipSet() *types.TipSet { + d.mu.RLock() + defer d.mu.RUnlock() + return d.heaviestTipSet +} + +func (d *dummyChainStore) GetTipSetByCid(ctx context.Context, tsKeyCid cid.Cid) (*types.TipSet, error) { + d.mu.RLock() + defer d.mu.RUnlock() + if d.tipSetByCid != nil { + return d.tipSetByCid(ctx, tsKeyCid) + } + return nil, nil +} + +func (d *dummyChainStore) GetTipSetFromKey(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) { + d.mu.RLock() + defer d.mu.RUnlock() + + return d.keyToTipSet[tsk], nil +} + +func (d *dummyChainStore) MessagesForBlock(ctx context.Context, b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) { + d.mu.RLock() + defer d.mu.RUnlock() + if d.messagesForBlock != nil { + return d.messagesForBlock(ctx, b) + } + return nil, nil, nil +} + +func (d *dummyChainStore) ActorStore(ctx context.Context) adt.Store { + d.mu.RLock() + defer d.mu.RUnlock() + if d.actorStore != nil { + return d.actorStore(ctx) + } + return nil +} + +func (d *dummyChainStore) GetTipsetByHeight(ctx context.Context, h abi.ChainEpoch, _ *types.TipSet, prev bool) (*types.TipSet, error) { + d.mu.RLock() + defer d.mu.RUnlock() + + ts, ok := d.heightToTipSet[h] + if !ok { + return nil, errors.New("tipset not found") + } + return ts, nil +} + +func (d *dummyChainStore) IsStoringEvents() bool { + return true +} + +// Setter methods to configure the mock + +func (d *dummyChainStore) SetMessagesForTipset(ts *types.TipSet, msgs []types.ChainMsg) { + d.mu.Lock() + defer d.mu.Unlock() + d.messagesForTipset[ts] = msgs +} + +func (d *dummyChainStore) SetHeaviestTipSet(ts *types.TipSet) { + d.mu.Lock() + defer d.mu.Unlock() + d.heaviestTipSet = ts +} + +func (d *dummyChainStore) SetTipSetByCid(f func(ctx context.Context, tsKeyCid cid.Cid) (*types.TipSet, error)) { + d.mu.Lock() + defer d.mu.Unlock() + d.tipSetByCid = f +} + +func (d *dummyChainStore) SetMessagesForBlock(f func(ctx context.Context, b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)) { + d.mu.Lock() + defer d.mu.Unlock() + d.messagesForBlock = f +} + +func (d *dummyChainStore) SetActorStore(f func(ctx context.Context) adt.Store) { + d.mu.Lock() + defer d.mu.Unlock() + d.actorStore = f +} + +func (d *dummyChainStore) SetTipsetByHeightAndKey(h abi.ChainEpoch, tsk types.TipSetKey, ts *types.TipSet) { + d.mu.Lock() + defer d.mu.Unlock() + + d.heightToTipSet[h] = ts + d.keyToTipSet[tsk] = ts +} + +func randomIDAddr(tb testing.TB, rng *pseudo.Rand) address.Address { + tb.Helper() + addr, err := address.NewIDAddress(uint64(rng.Int63())) + require.NoError(tb, err) + return addr +} + +func randomCid(tb testing.TB, rng *pseudo.Rand) cid.Cid { + tb.Helper() + cb := cid.V1Builder{Codec: cid.Raw, MhType: mh.IDENTITY} + c, err := cb.Sum(randomBytes(10, rng)) + require.NoError(tb, err) + return c +} + +func randomBytes(n int, rng *pseudo.Rand) []byte { + buf := make([]byte, n) + rng.Read(buf) + return buf +} + +func fakeTipSet(tb testing.TB, rng *pseudo.Rand, h abi.ChainEpoch, parents []cid.Cid) *types.TipSet { + tb.Helper() + ts, err := types.NewTipSet([]*types.BlockHeader{ + { + Height: h, + Miner: randomIDAddr(tb, rng), + + Parents: parents, + + Ticket: &types.Ticket{VRFProof: []byte{byte(h % 2)}}, + + ParentStateRoot: randomCid(tb, rng), + Messages: randomCid(tb, rng), + ParentMessageReceipts: randomCid(tb, rng), + + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, + }, + { + Height: h, + Miner: randomIDAddr(tb, rng), + + Parents: parents, + + Ticket: &types.Ticket{VRFProof: []byte{byte((h + 1) % 2)}}, + + ParentStateRoot: randomCid(tb, rng), + Messages: randomCid(tb, rng), + ParentMessageReceipts: randomCid(tb, rng), + + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, + }, + }) + + require.NoError(tb, err) + + return ts +} diff --git a/node/modules/chainindex.go b/node/modules/chainindex.go index 250d82873f8..59e2af3c119 100644 --- a/node/modules/chainindex.go +++ b/node/modules/chainindex.go @@ -35,9 +35,8 @@ func ChainIndexer(cfg config.ChainIndexerConfig) func(lc fx.Lifecycle, mctx help return nil, err } - // TODO Implement config driven auto-backfilling - chainIndexer, err := index.NewSqliteIndexer(filepath.Join(chainIndexPath, index.DefaultDbFilename), - cs, cfg.GCRetentionEpochs, cfg.ReconcileEmptyIndex, cfg.MaxReconcileTipsets) + dbPath := filepath.Join(chainIndexPath, index.DefaultDbFilename) + chainIndexer, err := index.NewSqliteIndexer(dbPath, cs, cfg.GCRetentionEpochs, cfg.ReconcileEmptyIndex, cfg.MaxReconcileTipsets) if err != nil { return nil, err } @@ -72,11 +71,13 @@ func InitChainIndexer(lc fx.Lifecycle, mctx helpers.MetricsCtx, indexer index.In return *actor.DelegatedAddress, true }) - indexer.SetRecomputeTipSetStateFunc(func(ctx context.Context, ts *types.TipSet) error { + eventLoaderFunc := index.MakeLoadExecutedMessages(func(ctx context.Context, ts *types.TipSet) error { _, _, err := sm.RecomputeTipSetState(ctx, ts) return err }) + indexer.SetEventLoaderFunc(eventLoaderFunc) + ch, err := mp.Updates(ctx) if err != nil { return err