Skip to content

Commit

Permalink
feat: active pbss on bsc
Browse files Browse the repository at this point in the history
  • Loading branch information
joeylichang committed Sep 24, 2023
1 parent ce61c82 commit 7e3530d
Show file tree
Hide file tree
Showing 11 changed files with 153 additions and 45 deletions.
2 changes: 2 additions & 0 deletions cmd/geth/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ func pruneAllState(ctx *cli.Context) error {
}

chaindb := utils.MakeChainDatabase(ctx, stack, false, false)
defer chaindb.Close()
pruner, err := pruner.NewAllPruner(chaindb)
if err != nil {
log.Error("Failed to open snapshot tree", "err", err)
Expand All @@ -495,6 +496,7 @@ func verifyState(ctx *cli.Context) error {
defer stack.Close()

chaindb := utils.MakeChainDatabase(ctx, stack, true, false)
defer chaindb.Close()
headBlock := rawdb.ReadHeadBlock(chaindb)
if headBlock == nil {
log.Error("Failed to load head block")
Expand Down
16 changes: 14 additions & 2 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,11 @@ type CacheConfig struct {

// triedbConfig derives the configures for trie database.
func (c *CacheConfig) triedbConfig() *trie.Config {
config := &trie.Config{Preimages: c.Preimages}
config := &trie.Config{
Cache: c.TrieCleanLimit,
Preimages: c.Preimages,
NoTries: c.NoTries,
}
if c.StateScheme == rawdb.HashScheme {
config.HashDB = &hashdb.Config{
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
Expand Down Expand Up @@ -392,6 +396,14 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
var diskRoot common.Hash
if bc.cacheConfig.SnapshotLimit > 0 {
diskRoot = rawdb.ReadSnapshotRoot(bc.db)
if bc.triedb.Scheme() == rawdb.PathScheme {
if err := bc.triedb.Recover(diskRoot); err != nil {
diskRoot = common.Hash{}
log.Warn("failed to recover pathdb by snapshot, reset chain from genesis", "error", err)
}
}
} else if bc.triedb.Scheme() == rawdb.PathScheme {
_, diskRoot = rawdb.ReadAccountTrieNode(bc.db, nil)
}
if diskRoot != (common.Hash{}) {
log.Warn("Head state missing, repairing", "number", head.Number, "hash", head.Hash(), "snaproot", diskRoot)
Expand Down Expand Up @@ -875,7 +887,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
log.Crit("Failed to rollback state", "err", err) // Shouldn't happen
}
}
log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
log.Info("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
break
}
log.Debug("Skipping block with threshold state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "root", newHeadBlock.Root())
Expand Down
3 changes: 3 additions & 0 deletions core/rawdb/ancient_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ func inspectFreezers(db ethdb.Database) ([]freezerInfo, error) {
infos = append(infos, info)

case stateFreezerName:
if ReadStateScheme(db) != PathScheme {
continue
}
datadir, err := db.AncientDatadir()
if err != nil {
return nil, err
Expand Down
10 changes: 10 additions & 0 deletions core/state/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,12 @@ func (db *cachingDB) OpenStorageTrie(stateRoot common.Hash, address common.Addre
}

func (db *cachingDB) CacheAccount(root common.Hash, t Trie) {
// Only the hash-scheme trie database supports the account trie cache. In the path
// scheme, an account trie is bound to its previous layer, so a cached trie would touch
// stale (dirty) data on the next access. This is a consequence of how pathdb
// implements its Reader interface.
if db.TrieDB().Scheme() == rawdb.PathScheme {
return
}
if db.accountTrieCache == nil {
return
}
Expand All @@ -289,6 +295,10 @@ func (db *cachingDB) CacheAccount(root common.Hash, t Trie) {
}

func (db *cachingDB) CacheStorage(addrHash common.Hash, root common.Hash, t Trie) {
// Same rationale as CacheAccount: the path-scheme trie database does not support caching.
if db.TrieDB().Scheme() == rawdb.PathScheme {
return
}
if db.storageTrieCache == nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ func (s *stateObject) commit() (*trienode.NodeSet, error) {
if metrics.EnabledExpensive {
defer func(start time.Time) { s.db.StorageCommits += time.Since(start) }(time.Now())
}
root, nodes, err := tr.Commit(false)
root, nodes, err := tr.Commit(true)
if err != nil {
return nil, err
}
Expand Down
45 changes: 31 additions & 14 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1608,6 +1608,22 @@ func (s *StateDB) Commit(block uint64, failPostCommitFunc func(), postCommitFunc
if root != types.EmptyRootHash {
s.db.CacheAccount(root, s.trie)
}

origin := s.originalRoot
if origin == (common.Hash{}) {
origin = types.EmptyRootHash
}

if root != origin {
start := time.Now()
if err := s.db.TrieDB().Update(root, origin, block, nodes, triestate.New(s.accountsOrigin, s.storagesOrigin, incomplete)); err != nil {
return err
}
s.originalRoot = root
if metrics.EnabledExpensive {
s.TrieDBCommits += time.Since(start)
}
}
}

for _, postFunc := range postCommitFuncs {
Expand Down Expand Up @@ -1705,6 +1721,7 @@ func (s *StateDB) Commit(block uint64, failPostCommitFunc func(), postCommitFunc
if s.pipeCommit {
go commmitTrie()
} else {
commitFuncs = append(commitFuncs, commmitTrie)
defer s.StopPrefetcher()
}
commitRes := make(chan error, len(commitFuncs))
Expand Down Expand Up @@ -1736,20 +1753,20 @@ func (s *StateDB) Commit(block uint64, failPostCommitFunc func(), postCommitFunc
if root == (common.Hash{}) {
root = types.EmptyRootHash
}
origin := s.originalRoot
if origin == (common.Hash{}) {
origin = types.EmptyRootHash
}
if root != origin {
start := time.Now()
if err := s.db.TrieDB().Update(root, origin, block, nodes, triestate.New(s.accountsOrigin, s.storagesOrigin, incomplete)); err != nil {
return common.Hash{}, nil, err
}
s.originalRoot = root
if metrics.EnabledExpensive {
s.TrieDBCommits += time.Since(start)
}
}
//origin := s.originalRoot
//if origin == (common.Hash{}) {
// origin = types.EmptyRootHash
//}
//if root != origin {
// start := time.Now()
// if err := s.db.TrieDB().Update(root, origin, block, nodes, triestate.New(s.accountsOrigin, s.storagesOrigin, incomplete)); err != nil {
// return common.Hash{}, nil, err
// }
// s.originalRoot = root
// if metrics.EnabledExpensive {
// s.TrieDBCommits += time.Since(start)
// }
//}
// Clear all internal flags at the end of commit operation.
s.accounts = make(map[common.Hash][]byte)
s.storages = make(map[common.Hash]map[common.Hash][]byte)
Expand Down
40 changes: 24 additions & 16 deletions eth/downloader/fetchers_concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error {
}()
ordering := make(map[*eth.Request]int)
timeouts := prque.New[int64, *eth.Request](func(data *eth.Request, index int) {
if index < 0 {
delete(ordering, data)
return
}
ordering[data] = index
})

Expand Down Expand Up @@ -245,14 +249,16 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error {
req.Close()

if index, live := ordering[req]; live {
timeouts.Remove(index)
if index == 0 {
if !timeout.Stop() {
<-timeout.C
}
if timeouts.Size() > 0 {
_, exp := timeouts.Peek()
timeout.Reset(time.Until(time.Unix(0, -exp)))
if index >= 0 && index < timeouts.Size() {
timeouts.Remove(index)
if index == 0 {
if !timeout.Stop() {
<-timeout.C
}
if timeouts.Size() > 0 {
_, exp := timeouts.Peek()
timeout.Reset(time.Until(time.Unix(0, -exp)))
}
}
}
delete(ordering, req)
Expand Down Expand Up @@ -333,14 +339,16 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error {
// reschedule the timeout timer.
index, live := ordering[res.Req]
if live {
timeouts.Remove(index)
if index == 0 {
if !timeout.Stop() {
<-timeout.C
}
if timeouts.Size() > 0 {
_, exp := timeouts.Peek()
timeout.Reset(time.Until(time.Unix(0, -exp)))
if index >= 0 && index < timeouts.Size() {
timeouts.Remove(index)
if index == 0 {
if !timeout.Stop() {
<-timeout.C
}
if timeouts.Size() > 0 {
_, exp := timeouts.Peek()
timeout.Reset(time.Until(time.Unix(0, -exp)))
}
}
}
delete(ordering, res.Req)
Expand Down
28 changes: 23 additions & 5 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ type handler struct {

// channels for fetcher, syncer, txsyncLoop
quitSync chan struct{}
stopCh chan struct{}

chainSync *chainSyncer
wg sync.WaitGroup
Expand Down Expand Up @@ -198,6 +199,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
quitSync: make(chan struct{}),
handlerDoneCh: make(chan struct{}),
handlerStartCh: make(chan struct{}),
stopCh: make(chan struct{}),
}
if config.Sync == downloader.FullSync {
// The database seems empty as the current block is the genesis. Yet the snap
Expand Down Expand Up @@ -365,6 +367,8 @@ func (h *handler) protoTracker() {
<-h.handlerDoneCh
}
return
case <-h.stopCh:
return
}
}
}
Expand Down Expand Up @@ -729,6 +733,8 @@ func (h *handler) startMaliciousVoteMonitor() {
h.maliciousVoteMonitor.ConflictDetect(event.Vote, pendingBlockNumber)
case <-h.voteMonitorSub.Err():
return
case <-h.stopCh:
return
}
}
}
Expand All @@ -743,7 +749,7 @@ func (h *handler) Stop() {
h.voteMonitorSub.Unsubscribe()
}
}

close(h.stopCh)
// Quit chainSync and txsync64.
// After this is done, no new peers will be accepted.
close(h.quitSync)
Expand Down Expand Up @@ -908,10 +914,18 @@ func (h *handler) BroadcastVote(vote *types.VoteEnvelope) {
func (h *handler) minedBroadcastLoop() {
defer h.wg.Done()

for obj := range h.minedBlockSub.Chan() {
if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
h.BroadcastBlock(ev.Block, true) // First propagate block to peers
h.BroadcastBlock(ev.Block, false) // Only then announce to the rest
for {
select {
case obj := <-h.minedBlockSub.Chan():
if obj == nil {
continue
}
if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
h.BroadcastBlock(ev.Block, true) // First propagate block to peers
h.BroadcastBlock(ev.Block, false) // Only then announce to the rest
}
case <-h.stopCh:
return
}
}
}
Expand All @@ -925,6 +939,8 @@ func (h *handler) txBroadcastLoop() {
h.BroadcastTransactions(event.Txs)
case <-h.txsSub.Err():
return
case <-h.stopCh:
return
}
}
}
Expand All @@ -938,6 +954,8 @@ func (h *handler) txReannounceLoop() {
h.ReannounceTransactions(event.Txs)
case <-h.reannoTxsSub.Err():
return
case <-h.stopCh:
return
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions eth/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ func (cs *chainSyncer) loop() {
<-cs.doneCh
}
return
case <-cs.handler.stopCh:
return
}
}
}
Expand Down
48 changes: 42 additions & 6 deletions trie/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package trie

import (
"errors"
"strings"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/trie/triedb/hashdb"
Expand All @@ -31,7 +33,8 @@ import (
// Config defines all necessary options for database.
type Config struct {
NoTries bool
Preimages bool // Flag whether the preimage of node key is recorded
Preimages bool // Flag whether the preimage of node key is recorded
Cache int
HashDB *hashdb.Config // Configs for hash-based scheme
PathDB *pathdb.Config // Configs for experimental path-based scheme

Expand Down Expand Up @@ -104,8 +107,22 @@ func prepare(diskdb ethdb.Database, config *Config) *Database {
// the legacy hash-based scheme is used by default.
func NewDatabase(diskdb ethdb.Database, config *Config) *Database {
// Sanitize the config and use the default one if it's not specified.
dbScheme := rawdb.ReadStateScheme(diskdb)
if config == nil {
config = HashDefaults
if dbScheme == rawdb.PathScheme {
config = &Config{
PathDB: pathdb.Defaults,
}
} else {
config = HashDefaults
}
}
if config.PathDB == nil && config.HashDB == nil {
if dbScheme == rawdb.PathScheme {
config.PathDB = pathdb.Defaults
} else {
config.HashDB = hashdb.Defaults
}
}
var preimages *preimageStore
if config.Preimages {
Expand All @@ -116,14 +133,33 @@ func NewDatabase(diskdb ethdb.Database, config *Config) *Database {
diskdb: diskdb,
preimages: preimages,
}
if config.HashDB != nil && config.PathDB != nil {
log.Crit("Both 'hash' and 'path' mode are configured")
}
if config.PathDB != nil {
/*
* 1. First, initialize the database according to the user-supplied config.
* 2. Otherwise, initialize it according to the scheme the database already uses.
* 3. Finally, fall back to the default scheme, namely the hash scheme.
*/
if config.HashDB != nil {
if rawdb.ReadStateScheme(diskdb) == rawdb.PathScheme {
log.Warn("incompatible state scheme", "old", rawdb.PathScheme, "new", rawdb.HashScheme)
}
db.backend = hashdb.New(diskdb, config.HashDB, mptResolver{})
} else if config.PathDB != nil {
if rawdb.ReadStateScheme(diskdb) == rawdb.HashScheme {
log.Warn("incompatible state scheme", "old", rawdb.HashScheme, "new", rawdb.PathScheme)
}
db.backend = pathdb.New(diskdb, config.PathDB)
} else if strings.Compare(dbScheme, rawdb.PathScheme) == 0 {
if config.PathDB == nil {
config.PathDB = pathdb.Defaults
}
db.backend = pathdb.New(diskdb, config.PathDB)
} else {
if config.HashDB == nil {
config.HashDB = hashdb.Defaults
}
db.backend = hashdb.New(diskdb, config.HashDB, mptResolver{})
}
log.Info("succeed to init trie db", "scheme", db.Scheme())
return db
}

Expand Down
Loading

0 comments on commit 7e3530d

Please sign in to comment.