Skip to content

Commit

Permalink
feat: Add support to backfill actor events + resuming when left off
Browse files Browse the repository at this point in the history
  • Loading branch information
fridrik01 committed Jun 23, 2023
1 parent a2431ff commit ef4028e
Show file tree
Hide file tree
Showing 6 changed files with 340 additions and 5 deletions.
34 changes: 29 additions & 5 deletions chain/events/filter/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"

"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
_ "github.com/mattn/go-sqlite3"
"golang.org/x/xerrors"

Expand Down Expand Up @@ -51,16 +52,23 @@ var ddls = []string{
value BLOB NOT NULL
)`,

// contains the height that we have backfilled to
`CREATE TABLE IF NOT EXISTS event_backfilled (
height INTEGER NOT NULL UNIQUE
)`,

// metadata containing version of schema
`CREATE TABLE IF NOT EXISTS _meta (
version UINT64 NOT NULL UNIQUE
)`,

// version 1.
`INSERT OR IGNORE INTO _meta (version) VALUES (1)`,
// version 2.
`INSERT OR IGNORE INTO _meta (version) VALUES (2)`,
}

const schemaVersion = 1
var log = logging.Logger("filter")

const schemaVersion = 2

const (
insertEvent = `INSERT OR IGNORE INTO event
Expand Down Expand Up @@ -102,15 +110,31 @@ func NewEventIndex(path string) (*EventIndex, error) {
_ = db.Close()
return nil, xerrors.Errorf("looking for _meta table: %w", err)
} else {
// Ensure we don't open a database from a different schema version

// check the schema version to see if we need to upgrade the database schema
row := db.QueryRow("SELECT max(version) FROM _meta")
var version int
err := row.Scan(&version)
if err != nil {
_ = db.Close()
return nil, xerrors.Errorf("invalid database version: no version found")
}

if version == 1 {
log.Infof("upgrading event index from version 1 to version 2")

// to upgrade to version 2 we only need to create the event_backfilled table,
// which means we can just re-run the schema DDLs (they will not have any effect on existing data)
for _, ddl := range ddls {
if _, err := db.Exec(ddl); err != nil {
_ = db.Close()
return nil, xerrors.Errorf("could not upgrade index to version 2, exec ddl %q: %w", ddl, err)
}
}

version = 2
}

// unsupported schema version
if version != schemaVersion {
_ = db.Close()
return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion)
Expand Down
8 changes: 8 additions & 0 deletions documentation/en/default-lotus-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,14 @@
# env var: LOTUS_FEVM_EVENTS_DATABASEPATH
#DatabasePath = ""

# EnableEventBackfillOnStartup will run a backfill of all actor events from the current head up to the genesis
# block (while we have state) and populate the events.db. The DisableHistoricFilterAPI must be set to false
# for this to have any effect.
#
# type: bool
# env var: LOTUS_FEVM_EVENTS_ENABLEEVENTBACKFILLONSTARTUP
#EnableEventBackfillOnStartup = false


[Index]
# EXPERIMENTAL FEATURE. USE WITH CAUTION
Expand Down
8 changes: 8 additions & 0 deletions node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,11 @@ type Events struct {
// relative to the CWD (current working directory).
DatabasePath string

// EnableEventBackfillOnStartup will run a backfill of all actor events from the current head up to the genesis
// block (while we have state) and populate the events.db. The DisableHistoricFilterAPI must be set to false
// for this to have any effect.
EnableEventBackfillOnStartup bool

// Others, not implemented yet:
// Set a limit on the number of active websocket subscriptions (may be zero)
// Set a timeout for subscription clients
Expand Down
278 changes: 278 additions & 0 deletions node/impl/full/backfill.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
package full

import (
"context"
"database/sql"
"fmt"
"path/filepath"

"github.com/multiformats/go-varint"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
builtintypes "github.com/filecoin-project/go-state-types/builtin"
"github.com/filecoin-project/go-state-types/exitcode"

"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
)

// isIndexedValue reports whether the given event-entry flags byte marks the
// entry as indexed.
func isIndexedValue(b uint8) bool {
	// For now an entry is treated as indexed as a whole when either its key
	// or its value carries the indexed flag; finer-grained management of
	// indices is expected to be needed in the future.
	var indexedMask uint8 = types.EventFlagIndexedKey | types.EventFlagIndexedValue
	return b&indexedMask != 0
}

// BackfillEvents walks the chain backwards, parent by parent, and inserts every actor event (and event
// entry) that is not already stored in the events.db database under the repo's "sqlite" directory.
//
// The walk starts at the current heaviest tipset, or, when the event_backfilled table already holds a
// height from an earlier run, at the tipset at that height. After each tipset has been processed its
// height is written back to event_backfilled so that a later invocation can resume where this one
// stopped. The walk continues until the genesis tipset (height 0) has been processed or until loading
// a tipset, its state, messages, receipts or events fails, in which case the error is returned.
//
// Only events whose emitter resolves to an f4 (delegated) address assigned by the Ethereum Address
// Manager actor are stored; all other events are skipped.
//
// If ctx is cancelled the function stops at the next tipset boundary and returns nil.
func BackfillEvents(ctx context.Context, chainapi ChainAPI, stateApi StateAPI, stateManager *stmgr.StateManager) error {
	// resolveFn maps an emitter actor ID to its f4 address as of tipset ts. It reports false when the
	// actor cannot be loaded, has no robust address, or that address is not an EAM-assigned f4 address.
	resolveFn := func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
		// we only want to match using f4 addresses
		idAddr, err := address.NewIDAddress(uint64(emitter))
		if err != nil {
			return address.Undef, false
		}

		actor, err := stateManager.LoadActor(ctx, idAddr, ts)
		if err != nil || actor.Address == nil {
			return address.Undef, false
		}

		// if robust address is not f4 then we won't match against it so bail early
		if actor.Address.Protocol() != address.Delegated {
			return address.Undef, false
		}
		// we have an f4 address, make sure it's assigned by the EAM
		if namespace, _, err := varint.FromUvarint(actor.Address.Payload()); err != nil || namespace != builtintypes.EthereumAddressManagerActorID {
			return address.Undef, false
		}
		return *actor.Address, true
	}

	dbPath := filepath.Join(chainapi.Repo.Path(), "sqlite", "events.db")
	db, err := sql.Open("sqlite3", dbPath)
	if err != nil {
		return err
	}

	defer func() {
		if err := db.Close(); err != nil {
			log.Errorf("closing db: %s", err)
		}
	}()

	// closeStmt releases a prepared statement. A failure here cannot be acted upon by the caller, so
	// it is logged instead of returned. Deferred calls run in reverse order, so every statement is
	// closed before the database handle itself.
	closeStmt := func(name string, stmt *sql.Stmt) {
		if err := stmt.Close(); err != nil {
			log.Errorf("closing %s statement: %s", name, err)
		}
	}

	stmtSelectEvent, err := db.Prepare("SELECT MAX(id) from event WHERE height=? AND tipset_key=? and tipset_key_cid=? and emitter_addr=? and event_index=? and message_cid=? and message_index=? and reverted=false")
	if err != nil {
		return err
	}
	defer closeStmt("select event", stmtSelectEvent)

	stmtSelectEntry, err := db.Prepare("SELECT EXISTS(SELECT 1 from event_entry WHERE event_id=? and indexed=? and flags=? and key=? and codec=? and value=?)")
	if err != nil {
		return err
	}
	defer closeStmt("select entry", stmtSelectEntry)

	stmtEvent, err := db.Prepare("INSERT INTO event (height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)")
	if err != nil {
		return err
	}
	defer closeStmt("insert event", stmtEvent)

	stmtEntry, err := db.Prepare("INSERT INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)")
	if err != nil {
		return err
	}
	defer closeStmt("insert entry", stmtEntry)

	checkTs := chainapi.Chain.GetHeaviestTipSet()

	// look into the event_backfilled table to see where we left off
	var height sql.NullInt32
	err = db.QueryRow("SELECT MIN(height) FROM event_backfilled").Scan(&height)
	if err != nil {
		return err
	}
	if !height.Valid {
		// no backfilling has been done yet, so we need to start from the beginning (current head)
		if _, err := db.Exec("INSERT INTO event_backfilled(height) VALUES(?)", checkTs.Height()); err != nil {
			return err
		}
		log.Infof("Starting backfill from current head %d", checkTs.Height())
	} else {
		checkTs, err = chainapi.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(height.Int32), checkTs.Key())
		if err != nil {
			return err
		}
		log.Infof("Starting backfill from epoch %d", checkTs.Height())
	}

	// cache of successfully resolved emitter addresses, shared across all tipsets of this run
	addressLookups := make(map[abi.ActorID]address.Address)

	var eventsAffected int64
	var entriesAffected int64

	// Walk towards genesis. checkTs is moved to its parent at the end of every iteration, so the loop
	// must be bounded by the height that is still left to walk, not by comparing the iteration counter
	// against the (shrinking) current height, which would end the walk about halfway down the chain.
	// The counter i is only used for progress logging.
	for i := 0; checkTs.Height() > 0; i++ {
		select {
		case <-ctx.Done():
			log.Infoln("request cancelled")
			return nil
		default:
		}

		execTsk := checkTs.Parents()
		execTs, err := chainapi.Chain.GetTipSetFromKey(ctx, execTsk)
		if err != nil {
			return fmt.Errorf("failed to load tipset %s: %w", execTsk, err)
		}

		if i%100 == 0 {
			log.Infof("[%d] backfilling actor events epoch:%d, eventsAffected:%d, entriesAffected:%d", i, execTs.Height(), eventsAffected, entriesAffected)
		}

		_, rcptRoot, err := stateManager.TipSetState(ctx, execTs)
		if err != nil {
			return fmt.Errorf("failed to load tipset state %s: %w", execTsk, err)
		}

		msgs, err := chainapi.Chain.MessagesForTipset(ctx, execTs)
		if err != nil {
			return fmt.Errorf("failed to load messages for tipset %s: %w", execTsk, err)
		}

		rcpts, err := chainapi.Chain.ReadReceipts(ctx, rcptRoot)
		if err != nil {
			return fmt.Errorf("failed to load receipts for tipset %s: %w", execTsk, err)
		}

		// receipts are paired with messages by position below, so the counts must agree
		if len(msgs) != len(rcpts) {
			return fmt.Errorf("mismatched message and receipt count %d vs %d", len(msgs), len(rcpts))
		}

		for idx, rcpt := range rcpts {
			msg := msgs[idx]

			if rcpt.ExitCode != exitcode.Ok {
				continue
			}
			if rcpt.EventsRoot == nil {
				continue
			}

			events, err := chainapi.ChainGetEvents(ctx, *rcpt.EventsRoot)
			if err != nil {
				return fmt.Errorf("failed to load events for tipset %s: %w", execTsk, err)
			}

			for eventIdx, event := range events {
				addr, found := addressLookups[event.Emitter]
				if !found {
					var ok bool
					addr, ok = resolveFn(ctx, event.Emitter, execTs)
					if !ok {
						// not an address we will be able to match against
						continue
					}
					addressLookups[event.Emitter] = addr
				}

				tsKeyCid, err := execTs.Key().Cid()
				if err != nil {
					return fmt.Errorf("failed to get tipset key cid: %w", err)
				}

				// select the highest event id that exists in database, or null if none exists
				var entryID sql.NullInt64
				err = stmtSelectEvent.QueryRow(
					execTs.Height(),
					execTs.Key().Bytes(),
					tsKeyCid.Bytes(),
					addr.Bytes(),
					eventIdx,
					msg.Cid().Bytes(),
					idx,
				).Scan(&entryID)
				if err != nil {
					return fmt.Errorf("error checking if event exists: %w", err)
				}

				if !entryID.Valid {
					// event does not exist, lets backfill it
					res, err := stmtEvent.Exec(
						execTs.Height(),      // height
						execTs.Key().Bytes(), // tipset_key
						tsKeyCid.Bytes(),     // tipset_key_cid
						addr.Bytes(),         // emitter_addr
						eventIdx,             // event_index
						msg.Cid().Bytes(),    // message_cid
						idx,                  // message_index
						false,                // reverted
					)
					if err != nil {
						return fmt.Errorf("error inserting event: %w", err)
					}

					// the id of the freshly inserted row is what the entries below are attached to
					entryID.Int64, err = res.LastInsertId()
					if err != nil {
						return fmt.Errorf("could not get last insert id: %w", err)
					}

					rowsAffected, err := res.RowsAffected()
					if err != nil {
						return fmt.Errorf("error getting rows affected: %w", err)
					}

					eventsAffected += rowsAffected
				}

				for _, entry := range event.Entries {
					// check if entry exists
					var exists bool
					err = stmtSelectEntry.QueryRow(
						entryID.Int64,
						isIndexedValue(entry.Flags),
						[]byte{entry.Flags},
						entry.Key,
						entry.Codec,
						entry.Value,
					).Scan(&exists)
					if err != nil {
						return fmt.Errorf("error checking if entry exists: %w", err)
					}

					if exists {
						continue
					}

					// entry does not exist, lets backfill it
					res, err := stmtEntry.Exec(
						entryID.Int64,               // event_id
						isIndexedValue(entry.Flags), // indexed
						[]byte{entry.Flags},         // flags
						entry.Key,                   // key
						entry.Codec,                 // codec
						entry.Value,                 // value
					)
					if err != nil {
						return fmt.Errorf("error inserting entry: %w", err)
					}

					rowsAffected, err := res.RowsAffected()
					if err != nil {
						return fmt.Errorf("error getting rows affected: %w", err)
					}
					entriesAffected += rowsAffected
				}
			}
		}

		// record progress so that a later run can resume from this height
		_, err = db.Exec("UPDATE event_backfilled set height =?", execTs.Height())
		if err != nil {
			return fmt.Errorf("error updating backfill at height %d: %w", execTs.Height(), err)
		}

		checkTs = execTs
	}

	log.Infof("backfilling events complete, eventsAffected:%d, entriesAffected:%d", eventsAffected, entriesAffected)

	return nil
}
Loading

0 comments on commit ef4028e

Please sign in to comment.