Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add command to (slowly) prune lotus chain datastore #3876

Merged
merged 3 commits into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 29 additions & 17 deletions chain/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1191,13 +1191,6 @@ func recurseLinks(bs bstore.Blockstore, walked *cid.Set, root cid.Cid, in []cid.
}

func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, w io.Writer) error {
if ts == nil {
ts = cs.GetHeaviestTipSet()
}

seen := cid.NewSet()
walked := cid.NewSet()

h := &car.CarHeader{
Roots: ts.Cids(),
Version: 1,
Expand All @@ -1207,6 +1200,28 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
return xerrors.Errorf("failed to write car header: %s", err)
}

return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, func(c cid.Cid) error {
blk, err := cs.bs.Get(c)
if err != nil {
return xerrors.Errorf("writing object to car, bs.Get: %w", err)
}

if err := carutil.LdWrite(w, c.Bytes(), blk.RawData()); err != nil {
return xerrors.Errorf("failed to write block to car output: %w", err)
}

return nil
})
}

func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, cb func(cid.Cid) error) error {
if ts == nil {
ts = cs.GetHeaviestTipSet()
}

seen := cid.NewSet()
walked := cid.NewSet()

blocksToWalk := ts.Cids()
currentMinHeight := ts.Height()

Expand All @@ -1215,15 +1230,15 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
return nil
}

if err := cb(blk); err != nil {
return err
}

data, err := cs.bs.Get(blk)
if err != nil {
return xerrors.Errorf("getting block: %w", err)
}

if err := carutil.LdWrite(w, blk.Bytes(), data.RawData()); err != nil {
return xerrors.Errorf("failed to write block to car output: %w", err)
}

var b types.BlockHeader
if err := b.UnmarshalCBOR(bytes.NewBuffer(data.RawData())); err != nil {
return xerrors.Errorf("unmarshaling block header (cid=%s): %w", blk, err)
Expand Down Expand Up @@ -1270,14 +1285,11 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
if c.Prefix().Codec != cid.DagCBOR {
continue
}
data, err := cs.bs.Get(c)
if err != nil {
return xerrors.Errorf("writing object to car (get %s): %w", c, err)
}

if err := carutil.LdWrite(w, c.Bytes(), data.RawData()); err != nil {
return xerrors.Errorf("failed to write out car object: %w", err)
if err := cb(c); err != nil {
return err
}

}
}

Expand Down
1 change: 1 addition & 0 deletions cmd/lotus-shed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func main() {
consensusCmd,
serveDealStatsCmd,
syncCmd,
stateTreePruneCmd,
}

app := &cli.App{
Expand Down
289 changes: 289 additions & 0 deletions cmd/lotus-shed/pruning.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
package main

import (
"context"
"fmt"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/lib/blockstore"
"github.com/filecoin-project/lotus/node/repo"
"github.com/ipfs/bbloom"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
dshelp "github.com/ipfs/go-ipfs-ds-help"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
)

type cidSet interface {
Add(cid.Cid)
Has(cid.Cid) bool
HasRaw([]byte) bool
Len() int
}

type bloomSet struct {
bloom *bbloom.Bloom
}

func newBloomSet(size int64) (*bloomSet, error) {
b, err := bbloom.New(float64(size), 3)
if err != nil {
return nil, err
}

return &bloomSet{bloom: b}, nil
}

func (bs *bloomSet) Add(c cid.Cid) {
bs.bloom.Add(c.Hash())

}

func (bs *bloomSet) Has(c cid.Cid) bool {
return bs.bloom.Has(c.Hash())
}

func (bs *bloomSet) HasRaw(b []byte) bool {
return bs.bloom.Has(b)
}

func (bs *bloomSet) Len() int {
return int(bs.bloom.ElementsAdded())
}

type mapSet struct {
m map[string]struct{}
}

func newMapSet() *mapSet {
return &mapSet{m: make(map[string]struct{})}
}

func (bs *mapSet) Add(c cid.Cid) {
bs.m[string(c.Hash())] = struct{}{}
}

func (bs *mapSet) Has(c cid.Cid) bool {
_, ok := bs.m[string(c.Hash())]
return ok
}

func (bs *mapSet) HasRaw(b []byte) bool {
_, ok := bs.m[string(b)]
return ok
}

func (bs *mapSet) Len() int {
return len(bs.m)
}

var stateTreePruneCmd = &cli.Command{
Name: "state-prune",
Description: "Deletes old state root data from local chainstore",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo",
Value: "~/.lotus",
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

&cli.Int64Flag{
Name: "keep-from-lookback",
Usage: "keep stateroots at or newer than the current height minus this lookback",
Value: 1800, // 2 x finality
},
&cli.IntFlag{
Name: "delete-up-to",
Usage: "delete up to the given number of objects (used to run a faster 'partial' sync)",
},
&cli.BoolFlag{
Name: "use-bloom-set",
Usage: "use a bloom filter for the 'good' set instead of a map, reduces memory usage but may not clean up as much",
},
&cli.BoolFlag{
Name: "dry-run",
Usage: "only enumerate the good set, don't do any deletions",
},
&cli.BoolFlag{
Name: "only-ds-gc",
Usage: "Only run datastore GC",
},
&cli.IntFlag{
Name: "gc-count",
Usage: "number of times to run gc",
Value: 20,
},
},
Action: func(cctx *cli.Context) error {
ctx := context.TODO()

fsrepo, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return err
}

lkrepo, err := fsrepo.Lock(repo.FullNode)
if err != nil {
return err
}

defer lkrepo.Close() //nolint:errcheck

ds, err := lkrepo.Datastore("/chain")
if err != nil {
return err
}

defer ds.Close() //nolint:errcheck

mds, err := lkrepo.Datastore("/metadata")
if err != nil {
return err
}
defer mds.Close() //nolint:errcheck

if cctx.Bool("only-ds-gc") {
gcds, ok := ds.(datastore.GCDatastore)
if ok {
fmt.Println("running datastore gc....")
for i := 0; i < cctx.Int("gc-count"); i++ {
if err := gcds.CollectGarbage(); err != nil {
return xerrors.Errorf("datastore GC failed: %w", err)
}
}
fmt.Println("gc complete!")
return nil
}
return fmt.Errorf("datastore doesnt support gc")
}

bs := blockstore.NewBlockstore(ds)

cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier))
if err := cs.Load(); err != nil {
return fmt.Errorf("loading chainstore: %w", err)
}

