Skip to content

Commit

Permalink
eth/downloader: dynamically move pivot even during chain sync
Browse files Browse the repository at this point in the history
  • Loading branch information
karalabe committed Sep 18, 2020
1 parent faba018 commit fb835c0
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 72 deletions.
225 changes: 172 additions & 53 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,10 @@ type Downloader struct {
receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks

// for stateFetcher
// State sync
pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root
pivotLock sync.RWMutex // Lock protecting pivot header reads from updates

stateSyncStart chan *stateSync
trackStateReq chan *stateReq
stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
Expand Down Expand Up @@ -451,10 +454,17 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
}(time.Now())

// Look up the sync boundaries: the common ancestor and the target block
latest, err := d.fetchHeight(p)
latest, pivot, err := d.fetchHead(p)
if err != nil {
return err
}
if mode == FastSync && pivot == nil {
// If no pivot block was returned, the head is below the min full block
// threshold (i.e. new chian). In that case we won't really fast sync
// anyway, but still need a valid pivot block to avoid some code hitting
// nil panics on an access.
pivot = d.blockchain.CurrentBlock().Header()
}
height := latest.Number.Uint64()

origin, err := d.findAncestor(p, latest)
Expand All @@ -469,22 +479,21 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
d.syncStatsLock.Unlock()

