Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

splistore cold object reification redux #8029

Merged
merged 12 commits into from
Feb 14, 2022
21 changes: 21 additions & 0 deletions blockstore/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package blockstore

import (
"context"
)

type hotViewKey struct{}

var hotView = hotViewKey{}

// WithHotView constructs a new context with an option that provides a hint to the blockstore
// (e.g. the splitstore) that the object (and its ipld references) should be kept hot.
func WithHotView(ctx context.Context) context.Context {
return context.WithValue(ctx, hotView, struct{}{})
}

// IsHotView returns true if the hot view option is set in the context
func IsHotView(ctx context.Context) bool {
v := ctx.Value(hotView)
return v != nil
}
37 changes: 35 additions & 2 deletions blockstore/splitstore/splitstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,13 @@ type SplitStore struct {
txnSyncCond sync.Cond
txnSync bool

// background cold object reification
reifyWorkers sync.WaitGroup
reifyMx sync.Mutex
reifyCond sync.Cond
reifyPend map[cid.Cid]struct{}
reifyInProgress map[cid.Cid]struct{}

// registered protectors
protectors []func(func(cid.Cid) error) error
}
Expand Down Expand Up @@ -202,6 +209,10 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
ss.txnSyncCond.L = &ss.txnSyncMx
ss.ctx, ss.cancel = context.WithCancel(context.Background())

ss.reifyCond.L = &ss.reifyMx
ss.reifyPend = make(map[cid.Cid]struct{})
ss.reifyInProgress = make(map[cid.Cid]struct{})

if enableDebugLog {
ss.debug, err = openDebugLog(path)
if err != nil {
Expand Down Expand Up @@ -264,7 +275,13 @@ func (s *SplitStore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
return true, nil
}

return s.cold.Has(ctx, cid)
has, err = s.cold.Has(ctx, cid)
if has && bstore.IsHotView(ctx) {
s.reifyColdObject(cid)
}

return has, err

}

func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
Expand Down Expand Up @@ -308,8 +325,11 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error)

blk, err = s.cold.Get(ctx, cid)
if err == nil {
stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
if bstore.IsHotView(ctx) {
s.reifyColdObject(cid)
}

stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
}
return blk, err

Expand Down Expand Up @@ -359,6 +379,10 @@ func (s *SplitStore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {

size, err = s.cold.GetSize(ctx, cid)
if err == nil {
if bstore.IsHotView(ctx) {
s.reifyColdObject(cid)
}

stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
}
return size, err
Expand Down Expand Up @@ -536,6 +560,10 @@ func (s *SplitStore) View(ctx context.Context, cid cid.Cid, cb func([]byte) erro

err = s.cold.View(ctx, cid, cb)
if err == nil {
if bstore.IsHotView(ctx) {
s.reifyColdObject(cid)
}

stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
}
return err
Expand Down Expand Up @@ -645,6 +673,9 @@ func (s *SplitStore) Start(chain ChainAccessor, us stmgr.UpgradeSchedule) error
}
}

// spawn the reifier
go s.reifyOrchestrator()

// watch the chain
chain.SubscribeHeadChanges(s.HeadChange)

Expand Down Expand Up @@ -676,6 +707,8 @@ func (s *SplitStore) Close() error {
}
}

s.reifyCond.Broadcast()
vyzo marked this conversation as resolved.
Show resolved Hide resolved
s.reifyWorkers.Wait()
s.cancel()
return multierr.Combine(s.markSetEnv.Close(), s.debug.Close())
}
Expand Down
193 changes: 193 additions & 0 deletions blockstore/splitstore/splitstore_reify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package splitstore

import (
"runtime"
"sync/atomic"

"golang.org/x/xerrors"

blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
)

func (s *SplitStore) reifyColdObject(c cid.Cid) {
if !s.isWarm() {
return
}

if isUnitaryObject(c) {
return
}

s.reifyMx.Lock()
defer s.reifyMx.Unlock()

_, ok := s.reifyInProgress[c]
if ok {
return
}

s.reifyPend[c] = struct{}{}
s.reifyCond.Broadcast()
}

