Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to backfill actor events + resuming when left off #10939

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 29 additions & 5 deletions chain/events/filter/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"

"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
_ "github.com/mattn/go-sqlite3"
"golang.org/x/xerrors"

Expand Down Expand Up @@ -51,16 +52,23 @@ var ddls = []string{
value BLOB NOT NULL
)`,

// contains the height that we have backfilled to
`CREATE TABLE IF NOT EXISTS event_backfilled (
height INTEGER NOT NULL UNIQUE
)`,

// metadata containing version of schema
`CREATE TABLE IF NOT EXISTS _meta (
version UINT64 NOT NULL UNIQUE
)`,

// version 1.
`INSERT OR IGNORE INTO _meta (version) VALUES (1)`,
// version 2.
`INSERT OR IGNORE INTO _meta (version) VALUES (2)`,
}

const schemaVersion = 1
var log = logging.Logger("filter")

const schemaVersion = 2

const (
insertEvent = `INSERT OR IGNORE INTO event
Expand Down Expand Up @@ -102,15 +110,31 @@ func NewEventIndex(path string) (*EventIndex, error) {
_ = db.Close()
return nil, xerrors.Errorf("looking for _meta table: %w", err)
} else {
// Ensure we don't open a database from a different schema version

// check the schema version to see if we need to upgrade the database schema
row := db.QueryRow("SELECT max(version) FROM _meta")
var version int
err := row.Scan(&version)
if err != nil {
_ = db.Close()
return nil, xerrors.Errorf("invalid database version: no version found")
}

if version == 1 {
log.Infof("upgrading event index from version 1 to version 2")

// to upgrade to version 2 we only need to create the event_backfilled table,
// which means we can just re-run the schema DDLs (they will not have any effect on existing data)
for _, ddl := range ddls {
if _, err := db.Exec(ddl); err != nil {
_ = db.Close()
return nil, xerrors.Errorf("could not upgrade index to version 2, exec ddl %q: %w", ddl, err)
}
}

version = 2
}

// unsupported schema version
if version != schemaVersion {
_ = db.Close()
return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion)
Expand Down
8 changes: 8 additions & 0 deletions documentation/en/default-lotus-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,14 @@
# env var: LOTUS_FEVM_EVENTS_DATABASEPATH
#DatabasePath = ""

# EnableEventBackfillOnStartup will run a backfill of all actor events from the current head up to the genesis
# block (while we have state) and populate the events.db. The DisableHistoricFilterAPI must be set to false
# for this to have any effect.
#
# type: bool
# env var: LOTUS_FEVM_EVENTS_ENABLEEVENTBACKFILLONSTARTUP
#EnableEventBackfillOnStartup = false


[Index]
# EXPERIMENTAL FEATURE. USE WITH CAUTION
Expand Down
8 changes: 8 additions & 0 deletions node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,11 @@ type Events struct {
// relative to the CWD (current working directory).
DatabasePath string

// EnableEventBackfillOnStartup will run a backfill of all actor events from the current head up to the genesis
// block (while we have state) and populate the events.db. The DisableHistoricFilterAPI must be set to false
// for this to have any effect.
EnableEventBackfillOnStartup bool

// Others, not implemented yet:
// Set a limit on the number of active websocket subscriptions (may be zero)
// Set a timeout for subscription clients
Expand Down
278 changes: 278 additions & 0 deletions node/impl/full/backfill.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
package full

import (
"context"
"database/sql"
"fmt"
"path/filepath"

"github.com/multiformats/go-varint"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
builtintypes "github.com/filecoin-project/go-state-types/builtin"
"github.com/filecoin-project/go-state-types/exitcode"

"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
)

// isIndexedValue reports whether the event entry flags byte b has the
// indexed-key bit or the indexed-value bit set.
func isIndexedValue(b uint8) bool {
	// For now an entry counts as indexed when either its key or its value is
	// flagged; finer-grained index handling is left for the future.
	indexedMask := uint8(types.EventFlagIndexedKey | types.EventFlagIndexedValue)
	return b&indexedMask != 0
}

// BackfillEvents walks the chain backwards, one parent tipset at a time, and
// inserts into the events.db sqlite database (under <repo>/sqlite) every actor
// event and event entry that is not already stored there.
//
// The starting point is the heaviest tipset, unless the event_backfilled table
// already records a height from a previous run, in which case the walk resumes
// from the tipset at that height. After each tipset has been processed its
// height is written to event_backfilled. The walk continues until the tipset
// at height 0 has been processed or an error occurs.
//
// Only events whose emitter resolves to a delegated (f4) address in the
// Ethereum Address Manager namespace are stored; events from receipts with a
// non-Ok exit code or without an events root are skipped.
//
// If ctx is cancelled the function logs the cancellation and returns nil; the
// progress recorded so far is kept. Any other failure (database, tipset,
// state, message, receipt or event loading) is returned as an error.
//
// stateApi is currently unused.
func BackfillEvents(ctx context.Context, chainapi ChainAPI, stateApi StateAPI, stateManager *stmgr.StateManager) error {
	// resolveFn maps an emitter actor ID to its f4 address as of tipset ts.
	// It reports false for any actor we would not be able to match against.
	resolveFn := func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
		// we only want to match using f4 addresses
		idAddr, err := address.NewIDAddress(uint64(emitter))
		if err != nil {
			return address.Undef, false
		}

		actor, err := stateManager.LoadActor(ctx, idAddr, ts)
		if err != nil || actor.Address == nil {
			return address.Undef, false
		}

		// if robust address is not f4 then we won't match against it so bail early
		if actor.Address.Protocol() != address.Delegated {
			return address.Undef, false
		}
		// we have an f4 address, make sure it's assigned by the EAM
		if namespace, _, err := varint.FromUvarint(actor.Address.Payload()); err != nil || namespace != builtintypes.EthereumAddressManagerActorID {
			return address.Undef, false
		}
		return *actor.Address, true
	}

	dbPath := filepath.Join(chainapi.Repo.Path(), "sqlite", "events.db")
	db, err := sql.Open("sqlite3", dbPath)
	if err != nil {
		return err
	}
	defer func() {
		if err := db.Close(); err != nil {
			log.Errorf("closing db: %s", err)
		}
	}()

	// Every prepared statement is tracked so it can be released on any return
	// path. This defer is registered after the db.Close defer, so the
	// statements are closed before the database handle.
	var stmts []*sql.Stmt
	defer func() {
		for _, stmt := range stmts {
			_ = stmt.Close() // best effort; the database is closed right after
		}
	}()
	prepare := func(query string) (*sql.Stmt, error) {
		stmt, err := db.Prepare(query)
		if err != nil {
			return nil, fmt.Errorf("preparing statement %q: %w", query, err)
		}
		stmts = append(stmts, stmt)
		return stmt, nil
	}

	stmtSelectEvent, err := prepare("SELECT MAX(id) from event WHERE height=? AND tipset_key=? and tipset_key_cid=? and emitter_addr=? and event_index=? and message_cid=? and message_index=? and reverted=false")
	if err != nil {
		return err
	}
	stmtSelectEntry, err := prepare("SELECT EXISTS(SELECT 1 from event_entry WHERE event_id=? and indexed=? and flags=? and key=? and codec=? and value=?)")
	if err != nil {
		return err
	}
	stmtEvent, err := prepare("INSERT INTO event (height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)")
	if err != nil {
		return err
	}
	stmtEntry, err := prepare("INSERT INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)")
	if err != nil {
		return err
	}

	checkTs := chainapi.Chain.GetHeaviestTipSet()

	// look into the event_backfilled table to see where we left off
	var height sql.NullInt64
	err = db.QueryRow("SELECT MIN(height) FROM event_backfilled").Scan(&height)
	if err != nil {
		return err
	}
	if !height.Valid {
		// no backfilling has been done yet, so we need to start from the beginning (current head)
		_, err := db.Exec("INSERT INTO event_backfilled(height) VALUES(?)", checkTs.Height())
		if err != nil {
			return err
		}
		log.Infof("Starting backfill from current head %d", checkTs.Height())
	} else {
		checkTs, err = chainapi.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(height.Int64), checkTs.Key())
		if err != nil {
			return err
		}
		log.Infof("Starting backfill from epoch %d", checkTs.Height())
	}

	// cache of emitter ID -> f4 address, shared across all visited tipsets
	addressLookups := make(map[abi.ActorID]address.Address)

	var eventsAffected int64
	var entriesAffected int64

	// Walk parent links until the tipset at height 0 has been handled. The
	// bound must depend only on checkTs: comparing a growing counter against
	// the shrinking height would end the walk roughly halfway down the chain.
	for i := 0; checkTs.Height() > 0; i++ {
		select {
		case <-ctx.Done():
			log.Infoln("request cancelled")
			return nil
		default:
		}

		execTsk := checkTs.Parents()
		execTs, err := chainapi.Chain.GetTipSetFromKey(ctx, execTsk)
		if err != nil {
			return fmt.Errorf("failed to load tipset %s: %w", execTsk, err)
		}

		if i%100 == 0 {
			log.Infof("[%d] backfilling actor events epoch:%d, eventsAffected:%d, entriesAffected:%d", i, execTs.Height(), eventsAffected, entriesAffected)
		}

		_, rcptRoot, err := stateManager.TipSetState(ctx, execTs)
		if err != nil {
			return fmt.Errorf("failed to load tipset state %s: %w", execTsk, err)
		}

		msgs, err := chainapi.Chain.MessagesForTipset(ctx, execTs)
		if err != nil {
			return fmt.Errorf("failed to load messages for tipset %s: %w", execTsk, err)
		}

		rcpts, err := chainapi.Chain.ReadReceipts(ctx, rcptRoot)
		if err != nil {
			return fmt.Errorf("failed to load receipts for tipset %s: %w", execTsk, err)
		}

		if len(msgs) != len(rcpts) {
			return fmt.Errorf("mismatched message and receipt count %d vs %d", len(msgs), len(rcpts))
		}

		// The tipset key and its CID are the same for every event in this
		// tipset, so compute them once here instead of once per event.
		tsKeyBytes := execTs.Key().Bytes()
		tsKeyCid, err := execTs.Key().Cid()
		if err != nil {
			return fmt.Errorf("failed to get tipset key cid: %w", err)
		}
		tsKeyCidBytes := tsKeyCid.Bytes()

		for idx, rcpt := range rcpts {
			msg := msgs[idx]

			if rcpt.ExitCode != exitcode.Ok {
				continue
			}
			if rcpt.EventsRoot == nil {
				continue
			}

			events, err := chainapi.ChainGetEvents(ctx, *rcpt.EventsRoot)
			if err != nil {
				return fmt.Errorf("failed to load events for tipset %s: %w", execTsk, err)
			}

			for eventIdx, event := range events {
				addr, found := addressLookups[event.Emitter]
				if !found {
					var ok bool
					addr, ok = resolveFn(ctx, event.Emitter, execTs)
					if !ok {
						// not an address we will be able to match against
						continue
					}
					addressLookups[event.Emitter] = addr
				}

				// select the highest event id that exists in database, or null if none exists
				var existingID sql.NullInt64
				err = stmtSelectEvent.QueryRow(
					execTs.Height(),
					tsKeyBytes,
					tsKeyCidBytes,
					addr.Bytes(),
					eventIdx,
					msg.Cid().Bytes(),
					idx,
				).Scan(&existingID)
				if err != nil {
					return fmt.Errorf("error checking if event exists: %w", err)
				}

				eventID := existingID.Int64
				if !existingID.Valid {
					// event does not exist, lets backfill it
					res, err := stmtEvent.Exec(
						execTs.Height(),   // height
						tsKeyBytes,        // tipset_key
						tsKeyCidBytes,     // tipset_key_cid
						addr.Bytes(),      // emitter_addr
						eventIdx,          // event_index
						msg.Cid().Bytes(), // message_cid
						idx,               // message_index
						false,             // reverted
					)
					if err != nil {
						return fmt.Errorf("error inserting event: %w", err)
					}

					eventID, err = res.LastInsertId()
					if err != nil {
						return fmt.Errorf("could not get last insert id: %w", err)
					}

					rowsAffected, err := res.RowsAffected()
					if err != nil {
						return fmt.Errorf("error getting rows affected: %w", err)
					}
					eventsAffected += rowsAffected
				}

				for _, entry := range event.Entries {
					indexed := isIndexedValue(entry.Flags)
					flags := []byte{entry.Flags}

					// check if entry exists
					var exists bool
					err = stmtSelectEntry.QueryRow(
						eventID,
						indexed,
						flags,
						entry.Key,
						entry.Codec,
						entry.Value,
					).Scan(&exists)
					if err != nil {
						return fmt.Errorf("error checking if entry exists: %w", err)
					}
					if exists {
						continue
					}

					// entry does not exist, lets backfill it
					res, err := stmtEntry.Exec(
						eventID,     // event_id
						indexed,     // indexed
						flags,       // flags
						entry.Key,   // key
						entry.Codec, // codec
						entry.Value, // value
					)
					if err != nil {
						return fmt.Errorf("error inserting entry: %w", err)
					}

					rowsAffected, err := res.RowsAffected()
					if err != nil {
						return fmt.Errorf("error getting rows affected: %w", err)
					}
					entriesAffected += rowsAffected
				}
			}
		}

		// record progress so a later run can resume from this tipset
		_, err = db.Exec("UPDATE event_backfilled set height =?", execTs.Height())
		if err != nil {
			return fmt.Errorf("error updating backfill at height %d: %w", execTs.Height(), err)
		}

		checkTs = execTs
	}

	log.Infof("backfilling events complete, eventsAffected:%d, entriesAffected:%d", eventsAffected, entriesAffected)

	return nil
}
Loading