var goodSet cidSet
if cctx.Bool("use-bloom-set") {
bset, err := newBloomSet(10000000)
if err != nil {
return err
}
goodSet = bset
} else {
goodSet = newMapSet()
}

ts := cs.GetHeaviestTipSet()

rrLb := abi.ChainEpoch(cctx.Int64("keep-from-lookback"))

if err := cs.WalkSnapshot(ctx, ts, rrLb, true, func(c cid.Cid) error {
if goodSet.Len()%20 == 0 {
fmt.Printf("\renumerating keep set: %d ", goodSet.Len())
}
goodSet.Add(c)
return nil
}); err != nil {
return fmt.Errorf("snapshot walk failed: %w", err)
}

fmt.Println()
fmt.Printf("Successfully marked keep set! (%d objects)\n", goodSet.Len())

if cctx.Bool("dry-run") {
return nil
}

var b datastore.Batch
var batchCount int
markForRemoval := func(c cid.Cid) error {
if b == nil {
nb, err := ds.Batch()
if err != nil {
return fmt.Errorf("opening batch: %w", err)
}

b = nb
}
batchCount++

if err := b.Delete(dshelp.MultihashToDsKey(c.Hash())); err != nil {
return err
}

if batchCount > 100 {
if err := b.Commit(); err != nil {
return xerrors.Errorf("failed to commit batch deletes: %w", err)
}
b = nil
batchCount = 0
}
return nil
}

res, err := ds.Query(query.Query{KeysOnly: true})
if err != nil {
return xerrors.Errorf("failed to query datastore: %w", err)
}

dupTo := cctx.Int("delete-up-to")

var deleteCount int
var goodHits int
for {
v, ok := res.NextSync()
if !ok {
break
}

bk, err := dshelp.BinaryFromDsKey(datastore.RawKey(v.Key[len("/blocks"):]))
if err != nil {
return xerrors.Errorf("failed to parse key: %w", err)
}

if goodSet.HasRaw(bk) {
goodHits++
continue
}

nc := cid.NewCidV1(cid.Raw, bk)

deleteCount++
if err := markForRemoval(nc); err != nil {
return fmt.Errorf("failed to remove cid %s: %w", nc, err)
}

if deleteCount%20 == 0 {
fmt.Printf("\rdeleting %d objects (good hits: %d)... ", deleteCount, goodHits)
}

if dupTo != 0 && deleteCount > dupTo {
break
}
}

if b != nil {
if err := b.Commit(); err != nil {
return xerrors.Errorf("failed to commit final batch delete: %w", err)
}
}

gcds, ok := ds.(datastore.GCDatastore)
if ok {
fmt.Println("running datastore gc....")
for i := 0; i < cctx.Int("gc-count"); i++ {
if err := gcds.CollectGarbage(); err != nil {
return xerrors.Errorf("datastore GC failed: %w", err)
}
}
fmt.Println("gc complete!")
}

return nil
},
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ require (
github.com/hashicorp/go-multierror v1.1.0
github.com/hashicorp/golang-lru v0.5.4
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d
github.com/ipfs/bbloom v0.0.4
github.com/ipfs/go-bitswap v0.2.20
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.4-0.20200624145336-a978cec6e834
Expand Down