Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

events: index actor events by actor ID rather than f4 addr #11694

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 99 additions & 58 deletions chain/events/filter/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package filter
import (
"bytes"
"context"
"fmt"
"math"
"sync"
"time"

"github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
amt4 "github.com/filecoin-project/go-amt-ipld/v4"
Expand All @@ -20,20 +20,22 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)

type ActorResolver func(ctx context.Context, addr address.Address, ts *types.TipSet) (abi.ActorID, error)
type AddressResolver func(ctx context.Context, actor abi.ActorID, ts *types.TipSet) (address.Address, bool)
type TipsetResolver func(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)

func isIndexedValue(b uint8) bool {
// currently we mark the full entry as indexed if either the key
// or the value are indexed; in the future we will need finer-grained
// management of indices
return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0
}

type AddressResolver func(context.Context, abi.ActorID, *types.TipSet) (address.Address, bool)

type EventFilter interface {
Filter

TakeCollectedEvents(context.Context) []*CollectedEvent
CollectEvents(context.Context, *TipSetEvents, bool, AddressResolver) error
CollectEvents(context.Context, *TipSetEvents, bool, ActorResolver, AddressResolver) error
}

type eventFilter struct {
Expand Down Expand Up @@ -82,39 +84,62 @@ func (f *eventFilter) ClearSubChannel() {
f.ch = nil
}

func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver AddressResolver) error {
func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, actorResolver ActorResolver, addressResolver AddressResolver) error {
if !f.matchTipset(te) {
return nil
}

// cache of lookups between actor id and f4 address
addressLookups := make(map[abi.ActorID]address.Address)

ems, err := te.messages(ctx)
filterByActorsMap := make(map[abi.ActorID]address.Address, len(f.addresses))
for _, addr := range f.addresses {
actor, err := actorResolver(ctx, addr, te.rctTs)
if err != nil {
// not an address we will be able to match against
continue
}
filterByActorsMap[actor] = addr
}
if len(f.addresses) > 0 && len(filterByActorsMap) == 0 {
// none of the addresses we were asked to match against were found in the tipset
log.Warnf("no actors found for addresses specified by filter in this tipset")
return nil
}

actorsCache := make(map[abi.ActorID]address.Address)

if err != nil {
return xerrors.Errorf("load executed messages: %w", err)
return fmt.Errorf("load executed messages: %w", err)
}
for msgIdx, em := range ems {
for evIdx, ev := range em.Events() {
// lookup address corresponding to the actor id
addr, found := addressLookups[ev.Emitter]
if !found {
var ok bool
addr, ok = resolver(ctx, ev.Emitter, te.rctTs)
if !ok {
// not an address we will be able to match against
if !f.matchKeys(ev.Entries) {
continue
}
addr := address.Undef
// first look in our filter for the actor
if len(filterByActorsMap) > 0 {
if a, ok := filterByActorsMap[ev.Emitter]; !ok {
continue
} else if ok && a.Protocol() != address.ID {
// should be OK to use this as the returned address
addr = a
}
addressLookups[ev.Emitter] = addr
}

if !f.matchAddress(addr) {
continue
}
if !f.matchKeys(ev.Entries) {
continue
if addr == address.Undef {
// look in the cache for the mapping or resolve it
var ok bool
if addr, ok = actorsCache[ev.Emitter]; !ok {
if addr, ok = addressResolver(ctx, ev.Emitter, te.rctTs); !ok {
// can't resolve it, so last resort: create an ID address from the actor ID
addr, err = address.NewIDAddress(uint64(ev.Emitter))
if err != nil {
log.Warnf("failed to create ID address from actor ID: %s", err)
continue
}
}
actorsCache[ev.Emitter] = addr
}
}

// event matches filter, so record it
cev := &CollectedEvent{
Entries: ev.Entries,
Expand Down Expand Up @@ -188,23 +213,8 @@ func (f *eventFilter) matchTipset(te *TipSetEvents) bool {
return true
}

func (f *eventFilter) matchAddress(o address.Address) bool {
if len(f.addresses) == 0 {
return true
}

// Assume short lists of addresses
// TODO: binary search for longer lists or restrict list length
for _, a := range f.addresses {
if a == o {
return true
}
}
return false
}

func (f *eventFilter) matchKeys(ees []types.EventEntry) bool {
if len(f.keysWithCodec) == 0 {
if len(f.keysWithCodec) == 0 { // no keys specified, so match all
return true
}
// TODO: optimize this naive algorithm
Expand Down Expand Up @@ -301,10 +311,13 @@ func (e *executedMessage) Events() []*types.Event {

type EventFilterManager struct {
ChainStore *cstore.ChainStore
AddressResolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)
MaxFilterResults int
EventIndex *EventIndex

ActorResolver ActorResolver
AddressResolver AddressResolver
TipsetResolver TipsetResolver

mu sync.Mutex // guards mutations to filters
filters map[types.FilterID]EventFilter
currentHeight abi.ChainEpoch
Expand All @@ -326,14 +339,14 @@ func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet)
}

if m.EventIndex != nil {
if err := m.EventIndex.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil {
if err := m.EventIndex.CollectEvents(ctx, tse, false); err != nil {
return err
}
}

// TODO: could run this loop in parallel with errgroup if there are many filters
for _, f := range m.filters {
if err := f.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil {
if err := f.CollectEvents(ctx, tse, false, m.ActorResolver, m.AddressResolver); err != nil {
return err
}
}
Expand All @@ -357,34 +370,42 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet)
}

if m.EventIndex != nil {
if err := m.EventIndex.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil {
if err := m.EventIndex.CollectEvents(ctx, tse, true); err != nil {
return err
}
}

// TODO: could run this loop in parallel with errgroup if there are many filters
for _, f := range m.filters {
if err := f.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil {
if err := f.CollectEvents(ctx, tse, true, m.ActorResolver, m.AddressResolver); err != nil {
return err
}
}

return nil
}

func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address,
keysWithCodec map[string][]types.ActorEventBlock, excludeReverted bool) (EventFilter, error) {
func (m *EventFilterManager) Install(
ctx context.Context,
minHeight,
maxHeight abi.ChainEpoch,
tipsetCid cid.Cid,
addresses []address.Address,
keysWithCodec map[string][]types.ActorEventBlock,
excludeReverted bool,
) (EventFilter, error) {

m.mu.Lock()
currentHeight := m.currentHeight
m.mu.Unlock()

if m.EventIndex == nil && minHeight != -1 && minHeight < currentHeight {
return nil, xerrors.Errorf("historic event index disabled")
return nil, fmt.Errorf("historic event index disabled")
}

id, err := newFilterID()
if err != nil {
return nil, xerrors.Errorf("new filter id: %w", err)
return nil, fmt.Errorf("new filter id: %w", err)
}

f := &eventFilter{
Expand All @@ -399,7 +420,27 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a

if m.EventIndex != nil && minHeight != -1 && minHeight < currentHeight {
// Filter needs historic events
if err := m.EventIndex.prefillFilter(ctx, f, excludeReverted); err != nil {

// filterActors is actors at the _current_ tipset, which we'll use for
// historical addresses; which should be OK as long as we're not looking at
// reverted events.
filterActors := make([]abi.ActorID, 0, len(addresses))
for _, addr := range addresses {
actor, err := m.ActorResolver(ctx, addr, nil)
if err != nil {
// not an address we will be able to match against
continue
}
filterActors = append(filterActors, actor)
}
if len(addresses) > 0 {
if len(filterActors) == 0 {
// none of the addresses we were asked to match against were found in the tipset
return nil, fmt.Errorf("no actors found for addresses")
}
excludeReverted = true // we can't match against reverted events
}
if err := m.EventIndex.prefillFilter(ctx, f, excludeReverted, filterActors, m.AddressResolver, m.TipsetResolver); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -427,18 +468,18 @@ func (m *EventFilterManager) Remove(ctx context.Context, id types.FilterID) erro
func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rctTs *types.TipSet) ([]executedMessage, error) {
msgs, err := m.ChainStore.MessagesForTipset(ctx, msgTs)
if err != nil {
return nil, xerrors.Errorf("read messages: %w", err)
return nil, fmt.Errorf("read messages: %w", err)
}

st := m.ChainStore.ActorStore(ctx)

arr, err := blockadt.AsArray(st, rctTs.Blocks()[0].ParentMessageReceipts)
if err != nil {
return nil, xerrors.Errorf("load receipts amt: %w", err)
return nil, fmt.Errorf("load receipts amt: %w", err)
}

if uint64(len(msgs)) != arr.Length() {
return nil, xerrors.Errorf("mismatching message and receipt counts (%d msgs, %d rcts)", len(msgs), arr.Length())
return nil, fmt.Errorf("mismatching message and receipt counts (%d msgs, %d rcts)", len(msgs), arr.Length())
}

ems := make([]executedMessage, len(msgs))
Expand All @@ -449,10 +490,10 @@ func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rc
var rct types.MessageReceipt
found, err := arr.Get(uint64(i), &rct)
if err != nil {
return nil, xerrors.Errorf("load receipt: %w", err)
return nil, fmt.Errorf("load receipt: %w", err)
}
if !found {
return nil, xerrors.Errorf("receipt %d not found", i)
return nil, fmt.Errorf("receipt %d not found", i)
}
ems[i].rct = &rct

Expand All @@ -462,14 +503,14 @@ func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rc

evtArr, err := amt4.LoadAMT(ctx, st, *rct.EventsRoot, amt4.UseTreeBitWidth(types.EventAMTBitwidth))
if err != nil {
return nil, xerrors.Errorf("load events amt: %w", err)
return nil, fmt.Errorf("load events amt: %w", err)
}

ems[i].evs = make([]*types.Event, evtArr.Len())
var evt types.Event
err = evtArr.ForEach(ctx, func(u uint64, deferred *cbg.Deferred) error {
if u > math.MaxInt {
return xerrors.Errorf("too many events")
return fmt.Errorf("too many events")
}
if err := evt.UnmarshalCBOR(bytes.NewReader(deferred.Raw)); err != nil {
return err
Expand All @@ -481,7 +522,7 @@ func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rc
})

if err != nil {
return nil, xerrors.Errorf("read events: %w", err)
return nil, fmt.Errorf("read events: %w", err)
}

}
Expand Down
17 changes: 16 additions & 1 deletion chain/events/filter/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package filter

import (
"context"
"fmt"
pseudo "math/rand"
"testing"

Expand Down Expand Up @@ -276,7 +277,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
for _, tc := range testCases {
tc := tc // appease lint
t.Run(tc.name, func(t *testing.T) {
if err := tc.filter.CollectEvents(context.Background(), tc.te, false, addrMap.ResolveAddress); err != nil {
if err := tc.filter.CollectEvents(context.Background(), tc.te, false, addrMap.ResolveActor, addrMap.ResolveAddress); err != nil {
require.NoError(t, err, "collect events")
}

Expand Down Expand Up @@ -444,3 +445,17 @@ func (a addressMap) ResolveAddress(ctx context.Context, emitter abi.ActorID, ts
ra, ok := a[emitter]
return ra, ok
}

func (a addressMap) ResolveActor(ctx context.Context, emitter address.Address, ts *types.TipSet) (abi.ActorID, error) {
for id, addr := range a {
if addr == emitter {
return id, nil
}
}
fmt.Println("Have:")
for id, addr := range a {
fmt.Printf("%d -> %s\n", id, addr)
}
fmt.Println("Want:", emitter)
return 0, fmt.Errorf("address not found")
}
Loading