diff --git a/chain/index/msgindex.go b/chain/index/msgindex.go index 39ba487f2ef..27eeea73e1d 100644 --- a/chain/index/msgindex.go +++ b/chain/index/msgindex.go @@ -37,7 +37,17 @@ var dbDefs = []string{ )`, `INSERT OR IGNORE INTO _meta (version) VALUES (1)`, } -var dbPragmas = []string{} + +var dbPragmas = []string{ + "PRAGMA synchronous = normal", + "PRAGMA temp_store = memory", + "PRAGMA mmap_size = 30000000000", + "PRAGMA page_size = 32768", + "PRAGMA auto_vacuum = NONE", + "PRAGMA automatic_index = OFF", + "PRAGMA journal_mode = WAL", + "PRAGMA read_uncommitted = ON", +} const ( // prepared stmts diff --git a/cmd/lotus-shed/msgindex.go b/cmd/lotus-shed/msgindex.go index fae733bbe64..0c47988038d 100644 --- a/cmd/lotus-shed/msgindex.go +++ b/cmd/lotus-shed/msgindex.go @@ -5,7 +5,6 @@ import ( "fmt" "path" - "github.com/ipfs/go-cid" _ "github.com/mattn/go-sqlite3" "github.com/mitchellh/go-homedir" "github.com/urfave/cli/v2" @@ -83,63 +82,50 @@ var msgindexBackfillCmd = &cli.Command{ } }() - tx, err := db.Begin() - if err != nil { - return err - } - - insertStmt, err := tx.Prepare("INSERT INTO messages VALUES (?, ?, ?)") + insertStmt, err := db.Prepare("INSERT OR IGNORE INTO messages (cid, tipset_cid, epoch) VALUES (?, ?, ?)") if err != nil { return err } - insertMsg := func(cid, tsCid cid.Cid, epoch abi.ChainEpoch) error { - key := cid.String() - tskey := tsCid.String() - if _, err := insertStmt.Exec(key, tskey, int64(epoch)); err != nil { - return err - } - - return nil - } - rollback := func() { - if err := tx.Rollback(); err != nil { - fmt.Printf("ERROR: rollback: %s", err) - } - } - + var nrRowsAffected int64 for i := 0; i < epochs; i++ { epoch := abi.ChainEpoch(startHeight - int64(i)) + if i%100 == 0 { + log.Infof("%d/%d processing epoch:%d, nrRowsAffected:%d", i, epochs, epoch, nrRowsAffected) + } + ts, err := api.ChainGetTipSetByHeight(ctx, epoch, curTs.Key()) if err != nil { - rollback() - return err + return fmt.Errorf("failed to get tipset at epoch %d: %w", epoch, err) } tsCid, err := ts.Key().Cid() if err != nil { - rollback() - return err + return fmt.Errorf("failed to get tipset cid at epoch %d: %w", epoch, err) } msgs, err := api.ChainGetMessagesInTipset(ctx, ts.Key()) if err != nil { - rollback() - return err + return fmt.Errorf("failed to get messages in tipset at epoch %d: %w", epoch, err) } for _, msg := range msgs { - if err := insertMsg(msg.Cid, tsCid, epoch); err != nil { - rollback() - return err + key := msg.Cid.String() + tskey := tsCid.String() + res, err := insertStmt.Exec(key, tskey, int64(epoch)) + if err != nil { + return fmt.Errorf("failed to insert message cid %s in tipset %s at epoch %d: %w", key, tskey, epoch, err) + } + rowsAffected, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected for message cid %s in tipset %s at epoch %d: %w", key, tskey, epoch, err) } + nrRowsAffected += rowsAffected } } - if err := tx.Commit(); err != nil { - return err - } + log.Infof("Done backfilling, nrRowsAffected:%d", nrRowsAffected) return nil },