
test: chain: unit tests for the syncer & sync manager #8072

Merged · 4 commits · Mar 2, 2022
22 changes: 0 additions & 22 deletions chain/sync.go
@@ -1244,25 +1244,3 @@ func (syncer *Syncer) CheckBadBlockCache(blk cid.Cid) (string, bool) {
bbr, ok := syncer.bad.Has(blk)
return bbr.String(), ok
}

func (syncer *Syncer) getLatestBeaconEntry(ctx context.Context, ts *types.TipSet) (*types.BeaconEntry, error) {
Contributor: @arajasek you seem to be the last person who touched this (+ our resident DRAND expert). Can you confirm that we don't want this anymore?

Contributor: Ping @arajasek

Contributor: Sorry about the delay here -- Yes, this can definitely be removed.

cur := ts
for i := 0; i < 20; i++ {
cbe := cur.Blocks()[0].BeaconEntries
if len(cbe) > 0 {
return &cbe[len(cbe)-1], nil
}

if cur.Height() == 0 {
return nil, xerrors.Errorf("made it back to genesis block without finding beacon entry")
}

next, err := syncer.store.LoadTipSet(ctx, cur.Parents())
if err != nil {
return nil, xerrors.Errorf("failed to load parents when searching back for latest beacon entry: %w", err)
}
cur = next
}

return nil, xerrors.Errorf("found NO beacon entries in the 20 latest tipsets")
}
32 changes: 32 additions & 0 deletions chain/sync_manager_test.go
@@ -8,6 +8,7 @@ import (

"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/mock"
"github.com/stretchr/testify/require"
)

func init() {
@@ -240,3 +241,34 @@ func TestSyncManager(t *testing.T) {
op3.done()
})
}

func TestSyncManagerBucketSet(t *testing.T) {
ts1 := mock.TipSet(mock.MkBlock(nil, 0, 0))
ts2 := mock.TipSet(mock.MkBlock(ts1, 1, 0))
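// ts1 and ts2 belong to the same chain (ts2's parent is ts1), so newSyncTargetBucket groups them into one bucket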
bucket1 := newSyncTargetBucket(ts1, ts2)
bucketSet := syncBucketSet{buckets: []*syncTargetBucket{bucket1}}

// inserting a tipset (a potential sync target) from an existing chain should add it to the existing bucket
//stm: @CHAIN_SYNCER_ADD_SYNC_TARGET_001
ts3 := mock.TipSet(mock.MkBlock(ts2, 2, 0))
bucketSet.Insert(ts3)
require.Equal(t, 1, len(bucketSet.buckets))
require.Equal(t, 3, len(bucketSet.buckets[0].tips))

// inserting a tipset from a new chain should create a new bucket
ts4fork := mock.TipSet(mock.MkBlock(nil, 1, 1))
bucketSet.Insert(ts4fork)
require.Equal(t, 2, len(bucketSet.buckets))
require.Equal(t, 3, len(bucketSet.buckets[0].tips))
require.Equal(t, 1, len(bucketSet.buckets[1].tips))

// Pop removes the best bucket (the best sync target), here bucket1
//stm: @CHAIN_SYNCER_SELECT_SYNC_TARGET_001
popped := bucketSet.Pop()
require.Equal(t, popped, bucket1)
require.Equal(t, 1, len(bucketSet.buckets))

// PopRelated removes the bucket containing the given tipset, leaving the set empty
bucketSet.PopRelated(ts4fork)
require.Equal(t, 0, len(bucketSet.buckets))
}
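For orientation, the shape of the structures this test exercises can be inferred from the assertions above. The following is only a sketch based on this test; the actual lotus definitions may differ:

	// inferred from sync_manager_test.go; not the canonical lotus definitions
	type syncTargetBucket struct {
		tips []*types.TipSet // tipsets extending the same chain
	}

	type syncBucketSet struct {
		buckets []*syncTargetBucket // one bucket per candidate chain
	}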
155 changes: 155 additions & 0 deletions chain/sync_test.go
@@ -1098,3 +1098,158 @@ func TestInvalidHeight(t *testing.T) {

tu.mineOnBlock(base, 0, nil, false, true, nil, -1, true)
}

// TestIncomingBlocks mines new blocks and checks that the incoming channel streams the new block headers properly
func TestIncomingBlocks(t *testing.T) {
H := 50
tu := prepSyncTest(t, H)

client := tu.addClientNode()
require.NoError(t, tu.mn.LinkAll())

clientNode := tu.nds[client]
//stm: @CHAIN_SYNCER_INCOMING_BLOCKS_001
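// SyncIncomingBlocks returns a channel that streams block headers as the node receives them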
incoming, err := clientNode.SyncIncomingBlocks(tu.ctx)
require.NoError(tu.t, err)

tu.connect(client, 0)
tu.waitUntilSync(0, client)
tu.compareSourceState(client)

timeout := time.After(10 * time.Second)

for i := 0; i < 5; i++ {
tu.mineNewBlock(0, nil)
tu.waitUntilSync(0, client)
tu.compareSourceState(client)

// read with a timeout so the test doesn't deadlock if no header arrives
select {
case <-incoming:
case <-timeout:
tu.t.Fatal("TestIncomingBlocks timeout")
}
}
}

// TestSyncManualBadTS tests manually marking and unmarking blocks in the bad TS cache
func TestSyncManualBadTS(t *testing.T) {
// Test setup:
// - the source node is fully synced
// - the client node is unsynced
// - the client manually marks the source's head and its parent as bad
H := 50
tu := prepSyncTest(t, H)

client := tu.addClientNode()
require.NoError(t, tu.mn.LinkAll())

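// `source` is the index of the pre-synced source node created by prepSyncTest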
sourceHead, err := tu.nds[source].ChainHead(tu.ctx)
require.NoError(tu.t, err)

clientHead, err := tu.nds[client].ChainHead(tu.ctx)
require.NoError(tu.t, err)

require.True(tu.t, !sourceHead.Equals(clientHead), "source and client should be out of sync in test setup")

//stm: @CHAIN_SYNCER_MARK_BAD_001
err = tu.nds[client].SyncMarkBad(tu.ctx, sourceHead.Cids()[0])
require.NoError(tu.t, err)

sourceHeadParent := sourceHead.Parents().Cids()[0]
err = tu.nds[client].SyncMarkBad(tu.ctx, sourceHeadParent)
require.NoError(tu.t, err)

//stm: @CHAIN_SYNCER_CHECK_BAD_001
reason, err := tu.nds[client].SyncCheckBad(tu.ctx, sourceHead.Cids()[0])
require.NoError(tu.t, err)
require.NotEqual(tu.t, "", reason, "block is not bad after manually marking")

reason, err = tu.nds[client].SyncCheckBad(tu.ctx, sourceHeadParent)
require.NoError(tu.t, err)
require.NotEqual(tu.t, "", reason, "block is not bad after manually marking")

// Assertion 1:
// - the client shouldn't be synced after the timeout, because the source's head TS is marked bad
// - the bad block is the first block that would be synced, so one second should be enough
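// connect the client (node 1) to the source (node 0)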
tu.connect(1, 0)
timeout := time.After(1 * time.Second)
<-timeout

clientHead, err = tu.nds[client].ChainHead(tu.ctx)
require.NoError(tu.t, err)
require.True(tu.t, !sourceHead.Equals(clientHead), "source and client should be out of sync if source head is bad")

// Assertion 2:
// - after unmarking blocks as bad and reconnecting, source & client should be in sync
//stm: @CHAIN_SYNCER_UNMARK_BAD_001
err = tu.nds[client].SyncUnmarkBad(tu.ctx, sourceHead.Cids()[0])
require.NoError(tu.t, err)

reason, err = tu.nds[client].SyncCheckBad(tu.ctx, sourceHead.Cids()[0])
require.NoError(tu.t, err)
require.Equal(tu.t, "", reason, "block is still bad after manually unmarking")

err = tu.nds[client].SyncUnmarkAllBad(tu.ctx)
Contributor: I'm new to the lotus syncer so I'm curious: why do we need the explicit SyncUnmarkBad on the head if we are unmarking all here?

Contributor: I believe it's done intentionally, so as to check that both methods of unmarking a bad block are working as intended. This way, if a change is committed to either SyncUnmarkBad or SyncUnmarkAllBad, the test will cover them.

While this test strays into the "integration" category, I think it's a valid choice.

require.NoError(tu.t, err)

reason, err = tu.nds[client].SyncCheckBad(tu.ctx, sourceHeadParent)
require.NoError(tu.t, err)
require.Equal(tu.t, "", reason, "block is still bad after manually unmarking")

tu.disconnect(1, 0)
tu.connect(1, 0)

tu.waitUntilSync(0, client)
tu.compareSourceState(client)
}

// TestSyncState tests fetching the sync worker state before, during & after the sync
func TestSyncState(t *testing.T) {
H := 50
tu := prepSyncTest(t, H)

client := tu.addClientNode()
require.NoError(t, tu.mn.LinkAll())
clientNode := tu.nds[client]
sourceHead, err := tu.nds[source].ChainHead(tu.ctx)
require.NoError(tu.t, err)

// sync state should be empty before the sync
state, err := clientNode.SyncState(tu.ctx)
require.NoError(tu.t, err)
require.Equal(tu.t, len(state.ActiveSyncs), 0)

tu.connect(client, 0)

// wait until the sync starts, or at most 5 seconds
timeout := time.After(5 * time.Second)
activeSyncs := []api.ActiveSync{}

for len(activeSyncs) == 0 {
//stm: @CHAIN_SYNCER_STATE_001
state, err = clientNode.SyncState(tu.ctx)
require.NoError(tu.t, err)
activeSyncs = state.ActiveSyncs

sleep := time.After(100 * time.Millisecond)
select {
case <-sleep:
case <-timeout:
tu.t.Fatal("TestSyncState timeout")
}
}

// check state during sync
require.Equal(tu.t, len(activeSyncs), 1)
require.True(tu.t, activeSyncs[0].Target.Equals(sourceHead))

tu.waitUntilSync(0, client)
tu.compareSourceState(client)

// check state after sync
state, err = clientNode.SyncState(tu.ctx)
require.NoError(tu.t, err)
require.Equal(tu.t, len(state.ActiveSyncs), 1)
require.Equal(tu.t, state.ActiveSyncs[0].Stage, api.StageSyncComplete)
}