Skip to content

Commit

Permalink
Merge pull request #6107 from filecoin-project/feat/checkpoint-sync
Browse files Browse the repository at this point in the history
feat: allow checkpointing to forks
  • Loading branch information
Stebalien authored Apr 29, 2021
2 parents 4db8004 + 8f309b2 commit eb10918
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 87 deletions.
80 changes: 28 additions & 52 deletions chain/checkpoint.go
Original file line number Diff line number Diff line change
@@ -1,81 +1,57 @@
package chain

import (
"encoding/json"
"context"

"github.com/filecoin-project/lotus/chain/types"

"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/ipfs/go-datastore"
"golang.org/x/xerrors"
)

var CheckpointKey = datastore.NewKey("/chain/checks")

func loadCheckpoint(ds dtypes.MetadataDS) (types.TipSetKey, error) {
haveChks, err := ds.Has(CheckpointKey)
if err != nil {
return types.EmptyTSK, err
}

if !haveChks {
return types.EmptyTSK, nil
func (syncer *Syncer) SyncCheckpoint(ctx context.Context, tsk types.TipSetKey) error {
if tsk == types.EmptyTSK {
return xerrors.Errorf("called with empty tsk")
}

tskBytes, err := ds.Get(CheckpointKey)
ts, err := syncer.ChainStore().LoadTipSet(tsk)
if err != nil {
return types.EmptyTSK, err
tss, err := syncer.Exchange.GetBlocks(ctx, tsk, 1)
if err != nil {
return xerrors.Errorf("failed to fetch tipset: %w", err)
} else if len(tss) != 1 {
return xerrors.Errorf("expected 1 tipset, got %d", len(tss))
}
ts = tss[0]
}

var tsk types.TipSetKey
err = json.Unmarshal(tskBytes, &tsk)
if err != nil {
return types.EmptyTSK, err
if err := syncer.switchChain(ctx, ts); err != nil {
return xerrors.Errorf("failed to switch chain when syncing checkpoint: %w", err)
}

return tsk, err
}

func (syncer *Syncer) SetCheckpoint(tsk types.TipSetKey) error {
if tsk == types.EmptyTSK {
return xerrors.Errorf("called with empty tsk")
if err := syncer.ChainStore().SetCheckpoint(ts); err != nil {
return xerrors.Errorf("failed to set the chain checkpoint: %w", err)
}

syncer.checkptLk.Lock()
defer syncer.checkptLk.Unlock()

ts, err := syncer.ChainStore().LoadTipSet(tsk)
if err != nil {
return xerrors.Errorf("cannot find tipset: %w", err)
}
return nil
}

func (syncer *Syncer) switchChain(ctx context.Context, ts *types.TipSet) error {
hts := syncer.ChainStore().GetHeaviestTipSet()
anc, err := syncer.ChainStore().IsAncestorOf(ts, hts)
if err != nil {
return xerrors.Errorf("cannot determine whether checkpoint tipset is in main-chain: %w", err)
if hts.Equals(ts) {
return nil
}

if !hts.Equals(ts) && !anc {
return xerrors.Errorf("cannot mark tipset as checkpoint, since it isn't in the main-chain: %w", err)
if anc, err := syncer.store.IsAncestorOf(ts, hts); err == nil && anc {
return nil
}

tskBytes, err := json.Marshal(tsk)
if err != nil {
return err
// Otherwise, sync the chain and set the head.
if err := syncer.collectChain(ctx, ts, hts, true); err != nil {
return xerrors.Errorf("failed to collect chain for checkpoint: %w", err)
}

err = syncer.ds.Put(CheckpointKey, tskBytes)
if err != nil {
return err
if err := syncer.ChainStore().SetHead(ts); err != nil {
return xerrors.Errorf("failed to set the chain head: %w", err)
}

syncer.checkpt = tsk

return nil
}

func (syncer *Syncer) GetCheckpoint() types.TipSetKey {
syncer.checkptLk.Lock()
defer syncer.checkptLk.Unlock()
return syncer.checkpt
}
89 changes: 89 additions & 0 deletions chain/store/checkpoint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package store_test

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"github.com/filecoin-project/lotus/chain/gen"
)

func TestChainCheckpoint(t *testing.T) {
cg, err := gen.NewGenerator()
if err != nil {
t.Fatal(err)
}

// Let the first miner mine some blocks.
last := cg.CurTipset.TipSet()
for i := 0; i < 4; i++ {
ts, err := cg.NextTipSetFromMiners(last, cg.Miners[:1])
require.NoError(t, err)

last = ts.TipSet.TipSet()
}

cs := cg.ChainStore()

checkpoint := last
checkpointParents, err := cs.GetTipSetFromKey(checkpoint.Parents())
require.NoError(t, err)

// Set the head to the block before the checkpoint.
err = cs.SetHead(checkpointParents)
require.NoError(t, err)

// Verify it worked.
head := cs.GetHeaviestTipSet()
require.True(t, head.Equals(checkpointParents))

// Try to set the checkpoint in the future, it should fail.
err = cs.SetCheckpoint(checkpoint)
require.Error(t, err)

// Then move the head back.
err = cs.SetHead(checkpoint)
require.NoError(t, err)

// Verify it worked.
head = cs.GetHeaviestTipSet()
require.True(t, head.Equals(checkpoint))

// And checkpoint it.
err = cs.SetCheckpoint(checkpoint)
require.NoError(t, err)

// Let the second miner miner mine a fork
last = checkpointParents
for i := 0; i < 4; i++ {
ts, err := cg.NextTipSetFromMiners(last, cg.Miners[1:])
require.NoError(t, err)

last = ts.TipSet.TipSet()
}

// See if the chain will take the fork, it shouldn't.
err = cs.MaybeTakeHeavierTipSet(context.Background(), last)
require.NoError(t, err)
head = cs.GetHeaviestTipSet()
require.True(t, head.Equals(checkpoint))

// Remove the checkpoint.
err = cs.RemoveCheckpoint()
require.NoError(t, err)

// Now switch to the other fork.
err = cs.MaybeTakeHeavierTipSet(context.Background(), last)
require.NoError(t, err)
head = cs.GetHeaviestTipSet()
require.True(t, head.Equals(last))

// Setting a checkpoint on the other fork should fail.
err = cs.SetCheckpoint(checkpoint)
require.Error(t, err)

// Setting a checkpoint on this fork should succeed.
err = cs.SetCheckpoint(checkpointParents)
require.NoError(t, err)
}
119 changes: 116 additions & 3 deletions chain/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,11 @@ import (

var log = logging.Logger("chainstore")

var chainHeadKey = dstore.NewKey("head")
var blockValidationCacheKeyPrefix = dstore.NewKey("blockValidation")
var (
chainHeadKey = dstore.NewKey("head")
checkpointKey = dstore.NewKey("/chain/checks")
blockValidationCacheKeyPrefix = dstore.NewKey("blockValidation")
)

var DefaultTipSetCacheSize = 8192
var DefaultMsgMetaCacheSize = 2048
Expand Down Expand Up @@ -115,6 +118,7 @@ type ChainStore struct {

heaviestLk sync.RWMutex
heaviest *types.TipSet
checkpoint *types.TipSet

bestTips *pubsub.PubSub
pubLk sync.Mutex
Expand Down Expand Up @@ -215,6 +219,15 @@ func (cs *ChainStore) Close() error {
}

func (cs *ChainStore) Load() error {
if err := cs.loadHead(); err != nil {
return err
}
if err := cs.loadCheckpoint(); err != nil {
return err
}
return nil
}
func (cs *ChainStore) loadHead() error {
head, err := cs.metadataDs.Get(chainHeadKey)
if err == dstore.ErrNotFound {
log.Warn("no previous chain state found")
Expand All @@ -239,6 +252,31 @@ func (cs *ChainStore) Load() error {
return nil
}

func (cs *ChainStore) loadCheckpoint() error {
tskBytes, err := cs.metadataDs.Get(checkpointKey)
if err == dstore.ErrNotFound {
return nil
}
if err != nil {
return xerrors.Errorf("failed to load checkpoint from datastore: %w", err)
}

var tsk types.TipSetKey
err = json.Unmarshal(tskBytes, &tsk)
if err != nil {
return err
}

ts, err := cs.LoadTipSet(tsk)
if err != nil {
return xerrors.Errorf("loading tipset: %w", err)
}

cs.checkpoint = ts

return nil
}

func (cs *ChainStore) writeHead(ts *types.TipSet) error {
data, err := json.Marshal(ts.Cids())
if err != nil {
Expand Down Expand Up @@ -439,6 +477,11 @@ func (cs *ChainStore) exceedsForkLength(synced, external *types.TipSet) (bool, e
return false, nil
}

// Now check to see if we've walked back to the checkpoint.
if synced.Equals(cs.checkpoint) {
return true, nil
}

// If we didn't, go back *one* tipset on the `synced` side (incrementing
// the `forkLength`).
if synced.Height() == 0 {
Expand Down Expand Up @@ -467,6 +510,9 @@ func (cs *ChainStore) ForceHeadSilent(_ context.Context, ts *types.TipSet) error

cs.heaviestLk.Lock()
defer cs.heaviestLk.Unlock()
if err := cs.removeCheckpoint(); err != nil {
return err
}
cs.heaviest = ts

err := cs.writeHead(ts)
Expand Down Expand Up @@ -642,13 +688,80 @@ func FlushValidationCache(ds datastore.Batching) error {
}

// SetHead sets the chainstores current 'best' head node.
// This should only be called if something is broken and needs fixing
// This should only be called if something is broken and needs fixing.
//
// This function will bypass and remove any checkpoints.
func (cs *ChainStore) SetHead(ts *types.TipSet) error {
cs.heaviestLk.Lock()
defer cs.heaviestLk.Unlock()
if err := cs.removeCheckpoint(); err != nil {
return err
}
return cs.takeHeaviestTipSet(context.TODO(), ts)
}

// RemoveCheckpoint removes the current checkpoint.
func (cs *ChainStore) RemoveCheckpoint() error {
cs.heaviestLk.Lock()
defer cs.heaviestLk.Unlock()
return cs.removeCheckpoint()
}

func (cs *ChainStore) removeCheckpoint() error {
if err := cs.metadataDs.Delete(checkpointKey); err != nil {
return err
}
cs.checkpoint = nil
return nil
}

// SetCheckpoint will set a checkpoint past which the chainstore will not allow forks.
//
// NOTE: Checkpoints cannot be set beyond ForkLengthThreshold epochs in the past.
func (cs *ChainStore) SetCheckpoint(ts *types.TipSet) error {
tskBytes, err := json.Marshal(ts.Key())
if err != nil {
return err
}

cs.heaviestLk.Lock()
defer cs.heaviestLk.Unlock()

if ts.Height() > cs.heaviest.Height() {
return xerrors.Errorf("cannot set a checkpoint in the future")
}

// Otherwise, this operation could get _very_ expensive.
if cs.heaviest.Height()-ts.Height() > build.ForkLengthThreshold {
return xerrors.Errorf("cannot set a checkpoint before the fork threshold")
}

if !ts.Equals(cs.heaviest) {
anc, err := cs.IsAncestorOf(ts, cs.heaviest)
if err != nil {
return xerrors.Errorf("cannot determine whether checkpoint tipset is in main-chain: %w", err)
}

if !anc {
return xerrors.Errorf("cannot mark tipset as checkpoint, since it isn't in the main-chain: %w", err)
}
}
err = cs.metadataDs.Put(checkpointKey, tskBytes)
if err != nil {
return err
}

cs.checkpoint = ts
return nil
}

func (cs *ChainStore) GetCheckpoint() *types.TipSet {
cs.heaviestLk.RLock()
chkpt := cs.checkpoint
cs.heaviestLk.RUnlock()
return chkpt
}

// Contains returns whether our BlockStore has all blocks in the supplied TipSet.
func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) {
for _, c := range ts.Cids() {
Expand Down
Loading

0 comments on commit eb10918

Please sign in to comment.