diff --git a/chain/events/filter/event.go b/chain/events/filter/event.go index 1669d840eec..6f376c3f736 100644 --- a/chain/events/filter/event.go +++ b/chain/events/filter/event.go @@ -27,21 +27,20 @@ func isIndexedValue(b uint8) bool { return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0 } -type AddressResolver func(context.Context, abi.ActorID, *types.TipSet) (address.Address, bool) - type EventFilter interface { Filter TakeCollectedEvents(context.Context) []*CollectedEvent - CollectEvents(context.Context, *TipSetEvents, bool, AddressResolver) error + CollectEvents(context.Context, *TipSetEvents, bool) error } type eventFilter struct { - id types.FilterID - minHeight abi.ChainEpoch // minimum epoch to apply filter or -1 if no minimum - maxHeight abi.ChainEpoch // maximum epoch to apply filter or -1 if no maximum - tipsetCid cid.Cid - addresses []address.Address // list of actor addresses that are extpected to emit the event + actorResolver ActorResolver + id types.FilterID + minHeight abi.ChainEpoch // minimum epoch to apply filter or -1 if no minimum + maxHeight abi.ChainEpoch // maximum epoch to apply filter or -1 if no maximum + tipsetCid cid.Cid + addresses []address.Address // list of actor addresses that are expected to emit the event keysWithCodec map[string][]types.ActorEventBlock // map of key names to a list of alternate values that may match maxResults int // maximum number of results to collect, 0 is unlimited @@ -49,27 +48,27 @@ type eventFilter struct { mu sync.Mutex collected []*CollectedEvent lastTaken time.Time - ch chan<- interface{} + ch chan<- any } var _ Filter = (*eventFilter)(nil) type CollectedEvent struct { - Entries []types.EventEntry - EmitterAddr address.Address // address of emitter - EventIdx int // index of the event within the list of emitted events - Reverted bool - Height abi.ChainEpoch - TipSetKey types.TipSetKey // tipset that contained the message - MsgIdx int // index of the message in the tipset - MsgCid 
cid.Cid // cid of message that produced event + Entries []types.EventEntry + Emitter abi.ActorID // actor ID of emitter + EventIdx int // index of the event within the list of emitted events + Reverted bool + Height abi.ChainEpoch + TipSetKey types.TipSetKey // tipset that contained the message + MsgIdx int // index of the message in the tipset + MsgCid cid.Cid // cid of message that produced event } func (f *eventFilter) ID() types.FilterID { return f.id } -func (f *eventFilter) SetSubChannel(ch chan<- interface{}) { +func (f *eventFilter) SetSubChannel(ch chan<- any) { f.mu.Lock() defer f.mu.Unlock() f.ch = ch @@ -82,34 +81,27 @@ func (f *eventFilter) ClearSubChannel() { f.ch = nil } -func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver AddressResolver) error { +func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool) error { if !f.matchTipset(te) { return nil } - // cache of lookups between actor id and f4 address - addressLookups := make(map[abi.ActorID]address.Address) - + // Resolve filter addresses to actor IDs at the given event TipSet. + emitters, hasAddrs := f.emitters(ctx, te.rctTs) ems, err := te.messages(ctx) if err != nil { return xerrors.Errorf("load executed messages: %w", err) } for msgIdx, em := range ems { for evIdx, ev := range em.Events() { - // lookup address corresponding to the actor id - addr, found := addressLookups[ev.Emitter] - if !found { - var ok bool - addr, ok = resolver(ctx, ev.Emitter, te.rctTs) - if !ok { - // not an address we will be able to match against + if hasAddrs { + // The filter has at least one address. Therefore, perform a match against + // the emitters even when it may be empty. + // An empty emitters map means the event filter applies to events from actors + // that may have not been deployed yet, e.g. smart contracts. 
+ if _, match := emitters[ev.Emitter]; !match { continue } - addressLookups[ev.Emitter] = addr - } - - if !f.matchAddress(addr) { - continue } if !f.matchKeys(ev.Entries) { continue @@ -117,14 +109,14 @@ func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever // event matches filter, so record it cev := &CollectedEvent{ - Entries: ev.Entries, - EmitterAddr: addr, - EventIdx: evIdx, - Reverted: revert, - Height: te.msgTs.Height(), - TipSetKey: te.msgTs.Key(), - MsgCid: em.Message().Cid(), - MsgIdx: msgIdx, + Entries: ev.Entries, + Emitter: ev.Emitter, + EventIdx: evIdx, + Reverted: revert, + Height: te.msgTs.Height(), + TipSetKey: te.msgTs.Key(), + MsgCid: em.Message().Cid(), + MsgIdx: msgIdx, } f.mu.Lock() @@ -153,7 +145,7 @@ func (f *eventFilter) setCollectedEvents(ces []*CollectedEvent) { f.mu.Unlock() } -func (f *eventFilter) TakeCollectedEvents(ctx context.Context) []*CollectedEvent { +func (f *eventFilter) TakeCollectedEvents(context.Context) []*CollectedEvent { f.mu.Lock() collected := f.collected f.collected = nil @@ -188,19 +180,24 @@ func (f *eventFilter) matchTipset(te *TipSetEvents) bool { return true } -func (f *eventFilter) matchAddress(o address.Address) bool { +// emitters gets the emitter actor IDs that correspond to this filter`s addresses at the given TipSet. +// The returned bool value indicates whether this eventFilter has any addresses or not. +// This is to cover a case where the filter should match emitter actor IDs that may have not been deployed yet, +// i.e. there is at least one address that could not be resolved into its corresponding actor ID. 
+func (f *eventFilter) emitters(ctx context.Context, ts *types.TipSet) (map[abi.ActorID]struct{}, bool) { if len(f.addresses) == 0 { - return true + return nil, false } - - // Assume short lists of addresses - // TODO: binary search for longer lists or restrict list length - for _, a := range f.addresses { - if a == o { - return true + emitters := make(map[abi.ActorID]struct{}) + for _, addr := range f.addresses { + emitter, err := f.actorResolver(ctx, addr, ts) + if err != nil { + // Cannot match against addr; skip. + continue } + emitters[emitter] = struct{}{} } - return false + return emitters, true } func (f *eventFilter) matchKeys(ees []types.EventEntry) bool { @@ -301,11 +298,11 @@ func (e *executedMessage) Events() []*types.Event { type EventFilterManager struct { ChainStore *cstore.ChainStore - AddressResolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) + ActorResolver ActorResolver MaxFilterResults int EventIndex *EventIndex - mu sync.Mutex // guards mutations to filters + mu sync.Mutex // guards mutations to filters and currentHeight filters map[types.FilterID]EventFilter currentHeight abi.ChainEpoch } @@ -326,14 +323,14 @@ func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet) } if m.EventIndex != nil { - if err := m.EventIndex.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil { + if err := m.EventIndex.CollectEvents(ctx, tse, false); err != nil { return err } } // TODO: could run this loop in parallel with errgroup if there are many filters for _, f := range m.filters { - if err := f.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil { + if err := f.CollectEvents(ctx, tse, false); err != nil { return err } } @@ -357,14 +354,14 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet) } if m.EventIndex != nil { - if err := m.EventIndex.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil { + if err := 
m.EventIndex.CollectEvents(ctx, tse, true); err != nil { return err } } // TODO: could run this loop in parallel with errgroup if there are many filters for _, f := range m.filters { - if err := f.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil { + if err := f.CollectEvents(ctx, tse, true); err != nil { return err } } @@ -378,7 +375,9 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a currentHeight := m.currentHeight m.mu.Unlock() - if m.EventIndex == nil && minHeight != -1 && minHeight < currentHeight { + requiresHistoricEvents := minHeight != -1 && minHeight < currentHeight + + if m.EventIndex == nil && requiresHistoricEvents { return nil, xerrors.Errorf("historic event index disabled") } @@ -395,12 +394,12 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a addresses: addresses, keysWithCodec: keysWithCodec, maxResults: m.MaxFilterResults, + actorResolver: m.ActorResolver, } - if m.EventIndex != nil && minHeight != -1 && minHeight < currentHeight { - // Filter needs historic events + if m.EventIndex != nil && requiresHistoricEvents { if err := m.EventIndex.prefillFilter(ctx, f, excludeReverted); err != nil { - return nil, err + return nil, xerrors.Errorf("pre-fill historic events: %w", err) } } @@ -414,7 +413,7 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a return f, nil } -func (m *EventFilterManager) Remove(ctx context.Context, id types.FilterID) error { +func (m *EventFilterManager) Remove(_ context.Context, id types.FilterID) error { m.mu.Lock() defer m.mu.Unlock() if _, found := m.filters[id]; !found { @@ -483,8 +482,6 @@ func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rc if err != nil { return nil, xerrors.Errorf("read events: %w", err) } - } - return ems, nil } diff --git a/chain/events/filter/event_test.go b/chain/events/filter/event_test.go index c650b71eb6f..a7970e16ba9 100644 --- 
a/chain/events/filter/event_test.go +++ b/chain/events/filter/event_test.go @@ -1,7 +1,9 @@ package filter import ( + "bytes" "context" + "fmt" pseudo "math/rand" "testing" @@ -73,14 +75,14 @@ func TestEventFilterCollectEvents(t *testing.T) { noCollectedEvents := []*CollectedEvent{} oneCollectedEvent := []*CollectedEvent{ { - Entries: ev1.Entries, - EmitterAddr: a1, - EventIdx: 0, - Reverted: false, - Height: 14000, - TipSetKey: events14000.msgTs.Key(), - MsgIdx: 0, - MsgCid: em.msg.Cid(), + Entries: ev1.Entries, + Emitter: a1ID, + EventIdx: 0, + Reverted: false, + Height: 14000, + TipSetKey: events14000.msgTs.Key(), + MsgIdx: 0, + MsgCid: em.msg.Cid(), }, } @@ -93,8 +95,9 @@ func TestEventFilterCollectEvents(t *testing.T) { { name: "nomatch tipset min height", filter: &eventFilter{ - minHeight: 14001, - maxHeight: -1, + minHeight: 14001, + maxHeight: -1, + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -102,8 +105,9 @@ func TestEventFilterCollectEvents(t *testing.T) { { name: "nomatch tipset max height", filter: &eventFilter{ - minHeight: -1, - maxHeight: 13999, + minHeight: -1, + maxHeight: 13999, + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -111,8 +115,9 @@ func TestEventFilterCollectEvents(t *testing.T) { { name: "match tipset min height", filter: &eventFilter{ - minHeight: 14000, - maxHeight: -1, + minHeight: 14000, + maxHeight: -1, + actorResolver: addrMap.ResolveActor, }, te: events14000, want: oneCollectedEvent, @@ -120,9 +125,10 @@ func TestEventFilterCollectEvents(t *testing.T) { { name: "match tipset cid", filter: &eventFilter{ - minHeight: -1, - maxHeight: -1, - tipsetCid: cid14000, + minHeight: -1, + maxHeight: -1, + tipsetCid: cid14000, + actorResolver: addrMap.ResolveActor, }, te: events14000, want: oneCollectedEvent, @@ -130,9 +136,10 @@ func TestEventFilterCollectEvents(t *testing.T) { { name: "nomatch address", filter: &eventFilter{ - minHeight: -1, - maxHeight: -1, - 
addresses: []address.Address{a2}, + minHeight: -1, + maxHeight: -1, + addresses: []address.Address{a2}, + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -140,9 +147,10 @@ func TestEventFilterCollectEvents(t *testing.T) { { name: "match address", filter: &eventFilter{ - minHeight: -1, - maxHeight: -1, - addresses: []address.Address{a1}, + minHeight: -1, + maxHeight: -1, + addresses: []address.Address{a1}, + actorResolver: addrMap.ResolveActor, }, te: events14000, want: oneCollectedEvent, @@ -157,6 +165,7 @@ func TestEventFilterCollectEvents(t *testing.T) { []byte("approval"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: oneCollectedEvent, @@ -173,6 +182,7 @@ func TestEventFilterCollectEvents(t *testing.T) { []byte("approval"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: oneCollectedEvent, @@ -188,6 +198,7 @@ func TestEventFilterCollectEvents(t *testing.T) { []byte("propose"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -202,6 +213,7 @@ func TestEventFilterCollectEvents(t *testing.T) { []byte("approval"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -219,6 +231,7 @@ func TestEventFilterCollectEvents(t *testing.T) { []byte("addr1"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: oneCollectedEvent, @@ -236,6 +249,7 @@ func TestEventFilterCollectEvents(t *testing.T) { []byte("addr1"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -253,6 +267,7 @@ func TestEventFilterCollectEvents(t *testing.T) { []byte("addr2"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -267,6 +282,7 @@ func TestEventFilterCollectEvents(t *testing.T) { []byte("2988181"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -276,7 +292,7 @@ func 
TestEventFilterCollectEvents(t *testing.T) { for _, tc := range testCases { tc := tc // appease lint t.Run(tc.name, func(t *testing.T) { - if err := tc.filter.CollectEvents(context.Background(), tc.te, false, addrMap.ResolveAddress); err != nil { + if err := tc.filter.CollectEvents(context.Background(), tc.te, false); err != nil { require.NoError(t, err, "collect events") } @@ -440,7 +456,11 @@ func (a addressMap) add(actorID abi.ActorID, addr address.Address) { a[actorID] = addr } -func (a addressMap) ResolveAddress(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) { - ra, ok := a[emitter] - return ra, ok +func (a addressMap) ResolveActor(_ context.Context, target address.Address, _ *types.TipSet) (abi.ActorID, error) { + for actor, addr := range a { + if bytes.Equal(target.Bytes(), addr.Bytes()) { + return actor, nil + } + } + return 0, fmt.Errorf("no actor for address: %s", target) } diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index 4d9538c089e..ee247eb055a 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -10,13 +10,17 @@ import ( "time" "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" logging "github.com/ipfs/go-log/v2" _ "github.com/mattn/go-sqlite3" + cbg "github.com/whyrusleeping/cbor-gen" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" + _init "github.com/filecoin-project/lotus/chain/actors/builtin/init" + "github.com/filecoin-project/lotus/chain/state" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" ) @@ -38,7 +42,7 @@ var ddls = []string{ height INTEGER NOT NULL, tipset_key BLOB NOT NULL, tipset_key_cid BLOB NOT NULL, - emitter_addr BLOB NOT NULL, + emitter INTEGER NOT NULL, event_index INTEGER NOT NULL, message_cid BLOB NOT NULL, message_index INTEGER NOT NULL, @@ -46,7 +50,7 @@ var ddls = []string{ )`, createIndexEventHeightTipsetKey, - 
createIndexEventEmitterAddr, + createIndexEventEmitter, `CREATE TABLE IF NOT EXISTS event_entry ( event_id INTEGER, @@ -67,6 +71,7 @@ var ddls = []string{ `INSERT OR IGNORE INTO _meta (version) VALUES (1)`, `INSERT OR IGNORE INTO _meta (version) VALUES (2)`, `INSERT OR IGNORE INTO _meta (version) VALUES (3)`, + `INSERT OR IGNORE INTO _meta (version) VALUES (4)`, } var ( @@ -74,16 +79,16 @@ var ( ) const ( - schemaVersion = 3 + schemaVersion = 4 - eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` - insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)` + eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter=? AND event_index=? AND message_cid=? AND message_index=?` + insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)` insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)` revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?` - restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` + restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter=? AND event_index=? AND message_cid=? 
AND message_index=?` createIndexEventHeightTipsetKey = `CREATE INDEX IF NOT EXISTS height_tipset_key ON event (height,tipset_key)` - createIndexEventEmitterAddr = `CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)` + createIndexEventEmitter = `CREATE INDEX IF NOT EXISTS event_emitter ON event (emitter)` createIndexEventEntryKey = `CREATE INDEX IF NOT EXISTS event_entry_key_index ON event_entry (key)` ) @@ -266,11 +271,19 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C return xerrors.Errorf("commit transaction: %w", err) } - // during the migration, we have likely increased the WAL size a lot, so lets do some + ei.vacuumDBAndCheckpointWAL(ctx) + + log.Infof("successfully migrated event index from version 1 to version 2 in %s", time.Since(now)) + + return nil +} + +func (ei *EventIndex) vacuumDBAndCheckpointWAL(ctx context.Context) { + // During the large migrations, we have likely increased the WAL size a lot, so lets do some // simple DB administration to free up space (VACUUM followed by truncating the WAL file) - // as this would be a good time to do it when no other writes are happening + // as this would be a good time to do it when no other writes are happening. 
log.Infof("Performing DB vacuum and wal checkpointing to free up space after the migration") - _, err = ei.db.ExecContext(ctx, "VACUUM") + _, err := ei.db.ExecContext(ctx, "VACUUM") if err != nil { log.Warnf("error vacuuming database: %s", err) } @@ -278,10 +291,6 @@ func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.C if err != nil { log.Warnf("error checkpointing wal: %s", err) } - - log.Infof("Successfully migrated events to version 2 in %s", time.Since(now)) - - return nil } // migrateToVersion3 migrates the schema from version 2 to version 3 by creating two indices: @@ -296,7 +305,7 @@ func (ei *EventIndex) migrateToVersion3(ctx context.Context) error { defer func() { _ = tx.Rollback() }() // create index on event.emitter_addr. - _, err = tx.ExecContext(ctx, createIndexEventEmitterAddr) + _, err = tx.ExecContext(ctx, "CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)") if err != nil { return xerrors.Errorf("create index event_emitter_addr: %w", err) } @@ -317,7 +326,149 @@ func (ei *EventIndex) migrateToVersion3(ctx context.Context) error { if err != nil { return xerrors.Errorf("commit transaction: %w", err) } - log.Infof("Successfully migrated events to version 3 in %s", time.Since(now)) + log.Infof("successfully migrated event index from version 2 to version 3 in %s", time.Since(now)) + return nil +} + +// migrateToVersion4 migrates the schema from version 3 to version 4: indexing events by emitter actor ID. +// This migration replaces the emitter_addr column in event table with a new column called `emitter`, which stores +// the emitter's actor ID. +func (ei *EventIndex) migrateToVersion4(ctx context.Context, chainStore *store.ChainStore) error { + now := time.Now() + + // Load address map from init actor once in order to resolve emitter addresses to IDs during migration. 
+ head := chainStore.GetHeaviestTipSet() + cst := cbor.NewCborStore(chainStore.StateBlockstore()) + var resolveActorID func(addr address.Address) (abi.ActorID, error) + if head != nil { + tree, err := state.LoadStateTree(cst, head.ParentState()) + if err != nil { + return xerrors.Errorf("load state tree: %w", err) + } + initActor, err := tree.GetActor(_init.Address) + if err != nil { + return xerrors.Errorf("get init actor: %w", err) + } + initActorState, err := _init.Load(chainStore.ActorStore(ctx), initActor) + if err != nil { + return xerrors.Errorf("load init actor: %w", err) + } + addressMap, err := initActorState.AddressMap() + if err != nil { + return xerrors.Errorf("load address map: %w", err) + } + resolveActorID = func(addr address.Address) (abi.ActorID, error) { + if addr.Protocol() == address.ID { + id, err := address.IDFromAddress(addr) + if err != nil { + return 0, xerrors.Errorf("id from addr: %w", err) + } + return abi.ActorID(id), nil + } + var actorID cbg.CborInt + switch found, err := addressMap.Get(abi.AddrKey(addr), &actorID); { + case err != nil: + return 0, xerrors.Errorf("get from address map: %w", err) + case !found: + return 0, types.ErrActorNotFound + default: + return abi.ActorID(uint64(actorID)), nil + } + } + } else { + // Head must be the genesis block; we cannot resolve anything. + resolveActorID = func(address.Address) (abi.ActorID, error) { + return 0, types.ErrActorNotFound + } + } + + tx, err := ei.db.BeginTx(ctx, nil) + if err != nil { + return xerrors.Errorf("begin transaction: %w", err) + } + // Rollback the transaction (a no-op if the transaction was already committed) + defer func() { _ = tx.Rollback() }() + + // Alter the event table to add a new column called `emitter` + // Note: since `emitter` column does not accept NULL values, set the default to 0 so that the table + // can be altered. 
This means after the migration all the reverted events for which address resolution + cannot be performed will end up with 0 as their emitter actor ID. + if _, err = tx.Exec("ALTER TABLE event ADD COLUMN emitter INTEGER NOT NULL DEFAULT 0"); err != nil { + return xerrors.Errorf("add emitter column to event table: %w", err) + } + + stmtUpdateEmitterByID, err := tx.Prepare("UPDATE event SET emitter=? WHERE id=?") + if err != nil { + return xerrors.Errorf("prepare stmtUpdateEmitterByID: %w", err) + } + + rows, err := tx.QueryContext(ctx, `SELECT id, emitter_addr FROM event WHERE reverted=false ORDER BY id DESC`) + if err != nil { + return xerrors.Errorf("select event emitter_addrs: %w", err) + } + + for rows.Next() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + var row struct { + id int64 + emitterAddr []byte + } + if err := rows.Scan(&row.id, &row.emitterAddr); err != nil { + return xerrors.Errorf("read event emitter_addr row: %w", err) + } + addr, err := address.NewFromBytes(row.emitterAddr) + if err != nil { + return xerrors.Errorf("parse emitter_addr: %w", err) + } + emitter, err := resolveActorID(addr) + if err != nil { + return xerrors.Errorf("resolve emitter from addr: %w", err) + } + updateRes, err := stmtUpdateEmitterByID.ExecContext(ctx, uint64(emitter), row.id) + if err != nil { + return xerrors.Errorf("update event emitter: %w", err) + } + affected, err := updateRes.RowsAffected() + if err != nil { + return xerrors.Errorf("rows affected: %w", err) + } + if affected != 1 { + log.Warnw("expected exactly one row to be affected as a result of emitter update", "affected", affected) + } + } + + // Delete event.emitter_addr index introduced in version 3 + if _, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS event_emitter_addr"); err != nil { + return xerrors.Errorf("drop event_emitter_addr index: %w", err) + } + + // Delete the redundant emitter_addr column. 
+ if _, err = tx.ExecContext(ctx, "ALTER TABLE event DROP COLUMN emitter_addr"); err != nil { + return xerrors.Errorf("drop event.emitter_addr column: %w", err) + } + + // Create a new index for event.emitter, replacing the deleted event.emitter_addr. + if _, err = tx.ExecContext(ctx, createIndexEventEmitter); err != nil { + return xerrors.Errorf("create event_emitter index: %w", err) + } + + // Increment the schema version in _meta table to 4. + if _, err = tx.Exec("INSERT OR IGNORE INTO _meta (version) VALUES (4)"); err != nil { + return xerrors.Errorf("increment _meta version: %w", err) + } + + if err = tx.Commit(); err != nil { + return xerrors.Errorf("commit transaction: %w", err) + } + + ei.vacuumDBAndCheckpointWAL(ctx) + + log.Infof("successfully migrated event index from version 3 to version 4 in %s", time.Since(now)) return nil } @@ -362,7 +513,7 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor err = eventIndex.migrateToVersion2(ctx, chainStore) if err != nil { _ = db.Close() - return nil, xerrors.Errorf("could not migrate sql data to version 2: %w", err) + return nil, xerrors.Errorf("could not migrate event index schema from version 1 to version 2: %w", err) } version = 2 } @@ -377,6 +528,16 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor version = 3 } + if version == 3 { + log.Infof("upgrading event index from version 3 to version 4") + err = eventIndex.migrateToVersion4(ctx, chainStore) + if err != nil { + _ = db.Close() + return nil, xerrors.Errorf("could not migrate sql data from version 3 to version 4: %w", err) + } + version = 4 + } + if version != schemaVersion { _ = db.Close() return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion) @@ -399,7 +560,7 @@ func (ei *EventIndex) Close() error { return ei.db.Close() } -func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx 
context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error { +func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool) error { tx, err := ei.db.Begin() if err != nil { return xerrors.Errorf("begin transaction: %w", err) @@ -422,9 +583,6 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever return nil } - // cache of lookups between actor id and f4 address - addressLookups := make(map[abi.ActorID]address.Address) - ems, err := te.messages(ctx) if err != nil { return xerrors.Errorf("load executed messages: %w", err) @@ -434,17 +592,6 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever // don't exist, otherwise mark them as not reverted for msgIdx, em := range ems { for evIdx, ev := range em.Events() { - addr, found := addressLookups[ev.Emitter] - if !found { - var ok bool - addr, ok = resolver(ctx, ev.Emitter, te.rctTs) - if !ok { - // not an address we will be able to match against - continue - } - addressLookups[ev.Emitter] = addr - } - tsKeyCid, err := te.msgTs.Key().Cid() if err != nil { return xerrors.Errorf("tipset key cid: %w", err) @@ -456,7 +603,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever te.msgTs.Height(), // height te.msgTs.Key().Bytes(), // tipset_key tsKeyCid.Bytes(), // tipset_key_cid - addr.Bytes(), // emitter_addr + ev.Emitter, // emitter evIdx, // event_index em.Message().Cid().Bytes(), // message_cid msgIdx, // message_index @@ -471,7 +618,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever te.msgTs.Height(), // height te.msgTs.Key().Bytes(), // tipset_key tsKeyCid.Bytes(), // tipset_key_cid - addr.Bytes(), // emitter_addr + ev.Emitter, // emitter evIdx, // event_index em.Message().Cid().Bytes(), // message_cid msgIdx, // message_index @@ -506,7 +653,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever te.msgTs.Height(), // 
height te.msgTs.Key().Bytes(), // tipset_key tsKeyCid.Bytes(), // tipset_key_cid - addr.Bytes(), // emitter_addr + ev.Emitter, // emitter evIdx, // event_index em.Message().Cid().Bytes(), // message_cid msgIdx, // message_index @@ -536,11 +683,12 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever return nil } -// PrefillFilter fills a filter's collection of events from the historic index +// prefillFilter fills a filter's collection of events from the historic index. func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, excludeReverted bool) error { - clauses := []string{} - values := []any{} - joins := []string{} + var ( + clauses, joins []string + values []any + ) if f.tipsetCid != cid.Undef { clauses = append(clauses, "event.tipset_key_cid=?") @@ -556,20 +704,31 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude } } + // Resolve emitters from filter addresses at the latest TipSet. + // TODO investigate if it is safe to use the current tipset; what about finality? + emitters, hasAddrs := f.emitters(ctx, nil) + if hasAddrs { + // The filter has at least one address. + if len(emitters) == 0 { + // Length of emitters is zero, meaning none of the addresses in the event filter + // were resolvable, and therefore there is no event that would match the filter. + return nil + } + subclauses := make([]string, 0, len(emitters)) + for emitter := range emitters { + subclauses = append(subclauses, "emitter=?") + values = append(values, emitter) + } + clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")") + // Explicitly exclude reverted events, since at least one emitter is present and reverts cannot be considered. 
+ excludeReverted = true + } + if excludeReverted { clauses = append(clauses, "event.reverted=?") values = append(values, false) } - if len(f.addresses) > 0 { - subclauses := make([]string, 0, len(f.addresses)) - for _, addr := range f.addresses { - subclauses = append(subclauses, "emitter_addr=?") - values = append(values, addr.Bytes()) - } - clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")") - } - if len(f.keysWithCodec) > 0 { join := 0 for key, vals := range f.keysWithCodec { @@ -594,7 +753,7 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude event.height, event.tipset_key, event.tipset_key_cid, - event.emitter_addr, + event.emitter, event.event_index, event.message_cid, event.message_index, @@ -644,7 +803,7 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude height uint64 tipsetKey []byte tipsetKeyCid []byte - emitterAddr []byte + emitter uint64 eventIndex int messageCid []byte messageIndex int @@ -660,7 +819,7 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude &row.height, &row.tipsetKey, &row.tipsetKeyCid, - &row.emitterAddr, + &row.emitter, &row.eventIndex, &row.messageCid, &row.messageIndex, @@ -687,17 +846,13 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude currentID = row.id ce = &CollectedEvent{ + Emitter: abi.ActorID(row.emitter), EventIdx: row.eventIndex, Reverted: row.reverted, Height: abi.ChainEpoch(row.height), MsgIdx: row.messageIndex, } - ce.EmitterAddr, err = address.NewFromBytes(row.emitterAddr) - if err != nil { - return xerrors.Errorf("parse emitter addr: %w", err) - } - ce.TipSetKey, err = types.TipSetKeyFromBytes(row.tipsetKey) if err != nil { return xerrors.Errorf("parse tipsetkey: %w", err) diff --git a/chain/events/filter/index_migration_test.go b/chain/events/filter/index_migration_test.go new file mode 100644 index 00000000000..e9028d5c71d --- /dev/null +++ 
b/chain/events/filter/index_migration_test.go @@ -0,0 +1,300 @@ +package filter + +import ( + "context" + "database/sql" + _ "embed" + "io" + pseudo "math/rand" + "os" + "path" + "testing" + "time" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + syncds "github.com/ipfs/go-datastore/sync" + cbor "github.com/ipfs/go-ipld-cbor" + _ "github.com/mattn/go-sqlite3" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/consensus/filcns" + "github.com/filecoin-project/lotus/chain/state" + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" +) + +//go:embed testdata/events_db_v3_calibnet_sample1.sql +var eventsDBVersion3Sample1 string + +type ( + v3EventRow struct { + id int64 + height uint64 + tipsetKey []byte + tipsetKeyCid []byte + emitterAddr []byte + eventIndex int + messageCid []byte + messageIndex int + reverted bool + } + v4EventRow struct { + id int64 + height uint64 + tipsetKey []byte + tipsetKeyCid []byte + emitter uint64 + eventIndex int + messageCid []byte + messageIndex int + reverted bool + } + eventEntry struct { + eventId int64 + indexed int64 + flags []byte + key string + codec uint64 + value []byte + } +) + +func TestMigration_V3ToV4Sample1(t *testing.T) { + const ( + seed = 4585623 + timeout = 20 * time.Second + ) + + rng := pseudo.New(pseudo.NewSource(seed)) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + t.Cleanup(cancel) + + // Populate the sample data and assert expected schema version. 
+ testDir := t.TempDir() + v3DbPath := path.Join(testDir, "events.db") + v3Db, err := sql.Open("sqlite3", v3DbPath) + require.NoError(t, err) + requireSqlExec(ctx, t, v3Db, eventsDBVersion3Sample1) + requireSchemaVersion(t, v3Db, 3) + + addrsMap := map[address.Address]struct{}{} + var addrs []address.Address + for row := range listV3Events(ctx, t, v3Db) { + addr, err := address.NewFromBytes(row.emitterAddr) + require.NoError(t, err) + if _, exists := addrsMap[addr]; !exists { + addrsMap[addr] = struct{}{} + addrs = append(addrs, addr) + } + } + + require.NoError(t, v3Db.Close()) + + // Copy the database file for migration, retaining the original for test comparison. + migratedDbPath := path.Join(testDir, "migrated-events.db") + requireFileCopy(t, v3DbPath, migratedDbPath) + + // Assert that event index can instantiate successfully, which includes successful migration process. + nbs := blockstore.NewMemorySync() + cs := store.NewChainStore(nbs, nbs, syncds.MutexWrap(datastore.NewMapDatastore()), filcns.Weight, nil) + cst := cbor.NewCborStore(cs.StateBlockstore()) + network, err := state.VersionForNetwork(build.TestNetworkVersion) + require.NoError(t, err) + tree, err := state.NewStateTree(cst, network) + require.NoError(t, err) + //for _, addr := range addrs { + // _, err = tree.RegisterNewAddress(addr) + // require.NoError(t, err) + //} + _, err = tree.Flush(ctx) + require.NoError(t, err) + + set := fakeTipSet(t, rng, abi.ChainEpoch(14000), []cid.Cid{}) + require.NoError(t, cs.SetHead(ctx, set)) + //policy.SetSupportedProofTypes(abi.RegisteredSealProof_StackedDrg2KiBV1) + //policy.SetConsensusMinerMinPower(abi.NewStoragePower(2048)) + //policy.SetMinVerifiedDealSize(abi.NewStoragePower(256)) + //generator, err := gen.NewGenerator() + //require.NoError(t, err) + ////_, err = generator.NextTipSet() + ////require.NoError(t, err) + //manager, err := stmgr.NewStateManager(generator.ChainStore(), nil, nil, filcns.DefaultUpgradeSchedule(), generator.BeaconSchedule(), nil, 
index.DummyMsgIndex) + //generator.SetStateManager(manager) + //generator.ChainStore(). + //for i := 0; i < 100; i++ { + // _, err = generator.NextTipSetFromMiners(generator.CurTipset.TipSet(), addrs, 0) + // require.NoError(t, err) + //} + + ar := newRandomActorResolver(rng) + subject, err := NewEventIndex(ctx, migratedDbPath, cs) + require.NoError(t, err) + require.NoError(t, subject.Close()) + + // Assert schema version 4 in migrated DB path. + v4Db, err := sql.Open("sqlite3", migratedDbPath) + require.NoError(t, err) + requireSchemaVersion(t, v4Db, 4) + t.Cleanup(func() { require.NoError(t, v4Db.Close()) }) + + // Open the original sample DB + v3Db, err = sql.Open("sqlite3", v3DbPath) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, v3Db.Close()) }) + + // Assert row counts are equal across tables. + require.Equal(t, countTableRows(ctx, t, v3Db, "event_entry"), countTableRows(ctx, t, v4Db, "event_entry")) + require.Equal(t, countTableRows(ctx, t, v3Db, "event"), countTableRows(ctx, t, v4Db, "event")) + + // Assert each event maps to its expected row. + for v3Event := range listV3Events(ctx, t, v3Db) { + v4Event := getV4EventByID(ctx, t, v4Db, v3Event.id) + + require.Equal(t, v3Event.id, v4Event.id) + require.Equal(t, v3Event.height, v4Event.height) + require.Equal(t, v3Event.tipsetKey, v4Event.tipsetKey) + require.Equal(t, v3Event.tipsetKeyCid, v4Event.tipsetKeyCid) + require.Equal(t, v3Event.eventIndex, v4Event.eventIndex) + require.Equal(t, v3Event.messageCid, v4Event.messageCid) + require.Equal(t, v3Event.messageIndex, v4Event.messageIndex) + require.Equal(t, v3Event.reverted, v4Event.reverted) + + // Assert that emitter actor ID matches the resolved actor ID. + v3EmitterAddr, err := address.NewFromBytes(v3Event.emitterAddr) + require.NoError(t, err) + wantEmitter, err := ar(ctx, v3EmitterAddr, nil) + require.NoError(t, err) + require.EqualValues(t, wantEmitter, v4Event.emitter) + } + + // Assert event entries are unchanged. 
+ for wantEntry := range listEventEntries(ctx, t, v3Db) { + requireEntryExists(ctx, t, v4Db, wantEntry) + } +} + +func requireFileCopy(t *testing.T, source string, destination string) { + t.Helper() + srcFile, err := os.Open(source) + require.NoError(t, err) + destFile, err := os.Create(destination) + require.NoError(t, err) + _, err = io.Copy(destFile, srcFile) + require.NoError(t, err) + require.NoError(t, destFile.Sync()) + require.NoError(t, destFile.Close()) + require.NoError(t, srcFile.Close()) +} + +func requireSchemaVersion(t *testing.T, db *sql.DB, want int) { + t.Helper() + var got int + require.NoError(t, db.QueryRow(`SELECT max(version) FROM _meta`).Scan(&got)) + require.Equal(t, want, got) +} + +func listV3Events(ctx context.Context, t *testing.T, db *sql.DB) <-chan v3EventRow { + t.Helper() + rows, err := db.QueryContext(ctx, `SELECT id, height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted FROM event`) + require.NoError(t, err) + events := make(chan v3EventRow) + go func() { + defer func() { + close(events) + _ = rows.Close() + }() + for rows.Next() { + var event v3EventRow + require.NoError(t, + rows.Scan(&event.id, &event.height, &event.tipsetKey, &event.tipsetKeyCid, &event.emitterAddr, &event.eventIndex, &event.messageCid, &event.messageIndex, &event.reverted)) + select { + case <-ctx.Done(): + case events <- event: + } + } + }() + return events +} + +func listEventEntries(ctx context.Context, t *testing.T, db *sql.DB) <-chan eventEntry { + t.Helper() + rows, err := db.QueryContext(ctx, `SELECT event_id, indexed, flags, key, codec, value FROM event_entry`) + require.NoError(t, err) + entries := make(chan eventEntry) + go func() { + defer func() { + close(entries) + _ = rows.Close() + }() + for rows.Next() { + var entry eventEntry + require.NoError(t, + rows.Scan(&entry.eventId, &entry.indexed, &entry.flags, &entry.key, &entry.codec, &entry.value)) + select { + case <-ctx.Done(): + case entries <- 
entry: + } + } + }() + return entries +} + +func countTableRows(ctx context.Context, t *testing.T, db *sql.DB, table string) int64 { + t.Helper() + var count int64 + require.NoError(t, db.QueryRowContext(ctx, `SELECT count(*) FROM `+table).Scan(&count)) + return count +} + +func getV4EventByID(ctx context.Context, t *testing.T, db *sql.DB, id int64) v4EventRow { + t.Helper() + row := db.QueryRowContext(ctx, `SELECT id, height, tipset_key, tipset_key_cid, emitter, event_index, message_cid, message_index, reverted FROM event WHERE id=?`, id) + require.NoError(t, row.Err()) + var event v4EventRow + require.NoError(t, + row.Scan(&event.id, &event.height, &event.tipsetKey, &event.tipsetKeyCid, &event.emitter, &event.eventIndex, &event.messageCid, &event.messageIndex, &event.reverted)) + return event +} + +func requireEntryExists(ctx context.Context, t *testing.T, db *sql.DB, entry eventEntry) { + t.Helper() + row := db.QueryRowContext(ctx, + `SELECT + event_id, indexed, flags, key, codec, value FROM event_entry + WHERE + event_id=? AND indexed=? AND flags=? AND key=? AND codec=? AND value=?`, + entry.eventId, entry.indexed, entry.flags, entry.key, entry.codec, entry.value) + require.NoError(t, row.Err()) + var got eventEntry + require.NoError(t, row.Scan(&got.eventId, &got.indexed, &got.flags, &got.key, &got.codec, &got.value)) + require.Equal(t, entry, got) +} + +func requireSqlExec(ctx context.Context, t *testing.T, db *sql.DB, stmt string) { + t.Helper() + result, err := db.ExecContext(ctx, stmt) + require.NoError(t, err) + affected, err := result.RowsAffected() + require.NoError(t, err) + require.NotZero(t, affected) +} + +func newRandomActorResolver(rng *pseudo.Rand) ActorResolver { + // Configure the cache to effectively never expire. + // This results in a sticky random actor resolution, which is good enough for testing. 
+ const ( + size = 1000 + expiry = 1000 * time.Hour + cacheNilTipSet = true + ) + return NewCachedActorResolver( + func(context.Context, address.Address, *types.TipSet) (abi.ActorID, error) { + return abi.ActorID(rng.Int63()), nil + }, size, expiry, cacheNilTipSet) +} diff --git a/chain/events/filter/index_test.go b/chain/events/filter/index_test.go index ce3f7b78a03..9eb7a2b7a7b 100644 --- a/chain/events/filter/index_test.go +++ b/chain/events/filter/index_test.go @@ -50,17 +50,17 @@ func TestEventIndexPrefillFilter(t *testing.T) { cid14000, err := events14000.msgTs.Key().Cid() require.NoError(t, err, "tipset cid") - noCollectedEvents := []*CollectedEvent{} + var noCollectedEvents []*CollectedEvent oneCollectedEvent := []*CollectedEvent{ { - Entries: ev1.Entries, - EmitterAddr: a1, - EventIdx: 0, - Reverted: false, - Height: 14000, - TipSetKey: events14000.msgTs.Key(), - MsgIdx: 0, - MsgCid: em.msg.Cid(), + Entries: ev1.Entries, + Emitter: a1ID, + EventIdx: 0, + Reverted: false, + Height: 14000, + TipSetKey: events14000.msgTs.Key(), + MsgIdx: 0, + MsgCid: em.msg.Cid(), }, } @@ -76,7 +76,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { ei, err := NewEventIndex(context.Background(), dbPath, nil) require.NoError(t, err, "create event index") - if err := ei.CollectEvents(context.Background(), events14000, false, addrMap.ResolveAddress); err != nil { + if err := ei.CollectEvents(context.Background(), events14000, false); err != nil { require.NoError(t, err, "collect events") } @@ -89,8 +89,9 @@ func TestEventIndexPrefillFilter(t *testing.T) { { name: "nomatch tipset min height", filter: &eventFilter{ - minHeight: 14001, - maxHeight: -1, + minHeight: 14001, + maxHeight: -1, + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -98,8 +99,9 @@ func TestEventIndexPrefillFilter(t *testing.T) { { name: "nomatch tipset max height", filter: &eventFilter{ - minHeight: -1, - maxHeight: 13999, + minHeight: -1, + maxHeight: 13999, + 
actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -107,8 +109,9 @@ func TestEventIndexPrefillFilter(t *testing.T) { { name: "match tipset min height", filter: &eventFilter{ - minHeight: 14000, - maxHeight: -1, + minHeight: 14000, + maxHeight: -1, + actorResolver: addrMap.ResolveActor, }, te: events14000, want: oneCollectedEvent, @@ -116,9 +119,10 @@ func TestEventIndexPrefillFilter(t *testing.T) { { name: "match tipset cid", filter: &eventFilter{ - minHeight: -1, - maxHeight: -1, - tipsetCid: cid14000, + minHeight: -1, + maxHeight: -1, + tipsetCid: cid14000, + actorResolver: addrMap.ResolveActor, }, te: events14000, want: oneCollectedEvent, @@ -126,9 +130,10 @@ func TestEventIndexPrefillFilter(t *testing.T) { { name: "nomatch address", filter: &eventFilter{ - minHeight: -1, - maxHeight: -1, - addresses: []address.Address{a2}, + minHeight: -1, + maxHeight: -1, + addresses: []address.Address{a2}, + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -136,9 +141,10 @@ func TestEventIndexPrefillFilter(t *testing.T) { { name: "match address", filter: &eventFilter{ - minHeight: -1, - maxHeight: -1, - addresses: []address.Address{a1}, + minHeight: -1, + maxHeight: -1, + addresses: []address.Address{a1}, + actorResolver: addrMap.ResolveActor, }, te: events14000, want: oneCollectedEvent, @@ -153,6 +159,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { []byte("approval"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: oneCollectedEvent, @@ -169,6 +176,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { []byte("approval"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: oneCollectedEvent, @@ -184,6 +192,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { []byte("propose"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -198,6 +207,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { []byte("approval"), }, 
}), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -215,6 +225,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { []byte("addr1"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: oneCollectedEvent, @@ -232,6 +243,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { []byte("addr1"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -249,6 +261,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { []byte("addr2"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -263,6 +276,7 @@ func TestEventIndexPrefillFilter(t *testing.T) { []byte("2988181"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -290,10 +304,12 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { a1ID := abi.ActorID(1) a2ID := abi.ActorID(2) + a3ID := abi.ActorID(3) addrMap := addressMap{} addrMap.add(a1ID, a1) addrMap.add(a2ID, a2) + addrMap.add(a3ID, a3) ev1 := fakeEvent( a1ID, @@ -340,48 +356,48 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { noCollectedEvents := []*CollectedEvent{} oneCollectedEvent := []*CollectedEvent{ { - Entries: ev1.Entries, - EmitterAddr: a1, - EventIdx: 0, - Reverted: false, - Height: 14000, - TipSetKey: events14000.msgTs.Key(), - MsgIdx: 0, - MsgCid: em.msg.Cid(), + Entries: ev1.Entries, + Emitter: a1ID, + EventIdx: 0, + Reverted: false, + Height: 14000, + TipSetKey: events14000.msgTs.Key(), + MsgIdx: 0, + MsgCid: em.msg.Cid(), }, } twoCollectedEvent := []*CollectedEvent{ { - Entries: ev1.Entries, - EmitterAddr: a1, - EventIdx: 0, - Reverted: false, - Height: 14000, - TipSetKey: events14000.msgTs.Key(), - MsgIdx: 0, - MsgCid: em.msg.Cid(), - }, - { - Entries: ev2.Entries, - EmitterAddr: a2, - EventIdx: 0, - Reverted: true, - Height: 14000, - TipSetKey: revertedEvents14000.msgTs.Key(), - MsgIdx: 0, - MsgCid: revertedEm.msg.Cid(), + Entries: 
ev1.Entries, + Emitter: a1ID, + EventIdx: 0, + Reverted: false, + Height: 14000, + TipSetKey: events14000.msgTs.Key(), + MsgIdx: 0, + MsgCid: em.msg.Cid(), + }, + { + Entries: ev2.Entries, + Emitter: a2ID, + EventIdx: 0, + Reverted: true, + Height: 14000, + TipSetKey: revertedEvents14000.msgTs.Key(), + MsgIdx: 0, + MsgCid: revertedEm.msg.Cid(), }, } oneCollectedRevertedEvent := []*CollectedEvent{ { - Entries: ev2.Entries, - EmitterAddr: a2, - EventIdx: 0, - Reverted: true, - Height: 14000, - TipSetKey: revertedEvents14000.msgTs.Key(), - MsgIdx: 0, - MsgCid: revertedEm.msg.Cid(), + Entries: ev2.Entries, + Emitter: a2ID, + EventIdx: 0, + Reverted: true, + Height: 14000, + TipSetKey: revertedEvents14000.msgTs.Key(), + MsgIdx: 0, + MsgCid: revertedEm.msg.Cid(), }, } @@ -397,13 +413,13 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { ei, err := NewEventIndex(context.Background(), dbPath, nil) require.NoError(t, err, "create event index") - if err := ei.CollectEvents(context.Background(), revertedEvents14000, false, addrMap.ResolveAddress); err != nil { + if err := ei.CollectEvents(context.Background(), revertedEvents14000, false); err != nil { require.NoError(t, err, "collect reverted events") } - if err := ei.CollectEvents(context.Background(), revertedEvents14000, true, addrMap.ResolveAddress); err != nil { + if err := ei.CollectEvents(context.Background(), revertedEvents14000, true); err != nil { require.NoError(t, err, "revert reverted events") } - if err := ei.CollectEvents(context.Background(), events14000, false, addrMap.ResolveAddress); err != nil { + if err := ei.CollectEvents(context.Background(), events14000, false); err != nil { require.NoError(t, err, "collect events") } @@ -416,8 +432,9 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { { name: "nomatch tipset min height", filter: &eventFilter{ - minHeight: 14001, - maxHeight: -1, + minHeight: 14001, + maxHeight: -1, + actorResolver: addrMap.ResolveActor, }, te: 
events14000, want: noCollectedEvents, @@ -425,8 +442,9 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { { name: "nomatch tipset max height", filter: &eventFilter{ - minHeight: -1, - maxHeight: 13999, + minHeight: -1, + maxHeight: 13999, + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -434,8 +452,9 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { { name: "match tipset min height", filter: &eventFilter{ - minHeight: 14000, - maxHeight: -1, + minHeight: 14000, + maxHeight: -1, + actorResolver: addrMap.ResolveActor, }, te: events14000, want: twoCollectedEvent, @@ -443,9 +462,10 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { { name: "match tipset cid", filter: &eventFilter{ - minHeight: -1, - maxHeight: -1, - tipsetCid: cid14000, + minHeight: -1, + maxHeight: -1, + tipsetCid: cid14000, + actorResolver: addrMap.ResolveActor, }, te: events14000, want: oneCollectedEvent, @@ -453,9 +473,10 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { { name: "match tipset cid", filter: &eventFilter{ - minHeight: -1, - maxHeight: -1, - tipsetCid: reveredCID14000, + minHeight: -1, + maxHeight: -1, + tipsetCid: reveredCID14000, + actorResolver: addrMap.ResolveActor, }, te: revertedEvents14000, want: oneCollectedRevertedEvent, @@ -463,9 +484,10 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { { name: "nomatch address", filter: &eventFilter{ - minHeight: -1, - maxHeight: -1, - addresses: []address.Address{a3}, + minHeight: -1, + maxHeight: -1, + addresses: []address.Address{a3}, + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -473,19 +495,23 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { { name: "match address 2", filter: &eventFilter{ - minHeight: -1, - maxHeight: -1, - addresses: []address.Address{a2}, + minHeight: -1, + maxHeight: -1, + addresses: []address.Address{a2}, + actorResolver: 
addrMap.ResolveActor, }, - te: revertedEvents14000, - want: oneCollectedRevertedEvent, + te: revertedEvents14000, + // Prefilling events should explicitly exclude reverted events, since + // we cannot confidently infer that the emitter ID matches the filter. + want: noCollectedEvents, }, { name: "match address 1", filter: &eventFilter{ - minHeight: -1, - maxHeight: -1, - addresses: []address.Address{a1}, + minHeight: -1, + maxHeight: -1, + addresses: []address.Address{a1}, + actorResolver: addrMap.ResolveActor, }, te: events14000, want: oneCollectedEvent, @@ -500,6 +526,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { []byte("approval"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: twoCollectedEvent, @@ -516,6 +543,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { []byte("approval"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: twoCollectedEvent, @@ -531,6 +559,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { []byte("propose"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -545,6 +574,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { []byte("approval"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -562,6 +592,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { []byte("addr1"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: oneCollectedEvent, @@ -579,6 +610,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { []byte("addr2"), }, }), + actorResolver: addrMap.ResolveActor, }, te: revertedEvents14000, want: oneCollectedRevertedEvent, @@ -596,6 +628,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { []byte("addr1"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -613,6 +646,7 @@ func 
TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { []byte("addr3"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -627,6 +661,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { []byte("2988181"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -641,6 +676,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { []byte("2988182"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -656,8 +692,9 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { { name: "nomatch tipset min height", filter: &eventFilter{ - minHeight: 14001, - maxHeight: -1, + minHeight: 14001, + maxHeight: -1, + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -665,8 +702,9 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { { name: "nomatch tipset max height", filter: &eventFilter{ - minHeight: -1, - maxHeight: 13999, + minHeight: -1, + maxHeight: 13999, + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -674,8 +712,9 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { { name: "match tipset min height", filter: &eventFilter{ - minHeight: 14000, - maxHeight: -1, + minHeight: 14000, + maxHeight: -1, + actorResolver: addrMap.ResolveActor, }, te: events14000, want: oneCollectedEvent, @@ -683,9 +722,10 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { { name: "match tipset cid", filter: &eventFilter{ - minHeight: -1, - maxHeight: -1, - tipsetCid: cid14000, + minHeight: -1, + maxHeight: -1, + tipsetCid: cid14000, + actorResolver: addrMap.ResolveActor, }, te: events14000, want: oneCollectedEvent, @@ -693,9 +733,10 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { { name: "match tipset cid but reverted", filter: &eventFilter{ - minHeight: -1, - maxHeight: -1, - tipsetCid: 
reveredCID14000, + minHeight: -1, + maxHeight: -1, + tipsetCid: reveredCID14000, + actorResolver: addrMap.ResolveActor, }, te: revertedEvents14000, want: noCollectedEvents, @@ -703,9 +744,10 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { { name: "nomatch address", filter: &eventFilter{ - minHeight: -1, - maxHeight: -1, - addresses: []address.Address{a3}, + minHeight: -1, + maxHeight: -1, + addresses: []address.Address{a3}, + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -713,9 +755,10 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { { name: "nomatch address 2 but reverted", filter: &eventFilter{ - minHeight: -1, - maxHeight: -1, - addresses: []address.Address{a2}, + minHeight: -1, + maxHeight: -1, + addresses: []address.Address{a2}, + actorResolver: addrMap.ResolveActor, }, te: revertedEvents14000, want: noCollectedEvents, @@ -723,9 +766,10 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { { name: "match address", filter: &eventFilter{ - minHeight: -1, - maxHeight: -1, - addresses: []address.Address{a1}, + minHeight: -1, + maxHeight: -1, + addresses: []address.Address{a1}, + actorResolver: addrMap.ResolveActor, }, te: events14000, want: oneCollectedEvent, @@ -740,6 +784,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { []byte("approval"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: oneCollectedEvent, @@ -756,6 +801,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { []byte("approval"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: oneCollectedEvent, @@ -771,6 +817,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { []byte("propose"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -785,6 +832,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { []byte("approval"), }, }), + actorResolver: 
addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -802,6 +850,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { []byte("addr1"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: oneCollectedEvent, @@ -819,6 +868,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { []byte("addr1"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -836,6 +886,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { []byte("addr2"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -853,6 +904,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { []byte("addr3"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, @@ -867,6 +919,7 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { []byte("2988181"), }, }), + actorResolver: addrMap.ResolveActor, }, te: events14000, want: noCollectedEvents, diff --git a/chain/events/filter/resolver.go b/chain/events/filter/resolver.go new file mode 100644 index 00000000000..84f7887c83d --- /dev/null +++ b/chain/events/filter/resolver.go @@ -0,0 +1,73 @@ +package filter + +import ( + "context" + "sync" + "time" + + "github.com/hashicorp/golang-lru/v2/expirable" + "lukechampine.com/blake3" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/chain/types" +) + +// ActorResolver resolves an address to its actor ID at a given TipSet, or the latest TipSet if nil. 
+type ActorResolver func(context.Context, address.Address, *types.TipSet) (abi.ActorID, error) + +type cachedActorResolver struct { + delegate ActorResolver + cache *expirable.LRU[string, cachedActorResolution] + cacheNilTipSet bool +} + +type cachedActorResolution struct { + result abi.ActorID + err error +} + +var cacheHasherPool = sync.Pool{ + New: func() any { + return blake3.New(32, nil) + }, +} + +func NewCachedActorResolver(delegate ActorResolver, size int, expiry time.Duration, cacheNilTipSet bool) ActorResolver { + + resolver := &cachedActorResolver{ + delegate: delegate, + cache: expirable.NewLRU[string, cachedActorResolution](size, nil, expiry), + cacheNilTipSet: cacheNilTipSet, + } + return resolver.resolve +} + +func (c *cachedActorResolver) resolve(ctx context.Context, addr address.Address, ts *types.TipSet) (abi.ActorID, error) { + if ts == nil && !c.cacheNilTipSet { + return c.delegate(ctx, addr, ts) + } + key := c.newCacheKey(addr, ts) + resolution, found := c.cache.Get(key) + if !found { + resolution.result, resolution.err = c.delegate(ctx, addr, ts) + c.cache.Add(key, resolution) + } + return resolution.result, resolution.err +} + +func (c *cachedActorResolver) newCacheKey(addr address.Address, ts *types.TipSet) string { + // TODO: How much do we care about efficiency? + // If not much, use tipset key directly in exchange for larger memory footprint. 
+ hasher := cacheHasherPool.Get().(*blake3.Hasher) + defer func() { + hasher.Reset() + cacheHasherPool.Put(hasher) + }() + if ts != nil { + _, _ = hasher.Write(ts.Key().Bytes()) + } + _, _ = hasher.Write(addr.Bytes()) + return string(hasher.Sum(nil)) +} diff --git a/chain/events/filter/resolver_test.go b/chain/events/filter/resolver_test.go new file mode 100644 index 00000000000..0b5c66c3881 --- /dev/null +++ b/chain/events/filter/resolver_test.go @@ -0,0 +1,180 @@ +package filter + +import ( + "context" + pseudo "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/go-state-types/builtin" + "github.com/filecoin-project/go-state-types/crypto" + + "github.com/filecoin-project/lotus/chain/types" +) + +func TestCachedActorResolver(t *testing.T) { + rng := pseudo.New(pseudo.NewSource(299792458)) + a1 := randomF4Addr(t, rng) + a2 := randomF4Addr(t, rng) + + a1ID := abi.ActorID(1) + a2ID := abi.ActorID(2) + + addrMap := addressMap{} + addrMap.add(a1ID, a1) + addrMap.add(a2ID, a2) + + ctx := context.Background() + t.Run("when nil tipset caching enabled", func(t *testing.T) { + const ( + size = 1 + expiry = time.Minute + cacheNilTipSet = true + ) + var delegateCallCount int + subject := NewCachedActorResolver(func(ctx context.Context, addr address.Address, ts *types.TipSet) (abi.ActorID, error) { + delegateCallCount++ + return addrMap.ResolveActor(ctx, addr, ts) + }, size, expiry, cacheNilTipSet) + + t.Run("cache miss calls delegate", func(t *testing.T) { + got, err := subject(ctx, a1, nil) + require.NoError(t, err) + require.Equal(t, a1ID, got) + require.Equal(t, 1, delegateCallCount) + }) + + t.Run("cache hit does not call delegate", func(t *testing.T) { + got, err := subject(ctx, a1, nil) + require.NoError(t, err) + require.Equal(t, a1ID, got) + require.Equal(t, 1, 
delegateCallCount) + }) + t.Run("size is respected", func(t *testing.T) { + // Cause a cache miss. + got, err := subject(ctx, a2, nil) + require.NoError(t, err) + require.Equal(t, a2ID, got) + require.Equal(t, 2, delegateCallCount) + + // Assert size is respected and a1 is no longer cached + got, err = subject(ctx, a1, nil) + require.NoError(t, err) + require.Equal(t, a1ID, got) + require.Equal(t, 3, delegateCallCount) + }) + }) + + t.Run("when nil tipset chaching disabled", func(t *testing.T) { + const ( + size = 1 + expiry = time.Minute + cacheNilTipSet = false + ) + var delegateCallCount int + subject := NewCachedActorResolver(func(ctx context.Context, addr address.Address, ts *types.TipSet) (abi.ActorID, error) { + delegateCallCount++ + return addrMap.ResolveActor(ctx, addr, ts) + }, size, expiry, cacheNilTipSet) + + t.Run("delegate is called every time", func(t *testing.T) { + got, err := subject(ctx, a1, nil) + require.NoError(t, err) + require.Equal(t, a1ID, got) + require.Equal(t, 1, delegateCallCount) + + got, err = subject(ctx, a1, nil) + require.NoError(t, err) + require.Equal(t, a1ID, got) + require.Equal(t, 2, delegateCallCount) + }) + + t.Run("non nil tipset is cached", func(t *testing.T) { + dummyCid := randomCid(t, rng) + ts, err := types.NewTipSet([]*types.BlockHeader{ + { + Height: 123, + Miner: builtin.SystemActorAddr, + Ticket: &types.Ticket{VRFProof: []byte{byte(123 % 2)}}, + ParentStateRoot: dummyCid, + Messages: dummyCid, + ParentMessageReceipts: dummyCid, + + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, + + ParentBaseFee: big.NewInt(0), + }, + }) + require.NoError(t, err) + + got, err := subject(ctx, a1, ts) + require.NoError(t, err) + require.Equal(t, a1ID, got) + require.Equal(t, 3, delegateCallCount) + + got, err = subject(ctx, a1, ts) + require.NoError(t, err) + require.Equal(t, a1ID, got) + require.Equal(t, 3, delegateCallCount) + }) + + t.Run("cached by tipset", 
func(t *testing.T) { + dummyCid := randomCid(t, rng) + ts, err := types.NewTipSet([]*types.BlockHeader{ + { + Height: 123, + Miner: builtin.SystemActorAddr, + Ticket: &types.Ticket{VRFProof: []byte{byte(123 % 2)}}, + ParentStateRoot: dummyCid, + Messages: dummyCid, + ParentMessageReceipts: dummyCid, + + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, + + ParentBaseFee: big.NewInt(0), + }, + }) + require.NoError(t, err) + + dummyCid2 := randomCid(t, rng) + ts2, err := types.NewTipSet([]*types.BlockHeader{ + { + Height: 124, + Miner: builtin.SystemActorAddr, + Ticket: &types.Ticket{VRFProof: []byte{byte(124 % 2)}}, + ParentStateRoot: dummyCid2, + Messages: dummyCid2, + ParentMessageReceipts: dummyCid2, + + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, + + ParentBaseFee: big.NewInt(0), + }, + }) + require.NoError(t, err) + + got, err := subject(ctx, a1, ts) + require.NoError(t, err) + require.Equal(t, a1ID, got) + require.Equal(t, 4, delegateCallCount) + + got, err = subject(ctx, a1, ts2) + require.NoError(t, err) + require.Equal(t, a1ID, got) + require.Equal(t, 5, delegateCallCount) + + got, err = subject(ctx, a1, ts2) + require.NoError(t, err) + require.Equal(t, a1ID, got) + require.Equal(t, 5, delegateCallCount) + }) + }) +} diff --git a/chain/events/filter/store.go b/chain/events/filter/store.go index d3c173ec015..88bbb5bdb1c 100644 --- a/chain/events/filter/store.go +++ b/chain/events/filter/store.go @@ -15,7 +15,7 @@ import ( type Filter interface { ID() types.FilterID LastTaken() time.Time - SetSubChannel(chan<- interface{}) + SetSubChannel(chan<- any) ClearSubChannel() } diff --git a/chain/events/filter/testdata/events_db_v3_calibnet_sample1.sql b/chain/events/filter/testdata/events_db_v3_calibnet_sample1.sql new file mode 100644 index 00000000000..42fa0290508 --- /dev/null +++ 
b/chain/events/filter/testdata/events_db_v3_calibnet_sample1.sql @@ -0,0 +1,136 @@ +PRAGMA foreign_keys=OFF; +BEGIN TRANSACTION; +CREATE TABLE event ( + id INTEGER PRIMARY KEY, + height INTEGER NOT NULL, + tipset_key BLOB NOT NULL, + tipset_key_cid BLOB NOT NULL, + emitter_addr BLOB NOT NULL, + event_index INTEGER NOT NULL, + message_cid BLOB NOT NULL, + message_index INTEGER NOT NULL, + reverted INTEGER NOT NULL + ); +INSERT INTO event VALUES(1,1439236,X'0171a0e40220f2a6d7af0565ca4ec88f1bfcc52c42e5db2aeea63ad31477decf709af53399460171a0e4022029f8aa98785b283d35da9bfe81034f601305fed9b15ed05f6dca44e94087bab10171a0e402205e0ab9e99ce1aa4593614e765b0f46547b5a9ea8eb5deec1de8f36c77e8672f5',X'0171a0e40220113e78b685b29105b364391d4854dd7660c4a4c0087dcee9725424c625d5a77d',X'040ae6cc8d509b2971420f536e4c64f02fe7a41671d0',0,X'0171a0e40220992f1d326ee771505029502b06fc03b069ef68bbce94a22c7d43a4ceacfa4093',0,0); +INSERT INTO event VALUES(2,1439236,X'0171a0e40220f2a6d7af0565ca4ec88f1bfcc52c42e5db2aeea63ad31477decf709af53399460171a0e4022029f8aa98785b283d35da9bfe81034f601305fed9b15ed05f6dca44e94087bab10171a0e402205e0ab9e99ce1aa4593614e765b0f46547b5a9ea8eb5deec1de8f36c77e8672f5',X'0171a0e40220113e78b685b29105b364391d4854dd7660c4a4c0087dcee9725424c625d5a77d',X'040a065234e36bdea315b41503fd45a44aa552bf9799',1,X'0171a0e40220992f1d326ee771505029502b06fc03b069ef68bbce94a22c7d43a4ceacfa4093',0,0); +INSERT INTO event VALUES(3,1439236,X'0171a0e40220f2a6d7af0565ca4ec88f1bfcc52c42e5db2aeea63ad31477decf709af53399460171a0e4022029f8aa98785b283d35da9bfe81034f601305fed9b15ed05f6dca44e94087bab10171a0e402205e0ab9e99ce1aa4593614e765b0f46547b5a9ea8eb5deec1de8f36c77e8672f5',X'0171a0e40220113e78b685b29105b364391d4854dd7660c4a4c0087dcee9725424c625d5a77d',X'040ae6cc8d509b2971420f536e4c64f02fe7a41671d0',0,X'0171a0e402202e4c569fa84e105048e9041228a11e4d51c0dfe5c5183c4375440865fb09699b',1,0); +INSERT INTO event 
VALUES(4,1439236,X'0171a0e40220f2a6d7af0565ca4ec88f1bfcc52c42e5db2aeea63ad31477decf709af53399460171a0e4022029f8aa98785b283d35da9bfe81034f601305fed9b15ed05f6dca44e94087bab10171a0e402205e0ab9e99ce1aa4593614e765b0f46547b5a9ea8eb5deec1de8f36c77e8672f5',X'0171a0e40220113e78b685b29105b364391d4854dd7660c4a4c0087dcee9725424c625d5a77d',X'040a065234e36bdea315b41503fd45a44aa552bf9799',1,X'0171a0e402202e4c569fa84e105048e9041228a11e4d51c0dfe5c5183c4375440865fb09699b',1,0); +INSERT INTO event VALUES(5,1439237,X'0171a0e40220ce673d78d316fd202ebaaf29ab694321c462fd168f74c356a789f807a763a8d40171a0e40220c7c722f54134a2ae5547226299aad8370258cc7285c9ccb4137cf56394c5a89a',X'0171a0e40220df65d75c6e3a7fc0019947a6fb0801b6e9ce02f2970e4a6ea45b94b9fa242f5e',X'040ae6cc8d509b2971420f536e4c64f02fe7a41671d0',0,X'0171a0e40220efdedf511996e49a4fcb880db2d7859026750803f554630372dcec418b79196f',0,0); +INSERT INTO event VALUES(6,1439237,X'0171a0e40220ce673d78d316fd202ebaaf29ab694321c462fd168f74c356a789f807a763a8d40171a0e40220c7c722f54134a2ae5547226299aad8370258cc7285c9ccb4137cf56394c5a89a',X'0171a0e40220df65d75c6e3a7fc0019947a6fb0801b6e9ce02f2970e4a6ea45b94b9fa242f5e',X'040a065234e36bdea315b41503fd45a44aa552bf9799',1,X'0171a0e40220efdedf511996e49a4fcb880db2d7859026750803f554630372dcec418b79196f',0,0); +INSERT INTO event VALUES(7,1439238,X'0171a0e40220bcb4517521348db6de47e75de4238922bc13edf64837b574c72d49436471374b0171a0e4022055aa26ee61cb1cb103e92814988d636c1983acb25ab21735d83eebe5dc14cc640171a0e40220b46c88ffd8100659a1057d797b412b11a210c5ac7f22a29da10ee4cd383a2d6b0171a0e402204338aecc04cdfc918d678dab34d51cd95acce6d3d29d4298449582afba5e6d4c',X'0171a0e40220ea238c8dcfcfd98a82a15d4ddb8156be69c0df3dd4d2631ef8abeb0af051978b',X'040ae6cc8d509b2971420f536e4c64f02fe7a41671d0',0,X'0171a0e40220c224e8bb410e5fe3eaa6ad2aa917c01159d799b00674f77060ec81ca259349de',0,0); +INSERT INTO event 
VALUES(8,1439238,X'0171a0e40220bcb4517521348db6de47e75de4238922bc13edf64837b574c72d49436471374b0171a0e4022055aa26ee61cb1cb103e92814988d636c1983acb25ab21735d83eebe5dc14cc640171a0e40220b46c88ffd8100659a1057d797b412b11a210c5ac7f22a29da10ee4cd383a2d6b0171a0e402204338aecc04cdfc918d678dab34d51cd95acce6d3d29d4298449582afba5e6d4c',X'0171a0e40220ea238c8dcfcfd98a82a15d4ddb8156be69c0df3dd4d2631ef8abeb0af051978b',X'040acd1ffdfb11aba71d189486e13ea5f8a2aeb44f99',1,X'0171a0e40220c224e8bb410e5fe3eaa6ad2aa917c01159d799b00674f77060ec81ca259349de',0,0); +INSERT INTO event VALUES(9,1439238,X'0171a0e40220bcb4517521348db6de47e75de4238922bc13edf64837b574c72d49436471374b0171a0e4022055aa26ee61cb1cb103e92814988d636c1983acb25ab21735d83eebe5dc14cc640171a0e40220b46c88ffd8100659a1057d797b412b11a210c5ac7f22a29da10ee4cd383a2d6b0171a0e402204338aecc04cdfc918d678dab34d51cd95acce6d3d29d4298449582afba5e6d4c',X'0171a0e40220ea238c8dcfcfd98a82a15d4ddb8156be69c0df3dd4d2631ef8abeb0af051978b',X'040a71ec3eab55218ff0a812a54a7c6d46085d4f9b32',2,X'0171a0e40220c224e8bb410e5fe3eaa6ad2aa917c01159d799b00674f77060ec81ca259349de',0,0); +INSERT INTO event VALUES(10,1439238,X'0171a0e40220bcb4517521348db6de47e75de4238922bc13edf64837b574c72d49436471374b0171a0e4022055aa26ee61cb1cb103e92814988d636c1983acb25ab21735d83eebe5dc14cc640171a0e40220b46c88ffd8100659a1057d797b412b11a210c5ac7f22a29da10ee4cd383a2d6b0171a0e402204338aecc04cdfc918d678dab34d51cd95acce6d3d29d4298449582afba5e6d4c',X'0171a0e40220ea238c8dcfcfd98a82a15d4ddb8156be69c0df3dd4d2631ef8abeb0af051978b',X'040ae6cc8d509b2971420f536e4c64f02fe7a41671d0',0,X'0171a0e402205a49724314343165f6b947f4fa19042a585eafbfbfe57d2074c85b12b481f514',3,0); +INSERT INTO event 
VALUES(11,1439238,X'0171a0e40220bcb4517521348db6de47e75de4238922bc13edf64837b574c72d49436471374b0171a0e4022055aa26ee61cb1cb103e92814988d636c1983acb25ab21735d83eebe5dc14cc640171a0e40220b46c88ffd8100659a1057d797b412b11a210c5ac7f22a29da10ee4cd383a2d6b0171a0e402204338aecc04cdfc918d678dab34d51cd95acce6d3d29d4298449582afba5e6d4c',X'0171a0e40220ea238c8dcfcfd98a82a15d4ddb8156be69c0df3dd4d2631ef8abeb0af051978b',X'040a065234e36bdea315b41503fd45a44aa552bf9799',1,X'0171a0e402205a49724314343165f6b947f4fa19042a585eafbfbfe57d2074c85b12b481f514',3,0); +INSERT INTO event VALUES(12,1439239,X'0171a0e40220126b54f4db21ffa037ee1bd66662d3c09550dfa707768e6510ca5e5042d6c03a0171a0e402201dca49098a43ae468ffccc51686405fc63cddf606c13f030f6b2f1bbcb8a93130171a0e40220d6263a4a2efc7f37d20873743aced559ea627c05db17ab5f05582fb033e1d4fc0171a0e4022058b979b506c75152e7c9711f00d68d7038203f9234b9b71633291f910b7f8b23',X'0171a0e402200d838c4c0fcf5e5deb5da736b1eaefb408406a55cfccdcd7cea0e35c9a1b26a9',X'040ae6cc8d509b2971420f536e4c64f02fe7a41671d0',0,X'0171a0e40220662dc41c495d920a7eb8c0629696af2bb696564fa15e5879ab116a6b7c86a2d7',0,0); +INSERT INTO event VALUES(13,1439239,X'0171a0e40220126b54f4db21ffa037ee1bd66662d3c09550dfa707768e6510ca5e5042d6c03a0171a0e402201dca49098a43ae468ffccc51686405fc63cddf606c13f030f6b2f1bbcb8a93130171a0e40220d6263a4a2efc7f37d20873743aced559ea627c05db17ab5f05582fb033e1d4fc0171a0e4022058b979b506c75152e7c9711f00d68d7038203f9234b9b71633291f910b7f8b23',X'0171a0e402200d838c4c0fcf5e5deb5da736b1eaefb408406a55cfccdcd7cea0e35c9a1b26a9',X'040acd1ffdfb11aba71d189486e13ea5f8a2aeb44f99',1,X'0171a0e40220662dc41c495d920a7eb8c0629696af2bb696564fa15e5879ab116a6b7c86a2d7',0,0); +INSERT INTO event 
VALUES(14,1439239,X'0171a0e40220126b54f4db21ffa037ee1bd66662d3c09550dfa707768e6510ca5e5042d6c03a0171a0e402201dca49098a43ae468ffccc51686405fc63cddf606c13f030f6b2f1bbcb8a93130171a0e40220d6263a4a2efc7f37d20873743aced559ea627c05db17ab5f05582fb033e1d4fc0171a0e4022058b979b506c75152e7c9711f00d68d7038203f9234b9b71633291f910b7f8b23',X'0171a0e402200d838c4c0fcf5e5deb5da736b1eaefb408406a55cfccdcd7cea0e35c9a1b26a9',X'040a71ec3eab55218ff0a812a54a7c6d46085d4f9b32',2,X'0171a0e40220662dc41c495d920a7eb8c0629696af2bb696564fa15e5879ab116a6b7c86a2d7',0,0); +INSERT INTO event VALUES(15,1439240,X'0171a0e402200041ee6f02c43f948f3048b918cfe181dd20ade1d2ef345193cd94c4a5215c5f0171a0e40220d9ae2e0d856a6ad24fa52c1df14717d2ca8c4088da3561c448b5c7bbefb89f990171a0e4022064f3ef3cd4b04c8fb60b0c837a1fa4d623a48382862aefb5f037bb2ed7f789280171a0e40220cb60b91ac05da942f6db1d1104a4f881ac9bf848b2c80f65179312788fa040760171a0e4022022708c2ec05b5e70764d37725efb5d5090e1128f53d0c48273d9de02246851c9',X'0171a0e402208d27fe994d4a906f20f8731c5c22b603c3b31164e40ebde85e65c94d83debd22',X'040ae6cc8d509b2971420f536e4c64f02fe7a41671d0',0,X'0171a0e402209ec2190f256ce970cb477ccd0ea292222214b1328b50023effa4d0973fd02be6',0,0); +INSERT INTO event VALUES(16,1439240,X'0171a0e402200041ee6f02c43f948f3048b918cfe181dd20ade1d2ef345193cd94c4a5215c5f0171a0e40220d9ae2e0d856a6ad24fa52c1df14717d2ca8c4088da3561c448b5c7bbefb89f990171a0e4022064f3ef3cd4b04c8fb60b0c837a1fa4d623a48382862aefb5f037bb2ed7f789280171a0e40220cb60b91ac05da942f6db1d1104a4f881ac9bf848b2c80f65179312788fa040760171a0e4022022708c2ec05b5e70764d37725efb5d5090e1128f53d0c48273d9de02246851c9',X'0171a0e402208d27fe994d4a906f20f8731c5c22b603c3b31164e40ebde85e65c94d83debd22',X'040a065234e36bdea315b41503fd45a44aa552bf9799',1,X'0171a0e402209ec2190f256ce970cb477ccd0ea292222214b1328b50023effa4d0973fd02be6',0,0); +INSERT INTO event 
VALUES(17,1439240,X'0171a0e402200041ee6f02c43f948f3048b918cfe181dd20ade1d2ef345193cd94c4a5215c5f0171a0e40220d9ae2e0d856a6ad24fa52c1df14717d2ca8c4088da3561c448b5c7bbefb89f990171a0e4022064f3ef3cd4b04c8fb60b0c837a1fa4d623a48382862aefb5f037bb2ed7f789280171a0e40220cb60b91ac05da942f6db1d1104a4f881ac9bf848b2c80f65179312788fa040760171a0e4022022708c2ec05b5e70764d37725efb5d5090e1128f53d0c48273d9de02246851c9',X'0171a0e402208d27fe994d4a906f20f8731c5c22b603c3b31164e40ebde85e65c94d83debd22',X'040ae6cc8d509b2971420f536e4c64f02fe7a41671d0',0,X'0171a0e4022074e22c86f066276706bbac4ad95fa4175c5e35192c5aea6563602c6499f9962f',5,0); +INSERT INTO event VALUES(18,1439240,X'0171a0e402200041ee6f02c43f948f3048b918cfe181dd20ade1d2ef345193cd94c4a5215c5f0171a0e40220d9ae2e0d856a6ad24fa52c1df14717d2ca8c4088da3561c448b5c7bbefb89f990171a0e4022064f3ef3cd4b04c8fb60b0c837a1fa4d623a48382862aefb5f037bb2ed7f789280171a0e40220cb60b91ac05da942f6db1d1104a4f881ac9bf848b2c80f65179312788fa040760171a0e4022022708c2ec05b5e70764d37725efb5d5090e1128f53d0c48273d9de02246851c9',X'0171a0e402208d27fe994d4a906f20f8731c5c22b603c3b31164e40ebde85e65c94d83debd22',X'040a065234e36bdea315b41503fd45a44aa552bf9799',1,X'0171a0e4022074e22c86f066276706bbac4ad95fa4175c5e35192c5aea6563602c6499f9962f',5,0); +INSERT INTO event VALUES(19,1439240,X'0171a0e402200041ee6f02c43f948f3048b918cfe181dd20ade1d2ef345193cd94c4a5215c5f0171a0e40220d9ae2e0d856a6ad24fa52c1df14717d2ca8c4088da3561c448b5c7bbefb89f990171a0e4022064f3ef3cd4b04c8fb60b0c837a1fa4d623a48382862aefb5f037bb2ed7f789280171a0e40220cb60b91ac05da942f6db1d1104a4f881ac9bf848b2c80f65179312788fa040760171a0e4022022708c2ec05b5e70764d37725efb5d5090e1128f53d0c48273d9de02246851c9',X'0171a0e402208d27fe994d4a906f20f8731c5c22b603c3b31164e40ebde85e65c94d83debd22',X'040ae6cc8d509b2971420f536e4c64f02fe7a41671d0',0,X'0171a0e40220ccf3350dde489becee862c0f13a2e764dd2eba91a4e8e3da5076201e25fd95fd',6,0); +INSERT INTO event 
VALUES(20,1439240,X'0171a0e402200041ee6f02c43f948f3048b918cfe181dd20ade1d2ef345193cd94c4a5215c5f0171a0e40220d9ae2e0d856a6ad24fa52c1df14717d2ca8c4088da3561c448b5c7bbefb89f990171a0e4022064f3ef3cd4b04c8fb60b0c837a1fa4d623a48382862aefb5f037bb2ed7f789280171a0e40220cb60b91ac05da942f6db1d1104a4f881ac9bf848b2c80f65179312788fa040760171a0e4022022708c2ec05b5e70764d37725efb5d5090e1128f53d0c48273d9de02246851c9',X'0171a0e402208d27fe994d4a906f20f8731c5c22b603c3b31164e40ebde85e65c94d83debd22',X'040a065234e36bdea315b41503fd45a44aa552bf9799',1,X'0171a0e40220ccf3350dde489becee862c0f13a2e764dd2eba91a4e8e3da5076201e25fd95fd',6,0); +INSERT INTO event VALUES(21,1439241,X'0171a0e40220400c1fc55319ba10619e3d96b45f72b16a617c71f5177ce6ec71fcd7ad0969e00171a0e402203ddb25d8d946a61748826aebfd041ec3871181723297229bbaebf54b237e8da2',X'0171a0e40220ab53f66adb026dba7bab9eb1727d288b9e53be90ae46fe35f9e6a1da0308f304',X'040ae6cc8d509b2971420f536e4c64f02fe7a41671d0',0,X'0171a0e402206889ae5d89b0ae8e4401f6604ecce75fcfdcfdbb723185b43d4b3210291ab890',0,0); +INSERT INTO event VALUES(22,1439241,X'0171a0e40220400c1fc55319ba10619e3d96b45f72b16a617c71f5177ce6ec71fcd7ad0969e00171a0e402203ddb25d8d946a61748826aebfd041ec3871181723297229bbaebf54b237e8da2',X'0171a0e40220ab53f66adb026dba7bab9eb1727d288b9e53be90ae46fe35f9e6a1da0308f304',X'040a065234e36bdea315b41503fd45a44aa552bf9799',1,X'0171a0e402206889ae5d89b0ae8e4401f6604ecce75fcfdcfdbb723185b43d4b3210291ab890',0,0); +INSERT INTO event VALUES(23,1439241,X'0171a0e40220400c1fc55319ba10619e3d96b45f72b16a617c71f5177ce6ec71fcd7ad0969e00171a0e402203ddb25d8d946a61748826aebfd041ec3871181723297229bbaebf54b237e8da2',X'0171a0e40220ab53f66adb026dba7bab9eb1727d288b9e53be90ae46fe35f9e6a1da0308f304',X'040a030bcf3d50cad04c2e57391b12740982a9308621',0,X'0171a0e40220cbe92703c76594c7995996ef9872eb2e7b23f30ff9c8c9e1deccd2ae14d76d84',1,0); +CREATE TABLE event_entry ( + event_id INTEGER, + indexed INTEGER NOT NULL, + flags BLOB NOT NULL, + key TEXT NOT NULL, + codec INTEGER, + value BLOB NOT 
NULL + ); +INSERT INTO event_entry VALUES(1,1,X'03','t1',85,X'ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'); +INSERT INTO event_entry VALUES(1,1,X'03','t2',85,X'000000000000000000000000cd97b08ee3a702d20546c29bf81f2d18dfff32e0'); +INSERT INTO event_entry VALUES(1,1,X'03','t3',85,X'0000000000000000000000000000000000000000000000000000000000000000'); +INSERT INTO event_entry VALUES(1,1,X'03','d',85,X'00000000000000000000000000000000000000000000000006f05b59d3b20000'); +INSERT INTO event_entry VALUES(2,1,X'03','t1',85,X'bd5034ffbd47e4e72a94baa2cdb74c6fad73cb3bcdc13036b72ec8306f5a7646'); +INSERT INTO event_entry VALUES(2,1,X'03','t2',85,X'000000000000000000000000cd97b08ee3a702d20546c29bf81f2d18dfff32e0'); +INSERT INTO event_entry VALUES(2,1,X'03','d',85,X'00000000000000000000000000000000000000000000000006f05b59d3b2000000000000000000000000000000000000000000000000000006f1fbf742203f4d0000000000000000000000000000000000000000000000000001c79e9d26ba1b'); +INSERT INTO event_entry VALUES(3,1,X'03','t1',85,X'ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'); +INSERT INTO event_entry VALUES(3,1,X'03','t2',85,X'0000000000000000000000000000000000000000000000000000000000000000'); +INSERT INTO event_entry VALUES(3,1,X'03','t3',85,X'000000000000000000000000563f8726d19fc8dc14a1629c1ad38ea0a08bbb29'); +INSERT INTO event_entry VALUES(3,1,X'03','d',85,X'000000000000000000000000000000000000000000000081d93034edbea02b20'); +INSERT INTO event_entry VALUES(4,1,X'03','t1',85,X'90890809c654f11d6e72a28fa60149770a0d11ec6c92319d6ceb2bb0a4ea1a15'); +INSERT INTO event_entry VALUES(4,1,X'03','t2',85,X'000000000000000000000000563f8726d19fc8dc14a1629c1ad38ea0a08bbb29'); +INSERT INTO event_entry VALUES(4,1,X'03','d',85,X'00000000000000000000000000000000000000000000008218f270f967681000000000000000000000000000000000000000000000000081d93034edbea02b20'); +INSERT INTO event_entry VALUES(5,1,X'03','t1',85,X'ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'); 
+INSERT INTO event_entry VALUES(5,1,X'03','t2',85,X'0000000000000000000000000000000000000000000000000000000000000000'); +INSERT INTO event_entry VALUES(5,1,X'03','t3',85,X'00000000000000000000000039fc2d7f6bc81439a23cacea538bbec8090f66e4'); +INSERT INTO event_entry VALUES(5,1,X'03','d',85,X'0000000000000000000000000000000000000000000000045418fdc8095c2663'); +INSERT INTO event_entry VALUES(6,1,X'03','t1',85,X'90890809c654f11d6e72a28fa60149770a0d11ec6c92319d6ceb2bb0a4ea1a15'); +INSERT INTO event_entry VALUES(6,1,X'03','t2',85,X'00000000000000000000000039fc2d7f6bc81439a23cacea538bbec8090f66e4'); +INSERT INTO event_entry VALUES(6,1,X'03','d',85,X'000000000000000000000000000000000000000000000004563918244f4000000000000000000000000000000000000000000000000000045418fdc8095c2663'); +INSERT INTO event_entry VALUES(7,1,X'03','t1',85,X'ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'); +INSERT INTO event_entry VALUES(7,1,X'03','t2',85,X'000000000000000000000000cd97b08ee3a702d20546c29bf81f2d18dfff32e0'); +INSERT INTO event_entry VALUES(7,1,X'03','t3',85,X'00000000000000000000000071ec3eab55218ff0a812a54a7c6d46085d4f9b32'); +INSERT INTO event_entry VALUES(7,1,X'03','d',85,X'0000000000000000000000000000000000000000000000004563918244f40000'); +INSERT INTO event_entry VALUES(8,1,X'03','t1',85,X'ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'); +INSERT INTO event_entry VALUES(8,1,X'03','t2',85,X'0000000000000000000000000000000000000000000000000000000000000000'); +INSERT INTO event_entry VALUES(8,1,X'03','t3',85,X'000000000000000000000000cd97b08ee3a702d20546c29bf81f2d18dfff32e0'); +INSERT INTO event_entry VALUES(8,1,X'03','d',85,X'000000000000000000000000000000000000000000000001a10f45229a9eb43c'); +INSERT INTO event_entry VALUES(9,1,X'03','t1',85,X'6381ea17a5324d29cc015352644672ead5185c1c61a0d3a521eda97e35cec97e'); +INSERT INTO event_entry VALUES(9,1,X'03','t2',85,X'000000000000000000000000cd97b08ee3a702d20546c29bf81f2d18dfff32e0'); +INSERT INTO 
event_entry VALUES(9,1,X'03','t3',85,X'00000000000000000000000000000000000000000000000000000000000046b1'); +INSERT INTO event_entry VALUES(9,1,X'03','d',85,X'0000000000000000000000000000000000000000000000004563918244f40000000000000000000000000000000000000000000000000000000000000015f6060000000000000000000000000000000000000000000000000000000000174786000000000000000000000000000000000000000000000001a10f45229a9eb43c'); +INSERT INTO event_entry VALUES(10,1,X'03','t1',85,X'ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'); +INSERT INTO event_entry VALUES(10,1,X'03','t2',85,X'0000000000000000000000000000000000000000000000000000000000000000'); +INSERT INTO event_entry VALUES(10,1,X'03','t3',85,X'000000000000000000000000dde09bbfcd0a824c45834d29b2f889990aec37ac'); +INSERT INTO event_entry VALUES(10,1,X'03','d',85,X'0000000000000000000000000000000000000000000000054d6b6a150b7748a0'); +INSERT INTO event_entry VALUES(11,1,X'03','t1',85,X'90890809c654f11d6e72a28fa60149770a0d11ec6c92319d6ceb2bb0a4ea1a15'); +INSERT INTO event_entry VALUES(11,1,X'03','t2',85,X'000000000000000000000000dde09bbfcd0a824c45834d29b2f889990aec37ac'); +INSERT INTO event_entry VALUES(11,1,X'03','d',85,X'0000000000000000000000000000000000000000000000055005f0c6144800000000000000000000000000000000000000000000000000054d6b6a150b7748a0'); +INSERT INTO event_entry VALUES(12,1,X'03','t1',85,X'ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'); +INSERT INTO event_entry VALUES(12,1,X'03','t2',85,X'000000000000000000000000d50285442b6c0fe5529009a84161e2001f94df50'); +INSERT INTO event_entry VALUES(12,1,X'03','t3',85,X'00000000000000000000000071ec3eab55218ff0a812a54a7c6d46085d4f9b32'); +INSERT INTO event_entry VALUES(12,1,X'03','d',85,X'000000000000000000000000000000000000000000000002b48fb0721886c000'); +INSERT INTO event_entry VALUES(13,1,X'03','t1',85,X'ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'); +INSERT INTO event_entry 
VALUES(13,1,X'03','t2',85,X'0000000000000000000000000000000000000000000000000000000000000000'); +INSERT INTO event_entry VALUES(13,1,X'03','t3',85,X'000000000000000000000000d50285442b6c0fe5529009a84161e2001f94df50'); +INSERT INTO event_entry VALUES(13,1,X'03','d',85,X'00000000000000000000000000000000000000000000008214d74dee806c9836'); +INSERT INTO event_entry VALUES(14,1,X'03','t1',85,X'6381ea17a5324d29cc015352644672ead5185c1c61a0d3a521eda97e35cec97e'); +INSERT INTO event_entry VALUES(14,1,X'03','t2',85,X'000000000000000000000000d50285442b6c0fe5529009a84161e2001f94df50'); +INSERT INTO event_entry VALUES(14,1,X'03','t3',85,X'00000000000000000000000000000000000000000000000000000000000046b2'); +INSERT INTO event_entry VALUES(14,1,X'03','d',85,X'000000000000000000000000000000000000000000000002b48fb0721886c000000000000000000000000000000000000000000000000000000000000015f607000000000000000000000000000000000000000000000000000000000020820700000000000000000000000000000000000000000000008214d74dee806c9836'); +INSERT INTO event_entry VALUES(15,1,X'03','t1',85,X'ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'); +INSERT INTO event_entry VALUES(15,1,X'03','t2',85,X'0000000000000000000000000000000000000000000000000000000000000000'); +INSERT INTO event_entry VALUES(15,1,X'03','t3',85,X'000000000000000000000000d063000a1c12255688badc3dea44fd029ee10351'); +INSERT INTO event_entry VALUES(15,1,X'03','d',85,X'000000000000000000000000000000000000000000000002b48f9e9d05d997fe'); +INSERT INTO event_entry VALUES(16,1,X'03','t1',85,X'90890809c654f11d6e72a28fa60149770a0d11ec6c92319d6ceb2bb0a4ea1a15'); +INSERT INTO event_entry VALUES(16,1,X'03','t2',85,X'000000000000000000000000d063000a1c12255688badc3dea44fd029ee10351'); +INSERT INTO event_entry VALUES(16,1,X'03','d',85,X'000000000000000000000000000000000000000000000002b5e3af16b1880000000000000000000000000000000000000000000000000002b48f9e9d05d997fe'); +INSERT INTO event_entry 
VALUES(17,1,X'03','t1',85,X'ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'); +INSERT INTO event_entry VALUES(17,1,X'03','t2',85,X'0000000000000000000000000000000000000000000000000000000000000000'); +INSERT INTO event_entry VALUES(17,1,X'03','t3',85,X'00000000000000000000000039fc2d7f6bc81439a23cacea538bbec8090f66e4'); +INSERT INTO event_entry VALUES(17,1,X'03','d',85,X'000000000000000000000000000000000000000000000005691f3d3a0bb32ffc'); +INSERT INTO event_entry VALUES(18,1,X'03','t1',85,X'90890809c654f11d6e72a28fa60149770a0d11ec6c92319d6ceb2bb0a4ea1a15'); +INSERT INTO event_entry VALUES(18,1,X'03','t2',85,X'00000000000000000000000039fc2d7f6bc81439a23cacea538bbec8090f66e4'); +INSERT INTO event_entry VALUES(18,1,X'03','d',85,X'0000000000000000000000000000000000000000000000056bc75e2d63100000000000000000000000000000000000000000000000000005691f3d3a0bb32ffc'); +INSERT INTO event_entry VALUES(19,1,X'03','t1',85,X'ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'); +INSERT INTO event_entry VALUES(19,1,X'03','t2',85,X'0000000000000000000000000000000000000000000000000000000000000000'); +INSERT INTO event_entry VALUES(19,1,X'03','t3',85,X'000000000000000000000000c1fd608ded09e74367b7b429a48b15dff70807c3'); +INSERT INTO event_entry VALUES(19,1,X'03','d',85,X'00000000000000000000000000000000000000000000000523ddad5d8b1d6d96'); +INSERT INTO event_entry VALUES(20,1,X'03','t1',85,X'90890809c654f11d6e72a28fa60149770a0d11ec6c92319d6ceb2bb0a4ea1a15'); +INSERT INTO event_entry VALUES(20,1,X'03','t2',85,X'000000000000000000000000c1fd608ded09e74367b7b429a48b15dff70807c3'); +INSERT INTO event_entry VALUES(20,1,X'03','d',85,X'0000000000000000000000000000000000000000000000052663ccab1e1c000000000000000000000000000000000000000000000000000523ddad5d8b1d6d96'); +INSERT INTO event_entry VALUES(21,1,X'03','t1',85,X'ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'); +INSERT INTO event_entry 
VALUES(21,1,X'03','t2',85,X'000000000000000000000000711ce39cc3501f66dc46d052a1b66e74eaa9d0bd'); +INSERT INTO event_entry VALUES(21,1,X'03','t3',85,X'0000000000000000000000000000000000000000000000000000000000000000'); +INSERT INTO event_entry VALUES(21,1,X'03','d',85,X'00000000000000000000000000000000000000000000000006f05b59d3b20000'); +INSERT INTO event_entry VALUES(22,1,X'03','t1',85,X'bd5034ffbd47e4e72a94baa2cdb74c6fad73cb3bcdc13036b72ec8306f5a7646'); +INSERT INTO event_entry VALUES(22,1,X'03','t2',85,X'000000000000000000000000711ce39cc3501f66dc46d052a1b66e74eaa9d0bd'); +INSERT INTO event_entry VALUES(22,1,X'03','d',85,X'00000000000000000000000000000000000000000000000006f05b59d3b2000000000000000000000000000000000000000000000000000006f1fbf742203f4d0000000000000000000000000000000000000000000000000001c79e9d26ba1b'); +INSERT INTO event_entry VALUES(23,1,X'03','t1',85,X'6de956d2cb2e161f8c91c6ae7b286358c7458d5ad5e26ea2d55330fbe282839c'); +INSERT INTO event_entry VALUES(23,1,X'03','d',85,X'00000000000000000000000074f755672e8c0dc02dd6a98f5bdf8c3b8a7983280000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000000500000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000002f757064617465206865616c7468626f745f3331343135395f352073657420636f756e7465723d636f756e7465722b31000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000c000000000000000000000000000000000000000000000000000000000000000e0000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000'); +CREATE TABLE _meta ( + version UINT64 NOT NULL UNIQUE + ); +INSERT INTO _meta VALUES(1); +INSERT INTO _meta VALUES(2); +INSERT INTO _meta VALUES(3); +CREATE INDEX height_tipset_key ON event (height,tipset_key); +CREATE INDEX event_emitter_addr ON event (emitter_addr); +CREATE INDEX event_entry_key_index ON event_entry (key); +COMMIT; diff --git a/chain/state/statetree.go b/chain/state/statetree.go index 1a6497d04b9..f5f9e1c3e7a 100644 --- a/chain/state/statetree.go +++ b/chain/state/statetree.go @@ -31,11 +31,12 @@ var log = logging.Logger("statetree") // StateTree stores actors state by their ID. type StateTree struct { - root adt.Map - version types.StateTreeVersion - info cid.Cid - Store cbor.IpldStore - lookupIDFun func(address.Address) (address.Address, error) + root adt.Map + version types.StateTreeVersion + info cid.Cid + Store cbor.IpldStore + + LookupIDFun func(address.Address) (address.Address, error) // Exported for testing puroses snaps *stateSnaps } @@ -230,7 +231,7 @@ func NewStateTree(cst cbor.IpldStore, ver types.StateTreeVersion) (*StateTree, e Store: cst, snaps: newStateSnaps(), } - s.lookupIDFun = s.lookupIDinternal + s.LookupIDFun = s.lookupIDinternal return s, nil } @@ -302,7 +303,7 @@ func LoadStateTree(cst cbor.IpldStore, c cid.Cid) (*StateTree, error) { Store: cst, snaps: newStateSnaps(), } - s.lookupIDFun = s.lookupIDinternal + s.LookupIDFun = s.lookupIDinternal return s, nil } @@ -349,7 +350,7 @@ func (st *StateTree) LookupID(addr address.Address) (address.Address, error) { if ok { return resa, nil } - a, err := st.lookupIDFun(addr) + a, err := st.LookupIDFun(addr) if err != nil { return a, err } diff --git a/chain/state/statetree_test.go b/chain/state/statetree_test.go index 9a221751a75..b1790e63420 100644 --- a/chain/state/statetree_test.go +++ b/chain/state/statetree_test.go @@ -97,7 +97,7 @@ func TestResolveCache(t *testing.T) { nonId := 
address.NewForTestGetter()() id, _ := address.NewIDAddress(1000) - st.lookupIDFun = func(a address.Address) (address.Address, error) { + st.LookupIDFun = func(a address.Address) (address.Address, error) { if a == nonId { return id, nil } diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index cd35d6d193e..d17d07cfa9c 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -401,12 +401,11 @@ func (sm *StateManager) GetBlsPublicKey(ctx context.Context, addr address.Addres } func (sm *StateManager) LookupID(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) { - cst := cbor.NewCborStore(sm.cs.StateBlockstore()) - state, err := state.LoadStateTree(cst, sm.parentState(ts)) + id, err := sm.ResolveActorID(ctx, addr, ts) if err != nil { - return address.Undef, xerrors.Errorf("load state tree: %w", err) + return address.Undef, err } - return state.LookupID(addr) + return address.NewIDAddress(uint64(id)) } func (sm *StateManager) LookupRobustAddress(ctx context.Context, idAddr address.Address, ts *types.TipSet) (address.Address, error) { @@ -570,3 +569,25 @@ func (sm *StateManager) GetRandomnessDigestFromTickets(ctx context.Context, rand return r.GetChainRandomness(ctx, randEpoch) } + +func (sm *StateManager) ResolveActorID(_ context.Context, addr address.Address, ts *types.TipSet) (abi.ActorID, error) { + var idAddr address.Address + if addr.Protocol() == address.ID { + idAddr = addr // already an ID address + } else { + var err error + cst := cbor.NewCborStore(sm.cs.StateBlockstore()) + st, err := state.LoadStateTree(cst, sm.parentState(ts)) + if err != nil { + return 0, xerrors.Errorf("load state tree: %w", err) + } + if idAddr, err = st.LookupID(addr); err != nil { + return 0, xerrors.Errorf("state tree lookup id: %w", err) + } + } + actor, err := address.IDFromAddress(idAddr) + if err != nil { + return 0, xerrors.Errorf("resolve actor id: id from addr: %w", err) + } + return abi.ActorID(actor), nil +} diff --git 
a/chain/types/actor_event.go b/chain/types/actor_event.go index bf95189e19c..76de8644022 100644 --- a/chain/types/actor_event.go +++ b/chain/types/actor_event.go @@ -31,7 +31,7 @@ type ActorEventFilter struct { // last finalized tipset. // NOTE: In a future upgrade, this will be strict when set and will result in an error if a filter // cannot be fulfilled by the depth of history available in the node. Currently, the node will - // nott return an error, but will return starting from the epoch it has data for. + // not return an error, but will return starting from the epoch it has data for. FromHeight *abi.ChainEpoch `json:"fromHeight,omitempty"` // The height of the latest tipset to include in the query. If empty, the query ends at the @@ -47,9 +47,7 @@ type ActorEvent struct { // Event entries in log form. Entries []EventEntry `json:"entries"` - // Filecoin address of the actor that emitted this event. - // NOTE: In a future upgrade, this will change to always be an ID address. Currently this will be - // either the f4 address, or ID address if an f4 is not available for this actor. + // Emitter is the ID address of the actor that emitted this event. Emitter address.Address `json:"emitter"` // Reverted is set to true if the message that produced this event was reverted because of a network re-org diff --git a/chain/types/ethtypes/eth_types.go b/chain/types/ethtypes/eth_types.go index 2740a3e9d25..26853037033 100644 --- a/chain/types/ethtypes/eth_types.go +++ b/chain/types/ethtypes/eth_types.go @@ -382,8 +382,9 @@ func EthAddressFromFilecoinAddress(addr address.Address) (EthAddress, error) { return EthAddress{}, xerrors.Errorf("f410f addresses cannot embed masked-ID payloads: %s", ethAddr) } return ethAddr, nil + default: + return EthAddress{}, ErrInvalidAddress } - return EthAddress{}, ErrInvalidAddress } // ParseEthAddress parses an Ethereum address from a hex string. 
diff --git a/go.mod b/go.mod index bb1b33aef1f..ee7846c4f0c 100644 --- a/go.mod +++ b/go.mod @@ -169,6 +169,7 @@ require ( golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 gopkg.in/cheggaaa/pb.v1 v1.0.28 gotest.tools v2.2.0+incompatible + lukechampine.com/blake3 v1.2.1 ) require ( @@ -330,7 +331,6 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect howett.net/plist v0.0.0-20181124034731-591f970eefbb // indirect - lukechampine.com/blake3 v1.2.1 // indirect ) // https://github.com/magik6k/reflink/commit/cff5a40f3eeca17f44fc95a57ff3878e5ac761dc diff --git a/node/impl/full/actor_events.go b/node/impl/full/actor_events.go index bb192a4cf28..279982fb0fb 100644 --- a/node/impl/full/actor_events.go +++ b/node/impl/full/actor_events.go @@ -117,7 +117,7 @@ func (a *ActorEventHandler) GetActorEventsRaw(ctx context.Context, evtFilter *ty log.Warnf("failed to remove filter: %s", err) } }() - return getCollected(ctx, f), nil + return getCollectedEvents(ctx, f) } type filterParams struct { @@ -249,7 +249,11 @@ func (a *ActorEventHandler) SubscribeActorEventsRaw(ctx context.Context, evtFilt // Handle any historical events that our filter may have picked up ----------------------------- - evs := getCollected(ctx, fm) + evs, err := getCollectedEvents(ctx, fm) + if err != nil { + log.Errorw("failed to get collected events", "err", err) + return + } if len(evs) > 0 { // ensure we get all events out on the channel within one block's time (30s on mainnet) timer := a.clock.Timer(a.blockDelay) @@ -296,9 +300,15 @@ func (a *ActorEventHandler) SubscribeActorEventsRaw(ctx context.Context, evtFilt return false } + emitter, err := address.NewIDAddress(uint64(ce.Emitter)) + if err != nil { + log.Errorw("failed to instantiate address from emitter actor id", "emitter", ce.Emitter, "err", err) + return false + } + buffer = append(buffer, &types.ActorEvent{ Entries: ce.Entries, - Emitter: ce.EmitterAddr, + Emitter: emitter, Reverted: ce.Reverted, 
Height: ce.Height, TipSetKey: ce.TipSetKey, @@ -356,21 +366,22 @@ func (a *ActorEventHandler) SubscribeActorEventsRaw(ctx context.Context, evtFilt return out, nil } -func getCollected(ctx context.Context, f filter.EventFilter) []*types.ActorEvent { +func getCollectedEvents(ctx context.Context, f filter.EventFilter) ([]*types.ActorEvent, error) { ces := f.TakeCollectedEvents(ctx) - - var out []*types.ActorEvent - + out := make([]*types.ActorEvent, 0, len(ces)) for _, e := range ces { + emitter, err := address.NewIDAddress(uint64(e.Emitter)) + if err != nil { + return nil, err + } out = append(out, &types.ActorEvent{ Entries: e.Entries, - Emitter: e.EmitterAddr, + Emitter: emitter, Reverted: e.Reverted, Height: e.Height, TipSetKey: e.TipSetKey, MsgCid: e.MsgCid, }) } - - return out + return out, nil } diff --git a/node/impl/full/actor_events_test.go b/node/impl/full/actor_events_test.go index b4c4e103c0c..ec58a8a0552 100644 --- a/node/impl/full/actor_events_test.go +++ b/node/impl/full/actor_events_test.go @@ -237,7 +237,7 @@ func TestGetActorEventsRaw(t *testing.T) { req.Contains(err.Error(), tc.expectErr) } else { req.NoError(err) - expectedEvents := collectedToActorEvents(collectedEvents) + expectedEvents := collectedToActorEvents(t, collectedEvents) req.Equal(expectedEvents, gotEvents) efm.requireRemoved(filter.ID()) } @@ -314,7 +314,7 @@ func TestSubscribeActorEventsRaw(t *testing.T) { t.Fatalf("timed out waiting for event") } } - req.Equal(collectedToActorEvents(historicalEvents), gotEvents) + req.Equal(collectedToActorEvents(t, historicalEvents), gotEvents) mockClock.Add(blockDelay) nextReceiveTime := mockClock.Now() @@ -389,7 +389,7 @@ func TestSubscribeActorEventsRaw(t *testing.T) { // sanity check that we got what we expected this epoch req.Len(newEvents, eventsPerEpoch) epochEvents := allEvents[(thisHeight)*eventsPerEpoch : (thisHeight+1)*eventsPerEpoch] - req.Equal(collectedToActorEvents(epochEvents), newEvents) + req.Equal(collectedToActorEvents(t, 
epochEvents), newEvents) gotEvents = append(gotEvents, newEvents...) } } @@ -397,7 +397,7 @@ func TestSubscribeActorEventsRaw(t *testing.T) { req.Equal(tc.expectComplete, !prematureEnd, "expected to complete") if tc.expectComplete { req.Len(gotEvents, len(allEvents)) - req.Equal(collectedToActorEvents(allEvents), gotEvents) + req.Equal(collectedToActorEvents(t, allEvents), gotEvents) } else { req.NotEqual(len(gotEvents), len(allEvents)) } @@ -481,7 +481,7 @@ func TestSubscribeActorEventsRaw_OnlyHistorical(t *testing.T) { } } if tc.expectComplete { - req.Equal(collectedToActorEvents(allEvents), gotEvents) + req.Equal(collectedToActorEvents(t, allEvents), gotEvents) } else { req.NotEqual(len(gotEvents), len(allEvents)) } @@ -609,7 +609,7 @@ func (m *mockFilter) TakeCollectedEvents(context.Context) []*filter.CollectedEve return e } -func (m *mockFilter) CollectEvents(context.Context, *filter.TipSetEvents, bool, filter.AddressResolver) error { +func (m *mockFilter) CollectEvents(context.Context, *filter.TipSetEvents, bool) error { m.t.Fatalf("unexpected call to CollectEvents") return nil } @@ -729,12 +729,15 @@ func epochPtr(i int) *abi.ChainEpoch { return &e } -func collectedToActorEvents(collected []*filter.CollectedEvent) []*types.ActorEvent { +func collectedToActorEvents(t *testing.T, collected []*filter.CollectedEvent) []*types.ActorEvent { + t.Helper() var out []*types.ActorEvent for _, c := range collected { + emitter, err := address.NewIDAddress(uint64(c.Emitter)) + require.NoError(t, err) out = append(out, &types.ActorEvent{ Entries: c.Entries, - Emitter: c.EmitterAddr, + Emitter: emitter, Reverted: c.Reverted, Height: c.Height, TipSetKey: c.TipSetKey, @@ -755,25 +758,24 @@ func makeCollectedEvents(t *testing.T, rng *pseudo.Rand, eventStartHeight, event } func makeCollectedEvent(t *testing.T, rng *pseudo.Rand, tsKey types.TipSetKey, height abi.ChainEpoch) *filter.CollectedEvent { - addr, err := address.NewIDAddress(uint64(rng.Int63())) - require.NoError(t, 
err) - + t.Helper() return &filter.CollectedEvent{ Entries: []types.EventEntry{ {Flags: 0x01, Key: "k1", Codec: cid.Raw, Value: []byte("v1")}, {Flags: 0x01, Key: "k2", Codec: cid.Raw, Value: []byte("v2")}, }, - EmitterAddr: addr, - EventIdx: 0, - Reverted: false, - Height: height, - TipSetKey: tsKey, - MsgIdx: 0, - MsgCid: testCid, + Emitter: abi.ActorID(rng.Int63()), + EventIdx: 0, + Reverted: false, + Height: height, + TipSetKey: tsKey, + MsgIdx: 0, + MsgCid: testCid, } } func mkCid(t *testing.T, s string) cid.Cid { + t.Helper() h, err := multihash.Sum([]byte(s), multihash.SHA2_256, -1) require.NoError(t, err) return cid.NewCidV1(cid.Raw, h) } diff --git a/node/impl/full/eth_events.go b/node/impl/full/eth_events.go index 063590d8dd2..9ff0000576d 100644 --- a/node/impl/full/eth_events.go +++ b/node/impl/full/eth_events.go @@ -10,6 +10,7 @@ import ( "github.com/zyedidia/generic/queue" "golang.org/x/xerrors" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-jsonrpc" "github.com/filecoin-project/lotus/chain/events/filter" @@ -112,11 +113,22 @@ func ethFilterResultFromEvents(ctx context.Context, evs []*filter.CollectedEvent continue } - log.Address, err = ethtypes.EthAddressFromFilecoinAddress(ev.EmitterAddr) + emitterAddr, err := address.NewIDAddress(uint64(ev.Emitter)) if err != nil { - return nil, err + return nil, xerrors.Errorf("emitter to addr: %w", err) } + actor, err := sa.StateGetActor(ctx, emitterAddr, ev.TipSetKey) + if err != nil { + return nil, xerrors.Errorf("state get actor: %w", err) + } + if actor == nil || actor.Address == nil { + return nil, xerrors.New("state get actor: nil") + } + log.Address, err = ethtypes.EthAddressFromFilecoinAddress(*actor.Address) + if err != nil { + return nil, xerrors.Errorf("eth addr from fil: %w", err) + } log.TransactionHash, err = ethTxHashFromMessageCid(ctx, ev.MsgCid, sa) if err != nil { return nil, err diff --git a/node/modules/actorevent.go b/node/modules/actorevent.go index 
d92da1940a9..430cc2fdcd9 100644 --- a/node/modules/actorevent.go +++ b/node/modules/actorevent.go @@ -7,7 +7,6 @@ import ( "go.uber.org/fx" - "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/build" @@ -16,7 +15,6 @@ import ( "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" - "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/modules/helpers" @@ -98,6 +96,14 @@ func EventFilterManager(cfg config.EventsConfig) func(helpers.MetricsCtx, repo.L return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventHelperAPI, chainapi full.ChainAPI) (*filter.EventFilterManager, error) { ctx := helpers.LifecycleCtx(mctx, lc) + const ( + // TODO: Consider introducing configuration parameters for constants below. 
+ resolverCacheSize = 1024 + resolverCacheExpiry = time.Minute + resolverCacheNilTipSet = false + ) + actorResolver := filter.NewCachedActorResolver(sm.ResolveActorID, resolverCacheSize, resolverCacheExpiry, resolverCacheNilTipSet) + // Enable indexing of actor events var eventIndex *filter.EventIndex if !cfg.DisableHistoricFilterAPI { @@ -126,24 +132,9 @@ func EventFilterManager(cfg config.EventsConfig) func(helpers.MetricsCtx, repo.L } fm := &filter.EventFilterManager{ - ChainStore: cs, - EventIndex: eventIndex, // will be nil unless EnableHistoricFilterAPI is true - // TODO: - // We don't need this address resolution anymore once https://github.com/filecoin-project/lotus/issues/11594 lands - AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) { - idAddr, err := address.NewIDAddress(uint64(emitter)) - if err != nil { - return address.Undef, false - } - - actor, err := sm.LoadActor(ctx, idAddr, ts) - if err != nil || actor.Address == nil { - return idAddr, true - } - - return *actor.Address, true - }, - + ChainStore: cs, + EventIndex: eventIndex, // will be nil unless EnableHistoricFilterAPI is true + ActorResolver: actorResolver, MaxFilterResults: cfg.MaxFilterResults, }