diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index ab4e2449332..22c8cc14581 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -9,6 +9,7 @@ import ( "strings" "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" _ "github.com/mattn/go-sqlite3" "golang.org/x/xerrors" @@ -51,16 +52,23 @@ var ddls = []string{ value BLOB NOT NULL )`, + // contains the height that we have backfilled to + `CREATE TABLE IF NOT EXISTS event_backfilled ( + height INTEGER NOT NULL UNIQUE + )`, + // metadata containing version of schema `CREATE TABLE IF NOT EXISTS _meta ( version UINT64 NOT NULL UNIQUE )`, - // version 1. - `INSERT OR IGNORE INTO _meta (version) VALUES (1)`, + // version 2. + `INSERT OR IGNORE INTO _meta (version) VALUES (2)`, } -const schemaVersion = 1 +var log = logging.Logger("filter") + +const schemaVersion = 2 const ( insertEvent = `INSERT OR IGNORE INTO event @@ -102,8 +110,7 @@ func NewEventIndex(path string) (*EventIndex, error) { _ = db.Close() return nil, xerrors.Errorf("looking for _meta table: %w", err) } else { - // Ensure we don't open a database from a different schema version - + // check the schema version to see if we need to upgrade the database schema row := db.QueryRow("SELECT max(version) FROM _meta") var version int err := row.Scan(&version) @@ -111,6 +118,23 @@ func NewEventIndex(path string) (*EventIndex, error) { _ = db.Close() return nil, xerrors.Errorf("invalid database version: no version found") } + + if version == 1 { + log.Infof("upgrading event index from version 1 to version 2") + + // to upgrade to version version 2 we only need to create the event_backfilled_ranges table + // which means we can just recreate the schema (it will not have any effect on existing data) + for _, ddl := range ddls { + if _, err := db.Exec(ddl); err != nil { + _ = db.Close() + return nil, xerrors.Errorf("could not upgrade index to version 2, exec ddl %q: %w", ddl, err) + } + } + + version = 2 + } + 
+ // unsupported schema version if version != schemaVersion { _ = db.Close() return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion) diff --git a/documentation/en/default-lotus-config.toml b/documentation/en/default-lotus-config.toml index 8e99869a533..7e27c82cc52 100644 --- a/documentation/en/default-lotus-config.toml +++ b/documentation/en/default-lotus-config.toml @@ -389,6 +389,14 @@ # env var: LOTUS_FEVM_EVENTS_DATABASEPATH #DatabasePath = "" + # EnableEventBackfillOnStartup will run a backfill of all actor events from the current head up to the genesis + # block (while we have state) and populate the events.db. The DisableHistoricFilterAPI must be set to false + # for this to have any effect. + # + # type: bool + # env var: LOTUS_FEVM_EVENTS_ENABLEEVENTBACKFILLONSTARTUP + #EnableEventBackfillOnStartup = false + [Index] # EXPERIMENTAL FEATURE. USE WITH CAUTION diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 5361b2d6c59..1b38b16adcf 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -393,6 +393,14 @@ support the historic filter APIs. If the database does not exist it will be crea the database must already exist and be writeable. If a relative path is provided here, sqlite treats it as relative to the CWD (current working directory).`, }, + { + Name: "EnableEventBackfillOnStartup", + Type: "bool", + + Comment: `EnableEventBackfillOnStartup will run a backfill of all actor events from the current head up to the genesis +block (while we have state) and populate the events.db. The DisableHistoricFilterAPI must be set to false +for this to have any effect.`, + }, }, "FeeConfig": []DocField{ { diff --git a/node/config/types.go b/node/config/types.go index c89e8f70bad..d70493cc78f 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -721,6 +721,11 @@ type Events struct { // relative to the CWD (current working directory). 
DatabasePath string

+	// EnableEventBackfillOnStartup will run a backfill of all actor events from the current head up to the genesis
+	// block (while we have state) and populate the events.db. The DisableHistoricFilterAPI must be set to false
+	// for this to have any effect.
+	EnableEventBackfillOnStartup bool
+
 	// Others, not implemented yet:
 	// Set a limit on the number of active websocket subscriptions (may be zero)
 	// Set a timeout for subscription clients
diff --git a/node/impl/full/backfill.go b/node/impl/full/backfill.go
new file mode 100644
index 00000000000..157c8bd827e
--- /dev/null
+++ b/node/impl/full/backfill.go
@@ -0,0 +1,308 @@
+package full
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+	"path/filepath"
+
+	"github.com/multiformats/go-varint"
+
+	"github.com/filecoin-project/go-address"
+	"github.com/filecoin-project/go-state-types/abi"
+	builtintypes "github.com/filecoin-project/go-state-types/builtin"
+	"github.com/filecoin-project/go-state-types/exitcode"
+
+	"github.com/filecoin-project/lotus/chain/stmgr"
+	"github.com/filecoin-project/lotus/chain/types"
+)
+
+// isIndexedValue reports whether an event entry carrying the flags byte b is stored as indexed.
+func isIndexedValue(b uint8) bool {
+	// currently we mark the full entry as indexed if either the key
+	// or the value are indexed; in the future we will need finer-grained
+	// management of indices
+	return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0
+}
+
+// BackfillEvents walks the chain backwards, from the current head or from the height recorded by a
+// previous run, towards genesis and inserts every actor event (and each of its entries) that is
+// missing from the events database.
+//
+// Only events whose emitter resolves to an f4 (delegated) address assigned by the Ethereum Address
+// Manager are stored. After each tipset the height reached is written to the event_backfilled table
+// so that a later call resumes from there.
+//
+// It returns nil when the walk completes or ctx is cancelled, and an error as soon as chain data
+// cannot be loaded or a database operation fails.
+//
+// NOTE(review): the database is always opened at <repo>/sqlite/events.db; confirm whether a custom
+// Events.DatabasePath should be honoured here.
+// NOTE(review): stateApi is not used by this function.
+func BackfillEvents(ctx context.Context, chainapi ChainAPI, stateApi StateAPI, stateManager *stmgr.StateManager) error {
+	resolveFn := func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
+		// we only want to match using f4 addresses
+		idAddr, err := address.NewIDAddress(uint64(emitter))
+		if err != nil {
+			return address.Undef, false
+		}
+
+		actor, err := stateManager.LoadActor(ctx, idAddr, ts)
+		if err != nil || actor.Address == nil {
+			return address.Undef, false
+		}
+
+		// if robust address is not f4 then we won't match against it so bail early
+		if actor.Address.Protocol() != address.Delegated {
+			return address.Undef, false
+		}
+		// we have an f4 address, make sure it's assigned by the EAM
+		if namespace, _, err := varint.FromUvarint(actor.Address.Payload()); err != nil || namespace != builtintypes.EthereumAddressManagerActorID {
+			return address.Undef, false
+		}
+		return *actor.Address, true
+	}
+
+	dbPath := filepath.Join(chainapi.Repo.Path(), "sqlite", "events.db")
+	db, err := sql.Open("sqlite3", dbPath)
+	if err != nil {
+		return fmt.Errorf("opening events db %s: %w", dbPath, err)
+	}
+	defer func() {
+		if err := db.Close(); err != nil {
+			log.Warnf("closing db: %s", err)
+		}
+	}()
+
+	// every prepared statement is closed on return; this defer runs before the db.Close above
+	var stmts []*sql.Stmt
+	defer func() {
+		for _, stmt := range stmts {
+			_ = stmt.Close() // best effort, the database is closed right after
+		}
+	}()
+	prepare := func(query string) (*sql.Stmt, error) {
+		stmt, err := db.Prepare(query)
+		if err != nil {
+			return nil, fmt.Errorf("preparing statement %q: %w", query, err)
+		}
+		stmts = append(stmts, stmt)
+		return stmt, nil
+	}
+
+	stmtSelectEvent, err := prepare("SELECT MAX(id) from event WHERE height=? AND tipset_key=? and tipset_key_cid=? and emitter_addr=? and event_index=? and message_cid=? and message_index=? and reverted=false")
+	if err != nil {
+		return err
+	}
+	stmtSelectEntry, err := prepare("SELECT EXISTS(SELECT 1 from event_entry WHERE event_id=? and indexed=? and flags=? and key=? and codec=? and value=?)")
+	if err != nil {
+		return err
+	}
+	stmtEvent, err := prepare("INSERT INTO event (height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)")
+	if err != nil {
+		return err
+	}
+	stmtEntry, err := prepare("INSERT INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)")
+	if err != nil {
+		return err
+	}
+
+	checkTs := chainapi.Chain.GetHeaviestTipSet()
+
+	// look into the event_backfilled table to see where we left off
+	var height sql.NullInt64
+	err = db.QueryRow("SELECT MIN(height) FROM event_backfilled").Scan(&height)
+	if err != nil {
+		return fmt.Errorf("reading backfill progress: %w", err)
+	}
+	if !height.Valid {
+		// no backfilling has been done yet, so we need to start from the beginning (current head)
+		_, err := db.Exec("INSERT INTO event_backfilled(height) VALUES(?)", checkTs.Height())
+		if err != nil {
+			return fmt.Errorf("recording backfill start: %w", err)
+		}
+		log.Infof("Starting backfill from current head %d", checkTs.Height())
+	} else {
+		checkTs, err = chainapi.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(height.Int64), checkTs.Key())
+		if err != nil {
+			return fmt.Errorf("loading tipset at backfilled height %d: %w", height.Int64, err)
+		}
+		log.Infof("Starting backfill from epoch %d", checkTs.Height())
+	}
+
+	addressLookups := make(map[abi.ActorID]address.Address)
+
+	var eventsAffected int64
+	var entriesAffected int64
+
+	// Walk until the genesis tipset (height 0) has been reached. The bound has to be the height of
+	// the tipset being processed: checkTs moves one tipset closer to genesis on every iteration, so
+	// comparing a growing counter against its height would stop the walk about halfway down.
+	for i := 0; checkTs.Height() > 0; i++ {
+		select {
+		case <-ctx.Done():
+			// progress is persisted after every tipset, so stopping here is not treated as a failure
+			log.Infoln("request cancelled")
+			return nil
+		default:
+		}
+
+		execTsk := checkTs.Parents()
+		execTs, err := chainapi.Chain.GetTipSetFromKey(ctx, execTsk)
+		if err != nil {
+			return fmt.Errorf("failed to load tipset %s: %w", execTsk, err)
+		}
+
+		if i%100 == 0 {
+			log.Infof("[%d] backfilling actor events epoch:%d, eventsAffected:%d, entriesAffected:%d", i, execTs.Height(), eventsAffected, entriesAffected)
+		}
+
+		_, rcptRoot, err := stateManager.TipSetState(ctx, execTs)
+		if err != nil {
+			return fmt.Errorf("failed to load tipset state %s: %w", execTsk, err)
+		}
+
+		msgs, err := chainapi.Chain.MessagesForTipset(ctx, execTs)
+		if err != nil {
+			return fmt.Errorf("failed to load messages for tipset %s: %w", execTsk, err)
+		}
+
+		rcpts, err := chainapi.Chain.ReadReceipts(ctx, rcptRoot)
+		if err != nil {
+			return fmt.Errorf("failed to load receipts for tipset %s: %w", execTsk, err)
+		}
+
+		if len(msgs) != len(rcpts) {
+			return fmt.Errorf("mismatched message and receipt count %d vs %d", len(msgs), len(rcpts))
+		}
+
+		// the tipset key and its CID are the same for every event of this tipset
+		tsKeyCid, err := execTs.Key().Cid()
+		if err != nil {
+			return fmt.Errorf("failed to get tipset key cid: %w", err)
+		}
+		tsKeyBytes := execTs.Key().Bytes()
+		tsKeyCidBytes := tsKeyCid.Bytes()
+
+		for idx, rcpt := range rcpts {
+			msg := msgs[idx]
+
+			if rcpt.ExitCode != exitcode.Ok {
+				continue
+			}
+			if rcpt.EventsRoot == nil {
+				continue
+			}
+
+			events, err := chainapi.ChainGetEvents(ctx, *rcpt.EventsRoot)
+			if err != nil {
+				return fmt.Errorf("failed to load events for tipset %s: %w", execTsk, err)
+			}
+
+			for eventIdx, event := range events {
+				addr, found := addressLookups[event.Emitter]
+				if !found {
+					var ok bool
+					addr, ok = resolveFn(ctx, event.Emitter, execTs)
+					if !ok {
+						// not an address we will be able to match against
+						continue
+					}
+					addressLookups[event.Emitter] = addr
+				}
+
+				// select the highest event id that exists in database, or null if none exists
+				var entryID sql.NullInt64
+				err = stmtSelectEvent.QueryRow(
+					execTs.Height(),
+					tsKeyBytes,
+					tsKeyCidBytes,
+					addr.Bytes(),
+					eventIdx,
+					msg.Cid().Bytes(),
+					idx,
+				).Scan(&entryID)
+				if err != nil {
+					return fmt.Errorf("error checking if event exists: %w", err)
+				}
+
+				if !entryID.Valid {
+					// event does not exist, lets backfill it
+					res, err := stmtEvent.Exec(
+						execTs.Height(),   // height
+						tsKeyBytes,        // tipset_key
+						tsKeyCidBytes,     // tipset_key_cid
+						addr.Bytes(),      // emitter_addr
+						eventIdx,          // event_index
+						msg.Cid().Bytes(), // message_cid
+						idx,               // message_index
+						false,             // reverted
+					)
+					if err != nil {
+						return fmt.Errorf("error inserting event: %w", err)
+					}
+
+					entryID.Int64, err = res.LastInsertId()
+					if err != nil {
+						return fmt.Errorf("could not get last insert id: %w", err)
+					}
+
+					rowsAffected, err := res.RowsAffected()
+					if err != nil {
+						return fmt.Errorf("error getting rows affected: %w", err)
+					}
+
+					eventsAffected += rowsAffected
+				}
+
+				for _, entry := range event.Entries {
+					// check if entry exists
+					var exists bool
+					err = stmtSelectEntry.QueryRow(
+						entryID.Int64,
+						isIndexedValue(entry.Flags),
+						[]byte{entry.Flags},
+						entry.Key,
+						entry.Codec,
+						entry.Value,
+					).Scan(&exists)
+					if err != nil {
+						return fmt.Errorf("error checking if entry exists: %w", err)
+					}
+
+					if !exists {
+						// entry does not exist, lets backfill it
+						res, err := stmtEntry.Exec(
+							entryID.Int64,               // event_id
+							isIndexedValue(entry.Flags), // indexed
+							[]byte{entry.Flags},         // flags
+							entry.Key,                   // key
+							entry.Codec,                 // codec
+							entry.Value,                 // value
+						)
+						if err != nil {
+							return fmt.Errorf("error inserting entry: %w", err)
+						}
+
+						rowsAffected, err := res.RowsAffected()
+						if err != nil {
+							return fmt.Errorf("error getting rows affected: %w", err)
+						}
+						entriesAffected += rowsAffected
+					}
+				}
+			}
+		}
+
+		_, err = db.Exec("UPDATE event_backfilled set height =?", execTs.Height())
+		if err != nil {
+			return fmt.Errorf("error updating backfill at height %d: %w", execTs.Height(), err)
+		}
+
+		checkTs = execTs
+	}
+
+	log.Infof("backfilling events complete, eventsAffected:%d, entriesAffected:%d", eventsAffected, entriesAffected)
+
+	return nil
+}
diff --git a/node/modules/actorevent.go b/node/modules/actorevent.go
index 68a6990ce61..73f08c181fe 100644
--- a/node/modules/actorevent.go
+++ b/node/modules/actorevent.go
@@ -84,6 +84,18 @@ func EthEventAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo
 			return nil, err
 		}
 
+		if cfg.Events.EnableEventBackfillOnStartup {
+
go func() { + log.Infof("EnableEventBackfillOnStartup was set, starting backfill of actor events in the background...") + now := time.Now() + err := full.BackfillEvents(ctx, chainapi, stateapi, sm) + if err != nil { + log.Infof("backfill events error: %s", err) + } + log.Infof("backfill of actor events done in: %s", time.Since(now)) + }() + } + lc.Append(fx.Hook{ OnStop: func(context.Context) error { return eventIndex.Close()