Skip to content

Commit

Permalink
core, eth, ethstats: simplify chain head events (#30601)
Browse files Browse the repository at this point in the history
  • Loading branch information
karalabe authored Oct 16, 2024
1 parent 15bf90e commit 368e16f
Show file tree
Hide file tree
Showing 16 changed files with 48 additions and 173 deletions.
24 changes: 9 additions & 15 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ type BlockChain struct {
hc *HeaderChain
rmLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
logsFeed event.Feed
blockProcFeed event.Feed
Expand Down Expand Up @@ -571,15 +570,14 @@ func (bc *BlockChain) SetHead(head uint64) error {
}
// Send chain head event to update the transaction pool
header := bc.CurrentBlock()
block := bc.GetBlock(header.Hash(), header.Number.Uint64())
if block == nil {
if block := bc.GetBlock(header.Hash(), header.Number.Uint64()); block == nil {
// This should never happen. In practice, previously currentBlock
// contained the entire block whereas now only a "marker", so there
// is an ever so slight chance for a race we should handle.
log.Error("Current block not found in database", "block", header.Number, "hash", header.Hash())
return fmt.Errorf("current block missing: #%d [%x..]", header.Number, header.Hash().Bytes()[:4])
}
bc.chainHeadFeed.Send(ChainHeadEvent{Block: block})
bc.chainHeadFeed.Send(ChainHeadEvent{Header: header})
return nil
}

Expand All @@ -593,15 +591,14 @@ func (bc *BlockChain) SetHeadWithTimestamp(timestamp uint64) error {
}
// Send chain head event to update the transaction pool
header := bc.CurrentBlock()
block := bc.GetBlock(header.Hash(), header.Number.Uint64())
if block == nil {
if block := bc.GetBlock(header.Hash(), header.Number.Uint64()); block == nil {
// This should never happen. In practice, previously currentBlock
// contained the entire block whereas now only a "marker", so there
// is an ever so slight chance for a race we should handle.
log.Error("Current block not found in database", "block", header.Number, "hash", header.Hash())
return fmt.Errorf("current block missing: #%d [%x..]", header.Number, header.Hash().Bytes()[:4])
}
bc.chainHeadFeed.Send(ChainHeadEvent{Block: block})
bc.chainHeadFeed.Send(ChainHeadEvent{Header: header})
return nil
}

Expand Down Expand Up @@ -1552,7 +1549,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
// Set new head.
bc.writeHeadBlock(block)

bc.chainFeed.Send(ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
bc.chainFeed.Send(ChainEvent{Header: block.Header()})
if len(logs) > 0 {
bc.logsFeed.Send(logs)
}
Expand All @@ -1562,7 +1559,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
// we will fire an accumulated ChainHeadEvent and disable fire
// event here.
if emitHeadEvent {
bc.chainHeadFeed.Send(ChainHeadEvent{Block: block})
bc.chainHeadFeed.Send(ChainHeadEvent{Header: block.Header()})
}
return CanonStatTy, nil
}
Expand Down Expand Up @@ -1627,7 +1624,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
// Fire a single chain head event if we've progressed the chain
defer func() {
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
bc.chainHeadFeed.Send(ChainHeadEvent{lastCanon})
bc.chainHeadFeed.Send(ChainHeadEvent{Header: lastCanon.Header()})
}
}()
// Start the parallel header verifier
Expand Down Expand Up @@ -2328,9 +2325,6 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error {
// Deleted logs + blocks:
var deletedLogs []*types.Log
for i := len(oldChain) - 1; i >= 0; i-- {
// Also send event for blocks removed from the canon chain.
bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]})

// Collect deleted logs for notification
if logs := bc.collectLogs(oldChain[i], true); len(logs) > 0 {
deletedLogs = append(deletedLogs, logs...)
Expand Down Expand Up @@ -2403,11 +2397,11 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {

// Emit events
logs := bc.collectLogs(head, false)
bc.chainFeed.Send(ChainEvent{Block: head, Hash: head.Hash(), Logs: logs})
bc.chainFeed.Send(ChainEvent{Header: head.Header()})
if len(logs) > 0 {
bc.logsFeed.Send(logs)
}
bc.chainHeadFeed.Send(ChainHeadEvent{Block: head})
bc.chainHeadFeed.Send(ChainHeadEvent{Header: head.Header()})

context := []interface{}{
"number", head.Number(),
Expand Down
5 changes: 0 additions & 5 deletions core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,11 +430,6 @@ func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Su
return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch))
}

// SubscribeChainSideEvent registers a subscription of ChainSideEvent.
func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription {
return bc.scope.Track(bc.chainSideFeed.Subscribe(ch))
}

// SubscribeLogsEvent registers a subscription of []*types.Log.
func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return bc.scope.Track(bc.logsFeed.Subscribe(ch))
Expand Down
79 changes: 0 additions & 79 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1332,85 +1332,6 @@ func checkLogEvents(t *testing.T, logsCh <-chan []*types.Log, rmLogsCh <-chan Re
}
}