// Ensure our origin point is below any fast sync pivot point
pivot := uint64(0)
if mode == FastSync {
if height <= uint64(fsMinFullBlocks) {
origin = 0
} else {
pivot = height - uint64(fsMinFullBlocks)
if pivot <= origin {
origin = pivot - 1
pivotNumber := pivot.Number.Uint64()
if pivotNumber <= origin {
origin = pivotNumber - 1
}
// Write out the pivot into the database so a rollback beyond it will
// reenable fast sync
rawdb.WriteLastPivotNumber(d.stateDB, pivot)
rawdb.WriteLastPivotNumber(d.stateDB, pivotNumber)
}
}
d.committed = 1
if mode == FastSync && pivot != 0 {
if mode == FastSync && pivot.Number.Uint64() != 0 {
d.committed = 0
}
if mode == FastSync {
Expand Down Expand Up @@ -530,13 +539,17 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
d.syncInitHook(origin, height)
}
fetchers := []func() error{
func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved
func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
func() error { return d.processHeaders(origin+1, pivot, td) },
func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
func() error { return d.processHeaders(origin+1, td) },
}
if mode == FastSync {
fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
d.pivotLock.Lock()
d.pivotHeader = pivot
d.pivotLock.Unlock()

fetchers = append(fetchers, func() error { return d.processFastSyncContent() })
} else if mode == FullSync {
fetchers = append(fetchers, d.processFullSyncContent)
}
Expand Down Expand Up @@ -617,46 +630,63 @@ func (d *Downloader) Terminate() {
d.Cancel()
}

// fetchHeight retrieves the head header of the remote peer to aid in estimating
// the total time a pending synchronisation would take.
func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
p.log.Debug("Retrieving remote chain height")
// fetchHead retrieves the head header and prior pivot block (if available) from
// a remote peer.
func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *types.Header, err error) {
p.log.Debug("Retrieving remote chain head")
mode := d.getMode()

// Request the advertised remote head block and wait for the response
head, _ := p.peer.Head()
go p.peer.RequestHeadersByHash(head, 1, 0, false)
latest, _ := p.peer.Head()
fetch := 1
if mode == FastSync {
fetch = 2 // head + pivot headers
}
go p.peer.RequestHeadersByHash(latest, fetch, fsMinFullBlocks-1, true)

ttl := d.requestTTL()
timeout := time.After(ttl)
mode := d.getMode()
for {
select {
case <-d.cancelCh:
return nil, errCanceled
return nil, nil, errCanceled

case packet := <-d.headerCh:
// Discard anything not from the origin peer
if packet.PeerId() != p.id {
log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
// Make sure the peer gave us at least one and at most the requested headers
headers := packet.(*headerPack).headers
if len(headers) != 1 {
p.log.Warn("Multiple headers for single request", "headers", len(headers))
return nil, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
if len(headers) == 0 || len(headers) > fetch {
return nil, nil, fmt.Errorf("%w: returned headers %d != requested %d", errBadPeer, len(headers), fetch)
}
// The first header needs to be the head, validate against the checkpoint
// and request. If only 1 header was returned, make sure there's no pivot
// or there was not one requested.
head := headers[0]
if (mode == FastSync || mode == LightSync) && head.Number.Uint64() < d.checkpoint {
p.log.Warn("Remote head below checkpoint", "number", head.Number, "hash", head.Hash())
return nil, errUnsyncedPeer
return nil, nil, fmt.Errorf("%w: remote head %d below checkpoint %d", errUnsyncedPeer, head.Number, d.checkpoint)
}
if len(headers) == 1 {
if mode == FastSync && head.Number.Uint64() > uint64(fsMinFullBlocks) {
return nil, nil, fmt.Errorf("%w: no pivot included along head header", errBadPeer)
}
p.log.Debug("Remote head identified, no pivot", "number", head.Number, "hash", head.Hash())
return head, nil, nil
}
// At this point we have 2 headers in total and the first is the
// validated head of the chian. Check the pivot number and return,
pivot := headers[1]
if pivot.Number.Uint64() != head.Number.Uint64()-uint64(fsMinFullBlocks) {
return nil, nil, fmt.Errorf("%w: remote pivot %d != requested %d", errInvalidChain, pivot.Number, head.Number.Uint64()-uint64(fsMinFullBlocks))
}
p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash())
return head, nil
return head, pivot, nil

case <-timeout:
p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
return nil, errTimeout
return nil, nil, errTimeout

case <-d.bodyCh:
case <-d.receiptCh:
Expand Down Expand Up @@ -871,14 +901,14 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
case <-d.cancelCh:
return 0, errCanceled

case packer := <-d.headerCh:
case packet := <-d.headerCh:
// Discard anything not from the origin peer
if packer.PeerId() != p.id {
log.Debug("Received headers from incorrect peer", "peer", packer.PeerId())
if packet.PeerId() != p.id {
log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
headers := packer.(*headerPack).headers
headers := packet.(*headerPack).headers
if len(headers) != 1 {
p.log.Warn("Multiple headers for single request", "headers", len(headers))
return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
Expand Down Expand Up @@ -937,12 +967,13 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
// other peers are only accepted if they map cleanly to the skeleton. If no one
// can fill in the skeleton - not even the origin peer - it's assumed invalid and
// the origin is dropped.
func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) error {
func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error {
p.log.Debug("Directing header downloads", "origin", from)
defer p.log.Debug("Header download terminated")

// Create a timeout timer, and the associated header fetcher
skeleton := true // Skeleton assembly phase or finishing up
pivoting := false // Whether the next request is pivot verification
request := time.Now() // time of the last skeleton fetch request
timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
<-timeout.C // timeout channel should be initially empty
Expand All @@ -963,6 +994,20 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false)
}
}
getNextPivot := func() {
pivoting = true
request = time.Now()

ttl = d.requestTTL()
timeout.Reset(ttl)

d.pivotLock.RLock()
pivot := d.pivotHeader.Number.Uint64()
d.pivotLock.RUnlock()

p.log.Trace("Fetching next pivot header", "number", pivot+uint64(fsMinFullBlocks))
go p.peer.RequestHeadersByNumber(pivot+uint64(fsMinFullBlocks), 2, fsMinFullBlocks-9, false) // move +64 when it's 2x64-8 deep
}
// Start pulling the header chain skeleton until all is done
ancestor := from
getHeaders(from)
Expand All @@ -982,8 +1027,46 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
headerReqTimer.UpdateSince(request)
timeout.Stop()

// If the pivot is being checked, move if it became stale and run the real retrieval
var pivot uint64

d.pivotLock.RLock()
if d.pivotHeader != nil {
pivot = d.pivotHeader.Number.Uint64()
}
d.pivotLock.RUnlock()

if pivoting {
if packet.Items() == 2 {
// Retrieve the headers and do some sanity checks, just in case
headers := packet.(*headerPack).headers

if have, want := headers[0].Number.Uint64(), pivot+uint64(fsMinFullBlocks); have != want {
log.Warn("Peer sent invalid next pivot", "have", have, "want", want)
return fmt.Errorf("%w: next pivot number %d != requested %d", errInvalidChain, have, want)
}
if have, want := headers[1].Number.Uint64(), pivot+2*uint64(fsMinFullBlocks)-8; have != want {
log.Warn("Peer sent invalid pivot confirmer", "have", have, "want", want)
return fmt.Errorf("%w: next pivot confirmer number %d != requested %d", errInvalidChain, have, want)
}
log.Warn("Pivot seemingly stale, moving", "old", pivot, "new", headers[0].Number)
pivot = headers[0].Number.Uint64()

d.pivotLock.Lock()
d.pivotHeader = headers[0]
d.pivotLock.Unlock()

// Write out the pivot into the database so a rollback beyond
// it will reenable fast sync and update the state root that
// the state syncer will be downloading.
rawdb.WriteLastPivotNumber(d.stateDB, pivot)
}
pivoting = false
getHeaders(from)
continue
}
// If the skeleton's finished, pull any remaining head headers directly from the origin
if packet.Items() == 0 && skeleton {
if skeleton && packet.Items() == 0 {
skeleton = false
getHeaders(from)
continue
Expand Down Expand Up @@ -1061,7 +1144,14 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
return errCanceled
}
from += uint64(len(headers))
getHeaders(from)

// If we're still skeleton filling fast sync, check pivot staleness
// before continuing to the next skeleton filling
if skeleton && pivot > 0 {
getNextPivot()
} else {
getHeaders(from)
}
} else {
// No headers delivered, or all of them being delayed, sleep a bit and retry
p.log.Trace("All headers delayed, waiting")
Expand Down Expand Up @@ -1390,7 +1480,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
// processHeaders takes batches of retrieved headers from an input channel and
// keeps processing and scheduling them into the header chain and downloader's
// queue until the stream ends or a failure occurs.
func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// Keep a count of uncertain headers to roll back
var (
rollback uint64 // Zero means no rollback (fine as you can't unroll the genesis)
Expand Down Expand Up @@ -1493,6 +1583,14 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
// In case of header only syncing, validate the chunk immediately
if mode == FastSync || mode == LightSync {
// If we're importing pure headers, verify based on their recentness
var pivot uint64

d.pivotLock.RLock()
if d.pivotHeader != nil {
pivot = d.pivotHeader.Number.Uint64()
}
d.pivotLock.RUnlock()

frequency := fsHeaderCheckFrequency
if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
frequency = 1
Expand Down Expand Up @@ -1609,10 +1707,13 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {

// processFastSyncContent takes fetch results from the queue and writes them to the
// database. It also controls the synchronisation of state nodes of the pivot block.
func (d *Downloader) processFastSyncContent(latest *types.Header) error {
func (d *Downloader) processFastSyncContent() error {
// Start syncing state of the reported head block. This should get us most of
// the state of the pivot block.
sync := d.syncState(latest.Root)
d.pivotLock.RLock()
sync := d.syncState(d.pivotHeader.Root)
d.pivotLock.RUnlock()

defer func() {
// The `sync` object is replaced every time the pivot moves. We need to
// defer close the very last active one, hence the lazy evaluation vs.
Expand All @@ -1627,12 +1728,6 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
}
go closeOnErr(sync)

// Figure out the ideal pivot block. Note, that this goalpost may move if the
// sync takes long enough for the chain head to move significantly.
pivot := uint64(0)
if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) {
pivot = height - uint64(fsMinFullBlocks)
}
// To cater for moving pivot points, track the pivot block and subsequently
// accumulated download results separately.
var (
Expand All @@ -1659,22 +1754,46 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
if d.chainInsertHook != nil {
d.chainInsertHook(results)
}
if oldPivot != nil {
// If we haven't downloaded the pivot block yet, check pivot staleness
// notifications from the header downloader
d.pivotLock.RLock()
pivot := d.pivotHeader
d.pivotLock.RUnlock()

if oldPivot == nil {
if pivot.Root != sync.root {
sync.Cancel()
sync = d.syncState(pivot.Root)

go closeOnErr(sync)
}
} else {
results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)
}
// Split around the pivot block and process the two sides via fast/full sync
if atomic.LoadInt32(&d.committed) == 0 {
latest = results[len(results)-1].Header
if height := latest.Number.Uint64(); height > pivot+2*uint64(fsMinFullBlocks) {
log.Warn("Pivot became stale, moving", "old", pivot, "new", height-uint64(fsMinFullBlocks))
pivot = height - uint64(fsMinFullBlocks)
latest := results[len(results)-1].Header
// If the height is above the pivot block by 2 sets, it means the pivot
// become stale in the network and it was garbage collected, move to a
// new pivot.
//
// Note, we have `reorgProtHeaderDelay` number of blocks withheld, Those
// need to be taken into account, otherwise we're detecting the pivot move
// late and will drop peers due to unavailable state!!!
if height := latest.Number.Uint64(); height >= pivot.Number.Uint64()+2*uint64(fsMinFullBlocks)-uint64(reorgProtHeaderDelay) {
log.Warn("Pivot became stale, moving", "old", pivot.Number.Uint64(), "new", height-uint64(fsMinFullBlocks)+uint64(reorgProtHeaderDelay))
pivot = results[len(results)-1-fsMinFullBlocks+reorgProtHeaderDelay].Header // must exist as lower old pivot is uncommitted

d.pivotLock.Lock()
d.pivotHeader = pivot
d.pivotLock.Unlock()

// Write out the pivot into the database so a rollback beyond it will
// reenable fast sync
rawdb.WriteLastPivotNumber(d.stateDB, pivot)
rawdb.WriteLastPivotNumber(d.stateDB, pivot.Number.Uint64())
}
}
P, beforeP, afterP := splitAroundPivot(pivot, results)
P, beforeP, afterP := splitAroundPivot(pivot.Number.Uint64(), results)
if err := d.commitFastSyncData(beforeP, sync); err != nil {
return err
}
Expand Down
Loading

0 comments on commit fb835c0

Please sign in to comment.