Skip to content

Commit

Permalink
Added chain2head event in Ethstats
Browse files Browse the repository at this point in the history
  • Loading branch information
0xsharma committed Dec 22, 2021
1 parent 9957d91 commit 3a67447
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 22 deletions.
4 changes: 0 additions & 4 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,6 @@ func (b *EthAPIBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) e
return b.eth.BlockChain().SubscribeChainHeadEvent(ch)
}

func (b *EthAPIBackend) SubscribeChain2HeadEvent(ch chan<- core.Chain2HeadEvent) event.Subscription {
return b.eth.BlockChain().SubscribeChain2HeadEvent(ch)
}

func (b *EthAPIBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
return b.eth.BlockChain().SubscribeChainSideEvent(ch)
}
Expand Down
5 changes: 5 additions & 0 deletions eth/bor_api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,8 @@ func (b *EthAPIBackend) GetBorBlockTransactionWithBlockHash(ctx context.Context,
func (b *EthAPIBackend) SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription {
return b.eth.BlockChain().SubscribeStateSyncEvent(ch)
}

// SubscribeChain2HeadEvent subscribes to reorg/head/fork event
func (b *EthAPIBackend) SubscribeChain2HeadEvent(ch chan<- core.Chain2HeadEvent) event.Subscription {
return b.eth.BlockChain().SubscribeChain2HeadEvent(ch)
}
81 changes: 73 additions & 8 deletions ethstats/ethstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ const (
txChanSize = 4096
// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
chainHeadChanSize = 10

// chain2HeadChanSize is the size of channel listening to Chain2HeadEvent.
chain2HeadChanSize = 10
)

// backend encompasses the bare-minimum functionality needed for ethstats reporting
Expand All @@ -70,10 +73,15 @@ type backend interface {
SyncProgress() ethereum.SyncProgress
}

type extendedBackend interface {
backend
SubscribeChain2HeadEvent(ch chan<- core.Chain2HeadEvent) event.Subscription
}

// fullNodeBackend encompasses the functionality necessary for a full node
// reporting to ethstats
type fullNodeBackend interface {
backend
extendedBackend
Miner() *miner.Miner
BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error)
CurrentBlock() *types.Block
Expand All @@ -84,7 +92,7 @@ type fullNodeBackend interface {
// chain statistics up to a monitoring server.
type Service struct {
server *p2p.Server // Peer-to-peer server to retrieve networking infos
backend backend
backend extendedBackend
engine consensus.Engine // Consensus engine to retrieve variadic block fields

node string // Name of the node to display on the monitoring page
Expand All @@ -96,6 +104,9 @@ type Service struct {

headSub event.Subscription
txSub event.Subscription

//bor related sub
chain2headSub event.Subscription
}

// connWrapper is a wrapper to prevent concurrent-write or concurrent-read on the
Expand Down Expand Up @@ -168,7 +179,7 @@ func parseEthstatsURL(url string) (parts []string, err error) {
}

// New returns a monitoring service ready for stats reporting.
func New(node *node.Node, backend backend, engine consensus.Engine, url string) error {
func New(node *node.Node, backend extendedBackend, engine consensus.Engine, url string) error {
parts, err := parseEthstatsURL(url)
if err != nil {
return err
Expand All @@ -195,7 +206,9 @@ func (s *Service) Start() error {
s.headSub = s.backend.SubscribeChainHeadEvent(chainHeadCh)
txEventCh := make(chan core.NewTxsEvent, txChanSize)
s.txSub = s.backend.SubscribeNewTxsEvent(txEventCh)
go s.loop(chainHeadCh, txEventCh)
chain2HeadCh := make(chan core.Chain2HeadEvent, chain2HeadChanSize)
s.chain2headSub = s.backend.SubscribeChain2HeadEvent(chain2HeadCh)
go s.loop(chainHeadCh, chain2HeadCh, txEventCh)

log.Info("Stats daemon started")
return nil
Expand All @@ -211,12 +224,13 @@ func (s *Service) Stop() error {

// loop keeps trying to connect to the netstats server, reporting chain events
// until termination.
func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, txEventCh chan core.NewTxsEvent) {
func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, chain2HeadCh chan core.Chain2HeadEvent, txEventCh chan core.NewTxsEvent) {
// Start a goroutine that exhausts the subscriptions to avoid events piling up
var (
quitCh = make(chan struct{})
headCh = make(chan *types.Block, 1)
txCh = make(chan struct{}, 1)
quitCh = make(chan struct{})
headCh = make(chan *types.Block, 1)
txCh = make(chan struct{}, 1)
head2Ch = make(chan core.Chain2HeadEvent, 1)
)
go func() {
var lastTx mclock.AbsTime
Expand All @@ -231,6 +245,13 @@ func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, txEventCh chan core
default:
}

// Notify of chain2head events, but drop if too frequent
case chain2head := <-chain2HeadCh:
select {
case head2Ch <- chain2head:
default:
}

// Notify of new transaction events, but drop if too frequent
case <-txEventCh:
if time.Duration(mclock.Now()-lastTx) < time.Second {
Expand Down Expand Up @@ -333,6 +354,12 @@ func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, txEventCh chan core
if err = s.reportPending(conn); err != nil {
log.Warn("Post-block transaction stats report failed", "err", err)
}

case chain2head := <-head2Ch:
if err = s.reportChain2Head(conn, &chain2head); err != nil {
log.Warn("Reorg stats report failed", "err", err)
}

case <-txCh:
if err = s.reportPending(conn); err != nil {
log.Warn("Transaction stats report failed", "err", err)
Expand Down Expand Up @@ -750,6 +777,44 @@ func (s *Service) reportPending(conn *connWrapper) error {
return conn.WriteJSON(report)
}

type Chain2HeadStats struct {
NewChain []*blockStats
OldChain []*blockStats
Type string
}

// reportChain2Head checks for reorg and sends current head to stats server.
func (s *Service) reportChain2Head(conn *connWrapper, chain2HeadData *core.Chain2HeadEvent) error {

var chain2headStats Chain2HeadStats

// assemble new chain
for _, block := range chain2HeadData.NewChain {
chain2headStats.NewChain = append(chain2headStats.NewChain, s.assembleBlockStats(block))
}

// assemble old chain
for _, block := range chain2HeadData.OldChain {
chain2headStats.OldChain = append(chain2headStats.OldChain, s.assembleBlockStats(block))
}

chain2headStats.Type = chain2HeadData.Type

// Assemble the block report and send it to the server
log.Trace("Reorg Detected", "reorg root block number", chain2headStats.NewChain[0].Number, "block hash", chain2headStats.NewChain[0].Hash)

stats := map[string]interface{}{
"id": s.node,
"reorg root block number": chain2headStats.NewChain[0].Number,
"reorg root block hash": chain2headStats.NewChain[0].Hash,
"details": chain2headStats,
}
report := map[string][]interface{}{
"emit": {"Chain2Head", stats},
}
return conn.WriteJSON(report)
}

// nodeStats is the information to report about the local node.
type nodeStats struct {
Active bool `json:"active"`
Expand Down
2 changes: 2 additions & 0 deletions internal/ethapi/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ type Backend interface {
GetBorBlockTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error)
GetBorBlockTransactionWithBlockHash(ctx context.Context, txHash common.Hash, blockHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error)

SubscribeChain2HeadEvent(ch chan<- core.Chain2HeadEvent) event.Subscription

ChainConfig() *params.ChainConfig
Engine() consensus.Engine
}
Expand Down
5 changes: 5 additions & 0 deletions les/bor_api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,8 @@ func (b *LesApiBackend) GetRootHash(ctx context.Context, starBlockNr uint64, end
func (b *LesApiBackend) SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription {
return b.eth.blockchain.SubscribeStateSyncEvent(ch)
}

// SubscribeChain2HeadEvent subscribe head/fork/reorg events.
func (b *LesApiBackend) SubscribeChain2HeadEvent(ch chan<- core.Chain2HeadEvent) event.Subscription {
return b.eth.BlockChain().SubscribeChain2HeadEvent(ch)
}
26 changes: 16 additions & 10 deletions light/lightchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,17 @@ var (
// headers, downloading block bodies and receipts on demand through an ODR
// interface. It only does header validation during chain insertion.
type LightChain struct {
hc *core.HeaderChain
indexerConfig *IndexerConfig
chainDb ethdb.Database
engine consensus.Engine
odr OdrBackend
chainFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
scope event.SubscriptionScope
genesisBlock *types.Block
hc *core.HeaderChain
indexerConfig *IndexerConfig
chainDb ethdb.Database
engine consensus.Engine
odr OdrBackend
chainFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
chain2HeadFeed event.Feed
scope event.SubscriptionScope
genesisBlock *types.Block

bodyCache *lru.Cache // Cache for the most recent block bodies
bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format
Expand Down Expand Up @@ -561,6 +562,11 @@ func (lc *LightChain) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent)
return lc.scope.Track(new(event.Feed).Subscribe(ch))
}

// SubscribeChain2HeadEvent registers a subscription of Reorg/head/fork events.
func (lc *LightChain) SubscribeChain2HeadEvent(ch chan<- core.Chain2HeadEvent) event.Subscription {
return lc.scope.Track(lc.chain2HeadFeed.Subscribe(ch))
}

// DisableCheckFreq disables header validation. This is used for ultralight mode.
func (lc *LightChain) DisableCheckFreq() {
atomic.StoreInt32(&lc.disableCheckFreq, 1)
Expand Down

0 comments on commit 3a67447

Please sign in to comment.