From fecd58d07503f520cdb3326b47187571cdf6b477 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Tue, 6 Feb 2024 16:01:07 +1100 Subject: [PATCH 1/2] events: index actor events by actor ID rather than addr Ref: https://github.com/filecoin-project/lotus/issues/11594 --- chain/events/filter/event.go | 157 ++++++++++++++++++---------- chain/events/filter/event_test.go | 17 ++- chain/events/filter/index.go | 84 ++++++++------- chain/events/filter/index_test.go | 59 +++++++++-- node/impl/full/actor_events_test.go | 2 +- node/modules/actorevent.go | 40 ++++--- 6 files changed, 240 insertions(+), 119 deletions(-) diff --git a/chain/events/filter/event.go b/chain/events/filter/event.go index 1669d840eec..5cf94a25e1d 100644 --- a/chain/events/filter/event.go +++ b/chain/events/filter/event.go @@ -3,13 +3,13 @@ package filter import ( "bytes" "context" + "fmt" "math" "sync" "time" "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" - "golang.org/x/xerrors" "github.com/filecoin-project/go-address" amt4 "github.com/filecoin-project/go-amt-ipld/v4" @@ -20,6 +20,10 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) +type ActorResolver func(ctx context.Context, addr address.Address, ts *types.TipSet) (abi.ActorID, error) +type AddressResolver func(ctx context.Context, actor abi.ActorID, ts *types.TipSet) (address.Address, bool) +type TipsetResolver func(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) + func isIndexedValue(b uint8) bool { // currently we mark the full entry as indexed if either the key // or the value are indexed; in the future we will need finer-grained @@ -27,13 +31,11 @@ func isIndexedValue(b uint8) bool { return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0 } -type AddressResolver func(context.Context, abi.ActorID, *types.TipSet) (address.Address, bool) - type EventFilter interface { Filter TakeCollectedEvents(context.Context) []*CollectedEvent - CollectEvents(context.Context, *TipSetEvents, bool, AddressResolver) error + CollectEvents(context.Context, *TipSetEvents, bool, ActorResolver, AddressResolver) error } type eventFilter struct { @@ -82,39 +84,62 @@ func (f *eventFilter) ClearSubChannel() { f.ch = nil } -func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver AddressResolver) error { +func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, actorResolver ActorResolver, addressResolver AddressResolver) error { if !f.matchTipset(te) { return nil } - // cache of lookups between actor id and f4 address - addressLookups := make(map[abi.ActorID]address.Address) - ems, err := te.messages(ctx) + filterByActorsMap := make(map[abi.ActorID]address.Address, len(f.addresses)) + for _, addr := range f.addresses { + actor, err := actorResolver(ctx, addr, te.rctTs) + if err != nil { + // not an address we will be able to match against + continue + } + filterByActorsMap[actor] = addr + } + if len(f.addresses) > 0 && len(filterByActorsMap) == 0 { + // none of the addresses we were asked to match against were found in the tipset + log.Warnf("no actors found for addresses specified by filter in this tipset") + return nil + } + + actorsCache := make(map[abi.ActorID]address.Address) + if err != nil { - return xerrors.Errorf("load executed messages: %w", err) + return fmt.Errorf("load executed messages: %w", err) } for msgIdx, em := range ems { for evIdx, ev := range em.Events() { - // lookup address corresponding to the actor id - addr, found := addressLookups[ev.Emitter] - if !found { - var ok bool - addr, ok = resolver(ctx, ev.Emitter, te.rctTs) - if !ok { - // not an address we will be able to match against + if !f.matchKeys(ev.Entries) { + continue + } + addr := address.Undef + // first look in our filter for the actor + if len(filterByActorsMap) > 0 { + if a, ok := filterByActorsMap[ev.Emitter]; !ok { continue + } else if ok && a.Protocol() != address.ID { + // should be OK to use this as the returned address + addr = a } - addressLookups[ev.Emitter] = addr } - - if !f.matchAddress(addr) { - continue - } - if !f.matchKeys(ev.Entries) { - continue + if addr == address.Undef { + // look in the cache for the mapping or resolve it + var ok bool + if addr, ok = actorsCache[ev.Emitter]; !ok { + if addr, ok = addressResolver(ctx, ev.Emitter, te.rctTs); !ok { + // can't resolve it, so last resort: create an ID address from the actor ID + addr, err = address.NewIDAddress(uint64(ev.Emitter)) + if err != nil { + log.Warnf("failed to create ID address from actor ID: %s", err) + continue + } + } + actorsCache[ev.Emitter] = addr + } } - // event matches filter, so record it cev := &CollectedEvent{ Entries: ev.Entries, @@ -188,23 +213,8 @@ func (f *eventFilter) matchTipset(te *TipSetEvents) bool { return true } -func (f *eventFilter) matchAddress(o address.Address) bool { - if len(f.addresses) == 0 { - return true - } - - // Assume short lists of addresses - // TODO: binary search for longer lists or restrict list length - for _, a := range f.addresses { - if a == o { - return true - } - } - return false -} - func (f *eventFilter) matchKeys(ees []types.EventEntry) bool { - if len(f.keysWithCodec) == 0 { + if len(f.keysWithCodec) == 0 { // no keys specified, so match all return true } // TODO: optimize this naive algorithm @@ -301,10 +311,13 @@ func (e *executedMessage) Events() []*types.Event { type EventFilterManager struct { ChainStore *cstore.ChainStore - AddressResolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) MaxFilterResults int EventIndex *EventIndex + ActorResolver ActorResolver + AddressResolver AddressResolver + TipsetResolver TipsetResolver + mu sync.Mutex // guards mutations to filters filters map[types.FilterID]EventFilter currentHeight abi.ChainEpoch @@ -326,14 +339,14 @@ func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet) } if m.EventIndex != nil { - if err := m.EventIndex.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil { + if err := m.EventIndex.CollectEvents(ctx, tse, false); err != nil { return err } } // TODO: could run this loop in parallel with errgroup if there are many filters for _, f := range m.filters { - if err := f.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil { + if err := f.CollectEvents(ctx, tse, false, m.ActorResolver, m.AddressResolver); err != nil { return err } } @@ -357,14 +370,14 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet) } if m.EventIndex != nil { - if err := m.EventIndex.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil { + if err := m.EventIndex.CollectEvents(ctx, tse, true); err != nil { return err } } // TODO: could run this loop in parallel with errgroup if there are many filters for _, f := range m.filters { - if err := f.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil { + if err := f.CollectEvents(ctx, tse, true, m.ActorResolver, m.AddressResolver); err != nil { return err } } @@ -372,19 +385,27 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet) return nil } -func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address, - keysWithCodec map[string][]types.ActorEventBlock, excludeReverted bool) (EventFilter, error) { +func (m *EventFilterManager) Install( + ctx context.Context, + minHeight, + maxHeight abi.ChainEpoch, + tipsetCid cid.Cid, + addresses []address.Address, + keysWithCodec map[string][]types.ActorEventBlock, + excludeReverted bool, +) (EventFilter, error) { + m.mu.Lock() currentHeight := m.currentHeight m.mu.Unlock() if m.EventIndex == nil && minHeight != -1 && minHeight < currentHeight { - return nil, xerrors.Errorf("historic event index disabled") + return nil, fmt.Errorf("historic event index disabled") } id, err := newFilterID() if err != nil { - return nil, xerrors.Errorf("new filter id: %w", err) + return nil, fmt.Errorf("new filter id: %w", err) } f := &eventFilter{ @@ -399,7 +420,27 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a if m.EventIndex != nil && minHeight != -1 && minHeight < currentHeight { // Filter needs historic events - if err := m.EventIndex.prefillFilter(ctx, f, excludeReverted); err != nil { + + // filterActors is actors at the _current_ tipset, which we'll use for + // historical addresses; which should be OK as long as we're not looking at + // reverted events. + filterActors := make([]abi.ActorID, 0, len(addresses)) + for _, addr := range addresses { + actor, err := m.ActorResolver(ctx, addr, nil) + if err != nil { + // not an address we will be able to match against + continue + } + filterActors = append(filterActors, actor) + } + if len(addresses) > 0 { + if len(filterActors) == 0 { + // none of the addresses we were asked to match against were found in the tipset + return nil, fmt.Errorf("no actors found for addresses") + } + excludeReverted = true // we can't match against reverted events + } + if err := m.EventIndex.prefillFilter(ctx, f, excludeReverted, filterActors, m.AddressResolver, m.TipsetResolver); err != nil { return nil, err } } @@ -427,18 +468,18 @@ func (m *EventFilterManager) Remove(ctx context.Context, id types.FilterID) erro func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rctTs *types.TipSet) ([]executedMessage, error) { msgs, err := m.ChainStore.MessagesForTipset(ctx, msgTs) if err != nil { - return nil, xerrors.Errorf("read messages: %w", err) + return nil, fmt.Errorf("read messages: %w", err) } st := m.ChainStore.ActorStore(ctx) arr, err := blockadt.AsArray(st, rctTs.Blocks()[0].ParentMessageReceipts) if err != nil { - return nil, xerrors.Errorf("load receipts amt: %w", err) + return nil, fmt.Errorf("load receipts amt: %w", err) } if uint64(len(msgs)) != arr.Length() { - return nil, xerrors.Errorf("mismatching message and receipt counts (%d msgs, %d rcts)", len(msgs), arr.Length()) + return nil, fmt.Errorf("mismatching message and receipt counts (%d msgs, %d rcts)", len(msgs), arr.Length()) } ems := make([]executedMessage, len(msgs)) @@ -449,10 +490,10 @@ func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rc var rct types.MessageReceipt found, err := arr.Get(uint64(i), &rct) if err != nil { - return nil, xerrors.Errorf("load receipt: %w", err) + return nil, fmt.Errorf("load receipt: %w", err) } if !found { - return nil, xerrors.Errorf("receipt %d not found", i) + return nil, fmt.Errorf("receipt %d not found", i) } ems[i].rct = &rct @@ -462,14 +503,14 @@ func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rc evtArr, err := amt4.LoadAMT(ctx, st, *rct.EventsRoot, amt4.UseTreeBitWidth(types.EventAMTBitwidth)) if err != nil { - return nil, xerrors.Errorf("load events amt: %w", err) + return nil, fmt.Errorf("load events amt: %w", err) } ems[i].evs = make([]*types.Event, evtArr.Len()) var evt types.Event err = evtArr.ForEach(ctx, func(u uint64, deferred *cbg.Deferred) error { if u > math.MaxInt { - return xerrors.Errorf("too many events") + return fmt.Errorf("too many events") } if err := evt.UnmarshalCBOR(bytes.NewReader(deferred.Raw)); err != nil { return err @@ -481,7 +522,7 @@ func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rc }) if err != nil { - return nil, xerrors.Errorf("read events: %w", err) + return nil, fmt.Errorf("read events: %w", err) } } diff --git a/chain/events/filter/event_test.go b/chain/events/filter/event_test.go index c650b71eb6f..24a88a9412b 100644 --- a/chain/events/filter/event_test.go +++ b/chain/events/filter/event_test.go @@ -2,6 +2,7 @@ package filter import ( "context" + "fmt" pseudo "math/rand" "testing" @@ -276,7 +277,7 @@ func TestEventFilterCollectEvents(t *testing.T) { for _, tc := range testCases { tc := tc // appease lint t.Run(tc.name, func(t *testing.T) { - if err := tc.filter.CollectEvents(context.Background(), tc.te, false, addrMap.ResolveAddress); err != nil { + if err := tc.filter.CollectEvents(context.Background(), tc.te, false, addrMap.ResolveActor, addrMap.ResolveAddress); err != nil { require.NoError(t, err, "collect events") } @@ -444,3 +445,17 @@ func (a addressMap) ResolveAddress(ctx context.Context, emitter abi.ActorID, ts ra, ok := a[emitter] return ra, ok } + +func (a addressMap) ResolveActor(ctx context.Context, emitter address.Address, ts *types.TipSet) (abi.ActorID, error) { + for id, addr := range a { + if addr == emitter { + return id, nil + } + } + fmt.Println("Have:") + for id, addr := range a { + fmt.Printf("%d -> %s\n", id, addr) + } + fmt.Println("Want:", emitter) + return 0, fmt.Errorf("address not found") +} diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index d3dd1a08587..ce5683376c6 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -38,7 +38,7 @@ var ddls = []string{ height INTEGER NOT NULL, tipset_key BLOB NOT NULL, tipset_key_cid BLOB NOT NULL, - emitter_addr BLOB NOT NULL, + emitter INTEGER NOT NULL, event_index INTEGER NOT NULL, message_cid BLOB NOT NULL, message_index INTEGER NOT NULL, @@ -72,11 +72,11 @@ var ( const ( schemaVersion = 2 - eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` - insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)` + eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter=? AND event_index=? AND message_cid=? AND message_index=?` + insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)` insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)` revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?` - restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` + restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter=? AND event_index=? AND message_cid=? AND message_index=?` ) type EventIndex struct { @@ -118,6 +118,21 @@ func (ei *EventIndex) initStatements() (err error) { return nil } +/* +func (ei *EventIndex) migrateToVersion3(ctx context.Context, chainStore *store.ChainStore) error { + now := time.Now() + + tx, err := ei.db.Begin() + if err != nil { + return xerrors.Errorf("begin transaction: %w", err) + } + // rollback the transaction (a no-op if the transaction was already committed) + defer tx.Rollback() //nolint:errcheck + + q, err := ei.db.Query("SELECT id, height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted FROM event ORDER BY height, tipset_key_cid ASC, height ASC, event_index ASC, message_index ASC, id DESC") +} +*/ + func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.ChainStore) error { now := time.Now() @@ -343,7 +358,7 @@ func (ei *EventIndex) Close() error { return ei.db.Close() } -func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error { +func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool) error { tx, err := ei.db.Begin() if err != nil { return xerrors.Errorf("begin transaction: %w", err) @@ -366,9 +381,6 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever return nil } - // cache of lookups between actor id and f4 address - addressLookups := make(map[abi.ActorID]address.Address) - ems, err := te.messages(ctx) if err != nil { return xerrors.Errorf("load executed messages: %w", err) @@ -378,17 +390,6 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever // don't exist, otherwise mark them as not reverted for msgIdx, em := range ems { for evIdx, ev := range em.Events() { - addr, found := addressLookups[ev.Emitter] - if !found { - var ok bool - addr, ok = resolver(ctx, ev.Emitter, te.rctTs) - if !ok { - // not an address we will be able to match against - continue - } - addressLookups[ev.Emitter] = addr - } - tsKeyCid, err := te.msgTs.Key().Cid() if err != nil { return xerrors.Errorf("tipset key cid: %w", err) @@ -400,7 +401,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever te.msgTs.Height(), // height te.msgTs.Key().Bytes(), // tipset_key tsKeyCid.Bytes(), // tipset_key_cid - addr.Bytes(), // emitter_addr + ev.Emitter, // emitter evIdx, // event_index em.Message().Cid().Bytes(), // message_cid msgIdx, // message_index @@ -415,7 +416,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever te.msgTs.Height(), // height te.msgTs.Key().Bytes(), // tipset_key tsKeyCid.Bytes(), // tipset_key_cid - addr.Bytes(), // emitter_addr + ev.Emitter, // emitter evIdx, // event_index em.Message().Cid().Bytes(), // message_cid msgIdx, // message_index @@ -450,7 +451,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever te.msgTs.Height(), // height te.msgTs.Key().Bytes(), // tipset_key tsKeyCid.Bytes(), // tipset_key_cid - addr.Bytes(), // emitter_addr + ev.Emitter, // emitter evIdx, // event_index em.Message().Cid().Bytes(), // message_cid msgIdx, // message_index @@ -481,7 +482,15 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever } // PrefillFilter fills a filter's collection of events from the historic index -func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, excludeReverted bool) error { +func (ei *EventIndex) prefillFilter( + ctx context.Context, + f *eventFilter, + excludeReverted bool, + filterActors []abi.ActorID, + addressResolver AddressResolver, + tipsetResolver TipsetResolver, +) error { + clauses := []string{} values := []any{} joins := []string{} @@ -505,11 +514,11 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude values = append(values, false) } - if len(f.addresses) > 0 { + if len(filterActors) > 0 { subclauses := []string{} - for _, addr := range f.addresses { - subclauses = append(subclauses, "emitter_addr=?") - values = append(values, addr.Bytes()) + for actor := range filterActors { + subclauses = append(subclauses, "emitter=?") + values = append(values, actor) } clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")") } @@ -538,7 +547,7 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude event.height, event.tipset_key, event.tipset_key_cid, - event.emitter_addr, + event.emitter, event.event_index, event.message_cid, event.message_index, @@ -588,7 +597,7 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude height uint64 tipsetKey []byte tipsetKeyCid []byte - emitterAddr []byte + emitter uint64 eventIndex int messageCid []byte messageIndex int @@ -604,7 +613,7 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude &row.height, &row.tipsetKey, &row.tipsetKeyCid, - &row.emitterAddr, + &row.emitter, &row.eventIndex, &row.messageCid, &row.messageIndex, @@ -628,8 +637,8 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude break } } - currentID = row.id + ce = &CollectedEvent{ EventIdx: row.eventIndex, Reverted: row.reverted, @@ -637,9 +646,9 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude MsgIdx: row.messageIndex, } - ce.EmitterAddr, err = address.NewFromBytes(row.emitterAddr) + ce.MsgCid, err = cid.Cast(row.messageCid) if err != nil { - return xerrors.Errorf("parse emitter addr: %w", err) + return xerrors.Errorf("parse message cid: %w", err) } ce.TipSetKey, err = types.TipSetKeyFromBytes(row.tipsetKey) @@ -647,9 +656,14 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude return xerrors.Errorf("parse tipsetkey: %w", err) } - ce.MsgCid, err = cid.Cast(row.messageCid) + ts, err := tipsetResolver(ctx, ce.TipSetKey) // chainstore lookup has a cache so this should be ok for repeat calls if err != nil { - return xerrors.Errorf("parse message cid: %w", err) + return xerrors.Errorf("resolve tipset: %w", err) + } + if addr, _ := addressResolver(ctx, abi.ActorID(row.emitter), ts); addr != address.Undef { + ce.EmitterAddr = addr + } else { + log.Warnf("could not resolve address of actor %d at height %d", row.emitter, row.height) } } diff --git a/chain/events/filter/index_test.go b/chain/events/filter/index_test.go index ce3f7b78a03..2afc839172d 100644 --- a/chain/events/filter/index_test.go +++ b/chain/events/filter/index_test.go @@ -50,6 +50,11 @@ func TestEventIndexPrefillFilter(t *testing.T) { cid14000, err := events14000.msgTs.Key().Cid() require.NoError(t, err, "tipset cid") + tipsetResolver := func(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) { + require.Equal(t, events14000.msgTs.Key(), tsk) + return events14000.msgTs, nil + } + noCollectedEvents := []*CollectedEvent{} oneCollectedEvent := []*CollectedEvent{ { @@ -76,7 +81,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { ei, err := NewEventIndex(context.Background(), dbPath, nil) require.NoError(t, err, "create event index") - if err := ei.CollectEvents(context.Background(), events14000, false, addrMap.ResolveAddress); err != nil { + if err := ei.CollectEvents(context.Background(), events14000, false); err != nil { require.NoError(t, err, "collect events") } @@ -141,7 +146,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { addresses: []address.Address{a1}, }, te: events14000, - want: oneCollectedEvent, + want: noCollectedEvents, // can't search history for reverted events with addresses }, { name: "match one entry", @@ -272,7 +277,13 @@ func TestEventIndexPrefillFilter(t *testing.T) { for _, tc := range testCases { tc := tc // appease lint t.Run(tc.name, func(t *testing.T) { - if err := ei.prefillFilter(context.Background(), tc.filter, false); err != nil { + filterActors := make([]abi.ActorID, 0) + for _, addr := range tc.filter.addresses { + a, err := addrMap.ResolveActor(context.Background(), addr, nil) + require.NoError(t, err) + filterActors = append(filterActors, a) + } + if err := ei.prefillFilter(context.Background(), tc.filter, false, filterActors, addrMap.ResolveAddress, tipsetResolver); err != nil { require.NoError(t, err, "prefill filter events") } @@ -290,10 +301,12 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { a1ID := abi.ActorID(1) a2ID := abi.ActorID(2) + a3ID := abi.ActorID(3) addrMap := addressMap{} addrMap.add(a1ID, a1) addrMap.add(a2ID, a2) + addrMap.add(a3ID, a3) ev1 := fakeEvent( a1ID, @@ -337,6 +350,18 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { reveredCID14000, err := revertedEvents14000.msgTs.Key().Cid() require.NoError(t, err, "tipset cid") + tipsetResolver := func(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) { + switch tsk { + case events14000.msgTs.Key(): + return events14000.msgTs, nil + case revertedEvents14000.msgTs.Key(): + return revertedEvents14000.msgTs, nil + default: + require.FailNow(t, "unexpected tipset key") + } + return nil, nil + } + noCollectedEvents := []*CollectedEvent{} oneCollectedEvent := []*CollectedEvent{ { @@ -397,13 +422,13 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { ei, err := NewEventIndex(context.Background(), dbPath, nil) require.NoError(t, err, "create event index") - if err := ei.CollectEvents(context.Background(), revertedEvents14000, false, addrMap.ResolveAddress); err != nil { + if err := ei.CollectEvents(context.Background(), revertedEvents14000, false); err != nil { require.NoError(t, err, "collect reverted events") } - if err := ei.CollectEvents(context.Background(), revertedEvents14000, true, addrMap.ResolveAddress); err != nil { + if err := ei.CollectEvents(context.Background(), revertedEvents14000, true); err != nil { require.NoError(t, err, "revert reverted events") } - if err := ei.CollectEvents(context.Background(), events14000, false, addrMap.ResolveAddress); err != nil { + if err := ei.CollectEvents(context.Background(), events14000, false); err != nil { require.NoError(t, err, "collect events") } @@ -478,7 +503,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { addresses: []address.Address{a2}, }, te: revertedEvents14000, - want: oneCollectedRevertedEvent, + want: noCollectedEvents, // can't search history for reverted events with addresses }, { name: "match address 1", @@ -488,7 +513,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { addresses: []address.Address{a1}, }, te: events14000, - want: oneCollectedEvent, + want: noCollectedEvents, // can't search history for reverted events with addresses }, { name: "match one entry", @@ -728,7 +753,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { addresses: []address.Address{a1}, }, te: events14000, - want: oneCollectedEvent, + want: noCollectedEvents, // can't search history for reverted events with addresses }, { name: "match one entry", @@ -876,7 +901,13 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { for _, tc := range inclusiveTestCases { tc := tc // appease lint t.Run(tc.name, func(t *testing.T) { - if err := ei.prefillFilter(context.Background(), tc.filter, false); err != nil { + filterActors := make([]abi.ActorID, 0) + for _, addr := range tc.filter.addresses { + a, err := addrMap.ResolveActor(context.Background(), addr, nil) + require.NoError(t, err) + filterActors = append(filterActors, a) + } + if err := ei.prefillFilter(context.Background(), tc.filter, false, filterActors, addrMap.ResolveAddress, tipsetResolver); err != nil { require.NoError(t, err, "prefill filter events") } @@ -888,7 +919,13 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { for _, tc := range exclusiveTestCases { tc := tc // appease lint t.Run(tc.name, func(t *testing.T) { - if err := ei.prefillFilter(context.Background(), tc.filter, true); err != nil { + filterActors := make([]abi.ActorID, 0) + for _, addr := range tc.filter.addresses { + a, err := addrMap.ResolveActor(context.Background(), addr, nil) + require.NoError(t, err) + filterActors = append(filterActors, a) + } + if err := ei.prefillFilter(context.Background(), tc.filter, true, filterActors, addrMap.ResolveAddress, tipsetResolver); err != nil { require.NoError(t, err, "prefill filter events") } diff --git a/node/impl/full/actor_events_test.go b/node/impl/full/actor_events_test.go index ab446e57b4a..b946001469f 100644 --- a/node/impl/full/actor_events_test.go +++ b/node/impl/full/actor_events_test.go @@ -609,7 +609,7 @@ func (m *mockFilter) TakeCollectedEvents(context.Context) []*filter.CollectedEve return e } -func (m *mockFilter) CollectEvents(context.Context, *filter.TipSetEvents, bool, filter.AddressResolver) error { +func (m *mockFilter) CollectEvents(context.Context, *filter.TipSetEvents, bool, filter.ActorResolver, filter.AddressResolver) error { m.t.Fatalf("unexpected call to CollectEvents") return nil } diff --git a/node/modules/actorevent.go b/node/modules/actorevent.go index 135a34e5be7..15f60cbe5fb 100644 --- a/node/modules/actorevent.go +++ b/node/modules/actorevent.go @@ -126,25 +126,39 @@ func EventFilterManager(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.Loc } fm := &filter.EventFilterManager{ - ChainStore: cs, - EventIndex: eventIndex, // will be nil unless EnableHistoricFilterAPI is true - // TODO: - // We don't need this address resolution anymore once https://github.com/filecoin-project/lotus/issues/11594 lands - AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) { - idAddr, err := address.NewIDAddress(uint64(emitter)) + ChainStore: cs, + EventIndex: eventIndex, // will be nil unless EnableHistoricFilterAPI is true + MaxFilterResults: cfg.Events.MaxFilterResults, + ActorResolver: func(ctx context.Context, emitter address.Address, ts *types.TipSet) (abi.ActorID, error) { + var addr address.Address + if emitter.Protocol() == address.ID { + addr = emitter // already an ID address + } else { + var err error + addr, err = sm.LookupRobustAddress(ctx, emitter, ts) + if err != nil { + return 0, err + } + } + actor, err := address.IDFromAddress(addr) + if err != nil { + return 0, err + } + return abi.ActorID(actor), nil + }, + AddressResolver: func(ctx context.Context, actor abi.ActorID, ts *types.TipSet) (address.Address, bool) { + idAddr, err := address.NewIDAddress(uint64(actor)) if err != nil { return address.Undef, false } - actor, err := sm.LoadActor(ctx, idAddr, ts) - if err != nil || actor.Address == nil { - return idAddr, true + actorAddr, err := sm.LoadActor(ctx, idAddr, ts) + if err != nil || actorAddr.Address == nil { + return idAddr, false } - - return *actor.Address, true + return *actorAddr.Address, true }, - - MaxFilterResults: cfg.Events.MaxFilterResults, + TipsetResolver: cs.LoadTipSet, } lc.Append(fx.Hook{ From 8c5a11a8d25169638d46cc22cb98f996326f8f2d Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Thu, 8 Feb 2024 11:50:31 +1100 Subject: [PATCH 2/2] WIP: events(test): add test file --- chain/events/filter/index_migration_test.go | 73 +++++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 chain/events/filter/index_migration_test.go diff --git a/chain/events/filter/index_migration_test.go b/chain/events/filter/index_migration_test.go new file mode 100644 index 00000000000..040e1c7832f --- /dev/null +++ b/chain/events/filter/index_migration_test.go @@ -0,0 +1,73 @@ +package filter_test + +import ( + "database/sql" + "path" + "strings" + "testing" + + _ "github.com/mattn/go-sqlite3" + "github.com/stretchr/testify/require" +) + +const V2_DUMP = ` +PRAGMA foreign_keys=OFF; +BEGIN TRANSACTION; +CREATE TABLE event ( + id INTEGER PRIMARY KEY, + height INTEGER NOT NULL, + tipset_key BLOB NOT NULL, + tipset_key_cid BLOB NOT NULL, + emitter_addr BLOB NOT NULL, + event_index INTEGER NOT NULL, + message_cid BLOB NOT NULL, + message_index INTEGER NOT NULL, + reverted INTEGER NOT NULL + ); +INSERT INTO event VALUES(1,20,X'0171a0e40220440557380ab22c576118b11040c301ceb52e3712cd46b93c021576af3cc3f8bd',X'0171a0e402205eb0d6f509d97c2bc765a13189cd1a4dd2fbf9023563a4e7ecb183e5e2ae3602',X'040a555a21c730eb2ae1159120bd780e15f2c634ed24',0,X'0171a0e4022023b08253a1d39f0306ad0a905ef8553489ff5a94e82562f395da2e3d8b01eb85',0,0); +INSERT INTO event VALUES(2,25,X'0171a0e40220beeff892e675374f22d51bf5c32afe6f4f6d2e68a8812a81dad9d8f0c239e516',X'0171a0e4022088f2d6e01dd9c0d138c2626319639e388fbe284d96e162ad7329e15b3a1898c1',X'040a555a21c730eb2ae1159120bd780e15f2c634ed24',0,X'0171a0e40220f7bf7aa42b9e3a6c17d237968d6c62b3ba5bc8c8cd3fac239c5dd39a3d4ab40e',0,0); +INSERT INTO event VALUES(3,30,X'0171a0e40220cdd19768ce4e471561e78605bdfeb82e2e693604bed401596db3fd00d946bdf1',X'0171a0e40220c042a4795aa1f1e9dfa6e2cff8a5081ccd90b04e7636d269f421a0509c0edd78',X'040a555a21c730eb2ae1159120bd780e15f2c634ed24',0,X'0171a0e40220d79bb23187fcced749ed15065865989057d3ba5cc815fbb6ce7a98ea778d17e5',0,0); +CREATE TABLE event_entry ( + event_id INTEGER, + indexed INTEGER NOT NULL, + flags BLOB NOT NULL, + key TEXT NOT NULL, + codec INTEGER, + value BLOB NOT NULL + ); +INSERT INTO event_entry VALUES(1,1,X'03','d',85,X'1122334455667788'); +INSERT INTO event_entry VALUES(3,1,X'03','t1',85,X'0000000000000000000000000000000000000000000000000000000000001111'); +INSERT INTO event_entry VALUES(3,1,X'03','t2',85,X'0000000000000000000000000000000000000000000000000000000000002222'); +INSERT INTO event_entry VALUES(3,1,X'03','t3',85,X'0000000000000000000000000000000000000000000000000000000000003333'); +INSERT INTO event_entry VALUES(3,1,X'03','t4',85,X'0000000000000000000000000000000000000000000000000000000000004444'); +INSERT INTO event_entry VALUES(3,1,X'03','d',85,X'1122334455667788'); +CREATE TABLE _meta ( + version UINT64 NOT NULL UNIQUE + ); +INSERT INTO _meta VALUES(1); +INSERT INTO _meta VALUES(2); +CREATE INDEX height_tipset_key ON event (height,tipset_key); +COMMIT; +` + +func TestV2ToV3Migration(t *testing.T) { + req := require.New(t) + dir := t.TempDir() + + // Connect to SQLite DB + db, err := sql.Open("sqlite3", path.Join(dir, "new_database.db")) + req.NoError(err) + defer db.Close() + + // Execute the commands from the SQLite dump + commands := strings.Split(V2_DUMP, ";") + for _, command := range commands { + command = strings.TrimSpace(command) + if command != "" { + _, err := db.Exec(command) + req.NoError(err) + } + } + + t.Log("Database created and populated successfully.") +}