Skip to content

Commit

Permalink
Merge pull request #3680 from filecoin-project/asr/checkpoint
Browse files Browse the repository at this point in the history
Allow nodes to mark tipsets as checkpointed
  • Loading branch information
arajasek authored Sep 9, 2020
2 parents 311d20e + f055467 commit 7027992
Show file tree
Hide file tree
Showing 13 changed files with 367 additions and 26 deletions.
6 changes: 6 additions & 0 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,16 @@ type FullNode interface {
// yet synced block headers.
SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error)

// SyncCheckpoint marks a blocks as checkpointed, meaning that it won't ever fork away from it.
SyncCheckpoint(ctx context.Context, tsk types.TipSetKey) error

// SyncMarkBad marks a blocks as bad, meaning that it won't ever by synced.
// Use with extreme caution.
SyncMarkBad(ctx context.Context, bcid cid.Cid) error

// SyncUnmarkBad unmarks a blocks as bad, making it possible to be validated and synced again.
SyncUnmarkBad(ctx context.Context, bcid cid.Cid) error

// SyncCheckBad checks if a block was marked as bad, and if it was, returns
// the reason.
SyncCheckBad(ctx context.Context, bcid cid.Cid) (string, error)
Expand Down
10 changes: 10 additions & 0 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ type FullNodeStruct struct {
SyncState func(context.Context) (*api.SyncState, error) `perm:"read"`
SyncSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"`
SyncIncomingBlocks func(ctx context.Context) (<-chan *types.BlockHeader, error) `perm:"read"`
SyncCheckpoint func(ctx context.Context, key types.TipSetKey) error `perm:"admin"`
SyncMarkBad func(ctx context.Context, bcid cid.Cid) error `perm:"admin"`
SyncUnmarkBad func(ctx context.Context, bcid cid.Cid) error `perm:"admin"`
SyncCheckBad func(ctx context.Context, bcid cid.Cid) (string, error) `perm:"read"`

MpoolGetConfig func(context.Context) (*types.MpoolConfig, error) `perm:"read"`
Expand Down Expand Up @@ -704,10 +706,18 @@ func (c *FullNodeStruct) SyncIncomingBlocks(ctx context.Context) (<-chan *types.
return c.Internal.SyncIncomingBlocks(ctx)
}

func (c *FullNodeStruct) SyncCheckpoint(ctx context.Context, tsk types.TipSetKey) error {
return c.Internal.SyncCheckpoint(ctx, tsk)
}

func (c *FullNodeStruct) SyncMarkBad(ctx context.Context, bcid cid.Cid) error {
return c.Internal.SyncMarkBad(ctx, bcid)
}

func (c *FullNodeStruct) SyncUnmarkBad(ctx context.Context, bcid cid.Cid) error {
return c.Internal.SyncUnmarkBad(ctx, bcid)
}

func (c *FullNodeStruct) SyncCheckBad(ctx context.Context, bcid cid.Cid) (string, error) {
return c.Internal.SyncCheckBad(ctx, bcid)
}
Expand Down
4 changes: 4 additions & 0 deletions chain/badtscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (bts *BadBlockCache) Add(c cid.Cid, bbr BadBlockReason) {
bts.badBlocks.Add(c, bbr)
}

func (bts *BadBlockCache) Remove(c cid.Cid) {
bts.badBlocks.Remove(c)
}

func (bts *BadBlockCache) Has(c cid.Cid) (BadBlockReason, bool) {
rval, ok := bts.badBlocks.Get(c)
if !ok {
Expand Down
81 changes: 81 additions & 0 deletions chain/checkpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package chain

import (
"encoding/json"

"github.com/filecoin-project/lotus/chain/types"

"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/ipfs/go-datastore"
"golang.org/x/xerrors"
)

var CheckpointKey = datastore.NewKey("/chain/checks")

func loadCheckpoint(ds dtypes.MetadataDS) (types.TipSetKey, error) {
haveChks, err := ds.Has(CheckpointKey)
if err != nil {
return types.EmptyTSK, err
}

if !haveChks {
return types.EmptyTSK, nil
}

tskBytes, err := ds.Get(CheckpointKey)
if err != nil {
return types.EmptyTSK, err
}

var tsk types.TipSetKey
err = json.Unmarshal(tskBytes, &tsk)
if err != nil {
return types.EmptyTSK, err
}

return tsk, err
}

func (syncer *Syncer) SetCheckpoint(tsk types.TipSetKey) error {
if tsk == types.EmptyTSK {
return xerrors.Errorf("called with empty tsk")
}

syncer.checkptLk.Lock()
defer syncer.checkptLk.Unlock()

ts, err := syncer.ChainStore().LoadTipSet(tsk)
if err != nil {
return xerrors.Errorf("cannot find tipset: %w", err)
}

hts := syncer.ChainStore().GetHeaviestTipSet()
anc, err := syncer.ChainStore().IsAncestorOf(ts, hts)
if err != nil {
return xerrors.Errorf("cannot determine whether checkpoint tipset is in main-chain: %w", err)
}

if !hts.Equals(ts) && !anc {
return xerrors.Errorf("cannot mark tipset as checkpoint, since it isn't in the main-chain: %w", err)
}

tskBytes, err := json.Marshal(tsk)
if err != nil {
return err
}

err = syncer.ds.Put(CheckpointKey, tskBytes)
if err != nil {
return err
}

syncer.checkpt = tsk

return nil
}

func (syncer *Syncer) GetCheckpoint() types.TipSetKey {
syncer.checkptLk.Lock()
defer syncer.checkptLk.Unlock()
return syncer.checkpt
}
44 changes: 40 additions & 4 deletions chain/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/filecoin-project/lotus/node/modules/dtypes"

"github.com/filecoin-project/specs-actors/actors/runtime/proof"

"github.com/Gurpartap/async"
Expand Down Expand Up @@ -129,10 +132,16 @@ type Syncer struct {
windowSize int

tickerCtxCancel context.CancelFunc

checkptLk sync.Mutex

checkpt types.TipSetKey

ds dtypes.MetadataDS
}

// NewSyncer creates a new Syncer object.
func NewSyncer(sm *stmgr.StateManager, exchange exchange.Client, connmgr connmgr.ConnManager, self peer.ID, beacon beacon.RandomBeacon, verifier ffiwrapper.Verifier) (*Syncer, error) {
func NewSyncer(ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.Client, connmgr connmgr.ConnManager, self peer.ID, beacon beacon.RandomBeacon, verifier ffiwrapper.Verifier) (*Syncer, error) {
gen, err := sm.ChainStore().GetGenesis()
if err != nil {
return nil, xerrors.Errorf("getting genesis block: %w", err)
Expand All @@ -143,7 +152,14 @@ func NewSyncer(sm *stmgr.StateManager, exchange exchange.Client, connmgr connmgr
return nil, err
}

cp, err := loadCheckpoint(ds)
if err != nil {
return nil, xerrors.Errorf("error loading mpool config: %w", err)
}

s := &Syncer{
ds: ds,
checkpt: cp,
beacon: beacon,
bad: NewBadBlockCache(),
Genesis: gent,
Expand Down Expand Up @@ -1361,7 +1377,7 @@ loop:
log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", incoming.Cids(), incoming.Height(), known.Cids(), known.Height())
fork, err := syncer.syncFork(ctx, base, known)
if err != nil {
if xerrors.Is(err, ErrForkTooLong) {
if xerrors.Is(err, ErrForkTooLong) || xerrors.Is(err, ErrForkCheckpoint) {
// TODO: we're marking this block bad in the same way that we mark invalid blocks bad. Maybe distinguish?
log.Warn("adding forked chain to our bad tipset cache")
for _, b := range incoming.Blocks() {
Expand All @@ -1377,14 +1393,23 @@ loop:
}

var ErrForkTooLong = fmt.Errorf("fork longer than threshold")
var ErrForkCheckpoint = fmt.Errorf("fork would require us to diverge from checkpointed block")

// syncFork tries to obtain the chain fragment that links a fork into a common
// ancestor in our view of the chain.
//
// If the fork is too long (build.ForkLengthThreshold), we add the entire subchain to the
// denylist. Else, we find the common ancestor, and add the missing chain
// If the fork is too long (build.ForkLengthThreshold), or would cause us to diverge from the checkpoint (ErrForkCheckpoint),
// we add the entire subchain to the denylist. Else, we find the common ancestor, and add the missing chain
// fragment until the fork point to the returned []TipSet.
func (syncer *Syncer) syncFork(ctx context.Context, incoming *types.TipSet, known *types.TipSet) ([]*types.TipSet, error) {

chkpt := syncer.GetCheckpoint()
if known.Key() == chkpt {
return nil, ErrForkCheckpoint
}

// TODO: Does this mean we always ask for ForkLengthThreshold blocks from the network, even if we just need, like, 2?
// Would it not be better to ask in smaller chunks, given that an ~ForkLengthThreshold is very rare?
tips, err := syncer.Exchange.GetBlocks(ctx, incoming.Parents(), int(build.ForkLengthThreshold))
if err != nil {
return nil, err
Expand All @@ -1410,12 +1435,18 @@ func (syncer *Syncer) syncFork(ctx context.Context, incoming *types.TipSet, know
if nts.Height() < tips[cur].Height() {
cur++
} else {
// We will be forking away from nts, check that it isn't checkpointed
if nts.Key() == chkpt {
return nil, ErrForkCheckpoint
}

nts, err = syncer.store.LoadTipSet(nts.Parents())
if err != nil {
return nil, xerrors.Errorf("loading next local tipset: %w", err)
}
}
}

return nil, ErrForkTooLong
}

Expand Down Expand Up @@ -1644,6 +1675,11 @@ func (syncer *Syncer) MarkBad(blk cid.Cid) {
syncer.bad.Add(blk, NewBadBlockReason([]cid.Cid{blk}, "manually marked bad"))
}

// UnmarkBad manually adds a block to the "bad blocks" cache.
func (syncer *Syncer) UnmarkBad(blk cid.Cid) {
syncer.bad.Remove(blk)
}

func (syncer *Syncer) CheckBadBlockCache(blk cid.Cid) (string, bool) {
bbr, ok := syncer.bad.Has(blk)
return bbr.String(), ok
Expand Down
72 changes: 72 additions & 0 deletions chain/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,36 @@ func (tu *syncTestUtil) compareSourceState(with int) {
}
}

func (tu *syncTestUtil) assertBad(node int, ts *types.TipSet) {
for _, blk := range ts.Cids() {
rsn, err := tu.nds[node].SyncCheckBad(context.TODO(), blk)
require.NoError(tu.t, err)
require.True(tu.t, len(rsn) != 0)
}
}

func (tu *syncTestUtil) getHead(node int) *types.TipSet {
ts, err := tu.nds[node].ChainHead(context.TODO())
require.NoError(tu.t, err)
return ts
}

func (tu *syncTestUtil) checkpointTs(node int, tsk types.TipSetKey) {
require.NoError(tu.t, tu.nds[node].SyncCheckpoint(context.TODO(), tsk))
}

func (tu *syncTestUtil) waitUntilNodeHasTs(node int, tsk types.TipSetKey) {
for {
_, err := tu.nds[node].ChainGetTipSet(context.TODO(), tsk)
if err != nil {
break
}
}

// Time to allow for syncing and validation
time.Sleep(2 * time.Second)
}

func (tu *syncTestUtil) waitUntilSync(from, to int) {
target, err := tu.nds[from].ChainHead(tu.ctx)
if err != nil {
Expand Down Expand Up @@ -678,3 +708,45 @@ func TestSyncInputs(t *testing.T) {
t.Fatal("should error on block with nil election proof")
}
}

func TestSyncCheckpoint(t *testing.T) {
H := 10
tu := prepSyncTest(t, H)

p1 := tu.addClientNode()
p2 := tu.addClientNode()

fmt.Println("GENESIS: ", tu.g.Genesis().Cid())
tu.loadChainToNode(p1)
tu.loadChainToNode(p2)

base := tu.g.CurTipset
fmt.Println("Mining base: ", base.TipSet().Cids(), base.TipSet().Height())

// The two nodes fork at this point into 'a' and 'b'
a1 := tu.mineOnBlock(base, p1, []int{0}, true, false, nil)
a := tu.mineOnBlock(a1, p1, []int{0}, true, false, nil)
a = tu.mineOnBlock(a, p1, []int{0}, true, false, nil)

tu.waitUntilSyncTarget(p1, a.TipSet())
tu.checkpointTs(p1, a.TipSet().Key())

require.NoError(t, tu.g.ResyncBankerNonce(a1.TipSet()))
// chain B will now be heaviest
b := tu.mineOnBlock(base, p2, []int{1}, true, false, nil)
b = tu.mineOnBlock(b, p2, []int{1}, true, false, nil)
b = tu.mineOnBlock(b, p2, []int{1}, true, false, nil)
b = tu.mineOnBlock(b, p2, []int{1}, true, false, nil)

fmt.Println("A: ", a.Cids(), a.TipSet().Height())
fmt.Println("B: ", b.Cids(), b.TipSet().Height())

// Now for the fun part!! p1 should mark p2's head as BAD.

require.NoError(t, tu.mn.LinkAll())
tu.connect(p1, p2)
tu.waitUntilNodeHasTs(p1, b.TipSet().Key())
p1Head := tu.getHead(p1)
require.Equal(tu.t, p1Head, a.TipSet())
tu.assertBad(p1, b.TipSet())
}
19 changes: 0 additions & 19 deletions cli/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,25 +337,6 @@ var chainSetHeadCmd = &cli.Command{
},
}

func parseTipSet(ctx context.Context, api api.FullNode, vals []string) (*types.TipSet, error) {
var headers []*types.BlockHeader
for _, c := range vals {
blkc, err := cid.Decode(c)
if err != nil {
return nil, err
}

bh, err := api.ChainGetBlock(ctx, blkc)
if err != nil {
return nil, err
}

headers = append(headers, bh)
}

return types.NewTipSet(headers)
}

var chainListCmd = &cli.Command{
Name: "list",
Usage: "View a segment of the chain",
Expand Down
Loading

0 comments on commit 7027992

Please sign in to comment.