eth, core: track block propagation (#1078)
* eth, core: track block announcement and import timings

* eth: track total delay

* eth: add more context and prefix in logs

* eth/fetcher: fix invalid tracking of received at time

* put logs behind log.enable-block-tracking flag

* fix lint

* improve logging

* add comments

* add comments
manav2401 authored Mar 5, 2024
1 parent de45e3f commit cb23b9b
Showing 20 changed files with 160 additions and 85 deletions.
1 change: 1 addition & 0 deletions builder/files/config.toml
@@ -25,6 +25,7 @@ syncmode = "full"
# json = false
# backtrace = ""
# debug = true
# enable-block-tracking = false

[p2p]
# maxpeers = 1
1 change: 1 addition & 0 deletions core/types/block.go
@@ -263,6 +263,7 @@ type Block struct {
// inter-peer block relay.
ReceivedAt time.Time
ReceivedFrom interface{}
AnnouncedAt *time.Time
}

// "external" block encoding. used for eth protocol, etc.
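The new `AnnouncedAt` field is a pointer so that blocks imported without a prior announcement simply leave it nil, and consumers must nil-check it before measuring announcement-to-import latency. A minimal sketch of that pattern, assuming this commit's `AnnouncedAt` field and Bor's usual go-ethereum import paths; the `announcementDelay` helper is illustrative and not part of the change:

```go
package main

import (
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/core/types"
)

// announcementDelay returns the time elapsed since the block was announced,
// falling back to the receive time when no announcement was recorded.
// Illustrative helper only; not part of this commit.
func announcementDelay(b *types.Block) time.Duration {
	if b.AnnouncedAt != nil {
		return time.Since(*b.AnnouncedAt)
	}
	return time.Since(b.ReceivedAt)
}

func main() {
	b := types.NewBlockWithHeader(&types.Header{})
	b.ReceivedAt = time.Now().Add(-150 * time.Millisecond)
	fmt.Println(announcementDelay(b)) // ~150ms, since AnnouncedAt is nil
}
```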
9 changes: 5 additions & 4 deletions docs/cli/example_config.toml
@@ -24,10 +24,11 @@ devfakeauthor = false # Run miner without validator set authorization
"32000000" = "0x875500011e5eecc0c554f95d07b31cf59df4ca2505f4dbbfffa7d4e4da917c68"

[log]
vmodule = "" # Per-module verbosity: comma-separated list of <pattern>=<level> (e.g. eth/*=5,p2p=4)
json = false # Format logs with JSON
backtrace = "" # Request a stack trace at a specific logging statement (e.g. "block.go:271")
debug = true # Prepends log messages with call-site location (file and line number) - {requires some effort}
vmodule = "" # Per-module verbosity: comma-separated list of <pattern>=<level> (e.g. eth/*=5,p2p=4)
json = false # Format logs with JSON
backtrace = "" # Request a stack trace at a specific logging statement (e.g. "block.go:271")
debug = true # Prepends log messages with call-site location (file and line number)
enable-block-tracking = false # Enables additional logging of information collected while tracking block lifecycle

[p2p]
maxpeers = 50 # Maximum number of network peers (network disabled if set to 0)
2 changes: 2 additions & 0 deletions docs/cli/server.md
@@ -204,6 +204,8 @@ The ```bor server``` command runs the Bor client.

- ```log.debug```: Prepends log messages with call-site location (file and line number) (default: false)

- ```log.enable-block-tracking```: Enables additional logging of information collected while tracking block lifecycle (default: false)

- ```log.json```: Format logs with JSON (default: false)

- ```vmodule```: Per-module verbosity: comma-separated list of <pattern>=<level> (e.g. eth/*=5,p2p=4)
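Taken together with the TOML entries above, the flag list shows that block-lifecycle logging is disabled by default and is opted into via `log.enable-block-tracking` on `bor server`, which corresponds to the `enable-block-tracking` key in the `[log]` section of the config files touched by this commit.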
25 changes: 13 additions & 12 deletions eth/backend.go
@@ -272,18 +272,19 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
// Permit the downloader to use the trie cache allowance during fast sync
cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit
if eth.handler, err = newHandler(&handlerConfig{
Database: chainDb,
Chain: eth.blockchain,
TxPool: eth.txPool,
Merger: eth.merger,
Network: config.NetworkId,
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
RequiredBlocks: config.RequiredBlocks,
EthAPI: blockChainAPI,
checker: checker,
txArrivalWait: eth.p2pServer.TxArrivalWait,
Database: chainDb,
Chain: eth.blockchain,
TxPool: eth.txPool,
Merger: eth.merger,
Network: config.NetworkId,
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
RequiredBlocks: config.RequiredBlocks,
EthAPI: blockChainAPI,
checker: checker,
txArrivalWait: eth.p2pServer.TxArrivalWait,
enableBlockTracking: eth.config.EnableBlockTracking,
}); err != nil {
return nil, err
}
3 changes: 3 additions & 0 deletions eth/ethconfig/config.go
@@ -204,6 +204,9 @@ type Config struct {

// OverrideVerkle (TODO: remove after the fork)
OverrideVerkle *big.Int `toml:",omitempty"`

// EnableBlockTracking allows logging of information collected while tracking block lifecycle
EnableBlockTracking bool
}

// CreateConsensusEngine creates a consensus engine for the given chain configuration.
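For code that builds an `ethconfig.Config` directly instead of going through the CLI/TOML layer, the new field can simply be set on the config value. A short sketch, assuming the package's usual `Defaults` starting point and Bor's go-ethereum import paths:

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/eth/ethconfig"
)

func main() {
	// Start from the stock defaults and opt in to block-lifecycle logging.
	// In Bor this field is normally populated from the `enable-block-tracking`
	// setting under [log] shown in the docs and config files above.
	cfg := ethconfig.Defaults
	cfg.EnableBlockTracking = true
	fmt.Println("block tracking enabled:", cfg.EnableBlockTracking)
}
```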
110 changes: 72 additions & 38 deletions eth/fetcher/block_fetcher.go
@@ -114,18 +114,20 @@ type blockAnnounce struct {

// headerFilterTask represents a batch of headers needing fetcher filtering.
type headerFilterTask struct {
peer string // The source peer of block headers
headers []*types.Header // Collection of headers to filter
time time.Time // Arrival time of the headers
peer string // The source peer of block headers
headers []*types.Header // Collection of headers to filter
time time.Time // Arrival time of the headers
announcedTime time.Time // Announcement time of the availability of the block
}

// bodyFilterTask represents a batch of block bodies (transactions and uncles)
// needing fetcher filtering.
type bodyFilterTask struct {
peer string // The source peer of block bodies
transactions [][]*types.Transaction // Collection of transactions per block bodies
uncles [][]*types.Header // Collection of uncles per block bodies
time time.Time // Arrival time of the blocks' contents
peer string // The source peer of block bodies
transactions [][]*types.Transaction // Collection of transactions per block bodies
uncles [][]*types.Header // Collection of uncles per block bodies
time time.Time // Arrival time of the blocks' contents
announcedTime time.Time // Announcement time of the availability of the block
}

// blockOrHeaderInject represents a schedules import operation.
@@ -197,34 +199,38 @@ type BlockFetcher struct {
fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62)
importedHook func(*types.Header, *types.Block) // Method to call upon successful header or block import (both eth/61 and eth/62)

// Logging
enableBlockTracking bool // Whether to log information collected while tracking block lifecycle
}

// NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements.
func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn, enableBlockTracking bool) *BlockFetcher {
return &BlockFetcher{
light: light,
notify: make(chan *blockAnnounce),
inject: make(chan *blockOrHeaderInject),
headerFilter: make(chan chan *headerFilterTask),
bodyFilter: make(chan chan *bodyFilterTask),
done: make(chan common.Hash),
quit: make(chan struct{}),
announces: make(map[string]int),
announced: make(map[common.Hash][]*blockAnnounce),
fetching: make(map[common.Hash]*blockAnnounce),
fetched: make(map[common.Hash][]*blockAnnounce),
completing: make(map[common.Hash]*blockAnnounce),
queue: prque.New[int64, *blockOrHeaderInject](nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*blockOrHeaderInject),
getHeader: getHeader,
getBlock: getBlock,
verifyHeader: verifyHeader,
broadcastBlock: broadcastBlock,
chainHeight: chainHeight,
insertHeaders: insertHeaders,
insertChain: insertChain,
dropPeer: dropPeer,
light: light,
notify: make(chan *blockAnnounce),
inject: make(chan *blockOrHeaderInject),
headerFilter: make(chan chan *headerFilterTask),
bodyFilter: make(chan chan *bodyFilterTask),
done: make(chan common.Hash),
quit: make(chan struct{}),
announces: make(map[string]int),
announced: make(map[common.Hash][]*blockAnnounce),
fetching: make(map[common.Hash]*blockAnnounce),
fetched: make(map[common.Hash][]*blockAnnounce),
completing: make(map[common.Hash]*blockAnnounce),
queue: prque.New[int64, *blockOrHeaderInject](nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*blockOrHeaderInject),
getHeader: getHeader,
getBlock: getBlock,
verifyHeader: verifyHeader,
broadcastBlock: broadcastBlock,
chainHeight: chainHeight,
insertHeaders: insertHeaders,
insertChain: insertChain,
dropPeer: dropPeer,
enableBlockTracking: enableBlockTracking,
}
}

@@ -276,7 +282,7 @@ func (f *BlockFetcher) Enqueue(peer string, block *types.Block) error {

// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
// returning those that should be handled differently.
func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time, announcedAt time.Time) []*types.Header {
log.Trace("Filtering headers", "peer", peer, "headers", len(headers))

// Send the filter channel to the fetcher
@@ -289,7 +295,7 @@ func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time
}
// Request the filtering of the header list
select {
case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
case filter <- &headerFilterTask{peer: peer, headers: headers, time: time, announcedTime: announcedAt}:
case <-f.quit:
return nil
}
@@ -304,7 +310,7 @@ func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time

// FilterBodies extracts all the block bodies that were explicitly requested by
// the fetcher, returning those that should be handled differently.
func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time, announcedAt time.Time) ([][]*types.Transaction, [][]*types.Header) {
log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))

// Send the filter channel to the fetcher
@@ -317,7 +323,7 @@ func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transac
}
// Request the filtering of the body list
select {
case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time, announcedTime: announcedAt}:
case <-f.quit:
return nil, nil
}
@@ -480,7 +486,7 @@ func (f *BlockFetcher) loop() {
log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)

// Create a closure of the fetch and schedule in on a new thread
fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
fetchHeader, hashes, announcedAt := f.fetching[hashes[0]].fetchHeader, hashes, f.fetching[hashes[0]].time
go func(peer string) {
if f.fetchingHook != nil {
f.fetchingHook(hashes)
@@ -504,7 +510,7 @@ func (f *BlockFetcher) loop() {
select {
case res := <-resCh:
res.Done <- nil
f.FilterHeaders(peer, *res.Res.(*eth.BlockHeadersPacket), time.Now().Add(res.Time))
f.FilterHeaders(peer, *res.Res.(*eth.BlockHeadersPacket), time.Now(), announcedAt)

case <-timeout.C:
// The peer didn't respond in time. The request
@@ -547,6 +553,7 @@ func (f *BlockFetcher) loop() {

fetchBodies := f.completing[hashes[0]].fetchBodies
bodyFetchMeter.Mark(int64(len(hashes)))
announcedAt := f.completing[hashes[0]].time

go func(peer string, hashes []common.Hash) {
resCh := make(chan *eth.Response)
@@ -565,7 +572,7 @@ func (f *BlockFetcher) loop() {
res.Done <- nil
// Ignoring withdrawals here, since the block fetcher is not used post-merge.
txs, uncles, _ := res.Res.(*eth.BlockBodiesPacket).Unpack()
f.FilterBodies(peer, txs, uncles, time.Now())
f.FilterBodies(peer, txs, uncles, time.Now(), announcedAt)

case <-timeout.C:
// The peer didn't respond in time. The request
@@ -631,6 +638,7 @@ func (f *BlockFetcher) loop() {

block := types.NewBlockWithHeader(header)
block.ReceivedAt = task.time
block.AnnouncedAt = &task.announcedTime

complete = append(complete, block)
f.completing[hash] = announce
@@ -725,6 +733,7 @@ func (f *BlockFetcher) loop() {
if f.getBlock(hash) == nil {
block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
block.ReceivedAt = task.time
block.AnnouncedAt = &task.announcedTime
blocks = append(blocks, block)
} else {
f.forgetHash(hash)
@@ -923,6 +932,31 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block) {
log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
return
}

if f.enableBlockTracking {
// Log the insertion event
var (
msg string
delayInMs uint64
prettyDelay common.PrettyDuration
)

if block.AnnouncedAt != nil {
msg = "[block tracker] Inserted new block with announcement"
delayInMs = uint64(time.Since(*block.AnnouncedAt).Milliseconds())
prettyDelay = common.PrettyDuration(time.Since(*block.AnnouncedAt))
} else {
msg = "[block tracker] Inserted new block without announcement"
delayInMs = uint64(time.Since(block.ReceivedAt).Milliseconds())
prettyDelay = common.PrettyDuration(time.Since(block.ReceivedAt))
}

totalDelayInMs := uint64(time.Now().UnixMilli()) - block.Time()*1000
totalDelay := common.PrettyDuration(time.Millisecond * time.Duration(totalDelayInMs))

log.Info(msg, "number", block.Number().Uint64(), "hash", hash, "delay", prettyDelay, "delayInMs", delayInMs, "totalDelay", totalDelay, "totalDelayInMs", totalDelayInMs)
}

// If import succeeded, broadcast the block
blockAnnounceOutTimer.UpdateSince(block.ReceivedAt)

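The announcement timestamp captured in `blockAnnounce` is threaded through `FilterHeaders`/`FilterBodies` into the filter tasks and finally onto `block.AnnouncedAt`, so `importBlocks` can report two figures: the propagation delay since the announcement (or since receipt when no announcement exists) and the total delay measured from the block's own timestamp, which is in seconds and therefore scaled to milliseconds. A standalone sketch of that arithmetic, with illustrative names and Bor's usual import paths assumed:

```go
package main

import (
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/common"
)

// trackerDelays mirrors the arithmetic used in importBlocks above: the
// propagation delay is measured from the announcement (or receipt) time,
// while the total delay is measured from the block's own timestamp, which
// is in seconds and is scaled to milliseconds first.
// Illustrative only; these names are not part of the commit.
func trackerDelays(announcedAt time.Time, blockTimeSec uint64) (delayInMs, totalDelayInMs uint64) {
	delayInMs = uint64(time.Since(announcedAt).Milliseconds())
	totalDelayInMs = uint64(time.Now().UnixMilli()) - blockTimeSec*1000
	return delayInMs, totalDelayInMs
}

func main() {
	announced := time.Now().Add(-200 * time.Millisecond)        // pretend announcement 200ms ago
	blockTime := uint64(time.Now().Add(-2 * time.Second).Unix()) // pretend block minted 2s ago

	d, td := trackerDelays(announced, blockTime)
	fmt.Println("delay:", common.PrettyDuration(time.Duration(d)*time.Millisecond))
	fmt.Println("totalDelay:", common.PrettyDuration(time.Duration(td)*time.Millisecond))
}
```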
2 changes: 1 addition & 1 deletion eth/fetcher/block_fetcher_test.go
@@ -104,7 +104,7 @@ func newTester(light bool) *fetcherTester {
blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
drops: make(map[string]bool),
}
tester.fetcher = NewBlockFetcher(light, tester.getHeader, tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertHeaders, tester.insertChain, tester.dropPeer)
tester.fetcher = NewBlockFetcher(light, tester.getHeader, tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertHeaders, tester.insertChain, tester.dropPeer, false)
tester.fetcher.Start()

return tester
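Callers of `NewBlockFetcher` now pass the tracking switch as the final argument; the tester above keeps it off. A minimal sketch of the updated call shape, with nil standing in for the retrieval/verification/insertion hooks that `eth/handler.go` wires up for real:

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/eth/fetcher"
)

func main() {
	// light=false, eight nil callback hooks, then the new enableBlockTracking
	// switch as the trailing argument. Signature sketch only: real callers
	// supply working callbacks (nil hooks would panic once the fetcher runs).
	f := fetcher.NewBlockFetcher(false, nil, nil, nil, nil, nil, nil, nil, nil, true)
	fmt.Printf("%T\n", f)
}
```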
61 changes: 35 additions & 26 deletions eth/handler.go
@@ -85,18 +85,19 @@ type txPool interface {
// handlerConfig is the collection of initialization parameters to create a full
// node network handler.
type handlerConfig struct {
Database ethdb.Database // Database for direct sync insertions
Chain *core.BlockChain // Blockchain to serve data from
TxPool txPool // Transaction pool to propagate from
Merger *consensus.Merger // The manager for eth1/2 transition
Network uint64 // Network identifier to adfvertise
Sync downloader.SyncMode // Whether to snap or full sync
BloomCache uint64 // Megabytes to alloc for snap sync bloom
EventMux *event.TypeMux // Legacy event mux, deprecate for `feed`
txArrivalWait time.Duration // Maximum duration to wait for an announced tx before requesting it
checker ethereum.ChainValidator
RequiredBlocks map[uint64]common.Hash // Hard coded map of required block hashes for sync challenges
EthAPI *ethapi.BlockChainAPI // EthAPI to interact
Database ethdb.Database // Database for direct sync insertions
Chain *core.BlockChain // Blockchain to serve data from
TxPool txPool // Transaction pool to propagate from
Merger *consensus.Merger // The manager for eth1/2 transition
Network uint64 // Network identifier to adfvertise
Sync downloader.SyncMode // Whether to snap or full sync
BloomCache uint64 // Megabytes to alloc for snap sync bloom
EventMux *event.TypeMux // Legacy event mux, deprecate for `feed`
txArrivalWait time.Duration // Maximum duration to wait for an announced tx before requesting it
checker ethereum.ChainValidator
RequiredBlocks map[uint64]common.Hash // Hard coded map of required block hashes for sync challenges
EthAPI *ethapi.BlockChainAPI // EthAPI to interact
enableBlockTracking bool // Whether to log information collected while tracking block lifecycle
}

type handler struct {
@@ -126,6 +127,8 @@ type handler struct {

requiredBlocks map[uint64]common.Hash

enableBlockTracking bool

// channels for fetcher, syncer, txsyncLoop
quitSync chan struct{}

@@ -144,19 +147,20 @@ func newHandler(config *handlerConfig) (*handler, error) {
}

h := &handler{
networkID: config.Network,
forkFilter: forkid.NewFilter(config.Chain),
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
chain: config.Chain,
peers: newPeerSet(),
merger: config.Merger,
ethAPI: config.EthAPI,
requiredBlocks: config.RequiredBlocks,
quitSync: make(chan struct{}),
handlerDoneCh: make(chan struct{}),
handlerStartCh: make(chan struct{}),
networkID: config.Network,
forkFilter: forkid.NewFilter(config.Chain),
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
chain: config.Chain,
peers: newPeerSet(),
merger: config.Merger,
ethAPI: config.EthAPI,
requiredBlocks: config.RequiredBlocks,
enableBlockTracking: config.enableBlockTracking,
quitSync: make(chan struct{}),
handlerDoneCh: make(chan struct{}),
handlerStartCh: make(chan struct{}),
}
if config.Sync == downloader.FullSync {
// The database seems empty as the current block is the genesis. Yet the snap
@@ -295,7 +299,7 @@ func newHandler(config *handlerConfig) (*handler, error) {

return n, err
}
h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer)
h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer, h.enableBlockTracking)

fetchTx := func(peer string, hashes []common.Hash) error {
p := h.peers.peer(peer)
@@ -688,6 +692,11 @@ func (h *handler) minedBroadcastLoop() {

for obj := range h.minedBlockSub.Chan() {
if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
if h.enableBlockTracking {
delayInMs := uint64(time.Now().UnixMilli()) - ev.Block.Time()*1000
delay := common.PrettyDuration(time.Millisecond * time.Duration(delayInMs))
log.Info("[block tracker] Broadcasting mined block", "number", ev.Block.NumberU64(), "hash", ev.Block.Hash(), "blockTime", ev.Block.Time(), "now", time.Now().Unix(), "delay", delay, "delayInMs", delayInMs)
}
h.BroadcastBlock(ev.Block, true) // First propagate block to peers
h.BroadcastBlock(ev.Block, false) // Only then announce to the rest
}