diff --git a/chain/events/filter/event.go b/chain/events/filter/event.go index bd989680421..ac16967f400 100644 --- a/chain/events/filter/event.go +++ b/chain/events/filter/event.go @@ -370,7 +370,7 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet) } func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address, - keysWithCodec map[string][]types.ActorEventBlock, excludeReverted bool) (EventFilter, error) { + keysWithCodec map[string][]types.ActorEventBlock, _ bool) (EventFilter, error) { m.mu.Lock() if m.currentHeight == 0 { // sync in progress, we haven't had an Apply @@ -402,7 +402,7 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a } if m.EventIndex != nil && requiresHistoricEvents { - if err := m.EventIndex.prefillFilter(ctx, f, excludeReverted); err != nil { + if err := m.EventIndex.prefillFilter(ctx, f); err != nil { return nil, xerrors.Errorf("pre-fill historic events: %w", err) } } diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index b19162ddd4e..4f49f4efbcf 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -92,7 +92,7 @@ const ( revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?` restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter=? AND event_index=? AND message_cid=? AND message_index=?` - createIndexEventEmitter = `CREATE INDEX IF NOT EXISTS event_emitter ON event (emitter)` + createIndexEventEmitter = `CREATE INDEX IF NOT EXISTS event_emitter ON event (emitter)` createIndexEventTipsetKeyCid = `CREATE INDEX IF NOT EXISTS event_tipset_key_cid ON event (tipset_key_cid);` createIndexEventHeight = `CREATE INDEX IF NOT EXISTS event_height ON event (height);` createIndexEventReverted = `CREATE INDEX IF NOT EXISTS event_reverted ON event (reverted);` @@ -295,11 +295,12 @@ func (ei *EventIndex) migrateToVersion3(ctx context.Context) error { } defer func() { _ = tx.Rollback() }() - // create index on event.emitter_addr. - _, err = tx.ExecContext(ctx, "CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)") - if err != nil { - return xerrors.Errorf("create index event_emitter_addr: %w", err) - } + // The original work on schema v3 included an index on, then, emitter_addr. + // Successive work to index events by emitter actor ID instead required the index on the column + // to be recreated on a new column called emitter. Therefore, the index creation is done as part of + // migrateToVersion4. + // + // For further context, see: https://github.com/filecoin-project/lotus/pull/11723#discussion_r1526295815 // original v3 migration introduced an index: // CREATE INDEX IF NOT EXISTS event_entry_key_index ON event_entry (key) @@ -370,14 +371,11 @@ func (ei *EventIndex) migrateToVersion4(ctx context.Context) error { return xerrors.Errorf("commit transaction: %w", err) } - ei.vacuumDBAndCheckpointWAL(ctx) - log.Infof("Successfully migrated event index from version 3 to version 4 in %s", time.Since(now)) return nil } - -// migrateToVersion4 migrates the schema from version 3 to version 4: indexing events by emitter actor ID. +// migrateToVersion5 migrates the schema from version 3 to version 4: indexing events by emitter actor ID. // This migration replaces the emitter_addr column in event table with a new column called `emitter`, which stores // the emitter's actor ID. func (ei *EventIndex) migrateToVersion5(ctx context.Context, chainStore *store.ChainStore) error { @@ -505,7 +503,7 @@ func (ei *EventIndex) migrateToVersion5(ctx context.Context, chainStore *store.C } // Increment the schema version in _meta table to 4. - if _, err = tx.Exec("INSERT OR IGNORE INTO _meta (version) VALUES (4)"); err != nil { + if _, err = tx.Exec("INSERT OR IGNORE INTO _meta (version) VALUES (5)"); err != nil { return xerrors.Errorf("increment _meta version: %w", err) } @@ -515,7 +513,7 @@ func (ei *EventIndex) migrateToVersion5(ctx context.Context, chainStore *store.C ei.vacuumDBAndCheckpointWAL(ctx) - log.Infof("successfully migrated event index from version 3 to version 4 in %s", time.Since(now)) + log.Infof("Successfully migrated event index from version 4 to version 5 in %s", time.Since(now)) return nil } @@ -756,7 +754,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever } // prefillFilter fills a filter's collection of events from the historic index. -func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, excludeReverted bool) error { +func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter) error { var ( clauses, joins []string values []any @@ -792,14 +790,12 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude values = append(values, emitter) } clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")") - // Explicitly exclude reverted events, since at least one emitter is present and reverts cannot be considered. - excludeReverted = true } - if excludeReverted { - clauses = append(clauses, "event.reverted=?") - values = append(values, false) - } + // Always exclude reverted events when prefilling. See: + // - https://github.com/filecoin-project/lotus/issues/11770 + clauses = append(clauses, "event.reverted=?") + values = append(values, false) if len(f.keysWithCodec) > 0 { join := 0 diff --git a/chain/events/filter/index_migration_test.go b/chain/events/filter/index_migration_test.go index e9028d5c71d..9c3afef961b 100644 --- a/chain/events/filter/index_migration_test.go +++ b/chain/events/filter/index_migration_test.go @@ -108,10 +108,10 @@ func TestMigration_V3ToV4Sample1(t *testing.T) { require.NoError(t, err) tree, err := state.NewStateTree(cst, network) require.NoError(t, err) - //for _, addr := range addrs { - // _, err = tree.RegisterNewAddress(addr) - // require.NoError(t, err) - //} + for _, addr := range addrs { + _, err = tree.RegisterNewAddress(addr) + require.NoError(t, err) + } _, err = tree.Flush(ctx) require.NoError(t, err) diff --git a/chain/events/filter/index_test.go b/chain/events/filter/index_test.go index 9eb7a2b7a7b..e218f437bcb 100644 --- a/chain/events/filter/index_test.go +++ b/chain/events/filter/index_test.go @@ -286,7 +286,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { for _, tc := range testCases { tc := tc // appease lint t.Run(tc.name, func(t *testing.T) { - if err := ei.prefillFilter(context.Background(), tc.filter, false); err != nil { + if err := ei.prefillFilter(context.Background(), tc.filter); err != nil { require.NoError(t, err, "prefill filter events") } @@ -681,14 +681,6 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { te: events14000, want: noCollectedEvents, }, - } - - exclusiveTestCases := []struct { - name string - filter *eventFilter - te *TipSetEvents - want []*CollectedEvent - }{ { name: "nomatch tipset min height", filter: &eventFilter{ @@ -929,19 +921,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { for _, tc := range inclusiveTestCases { tc := tc // appease lint t.Run(tc.name, func(t *testing.T) { - if err := ei.prefillFilter(context.Background(), tc.filter, false); err != nil { - require.NoError(t, err, "prefill filter events") - } - - coll := tc.filter.TakeCollectedEvents(context.Background()) - require.ElementsMatch(t, coll, tc.want, tc.name) - }) - } - - for _, tc := range exclusiveTestCases { - tc := tc // appease lint - t.Run(tc.name, func(t *testing.T) { - if err := ei.prefillFilter(context.Background(), tc.filter, true); err != nil { + if err := ei.prefillFilter(context.Background(), tc.filter); err != nil { require.NoError(t, err, "prefill filter events") } diff --git a/chain/state/statetree_test.go b/chain/state/statetree_test.go index b1790e63420..9a221751a75 100644 --- a/chain/state/statetree_test.go +++ b/chain/state/statetree_test.go @@ -97,7 +97,7 @@ func TestResolveCache(t *testing.T) { nonId := address.NewForTestGetter()() id, _ := address.NewIDAddress(1000) - st.LookupIDFun = func(a address.Address) (address.Address, error) { + st.lookupIDFun = func(a address.Address) (address.Address, error) { if a == nonId { return id, nil } diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index 3b1d53ac3b9..d88d7dfd154 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -587,21 +587,3 @@ func (sm *StateManager) GetRandomnessDigestFromTickets(ctx context.Context, rand return r.GetChainRandomness(ctx, randEpoch) } - -func (sm *StateManager) LookupActorID(ctx context.Context, addr address.Address, ts *types.TipSet) (abi.ActorID, error) { - var idAddr address.Address - if addr.Protocol() == address.ID { - idAddr = addr // already an ID address - } else { - var err error - addr, err = sm.LookupID(ctx, addr, ts) - if err != nil { - return 0, xerrors.Errorf("state manager lookup id: %w", err) - } - } - actor, err := address.IDFromAddress(idAddr) - if err != nil { - return 0, xerrors.Errorf("resolve actor id: id from addr: %w", err) - } - return abi.ActorID(actor), nil -} diff --git a/node/modules/actorevent.go b/node/modules/actorevent.go index 41067834844..f9286a32137 100644 --- a/node/modules/actorevent.go +++ b/node/modules/actorevent.go @@ -103,7 +103,7 @@ func EventFilterManager(cfg config.EventsConfig) func(helpers.MetricsCtx, repo.L resolverCacheExpiry = time.Minute resolverCacheNilTipSet = false ) - actorResolver := filter.NewCachedActorResolver(sm.LookupActorID, resolverCacheSize, resolverCacheExpiry, resolverCacheNilTipSet) + actorResolver := filter.NewCachedActorResolver(sm.LookupID, resolverCacheSize, resolverCacheExpiry, resolverCacheNilTipSet) // Enable indexing of actor events var eventIndex *filter.EventIndex