Skip to content

Commit

Permalink
Merge pull request #7000 from filecoin-project/feat/refactor-events
Browse files Browse the repository at this point in the history
Refactor events subsystem
  • Loading branch information
magik6k authored Aug 31, 2021
2 parents d113813 + 1da59fa commit b0f57d7
Show file tree
Hide file tree
Showing 49 changed files with 1,527 additions and 1,190 deletions.
1 change: 1 addition & 0 deletions api/api_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Gateway interface {
ChainHead(ctx context.Context) (*types.TipSet, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*BlockMessages, error)
ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error)
ChainGetPath(ctx context.Context, from, to types.TipSetKey) ([]*HeadChange, error)
ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetAfterHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
Expand Down
13 changes: 13 additions & 0 deletions api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified build/openrpc/full.json.gz
Binary file not shown.
Binary file modified build/openrpc/miner.json.gz
Binary file not shown.
Binary file modified build/openrpc/worker.json.gz
Binary file not shown.
1 change: 1 addition & 0 deletions build/params_2k.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build debug || 2k
// +build debug 2k

package build
Expand Down
1 change: 1 addition & 0 deletions build/params_butterfly.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build butterflynet
// +build butterflynet

package build
Expand Down
1 change: 1 addition & 0 deletions build/params_calibnet.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build calibnet
// +build calibnet

package build
Expand Down
1 change: 1 addition & 0 deletions build/params_debug.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build debug
// +build debug

package build
Expand Down
1 change: 1 addition & 0 deletions build/params_interop.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build interopnet
// +build interopnet

package build
Expand Down
9 changes: 2 additions & 7 deletions build/params_mainnet.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
// +build !debug
// +build !2k
// +build !testground
// +build !calibnet
// +build !nerpanet
// +build !butterflynet
// +build !interopnet
//go:build !debug && !2k && !testground && !calibnet && !nerpanet && !butterflynet && !interopnet
// +build !debug,!2k,!testground,!calibnet,!nerpanet,!butterflynet,!interopnet

package build

Expand Down
1 change: 1 addition & 0 deletions build/params_nerpanet.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build nerpanet
// +build nerpanet

package build
Expand Down
1 change: 1 addition & 0 deletions build/params_shared_vals.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build !testground
// +build !testground

package build
Expand Down
1 change: 1 addition & 0 deletions build/params_testground.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build testground
// +build testground

// This file makes hardcoded parameters (const) configurable as vars.
Expand Down
3 changes: 2 additions & 1 deletion build/tools.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//+build tools
//go:build tools
// +build tools

package build

Expand Down
33 changes: 33 additions & 0 deletions chain/events/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package events

import (
"context"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
)

type uncachedAPI interface {
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
ChainGetPath(ctx context.Context, from, to types.TipSetKey) ([]*api.HeadChange, error)
StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error)

StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) // optional / for CalledMsg
}

type cache struct {
*tipSetCache
*messageCache
uncachedAPI
}

func newCache(api EventAPI, gcConfidence abi.ChainEpoch) *cache {
return &cache{
newTSCache(api, gcConfidence),
newMessageCache(api),
api,
}
}
201 changes: 17 additions & 184 deletions chain/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,14 @@ package events

import (
"context"
"sync"
"time"

"github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)

Expand All @@ -25,209 +21,46 @@ type (
RevertHandler func(ctx context.Context, ts *types.TipSet) error
)

type heightHandler struct {
confidence int
called bool

handle HeightHandler
revert RevertHandler
// A TipSetObserver receives notifications of tipsets
type TipSetObserver interface {
Apply(ctx context.Context, from, to *types.TipSet) error
Revert(ctx context.Context, from, to *types.TipSet) error
}

type EventAPI interface {
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetAfterHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
ChainHead(context.Context) (*types.TipSet, error)
StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error)
ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)
ChainGetPath(ctx context.Context, from, to types.TipSetKey) ([]*api.HeadChange, error)

StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) // optional / for CalledMsg
}

type Events struct {
api EventAPI

tsc *tipSetCache
lk sync.Mutex

ready chan struct{}
readyOnce sync.Once

heightEvents
*observer
*heightEvents
*hcEvents

observers []TipSetObserver
}