func TestReorgSideEvent(t *testing.T) {
testReorgSideEvent(t, rawdb.HashScheme)
testReorgSideEvent(t, rawdb.PathScheme)
}

func testReorgSideEvent(t *testing.T, scheme string) {
var (
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
gspec = &Genesis{
Config: params.TestChainConfig,
Alloc: types.GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000000)}},
}
signer = types.LatestSigner(gspec.Config)
)
blockchain, _ := NewBlockChain(rawdb.NewMemoryDatabase(), DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
defer blockchain.Stop()

_, chain, _ := GenerateChainWithGenesis(gspec, ethash.NewFaker(), 3, func(i int, gen *BlockGen) {})
if _, err := blockchain.InsertChain(chain); err != nil {
t.Fatalf("failed to insert chain: %v", err)
}

_, replacementBlocks, _ := GenerateChainWithGenesis(gspec, ethash.NewFaker(), 4, func(i int, gen *BlockGen) {
tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, gen.header.BaseFee, nil), signer, key1)
if i == 2 {
gen.OffsetTime(-9)
}
if err != nil {
t.Fatalf("failed to create tx: %v", err)
}
gen.AddTx(tx)
})
chainSideCh := make(chan ChainSideEvent, 64)
blockchain.SubscribeChainSideEvent(chainSideCh)
if _, err := blockchain.InsertChain(replacementBlocks); err != nil {
t.Fatalf("failed to insert chain: %v", err)
}

expectedSideHashes := map[common.Hash]bool{
chain[0].Hash(): true,
chain[1].Hash(): true,
chain[2].Hash(): true,
}

i := 0

const timeoutDura = 10 * time.Second
timeout := time.NewTimer(timeoutDura)
done:
for {
select {
case ev := <-chainSideCh:
block := ev.Block
if _, ok := expectedSideHashes[block.Hash()]; !ok {
t.Errorf("%d: didn't expect %x to be in side chain", i, block.Hash())
}
i++

if i == len(expectedSideHashes) {
timeout.Stop()

break done
}
timeout.Reset(timeoutDura)

case <-timeout.C:
t.Fatalf("Timeout. Possibly not all blocks were triggered for sideevent: %v", i)
}
}

// make sure no more events are fired
select {
case e := <-chainSideCh:
t.Errorf("unexpected event fired: %v", e)
case <-time.After(250 * time.Millisecond):
}
}

// Tests if the canonical block can be fetched from the database during chain insertion.
func TestCanonicalBlockRetrieval(t *testing.T) {
testCanonicalBlockRetrieval(t, rawdb.HashScheme)
Expand Down
9 changes: 4 additions & 5 deletions core/chain_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,20 +222,19 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainH
errc <- nil
return
}
header := ev.Block.Header()
if header.ParentHash != prevHash {
if ev.Header.ParentHash != prevHash {
// Reorg to the common ancestor if needed (might not exist in light sync mode, skip reorg then)
// TODO(karalabe, zsfelfoldi): This seems a bit brittle, can we detect this case explicitly?

if rawdb.ReadCanonicalHash(c.chainDb, prevHeader.Number.Uint64()) != prevHash {
if h := rawdb.FindCommonAncestor(c.chainDb, prevHeader, header); h != nil {
if h := rawdb.FindCommonAncestor(c.chainDb, prevHeader, ev.Header); h != nil {
c.newHead(h.Number.Uint64(), true)
}
}
}
c.newHead(header.Number.Uint64(), false)
c.newHead(ev.Header.Number.Uint64(), false)

prevHeader, prevHash = header, header.Hash()
prevHeader, prevHash = ev.Header, ev.Header.Hash()
}
}
}
Expand Down
14 changes: 3 additions & 11 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,19 @@
package core

import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)

// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
type NewTxsEvent struct{ Txs []*types.Transaction }

// NewMinedBlockEvent is posted when a block has been imported.
type NewMinedBlockEvent struct{ Block *types.Block }

// RemovedLogsEvent is posted when a reorg happens
type RemovedLogsEvent struct{ Logs []*types.Log }

type ChainEvent struct {
Block *types.Block
Hash common.Hash
Logs []*types.Log
Header *types.Header
}

