Skip to content

Commit

Permalink
events: index actor events by actor ID rather than addr
Browse files Browse the repository at this point in the history
Ref: #11594
  • Loading branch information
rvagg committed Mar 7, 2024
1 parent 1f6e556 commit fecd58d
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 119 deletions.
157 changes: 99 additions & 58 deletions chain/events/filter/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package filter
import (
"bytes"
"context"
"fmt"
"math"
"sync"
"time"

"github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
amt4 "github.com/filecoin-project/go-amt-ipld/v4"
Expand All @@ -20,20 +20,22 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)

type ActorResolver func(ctx context.Context, addr address.Address, ts *types.TipSet) (abi.ActorID, error)
type AddressResolver func(ctx context.Context, actor abi.ActorID, ts *types.TipSet) (address.Address, bool)
type TipsetResolver func(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)

func isIndexedValue(b uint8) bool {
// currently we mark the full entry as indexed if either the key
// or the value are indexed; in the future we will need finer-grained
// management of indices
return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0
}

type AddressResolver func(context.Context, abi.ActorID, *types.TipSet) (address.Address, bool)

type EventFilter interface {
Filter

TakeCollectedEvents(context.Context) []*CollectedEvent
CollectEvents(context.Context, *TipSetEvents, bool, AddressResolver) error
CollectEvents(context.Context, *TipSetEvents, bool, ActorResolver, AddressResolver) error
}

