diff --git a/api/api_full.go b/api/api_full.go index e2025f58164..219dc271c40 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -781,8 +781,9 @@ type IpldObject struct { } type ActiveSync struct { - Base *types.TipSet - Target *types.TipSet + WorkerID uint64 + Base *types.TipSet + Target *types.TipSet Stage SyncStateStage Height abi.ChainEpoch diff --git a/chain/store/store.go b/chain/store/store.go index 00a78500ef9..38f569120ef 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -358,6 +358,8 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ctx context.Context, ts *types.TipS // difference between 'bootstrap sync' and 'caught up' sync, we need // some other heuristic. return cs.takeHeaviestTipSet(ctx, ts) + } else if w.Equals(heaviestW) && !ts.Equals(cs.heaviest) { + log.Errorw("weight draw", "currTs", cs.heaviest, "ts", ts) } return nil } diff --git a/chain/sync_manager.go b/chain/sync_manager.go index c25068f60c2..641226e0f6b 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -4,30 +4,43 @@ import ( "context" "os" "sort" + "strconv" "strings" "sync" + "time" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" + peer "github.com/libp2p/go-libp2p-core/peer" ) -const BootstrapPeerThreshold = 2 +var ( + BootstrapPeerThreshold = 4 + + RecentSyncBufferSize = 10 + MaxSyncWorkers = 5 + SyncWorkerHistory = 3 -var coalesceForksParents = false + InitialSyncTimeThreshold = 15 * time.Minute + + coalesceTipsets = false +) func init() { - if os.Getenv("LOTUS_SYNC_REL_PARENT") == "yes" { - coalesceForksParents = true + coalesceTipsets = os.Getenv("LOTUS_SYNC_FORMTS_PEND") == "yes" + + if bootstrapPeerThreshold := os.Getenv("LOTUS_SYNC_BOOTSTRAP_PEERS"); bootstrapPeerThreshold != "" { + threshold, err := strconv.Atoi(bootstrapPeerThreshold) + if err != nil { + log.Errorf("failed to parse 'LOTUS_SYNC_BOOTSTRAP_PEERS' env var: %s", err) + } else { + BootstrapPeerThreshold = threshold + } } } -const ( - BSStateInit = 0 - BSStateSelected = 1 - BSStateScheduled = 2 - BSStateComplete = 3 -) - type SyncFunc func(context.Context, *types.TipSet) error // SyncManager manages the chain synchronization process, both at bootstrap time @@ -52,108 +65,467 @@ type SyncManager interface { } type syncManager struct { - lk sync.Mutex - peerHeads map[peer.ID]*types.TipSet + ctx context.Context + cancel func() - bssLk sync.Mutex - bootstrapState int + workq chan peerHead + statusq chan workerStatus - bspThresh int + nextWorker uint64 + pend syncBucketSet + deferred syncBucketSet + heads map[peer.ID]*types.TipSet + recent *syncBuffer - incomingTipSets chan *types.TipSet - syncTargets chan *types.TipSet - syncResults chan *syncResult + initialSyncDone bool - syncStates []*SyncerState - - // Normally this handler is set to `(*Syncer).Sync()`. 
- doSync func(context.Context, *types.TipSet) error + mx sync.Mutex + state map[uint64]*workerState - stop chan struct{} + history []*workerState + historyI int - // Sync Scheduler fields - activeSyncs map[types.TipSetKey]*types.TipSet - syncQueue syncBucketSet - activeSyncTips syncBucketSet - nextSyncTarget *syncTargetBucket - workerChan chan *types.TipSet + doSync func(context.Context, *types.TipSet) error } var _ SyncManager = (*syncManager)(nil) -type syncResult struct { - ts *types.TipSet - success bool +type peerHead struct { + p peer.ID + ts *types.TipSet } -const syncWorkerCount = 3 +type workerState struct { + id uint64 + ts *types.TipSet + ss *SyncerState + dt time.Duration +} +type workerStatus struct { + id uint64 + err error +} + +// sync manager interface func NewSyncManager(sync SyncFunc) SyncManager { - sm := &syncManager{ - bspThresh: 1, - peerHeads: make(map[peer.ID]*types.TipSet), - syncTargets: make(chan *types.TipSet), - syncResults: make(chan *syncResult), - syncStates: make([]*SyncerState, syncWorkerCount), - incomingTipSets: make(chan *types.TipSet), - activeSyncs: make(map[types.TipSetKey]*types.TipSet), - doSync: sync, - stop: make(chan struct{}), - } - for i := range sm.syncStates { - sm.syncStates[i] = new(SyncerState) + ctx, cancel := context.WithCancel(context.Background()) + return &syncManager{ + ctx: ctx, + cancel: cancel, + + workq: make(chan peerHead), + statusq: make(chan workerStatus), + + heads: make(map[peer.ID]*types.TipSet), + state: make(map[uint64]*workerState), + recent: newSyncBuffer(RecentSyncBufferSize), + history: make([]*workerState, SyncWorkerHistory), + + doSync: sync, } - return sm } func (sm *syncManager) Start() { - go sm.syncScheduler() - for i := 0; i < syncWorkerCount; i++ { - go sm.syncWorker(i) - } + go sm.scheduler() } func (sm *syncManager) Stop() { - close(sm.stop) + select { + case <-sm.ctx.Done(): + default: + sm.cancel() + } } func (sm *syncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) { - sm.lk.Lock() - defer sm.lk.Unlock() - sm.peerHeads[p] = ts - - if sm.getBootstrapState() == BSStateInit { - spc := sm.syncedPeerCount() - if spc >= sm.bspThresh { - // Its go time! 
- target, err := sm.selectSyncTarget() - if err != nil { - log.Error("failed to select sync target: ", err) - return + select { + case sm.workq <- peerHead{p: p, ts: ts}: + case <-sm.ctx.Done(): + case <-ctx.Done(): + } +} + +func (sm *syncManager) State() []SyncerStateSnapshot { + sm.mx.Lock() + workerStates := make([]*workerState, 0, len(sm.state)+len(sm.history)) + for _, ws := range sm.state { + workerStates = append(workerStates, ws) + } + for _, ws := range sm.history { + if ws != nil { + workerStates = append(workerStates, ws) + } + } + sm.mx.Unlock() + + sort.Slice(workerStates, func(i, j int) bool { + return workerStates[i].id < workerStates[j].id + }) + + result := make([]SyncerStateSnapshot, 0, len(workerStates)) + for _, ws := range workerStates { + result = append(result, ws.ss.Snapshot()) + } + + return result +} + +// sync manager internals +func (sm *syncManager) scheduler() { + ticker := time.NewTicker(time.Minute) + tickerC := ticker.C + for { + select { + case head := <-sm.workq: + sm.handlePeerHead(head) + case status := <-sm.statusq: + sm.handleWorkerStatus(status) + case <-tickerC: + if sm.initialSyncDone { + ticker.Stop() + tickerC = nil + sm.handleInitialSyncDone() } - sm.setBootstrapState(BSStateSelected) + case <-sm.ctx.Done(): + return + } + } +} + +func (sm *syncManager) handlePeerHead(head peerHead) { + log.Infof("new peer head: %s %s", head.p, head.ts) + + // have we started syncing yet? + if sm.nextWorker == 0 { + // track the peer head until we start syncing + sm.heads[head.p] = head.ts + + // not yet; do we have enough peers? + if len(sm.heads) < BootstrapPeerThreshold { + // not enough peers; track it and wait + return + } - sm.incomingTipSets <- target + // we are ready to start syncing; select the sync target and spawn a worker + target, err := sm.selectInitialSyncTarget() + if err != nil { + log.Errorf("failed to select initial sync target: %s", err) + return } - log.Infof("sync bootstrap has %d peers", spc) + + log.Infof("selected initial sync target: %s", target) + sm.spawnWorker(target) + return + } + + // we have started syncing, add peer head to the queue if applicable and maybe spawn a worker + // if there is work to do (possibly in a fork) + target, work, err := sm.addSyncTarget(head.ts) + if err != nil { + log.Warnf("failed to add sync target: %s", err) return } - sm.incomingTipSets <- ts + if work { + log.Infof("selected sync target: %s", target) + sm.spawnWorker(target) + } } -func (sm *syncManager) State() []SyncerStateSnapshot { - ret := make([]SyncerStateSnapshot, 0, len(sm.syncStates)) - for _, s := range sm.syncStates { - ret = append(ret, s.Snapshot()) +func (sm *syncManager) handleWorkerStatus(status workerStatus) { + log.Debugf("worker %d done; status error: %s", status.id, status.err) + + sm.mx.Lock() + ws := sm.state[status.id] + delete(sm.state, status.id) + + // we track the last few workers for debug purposes + sm.history[sm.historyI] = ws + sm.historyI++ + sm.historyI %= len(sm.history) + sm.mx.Unlock() + + if status.err != nil { + // we failed to sync this target -- log it and try to work on an extended chain + // if there is nothing related to be worked on, we stop working on this chain. 
+ log.Errorf("error during sync in %s: %s", ws.ts, status.err) + } else { + // add to the recently synced buffer + sm.recent.Push(ws.ts) + // if we are still in initial sync and this was fast enough, mark the end of the initial sync + if !sm.initialSyncDone && ws.dt < InitialSyncTimeThreshold { + sm.initialSyncDone = true + } + } + + // we are done with this target, select the next sync target and spawn a worker if there is work + // to do, because of an extension of this chain. + target, work, err := sm.selectSyncTarget(ws.ts) + if err != nil { + log.Warnf("failed to select sync target: %s", err) + return + } + + if work { + log.Infof("selected sync target: %s", target) + sm.spawnWorker(target) + } +} + +func (sm *syncManager) handleInitialSyncDone() { + // we have just finished the initial sync; spawn some additional workers in deferred syncs + // as needed (and up to MaxSyncWorkers) to ramp up chain sync + for len(sm.state) < MaxSyncWorkers { + target, work, err := sm.selectDeferredSyncTarget() + if err != nil { + log.Errorf("error selecting deferred sync target: %s", err) + return + } + + if !work { + return + } + + log.Infof("selected deferred sync target: %s", target) + sm.spawnWorker(target) + } +} + +func (sm *syncManager) spawnWorker(target *types.TipSet) { + id := sm.nextWorker + sm.nextWorker++ + ws := &workerState{ + id: id, + ts: target, + ss: new(SyncerState), + } + ws.ss.data.WorkerID = id + + sm.mx.Lock() + sm.state[id] = ws + sm.mx.Unlock() + + go sm.worker(ws) +} + +func (sm *syncManager) worker(ws *workerState) { + log.Infof("worker %d syncing in %s", ws.id, ws.ts) + + start := build.Clock.Now() + + ctx := context.WithValue(sm.ctx, syncStateKey{}, ws.ss) + err := sm.doSync(ctx, ws.ts) + + ws.dt = build.Clock.Since(start) + log.Infof("worker %d done; took %s", ws.id, ws.dt) + select { + case sm.statusq <- workerStatus{id: ws.id, err: err}: + case <-sm.ctx.Done(): + } +} + +// selects the initial sync target by examining known peer heads; only called once for the initial +// sync. +func (sm *syncManager) selectInitialSyncTarget() (*types.TipSet, error) { + var buckets syncBucketSet + + var peerHeads []*types.TipSet + for _, ts := range sm.heads { + peerHeads = append(peerHeads, ts) + } + // clear the map, we don't use it any longer + sm.heads = nil + + sort.Slice(peerHeads, func(i, j int) bool { + return peerHeads[i].Height() < peerHeads[j].Height() + }) + + for _, ts := range peerHeads { + buckets.Insert(ts) + } + + if len(buckets.buckets) > 1 { + log.Warn("caution, multiple distinct chains seen during head selections") + // TODO: we *could* refuse to sync here without user intervention. + // For now, just select the best cluster + } + + return buckets.Heaviest(), nil +} + +// adds a tipset to the potential sync targets; returns true if there is a a tipset to work on. +// this could be either a restart, eg because there is no currently scheduled sync work or a worker +// failed or a potential fork. +func (sm *syncManager) addSyncTarget(ts *types.TipSet) (*types.TipSet, bool, error) { + // Note: we don't need the state lock here to access the active worker states, as the only + // competing threads that may access it do so through State() which is read only. 
+ + // if we have recently synced this or any heavier tipset we just ignore it; this can happen + // with an empty worker set after we just finished syncing to a target + if sm.recent.Synced(ts) { + return nil, false, nil + } + + // if the worker set is empty, we have finished syncing and were waiting for the next tipset + // in this case, we just return the tipset as work to be done + if len(sm.state) == 0 { + return ts, true, nil + } + + // check if it is related to any active sync; if so insert into the pending sync queue + for _, ws := range sm.state { + if ts.Equals(ws.ts) { + // ignore it, we are already syncing it + return nil, false, nil + } + + if ts.Parents() == ws.ts.Key() { + // schedule for syncing next; it's an extension of an active sync + sm.pend.Insert(ts) + return nil, false, nil + } + } + + // check to see if it is related to any pending sync; if so insert it into the pending sync queue + if sm.pend.RelatedToAny(ts) { + sm.pend.Insert(ts) + return nil, false, nil + } + + // it's not related to any active or pending sync; this could be a fork in which case we + // start a new worker to sync it, if it is *heavier* than any active or pending set; + // if it is not, we ignore it. + for _, ws := range sm.state { + if isHeavier(ws.ts, ts) { + return nil, false, nil + } + } + + pendHeaviest := sm.pend.Heaviest() + if pendHeaviest != nil && isHeavier(pendHeaviest, ts) { + return nil, false, nil + } + + // if we have not finished the initial sync or have too many workers, add it to the deferred queue; + // it will be processed once a worker is freed from syncing a chain (or the initial sync finishes) + if !sm.initialSyncDone || len(sm.state) >= MaxSyncWorkers { + log.Infof("deferring sync on %s", ts) + sm.deferred.Insert(ts) + return nil, false, nil + } + + // start a new worker, seems heavy enough and unrelated to active or pending syncs + return ts, true, nil +} + +// selects the next sync target after a worker sync has finished; returns true and a target +// TipSet if this chain should continue to sync because there is a heavier related tipset. 
+func (sm *syncManager) selectSyncTarget(done *types.TipSet) (*types.TipSet, bool, error) { + // we pop the related bucket and if there is any related tipset, we work on the heaviest one next + // if we are not already working on a heavier tipset + related := sm.pend.PopRelated(done) + if related == nil { + return sm.selectDeferredSyncTarget() + } + + heaviest := related.heaviestTipSet() + if isHeavier(done, heaviest) { + return sm.selectDeferredSyncTarget() + } + + for _, ws := range sm.state { + if isHeavier(ws.ts, heaviest) { + return sm.selectDeferredSyncTarget() + } + } + + if sm.recent.Synced(heaviest) { + return sm.selectDeferredSyncTarget() + } + + return heaviest, true, nil +} + +// selects a deferred sync target if there is any; these are sync targets that were not related to +// active syncs and were deferred because there were too many workers running +func (sm *syncManager) selectDeferredSyncTarget() (*types.TipSet, bool, error) { +deferredLoop: + for !sm.deferred.Empty() { + bucket := sm.deferred.Pop() + heaviest := bucket.heaviestTipSet() + + if sm.recent.Synced(heaviest) { + // we have synced it or something heavier recently, skip it + continue deferredLoop + } + + if sm.pend.RelatedToAny(heaviest) { + // this has converged to a pending sync, insert it to the pending queue + sm.pend.Insert(heaviest) + continue deferredLoop + } + + for _, ws := range sm.state { + if ws.ts.Equals(heaviest) || isHeavier(ws.ts, heaviest) { + // we have converged and are already syncing it or we are syncing on something heavier + // ignore it and pop the next deferred bucket + continue deferredLoop + } + + if heaviest.Parents() == ws.ts.Key() { + // we have converged and we are syncing its parent; insert it to the pending queue + sm.pend.Insert(heaviest) + continue deferredLoop + } + + // it's not related to any active or pending sync and this worker is free, so sync it! 
+ return heaviest, true, nil + } } - return ret + + return nil, false, nil } +func isHeavier(a, b *types.TipSet) bool { + return a.ParentWeight().GreaterThan(b.ParentWeight()) +} + +// sync buffer -- this is a circular buffer of recently synced tipsets +type syncBuffer struct { + buf []*types.TipSet + next int +} + +func newSyncBuffer(size int) *syncBuffer { + return &syncBuffer{buf: make([]*types.TipSet, size)} +} + +func (sb *syncBuffer) Push(ts *types.TipSet) { + sb.buf[sb.next] = ts + sb.next++ + sb.next %= len(sb.buf) +} + +func (sb *syncBuffer) Synced(ts *types.TipSet) bool { + for _, rts := range sb.buf { + if rts != nil && (rts.Equals(ts) || isHeavier(rts, ts)) { + return true + } + } + + return false +} + +// sync buckets and related utilities type syncBucketSet struct { buckets []*syncTargetBucket } +type syncTargetBucket struct { + tips []*types.TipSet +} + func newSyncTargetBucket(tipsets ...*types.TipSet) *syncTargetBucket { var stb syncTargetBucket for _, ts := range tipsets { @@ -250,10 +622,6 @@ func (sbs *syncBucketSet) Empty() bool { return len(sbs.buckets) == 0 } -type syncTargetBucket struct { - tips []*types.TipSet -} - func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool { for _, t := range stb.tips { if ts.Equals(t) { @@ -265,19 +633,43 @@ func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool { if ts.Parents() == t.Key() { return true } - if coalesceForksParents && ts.Parents() == t.Parents() { - return true - } } return false } func (stb *syncTargetBucket) add(ts *types.TipSet) { - - for _, t := range stb.tips { + for i, t := range stb.tips { if t.Equals(ts) { return } + if coalesceTipsets && t.Height() == ts.Height() && + types.CidArrsEqual(t.Blocks()[0].Parents, ts.Blocks()[0].Parents) { + miners := make(map[address.Address]struct{}) + newTs := []*types.BlockHeader{} + for _, b := range t.Blocks() { + _, have := miners[b.Miner] + if !have { + newTs = append(newTs, b) + miners[b.Miner] = struct{}{} + } + } + for _, b := range ts.Blocks() { + _, have := miners[b.Miner] + if !have { + newTs = append(newTs, b) + miners[b.Miner] = struct{}{} + } + } + + ts2, err := types.NewTipSet(newTs) + if err != nil { + log.Warnf("error while trying to recombine a tipset in a bucket: %+v", err) + continue + } + stb.tips[i] = ts2 + return + } + } stb.tips = append(stb.tips, ts) @@ -296,196 +688,3 @@ func (stb *syncTargetBucket) heaviestTipSet() *types.TipSet { } return best } - -func (sm *syncManager) selectSyncTarget() (*types.TipSet, error) { - var buckets syncBucketSet - - var peerHeads []*types.TipSet - for _, ts := range sm.peerHeads { - peerHeads = append(peerHeads, ts) - } - sort.Slice(peerHeads, func(i, j int) bool { - return peerHeads[i].Height() < peerHeads[j].Height() - }) - - for _, ts := range peerHeads { - buckets.Insert(ts) - } - - if len(buckets.buckets) > 1 { - log.Warn("caution, multiple distinct chains seen during head selections") - // TODO: we *could* refuse to sync here without user intervention. 
- // For now, just select the best cluster - } - - return buckets.Heaviest(), nil -} - -func (sm *syncManager) syncScheduler() { - for { - select { - case ts, ok := <-sm.incomingTipSets: - if !ok { - log.Info("shutting down sync scheduler") - return - } - - sm.scheduleIncoming(ts) - case res := <-sm.syncResults: - sm.scheduleProcessResult(res) - case sm.workerChan <- sm.nextSyncTarget.heaviestTipSet(): - sm.scheduleWorkSent() - case <-sm.stop: - log.Info("sync scheduler shutting down") - return - } - } -} - -func (sm *syncManager) scheduleIncoming(ts *types.TipSet) { - log.Debug("scheduling incoming tipset sync: ", ts.Cids()) - if sm.getBootstrapState() == BSStateSelected { - sm.setBootstrapState(BSStateScheduled) - sm.syncTargets <- ts - return - } - - var relatedToActiveSync bool - for _, acts := range sm.activeSyncs { - if ts.Equals(acts) { - // ignore, we are already syncing it - return - } - - if ts.Parents() == acts.Key() { - // sync this next, after that sync process finishes - relatedToActiveSync = true - } - } - - if !relatedToActiveSync && sm.activeSyncTips.RelatedToAny(ts) { - relatedToActiveSync = true - } - - // if this is related to an active sync process, immediately bucket it - // we don't want to start a parallel sync process that duplicates work - if relatedToActiveSync { - sm.activeSyncTips.Insert(ts) - return - } - - if sm.getBootstrapState() == BSStateScheduled { - sm.syncQueue.Insert(ts) - return - } - - if sm.nextSyncTarget != nil && sm.nextSyncTarget.sameChainAs(ts) { - sm.nextSyncTarget.add(ts) - } else { - sm.syncQueue.Insert(ts) - - if sm.nextSyncTarget == nil { - sm.nextSyncTarget = sm.syncQueue.Pop() - sm.workerChan = sm.syncTargets - } - } -} - -func (sm *syncManager) scheduleProcessResult(res *syncResult) { - if res.success && sm.getBootstrapState() != BSStateComplete { - sm.setBootstrapState(BSStateComplete) - } - - delete(sm.activeSyncs, res.ts.Key()) - relbucket := sm.activeSyncTips.PopRelated(res.ts) - if relbucket != nil { - if res.success { - if sm.nextSyncTarget == nil { - sm.nextSyncTarget = relbucket - sm.workerChan = sm.syncTargets - } else { - for _, t := range relbucket.tips { - sm.syncQueue.Insert(t) - } - } - return - } - // TODO: this is the case where we try to sync a chain, and - // fail, and we have more blocks on top of that chain that - // have come in since. The question is, should we try to - // sync these? or just drop them? 
- log.Error("failed to sync chain but have new unconnected blocks from chain") - } - - if sm.nextSyncTarget == nil && !sm.syncQueue.Empty() { - next := sm.syncQueue.Pop() - if next != nil { - sm.nextSyncTarget = next - sm.workerChan = sm.syncTargets - } - } -} - -func (sm *syncManager) scheduleWorkSent() { - hts := sm.nextSyncTarget.heaviestTipSet() - sm.activeSyncs[hts.Key()] = hts - - if !sm.syncQueue.Empty() { - sm.nextSyncTarget = sm.syncQueue.Pop() - } else { - sm.nextSyncTarget = nil - sm.workerChan = nil - } -} - -func (sm *syncManager) syncWorker(id int) { - ss := sm.syncStates[id] - for { - select { - case ts, ok := <-sm.syncTargets: - if !ok { - log.Info("sync manager worker shutting down") - return - } - - ctx := context.WithValue(context.TODO(), syncStateKey{}, ss) - err := sm.doSync(ctx, ts) - if err != nil { - log.Errorf("sync error: %+v", err) - } - - sm.syncResults <- &syncResult{ - ts: ts, - success: err == nil, - } - } - } -} - -func (sm *syncManager) syncedPeerCount() int { - var count int - for _, ts := range sm.peerHeads { - if ts.Height() > 0 { - count++ - } - } - return count -} - -func (sm *syncManager) getBootstrapState() int { - sm.bssLk.Lock() - defer sm.bssLk.Unlock() - return sm.bootstrapState -} - -func (sm *syncManager) setBootstrapState(v int) { - sm.bssLk.Lock() - defer sm.bssLk.Unlock() - sm.bootstrapState = v -} - -func (sm *syncManager) IsBootstrapped() bool { - sm.bssLk.Lock() - defer sm.bssLk.Unlock() - return sm.bootstrapState == BSStateComplete -} diff --git a/chain/sync_manager_test.go b/chain/sync_manager_test.go index 709e03a4108..61985b964ef 100644 --- a/chain/sync_manager_test.go +++ b/chain/sync_manager_test.go @@ -10,6 +10,10 @@ import ( "github.com/filecoin-project/lotus/chain/types/mock" ) +func init() { + BootstrapPeerThreshold = 1 +} + var genTs = mock.TipSet(mock.MkBlock(nil, 0, 0)) type syncOp struct { @@ -28,7 +32,12 @@ func runSyncMgrTest(t *testing.T, tname string, thresh int, tf func(*testing.T, <-ch return nil }).(*syncManager) - sm.bspThresh = thresh + + oldBootstrapPeerThreshold := BootstrapPeerThreshold + BootstrapPeerThreshold = thresh + defer func() { + BootstrapPeerThreshold = oldBootstrapPeerThreshold + }() sm.Start() defer sm.Stop() @@ -87,47 +96,57 @@ func TestSyncManagerEdgeCase(t *testing.T) { runSyncMgrTest(t, "edgeCase", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) { sm.SetPeerHead(ctx, "peer1", a) - assertGetSyncOp(t, stc, a) sm.SetPeerHead(ctx, "peer1", b1) sm.SetPeerHead(ctx, "peer1", b2) - // b1 and b2 are being processed - b1op := <-stc - b2op := <-stc - if !b1op.ts.Equals(b1) { - b1op, b2op = b2op, b1op - } + assertGetSyncOp(t, stc, a) - sm.SetPeerHead(ctx, "peer2", c2) // c2 is put into activeSyncTips at index 0 - sm.SetPeerHead(ctx, "peer2", c1) // c1 is put into activeSyncTips at index 1 - sm.SetPeerHead(ctx, "peer3", b2) // b2 is related to c2 and even though it is actively synced it is put into activeSyncTips index 0 - sm.SetPeerHead(ctx, "peer1", a) // a is related to b2 and is put into activeSyncTips index 0 + // b1 and b2 are in queue after a; the sync manager should pick the heaviest one which is b2 + bop := <-stc + if !bop.ts.Equals(b2) { + t.Fatalf("Expected tipset %s to sync, but got %s", b2, bop.ts) + } - b1op.done() // b1 completes first, is related to a, so it pops activeSyncTips index 0 - // even though correct one is index 1 + sm.SetPeerHead(ctx, "peer2", c2) + sm.SetPeerHead(ctx, "peer2", c1) + sm.SetPeerHead(ctx, "peer3", b2) + sm.SetPeerHead(ctx, "peer1", a) - b2op.done() - // b2 
completes and is not related to c1, so it leaves activeSyncTips as it is + bop.done() - waitUntilAllWorkersAreDone(stc) + // get the next sync target; it should be c1 as the heaviest tipset but added last (same weight as c2) + bop = <-stc + if !bop.ts.Equals(c1) { + t.Fatalf("Expected tipset %s to sync, but got %s", c1, bop.ts) + } - if len(sm.activeSyncTips.buckets) != 0 { - t.Errorf("activeSyncTips expected empty but got: %s", sm.activeSyncTips.String()) + sm.SetPeerHead(ctx, "peer4", d1) + sm.SetPeerHead(ctx, "peer5", e1) + bop.done() + + // get the last sync target; it should be e1 + var last *types.TipSet + for i := 0; i < 10; { + select { + case bop = <-stc: + bop.done() + if last == nil || bop.ts.Height() > last.Height() { + last = bop.ts + } + default: + i++ + time.Sleep(10 * time.Millisecond) + } + } + if !last.Equals(e1) { + t.Fatalf("Expected tipset %s to sync, but got %s", e1, last) } - }) -} -func waitUntilAllWorkersAreDone(stc chan *syncOp) { - for i := 0; i < 10; { - select { - case so := <-stc: - so.done() - default: - i++ - time.Sleep(10 * time.Millisecond) + if len(sm.state) != 0 { + t.Errorf("active syncs expected empty but got: %d", len(sm.state)) } - } + }) } func TestSyncManager(t *testing.T) { diff --git a/chain/syncstate.go b/chain/syncstate.go index 26f9f1c39f0..527d6be4832 100644 --- a/chain/syncstate.go +++ b/chain/syncstate.go @@ -12,13 +12,14 @@ import ( ) type SyncerStateSnapshot struct { - Target *types.TipSet - Base *types.TipSet - Stage api.SyncStateStage - Height abi.ChainEpoch - Message string - Start time.Time - End time.Time + WorkerID uint64 + Target *types.TipSet + Base *types.TipSet + Stage api.SyncStateStage + Height abi.ChainEpoch + Message string + Start time.Time + End time.Time } type SyncerState struct { diff --git a/cli/sync.go b/cli/sync.go index c3f25eb1d56..ff7d4bd6598 100644 --- a/cli/sync.go +++ b/cli/sync.go @@ -45,8 +45,8 @@ var syncStatusCmd = &cli.Command{ } fmt.Println("sync status:") - for i, ss := range state.ActiveSyncs { - fmt.Printf("worker %d:\n", i) + for _, ss := range state.ActiveSyncs { + fmt.Printf("worker %d:\n", ss.WorkerID) var base, target []cid.Cid var heightDiff int64 var theight abi.ChainEpoch @@ -263,12 +263,17 @@ func SyncWait(ctx context.Context, napi api.FullNode, watch bool) error { return err } + if len(state.ActiveSyncs) == 0 { + time.Sleep(time.Second) + continue + } + head, err := napi.ChainHead(ctx) if err != nil { return err } - working := 0 + working := -1 for i, ss := range state.ActiveSyncs { switch ss.Stage { case api.StageSyncComplete: @@ -279,7 +284,12 @@ func SyncWait(ctx context.Context, napi api.FullNode, watch bool) error { } } + if working == -1 { + working = len(state.ActiveSyncs) - 1 + } + ss := state.ActiveSyncs[working] + workerID := ss.WorkerID var baseHeight abi.ChainEpoch var target []cid.Cid @@ -302,7 +312,7 @@ func SyncWait(ctx context.Context, napi api.FullNode, watch bool) error { fmt.Print("\r\x1b[2K\x1b[A") } - fmt.Printf("Worker: %d; Base: %d; Target: %d (diff: %d)\n", working, baseHeight, theight, heightDiff) + fmt.Printf("Worker: %d; Base: %d; Target: %d (diff: %d)\n", workerID, baseHeight, theight, heightDiff) fmt.Printf("State: %s; Current Epoch: %d; Todo: %d\n", ss.Stage, ss.Height, theight-ss.Height) lastLines = 2 diff --git a/node/impl/full/sync.go b/node/impl/full/sync.go index 05d4c9cb740..1a088fb7721 100644 --- a/node/impl/full/sync.go +++ b/node/impl/full/sync.go @@ -37,13 +37,14 @@ func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) { for i := 
range states { ss := &states[i] out.ActiveSyncs = append(out.ActiveSyncs, api.ActiveSync{ - Base: ss.Base, - Target: ss.Target, - Stage: ss.Stage, - Height: ss.Height, - Start: ss.Start, - End: ss.End, - Message: ss.Message, + WorkerID: ss.WorkerID, + Base: ss.Base, + Target: ss.Target, + Stage: ss.Stage, + Height: ss.Height, + Start: ss.Start, + End: ss.End, + Message: ss.Message, }) } return out, nil diff --git a/node/test/builder.go b/node/test/builder.go index ea9a8222048..46efb90742c 100644 --- a/node/test/builder.go +++ b/node/test/builder.go @@ -23,6 +23,7 @@ import ( "github.com/filecoin-project/lotus/api/client" "github.com/filecoin-project/lotus/api/test" "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/actors/policy" @@ -50,6 +51,10 @@ import ( "github.com/stretchr/testify/require" ) +func init() { + chain.BootstrapPeerThreshold = 1 +} + func CreateTestStorageNode(ctx context.Context, t *testing.T, waddr address.Address, act address.Address, pk crypto.PrivKey, tnd test.TestNode, mn mocknet.Mocknet, opts node.Option) test.TestStorageNode { r := repo.NewMemory(nil)