Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent parallel snapshots #4404

Merged
merged 10 commits into from
Sep 14, 2022
129 changes: 89 additions & 40 deletions state/accountsDB.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/ElrondNetwork/elrond-go-core/core"
"github.com/ElrondNetwork/elrond-go-core/core/atomic"
"github.com/ElrondNetwork/elrond-go-core/core/check"
"github.com/ElrondNetwork/elrond-go-core/hashing"
"github.com/ElrondNetwork/elrond-go-core/marshal"
Expand Down Expand Up @@ -74,10 +75,11 @@ type AccountsDB struct {
storagePruningManager StoragePruningManager
obsoleteDataTrieHashes map[string][][]byte

lastSnapshot *snapshotInfo
lastRootHash []byte
dataTries common.TriesHolder
entries []JournalEntry
isSnapshotInProgress atomic.Flag
lastSnapshot *snapshotInfo
lastRootHash []byte
dataTries common.TriesHolder
entries []JournalEntry
// TODO use mutOp only for critical sections, and refactor to parallelize as much as possible
mutOp sync.RWMutex
processingMode common.NodeProcessingMode
Expand Down Expand Up @@ -107,7 +109,20 @@ func NewAccountsDB(args ArgsAccountsDB) (*AccountsDB, error) {
return nil, err
}

adb := &AccountsDB{
adb := createAccountsDb(args)

trieStorageManager := adb.mainTrie.GetStorageManager()
val, err := trieStorageManager.GetFromCurrentEpoch([]byte(common.ActiveDBKey))
isActiveDBVal := bytes.Equal(val, []byte(common.ActiveDBVal))
if err != nil || !isActiveDBVal {
startSnapshotAfterRestart(adb, args)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A separate goroutine will be launched due to calling startSnapshotAfterRestart, which is correct ✅

Ideally, we wouldn't call such a function in a constructor (NewAccountsDB), but can be left as it is for the moment (it was the same before).

Also, optionally, extract bytes.Equal(val, []byte(common.ActiveDBVal) to a separate variable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extracted in a separate variable.

startSnapshotAfterRestart is moved from the constructor in #4367

}

return adb, nil
}

func createAccountsDb(args ArgsAccountsDB) *AccountsDB {
return &AccountsDB{
mainTrie: args.Trie,
hasher: args.Hasher,
marshaller: args.Marshaller,
Expand All @@ -123,15 +138,8 @@ func NewAccountsDB(args ArgsAccountsDB) (*AccountsDB, error) {
processingMode: args.ProcessingMode,
lastSnapshot: &snapshotInfo{},
processStatusHandler: args.ProcessStatusHandler,
isSnapshotInProgress: atomic.Flag{},
}

trieStorageManager := adb.mainTrie.GetStorageManager()
val, err := trieStorageManager.GetFromCurrentEpoch([]byte(common.ActiveDBKey))
if err != nil || !bytes.Equal(val, []byte(common.ActiveDBVal)) {
startSnapshotAfterRestart(adb, args)
}

return adb, nil
}

func checkArgsAccountsDB(args ArgsAccountsDB) error {
Expand Down Expand Up @@ -1005,14 +1013,11 @@ func (adb *AccountsDB) RecreateAllTries(rootHash []byte) (map[string]common.Trie
return nil, err
}

recreatedTrie, err := adb.mainTrie.Recreate(rootHash)
allTries, err := adb.recreateMainTrie(rootHash)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

recreateMainTrie returns allTries - is not obvious from the name. Is there a better name for recreateMainTrie?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not having any great idea

if err != nil {
return nil, err
}

allTries := make(map[string]common.Trie)
allTries[string(rootHash)] = recreatedTrie

for leaf := range leavesChannel {
account := &userAccount{}
err = adb.marshaller.Unmarshal(account, leaf.Value())
Expand All @@ -1034,6 +1039,18 @@ func (adb *AccountsDB) RecreateAllTries(rootHash []byte) (map[string]common.Trie
return allTries, nil
}

func (adb *AccountsDB) recreateMainTrie(rootHash []byte) (map[string]common.Trie, error) {
recreatedTrie, err := adb.mainTrie.Recreate(rootHash)
if err != nil {
return nil, err
}

allTries := make(map[string]common.Trie)
allTries[string(rootHash)] = recreatedTrie

return allTries, nil
}

// GetTrie returns the trie that has the given rootHash
func (adb *AccountsDB) GetTrie(rootHash []byte) (common.Trie, error) {
adb.mutOp.Lock()
Expand Down Expand Up @@ -1089,53 +1106,85 @@ func (adb *AccountsDB) SnapshotState(rootHash []byte) {
adb.mutOp.Lock()
defer adb.mutOp.Unlock()

trieStorageManager := adb.mainTrie.GetStorageManager()
epoch, err := trieStorageManager.GetLatestStorageEpoch()
if err != nil {
log.Error("snapshotState error", "err", err.Error())
trieStorageManager, epoch, shouldTakeSnapshot := adb.prepareSnapshot(rootHash)
if !shouldTakeSnapshot {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps log a message that we won't take the snapshot yet?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a log message inside prepareSnapshot:

log.Debug("skipping snapshot",
			"last snapshot rootHash", adb.lastSnapshot.rootHash,
			"rootHash", rootHash,
			"last snapshot epoch", adb.lastSnapshot.epoch,
			"epoch", epoch,
			"isSnapshotInProgress", adb.isSnapshotInProgress.IsSet(),
		)

return
}

snapshotAlreadyTaken := bytes.Equal(adb.lastSnapshot.rootHash, rootHash) && adb.lastSnapshot.epoch == epoch
if !trieStorageManager.ShouldTakeSnapshot() || snapshotAlreadyTaken {
log.Info("starting snapshot user trie", "rootHash", rootHash, "epoch", epoch)
errChan := make(chan error, 1)
stats := newSnapshotStatistics(1)
go func() {
leavesChannel := make(chan core.KeyValueHolder, leavesChannelSize)
stats.NewSnapshotStarted()
trieStorageManager.TakeSnapshot(rootHash, rootHash, leavesChannel, errChan, stats, epoch)
adb.snapshotUserAccountDataTrie(true, rootHash, leavesChannel, errChan, stats, epoch)
trieStorageManager.ExitPruningBufferingMode()

stats.wg.Done()
}()

go adb.processSnapshotCompletion(stats, errChan, rootHash, "snapshotState user trie", epoch)

adb.waitForCompletionIfRunningInImportDB(stats)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps, in the future, we can also waitForCompletionIfHeavilySyncing() (when out of sync). From my understanding of the logs (if correct), that would greatly reduce the snapshot time. Does this make sense?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

E.g. import db finishes snapshot in 2-3 minutes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The snapshot process is started on a goroutine. During import-db, epochs are processed much faster than an actual epoch time, so there is a chance that an epoch is processed faster than it will take the snapshot to finish. adb.waitForCompletionIfRunningInImportDB(stats) is added to assure that the processing will wait for the snapshot to finish.

}

func (adb *AccountsDB) prepareSnapshot(rootHash []byte) (common.StorageManager, uint32, bool) {
trieStorageManager, epoch, err := adb.getTrieStorageManagerAndLatestEpoch()
if err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps also log the results of this call (e.g. the returned epoch?) - for debugging purposes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is logged. If we skip the snapshot, we also log the epoch. If we take the snapshot, we log the epoch on "starting snapshot..." print

log.Error("snapshot user state error", "err", err.Error())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be considered a fatal error (e.g. stop node)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There might be a chance that the next snapshot will succeed, so the node can continue processing. What do you say @iulianpascalau ?

return nil, 0, false
}

if !adb.shouldTakeSnapshot(trieStorageManager, rootHash, epoch) {
log.Debug("skipping snapshot",
"last snapshot rootHash", adb.lastSnapshot.rootHash,
"rootHash", rootHash,
"last snapshot epoch", adb.lastSnapshot.epoch,
"epoch", epoch,
"isSnapshotInProgress", adb.isSnapshotInProgress.IsSet(),
)
return
return nil, 0, false
}

log.Info("starting snapshot", "rootHash", rootHash, "epoch", epoch)

adb.isSnapshotInProgress.SetValue(true)
adb.lastSnapshot.rootHash = rootHash
adb.lastSnapshot.epoch = epoch
err = trieStorageManager.Put([]byte(lastSnapshotStarted), rootHash)
handleLoggingWhenError("could not set lastSnapshotStarted", err, "rootHash", rootHash)

trieStorageManager.EnterPruningBufferingMode()

errChan := make(chan error, 1)
stats := newSnapshotStatistics(1)
go func() {
leavesChannel := make(chan core.KeyValueHolder, leavesChannelSize)
stats.NewSnapshotStarted()
trieStorageManager.TakeSnapshot(rootHash, rootHash, leavesChannel, errChan, stats, epoch)
adb.snapshotUserAccountDataTrie(true, rootHash, leavesChannel, errChan, stats, epoch)
trieStorageManager.ExitPruningBufferingMode()
return trieStorageManager, epoch, true
}

stats.wg.Done()
}()
func (adb *AccountsDB) getTrieStorageManagerAndLatestEpoch() (common.StorageManager, uint32, error) {
trieStorageManager := adb.mainTrie.GetStorageManager()
epoch, err := trieStorageManager.GetLatestStorageEpoch()
if err != nil {
return nil, 0, fmt.Errorf("%w while getting the latest storage epoch", err)
}

go adb.markActiveDBAfterSnapshot(stats, errChan, rootHash, "snapshotState user trie", epoch)
return trieStorageManager, epoch, nil
}

adb.waitForCompletionIfRunningInImportDB(stats)
func (adb *AccountsDB) shouldTakeSnapshot(trieStorageManager common.StorageManager, rootHash []byte, epoch uint32) bool {
snapshotAlreadyTaken := bytes.Equal(adb.lastSnapshot.rootHash, rootHash) && adb.lastSnapshot.epoch == epoch
if snapshotAlreadyTaken {
return false
}

if adb.isSnapshotInProgress.IsSet() {
return false
}

return trieStorageManager.ShouldTakeSnapshot()
}

func (adb *AccountsDB) markActiveDBAfterSnapshot(stats *snapshotStatistics, errChan chan error, rootHash []byte, message string, epoch uint32) {
func (adb *AccountsDB) processSnapshotCompletion(stats *snapshotStatistics, errChan chan error, rootHash []byte, message string, epoch uint32) {
stats.PrintStats(message, rootHash)

defer adb.isSnapshotInProgress.Reset()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the function markActiveDBAfterSnapshot() is now more like a doStuffOnSnapshotCompletion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. Renamed to processSnapshotCompletion. I look forward to a better naming idea 😅


trieStorageManager := adb.mainTrie.GetStorageManager()
containsErrorDuringSnapshot := emptyErrChanReturningHadContained(errChan)
shouldNotMarkActive := trieStorageManager.IsClosed() || containsErrorDuringSnapshot
Expand Down
39 changes: 38 additions & 1 deletion state/accountsDB_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1064,9 +1064,11 @@ func TestAccountsDB_SnapshotStateSnapshotSameRootHash(t *testing.T) {
GetLatestStorageEpochCalled: func() (uint32, error) {
return latestEpoch, nil
},
TakeSnapshotCalled: func(_ []byte, _ []byte, _ chan core.KeyValueHolder, _ chan error, _ common.SnapshotStatisticsHandler, _ uint32) {
TakeSnapshotCalled: func(_ []byte, _ []byte, leavesChan chan core.KeyValueHolder, _ chan error, stats common.SnapshotStatisticsHandler, _ uint32) {
snapshotMutex.Lock()
takeSnapshotCalled++
close(leavesChan)
stats.SnapshotFinished()
snapshotMutex.Unlock()
},
}
Expand Down Expand Up @@ -1129,6 +1131,41 @@ func TestAccountsDB_SnapshotStateSnapshotSameRootHash(t *testing.T) {
snapshotMutex.Unlock()
}

func TestAccountsDB_SnapshotStateSkipSnapshotIfSnapshotInProgress(t *testing.T) {
t.Parallel()

rootHashes := [][]byte{[]byte("rootHash1"), []byte("rootHash2"), []byte("rootHash3"), []byte("rootHash4")}
latestEpoch := uint32(0)
snapshotMutex := sync.RWMutex{}
takeSnapshotCalled := 0
trieStub := &trieMock.TrieStub{
GetStorageManagerCalled: func() common.StorageManager {
return &testscommon.StorageManagerStub{
GetLatestStorageEpochCalled: func() (uint32, error) {
return latestEpoch, nil
},
TakeSnapshotCalled: func(_ []byte, _ []byte, leavesChan chan core.KeyValueHolder, _ chan error, stats common.SnapshotStatisticsHandler, _ uint32) {
snapshotMutex.Lock()
takeSnapshotCalled++
close(leavesChan)
stats.SnapshotFinished()
snapshotMutex.Unlock()
},
}
},
}
adb := generateAccountDBFromTrie(trieStub)
waitForOpToFinish := time.Millisecond * 100

for _, rootHash := range rootHashes {
adb.SnapshotState(rootHash)
}
time.Sleep(waitForOpToFinish)
snapshotMutex.Lock()
assert.Equal(t, 1, takeSnapshotCalled)
snapshotMutex.Unlock()
}

func TestAccountsDB_SetStateCheckpointWithDataTries(t *testing.T) {
t.Parallel()

Expand Down
67 changes: 9 additions & 58 deletions state/peerAccountsDB.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package state

import (
"bytes"
"fmt"
"sync"

"github.com/ElrondNetwork/elrond-go/common"
)
Expand All @@ -21,22 +19,7 @@ func NewPeerAccountsDB(args ArgsAccountsDB) (*PeerAccountsDB, error) {
}

adb := &PeerAccountsDB{
&AccountsDB{
mainTrie: args.Trie,
hasher: args.Hasher,
marshaller: args.Marshaller,
accountFactory: args.AccountFactory,
entries: make([]JournalEntry, 0),
dataTries: NewDataTriesHolder(),
mutOp: sync.RWMutex{},
loadCodeMeasurements: &loadingMeasurements{
identifier: "load code",
},
storagePruningManager: args.StoragePruningManager,
processingMode: args.ProcessingMode,
lastSnapshot: &snapshotInfo{},
processStatusHandler: args.ProcessStatusHandler,
},
AccountsDB: createAccountsDb(args),
}

trieStorageManager := adb.mainTrie.GetStorageManager()
Expand All @@ -60,48 +43,24 @@ func (adb *PeerAccountsDB) MarkSnapshotDone() {
handleLoggingWhenError("error while putting active DB value into main storer", err)
}

func (adb *PeerAccountsDB) getTrieStorageManagerAndLatestEpoch() (common.StorageManager, uint32, error) {
trieStorageManager := adb.mainTrie.GetStorageManager()
epoch, err := trieStorageManager.GetLatestStorageEpoch()
if err != nil {
return nil, 0, fmt.Errorf("%w while getting the latest storage epoch", err)
}

return trieStorageManager, epoch, nil
}

// SnapshotState triggers the snapshotting process of the state trie
func (adb *PeerAccountsDB) SnapshotState(rootHash []byte) {
log.Trace("peerAccountsDB.SnapshotState", "root hash", rootHash)
trieStorageManager, epoch, err := adb.getTrieStorageManagerAndLatestEpoch()
if err != nil {
log.Error("SnapshotState error", "err", err.Error())
return
}
adb.mutOp.Lock()
defer adb.mutOp.Unlock()

if !trieStorageManager.ShouldTakeSnapshot() {
log.Debug("skipping snapshot for rootHash", "hash", rootHash)
trieStorageManager, epoch, shouldTakeSnapshot := adb.prepareSnapshot(rootHash)
if !shouldTakeSnapshot {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps log a message for not taking the snapshot?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is logged inside prepareSnapshot

return
}

log.Info("starting snapshot", "rootHash", rootHash, "epoch", epoch)

adb.lastSnapshot.rootHash = rootHash
adb.lastSnapshot.epoch = epoch
err = trieStorageManager.Put([]byte(lastSnapshotStarted), rootHash)
if err != nil {
log.Warn("could not set lastSnapshotStarted", "err", err, "rootHash", rootHash)
}

log.Info("starting snapshot peer trie", "rootHash", rootHash, "epoch", epoch)
errChan := make(chan error, 1)
stats := newSnapshotStatistics(0)

trieStorageManager.EnterPruningBufferingMode()
stats.NewSnapshotStarted()
errChan := make(chan error, 1)
trieStorageManager.TakeSnapshot(rootHash, rootHash, nil, errChan, stats, epoch)
trieStorageManager.ExitPruningBufferingMode()

go adb.markActiveDBAfterSnapshot(stats, errChan, rootHash, "snapshotState peer trie", epoch)
go adb.processSnapshotCompletion(stats, errChan, rootHash, "snapshotState peer trie", epoch)

adb.waitForCompletionIfRunningInImportDB(stats)
}
Expand All @@ -128,15 +87,7 @@ func (adb *PeerAccountsDB) SetStateCheckpoint(rootHash []byte) {

// RecreateAllTries recreates all the tries from the accounts DB
func (adb *PeerAccountsDB) RecreateAllTries(rootHash []byte) (map[string]common.Trie, error) {
recreatedTrie, err := adb.mainTrie.Recreate(rootHash)
if err != nil {
return nil, err
}

allTries := make(map[string]common.Trie)
allTries[string(rootHash)] = recreatedTrie

return allTries, nil
return adb.recreateMainTrie(rootHash)
}

// IsInterfaceNil returns true if there is no value under the interface
Expand Down