type eventFilter struct {
Expand Down Expand Up @@ -82,39 +84,62 @@ func (f *eventFilter) ClearSubChannel() {
f.ch = nil
}

func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver AddressResolver) error {
func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, actorResolver ActorResolver, addressResolver AddressResolver) error {
if !f.matchTipset(te) {
return nil
}

// cache of lookups between actor id and f4 address
addressLookups := make(map[abi.ActorID]address.Address)

ems, err := te.messages(ctx)
filterByActorsMap := make(map[abi.ActorID]address.Address, len(f.addresses))
for _, addr := range f.addresses {
actor, err := actorResolver(ctx, addr, te.rctTs)
if err != nil {
// not an address we will be able to match against
continue
}
filterByActorsMap[actor] = addr
}
if len(f.addresses) > 0 && len(filterByActorsMap) == 0 {
// none of the addresses we were asked to match against were found in the tipset
log.Warnf("no actors found for addresses specified by filter in this tipset")
return nil
}

actorsCache := make(map[abi.ActorID]address.Address)

if err != nil {
return xerrors.Errorf("load executed messages: %w", err)
return fmt.Errorf("load executed messages: %w", err)
}
for msgIdx, em := range ems {
for evIdx, ev := range em.Events() {
// lookup address corresponding to the actor id
addr, found := addressLookups[ev.Emitter]
if !found {
var ok bool
addr, ok = resolver(ctx, ev.Emitter, te.rctTs)
if !ok {
// not an address we will be able to match against
if !f.matchKeys(ev.Entries) {
continue
}
addr := address.Undef
// first look in our filter for the actor
if len(filterByActorsMap) > 0 {
if a, ok := filterByActorsMap[ev.Emitter]; !ok {
continue
} else if ok && a.Protocol() != address.ID {
// should be OK to use this as the returned address
addr = a
}
addressLookups[ev.Emitter] = addr
}

if !f.matchAddress(addr) {
continue
}
if !f.matchKeys(ev.Entries) {
continue
if addr == address.Undef {
// look in the cache for the mapping or resolve it
var ok bool
if addr, ok = actorsCache[ev.Emitter]; !ok {
if addr, ok = addressResolver(ctx, ev.Emitter, te.rctTs); !ok {
// can't resolve it, so last resort: create an ID address from the actor ID
addr, err = address.NewIDAddress(uint64(ev.Emitter))
if err != nil {
log.Warnf("failed to create ID address from actor ID: %s", err)
continue
}
}
actorsCache[ev.Emitter] = addr
}
}

// event matches filter, so record it
cev := &CollectedEvent{
Entries: ev.Entries,
Expand Down Expand Up @@ -188,23 +213,8 @@ func (f *eventFilter) matchTipset(te *TipSetEvents) bool {
return true
}

func (f *eventFilter) matchAddress(o address.Address) bool {
if len(f.addresses) == 0 {
return true
}

// Assume short lists of addresses
// TODO: binary search for longer lists or restrict list length
for _, a := range f.addresses {
if a == o {
return true
}
}
return false
}

func (f *eventFilter) matchKeys(ees []types.EventEntry) bool {
if len(f.keysWithCodec) == 0 {
if len(f.keysWithCodec) == 0 { // no keys specified, so match all
return true
}
// TODO: optimize this naive algorithm
Expand Down Expand Up @@ -301,10 +311,13 @@ func (e *executedMessage) Events() []*types.Event {

type EventFilterManager struct {
ChainStore *cstore.ChainStore
AddressResolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)
MaxFilterResults int
EventIndex *EventIndex

ActorResolver ActorResolver
AddressResolver AddressResolver
TipsetResolver TipsetResolver

mu sync.Mutex // guards mutations to filters
filters map[types.FilterID]EventFilter
currentHeight abi.ChainEpoch
Expand All @@ -326,14 +339,14 @@ func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet)
}

if m.EventIndex != nil {
if err := m.EventIndex.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil {
if err := m.EventIndex.CollectEvents(ctx, tse, false); err != nil {
return err
}
}

// TODO: could run this loop in parallel with errgroup if there are many filters
for _, f := range m.filters {
if err := f.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil {
if err := f.CollectEvents(ctx, tse, false, m.ActorResolver, m.AddressResolver); err != nil {
return err
}
}
Expand All @@ -357,34 +370,42 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet)
}

if m.EventIndex != nil {
if err := m.EventIndex.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil {
if err := m.EventIndex.CollectEvents(ctx, tse, true); err != nil {
return err
}
}

// TODO: could run this loop in parallel with errgroup if there are many filters
for _, f := range m.filters {
if err := f.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil {
if err := f.CollectEvents(ctx, tse, true, m.ActorResolver, m.AddressResolver); err != nil {
return err
}
}

return nil
}

func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address,
keysWithCodec map[string][]types.ActorEventBlock, excludeReverted bool) (EventFilter, error) {
func (m *EventFilterManager) Install(
ctx context.Context,
minHeight,
maxHeight abi.ChainEpoch,
tipsetCid cid.Cid,
addresses []address.Address,
keysWithCodec map[string][]types.ActorEventBlock,
excludeReverted bool,
) (EventFilter, error) {

m.mu.Lock()
currentHeight := m.currentHeight
m.mu.Unlock()

if m.EventIndex == nil && minHeight != -1 && minHeight < currentHeight {
return nil, xerrors.Errorf("historic event index disabled")
return nil, fmt.Errorf("historic event index disabled")
}

id, err := newFilterID()
if err != nil {
return nil, xerrors.Errorf("new filter id: %w", err)
return nil, fmt.Errorf("new filter id: %w", err)
}

f := &eventFilter{
Expand All @@ -399,7 +420,27 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a

if m.EventIndex != nil && minHeight != -1 && minHeight < currentHeight {
// Filter needs historic events
if err := m.EventIndex.prefillFilter(ctx, f, excludeReverted); err != nil {

// filterActors is actors at the _current_ tipset, which we'll use for
// historical addresses; which should be OK as long as we're not looking at
// reverted events.
filterActors := make([]abi.ActorID, 0, len(addresses))
for _, addr := range addresses {
actor, err := m.ActorResolver(ctx, addr, nil)
if err != nil {
// not an address we will be able to match against
continue
}
filterActors = append(filterActors, actor)
}
if len(addresses) > 0 {
if len(filterActors) == 0 {
// none of the addresses we were asked to match against were found in the tipset
return nil, fmt.Errorf("no actors found for addresses")
}
excludeReverted = true // we can't match against reverted events
}
if err := m.EventIndex.prefillFilter(ctx, f, excludeReverted, filterActors, m.AddressResolver, m.TipsetResolver); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -427,18 +468,18 @@ func (m *EventFilterManager) Remove(ctx context.Context, id types.FilterID) erro
func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rctTs *types.TipSet) ([]executedMessage, error) {
msgs, err := m.ChainStore.MessagesForTipset(ctx, msgTs)
if err != nil {
return nil, xerrors.Errorf("read messages: %w", err)
return nil, fmt.Errorf("read messages: %w", err)
}

st := m.ChainStore.ActorStore(ctx)

arr, err := blockadt.AsArray(st, rctTs.Blocks()[0].ParentMessageReceipts)
if err != nil {
return nil, xerrors.Errorf("load receipts amt: %w", err)
return nil, fmt.Errorf("load receipts amt: %w", err)
}

if uint64(len(msgs)) != arr.Length() {
return nil, xerrors.Errorf("mismatching message and receipt counts (%d msgs, %d rcts)", len(msgs), arr.Length())
return nil, fmt.Errorf("mismatching message and receipt counts (%d msgs, %d rcts)", len(msgs), arr.Length())
}

ems := make([]executedMessage, len(msgs))
Expand All @@ -449,10 +490,10 @@ func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rc
var rct types.MessageReceipt
found, err := arr.Get(uint64(i), &rct)
if err != nil {
return nil, xerrors.Errorf("load receipt: %w", err)
return nil, fmt.Errorf("load receipt: %w", err)
}
if !found {
return nil, xerrors.Errorf("receipt %d not found", i)
return nil, fmt.Errorf("receipt %d not found", i)
}
ems[i].rct = &rct

Expand All @@ -462,14 +503,14 @@ func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rc

evtArr, err := amt4.LoadAMT(ctx, st, *rct.EventsRoot, amt4.UseTreeBitWidth(types.EventAMTBitwidth))
if err != nil {
return nil, xerrors.Errorf("load events amt: %w", err)
return nil, fmt.Errorf("load events amt: %w", err)
}

ems[i].evs = make([]*types.Event, evtArr.Len())
var evt types.Event
err = evtArr.ForEach(ctx, func(u uint64, deferred *cbg.Deferred) error {
if u > math.MaxInt {
return xerrors.Errorf("too many events")
return fmt.Errorf("too many events")
}
if err := evt.UnmarshalCBOR(bytes.NewReader(deferred.Raw)); err != nil {
return err
Expand All @@ -481,7 +522,7 @@ func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rc
})

if err != nil {
return nil, xerrors.Errorf("read events: %w", err)
return nil, fmt.Errorf("read events: %w", err)
}

}
Expand Down
17 changes: 16 additions & 1 deletion chain/events/filter/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package filter

import (
"context"
"fmt"
pseudo "math/rand"
"testing"

Expand Down Expand Up @@ -276,7 +277,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
for _, tc := range testCases {
tc := tc // appease lint
t.Run(tc.name, func(t *testing.T) {
if err := tc.filter.CollectEvents(context.Background(), tc.te, false, addrMap.ResolveAddress); err != nil {
if err := tc.filter.CollectEvents(context.Background(), tc.te, false, addrMap.ResolveActor, addrMap.ResolveAddress); err != nil {
require.NoError(t, err, "collect events")
}

Expand Down Expand Up @@ -444,3 +445,17 @@ func (a addressMap) ResolveAddress(ctx context.Context, emitter abi.ActorID, ts
ra, ok := a[emitter]
return ra, ok
}

func (a addressMap) ResolveActor(ctx context.Context, emitter address.Address, ts *types.TipSet) (abi.ActorID, error) {
for id, addr := range a {
if addr == emitter {
return id, nil
}
}
fmt.Println("Have:")
for id, addr := range a {
fmt.Printf("%d -> %s\n", id, addr)
}
fmt.Println("Want:", emitter)
return 0, fmt.Errorf("address not found")
}
Loading

0 comments on commit fecd58d

Please sign in to comment.