diff --git a/trie/triedb/pathdb/asyncnodebuffer.go b/trie/triedb/pathdb/asyncnodebuffer.go index c8c3921177..24b29f6593 100644 --- a/trie/triedb/pathdb/asyncnodebuffer.go +++ b/trie/triedb/pathdb/asyncnodebuffer.go @@ -21,9 +21,11 @@ var _ trienodebuffer = &asyncnodebuffer{} // asyncnodebuffer implement trienodebuffer interface, and aysnc the nodecache // to disk. type asyncnodebuffer struct { - mux sync.RWMutex - current *nodecache - background *nodecache + mux sync.RWMutex + current *nodecache + background *nodecache + stopFlushing uint64 + flushing uint64 } // newAsyncNodeBuffer initializes the async node buffer with the provided nodes. @@ -125,6 +127,10 @@ func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, a.mux.Lock() defer a.mux.Unlock() + if atomic.LoadUint64(&a.stopFlushing) == 1 { + return nil + } + if force { for { if atomic.LoadUint64(&a.background.immutable) == 1 { @@ -149,7 +155,9 @@ func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, atomic.StoreUint64(&a.current.immutable, 1) a.current, a.background = a.background, a.current + atomic.StoreUint64(&a.flushing, 1) go func(persistId uint64) { + defer atomic.StoreUint64(&a.flushing, 0) for { err := a.background.flush(db, clean, persistId) if err == nil { @@ -162,6 +170,15 @@ func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, return nil } +func (a *asyncnodebuffer) waitAndStopFlushing() { + atomic.StoreUint64(&a.stopFlushing, 1) + if atomic.LoadUint64(&a.flushing) == 1 { + time.Sleep(time.Duration(1) * time.Second) + log.Info("waiting background memory table flush to disk") + } + return +} + func (a *asyncnodebuffer) getAllNodes() map[common.Hash]map[string]*trienode.Node { a.mux.Lock() defer a.mux.Unlock() diff --git a/trie/triedb/pathdb/disklayer.go b/trie/triedb/pathdb/disklayer.go index 59be04c744..36ff11a1a2 100644 --- a/trie/triedb/pathdb/disklayer.go +++ b/trie/triedb/pathdb/disklayer.go @@ -72,6 +72,9 @@ type trienodebuffer interface { // getLayers return the size of cached difflayers. getLayers() uint64 + + // waitFlushing will block unit writing the trie nodes of trienodebuffer to disk + waitAndStopFlushing() } func NewTrieNodeBuffer(sync bool, limit int, nodes map[common.Hash]map[string]*trienode.Node, layers uint64) trienodebuffer { diff --git a/trie/triedb/pathdb/journal.go b/trie/triedb/pathdb/journal.go index 3c7884cdf4..4d550dd472 100644 --- a/trie/triedb/pathdb/journal.go +++ b/trie/triedb/pathdb/journal.go @@ -338,6 +338,10 @@ func (dl *diffLayer) journal(w io.Writer) error { // flattening everything down (bad for reorgs). And this function will mark the // database as read-only to prevent all following mutation to disk. func (db *Database) Journal(root common.Hash) error { + // Run the journaling + db.lock.Lock() + defer db.lock.Unlock() + // Retrieve the head layer to journal from. l := db.tree.get(root) if l == nil { @@ -351,10 +355,8 @@ func (db *Database) Journal(root common.Hash) error { } start := time.Now() - // Run the journaling - db.lock.Lock() - defer db.lock.Unlock() - + // wait and stop the flush trienodebuffer, for async node buffer need fixed diskroot + disk.buffer.waitAndStopFlushing() // Short circuit if the database is in read only mode. if db.readOnly { return errSnapshotReadOnly diff --git a/trie/triedb/pathdb/nodebuffer.go b/trie/triedb/pathdb/nodebuffer.go index 9d79df37ff..12caf148d6 100644 --- a/trie/triedb/pathdb/nodebuffer.go +++ b/trie/triedb/pathdb/nodebuffer.go @@ -241,6 +241,10 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id ui return nil } +func (b *nodebuffer) waitAndStopFlushing() { + return +} + // writeNodes writes the trie nodes into the provided database batch. // Note this function will also inject all the newly written nodes // into clean cache.