func NewEventsWithConfidence(ctx context.Context, api EventAPI, gcConfidence abi.ChainEpoch) *Events {
tsc := newTSCache(gcConfidence, api)
func NewEventsWithConfidence(ctx context.Context, api EventAPI, gcConfidence abi.ChainEpoch) (*Events, error) {
cache := newCache(api, gcConfidence)

e := &Events{
api: api,

tsc: tsc,

heightEvents: heightEvents{
tsc: tsc,
ctx: ctx,
gcConfidence: gcConfidence,

heightTriggers: map[uint64]*heightHandler{},
htTriggerHeights: map[abi.ChainEpoch][]uint64{},
htHeights: map[abi.ChainEpoch][]uint64{},
},

hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
ready: make(chan struct{}),
observers: []TipSetObserver{},
ob := newObserver(cache, gcConfidence)
if err := ob.start(ctx); err != nil {
return nil, err
}

go e.listenHeadChanges(ctx)
he := newHeightEvents(cache, ob, gcConfidence)
headChange := newHCEvents(cache, ob)

// Wait for the first tipset to be seen or bail if shutting down
select {
case <-e.ready:
case <-ctx.Done():
}

return e
return &Events{ob, he, headChange}, nil
}

func NewEvents(ctx context.Context, api EventAPI) *Events {
func NewEvents(ctx context.Context, api EventAPI) (*Events, error) {
gcConfidence := 2 * build.ForkLengthThreshold
return NewEventsWithConfidence(ctx, api, gcConfidence)
}

func (e *Events) listenHeadChanges(ctx context.Context) {
for {
if err := e.listenHeadChangesOnce(ctx); err != nil {
log.Errorf("listen head changes errored: %s", err)
} else {
log.Warn("listenHeadChanges quit")
}
select {
case <-build.Clock.After(time.Second):
case <-ctx.Done():
log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err())
return
}

log.Info("restarting listenHeadChanges")
}
}

func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

notifs, err := e.api.ChainNotify(ctx)
if err != nil {
// Retry is handled by caller
return xerrors.Errorf("listenHeadChanges ChainNotify call failed: %w", err)
}

var cur []*api.HeadChange
var ok bool

// Wait for first tipset or bail
select {
case cur, ok = <-notifs:
if !ok {
return xerrors.Errorf("notification channel closed")
}
case <-ctx.Done():
return ctx.Err()
}

if len(cur) != 1 {
return xerrors.Errorf("unexpected initial head notification length: %d", len(cur))
}

if cur[0].Type != store.HCCurrent {
return xerrors.Errorf("expected first head notification type to be 'current', was '%s'", cur[0].Type)
}

if err := e.tsc.add(cur[0].Val); err != nil {
log.Warnf("tsc.add: adding current tipset failed: %v", err)
}

e.readyOnce.Do(func() {
e.lastTs = cur[0].Val
// Signal that we have seen first tipset
close(e.ready)
})

for notif := range notifs {
var rev, app []*types.TipSet
for _, notif := range notif {
switch notif.Type {
case store.HCRevert:
rev = append(rev, notif.Val)
case store.HCApply:
app = append(app, notif.Val)
default:
log.Warnf("unexpected head change notification type: '%s'", notif.Type)
}
}

if err := e.headChange(ctx, rev, app); err != nil {
log.Warnf("headChange failed: %s", err)
}

// sync with fake chainstore (for tests)
if fcs, ok := e.api.(interface{ notifDone() }); ok {
fcs.notifDone()
}
}

return nil
}

func (e *Events) headChange(ctx context.Context, rev, app []*types.TipSet) error {
if len(app) == 0 {
return xerrors.New("events.headChange expected at least one applied tipset")
}

e.lk.Lock()
defer e.lk.Unlock()

if err := e.headChangeAt(rev, app); err != nil {
return err
}

if err := e.observeChanges(ctx, rev, app); err != nil {
return err
}
return e.processHeadChangeEvent(rev, app)
}

// A TipSetObserver receives notifications of tipsets
type TipSetObserver interface {
Apply(ctx context.Context, ts *types.TipSet) error
Revert(ctx context.Context, ts *types.TipSet) error
}

// TODO: add a confidence level so we can have observers with difference levels of confidence
func (e *Events) Observe(obs TipSetObserver) error {
e.lk.Lock()
defer e.lk.Unlock()
e.observers = append(e.observers, obs)
return nil
}

// observeChanges expects caller to hold e.lk
func (e *Events) observeChanges(ctx context.Context, rev, app []*types.TipSet) error {
for _, ts := range rev {
for _, o := range e.observers {
_ = o.Revert(ctx, ts)
}
}

for _, ts := range app {
for _, o := range e.observers {
_ = o.Apply(ctx, ts)
}
}

return nil
}
Loading

0 comments on commit b0f57d7

Please sign in to comment.