type ChainSideEvent struct {
Block *types.Block
type ChainHeadEvent struct {
Header *types.Header
}

type ChainHeadEvent struct{ Block *types.Block }
4 changes: 2 additions & 2 deletions core/txindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
if done == nil {
stop = make(chan struct{})
done = make(chan struct{})
go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.Block.NumberU64(), stop, done)
go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.Header.Number.Uint64(), stop, done)
}
lastHead = head.Block.NumberU64()
lastHead = head.Header.Number.Uint64()
case <-done:
stop = nil
done = nil
Expand Down
2 changes: 1 addition & 1 deletion core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (p *TxPool) loop(head *types.Header, chain BlockChain) {
select {
case event := <-newHeadCh:
// Chain moved forward, store the head for later consumption
newHead = event.Block.Header()
newHead = event.Header

case head := <-resetDone:
// Previous reset finished, update the old head and allow a new reset
Expand Down
4 changes: 0 additions & 4 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,6 @@ func (b *EthAPIBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) e
return b.eth.BlockChain().SubscribeChainHeadEvent(ch)
}

func (b *EthAPIBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
return b.eth.BlockChain().SubscribeChainSideEvent(ch)
}

func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return b.eth.BlockChain().SubscribeLogsEvent(ch)
}
Expand Down
17 changes: 9 additions & 8 deletions eth/catalyst/simulated_beacon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,16 @@ func TestSimulatedBeaconSendWithdrawals(t *testing.T) {
timer := time.NewTimer(12 * time.Second)
for {
select {
case evt := <-chainHeadCh:
for _, includedTx := range evt.Block.Transactions() {
case ev := <-chainHeadCh:
block := ethService.BlockChain().GetBlock(ev.Header.Hash(), ev.Header.Number.Uint64())
for _, includedTx := range block.Transactions() {
includedTxs[includedTx.Hash()] = struct{}{}
}
for _, includedWithdrawal := range evt.Block.Withdrawals() {
for _, includedWithdrawal := range block.Withdrawals() {
includedWithdrawals = append(includedWithdrawals, includedWithdrawal.Index)
}

// ensure all withdrawals/txs included. this will take two blocks b/c number of withdrawals > 10
if len(includedTxs) == len(txs) && len(includedWithdrawals) == len(withdrawals) && evt.Block.Number().Cmp(big.NewInt(2)) == 0 {
if len(includedTxs) == len(txs) && len(includedWithdrawals) == len(withdrawals) && ev.Header.Number.Cmp(big.NewInt(2)) == 0 {
return
}
case <-timer.C:
Expand Down Expand Up @@ -186,11 +186,12 @@ func TestOnDemandSpam(t *testing.T) {
)
for {
select {
case evt := <-chainHeadCh:
for _, itx := range evt.Block.Transactions() {
case ev := <-chainHeadCh:
block := eth.BlockChain().GetBlock(ev.Header.Hash(), ev.Header.Number.Uint64())
for _, itx := range block.Transactions() {
includedTxs[itx.Hash()] = struct{}{}
}
for _, iwx := range evt.Block.Withdrawals() {
for _, iwx := range block.Withdrawals() {
includedWxs = append(includedWxs, iwx.Index)
}
// ensure all withdrawals/txs included. this will take two blocks b/c number of withdrawals > 10
Expand Down
2 changes: 1 addition & 1 deletion eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent)

func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) {
for _, f := range filters[BlocksSubscription] {
f.headers <- ev.Block.Header()
f.headers <- ev.Header
}
}

Expand Down
10 changes: 5 additions & 5 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func TestBlockSubscription(t *testing.T) {
)

for _, blk := range chain {
chainEvents = append(chainEvents, core.ChainEvent{Hash: blk.Hash(), Block: blk})
chainEvents = append(chainEvents, core.ChainEvent{Header: blk.Header()})
}

chan0 := make(chan *types.Header)
Expand All @@ -213,13 +213,13 @@ func TestBlockSubscription(t *testing.T) {
for i1 != len(chainEvents) || i2 != len(chainEvents) {
select {
case header := <-chan0:
if chainEvents[i1].Hash != header.Hash() {
t.Errorf("sub0 received invalid hash on index %d, want %x, got %x", i1, chainEvents[i1].Hash, header.Hash())
if chainEvents[i1].Header.Hash() != header.Hash() {
t.Errorf("sub0 received invalid hash on index %d, want %x, got %x", i1, chainEvents[i1].Header.Hash(), header.Hash())
}
i1++
case header := <-chan1:
if chainEvents[i2].Hash != header.Hash() {
t.Errorf("sub1 received invalid hash on index %d, want %x, got %x", i2, chainEvents[i2].Hash, header.Hash())
if chainEvents[i2].Header.Hash() != header.Hash() {
t.Errorf("sub1 received invalid hash on index %d, want %x, got %x", i2, chainEvents[i2].Header.Hash(), header.Hash())
}
i2++
}
Expand Down
4 changes: 2 additions & 2 deletions eth/gasprice/gasprice.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ func NewOracle(backend OracleBackend, params Config, startPrice *big.Int) *Oracl
go func() {
var lastHead common.Hash
for ev := range headEvent {
if ev.Block.ParentHash() != lastHead {
if ev.Header.ParentHash != lastHead {
cache.Purge()
}
lastHead = ev.Block.Hash()
lastHead = ev.Header.Hash()
}
}()

Expand Down
Loading

0 comments on commit 368e16f

Please sign in to comment.