feat:chain:splitstore chain prune (#9056)
* Splitstore chain prune
* Protect on reification for simpler logic and sound cold compact protect
* Recovery from checkpoint during chain prune
* Splitstore (discard and universal mode) running in itests
* Add pause and restart functions to itest block miner
* Add config options to itest full nodes
* Add FsRepo support for itest full nodes

Co-authored-by: zenground0 <ZenGround0@users.noreply.github.com>
ZenGround0 authored Aug 5, 2022
1 parent 881e16e commit 0c91b0d
Showing 27 changed files with 1,308 additions and 84 deletions.
5 changes: 5 additions & 0 deletions .circleci/config.yml
@@ -1002,6 +1002,11 @@ workflows:
suite: itest-self_sent_txn
target: "./itests/self_sent_txn_test.go"

- test:
name: test-itest-splitstore
suite: itest-splitstore
target: "./itests/splitstore_test.go"

- test:
name: test-itest-tape
suite: itest-tape
9 changes: 9 additions & 0 deletions api/api_full.go
@@ -169,6 +169,10 @@ type FullNode interface {
// If oldmsgskip is set, messages from before the requested roots are also not included.
ChainExport(ctx context.Context, nroots abi.ChainEpoch, oldmsgskip bool, tsk types.TipSetKey) (<-chan []byte, error) //perm:read

// ChainPrune prunes the stored chain state and garbage collects; only supported if you
// are using the splitstore
ChainPrune(ctx context.Context, opts PruneOpts) error //perm:admin

// ChainCheckBlockstore performs an (asynchronous) health check on the chain/state blockstore
// if supported by the underlying implementation.
ChainCheckBlockstore(context.Context) error //perm:admin
@@ -1219,3 +1223,8 @@ type MsigTransaction struct {

Approved []address.Address
}

type PruneOpts struct {
MovingGC bool
RetainState int64
}
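
For illustration, a minimal sketch of driving the new endpoint from Go, assuming an api.FullNode handle backed by an admin token on a splitstore-enabled node; the option values are placeholders and their semantics are inferred from the field names, not from this diff.

package example

import (
    "context"

    "github.com/filecoin-project/lotus/api"
)

// pruneChain requests an online chain prune. ChainPrune is perm:admin,
// so the RPC token behind node must carry admin rights.
func pruneChain(ctx context.Context, node api.FullNode) error {
    opts := api.PruneOpts{
        MovingGC:    false, // assumption: true requests a moving (copying) GC of the coldstore
        RetainState: 0,     // assumption: how much recent state to keep after pruning
    }
    return node.ChainPrune(ctx, opts)
}
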
14 changes: 14 additions & 0 deletions api/mocks/mock_full.go

(Generated file; diff not rendered.)

13 changes: 13 additions & 0 deletions api/proxy_gen.go

(Generated file; diff not rendered.)

8 changes: 8 additions & 0 deletions blockstore/idstore.go
@@ -102,6 +102,14 @@ func (b *idstore) Put(ctx context.Context, blk blocks.Block) error {
return b.bs.Put(ctx, blk)
}

func (b *idstore) ForEachKey(f func(cid.Cid) error) error {
iterBstore, ok := b.bs.(BlockstoreIterator)
if !ok {
return xerrors.Errorf("underlying blockstore (type %T) doesn't support fast iteration", b.bs)
}
return iterBstore.ForEachKey(f)
}

func (b *idstore) PutMany(ctx context.Context, blks []blocks.Block) error {
toPut := make([]blocks.Block, 0, len(blks))
for _, blk := range blks {
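
The new idstore.ForEachKey simply forwards to the wrapped blockstore when it supports fast iteration. A minimal sketch of consuming that callback-style API; the inline interface below mirrors only the ForEachKey method shown above, and the helper itself is illustrative.

package example

import (
    "github.com/ipfs/go-cid"
)

// countKeys tallies every key reachable through the fast-iteration path.
func countKeys(bs interface{ ForEachKey(func(cid.Cid) error) error }) (int, error) {
    n := 0
    err := bs.ForEachKey(func(cid.Cid) error {
        n++
        return nil
    })
    return n, err
}
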
23 changes: 12 additions & 11 deletions blockstore/mem.go
@@ -14,35 +14,36 @@ func NewMemory() MemBlockstore {
}

// MemBlockstore is a terminal blockstore that keeps blocks in memory.
type MemBlockstore map[cid.Cid]blocks.Block
// To match behavior of badger blockstore we index by multihash only.
type MemBlockstore map[string]blocks.Block

func (m MemBlockstore) DeleteBlock(ctx context.Context, k cid.Cid) error {
delete(m, k)
delete(m, string(k.Hash()))
return nil
}

func (m MemBlockstore) DeleteMany(ctx context.Context, ks []cid.Cid) error {
for _, k := range ks {
delete(m, k)
delete(m, string(k.Hash()))
}
return nil
}

func (m MemBlockstore) Has(ctx context.Context, k cid.Cid) (bool, error) {
_, ok := m[k]
_, ok := m[string(k.Hash())]
return ok, nil
}

func (m MemBlockstore) View(ctx context.Context, k cid.Cid, callback func([]byte) error) error {
b, ok := m[k]
b, ok := m[string(k.Hash())]
if !ok {
return ipld.ErrNotFound{Cid: k}
}
return callback(b.RawData())
}

func (m MemBlockstore) Get(ctx context.Context, k cid.Cid) (blocks.Block, error) {
b, ok := m[k]
b, ok := m[string(k.Hash())]
if !ok {
return nil, ipld.ErrNotFound{Cid: k}
}
@@ -51,7 +52,7 @@ func (m MemBlockstore) Get(ctx context.Context, k cid.Cid) (blocks.Block, error)

// GetSize returns the CIDs mapped BlockSize
func (m MemBlockstore) GetSize(ctx context.Context, k cid.Cid) (int, error) {
b, ok := m[k]
b, ok := m[string(k.Hash())]
if !ok {
return 0, ipld.ErrNotFound{Cid: k}
}
@@ -62,7 +63,7 @@ func (m MemBlockstore) GetSize(ctx context.Context, k cid.Cid) (int, error) {
func (m MemBlockstore) Put(ctx context.Context, b blocks.Block) error {
// Convert to a basic block for safety, but try to reuse the existing
// block if it's already a basic block.
k := b.Cid()
k := string(b.Cid().Hash())
if _, ok := b.(*blocks.BasicBlock); !ok {
// If we already have the block, abort.
if _, ok := m[k]; ok {
@@ -71,7 +72,7 @@ func (m MemBlockstore) Put(ctx context.Context, b blocks.Block) error {
// the error is only for debugging.
b, _ = blocks.NewBlockWithCid(b.RawData(), b.Cid())
}
m[b.Cid()] = b
m[k] = b
return nil
}

@@ -89,8 +90,8 @@ func (m MemBlockstore) PutMany(ctx context.Context, bs []blocks.Block) error {
// the given context, closing the channel if it becomes Done.
func (m MemBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
ch := make(chan cid.Cid, len(m))
for k := range m {
ch <- k
for _, b := range m {
ch <- b.Cid()
}
close(ch)
return ch, nil
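
The switch to multihash keys means two CIDs that differ only in codec now resolve to the same entry, matching badger's behavior. A small, self-contained sketch of the effect (not part of this diff; it assumes the lotus blockstore package path):

package main

import (
    "context"
    "fmt"

    blocks "github.com/ipfs/go-block-format"
    "github.com/ipfs/go-cid"

    "github.com/filecoin-project/lotus/blockstore"
)

func main() {
    ctx := context.Background()
    bs := blockstore.NewMemory()

    // Store a block; its CID commits to both a codec and a multihash.
    blk := blocks.NewBlock([]byte("hello"))
    _ = bs.Put(ctx, blk)

    // Wrap the same multihash in a different codec. A CID-keyed map would
    // miss this lookup; the multihash-keyed map hits it.
    alias := cid.NewCidV1(cid.DagCBOR, blk.Cid().Hash())
    has, _ := bs.Has(ctx, alias)
    fmt.Println(has) // true
}
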
85 changes: 69 additions & 16 deletions blockstore/splitstore/splitstore.go
@@ -43,8 +43,13 @@ var (
// compactionIndexKey stores the compaction index (serial number)
compactionIndexKey = dstore.NewKey("/splitstore/compactionIndex")

// stores the prune index (serial number)
pruneIndexKey = dstore.NewKey("/splitstore/pruneIndex")

log = logging.Logger("splitstore")

errClosing = errors.New("splitstore is closing")

// set this to true if you are debugging the splitstore to enable debug logging
enableDebugLog = false
// set this to true if you want to track origin stack traces in the write log
@@ -54,6 +59,16 @@ var (
upgradeBoundary = build.Finality
)

type CompactType int

const (
none CompactType = iota
warmup
hot
cold
check
)

func init() {
if os.Getenv("LOTUS_SPLITSTORE_DEBUG_LOG") == "1" {
enableDebugLog = true
@@ -117,8 +132,9 @@ type hotstore interface {
}

type SplitStore struct {
compacting int32 // compaction/prune/warmup in progress
closing int32 // the splitstore is closing
compacting int32 // flag for when compaction is in progress
compactType CompactType // compaction type, protected by compacting atomic, only meaningful when compacting == 1
closing int32 // the splitstore is closing

cfg *Config
path string
@@ -140,6 +156,7 @@ type SplitStore struct {
markSetSize int64

compactionIndex int64
pruneIndex int64

ctx context.Context
cancel func()
@@ -227,6 +244,13 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
return nil, xerrors.Errorf("error resuming compaction: %w", err)
}
}
if ss.pruneCheckpointExists() {
log.Info("found prune checkpoint; resuming prune")
if err := ss.completePrune(); err != nil {
markSetEnv.Close() //nolint:errcheck
return nil, xerrors.Errorf("error resuming prune: %w", err)
}
}

return ss, nil
}
@@ -260,8 +284,14 @@ func (s *SplitStore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
if has {
return s.has(cid)
}

return s.cold.Has(ctx, cid)
switch s.compactType {
case hot:
return s.cold.Has(ctx, cid)
case cold:
return s.hot.Has(ctx, cid)
default:
return false, xerrors.Errorf("invalid compaction type %d, only hot and cold allowed for critical section", s.compactType)
}
}

has, err := s.hot.Has(ctx, cid)
@@ -276,8 +306,11 @@ func (s *SplitStore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
}

has, err = s.cold.Has(ctx, cid)
if has && bstore.IsHotView(ctx) {
s.reifyColdObject(cid)
if has {
s.trackTxnRef(cid)
if bstore.IsHotView(ctx) {
s.reifyColdObject(cid)
}
}

return has, err
@@ -307,8 +340,14 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error)
if has {
return s.get(cid)
}

return s.cold.Get(ctx, cid)
switch s.compactType {
case hot:
return s.cold.Get(ctx, cid)
case cold:
return s.hot.Get(ctx, cid)
default:
return nil, xerrors.Errorf("invalid compaction type %d, only hot and cold allowed for critical section", s.compactType)
}
}

blk, err := s.hot.Get(ctx, cid)
@@ -325,6 +364,7 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error)

blk, err = s.cold.Get(ctx, cid)
if err == nil {
s.trackTxnRef(cid)
if bstore.IsHotView(ctx) {
s.reifyColdObject(cid)
}
@@ -361,8 +401,14 @@ func (s *SplitStore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
if has {
return s.getSize(cid)
}

return s.cold.GetSize(ctx, cid)
switch s.compactType {
case hot:
return s.cold.GetSize(ctx, cid)
case cold:
return s.hot.GetSize(ctx, cid)
default:
return 0, xerrors.Errorf("invalid compaction type %d, only hot and cold allowed for critical section", s.compactType)
}
}

size, err := s.hot.GetSize(ctx, cid)
@@ -379,6 +425,7 @@ func (s *SplitStore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {

size, err = s.cold.GetSize(ctx, cid)
if err == nil {
s.trackTxnRef(cid)
if bstore.IsHotView(ctx) {
s.reifyColdObject(cid)
}
@@ -408,12 +455,12 @@ func (s *SplitStore) Put(ctx context.Context, blk blocks.Block) error {
s.debug.LogWrite(blk)

// critical section
if s.txnMarkSet != nil {
if s.txnMarkSet != nil && s.compactType == hot { // puts only touch hot store
s.markLiveRefs([]cid.Cid{blk.Cid()})
return nil
}

s.trackTxnRef(blk.Cid())

return nil
}

@@ -459,12 +506,12 @@ func (s *SplitStore) PutMany(ctx context.Context, blks []blocks.Block) error {
s.debug.LogWriteMany(blks)

// critical section
if s.txnMarkSet != nil {
if s.txnMarkSet != nil && s.compactType == hot { // puts only touch hot store
s.markLiveRefs(batch)
return nil
}

s.trackTxnRefMany(batch)

return nil
}

@@ -536,8 +583,14 @@ func (s *SplitStore) View(ctx context.Context, cid cid.Cid, cb func([]byte) erro
if has {
return s.view(cid, cb)
}

return s.cold.View(ctx, cid, cb)
switch s.compactType {
case hot:
return s.cold.View(ctx, cid, cb)
case cold:
return s.hot.View(ctx, cid, cb)
default:
return xerrors.Errorf("invalid compaction type %d, only hot and cold allowed for critical section", s.compactType)
}
}

// views are (optimistically) protected two-fold:
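
On the read path above, an object found only in the coldstore is now always protected with trackTxnRef, and reified back into the hotstore when the read happens under a hot view. A hedged sketch of how a caller opts in, assuming blockstore.WithHotView is the existing counterpart of the bstore.IsHotView check used here:

package example

import (
    "context"

    blocks "github.com/ipfs/go-block-format"
    "github.com/ipfs/go-cid"

    "github.com/filecoin-project/lotus/blockstore"
    "github.com/filecoin-project/lotus/blockstore/splitstore"
)

// getHot reads c through a hot-view context, so a coldstore hit is also
// scheduled for reification into the hotstore (per the Get path above).
func getHot(ctx context.Context, ss *splitstore.SplitStore, c cid.Cid) (blocks.Block, error) {
    return ss.Get(blockstore.WithHotView(ctx), c)
}
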
3 changes: 3 additions & 0 deletions blockstore/splitstore/splitstore_check.go
@@ -25,6 +25,7 @@ func (s *SplitStore) Check() error {
if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
return xerrors.Errorf("can't acquire compaction lock; compacting operation in progress")
}
s.compactType = check

if s.compactionIndex == 0 {
atomic.StoreInt32(&s.compacting, 0)
@@ -146,6 +147,8 @@ func (s *SplitStore) Info() map[string]interface{} {
info["base epoch"] = s.baseEpoch
info["warmup epoch"] = s.warmupEpoch
info["compactions"] = s.compactionIndex
info["prunes"] = s.pruneIndex
info["compacting"] = s.compacting == 1

sizer, ok := s.hot.(bstore.BlockstoreSize)
if ok {
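
The two new Info entries can be inspected at runtime. A sketch that assumes the splitstore Info() map is surfaced through the node's existing ChainBlockstoreInfo endpoint; verify the endpoint name against your API version.

package example

import (
    "context"
    "fmt"

    "github.com/filecoin-project/lotus/api"
)

// logSplitstoreStats prints the prune/compaction fields added above.
func logSplitstoreStats(ctx context.Context, node api.FullNode) error {
    info, err := node.ChainBlockstoreInfo(ctx)
    if err != nil {
        return err
    }
    fmt.Println("prunes:", info["prunes"])
    fmt.Println("compacting:", info["compacting"])
    return nil
}
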