func (s *SplitStore) reifyOrchestrator() {
workers := runtime.NumCPU() / 4
if workers < 2 {
workers = 2
}

workch := make(chan cid.Cid, workers)
defer close(workch)

for i := 0; i < workers; i++ {
s.reifyWorkers.Add(1)
go s.reifyWorker(workch)
}

for {
s.reifyMx.Lock()
for len(s.reifyPend) == 0 && atomic.LoadInt32(&s.closing) == 0 {
s.reifyCond.Wait()
}

if atomic.LoadInt32(&s.closing) != 0 {
s.reifyMx.Unlock()
return
}

reifyPend := s.reifyPend
s.reifyPend = make(map[cid.Cid]struct{})
s.reifyMx.Unlock()

for c := range reifyPend {
select {
case workch <- c:
case <-s.ctx.Done():
return
}
}
}
}

func (s *SplitStore) reifyWorker(workch chan cid.Cid) {
defer s.reifyWorkers.Done()
for c := range workch {
s.doReify(c)
}
}

func (s *SplitStore) doReify(c cid.Cid) {
var toreify, totrack, toforget []cid.Cid

defer func() {
s.reifyMx.Lock()
defer s.reifyMx.Unlock()

for _, c := range toreify {
delete(s.reifyInProgress, c)
}
for _, c := range totrack {
delete(s.reifyInProgress, c)
}
for _, c := range toforget {
delete(s.reifyInProgress, c)
}
}()

s.txnLk.RLock()
defer s.txnLk.RUnlock()

err := s.walkObject(c, newTmpVisitor(),
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk
}

s.reifyMx.Lock()
_, inProgress := s.reifyInProgress[c]
if !inProgress {
s.reifyInProgress[c] = struct{}{}
}
s.reifyMx.Unlock()

if inProgress {
return errStopWalk
}

has, err := s.hot.Has(s.ctx, c)
if err != nil {
return xerrors.Errorf("error checking hotstore: %w", err)
}

if has {
if s.txnMarkSet != nil {
hasMark, err := s.txnMarkSet.Has(c)
if err != nil {
log.Warnf("error checking markset: %s", err)
} else if hasMark {
toforget = append(toforget, c)
magik6k marked this conversation as resolved.
Show resolved Hide resolved
return errStopWalk
}
} else {
totrack = append(totrack, c)
return errStopWalk
}
}

toreify = append(toreify, c)
return nil
})

if err != nil {
log.Warnf("error walking cold object for reification (cid: %s): %s", c, err)
return
}

log.Debugf("reifying %d objects rooted at %s", len(toreify), c)

// this should not get too big, maybe some 100s of objects.
batch := make([]blocks.Block, 0, len(toreify))
vyzo marked this conversation as resolved.
Show resolved Hide resolved
for _, c := range toreify {
blk, err := s.cold.Get(s.ctx, c)
if err != nil {
log.Warnf("error retrieving cold object for reification (cid: %s): %s", c, err)
continue
}

if err := s.checkClosing(); err != nil {
return
}

batch = append(batch, blk)
}

if len(batch) > 0 {
err = s.hot.PutMany(s.ctx, batch)
if err != nil {
log.Warnf("error reifying cold object (cid: %s): %s", c, err)
return
}
}

if s.txnMarkSet != nil {
if len(toreify) > 0 {
if err := s.txnMarkSet.MarkMany(toreify); err != nil {
log.Warnf("error marking reified objects: %s", err)
}
}
if len(totrack) > 0 {
if err := s.txnMarkSet.MarkMany(totrack); err != nil {
log.Warnf("error marking tracked objects: %s", err)
}
}
} else {
// if txnActive is false these are noops
if len(toreify) > 0 {
s.trackTxnRefMany(toreify)
vyzo marked this conversation as resolved.
Show resolved Hide resolved
}
if len(totrack) > 0 {
s.trackTxnRefMany(totrack)
}
}
}
Loading