-
Notifications
You must be signed in to change notification settings - Fork 205
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Prevent parallel snapshots #4404
Changes from all commits
296f7f9
08bb77b
b1c21ab
5b0ac6f
09612ca
0ddd82b
8e84a1d
fd20734
ed4d26e
731530c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,7 @@ import ( | |
"time" | ||
|
||
"github.com/ElrondNetwork/elrond-go-core/core" | ||
"github.com/ElrondNetwork/elrond-go-core/core/atomic" | ||
"github.com/ElrondNetwork/elrond-go-core/core/check" | ||
"github.com/ElrondNetwork/elrond-go-core/hashing" | ||
"github.com/ElrondNetwork/elrond-go-core/marshal" | ||
|
@@ -74,10 +75,11 @@ type AccountsDB struct { | |
storagePruningManager StoragePruningManager | ||
obsoleteDataTrieHashes map[string][][]byte | ||
|
||
lastSnapshot *snapshotInfo | ||
lastRootHash []byte | ||
dataTries common.TriesHolder | ||
entries []JournalEntry | ||
isSnapshotInProgress atomic.Flag | ||
lastSnapshot *snapshotInfo | ||
lastRootHash []byte | ||
dataTries common.TriesHolder | ||
entries []JournalEntry | ||
// TODO use mutOp only for critical sections, and refactor to parallelize as much as possible | ||
mutOp sync.RWMutex | ||
processingMode common.NodeProcessingMode | ||
|
@@ -109,7 +111,20 @@ func NewAccountsDB(args ArgsAccountsDB) (*AccountsDB, error) { | |
return nil, err | ||
} | ||
|
||
adb := &AccountsDB{ | ||
adb := createAccountsDb(args) | ||
|
||
trieStorageManager := adb.mainTrie.GetStorageManager() | ||
val, err := trieStorageManager.GetFromCurrentEpoch([]byte(common.ActiveDBKey)) | ||
isActiveDBVal := bytes.Equal(val, []byte(common.ActiveDBVal)) | ||
if err != nil || !isActiveDBVal { | ||
startSnapshotAfterRestart(adb, args) | ||
} | ||
|
||
return adb, nil | ||
} | ||
|
||
func createAccountsDb(args ArgsAccountsDB) *AccountsDB { | ||
return &AccountsDB{ | ||
mainTrie: args.Trie, | ||
hasher: args.Hasher, | ||
marshaller: args.Marshaller, | ||
|
@@ -126,15 +141,8 @@ func NewAccountsDB(args ArgsAccountsDB) (*AccountsDB, error) { | |
shouldSerializeSnapshots: args.ShouldSerializeSnapshots, | ||
lastSnapshot: &snapshotInfo{}, | ||
processStatusHandler: args.ProcessStatusHandler, | ||
isSnapshotInProgress: atomic.Flag{}, | ||
} | ||
|
||
trieStorageManager := adb.mainTrie.GetStorageManager() | ||
val, err := trieStorageManager.GetFromCurrentEpoch([]byte(common.ActiveDBKey)) | ||
if err != nil || !bytes.Equal(val, []byte(common.ActiveDBVal)) { | ||
startSnapshotAfterRestart(adb, args) | ||
} | ||
|
||
return adb, nil | ||
} | ||
|
||
func checkArgsAccountsDB(args ArgsAccountsDB) error { | ||
|
@@ -1008,14 +1016,11 @@ func (adb *AccountsDB) RecreateAllTries(rootHash []byte) (map[string]common.Trie | |
return nil, err | ||
} | ||
|
||
recreatedTrie, err := adb.mainTrie.Recreate(rootHash) | ||
allTries, err := adb.recreateMainTrie(rootHash) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not having any great idea |
||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
allTries := make(map[string]common.Trie) | ||
allTries[string(rootHash)] = recreatedTrie | ||
|
||
for leaf := range leavesChannel { | ||
account := &userAccount{} | ||
err = adb.marshaller.Unmarshal(account, leaf.Value()) | ||
|
@@ -1037,6 +1042,18 @@ func (adb *AccountsDB) RecreateAllTries(rootHash []byte) (map[string]common.Trie | |
return allTries, nil | ||
} | ||
|
||
func (adb *AccountsDB) recreateMainTrie(rootHash []byte) (map[string]common.Trie, error) { | ||
recreatedTrie, err := adb.mainTrie.Recreate(rootHash) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
allTries := make(map[string]common.Trie) | ||
allTries[string(rootHash)] = recreatedTrie | ||
|
||
return allTries, nil | ||
} | ||
|
||
// GetTrie returns the trie that has the given rootHash | ||
func (adb *AccountsDB) GetTrie(rootHash []byte) (common.Trie, error) { | ||
adb.mutOp.Lock() | ||
|
@@ -1092,53 +1109,85 @@ func (adb *AccountsDB) SnapshotState(rootHash []byte) { | |
adb.mutOp.Lock() | ||
defer adb.mutOp.Unlock() | ||
|
||
trieStorageManager := adb.mainTrie.GetStorageManager() | ||
epoch, err := trieStorageManager.GetLatestStorageEpoch() | ||
if err != nil { | ||
log.Error("snapshotState error", "err", err.Error()) | ||
trieStorageManager, epoch, shouldTakeSnapshot := adb.prepareSnapshot(rootHash) | ||
if !shouldTakeSnapshot { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Perhaps log a message that we won't take the snapshot yet?
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There is a log message inside `prepareSnapshot`: `log.Debug("skipping snapshot", ...)`
|
||
return | ||
} | ||
|
||
snapshotAlreadyTaken := bytes.Equal(adb.lastSnapshot.rootHash, rootHash) && adb.lastSnapshot.epoch == epoch | ||
if !trieStorageManager.ShouldTakeSnapshot() || snapshotAlreadyTaken { | ||
log.Info("starting snapshot user trie", "rootHash", rootHash, "epoch", epoch) | ||
errChan := make(chan error, 1) | ||
stats := newSnapshotStatistics(1) | ||
go func() { | ||
leavesChannel := make(chan core.KeyValueHolder, leavesChannelSize) | ||
stats.NewSnapshotStarted() | ||
trieStorageManager.TakeSnapshot(rootHash, rootHash, leavesChannel, errChan, stats, epoch) | ||
adb.snapshotUserAccountDataTrie(true, rootHash, leavesChannel, errChan, stats, epoch) | ||
trieStorageManager.ExitPruningBufferingMode() | ||
|
||
stats.wg.Done() | ||
}() | ||
|
||
go adb.processSnapshotCompletion(stats, errChan, rootHash, "snapshotState user trie", epoch) | ||
|
||
adb.waitForCompletionIfAppropriate(stats) | ||
} | ||
|
||
func (adb *AccountsDB) prepareSnapshot(rootHash []byte) (common.StorageManager, uint32, bool) { | ||
trieStorageManager, epoch, err := adb.getTrieStorageManagerAndLatestEpoch() | ||
if err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Perhaps also log the results of this call (e.g. the returned epoch?) - for debugging purposes.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It is logged. If we skip the snapshot, we also log the epoch. If we take the snapshot, we log the epoch on the "starting snapshot..." print. |
||
log.Error("snapshot user state error", "err", err.Error()) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Shouldn't this be considered a fatal error (e.g. stop node)?
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There might be a chance that the next snapshot will succeed, so the node can continue processing. What do you say, @iulianpascalau? |
||
return nil, 0, false | ||
} | ||
|
||
if !adb.shouldTakeSnapshot(trieStorageManager, rootHash, epoch) { | ||
log.Debug("skipping snapshot", | ||
"last snapshot rootHash", adb.lastSnapshot.rootHash, | ||
"rootHash", rootHash, | ||
"last snapshot epoch", adb.lastSnapshot.epoch, | ||
"epoch", epoch, | ||
"isSnapshotInProgress", adb.isSnapshotInProgress.IsSet(), | ||
) | ||
return | ||
return nil, 0, false | ||
} | ||
|
||
log.Info("starting snapshot", "rootHash", rootHash, "epoch", epoch) | ||
|
||
adb.isSnapshotInProgress.SetValue(true) | ||
adb.lastSnapshot.rootHash = rootHash | ||
adb.lastSnapshot.epoch = epoch | ||
err = trieStorageManager.Put([]byte(lastSnapshotStarted), rootHash) | ||
handleLoggingWhenError("could not set lastSnapshotStarted", err, "rootHash", rootHash) | ||
|
||
trieStorageManager.EnterPruningBufferingMode() | ||
|
||
errChan := make(chan error, 1) | ||
stats := newSnapshotStatistics(1) | ||
go func() { | ||
leavesChannel := make(chan core.KeyValueHolder, leavesChannelSize) | ||
stats.NewSnapshotStarted() | ||
trieStorageManager.TakeSnapshot(rootHash, rootHash, leavesChannel, errChan, stats, epoch) | ||
adb.snapshotUserAccountDataTrie(true, rootHash, leavesChannel, errChan, stats, epoch) | ||
trieStorageManager.ExitPruningBufferingMode() | ||
return trieStorageManager, epoch, true | ||
} | ||
|
||
stats.wg.Done() | ||
}() | ||
func (adb *AccountsDB) getTrieStorageManagerAndLatestEpoch() (common.StorageManager, uint32, error) { | ||
trieStorageManager := adb.mainTrie.GetStorageManager() | ||
epoch, err := trieStorageManager.GetLatestStorageEpoch() | ||
if err != nil { | ||
return nil, 0, fmt.Errorf("%w while getting the latest storage epoch", err) | ||
} | ||
|
||
go adb.markActiveDBAfterSnapshot(stats, errChan, rootHash, "snapshotState user trie", epoch) | ||
return trieStorageManager, epoch, nil | ||
} | ||
|
||
adb.waitForCompletionIfAppropriate(stats) | ||
func (adb *AccountsDB) shouldTakeSnapshot(trieStorageManager common.StorageManager, rootHash []byte, epoch uint32) bool { | ||
snapshotAlreadyTaken := bytes.Equal(adb.lastSnapshot.rootHash, rootHash) && adb.lastSnapshot.epoch == epoch | ||
if snapshotAlreadyTaken { | ||
return false | ||
} | ||
|
||
if adb.isSnapshotInProgress.IsSet() { | ||
return false | ||
} | ||
|
||
return trieStorageManager.ShouldTakeSnapshot() | ||
} | ||
|
||
func (adb *AccountsDB) markActiveDBAfterSnapshot(stats *snapshotStatistics, errChan chan error, rootHash []byte, message string, epoch uint32) { | ||
func (adb *AccountsDB) processSnapshotCompletion(stats *snapshotStatistics, errChan chan error, rootHash []byte, message string, epoch uint32) { | ||
stats.PrintStats(message, rootHash) | ||
|
||
defer adb.isSnapshotInProgress.Reset() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. So the function [inline code and the rest of this comment were lost in extraction; presumably it refers to `markActiveDBAfterSnapshot`]
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Right. Renamed to `processSnapshotCompletion`. |
||
|
||
trieStorageManager := adb.mainTrie.GetStorageManager() | ||
containsErrorDuringSnapshot := emptyErrChanReturningHadContained(errChan) | ||
shouldNotMarkActive := trieStorageManager.IsClosed() || containsErrorDuringSnapshot | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,8 +2,6 @@ package state | |
|
||
import ( | ||
"bytes" | ||
"fmt" | ||
"sync" | ||
|
||
"github.com/ElrondNetwork/elrond-go/common" | ||
) | ||
|
@@ -21,22 +19,7 @@ func NewPeerAccountsDB(args ArgsAccountsDB) (*PeerAccountsDB, error) { | |
} | ||
|
||
adb := &PeerAccountsDB{ | ||
&AccountsDB{ | ||
mainTrie: args.Trie, | ||
hasher: args.Hasher, | ||
marshaller: args.Marshaller, | ||
accountFactory: args.AccountFactory, | ||
entries: make([]JournalEntry, 0), | ||
dataTries: NewDataTriesHolder(), | ||
mutOp: sync.RWMutex{}, | ||
loadCodeMeasurements: &loadingMeasurements{ | ||
identifier: "load code", | ||
}, | ||
storagePruningManager: args.StoragePruningManager, | ||
processingMode: args.ProcessingMode, | ||
lastSnapshot: &snapshotInfo{}, | ||
processStatusHandler: args.ProcessStatusHandler, | ||
}, | ||
AccountsDB: createAccountsDb(args), | ||
} | ||
|
||
trieStorageManager := adb.mainTrie.GetStorageManager() | ||
|
@@ -60,48 +43,24 @@ func (adb *PeerAccountsDB) MarkSnapshotDone() { | |
handleLoggingWhenError("error while putting active DB value into main storer", err) | ||
} | ||
|
||
func (adb *PeerAccountsDB) getTrieStorageManagerAndLatestEpoch() (common.StorageManager, uint32, error) { | ||
trieStorageManager := adb.mainTrie.GetStorageManager() | ||
epoch, err := trieStorageManager.GetLatestStorageEpoch() | ||
if err != nil { | ||
return nil, 0, fmt.Errorf("%w while getting the latest storage epoch", err) | ||
} | ||
|
||
return trieStorageManager, epoch, nil | ||
} | ||
|
||
// SnapshotState triggers the snapshotting process of the state trie | ||
func (adb *PeerAccountsDB) SnapshotState(rootHash []byte) { | ||
log.Trace("peerAccountsDB.SnapshotState", "root hash", rootHash) | ||
trieStorageManager, epoch, err := adb.getTrieStorageManagerAndLatestEpoch() | ||
if err != nil { | ||
log.Error("SnapshotState error", "err", err.Error()) | ||
return | ||
} | ||
adb.mutOp.Lock() | ||
defer adb.mutOp.Unlock() | ||
|
||
if !trieStorageManager.ShouldTakeSnapshot() { | ||
log.Debug("skipping snapshot for rootHash", "hash", rootHash) | ||
trieStorageManager, epoch, shouldTakeSnapshot := adb.prepareSnapshot(rootHash) | ||
if !shouldTakeSnapshot { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Perhaps log a message for not taking the snapshot?
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It is logged inside `prepareSnapshot`. |
||
return | ||
} | ||
|
||
log.Info("starting snapshot", "rootHash", rootHash, "epoch", epoch) | ||
|
||
adb.lastSnapshot.rootHash = rootHash | ||
adb.lastSnapshot.epoch = epoch | ||
err = trieStorageManager.Put([]byte(lastSnapshotStarted), rootHash) | ||
if err != nil { | ||
log.Warn("could not set lastSnapshotStarted", "err", err, "rootHash", rootHash) | ||
} | ||
|
||
log.Info("starting snapshot peer trie", "rootHash", rootHash, "epoch", epoch) | ||
errChan := make(chan error, 1) | ||
stats := newSnapshotStatistics(0) | ||
|
||
trieStorageManager.EnterPruningBufferingMode() | ||
stats.NewSnapshotStarted() | ||
errChan := make(chan error, 1) | ||
trieStorageManager.TakeSnapshot(rootHash, rootHash, nil, errChan, stats, epoch) | ||
trieStorageManager.ExitPruningBufferingMode() | ||
|
||
go adb.markActiveDBAfterSnapshot(stats, errChan, rootHash, "snapshotState peer trie", epoch) | ||
go adb.processSnapshotCompletion(stats, errChan, rootHash, "snapshotState peer trie", epoch) | ||
|
||
adb.waitForCompletionIfAppropriate(stats) | ||
} | ||
|
@@ -128,15 +87,7 @@ func (adb *PeerAccountsDB) SetStateCheckpoint(rootHash []byte) { | |
|
||
// RecreateAllTries recreates all the tries from the accounts DB | ||
func (adb *PeerAccountsDB) RecreateAllTries(rootHash []byte) (map[string]common.Trie, error) { | ||
recreatedTrie, err := adb.mainTrie.Recreate(rootHash) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
allTries := make(map[string]common.Trie) | ||
allTries[string(rootHash)] = recreatedTrie | ||
|
||
return allTries, nil | ||
return adb.recreateMainTrie(rootHash) | ||
} | ||
|
||
// IsInterfaceNil returns true if there is no value under the interface | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A separate goroutine will be launched due to calling `startSnapshotAfterRestart`, which is correct ✅
Ideally, we wouldn't call such a function in a constructor (`NewAccountsDB`), but it can be left as it is for the moment (it was the same before).
Also, optionally, extract `bytes.Equal(val, []byte(common.ActiveDBVal))` to a separate variable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extracted in a separate variable.
`startSnapshotAfterRestart` is moved from the constructor in #4367.