Skip to content

Commit

Permalink
triedb/pathdb: fix async node buffer diskroot mismatches when journaling
Browse files Browse the repository at this point in the history
  • Loading branch information
joeylichang authored and VM committed Dec 19, 2023
1 parent e44de3a commit 72d8c83
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 7 deletions.
23 changes: 20 additions & 3 deletions trie/triedb/pathdb/asyncnodebuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ var _ trienodebuffer = &asyncnodebuffer{}
// asyncnodebuffer implement trienodebuffer interface, and aysnc the nodecache
// to disk.
type asyncnodebuffer struct {
mux sync.RWMutex
current *nodecache
background *nodecache
mux sync.RWMutex
current *nodecache
background *nodecache
stopFlushing uint64
flushing uint64
}

// newAsyncNodeBuffer initializes the async node buffer with the provided nodes.
Expand Down Expand Up @@ -125,6 +127,10 @@ func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache,
a.mux.Lock()
defer a.mux.Unlock()

if atomic.LoadUint64(&a.stopFlushing) == 1 {
return nil
}

if force {
for {
if atomic.LoadUint64(&a.background.immutable) == 1 {
Expand All @@ -149,7 +155,9 @@ func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache,
atomic.StoreUint64(&a.current.immutable, 1)
a.current, a.background = a.background, a.current

atomic.StoreUint64(&a.flushing, 1)
go func(persistId uint64) {
defer atomic.StoreUint64(&a.flushing, 0)
for {
err := a.background.flush(db, clean, persistId)
if err == nil {
Expand All @@ -162,6 +170,15 @@ func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache,
return nil
}

func (a *asyncnodebuffer) waitAndStopFlushing() {
atomic.StoreUint64(&a.stopFlushing, 1)
if atomic.LoadUint64(&a.flushing) == 1 {
time.Sleep(time.Duration(1) * time.Second)
log.Info("waiting background memory table flush to disk")
}
return
}

func (a *asyncnodebuffer) getAllNodes() map[common.Hash]map[string]*trienode.Node {
a.mux.Lock()
defer a.mux.Unlock()
Expand Down
3 changes: 3 additions & 0 deletions trie/triedb/pathdb/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type trienodebuffer interface {

// getLayers return the size of cached difflayers.
getLayers() uint64

// waitFlushing will block unit writing the trie nodes of trienodebuffer to disk
waitAndStopFlushing()
}

func NewTrieNodeBuffer(sync bool, limit int, nodes map[common.Hash]map[string]*trienode.Node, layers uint64) trienodebuffer {
Expand Down
10 changes: 6 additions & 4 deletions trie/triedb/pathdb/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ func (dl *diffLayer) journal(w io.Writer) error {
// flattening everything down (bad for reorgs). And this function will mark the
// database as read-only to prevent all following mutation to disk.
func (db *Database) Journal(root common.Hash) error {
// Run the journaling
db.lock.Lock()
defer db.lock.Unlock()

// Retrieve the head layer to journal from.
l := db.tree.get(root)
if l == nil {
Expand All @@ -351,10 +355,8 @@ func (db *Database) Journal(root common.Hash) error {
}
start := time.Now()

// Run the journaling
db.lock.Lock()
defer db.lock.Unlock()

// wait and stop the flush trienodebuffer, for async node buffer need fixed diskroot
disk.buffer.waitAndStopFlushing()
// Short circuit if the database is in read only mode.
if db.readOnly {
return errSnapshotReadOnly
Expand Down
4 changes: 4 additions & 0 deletions trie/triedb/pathdb/nodebuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id ui
return nil
}

func (b *nodebuffer) waitAndStopFlushing() {
return
}

// writeNodes writes the trie nodes into the provided database batch.
// Note this function will also inject all the newly written nodes
// into clean cache.
Expand Down

0 comments on commit 72d8c83

Please sign in to comment.