From 3ebcb4d56601688093b0b5a96e73486d815f3235 Mon Sep 17 00:00:00 2001 From: Peter Rabbitson Date: Thu, 4 Apr 2024 05:12:59 +0000 Subject: [PATCH 1/3] Flush() correctly in case we are in MovingGC --- blockstore/badger/blockstore.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index 8f5b52c3269..0eb48160ddc 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -627,7 +627,18 @@ func (b *Blockstore) Flush(context.Context) error { b.lockDB() defer b.unlockDB() - return b.db.Sync() + // fsync the new db first + if b.dbNext != nil { + if err := b.dbNext.Sync(); err != nil { + return err + } + } + + if err := b.db.Sync(); err != nil { + return err + } + + return nil } // Has implements Blockstore.Has. From d6fe66d8280a7c9852519e2763836481c679a8f6 Mon Sep 17 00:00:00 2001 From: Peter Rabbitson Date: Sat, 6 Apr 2024 06:23:31 +0200 Subject: [PATCH 2/3] Allow MovingGC to be interrupted by a context + slight refactor --- blockstore/badger/blockstore.go | 83 ++++++++++++++++++++++----------- 1 file changed, 56 insertions(+), 27 deletions(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index 0eb48160ddc..64d443a6033 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -12,7 +12,7 @@ import ( "github.com/dgraph-io/badger/v2" "github.com/dgraph-io/badger/v2/options" - "github.com/dgraph-io/badger/v2/pb" + badgerstruct "github.com/dgraph-io/badger/v2/pb" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" @@ -242,7 +242,7 @@ func (b *Blockstore) unlockMove(state bsMoveState) { // are persisted to the new blockstore; if a failure occurs aboring the move, // then they must be peristed to the old blockstore. // In short, the blockstore must not lose data from new writes during the move. -func (b *Blockstore) movingGC() error { +func (b *Blockstore) movingGC(ctx context.Context) error { // this inlines moveLock/moveUnlock for the initial state check to prevent a second move // while one is in progress without clobbering state b.moveMx.Lock() @@ -327,7 +327,7 @@ func (b *Blockstore) movingGC() error { b.unlockMove(moveStateMoving) log.Info("copying blockstore") - err = b.doCopy(b.db, b.dbNext) + err = b.doCopy(ctx, b.db, b.dbNext) if err != nil { return fmt.Errorf("error moving badger blockstore to %s: %w", newPath, err) } @@ -389,37 +389,66 @@ func symlink(path, linkTo string) error { return os.Symlink(path, linkTo) } -// doCopy copies a badger blockstore to another, with an optional filter; if the filter -// is not nil, then only cids that satisfy the filter will be copied. -func (b *Blockstore) doCopy(from, to *badger.DB) error { - workers := runtime.NumCPU() / 2 - if workers < 2 { - workers = 2 - } - if workers > 8 { - workers = 8 - } - - stream := from.NewStream() - stream.NumGo = workers - stream.LogPrefix = "doCopy" - stream.Send = func(list *pb.KVList) error { - batch := to.NewWriteBatch() - defer batch.Cancel() +// doCopy copies a badger blockstore to another +func (b *Blockstore) doCopy(ctx context.Context, from, to *badger.DB) (defErr error) { + batch := to.NewWriteBatch() + defer func() { + if defErr == nil { + defErr = batch.Flush() + } + if defErr != nil { + batch.Cancel() + } + }() - for _, kv := range list.Kv { - if kv.Key == nil || kv.Value == nil { - continue - } + return iterateBadger(ctx, from, func(kvs []*badgerstruct.KV) error { + // check whether context is closed on every kv group + if err := ctx.Err(); err != nil { + return err + } + for _, kv := range kvs { if err := batch.Set(kv.Key, kv.Value); err != nil { return err } } + return nil + }) +} - return batch.Flush() +var IterateLSMWorkers int // defaults to between( 2, 8, runtime.NumCPU/2 ) + +func iterateBadger(ctx context.Context, db *badger.DB, iter func([]*badgerstruct.KV) error) error { + workers := IterateLSMWorkers + if workers == 0 { + workers = between(2, 8, runtime.NumCPU()/2) } - return stream.Orchestrate(context.Background()) + stream := db.NewStream() + stream.NumGo = workers + stream.LogPrefix = "iterateBadgerKVs" + stream.Send = func(kvl *badgerstruct.KVList) error { + kvs := make([]*badgerstruct.KV, 0, len(kvl.Kv)) + for _, kv := range kvl.Kv { + if kv.Key != nil && kv.Value != nil { + kvs = append(kvs, kv) + } + } + if len(kvs) == 0 { + return nil + } + return iter(kvs) + } + return stream.Orchestrate(ctx) +} + +func between(min, max, val int) int { + if val > max { + val = max + } + if val < min { + val = min + } + return val } func (b *Blockstore) deleteDB(path string) { @@ -500,7 +529,7 @@ func (b *Blockstore) CollectGarbage(ctx context.Context, opts ...blockstore.Bloc } if options.FullGC { - return b.movingGC() + return b.movingGC(ctx) } threshold := options.Threshold if threshold == 0 { From 6727f1af1bc2c04f1c93e8130d0e9f6398aef9f6 Mon Sep 17 00:00:00 2001 From: Peter Rabbitson Date: Mon, 29 Jul 2024 10:38:29 +0200 Subject: [PATCH 3/3] switch to using multierr as per review --- blockstore/badger/blockstore.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index 64d443a6033..54440cc66e7 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -19,6 +19,7 @@ import ( logger "github.com/ipfs/go-log/v2" pool "github.com/libp2p/go-buffer-pool" "github.com/multiformats/go-base32" + "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/xerrors" @@ -656,18 +657,15 @@ func (b *Blockstore) Flush(context.Context) error { b.lockDB() defer b.unlockDB() - // fsync the new db first + var nextErr error if b.dbNext != nil { - if err := b.dbNext.Sync(); err != nil { - return err - } - } - - if err := b.db.Sync(); err != nil { - return err + nextErr = b.dbNext.Sync() } - return nil + return multierr.Combine( + nextErr, + b.db.Sync(), + ) } // Has implements Blockstore.Has.