Skip to content

Commit

Permalink
ligth sync: download difflayer
Browse files Browse the repository at this point in the history
Signed-off-by: kyrie-yl <lei.y@binance.com>
  • Loading branch information
kyrie-yl committed Sep 2, 2021
1 parent 2ae0ddf commit cbd10bf
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 8 deletions.
25 changes: 18 additions & 7 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,10 @@ type Downloader struct {
quitLock sync.Mutex // Lock to prevent double closes

// Testing hooks
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func(SyncMode, string, []*types.Header) // Method to call upon starting a block body fetch
receiptFetchHook func(SyncMode, string, []*types.Header) // Method to call upon starting a receipt fetch
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
}

// LightChain encapsulates functions required to synchronise a light chain.
Expand Down Expand Up @@ -220,8 +220,10 @@ type BlockChain interface {
Snapshots() *snapshot.Tree
}

type DownloadOption func(downloader *Downloader) *Downloader

// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, options ...DownloadOption) *Downloader {
if lightchain == nil {
lightchain = chain
}
Expand Down Expand Up @@ -252,6 +254,11 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom,
},
trackStateReq: make(chan *stateReq),
}
for _, option := range options {
if dl != nil {
dl = option(dl)
}
}
go dl.qosTuner()
go dl.stateFetcher()
return dl
Expand Down Expand Up @@ -290,6 +297,10 @@ func (d *Downloader) Progress() ethereum.SyncProgress {
}
}

func (d *Downloader) SetBodyFetchHook(hook func(SyncMode, string, []*types.Header)) {
d.bodyFetchHook = hook
}

// Synchronising returns whether the downloader is currently retrieving blocks.
func (d *Downloader) Synchronising() bool {
return atomic.LoadInt32(&d.synchronising) > 0
Expand Down Expand Up @@ -1359,7 +1370,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
// - kind: textual label of the type being downloaded to display in log messages
func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool),
fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
fetchHook func(SyncMode, string, []*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error {

// Create a ticker to detect expired retrieval tasks
Expand Down Expand Up @@ -1508,7 +1519,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
}
// Fetch the chunk and make sure any errors return the hashes to the queue
if fetchHook != nil {
fetchHook(request.Headers)
fetchHook(d.getMode(),peer.id, request.Headers)
}
if err := fetch(peer, request); err != nil {
// Although we could try and make an attempt to fix this, this error really
Expand Down
6 changes: 5 additions & 1 deletion eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,11 @@ func newHandler(config *handlerConfig) (*handler, error) {
if atomic.LoadUint32(&h.fastSync) == 1 && atomic.LoadUint32(&h.snapSync) == 0 {
h.stateBloom = trie.NewSyncBloom(config.BloomCache, config.Database)
}
h.downloader = downloader.New(h.checkpointNumber, config.Database, h.stateBloom, h.eventMux, h.chain, nil, h.removePeer)
var downloadOptions []downloader.DownloadOption
if h.lightSync {
downloadOptions = append(downloadOptions, DiffBodiesFetchOption(h.peers))
}
h.downloader = downloader.New(h.checkpointNumber, config.Database, h.stateBloom, h.eventMux, h.chain, nil, h.removePeer, downloadOptions...)

// Construct the fetcher (short sync)
validator := func(header *types.Header) error {
Expand Down
20 changes: 20 additions & 0 deletions eth/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/protocols/diff"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
Expand Down Expand Up @@ -66,6 +68,24 @@ type peerSet struct {
closed bool
}

func DiffBodiesFetchOption(peers *peerSet) downloader.DownloadOption {
return func(dl *downloader.Downloader) *downloader.Downloader {
var hook = func(mode downloader.SyncMode, peerID string, headers []*types.Header) {
if mode == downloader.FullSync {
if ep := peers.peer(peerID); ep != nil && ep.diffExt != nil {
hashes := make([]common.Hash, 0,len(headers))
for _, header := range headers {
hashes = append(hashes, header.Hash())
}
ep.diffExt.RequestDiffLayers(hashes)
}
}
}
dl.SetBodyFetchHook(hook)
return dl
}
}

// newPeerSet creates a new peer set to track the active participants.
func newPeerSet() *peerSet {
return &peerSet{
Expand Down

0 comments on commit cbd10bf

Please sign in to comment.