Skip to content

Commit

Permalink
Merge pull request #4872 from filecoin-project/raulk/fix-blockstore-i…
Browse files Browse the repository at this point in the history
…mport

fix badger double open on daemon --import-snapshot; chainstore lifecycle
  • Loading branch information
magik6k authored Nov 18, 2020
2 parents 0033587 + 27c0ce4 commit 50146fb
Show file tree
Hide file tree
Showing 13 changed files with 50 additions and 21 deletions.
1 change: 1 addition & 0 deletions chain/store/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestIndexSeeks(t *testing.T) {

nbs := blockstore.NewTemporarySync()
cs := store.NewChainStore(nbs, nbs, syncds.MutexWrap(datastore.NewMapDatastore()), nil, nil)
defer cs.Close() //nolint:errcheck

_, err = cs.Import(bytes.NewReader(gencar))
if err != nil {
Expand Down
29 changes: 21 additions & 8 deletions chain/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,24 +134,30 @@ type ChainStore struct {

evtTypes [1]journal.EventType
journal journal.Journal

cancelFn context.CancelFunc
wg sync.WaitGroup
}

// localbs is guaranteed to fail Get* if requested block isn't stored locally
func NewChainStore(bs bstore.Blockstore, localbs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore {
c, _ := lru.NewARC(DefaultMsgMetaCacheSize)
tsc, _ := lru.NewARC(DefaultTipSetCacheSize)
mmCache, _ := lru.NewARC(DefaultMsgMetaCacheSize)
tsCache, _ := lru.NewARC(DefaultTipSetCacheSize)
if j == nil {
j = journal.NilJournal()
}

ctx, cancel := context.WithCancel(context.Background())
cs := &ChainStore{
bs: bs,
localbs: localbs,
ds: ds,
bestTips: pubsub.New(64),
tipsets: make(map[abi.ChainEpoch][]cid.Cid),
mmCache: c,
tsCache: tsc,
mmCache: mmCache,
tsCache: tsCache,
vmcalls: vmcalls,
cancelFn: cancel,
journal: j,
}

Expand Down Expand Up @@ -191,19 +197,24 @@ func NewChainStore(bs bstore.Blockstore, localbs bstore.Blockstore, ds dstore.Ba
}

hcmetric := func(rev, app []*types.TipSet) error {
ctx := context.Background()
for _, r := range app {
stats.Record(ctx, metrics.ChainNodeHeight.M(int64(r.Height())))
stats.Record(context.Background(), metrics.ChainNodeHeight.M(int64(r.Height())))
}
return nil
}

cs.reorgNotifeeCh = make(chan ReorgNotifee)
cs.reorgCh = cs.reorgWorker(context.TODO(), []ReorgNotifee{hcnf, hcmetric})
cs.reorgCh = cs.reorgWorker(ctx, []ReorgNotifee{hcnf, hcmetric})

return cs
}

func (cs *ChainStore) Close() error {
cs.cancelFn()
cs.wg.Wait()
return nil
}

func (cs *ChainStore) Load() error {
head, err := cs.ds.Get(chainHeadKey)
if err == dstore.ErrNotFound {
Expand Down Expand Up @@ -383,7 +394,7 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ctx context.Context, ts *types.TipS
// particular tipset to carry out a benchmark, verification, etc. on a chain
// segment.
func (cs *ChainStore) ForceHeadSilent(_ context.Context, ts *types.TipSet) error {
log.Warnf("(!!!) forcing a new head silently; only use this only for testing; new head: %s", ts)
log.Warnf("(!!!) forcing a new head silently; new head: %s", ts)

cs.heaviestLk.Lock()
defer cs.heaviestLk.Unlock()
Expand All @@ -406,7 +417,9 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo
notifees := make([]ReorgNotifee, len(initialNotifees))
copy(notifees, initialNotifees)

cs.wg.Add(1)
go func() {
defer cs.wg.Done()
defer log.Warn("reorgWorker quit")

for {
Expand Down
4 changes: 4 additions & 0 deletions chain/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func BenchmarkGetRandomness(b *testing.B) {
}

cs := store.NewChainStore(bs, bs, mds, nil, nil)
defer cs.Close() //nolint:errcheck

b.ResetTimer()

Expand Down Expand Up @@ -105,6 +106,7 @@ func TestChainExportImport(t *testing.T) {

nbs := blockstore.NewTemporary()
cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), nil, nil)
defer cs.Close() //nolint:errcheck

root, err := cs.Import(buf)
if err != nil {
Expand Down Expand Up @@ -139,6 +141,8 @@ func TestChainExportImportFull(t *testing.T) {

nbs := blockstore.NewTemporary()
cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), nil, nil)
defer cs.Close() //nolint:errcheck

root, err := cs.Import(buf)
if err != nil {
t.Fatal(err)
Expand Down
2 changes: 2 additions & 0 deletions cmd/lotus-bench/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ var importBenchCmd = &cli.Command{

metadataDs := datastore.NewMapDatastore()
cs := store.NewChainStore(bs, bs, metadataDs, vm.Syscalls(verifier), nil)
defer cs.Close() //nolint:errcheck

stm := stmgr.NewStateManager(cs)

startTime := time.Now()
Expand Down
2 changes: 2 additions & 0 deletions cmd/lotus-shed/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ var chainBalanceStateCmd = &cli.Command{
}

cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
defer cs.Close() //nolint:errcheck

cst := cbor.NewCborStore(bs)
store := adt.WrapStore(ctx, cst)
Expand Down Expand Up @@ -409,6 +410,7 @@ var chainPledgeCmd = &cli.Command{
}

cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
defer cs.Close() //nolint:errcheck

cst := cbor.NewCborStore(bs)
store := adt.WrapStore(ctx, cst)
Expand Down
2 changes: 2 additions & 0 deletions cmd/lotus-shed/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ var exportChainCmd = &cli.Command{
}

cs := store.NewChainStore(bs, bs, mds, nil, nil)
defer cs.Close() //nolint:errcheck

if err := cs.Load(); err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions cmd/lotus-shed/genesis-verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var genesisVerifyCmd = &cli.Command{
bs := blockstore.NewBlockstore(datastore.NewMapDatastore())

cs := store.NewChainStore(bs, bs, datastore.NewMapDatastore(), nil, nil)
defer cs.Close() //nolint:errcheck

cf := cctx.Args().Get(0)
f, err := os.Open(cf)
Expand Down
2 changes: 2 additions & 0 deletions cmd/lotus-shed/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ var stateTreePruneCmd = &cli.Command{
}

cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
defer cs.Close() //nolint:errcheck

if err := cs.Load(); err != nil {
return fmt.Errorf("loading chainstore: %w", err)
}
Expand Down
12 changes: 3 additions & 9 deletions cmd/lotus/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,14 +410,6 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
return xerrors.Errorf("failed to open blockstore: %w", err)
}

defer func() {
if c, ok := bs.(io.Closer); ok {
if err := c.Close(); err != nil {
log.Warnf("failed to close blockstore: %s", err)
}
}
}()

mds, err := lr.Datastore("/metadata")
if err != nil {
return err
Expand All @@ -427,7 +419,9 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
if err != nil {
return xerrors.Errorf("failed to open journal: %w", err)
}

cst := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), j)
defer cst.Close() //nolint:errcheck

log.Infof("importing chain from %s...", fname)

Expand Down Expand Up @@ -472,7 +466,7 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
}

log.Infof("accepting %s as new head", ts.Cids())
if err := cst.SetHead(ts); err != nil {
if err := cst.ForceHeadSilent(context.Background(), ts); err != nil {
return err
}

Expand Down
2 changes: 2 additions & 0 deletions conformance/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func (d *Driver) ExecuteTipset(bs blockstore.Blockstore, ds ds.Batching, preroot
sm = stmgr.NewStateManager(cs)
)

defer cs.Close() //nolint:errcheck

blocks := make([]store.BlockMessages, 0, len(tipset.Blocks))
for _, b := range tipset.Blocks {
sb := store.BlockMessages{
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ require (
github.com/ipfs/go-filestore v1.0.0
github.com/ipfs/go-fs-lock v0.0.6
github.com/ipfs/go-graphsync v0.5.0
github.com/ipfs/go-ipfs-blockstore v1.0.2
github.com/ipfs/go-ipfs-blockstore v1.0.3-0.20201116142306-a33814d3b08f
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-ds-help v1.0.0
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,8 @@ github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86
github.com/ipfs/go-ipfs-blockstore v1.0.0/go.mod h1:knLVdhVU9L7CC4T+T4nvGdeUIPAXlnd9zmXfp+9MIjU=
github.com/ipfs/go-ipfs-blockstore v1.0.1 h1:fnuVj4XdZp4yExhd0CnUwAiMNJHiPnfInhiuwz4lW1w=
github.com/ipfs/go-ipfs-blockstore v1.0.1/go.mod h1:MGNZlHNEnR4KGgPHM3/k8lBySIOK2Ve+0KjZubKlaOE=
github.com/ipfs/go-ipfs-blockstore v1.0.2 h1:Z8nUlBHK7wVKPKliQCQR9tLgUtz4J2QRbqFcJrqzM+E=
github.com/ipfs/go-ipfs-blockstore v1.0.2/go.mod h1:MGNZlHNEnR4KGgPHM3/k8lBySIOK2Ve+0KjZubKlaOE=
github.com/ipfs/go-ipfs-blockstore v1.0.3-0.20201116142306-a33814d3b08f h1:AQQb5zZj7KKTEFh9EaAUXc5Q+F7SbYkjfYogZnEzfUc=
github.com/ipfs/go-ipfs-blockstore v1.0.3-0.20201116142306-a33814d3b08f/go.mod h1:MGNZlHNEnR4KGgPHM3/k8lBySIOK2Ve+0KjZubKlaOE=
github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ=
github.com/ipfs/go-ipfs-blocksutil v0.0.1/go.mod h1:Yq4M86uIOmxmGPUHv/uI7uKqZNtLb449gwKqXjIsnRk=
github.com/ipfs/go-ipfs-chunker v0.0.1/go.mod h1:tWewYK0we3+rMbOh7pPFGDyypCtvGcBFymgY4rSDLAw=
Expand Down
8 changes: 7 additions & 1 deletion node/modules/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,19 @@ func SetupFallbackBlockstore(cbs dtypes.ChainBlockstore, rem dtypes.ChainBitswap
return nil
}

func ChainStore(bs dtypes.ChainBlockstore, lbs dtypes.ChainRawBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, lbs dtypes.ChainRawBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
chain := store.NewChainStore(bs, lbs, ds, syscalls, j)

if err := chain.Load(); err != nil {
log.Warnf("loading chain state from disk: %s", err)
}

lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return chain.Close()
},
})

return chain
}

Expand Down

0 comments on commit 50146fb

Please sign in to comment.