From 3028163a0b8fefc756b3f6d2175207510e348556 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Mon, 8 May 2023 12:33:24 +0300 Subject: [PATCH 01/21] pass iterator channels to syncer --- state/syncer/baseAccountsSyncer.go | 2 + state/syncer/userAccountsSyncer.go | 72 ++++++++++++++----------- state/syncer/validatorAccountsSyncer.go | 7 ++- trie/depthFirstSync.go | 16 +++++- trie/sync.go | 1 + 5 files changed, 66 insertions(+), 32 deletions(-) diff --git a/state/syncer/baseAccountsSyncer.go b/state/syncer/baseAccountsSyncer.go index 18d28fc3370..c942832cc0a 100644 --- a/state/syncer/baseAccountsSyncer.go +++ b/state/syncer/baseAccountsSyncer.go @@ -93,6 +93,7 @@ func (b *baseAccountsSyncer) syncMainTrie( rootHash []byte, trieTopic string, ctx context.Context, + accLeavesChan *common.TrieIteratorChannels, ) (common.Trie, error) { b.rootHash = rootHash atomic.AddInt32(&b.numMaxTries, 1) @@ -116,6 +117,7 @@ func (b *baseAccountsSyncer) syncMainTrie( TimeoutHandler: b.timeoutHandler, MaxHardCapForMissingNodes: b.maxHardCapForMissingNodes, CheckNodesOnDisk: b.checkNodesOnDisk, + AccLeavesChannels: accLeavesChan, } trieSyncer, err := trie.CreateTrieSyncer(arg, b.trieSyncerVersion) if err != nil { diff --git a/state/syncer/userAccountsSyncer.go b/state/syncer/userAccountsSyncer.go index 5f992eef9d9..a5d8cd58e96 100644 --- a/state/syncer/userAccountsSyncer.go +++ b/state/syncer/userAccountsSyncer.go @@ -2,6 +2,7 @@ package syncer import ( "context" + "encoding/hex" "fmt" "sort" "sync" @@ -17,7 +18,6 @@ import ( "github.com/multiversx/mx-chain-go/process/factory" "github.com/multiversx/mx-chain-go/state" "github.com/multiversx/mx-chain-go/trie" - "github.com/multiversx/mx-chain-go/trie/keyBuilder" logger "github.com/multiversx/mx-chain-logger-go" ) @@ -119,23 +119,42 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error { go u.printStatisticsAndUpdateMetrics(ctx) - mainTrie, err := u.syncMainTrie(rootHash, factory.AccountTrieNodesTopic, ctx) - if err != nil { - return err + leavesChannels := &common.TrieIteratorChannels{ + LeavesChan: make(chan core.KeyValueHolder, common.TrieLeavesChannelDefaultCapacity), + ErrChan: errChan.NewErrChanWrapper(), } - defer func() { - _ = mainTrie.Close() - }() + wgSync := &sync.WaitGroup{} + wgSync.Add(1) - log.Debug("main trie synced, starting to sync data tries", "num data tries", len(u.dataTries)) + go func() { + mainTrie, err := u.syncMainTrie(rootHash, factory.AccountTrieNodesTopic, ctx, leavesChannels) + if err != nil { + log.Error("syncMainTrie:", "error", err.Error()) + leavesChannels.ErrChan.WriteInChanNonBlocking(err) + } - err = u.syncAccountDataTries(mainTrie, ctx) - if err != nil { - return err - } + defer func() { + _ = mainTrie.Close() + }() + + log.Debug("main trie synced, starting to sync data tries", "num data tries", len(u.dataTries)) + + u.storageMarker.MarkStorerAsSyncedAndActive(mainTrie.GetStorageManager()) + + wgSync.Done() + }() + + go func() { + err := u.syncAccountDataTries(leavesChannels, ctx) + if err != nil { + log.Error("syncAccountDataTries:", "error", err.Error()) + return + } + }() - u.storageMarker.MarkStorerAsSyncedAndActive(mainTrie.GetStorageManager()) + wgSync.Wait() + //u.storageMarker.MarkStorerAsSyncedAndActive(mainTrie.GetStorageManager()) return nil } @@ -151,6 +170,11 @@ func (u *userAccountsSyncer) syncDataTrie(rootHash []byte, address []byte, ctx c u.dataTries[string(rootHash)] = struct{}{} u.syncerMutex.Unlock() + iteratorChannelsForDataTries := &common.TrieIteratorChannels{ + LeavesChan: nil, + ErrChan: nil, + } 
+ arg := trie.ArgTrieSyncer{ RequestHandler: u.requestHandler, InterceptedNodes: u.cacher, @@ -163,6 +187,7 @@ func (u *userAccountsSyncer) syncDataTrie(rootHash []byte, address []byte, ctx c TimeoutHandler: u.timeoutHandler, MaxHardCapForMissingNodes: u.maxHardCapForMissingNodes, CheckNodesOnDisk: u.checkNodesOnDisk, + AccLeavesChannels: iteratorChannelsForDataTries, } trieSyncer, err := trie.CreateTrieSyncer(arg, u.trieSyncerVersion) if err != nil { @@ -202,34 +227,21 @@ func (u *userAccountsSyncer) updateDataTrieStatistics(trieSyncer trie.TrieSyncer } func (u *userAccountsSyncer) syncAccountDataTries( - mainTrie common.Trie, + leavesChannels *common.TrieIteratorChannels, ctx context.Context, ) error { defer u.printDataTrieStatistics() - mainRootHash, err := mainTrie.RootHash() - if err != nil { - return err - } - - leavesChannels := &common.TrieIteratorChannels{ - LeavesChan: make(chan core.KeyValueHolder, common.TrieLeavesChannelDefaultCapacity), - ErrChan: errChan.NewErrChanWrapper(), - } - err = mainTrie.GetAllLeavesOnChannel(leavesChannels, context.Background(), mainRootHash, keyBuilder.NewDisabledKeyBuilder()) - if err != nil { - return err - } - var errFound error errMutex := sync.Mutex{} wg := sync.WaitGroup{} for leaf := range leavesChannels.LeavesChan { + log.Trace("syncAccountDataTries:", "leaf key", hex.EncodeToString(leaf.Key())) u.resetTimeoutHandlerWatchdog() account := state.NewEmptyUserAccount() - err = u.marshalizer.Unmarshal(account, leaf.Value()) + err := u.marshalizer.Unmarshal(account, leaf.Value()) if err != nil { log.Trace("this must be a leaf with code", "err", err) continue @@ -266,7 +278,7 @@ func (u *userAccountsSyncer) syncAccountDataTries( wg.Wait() - err = leavesChannels.ErrChan.ReadFromChanNonBlocking() + err := leavesChannels.ErrChan.ReadFromChanNonBlocking() if err != nil { return err } diff --git a/state/syncer/validatorAccountsSyncer.go b/state/syncer/validatorAccountsSyncer.go index 34b87d1eb78..ce4328a6be8 100644 --- a/state/syncer/validatorAccountsSyncer.go +++ b/state/syncer/validatorAccountsSyncer.go @@ -5,6 +5,7 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-go/common" + "github.com/multiversx/mx-chain-go/common/errChan" "github.com/multiversx/mx-chain-go/epochStart" "github.com/multiversx/mx-chain-go/process/factory" "github.com/multiversx/mx-chain-go/trie/statistics" @@ -75,7 +76,11 @@ func (v *validatorAccountsSyncer) SyncAccounts(rootHash []byte) error { go v.printStatisticsAndUpdateMetrics(ctx) - mainTrie, err := v.syncMainTrie(rootHash, factory.ValidatorTrieNodesTopic, ctx) + leavesChannels := &common.TrieIteratorChannels{ + LeavesChan: make(chan core.KeyValueHolder, common.TrieLeavesChannelDefaultCapacity), + ErrChan: errChan.NewErrChanWrapper(), + } + mainTrie, err := v.syncMainTrie(rootHash, factory.ValidatorTrieNodesTopic, ctx, leavesChannels) if err != nil { return err } diff --git a/trie/depthFirstSync.go b/trie/depthFirstSync.go index 5f2d088fc7d..1c5ca7c8b73 100644 --- a/trie/depthFirstSync.go +++ b/trie/depthFirstSync.go @@ -2,10 +2,12 @@ package trie import ( "context" + "encoding/hex" "sync" "time" "github.com/multiversx/mx-chain-core-go/core" + "github.com/multiversx/mx-chain-core-go/core/keyValStorage" "github.com/multiversx/mx-chain-core-go/hashing" "github.com/multiversx/mx-chain-core-go/marshal" "github.com/multiversx/mx-chain-go/common" @@ -32,6 +34,7 @@ type depthFirstTrieSyncer struct { checkNodesOnDisk bool nodes *trieNodesHandler requestedHashes map[string]*request + 
accLeavesChannels *common.TrieIteratorChannels } // NewDepthFirstTrieSyncer creates a new instance of trieSyncer that uses the depth-first algorithm @@ -59,6 +62,7 @@ func NewDepthFirstTrieSyncer(arg ArgTrieSyncer) (*depthFirstTrieSyncer, error) { timeoutHandler: arg.TimeoutHandler, maxHardCapForMissingNodes: arg.MaxHardCapForMissingNodes, checkNodesOnDisk: arg.CheckNodesOnDisk, + accLeavesChannels: arg.AccLeavesChannels, } return d, nil @@ -258,7 +262,7 @@ func (d *depthFirstTrieSyncer) storeTrieNode(element node) error { func (d *depthFirstTrieSyncer) storeLeaves(children []node) ([]node, error) { childrenNotLeaves := make([]node, 0, len(children)) for _, element := range children { - _, isLeaf := element.(*leafNode) + leafNodeElement, isLeaf := element.(*leafNode) if !isLeaf { childrenNotLeaves = append(childrenNotLeaves, element) continue @@ -268,6 +272,16 @@ func (d *depthFirstTrieSyncer) storeLeaves(children []node) ([]node, error) { if err != nil { return nil, err } + + log.Trace("storeLeaves: found leaf node", "leafNodeElement.Key", hex.EncodeToString(leafNodeElement.Key)) + + trieLeaf := keyValStorage.NewKeyValStorage(leafNodeElement.Key, leafNodeElement.Value) + // TODO: analize error chan + if d.accLeavesChannels.LeavesChan != nil { + d.accLeavesChannels.LeavesChan <- trieLeaf + } + + log.Trace("storeLeaves: found leaf node - DONE", "leafNodeElement.Key", hex.EncodeToString(leafNodeElement.Key)) } return childrenNotLeaves, nil diff --git a/trie/sync.go b/trie/sync.go index 465ebf71a99..e1a3dfef445 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -58,6 +58,7 @@ type ArgTrieSyncer struct { MaxHardCapForMissingNodes int CheckNodesOnDisk bool TimeoutHandler TimeoutHandler + AccLeavesChannels *common.TrieIteratorChannels } // NewTrieSyncer creates a new instance of trieSyncer From acc9cc22c59afa5255eba9d9a911214f9043f3a2 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Tue, 9 May 2023 15:01:08 +0300 Subject: [PATCH 02/21] add more goroutines and channels control on sync finish --- state/syncer/userAccountsSyncer.go | 39 ++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/state/syncer/userAccountsSyncer.go b/state/syncer/userAccountsSyncer.go index a5d8cd58e96..51849488bbc 100644 --- a/state/syncer/userAccountsSyncer.go +++ b/state/syncer/userAccountsSyncer.go @@ -124,8 +124,12 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error { ErrChan: errChan.NewErrChanWrapper(), } - wgSync := &sync.WaitGroup{} - wgSync.Add(1) + wgSyncMainTrie := &sync.WaitGroup{} + wgSyncMainTrie.Add(1) + wgSyncDatatries := &sync.WaitGroup{} + wgSyncDatatries.Add(1) + + mainTreeChan := make(chan common.Trie) go func() { mainTrie, err := u.syncMainTrie(rootHash, factory.AccountTrieNodesTopic, ctx, leavesChannels) @@ -134,15 +138,17 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error { leavesChannels.ErrChan.WriteInChanNonBlocking(err) } - defer func() { - _ = mainTrie.Close() - }() - log.Debug("main trie synced, starting to sync data tries", "num data tries", len(u.dataTries)) - u.storageMarker.MarkStorerAsSyncedAndActive(mainTrie.GetStorageManager()) + mainTreeChan <- mainTrie + + log.Debug("syncMainTrie goroutine: closing leaver channel") - wgSync.Done() + if leavesChannels.LeavesChan != nil { + close(leavesChannels.LeavesChan) + } + + wgSyncMainTrie.Done() }() go func() { @@ -151,10 +157,21 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error { log.Error("syncAccountDataTries:", "error", err.Error()) return } + + 
wgSyncDatatries.Done() }() - wgSync.Wait() - //u.storageMarker.MarkStorerAsSyncedAndActive(mainTrie.GetStorageManager()) + mainTrie := <-mainTreeChan + defer func() { + _ = mainTrie.Close() + }() + + log.Debug("StartSyncing: wait for goroutines to finish") + + wgSyncMainTrie.Wait() + wgSyncDatatries.Wait() + + u.storageMarker.MarkStorerAsSyncedAndActive(mainTrie.GetStorageManager()) return nil } @@ -267,6 +284,7 @@ func (u *userAccountsSyncer) syncAccountDataTries( newErr := u.syncDataTrie(trieRootHash, address, ctx) if newErr != nil { errMutex.Lock() + log.Error("syncDataTrie: error found", "error", errFound.Error()) errFound = newErr errMutex.Unlock() } @@ -276,6 +294,7 @@ func (u *userAccountsSyncer) syncAccountDataTries( }(account.RootHash, account.Address) } + log.Debug("syncDataTrie: wait for goroutines to finish") wg.Wait() err := leavesChannels.ErrChan.ReadFromChanNonBlocking() From 0bca902df7642b65e2795ef48cf8169e7bea0102 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Wed, 10 May 2023 11:15:26 +0300 Subject: [PATCH 03/21] upadte maintrie usage --- state/syncer/userAccountsSyncer.go | 17 ++++------------- state/syncer/validatorAccountsSyncer.go | 2 +- trie/depthFirstSync.go | 6 ++++-- 3 files changed, 9 insertions(+), 16 deletions(-) diff --git a/state/syncer/userAccountsSyncer.go b/state/syncer/userAccountsSyncer.go index 51849488bbc..ed196e52214 100644 --- a/state/syncer/userAccountsSyncer.go +++ b/state/syncer/userAccountsSyncer.go @@ -129,18 +129,14 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error { wgSyncDatatries := &sync.WaitGroup{} wgSyncDatatries.Add(1) - mainTreeChan := make(chan common.Trie) - go func() { - mainTrie, err := u.syncMainTrie(rootHash, factory.AccountTrieNodesTopic, ctx, leavesChannels) + _, err := u.syncMainTrie(rootHash, factory.AccountTrieNodesTopic, ctx, leavesChannels) if err != nil { log.Error("syncMainTrie:", "error", err.Error()) leavesChannels.ErrChan.WriteInChanNonBlocking(err) } - log.Debug("main trie synced, starting to sync data tries", "num data tries", len(u.dataTries)) - - mainTreeChan <- mainTrie + //log.Debug("main trie synced, starting to sync data tries", "num data tries", len(u.dataTries)) log.Debug("syncMainTrie goroutine: closing leaver channel") @@ -161,17 +157,12 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error { wgSyncDatatries.Done() }() - mainTrie := <-mainTreeChan - defer func() { - _ = mainTrie.Close() - }() - log.Debug("StartSyncing: wait for goroutines to finish") wgSyncMainTrie.Wait() wgSyncDatatries.Wait() - u.storageMarker.MarkStorerAsSyncedAndActive(mainTrie.GetStorageManager()) + u.storageMarker.MarkStorerAsSyncedAndActive(u.trieStorageManager) return nil } @@ -260,7 +251,7 @@ func (u *userAccountsSyncer) syncAccountDataTries( account := state.NewEmptyUserAccount() err := u.marshalizer.Unmarshal(account, leaf.Value()) if err != nil { - log.Trace("this must be a leaf with code", "err", err) + log.Trace("this must be a leaf with code", "leaf key", hex.EncodeToString(leaf.Key()), "err", err) continue } diff --git a/state/syncer/validatorAccountsSyncer.go b/state/syncer/validatorAccountsSyncer.go index ce4328a6be8..6f2ea0e2e52 100644 --- a/state/syncer/validatorAccountsSyncer.go +++ b/state/syncer/validatorAccountsSyncer.go @@ -77,7 +77,7 @@ func (v *validatorAccountsSyncer) SyncAccounts(rootHash []byte) error { go v.printStatisticsAndUpdateMetrics(ctx) leavesChannels := &common.TrieIteratorChannels{ - LeavesChan: make(chan core.KeyValueHolder, common.TrieLeavesChannelDefaultCapacity), + 
LeavesChan: nil, ErrChan: errChan.NewErrChanWrapper(), } mainTrie, err := v.syncMainTrie(rootHash, factory.ValidatorTrieNodesTopic, ctx, leavesChannels) diff --git a/trie/depthFirstSync.go b/trie/depthFirstSync.go index 1c5ca7c8b73..7c2a7d7ce8f 100644 --- a/trie/depthFirstSync.go +++ b/trie/depthFirstSync.go @@ -277,9 +277,11 @@ func (d *depthFirstTrieSyncer) storeLeaves(children []node) ([]node, error) { trieLeaf := keyValStorage.NewKeyValStorage(leafNodeElement.Key, leafNodeElement.Value) // TODO: analize error chan - if d.accLeavesChannels.LeavesChan != nil { - d.accLeavesChannels.LeavesChan <- trieLeaf + if d.accLeavesChannels.LeavesChan == nil { + log.Trace("storeLeaves: nil leaves chan", "leafNodeElement.Key", hex.EncodeToString(leafNodeElement.Key)) + continue } + d.accLeavesChannels.LeavesChan <- trieLeaf log.Trace("storeLeaves: found leaf node - DONE", "leafNodeElement.Key", hex.EncodeToString(leafNodeElement.Key)) } From 9a29aa737ea9518374bb2da5ee41d285e96178f5 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Wed, 10 May 2023 15:34:01 +0300 Subject: [PATCH 04/21] more logs for debugging --- state/syncer/userAccountsSyncer.go | 20 ++++++++------------ trie/depthFirstSync.go | 7 +++++++ 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/state/syncer/userAccountsSyncer.go b/state/syncer/userAccountsSyncer.go index ed196e52214..428d6404dca 100644 --- a/state/syncer/userAccountsSyncer.go +++ b/state/syncer/userAccountsSyncer.go @@ -126,8 +126,6 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error { wgSyncMainTrie := &sync.WaitGroup{} wgSyncMainTrie.Add(1) - wgSyncDatatries := &sync.WaitGroup{} - wgSyncDatatries.Add(1) go func() { _, err := u.syncMainTrie(rootHash, factory.AccountTrieNodesTopic, ctx, leavesChannels) @@ -147,20 +145,15 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error { wgSyncMainTrie.Done() }() - go func() { - err := u.syncAccountDataTries(leavesChannels, ctx) - if err != nil { - log.Error("syncAccountDataTries:", "error", err.Error()) - return - } - - wgSyncDatatries.Done() - }() + err := u.syncAccountDataTries(leavesChannels, ctx) + if err != nil { + log.Error("syncAccountDataTries:", "error", err.Error()) + return err + } log.Debug("StartSyncing: wait for goroutines to finish") wgSyncMainTrie.Wait() - wgSyncDatatries.Wait() u.storageMarker.MarkStorerAsSyncedAndActive(u.trieStorageManager) @@ -244,7 +237,9 @@ func (u *userAccountsSyncer) syncAccountDataTries( errMutex := sync.Mutex{} wg := sync.WaitGroup{} + numInterations := 0 for leaf := range leavesChannels.LeavesChan { + numInterations++ log.Trace("syncAccountDataTries:", "leaf key", hex.EncodeToString(leaf.Key())) u.resetTimeoutHandlerWatchdog() @@ -285,6 +280,7 @@ func (u *userAccountsSyncer) syncAccountDataTries( }(account.RootHash, account.Address) } + log.Debug("syncDataTrie: num leaves chan interations", "count", numInterations) log.Debug("syncDataTrie: wait for goroutines to finish") wg.Wait() diff --git a/trie/depthFirstSync.go b/trie/depthFirstSync.go index 7c2a7d7ce8f..18e3d624557 100644 --- a/trie/depthFirstSync.go +++ b/trie/depthFirstSync.go @@ -35,6 +35,7 @@ type depthFirstTrieSyncer struct { nodes *trieNodesHandler requestedHashes map[string]*request accLeavesChannels *common.TrieIteratorChannels + numLeavesStoreIt int } // NewDepthFirstTrieSyncer creates a new instance of trieSyncer that uses the depth-first algorithm @@ -63,6 +64,7 @@ func NewDepthFirstTrieSyncer(arg ArgTrieSyncer) (*depthFirstTrieSyncer, error) { maxHardCapForMissingNodes: 
arg.MaxHardCapForMissingNodes, checkNodesOnDisk: arg.CheckNodesOnDisk, accLeavesChannels: arg.AccLeavesChannels, + numLeavesStoreIt: 0, } return d, nil @@ -92,6 +94,9 @@ func (d *depthFirstTrieSyncer) StartSyncing(rootHash []byte, ctx context.Context timeStart := time.Now() defer func() { d.setSyncDuration(time.Since(timeStart)) + if d.accLeavesChannels.LeavesChan != nil { + log.Debug("StartSyncing: numLeavesStoreIt", "count", d.numLeavesStoreIt) + } }() for { @@ -273,6 +278,8 @@ func (d *depthFirstTrieSyncer) storeLeaves(children []node) ([]node, error) { return nil, err } + d.numLeavesStoreIt++ + log.Trace("storeLeaves: found leaf node", "leafNodeElement.Key", hex.EncodeToString(leafNodeElement.Key)) trieLeaf := keyValStorage.NewKeyValStorage(leafNodeElement.Key, leafNodeElement.Value) From e506c955d462657e8d95d97a0405cf13087fe10e Mon Sep 17 00:00:00 2001 From: ssd04 Date: Wed, 10 May 2023 16:17:18 +0300 Subject: [PATCH 05/21] mutex clenaup + nit account check --- state/syncer/userAccountsSyncer.go | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/state/syncer/userAccountsSyncer.go b/state/syncer/userAccountsSyncer.go index 428d6404dca..fea4914f552 100644 --- a/state/syncer/userAccountsSyncer.go +++ b/state/syncer/userAccountsSyncer.go @@ -2,7 +2,6 @@ package syncer import ( "context" - "encoding/hex" "fmt" "sort" "sync" @@ -39,7 +38,6 @@ type stats struct { type userAccountsSyncer struct { *baseAccountsSyncer throttler data.GoRoutineThrottler - syncerMutex sync.Mutex pubkeyCoverter core.PubkeyConverter mutStatistics sync.RWMutex @@ -134,8 +132,6 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error { leavesChannels.ErrChan.WriteInChanNonBlocking(err) } - //log.Debug("main trie synced, starting to sync data tries", "num data tries", len(u.dataTries)) - log.Debug("syncMainTrie goroutine: closing leaver channel") if leavesChannels.LeavesChan != nil { @@ -151,8 +147,6 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error { return err } - log.Debug("StartSyncing: wait for goroutines to finish") - wgSyncMainTrie.Wait() u.storageMarker.MarkStorerAsSyncedAndActive(u.trieStorageManager) @@ -161,15 +155,15 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error { } func (u *userAccountsSyncer) syncDataTrie(rootHash []byte, address []byte, ctx context.Context) error { - u.syncerMutex.Lock() + u.mutex.Lock() _, ok := u.dataTries[string(rootHash)] if ok { - u.syncerMutex.Unlock() + u.mutex.Unlock() return nil } u.dataTries[string(rootHash)] = struct{}{} - u.syncerMutex.Unlock() + u.mutex.Unlock() iteratorChannelsForDataTries := &common.TrieIteratorChannels{ LeavesChan: nil, @@ -240,17 +234,17 @@ func (u *userAccountsSyncer) syncAccountDataTries( numInterations := 0 for leaf := range leavesChannels.LeavesChan { numInterations++ - log.Trace("syncAccountDataTries:", "leaf key", hex.EncodeToString(leaf.Key())) + log.Trace("syncAccountDataTries:", "leaf key", leaf.Key()) u.resetTimeoutHandlerWatchdog() account := state.NewEmptyUserAccount() err := u.marshalizer.Unmarshal(account, leaf.Value()) if err != nil { - log.Trace("this must be a leaf with code", "leaf key", hex.EncodeToString(leaf.Key()), "err", err) + log.Trace("this must be a leaf with code", "leaf key", leaf.Key(), "err", err) continue } - if len(account.RootHash) == 0 { + if common.IsEmptyTrie(account.RootHash) { continue } @@ -281,7 +275,6 @@ func (u *userAccountsSyncer) syncAccountDataTries( } log.Debug("syncDataTrie: num leaves chan interations", "count", 
numInterations) - log.Debug("syncDataTrie: wait for goroutines to finish") wg.Wait() err := leavesChannels.ErrChan.ReadFromChanNonBlocking() From 4d737ba529c3383afdf24512e229a6406dcf98c7 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Wed, 10 May 2023 16:21:03 +0300 Subject: [PATCH 06/21] mutex protation and cleanup --- state/syncer/baseAccountsSyncer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/state/syncer/baseAccountsSyncer.go b/state/syncer/baseAccountsSyncer.go index c942832cc0a..70c24c3d0a0 100644 --- a/state/syncer/baseAccountsSyncer.go +++ b/state/syncer/baseAccountsSyncer.go @@ -27,7 +27,6 @@ type baseAccountsSyncer struct { timeoutHandler trie.TimeoutHandler shardId uint32 cacher storage.Cacher - rootHash []byte maxTrieLevelInMemory uint name string maxHardCapForMissingNodes int @@ -95,7 +94,6 @@ func (b *baseAccountsSyncer) syncMainTrie( ctx context.Context, accLeavesChan *common.TrieIteratorChannels, ) (common.Trie, error) { - b.rootHash = rootHash atomic.AddInt32(&b.numMaxTries, 1) log.Trace("syncing main trie", "roothash", rootHash) @@ -104,7 +102,9 @@ func (b *baseAccountsSyncer) syncMainTrie( return nil, err } + b.mutex.Lock() b.dataTries[string(rootHash)] = struct{}{} + b.mutex.Unlock() arg := trie.ArgTrieSyncer{ RequestHandler: b.requestHandler, InterceptedNodes: b.cacher, From 38c26aee33e50d412b9df970a4f6b51c66c4aa96 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Wed, 10 May 2023 17:03:43 +0300 Subject: [PATCH 07/21] remove roothash nil vars --- state/syncer/userAccountsSyncer.go | 1 - state/syncer/validatorAccountsSyncer.go | 1 - 2 files changed, 2 deletions(-) diff --git a/state/syncer/userAccountsSyncer.go b/state/syncer/userAccountsSyncer.go index fea4914f552..fed0c273078 100644 --- a/state/syncer/userAccountsSyncer.go +++ b/state/syncer/userAccountsSyncer.go @@ -81,7 +81,6 @@ func NewUserAccountsSyncer(args ArgsNewUserAccountsSyncer) (*userAccountsSyncer, timeoutHandler: timeoutHandler, shardId: args.ShardId, cacher: args.Cacher, - rootHash: nil, maxTrieLevelInMemory: args.MaxTrieLevelInMemory, name: fmt.Sprintf("user accounts for shard %s", core.GetShardIDString(args.ShardId)), maxHardCapForMissingNodes: args.MaxHardCapForMissingNodes, diff --git a/state/syncer/validatorAccountsSyncer.go b/state/syncer/validatorAccountsSyncer.go index 6f2ea0e2e52..6538959758b 100644 --- a/state/syncer/validatorAccountsSyncer.go +++ b/state/syncer/validatorAccountsSyncer.go @@ -43,7 +43,6 @@ func NewValidatorAccountsSyncer(args ArgsNewValidatorAccountsSyncer) (*validator timeoutHandler: timeoutHandler, shardId: core.MetachainShardId, cacher: args.Cacher, - rootHash: nil, maxTrieLevelInMemory: args.MaxTrieLevelInMemory, name: "peer accounts", maxHardCapForMissingNodes: args.MaxHardCapForMissingNodes, From 91448e299407511cc55d317b55093ba1617b4c2a Mon Sep 17 00:00:00 2001 From: ssd04 Date: Wed, 10 May 2023 17:18:36 +0300 Subject: [PATCH 08/21] fix mutes for syncer --- state/syncer/baseAccountsSyncer.go | 2 -- state/syncer/userAccountsSyncer.go | 7 ++++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/state/syncer/baseAccountsSyncer.go b/state/syncer/baseAccountsSyncer.go index 70c24c3d0a0..d29a3a1dec2 100644 --- a/state/syncer/baseAccountsSyncer.go +++ b/state/syncer/baseAccountsSyncer.go @@ -102,9 +102,7 @@ func (b *baseAccountsSyncer) syncMainTrie( return nil, err } - b.mutex.Lock() b.dataTries[string(rootHash)] = struct{}{} - b.mutex.Unlock() arg := trie.ArgTrieSyncer{ RequestHandler: b.requestHandler, InterceptedNodes: b.cacher, diff --git 
a/state/syncer/userAccountsSyncer.go b/state/syncer/userAccountsSyncer.go index fed0c273078..1257b34f260 100644 --- a/state/syncer/userAccountsSyncer.go +++ b/state/syncer/userAccountsSyncer.go @@ -38,6 +38,7 @@ type stats struct { type userAccountsSyncer struct { *baseAccountsSyncer throttler data.GoRoutineThrottler + syncerMutex sync.Mutex pubkeyCoverter core.PubkeyConverter mutStatistics sync.RWMutex @@ -154,15 +155,15 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error { } func (u *userAccountsSyncer) syncDataTrie(rootHash []byte, address []byte, ctx context.Context) error { - u.mutex.Lock() + u.syncerMutex.Lock() _, ok := u.dataTries[string(rootHash)] if ok { - u.mutex.Unlock() + u.syncerMutex.Unlock() return nil } u.dataTries[string(rootHash)] = struct{}{} - u.mutex.Unlock() + u.syncerMutex.Unlock() iteratorChannelsForDataTries := &common.TrieIteratorChannels{ LeavesChan: nil, From 50cad3bda6326f82b1004939860e3941bbb637dc Mon Sep 17 00:00:00 2001 From: ssd04 Date: Thu, 11 May 2023 19:00:57 +0300 Subject: [PATCH 09/21] get leaves also from storeTrieNode --- trie/depthFirstSync.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/trie/depthFirstSync.go b/trie/depthFirstSync.go index 18e3d624557..6b7e20e7478 100644 --- a/trie/depthFirstSync.go +++ b/trie/depthFirstSync.go @@ -261,6 +261,13 @@ func (d *depthFirstTrieSyncer) storeTrieNode(element node) error { d.trieSyncStatistics.AddNumBytesReceived(uint64(numBytes)) d.updateStats(uint64(numBytes), element) + leafNodeElement, isLeaf := element.(*leafNode) + if isLeaf && d.accLeavesChannels.LeavesChan != nil { + log.Debug("storeTrieNode: found leaf node") + trieLeaf := keyValStorage.NewKeyValStorage(leafNodeElement.Key, leafNodeElement.Value) + d.accLeavesChannels.LeavesChan <- trieLeaf + } + return nil } From 4e258229d8e57abce7e253372a6b6a1bfacbab6f Mon Sep 17 00:00:00 2001 From: ssd04 Date: Mon, 15 May 2023 17:15:08 +0300 Subject: [PATCH 10/21] add unit tests for account leaves chan --- trie/depthFirstSync_test.go | 22 ++++ trie/sync.go | 9 ++ trie/sync_test.go | 206 ++++++++++++++++++++++-------------- 3 files changed, 160 insertions(+), 77 deletions(-) diff --git a/trie/depthFirstSync_test.go b/trie/depthFirstSync_test.go index 6ace7fbdb3f..42227dea6f9 100644 --- a/trie/depthFirstSync_test.go +++ b/trie/depthFirstSync_test.go @@ -4,12 +4,14 @@ import ( "bytes" "context" "fmt" + "sync" "testing" "time" "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-go/common" + "github.com/multiversx/mx-chain-go/common/errChan" "github.com/multiversx/mx-chain-go/errors" "github.com/multiversx/mx-chain-go/storage" "github.com/stretchr/testify/assert" @@ -109,6 +111,10 @@ func TestDepthFirstTrieSyncer_StartSyncingNewTrieShouldWork(t *testing.T) { arg := createMockArgument(time.Minute) arg.RequestHandler = createRequesterResolver(trSource, arg.InterceptedNodes, nil) + arg.AccLeavesChannels = &common.TrieIteratorChannels{ + LeavesChan: make(chan core.KeyValueHolder, 110), + ErrChan: errChan.NewErrChanWrapper(), + } d, _ := NewDepthFirstTrieSyncer(arg) ctx, cancelFunc := context.WithTimeout(context.Background(), time.Second*30) @@ -135,6 +141,22 @@ func TestDepthFirstTrieSyncer_StartSyncingNewTrieShouldWork(t *testing.T) { assert.True(t, d.NumTrieNodes() > d.NumLeaves()) assert.True(t, d.NumBytes() > 0) assert.True(t, d.Duration() > 0) + + wg := &sync.WaitGroup{} + wg.Add(numKeysValues) + + numLeavesOnChan := 0 + go func() { + for range 
arg.AccLeavesChannels.LeavesChan { + numLeavesOnChan++ + wg.Done() + } + }() + + wg.Wait() + + assert.Equal(t, numKeysValues, numLeavesOnChan) + log.Info("synced trie", "num trie nodes", d.NumTrieNodes(), "num leaves", d.NumLeaves(), diff --git a/trie/sync.go b/trie/sync.go index e1a3dfef445..a599554b75d 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -120,6 +120,15 @@ func checkArguments(arg ArgTrieSyncer) error { if arg.MaxHardCapForMissingNodes < 1 { return fmt.Errorf("%w provided: %v", ErrInvalidMaxHardCapForMissingNodes, arg.MaxHardCapForMissingNodes) } + if arg.AccLeavesChannels == nil { + return ErrNilTrieIteratorChannels + } + if arg.AccLeavesChannels.LeavesChan == nil { + return ErrNilTrieIteratorLeavesChannel + } + if arg.AccLeavesChannels.ErrChan == nil { + return ErrNilTrieIteratorErrChannel + } return nil } diff --git a/trie/sync_test.go b/trie/sync_test.go index cf56628be2c..46758aa7065 100644 --- a/trie/sync_test.go +++ b/trie/sync_test.go @@ -6,8 +6,11 @@ import ( "testing" "time" + "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data" + "github.com/multiversx/mx-chain-go/common" + "github.com/multiversx/mx-chain-go/common/errChan" "github.com/multiversx/mx-chain-go/testscommon" "github.com/multiversx/mx-chain-go/testscommon/hashingMocks" trieMock "github.com/multiversx/mx-chain-go/testscommon/trie" @@ -26,6 +29,11 @@ func createMockArgument(timeout time.Duration) ArgTrieSyncer { }, } + leavesChannels := &common.TrieIteratorChannels{ + LeavesChan: make(chan core.KeyValueHolder, common.TrieLeavesChannelDefaultCapacity), + ErrChan: errChan.NewErrChanWrapper(), + } + return ArgTrieSyncer{ RequestHandler: &testscommon.RequestHandlerStub{}, InterceptedNodes: testscommon.NewCacherMock(), @@ -37,116 +45,160 @@ func createMockArgument(timeout time.Duration) ArgTrieSyncer { TrieSyncStatistics: statistics.NewTrieSyncStatistics(), TimeoutHandler: testscommon.NewTimeoutHandlerMock(timeout), MaxHardCapForMissingNodes: 500, + AccLeavesChannels: leavesChannels, } } -func TestNewTrieSyncer_NilRequestHandlerShouldErr(t *testing.T) { +func TestNewTrieSyncer(t *testing.T) { t.Parallel() - arg := createMockArgument(time.Minute) - arg.RequestHandler = nil + t.Run("nil request handler", func(t *testing.T) { + t.Parallel() - ts, err := NewTrieSyncer(arg) - assert.True(t, check.IfNil(ts)) - assert.Equal(t, err, ErrNilRequestHandler) -} + arg := createMockArgument(time.Minute) + arg.RequestHandler = nil -func TestNewTrieSyncer_NilInterceptedNodesShouldErr(t *testing.T) { - t.Parallel() + ts, err := NewTrieSyncer(arg) + assert.True(t, check.IfNil(ts)) + assert.Equal(t, err, ErrNilRequestHandler) + }) - arg := createMockArgument(time.Minute) - arg.InterceptedNodes = nil + t.Run("nil intercepted nodes", func(t *testing.T) { + t.Parallel() - ts, err := NewTrieSyncer(arg) - assert.True(t, check.IfNil(ts)) - assert.Equal(t, err, data.ErrNilCacher) -} + arg := createMockArgument(time.Minute) + arg.InterceptedNodes = nil -func TestNewTrieSyncer_EmptyTopicShouldErr(t *testing.T) { - t.Parallel() + ts, err := NewTrieSyncer(arg) + assert.True(t, check.IfNil(ts)) + assert.Equal(t, err, data.ErrNilCacher) + }) - arg := createMockArgument(time.Minute) - arg.Topic = "" + t.Run("empty topic should fail", func(t *testing.T) { + t.Parallel() - ts, err := NewTrieSyncer(arg) - assert.True(t, check.IfNil(ts)) - assert.Equal(t, err, ErrInvalidTrieTopic) -} + arg := createMockArgument(time.Minute) + arg.Topic = "" -func 
TestNewTrieSyncer_NilTrieStatisticsShouldErr(t *testing.T) { - t.Parallel() + ts, err := NewTrieSyncer(arg) + assert.True(t, check.IfNil(ts)) + assert.Equal(t, err, ErrInvalidTrieTopic) + }) - arg := createMockArgument(time.Minute) - arg.TrieSyncStatistics = nil + t.Run("nil trie statistics", func(t *testing.T) { + t.Parallel() - ts, err := NewTrieSyncer(arg) - assert.True(t, check.IfNil(ts)) - assert.Equal(t, err, ErrNilTrieSyncStatistics) -} + arg := createMockArgument(time.Minute) + arg.TrieSyncStatistics = nil -func TestNewTrieSyncer_NilDatabaseShouldErr(t *testing.T) { - t.Parallel() + ts, err := NewTrieSyncer(arg) + assert.True(t, check.IfNil(ts)) + assert.Equal(t, err, ErrNilTrieSyncStatistics) + }) - arg := createMockArgument(time.Minute) - arg.DB = nil + t.Run("nil database", func(t *testing.T) { + t.Parallel() - ts, err := NewTrieSyncer(arg) - assert.True(t, check.IfNil(ts)) - assert.True(t, errors.Is(err, ErrNilDatabase)) -} + arg := createMockArgument(time.Minute) + arg.DB = nil -func TestNewTrieSyncer_NilMarshalizerShouldErr(t *testing.T) { - t.Parallel() + ts, err := NewTrieSyncer(arg) + assert.True(t, check.IfNil(ts)) + assert.True(t, errors.Is(err, ErrNilDatabase)) + }) - arg := createMockArgument(time.Minute) - arg.Marshalizer = nil + t.Run("nil marshalizer", func(t *testing.T) { + t.Parallel() - ts, err := NewTrieSyncer(arg) - assert.True(t, check.IfNil(ts)) - assert.True(t, errors.Is(err, ErrNilMarshalizer)) -} + arg := createMockArgument(time.Minute) + arg.Marshalizer = nil -func TestNewTrieSyncer_NilHasherShouldErr(t *testing.T) { - t.Parallel() + ts, err := NewTrieSyncer(arg) + assert.True(t, check.IfNil(ts)) + assert.True(t, errors.Is(err, ErrNilMarshalizer)) + }) - arg := createMockArgument(time.Minute) - arg.Hasher = nil + t.Run("nil hasher", func(t *testing.T) { + t.Parallel() - ts, err := NewTrieSyncer(arg) - assert.True(t, check.IfNil(ts)) - assert.True(t, errors.Is(err, ErrNilHasher)) -} + arg := createMockArgument(time.Minute) + arg.Hasher = nil -func TestNewTrieSyncer_NilTimeoutHandlerShouldErr(t *testing.T) { - t.Parallel() + ts, err := NewTrieSyncer(arg) + assert.True(t, check.IfNil(ts)) + assert.True(t, errors.Is(err, ErrNilHasher)) + }) - arg := createMockArgument(time.Minute) - arg.TimeoutHandler = nil + t.Run("nil timeout handler", func(t *testing.T) { + t.Parallel() - ts, err := NewTrieSyncer(arg) - assert.True(t, check.IfNil(ts)) - assert.True(t, errors.Is(err, ErrNilTimeoutHandler)) -} + arg := createMockArgument(time.Minute) + arg.TimeoutHandler = nil -func TestNewTrieSyncer_InvalidMaxHardCapForMissingNodesShouldErr(t *testing.T) { - t.Parallel() + ts, err := NewTrieSyncer(arg) + assert.True(t, check.IfNil(ts)) + assert.True(t, errors.Is(err, ErrNilTimeoutHandler)) + }) - arg := createMockArgument(time.Minute) - arg.MaxHardCapForMissingNodes = 0 + t.Run("invalid max hard capacity for missing nodes", func(t *testing.T) { + t.Parallel() - ts, err := NewTrieSyncer(arg) - assert.True(t, check.IfNil(ts)) - assert.True(t, errors.Is(err, ErrInvalidMaxHardCapForMissingNodes)) -} + arg := createMockArgument(time.Minute) + arg.MaxHardCapForMissingNodes = 0 -func TestNewTrieSyncer_ShouldWork(t *testing.T) { - t.Parallel() + ts, err := NewTrieSyncer(arg) + assert.True(t, check.IfNil(ts)) + assert.True(t, errors.Is(err, ErrInvalidMaxHardCapForMissingNodes)) + }) - arg := createMockArgument(time.Minute) + t.Run("nil accounts leaves channels", func(t *testing.T) { + t.Parallel() - ts, err := NewTrieSyncer(arg) - assert.False(t, check.IfNil(ts)) - assert.Nil(t, 
err) + arg := createMockArgument(time.Minute) + arg.AccLeavesChannels = nil + + ts, err := NewTrieSyncer(arg) + assert.True(t, check.IfNil(ts)) + assert.True(t, errors.Is(err, ErrNilTrieIteratorChannels)) + }) + + t.Run("nil leaves channel", func(t *testing.T) { + t.Parallel() + + arg := createMockArgument(time.Minute) + arg.AccLeavesChannels = &common.TrieIteratorChannels{ + LeavesChan: nil, + ErrChan: errChan.NewErrChanWrapper(), + } + + ts, err := NewTrieSyncer(arg) + assert.True(t, check.IfNil(ts)) + assert.True(t, errors.Is(err, ErrNilTrieIteratorLeavesChannel)) + }) + + t.Run("nil err channel", func(t *testing.T) { + t.Parallel() + + arg := createMockArgument(time.Minute) + arg.AccLeavesChannels = &common.TrieIteratorChannels{ + LeavesChan: make(chan core.KeyValueHolder, common.TrieLeavesChannelDefaultCapacity), + ErrChan: nil, + } + + ts, err := NewTrieSyncer(arg) + assert.True(t, check.IfNil(ts)) + assert.True(t, errors.Is(err, ErrNilTrieIteratorErrChannel)) + }) + + t.Run("should work", func(t *testing.T) { + t.Parallel() + + arg := createMockArgument(time.Minute) + + ts, err := NewTrieSyncer(arg) + assert.False(t, check.IfNil(ts)) + assert.Nil(t, err) + }) } func TestTrieSync_InterceptedNodeShouldNotBeAddedToNodesForTrieIfNodeReceived(t *testing.T) { From 6068c06177aee368e3141c81ffa107f15e4d993c Mon Sep 17 00:00:00 2001 From: ssd04 Date: Mon, 15 May 2023 17:49:35 +0300 Subject: [PATCH 11/21] update unit tests + remove debugging code --- state/syncer/export_test.go | 4 +- state/syncer/userAccountsSyncer.go | 9 ++-- state/syncer/userAccountsSyncer_test.go | 64 ++++++++++++------------- trie/depthFirstSync.go | 12 ----- trie/sync.go | 6 --- trie/sync_test.go | 28 ----------- 6 files changed, 37 insertions(+), 86 deletions(-) diff --git a/state/syncer/export_test.go b/state/syncer/export_test.go index dcea224d65e..e8fade258ac 100644 --- a/state/syncer/export_test.go +++ b/state/syncer/export_test.go @@ -19,8 +19,8 @@ func CheckBaseAccountsSyncerArgs(args ArgsNewBaseAccountsSyncer) error { // SyncAccountDataTries - func (u *userAccountsSyncer) SyncAccountDataTries( - mainTrie common.Trie, + leavesChannels *common.TrieIteratorChannels, ctx context.Context, ) error { - return u.syncAccountDataTries(mainTrie, ctx) + return u.syncAccountDataTries(leavesChannels, ctx) } diff --git a/state/syncer/userAccountsSyncer.go b/state/syncer/userAccountsSyncer.go index 1257b34f260..8d3cd1a7888 100644 --- a/state/syncer/userAccountsSyncer.go +++ b/state/syncer/userAccountsSyncer.go @@ -225,15 +225,17 @@ func (u *userAccountsSyncer) syncAccountDataTries( leavesChannels *common.TrieIteratorChannels, ctx context.Context, ) error { + if leavesChannels == nil { + return trie.ErrNilTrieIteratorChannels + } + defer u.printDataTrieStatistics() var errFound error errMutex := sync.Mutex{} wg := sync.WaitGroup{} - numInterations := 0 for leaf := range leavesChannels.LeavesChan { - numInterations++ log.Trace("syncAccountDataTries:", "leaf key", leaf.Key()) u.resetTimeoutHandlerWatchdog() @@ -264,8 +266,8 @@ func (u *userAccountsSyncer) syncAccountDataTries( newErr := u.syncDataTrie(trieRootHash, address, ctx) if newErr != nil { errMutex.Lock() - log.Error("syncDataTrie: error found", "error", errFound.Error()) errFound = newErr + log.Error("syncDataTrie: error found", "error", errFound.Error()) errMutex.Unlock() } atomic.AddInt32(&u.numTriesSynced, 1) @@ -274,7 +276,6 @@ func (u *userAccountsSyncer) syncAccountDataTries( }(account.RootHash, account.Address) } - log.Debug("syncDataTrie: num leaves chan 
interations", "count", numInterations) wg.Wait() err := leavesChannels.ErrChan.ReadFromChanNonBlocking() diff --git a/state/syncer/userAccountsSyncer_test.go b/state/syncer/userAccountsSyncer_test.go index f0080682107..51184d76d91 100644 --- a/state/syncer/userAccountsSyncer_test.go +++ b/state/syncer/userAccountsSyncer_test.go @@ -6,18 +6,20 @@ import ( "testing" "time" + "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/data" "github.com/multiversx/mx-chain-core-go/hashing" "github.com/multiversx/mx-chain-core-go/marshal" "github.com/multiversx/mx-chain-go/api/mock" "github.com/multiversx/mx-chain-go/common" + "github.com/multiversx/mx-chain-go/common/errChan" "github.com/multiversx/mx-chain-go/config" "github.com/multiversx/mx-chain-go/state" "github.com/multiversx/mx-chain-go/state/syncer" "github.com/multiversx/mx-chain-go/testscommon" - trieMocks "github.com/multiversx/mx-chain-go/testscommon/trie" "github.com/multiversx/mx-chain-go/trie" "github.com/multiversx/mx-chain-go/trie/hashesHolder" + "github.com/multiversx/mx-chain-go/trie/keyBuilder" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -171,43 +173,15 @@ func emptyTrie() common.Trie { func TestUserAccountsSyncer_SyncAccountDataTries(t *testing.T) { t.Parallel() - t.Run("failed to get trie root hash", func(t *testing.T) { + t.Run("nil leaves chan should fail", func(t *testing.T) { t.Parallel() - expectedErr := errors.New("expected err") - tr := &trieMocks.TrieStub{ - RootCalled: func() ([]byte, error) { - return nil, expectedErr - }, - } - args := getDefaultUserAccountsSyncerArgs() s, err := syncer.NewUserAccountsSyncer(args) require.Nil(t, err) - err = s.SyncAccountDataTries(tr, context.TODO()) - require.Equal(t, expectedErr, err) - }) - - t.Run("failed to get all leaves on channel", func(t *testing.T) { - t.Parallel() - - expectedErr := errors.New("expected err") - tr := &trieMocks.TrieStub{ - RootCalled: func() ([]byte, error) { - return []byte("rootHash"), nil - }, - GetAllLeavesOnChannelCalled: func(leavesChannels *common.TrieIteratorChannels, ctx context.Context, rootHash []byte, keyBuilder common.KeyBuilder) error { - return expectedErr - }, - } - - args := getDefaultUserAccountsSyncerArgs() - s, err := syncer.NewUserAccountsSyncer(args) - require.Nil(t, err) - - err = s.SyncAccountDataTries(tr, context.TODO()) - require.Equal(t, expectedErr, err) + err = s.SyncAccountDataTries(nil, context.TODO()) + require.Equal(t, trie.ErrNilTrieIteratorChannels, err) }) t.Run("throttler cannot process and closed context should fail", func(t *testing.T) { @@ -254,10 +228,21 @@ func TestUserAccountsSyncer_SyncAccountDataTries(t *testing.T) { _ = tr.Update([]byte("ddog"), accountBytes) _ = tr.Commit() + leavesChannels := &common.TrieIteratorChannels{ + LeavesChan: make(chan core.KeyValueHolder, common.TrieLeavesChannelDefaultCapacity), + ErrChan: errChan.NewErrChanWrapper(), + } + + rootHash, err := tr.RootHash() + require.Nil(t, err) + + err = tr.GetAllLeavesOnChannel(leavesChannels, context.TODO(), rootHash, keyBuilder.NewDisabledKeyBuilder()) + require.Nil(t, err) + ctx, cancel := context.WithCancel(context.TODO()) cancel() - err = s.SyncAccountDataTries(tr, ctx) + err = s.SyncAccountDataTries(leavesChannels, ctx) require.Equal(t, data.ErrTimeIsOut, err) }) @@ -300,7 +285,18 @@ func TestUserAccountsSyncer_SyncAccountDataTries(t *testing.T) { _ = tr.Update([]byte("ddog"), accountBytes) _ = tr.Commit() - err = s.SyncAccountDataTries(tr, context.TODO()) + leavesChannels 
:= &common.TrieIteratorChannels{ + LeavesChan: make(chan core.KeyValueHolder, common.TrieLeavesChannelDefaultCapacity), + ErrChan: errChan.NewErrChanWrapper(), + } + + rootHash, err := tr.RootHash() + require.Nil(t, err) + + err = tr.GetAllLeavesOnChannel(leavesChannels, context.TODO(), rootHash, keyBuilder.NewDisabledKeyBuilder()) + require.Nil(t, err) + + err = s.SyncAccountDataTries(leavesChannels, context.TODO()) require.Nil(t, err) }) } diff --git a/trie/depthFirstSync.go b/trie/depthFirstSync.go index 6b7e20e7478..e6f05ca23c2 100644 --- a/trie/depthFirstSync.go +++ b/trie/depthFirstSync.go @@ -35,7 +35,6 @@ type depthFirstTrieSyncer struct { nodes *trieNodesHandler requestedHashes map[string]*request accLeavesChannels *common.TrieIteratorChannels - numLeavesStoreIt int } // NewDepthFirstTrieSyncer creates a new instance of trieSyncer that uses the depth-first algorithm @@ -64,7 +63,6 @@ func NewDepthFirstTrieSyncer(arg ArgTrieSyncer) (*depthFirstTrieSyncer, error) { maxHardCapForMissingNodes: arg.MaxHardCapForMissingNodes, checkNodesOnDisk: arg.CheckNodesOnDisk, accLeavesChannels: arg.AccLeavesChannels, - numLeavesStoreIt: 0, } return d, nil @@ -94,9 +92,6 @@ func (d *depthFirstTrieSyncer) StartSyncing(rootHash []byte, ctx context.Context timeStart := time.Now() defer func() { d.setSyncDuration(time.Since(timeStart)) - if d.accLeavesChannels.LeavesChan != nil { - log.Debug("StartSyncing: numLeavesStoreIt", "count", d.numLeavesStoreIt) - } }() for { @@ -263,7 +258,6 @@ func (d *depthFirstTrieSyncer) storeTrieNode(element node) error { leafNodeElement, isLeaf := element.(*leafNode) if isLeaf && d.accLeavesChannels.LeavesChan != nil { - log.Debug("storeTrieNode: found leaf node") trieLeaf := keyValStorage.NewKeyValStorage(leafNodeElement.Key, leafNodeElement.Value) d.accLeavesChannels.LeavesChan <- trieLeaf } @@ -285,10 +279,6 @@ func (d *depthFirstTrieSyncer) storeLeaves(children []node) ([]node, error) { return nil, err } - d.numLeavesStoreIt++ - - log.Trace("storeLeaves: found leaf node", "leafNodeElement.Key", hex.EncodeToString(leafNodeElement.Key)) - trieLeaf := keyValStorage.NewKeyValStorage(leafNodeElement.Key, leafNodeElement.Value) // TODO: analize error chan if d.accLeavesChannels.LeavesChan == nil { @@ -296,8 +286,6 @@ func (d *depthFirstTrieSyncer) storeLeaves(children []node) ([]node, error) { continue } d.accLeavesChannels.LeavesChan <- trieLeaf - - log.Trace("storeLeaves: found leaf node - DONE", "leafNodeElement.Key", hex.EncodeToString(leafNodeElement.Key)) } return childrenNotLeaves, nil diff --git a/trie/sync.go b/trie/sync.go index a599554b75d..4bc56b65077 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -123,12 +123,6 @@ func checkArguments(arg ArgTrieSyncer) error { if arg.AccLeavesChannels == nil { return ErrNilTrieIteratorChannels } - if arg.AccLeavesChannels.LeavesChan == nil { - return ErrNilTrieIteratorLeavesChannel - } - if arg.AccLeavesChannels.ErrChan == nil { - return ErrNilTrieIteratorErrChannel - } return nil } diff --git a/trie/sync_test.go b/trie/sync_test.go index 46758aa7065..891f46f526c 100644 --- a/trie/sync_test.go +++ b/trie/sync_test.go @@ -162,34 +162,6 @@ func TestNewTrieSyncer(t *testing.T) { assert.True(t, errors.Is(err, ErrNilTrieIteratorChannels)) }) - t.Run("nil leaves channel", func(t *testing.T) { - t.Parallel() - - arg := createMockArgument(time.Minute) - arg.AccLeavesChannels = &common.TrieIteratorChannels{ - LeavesChan: nil, - ErrChan: errChan.NewErrChanWrapper(), - } - - ts, err := NewTrieSyncer(arg) - assert.True(t, 
check.IfNil(ts)) - assert.True(t, errors.Is(err, ErrNilTrieIteratorLeavesChannel)) - }) - - t.Run("nil err channel", func(t *testing.T) { - t.Parallel() - - arg := createMockArgument(time.Minute) - arg.AccLeavesChannels = &common.TrieIteratorChannels{ - LeavesChan: make(chan core.KeyValueHolder, common.TrieLeavesChannelDefaultCapacity), - ErrChan: nil, - } - - ts, err := NewTrieSyncer(arg) - assert.True(t, check.IfNil(ts)) - assert.True(t, errors.Is(err, ErrNilTrieIteratorErrChannel)) - }) - t.Run("should work", func(t *testing.T) { t.Parallel() From 784a0b26abfc6e1fa8f3f28a63bba01cded043fe Mon Sep 17 00:00:00 2001 From: ssd04 Date: Tue, 16 May 2023 13:16:18 +0300 Subject: [PATCH 12/21] remove duplicated leaf handling --- trie/depthFirstSync.go | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/trie/depthFirstSync.go b/trie/depthFirstSync.go index e6f05ca23c2..f62bc3ea88d 100644 --- a/trie/depthFirstSync.go +++ b/trie/depthFirstSync.go @@ -2,7 +2,6 @@ package trie import ( "context" - "encoding/hex" "sync" "time" @@ -268,7 +267,7 @@ func (d *depthFirstTrieSyncer) storeTrieNode(element node) error { func (d *depthFirstTrieSyncer) storeLeaves(children []node) ([]node, error) { childrenNotLeaves := make([]node, 0, len(children)) for _, element := range children { - leafNodeElement, isLeaf := element.(*leafNode) + _, isLeaf := element.(*leafNode) if !isLeaf { childrenNotLeaves = append(childrenNotLeaves, element) continue @@ -278,14 +277,6 @@ func (d *depthFirstTrieSyncer) storeLeaves(children []node) ([]node, error) { if err != nil { return nil, err } - - trieLeaf := keyValStorage.NewKeyValStorage(leafNodeElement.Key, leafNodeElement.Value) - // TODO: analize error chan - if d.accLeavesChannels.LeavesChan == nil { - log.Trace("storeLeaves: nil leaves chan", "leafNodeElement.Key", hex.EncodeToString(leafNodeElement.Key)) - continue - } - d.accLeavesChannels.LeavesChan <- trieLeaf } return childrenNotLeaves, nil From 89b16439916ae37913b32a94458279697fea2a9a Mon Sep 17 00:00:00 2001 From: ssd04 Date: Tue, 16 May 2023 15:56:24 +0300 Subject: [PATCH 13/21] add leaves channel for trie syncer version 1 and 2 --- trie/doubleListSync.go | 9 +++++++++ trie/doubleListSync_test.go | 22 ++++++++++++++++++++++ trie/sync.go | 10 ++++++++++ trie/sync_test.go | 24 ++++++++++++++++++++++++ 4 files changed, 65 insertions(+) diff --git a/trie/doubleListSync.go b/trie/doubleListSync.go index 6477023c7d2..2441bcaf82b 100644 --- a/trie/doubleListSync.go +++ b/trie/doubleListSync.go @@ -6,6 +6,7 @@ import ( "time" "github.com/multiversx/mx-chain-core-go/core" + "github.com/multiversx/mx-chain-core-go/core/keyValStorage" "github.com/multiversx/mx-chain-core-go/hashing" "github.com/multiversx/mx-chain-core-go/marshal" "github.com/multiversx/mx-chain-go/common" @@ -44,6 +45,7 @@ type doubleListTrieSyncer struct { existingNodes map[string]node missingHashes map[string]struct{} requestedHashes map[string]*request + accLeavesChannels *common.TrieIteratorChannels } // NewDoubleListTrieSyncer creates a new instance of trieSyncer that uses 2 list for keeping the "margin" nodes. 
@@ -74,6 +76,7 @@ func NewDoubleListTrieSyncer(arg ArgTrieSyncer) (*doubleListTrieSyncer, error) { timeoutHandler: arg.TimeoutHandler, maxHardCapForMissingNodes: arg.MaxHardCapForMissingNodes, checkNodesOnDisk: arg.CheckNodesOnDisk, + accLeavesChannels: arg.AccLeavesChannels, } return d, nil @@ -208,6 +211,12 @@ func (d *doubleListTrieSyncer) processExistingNodes() error { return err } + leafNodeElement, isLeaf := element.(*leafNode) + if isLeaf && d.accLeavesChannels.LeavesChan != nil { + trieLeaf := keyValStorage.NewKeyValStorage(leafNodeElement.Key, leafNodeElement.Value) + d.accLeavesChannels.LeavesChan <- trieLeaf + } + d.timeoutHandler.ResetWatchdog() var children []node diff --git a/trie/doubleListSync_test.go b/trie/doubleListSync_test.go index 719d578e5c6..577d2ef0269 100644 --- a/trie/doubleListSync_test.go +++ b/trie/doubleListSync_test.go @@ -4,12 +4,14 @@ import ( "bytes" "context" "fmt" + "sync" "testing" "time" "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-go/common" + "github.com/multiversx/mx-chain-go/common/errChan" "github.com/multiversx/mx-chain-go/config" "github.com/multiversx/mx-chain-go/errors" "github.com/multiversx/mx-chain-go/storage" @@ -214,6 +216,10 @@ func TestDoubleListTrieSyncer_StartSyncingNewTrieShouldWork(t *testing.T) { arg := createMockArgument(time.Minute) arg.RequestHandler = createRequesterResolver(trSource, arg.InterceptedNodes, nil) + arg.AccLeavesChannels = &common.TrieIteratorChannels{ + LeavesChan: make(chan core.KeyValueHolder, 110), + ErrChan: errChan.NewErrChanWrapper(), + } d, _ := NewDoubleListTrieSyncer(arg) ctx, cancelFunc := context.WithTimeout(context.Background(), time.Second*30) @@ -240,6 +246,22 @@ func TestDoubleListTrieSyncer_StartSyncingNewTrieShouldWork(t *testing.T) { assert.True(t, d.NumTrieNodes() > d.NumLeaves()) assert.True(t, d.NumBytes() > 0) assert.True(t, d.Duration() > 0) + + wg := &sync.WaitGroup{} + wg.Add(numKeysValues) + + numLeavesOnChan := 0 + go func() { + for range arg.AccLeavesChannels.LeavesChan { + numLeavesOnChan++ + wg.Done() + } + }() + + wg.Wait() + + assert.Equal(t, numKeysValues, numLeavesOnChan) + log.Info("synced trie", "num trie nodes", d.NumTrieNodes(), "num leaves", d.NumLeaves(), diff --git a/trie/sync.go b/trie/sync.go index 4bc56b65077..c78dcca6583 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -9,6 +9,7 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" + "github.com/multiversx/mx-chain-core-go/core/keyValStorage" "github.com/multiversx/mx-chain-core-go/data" "github.com/multiversx/mx-chain-core-go/hashing" "github.com/multiversx/mx-chain-core-go/marshal" @@ -41,6 +42,7 @@ type trieSyncer struct { trieSyncStatistics data.SyncStatisticsHandler timeoutHandler TimeoutHandler maxHardCapForMissingNodes int + accLeavesChannels *common.TrieIteratorChannels } const maxNewMissingAddedPerTurn = 10 @@ -87,6 +89,7 @@ func NewTrieSyncer(arg ArgTrieSyncer) (*trieSyncer, error) { trieSyncStatistics: arg.TrieSyncStatistics, timeoutHandler: arg.TimeoutHandler, maxHardCapForMissingNodes: arg.MaxHardCapForMissingNodes, + accLeavesChannels: arg.AccLeavesChannels, } return ts, nil @@ -249,6 +252,13 @@ func (ts *trieSyncer) checkIfSynced() (bool, error) { if err != nil { return false, err } + + leafNodeElement, isLeaf := currentNode.(*leafNode) + if isLeaf && ts.accLeavesChannels.LeavesChan != nil { + trieLeaf := keyValStorage.NewKeyValStorage(leafNodeElement.Key, 
leafNodeElement.Value) + ts.accLeavesChannels.LeavesChan <- trieLeaf + } + ts.timeoutHandler.ResetWatchdog() ts.updateStats(uint64(numBytes), currentNode) diff --git a/trie/sync_test.go b/trie/sync_test.go index 891f46f526c..aada347e4f6 100644 --- a/trie/sync_test.go +++ b/trie/sync_test.go @@ -3,6 +3,7 @@ package trie import ( "context" "errors" + "sync" "testing" "time" @@ -241,6 +242,10 @@ func TestTrieSync_FoundInStorageShouldNotRequest(t *testing.T) { err = bn.commitSnapshot(db, nil, nil, context.Background(), statistics.NewTrieStatistics(), &testscommon.ProcessStatusHandlerStub{}, 0) require.Nil(t, err) + leaves, err := bn.getChildren(db) + require.Nil(t, err) + numLeaves := len(leaves) + arg := createMockArgument(timeout) arg.RequestHandler = &testscommon.RequestHandlerStub{ RequestTrieNodesCalled: func(destShardID uint32, hashes [][]byte, topic string) { @@ -250,10 +255,29 @@ func TestTrieSync_FoundInStorageShouldNotRequest(t *testing.T) { arg.DB = trieStorage arg.Marshalizer = testMarshalizer arg.Hasher = testHasher + arg.AccLeavesChannels = &common.TrieIteratorChannels{ + LeavesChan: make(chan core.KeyValueHolder, 110), + ErrChan: errChan.NewErrChanWrapper(), + } ts, err := NewTrieSyncer(arg) require.Nil(t, err) err = ts.StartSyncing(rootHash, context.Background()) assert.Nil(t, err) + + wg := &sync.WaitGroup{} + wg.Add(numLeaves) + + numLeavesOnChan := 0 + go func() { + for range arg.AccLeavesChannels.LeavesChan { + numLeavesOnChan++ + wg.Done() + } + }() + + wg.Wait() + + assert.Equal(t, numLeaves, numLeavesOnChan) } From da22403d4c1752fa03e08edf5d40f80d13540334 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Tue, 16 May 2023 17:45:05 +0300 Subject: [PATCH 14/21] remove logs and unused variables --- state/syncer/baseAccountsSyncer.go | 15 ++++++--------- state/syncer/userAccountsSyncer.go | 18 +++++++++--------- state/syncer/validatorAccountsSyncer.go | 4 ++-- 3 files changed, 17 insertions(+), 20 deletions(-) diff --git a/state/syncer/baseAccountsSyncer.go b/state/syncer/baseAccountsSyncer.go index d29a3a1dec2..9d301710f58 100644 --- a/state/syncer/baseAccountsSyncer.go +++ b/state/syncer/baseAccountsSyncer.go @@ -93,13 +93,12 @@ func (b *baseAccountsSyncer) syncMainTrie( trieTopic string, ctx context.Context, accLeavesChan *common.TrieIteratorChannels, -) (common.Trie, error) { +) error { atomic.AddInt32(&b.numMaxTries, 1) - log.Trace("syncing main trie", "roothash", rootHash) - dataTrie, err := trie.NewTrie(b.trieStorageManager, b.marshalizer, b.hasher, b.maxTrieLevelInMemory) + _, err := trie.NewTrie(b.trieStorageManager, b.marshalizer, b.hasher, b.maxTrieLevelInMemory) if err != nil { - return nil, err + return err } b.dataTries[string(rootHash)] = struct{}{} @@ -119,19 +118,17 @@ func (b *baseAccountsSyncer) syncMainTrie( } trieSyncer, err := trie.CreateTrieSyncer(arg, b.trieSyncerVersion) if err != nil { - return nil, err + return err } err = trieSyncer.StartSyncing(rootHash, ctx) if err != nil { - return nil, err + return err } atomic.AddInt32(&b.numTriesSynced, 1) - log.Trace("finished syncing main trie", "roothash", rootHash) - - return dataTrie.Recreate(rootHash) + return nil } func (b *baseAccountsSyncer) printStatisticsAndUpdateMetrics(ctx context.Context) { diff --git a/state/syncer/userAccountsSyncer.go b/state/syncer/userAccountsSyncer.go index 8d3cd1a7888..0c6a40d39b7 100644 --- a/state/syncer/userAccountsSyncer.go +++ b/state/syncer/userAccountsSyncer.go @@ -126,24 +126,18 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error { wgSyncMainTrie.Add(1) 
go func() { - _, err := u.syncMainTrie(rootHash, factory.AccountTrieNodesTopic, ctx, leavesChannels) + err := u.syncMainTrie(rootHash, factory.AccountTrieNodesTopic, ctx, leavesChannels) if err != nil { - log.Error("syncMainTrie:", "error", err.Error()) leavesChannels.ErrChan.WriteInChanNonBlocking(err) } - log.Debug("syncMainTrie goroutine: closing leaver channel") - - if leavesChannels.LeavesChan != nil { - close(leavesChannels.LeavesChan) - } + safelyCloseChan(leavesChannels.LeavesChan) wgSyncMainTrie.Done() }() err := u.syncAccountDataTries(leavesChannels, ctx) if err != nil { - log.Error("syncAccountDataTries:", "error", err.Error()) return err } @@ -154,6 +148,12 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error { return nil } +func safelyCloseChan(ch chan core.KeyValueHolder) { + if ch != nil { + close(ch) + } +} + func (u *userAccountsSyncer) syncDataTrie(rootHash []byte, address []byte, ctx context.Context) error { u.syncerMutex.Lock() _, ok := u.dataTries[string(rootHash)] @@ -166,7 +166,7 @@ func (u *userAccountsSyncer) syncDataTrie(rootHash []byte, address []byte, ctx c u.syncerMutex.Unlock() iteratorChannelsForDataTries := &common.TrieIteratorChannels{ - LeavesChan: nil, + LeavesChan: nil, // not used for data tries ErrChan: nil, } diff --git a/state/syncer/validatorAccountsSyncer.go b/state/syncer/validatorAccountsSyncer.go index 6538959758b..3554312e0d8 100644 --- a/state/syncer/validatorAccountsSyncer.go +++ b/state/syncer/validatorAccountsSyncer.go @@ -79,12 +79,12 @@ func (v *validatorAccountsSyncer) SyncAccounts(rootHash []byte) error { LeavesChan: nil, ErrChan: errChan.NewErrChanWrapper(), } - mainTrie, err := v.syncMainTrie(rootHash, factory.ValidatorTrieNodesTopic, ctx, leavesChannels) + err := v.syncMainTrie(rootHash, factory.ValidatorTrieNodesTopic, ctx, leavesChannels) if err != nil { return err } - v.storageMarker.MarkStorerAsSyncedAndActive(mainTrie.GetStorageManager()) + v.storageMarker.MarkStorerAsSyncedAndActive(v.trieStorageManager) return nil } From b4866287995de5a1a5e6056089a384659062429f Mon Sep 17 00:00:00 2001 From: ssd04 Date: Tue, 16 May 2023 17:46:26 +0300 Subject: [PATCH 15/21] remove log trace --- state/syncer/userAccountsSyncer.go | 1 - 1 file changed, 1 deletion(-) diff --git a/state/syncer/userAccountsSyncer.go b/state/syncer/userAccountsSyncer.go index 0c6a40d39b7..a851053ec0e 100644 --- a/state/syncer/userAccountsSyncer.go +++ b/state/syncer/userAccountsSyncer.go @@ -236,7 +236,6 @@ func (u *userAccountsSyncer) syncAccountDataTries( wg := sync.WaitGroup{} for leaf := range leavesChannels.LeavesChan { - log.Trace("syncAccountDataTries:", "leaf key", leaf.Key()) u.resetTimeoutHandlerWatchdog() account := state.NewEmptyUserAccount() From 0b1b7742ad3bcf889d4353d7406b688deabc5869 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Tue, 16 May 2023 18:02:38 +0300 Subject: [PATCH 16/21] added sync logs --- state/syncer/baseAccountsSyncer.go | 3 +++ state/syncer/userAccountsSyncer.go | 2 ++ 2 files changed, 5 insertions(+) diff --git a/state/syncer/baseAccountsSyncer.go b/state/syncer/baseAccountsSyncer.go index 9d301710f58..0945c77d6ca 100644 --- a/state/syncer/baseAccountsSyncer.go +++ b/state/syncer/baseAccountsSyncer.go @@ -96,6 +96,7 @@ func (b *baseAccountsSyncer) syncMainTrie( ) error { atomic.AddInt32(&b.numMaxTries, 1) + log.Trace("syncing main trie", "roothash", rootHash) _, err := trie.NewTrie(b.trieStorageManager, b.marshalizer, b.hasher, b.maxTrieLevelInMemory) if err != nil { return err @@ -128,6 +129,8 @@ func (b 
*baseAccountsSyncer) syncMainTrie( atomic.AddInt32(&b.numTriesSynced, 1) + log.Trace("finished syncing main trie", "roothash", rootHash) + return nil } diff --git a/state/syncer/userAccountsSyncer.go b/state/syncer/userAccountsSyncer.go index a851053ec0e..30885fa5ffc 100644 --- a/state/syncer/userAccountsSyncer.go +++ b/state/syncer/userAccountsSyncer.go @@ -145,6 +145,8 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error { u.storageMarker.MarkStorerAsSyncedAndActive(u.trieStorageManager) + log.Debug("main trie and data tries synced", "num data tries", len(u.dataTries)) + return nil } From cfe47ddf5357103ce0c6e4e2baba94e087b02f41 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Wed, 17 May 2023 13:32:13 +0300 Subject: [PATCH 17/21] fixes after review: refactorings --- common/channels.go | 9 +++++ common/constants.go | 4 +++ state/syncer/baseAccountsSyncer.go | 4 --- state/syncer/userAccountsSyncer.go | 36 +++++++------------- trie/depthFirstSync.go | 7 +--- trie/doubleListSync.go | 7 +--- trie/sync.go | 14 +++++--- trie/trieStorageManager.go | 24 +++++-------- trie/trieStorageManagerWithoutCheckpoints.go | 2 +- trie/trieStorageManagerWithoutSnapshot.go | 2 +- 10 files changed, 48 insertions(+), 61 deletions(-) diff --git a/common/channels.go b/common/channels.go index 177ac89f5c5..cade1183496 100644 --- a/common/channels.go +++ b/common/channels.go @@ -1,5 +1,7 @@ package common +import "github.com/multiversx/mx-chain-core-go/core" + // GetClosedUnbufferedChannel returns an instance of a 'chan struct{}' that is already closed func GetClosedUnbufferedChannel() chan struct{} { ch := make(chan struct{}) @@ -7,3 +9,10 @@ func GetClosedUnbufferedChannel() chan struct{} { return ch } + +// SafelyCloseKeyValueHolderChan will close the channel if not nil +func SafelyCloseKeyValueHolderChan(ch chan core.KeyValueHolder) { + if ch != nil { + close(ch) + } +} diff --git a/common/constants.go b/common/constants.go index 7dc897076e9..66cbd149718 100644 --- a/common/constants.go +++ b/common/constants.go @@ -814,6 +814,10 @@ const ( // TrieLeavesChannelDefaultCapacity represents the default value to be used as capacity for getting all trie leaves on // a channel TrieLeavesChannelDefaultCapacity = 100 + + // TrieLeavesChannelSyncCapacity represents the value to be used as capacity for getting main trie + // leaf nodes for trie sync + TrieLeavesChannelSyncCapacity = 1000 ) // ApiOutputFormat represents the format type returned by api diff --git a/state/syncer/baseAccountsSyncer.go b/state/syncer/baseAccountsSyncer.go index 0945c77d6ca..cf0d99c1ef2 100644 --- a/state/syncer/baseAccountsSyncer.go +++ b/state/syncer/baseAccountsSyncer.go @@ -97,10 +97,6 @@ func (b *baseAccountsSyncer) syncMainTrie( atomic.AddInt32(&b.numMaxTries, 1) log.Trace("syncing main trie", "roothash", rootHash) - _, err := trie.NewTrie(b.trieStorageManager, b.marshalizer, b.hasher, b.maxTrieLevelInMemory) - if err != nil { - return err - } b.dataTries[string(rootHash)] = struct{}{} arg := trie.ArgTrieSyncer{ diff --git a/state/syncer/userAccountsSyncer.go b/state/syncer/userAccountsSyncer.go index 30885fa5ffc..3c318984b4b 100644 --- a/state/syncer/userAccountsSyncer.go +++ b/state/syncer/userAccountsSyncer.go @@ -118,7 +118,7 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error { go u.printStatisticsAndUpdateMetrics(ctx) leavesChannels := &common.TrieIteratorChannels{ - LeavesChan: make(chan core.KeyValueHolder, common.TrieLeavesChannelDefaultCapacity), + LeavesChan: make(chan core.KeyValueHolder, 
common.TrieLeavesChannelSyncCapacity), ErrChan: errChan.NewErrChanWrapper(), } @@ -131,7 +131,7 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error { leavesChannels.ErrChan.WriteInChanNonBlocking(err) } - safelyCloseChan(leavesChannels.LeavesChan) + common.SafelyCloseKeyValueHolderChan(leavesChannels.LeavesChan) wgSyncMainTrie.Done() }() @@ -143,19 +143,18 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error { wgSyncMainTrie.Wait() + err = leavesChannels.ErrChan.ReadFromChanNonBlocking() + if err != nil { + return err + } + u.storageMarker.MarkStorerAsSyncedAndActive(u.trieStorageManager) - log.Debug("main trie and data tries synced", "num data tries", len(u.dataTries)) + log.Debug("main trie and data tries synced", "main trie root hash", rootHash, "num data tries", len(u.dataTries)) return nil } -func safelyCloseChan(ch chan core.KeyValueHolder) { - if ch != nil { - close(ch) - } -} - func (u *userAccountsSyncer) syncDataTrie(rootHash []byte, address []byte, ctx context.Context) error { u.syncerMutex.Lock() _, ok := u.dataTries[string(rootHash)] @@ -233,8 +232,6 @@ func (u *userAccountsSyncer) syncAccountDataTries( defer u.printDataTrieStatistics() - var errFound error - errMutex := sync.Mutex{} wg := sync.WaitGroup{} for leaf := range leavesChannels.LeavesChan { @@ -264,12 +261,10 @@ func (u *userAccountsSyncer) syncAccountDataTries( defer u.throttler.EndProcessing() log.Trace("sync data trie", "roothash", trieRootHash) - newErr := u.syncDataTrie(trieRootHash, address, ctx) - if newErr != nil { - errMutex.Lock() - errFound = newErr - log.Error("syncDataTrie: error found", "error", errFound.Error()) - errMutex.Unlock() + err := u.syncDataTrie(trieRootHash, address, ctx) + if err != nil { + log.Error("sync data trie: error found", "roothash", trieRootHash, "error", err.Error()) + leavesChannels.ErrChan.WriteInChanNonBlocking(err) } atomic.AddInt32(&u.numTriesSynced, 1) log.Trace("finished sync data trie", "roothash", trieRootHash) @@ -279,12 +274,7 @@ func (u *userAccountsSyncer) syncAccountDataTries( wg.Wait() - err := leavesChannels.ErrChan.ReadFromChanNonBlocking() - if err != nil { - return err - } - - return errFound + return nil } func (u *userAccountsSyncer) printDataTrieStatistics() { diff --git a/trie/depthFirstSync.go b/trie/depthFirstSync.go index f62bc3ea88d..2152892dc53 100644 --- a/trie/depthFirstSync.go +++ b/trie/depthFirstSync.go @@ -6,7 +6,6 @@ import ( "time" "github.com/multiversx/mx-chain-core-go/core" - "github.com/multiversx/mx-chain-core-go/core/keyValStorage" "github.com/multiversx/mx-chain-core-go/hashing" "github.com/multiversx/mx-chain-core-go/marshal" "github.com/multiversx/mx-chain-go/common" @@ -255,11 +254,7 @@ func (d *depthFirstTrieSyncer) storeTrieNode(element node) error { d.trieSyncStatistics.AddNumBytesReceived(uint64(numBytes)) d.updateStats(uint64(numBytes), element) - leafNodeElement, isLeaf := element.(*leafNode) - if isLeaf && d.accLeavesChannels.LeavesChan != nil { - trieLeaf := keyValStorage.NewKeyValStorage(leafNodeElement.Key, leafNodeElement.Value) - d.accLeavesChannels.LeavesChan <- trieLeaf - } + writeLeafNodeToChan(element, d.accLeavesChannels.LeavesChan) return nil } diff --git a/trie/doubleListSync.go b/trie/doubleListSync.go index 2441bcaf82b..315f1e966a0 100644 --- a/trie/doubleListSync.go +++ b/trie/doubleListSync.go @@ -6,7 +6,6 @@ import ( "time" "github.com/multiversx/mx-chain-core-go/core" - "github.com/multiversx/mx-chain-core-go/core/keyValStorage" "github.com/multiversx/mx-chain-core-go/hashing" 
"github.com/multiversx/mx-chain-core-go/marshal" "github.com/multiversx/mx-chain-go/common" @@ -211,11 +210,7 @@ func (d *doubleListTrieSyncer) processExistingNodes() error { return err } - leafNodeElement, isLeaf := element.(*leafNode) - if isLeaf && d.accLeavesChannels.LeavesChan != nil { - trieLeaf := keyValStorage.NewKeyValStorage(leafNodeElement.Key, leafNodeElement.Value) - d.accLeavesChannels.LeavesChan <- trieLeaf - } + writeLeafNodeToChan(element, d.accLeavesChannels.LeavesChan) d.timeoutHandler.ResetWatchdog() diff --git a/trie/sync.go b/trie/sync.go index c78dcca6583..bfff5ad8bb7 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -253,11 +253,7 @@ func (ts *trieSyncer) checkIfSynced() (bool, error) { return false, err } - leafNodeElement, isLeaf := currentNode.(*leafNode) - if isLeaf && ts.accLeavesChannels.LeavesChan != nil { - trieLeaf := keyValStorage.NewKeyValStorage(leafNodeElement.Key, leafNodeElement.Value) - ts.accLeavesChannels.LeavesChan <- trieLeaf - } + writeLeafNodeToChan(currentNode, ts.accLeavesChannels.LeavesChan) ts.timeoutHandler.ResetWatchdog() @@ -378,6 +374,14 @@ func trieNode( return decodedNode, nil } +func writeLeafNodeToChan(element node, ch chan core.KeyValueHolder) { + leafNodeElement, isLeaf := element.(*leafNode) + if isLeaf && ch != nil { + trieLeaf := keyValStorage.NewKeyValStorage(leafNodeElement.Key, leafNodeElement.Value) + ch <- trieLeaf + } +} + func (ts *trieSyncer) requestNodes() uint32 { ts.mutOperation.RLock() numUnResolvedNodes := uint32(len(ts.nodesForTrie)) diff --git a/trie/trieStorageManager.go b/trie/trieStorageManager.go index c5304e45428..56048f1665b 100644 --- a/trie/trieStorageManager.go +++ b/trie/trieStorageManager.go @@ -330,19 +330,19 @@ func (tsm *trieStorageManager) TakeSnapshot( ) { if iteratorChannels.ErrChan == nil { log.Error("programming error in trieStorageManager.TakeSnapshot, cannot take snapshot because errChan is nil") - safelyCloseChan(iteratorChannels.LeavesChan) + common.SafelyCloseKeyValueHolderChan(iteratorChannels.LeavesChan) stats.SnapshotFinished() return } if tsm.IsClosed() { - safelyCloseChan(iteratorChannels.LeavesChan) + common.SafelyCloseKeyValueHolderChan(iteratorChannels.LeavesChan) stats.SnapshotFinished() return } if bytes.Equal(rootHash, common.EmptyTrieHash) { log.Trace("should not snapshot an empty trie") - safelyCloseChan(iteratorChannels.LeavesChan) + common.SafelyCloseKeyValueHolderChan(iteratorChannels.LeavesChan) stats.SnapshotFinished() return } @@ -363,7 +363,7 @@ func (tsm *trieStorageManager) TakeSnapshot( case tsm.snapshotReq <- snapshotEntry: case <-tsm.closer.ChanClose(): tsm.ExitPruningBufferingMode() - safelyCloseChan(iteratorChannels.LeavesChan) + common.SafelyCloseKeyValueHolderChan(iteratorChannels.LeavesChan) stats.SnapshotFinished() } } @@ -380,19 +380,19 @@ func (tsm *trieStorageManager) SetCheckpoint( ) { if iteratorChannels.ErrChan == nil { log.Error("programming error in trieStorageManager.SetCheckpoint, cannot set checkpoint because errChan is nil") - safelyCloseChan(iteratorChannels.LeavesChan) + common.SafelyCloseKeyValueHolderChan(iteratorChannels.LeavesChan) stats.SnapshotFinished() return } if tsm.IsClosed() { - safelyCloseChan(iteratorChannels.LeavesChan) + common.SafelyCloseKeyValueHolderChan(iteratorChannels.LeavesChan) stats.SnapshotFinished() return } if bytes.Equal(rootHash, common.EmptyTrieHash) { log.Trace("should not set checkpoint for empty trie") - safelyCloseChan(iteratorChannels.LeavesChan) + 
common.SafelyCloseKeyValueHolderChan(iteratorChannels.LeavesChan) stats.SnapshotFinished() return } @@ -410,21 +410,15 @@ func (tsm *trieStorageManager) SetCheckpoint( case tsm.checkpointReq <- checkpointEntry: case <-tsm.closer.ChanClose(): tsm.ExitPruningBufferingMode() - safelyCloseChan(iteratorChannels.LeavesChan) + common.SafelyCloseKeyValueHolderChan(iteratorChannels.LeavesChan) stats.SnapshotFinished() } } -func safelyCloseChan(ch chan core.KeyValueHolder) { - if ch != nil { - close(ch) - } -} - func (tsm *trieStorageManager) finishOperation(snapshotEntry *snapshotsQueueEntry, message string) { tsm.ExitPruningBufferingMode() log.Trace(message, "rootHash", snapshotEntry.rootHash) - safelyCloseChan(snapshotEntry.iteratorChannels.LeavesChan) + common.SafelyCloseKeyValueHolderChan(snapshotEntry.iteratorChannels.LeavesChan) snapshotEntry.stats.SnapshotFinished() } diff --git a/trie/trieStorageManagerWithoutCheckpoints.go b/trie/trieStorageManagerWithoutCheckpoints.go index d2f4b93e507..1aceb707ec7 100644 --- a/trie/trieStorageManagerWithoutCheckpoints.go +++ b/trie/trieStorageManagerWithoutCheckpoints.go @@ -30,7 +30,7 @@ func (tsm *trieStorageManagerWithoutCheckpoints) SetCheckpoint( stats common.SnapshotStatisticsHandler, ) { if iteratorChannels != nil { - safelyCloseChan(iteratorChannels.LeavesChan) + common.SafelyCloseKeyValueHolderChan(iteratorChannels.LeavesChan) } stats.SnapshotFinished() diff --git a/trie/trieStorageManagerWithoutSnapshot.go b/trie/trieStorageManagerWithoutSnapshot.go index 337a74f8f9d..7ac0348e0cf 100644 --- a/trie/trieStorageManagerWithoutSnapshot.go +++ b/trie/trieStorageManagerWithoutSnapshot.go @@ -38,7 +38,7 @@ func (tsm *trieStorageManagerWithoutSnapshot) PutInEpochWithoutCache(key []byte, // TakeSnapshot does nothing, as snapshots are disabled for this implementation func (tsm *trieStorageManagerWithoutSnapshot) TakeSnapshot(_ string, _ []byte, _ []byte, iteratorChannels *common.TrieIteratorChannels, _ chan []byte, stats common.SnapshotStatisticsHandler, _ uint32) { if iteratorChannels != nil { - safelyCloseChan(iteratorChannels.LeavesChan) + common.SafelyCloseKeyValueHolderChan(iteratorChannels.LeavesChan) } stats.SnapshotFinished() } From d58e024fdf5be4810269e15d6f579e5966e48076 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Wed, 17 May 2023 13:40:27 +0300 Subject: [PATCH 18/21] refactored to use only leaves chan --- state/syncer/baseAccountsSyncer.go | 4 ++-- state/syncer/userAccountsSyncer.go | 9 ++------- state/syncer/validatorAccountsSyncer.go | 12 ++++++------ trie/depthFirstSync.go | 6 +++--- trie/doubleListSync.go | 6 +++--- trie/sync.go | 11 ++++------- 6 files changed, 20 insertions(+), 28 deletions(-) diff --git a/state/syncer/baseAccountsSyncer.go b/state/syncer/baseAccountsSyncer.go index cf0d99c1ef2..5956fd62976 100644 --- a/state/syncer/baseAccountsSyncer.go +++ b/state/syncer/baseAccountsSyncer.go @@ -92,7 +92,7 @@ func (b *baseAccountsSyncer) syncMainTrie( rootHash []byte, trieTopic string, ctx context.Context, - accLeavesChan *common.TrieIteratorChannels, + accLeavesChan chan core.KeyValueHolder, ) error { atomic.AddInt32(&b.numMaxTries, 1) @@ -111,7 +111,7 @@ func (b *baseAccountsSyncer) syncMainTrie( TimeoutHandler: b.timeoutHandler, MaxHardCapForMissingNodes: b.maxHardCapForMissingNodes, CheckNodesOnDisk: b.checkNodesOnDisk, - AccLeavesChannels: accLeavesChan, + AccLeavesChan: accLeavesChan, } trieSyncer, err := trie.CreateTrieSyncer(arg, b.trieSyncerVersion) if err != nil { diff --git a/state/syncer/userAccountsSyncer.go 
b/state/syncer/userAccountsSyncer.go index 3c318984b4b..7f79b53a65c 100644 --- a/state/syncer/userAccountsSyncer.go +++ b/state/syncer/userAccountsSyncer.go @@ -126,7 +126,7 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error { wgSyncMainTrie.Add(1) go func() { - err := u.syncMainTrie(rootHash, factory.AccountTrieNodesTopic, ctx, leavesChannels) + err := u.syncMainTrie(rootHash, factory.AccountTrieNodesTopic, ctx, leavesChannels.LeavesChan) if err != nil { leavesChannels.ErrChan.WriteInChanNonBlocking(err) } @@ -166,11 +166,6 @@ func (u *userAccountsSyncer) syncDataTrie(rootHash []byte, address []byte, ctx c u.dataTries[string(rootHash)] = struct{}{} u.syncerMutex.Unlock() - iteratorChannelsForDataTries := &common.TrieIteratorChannels{ - LeavesChan: nil, // not used for data tries - ErrChan: nil, - } - arg := trie.ArgTrieSyncer{ RequestHandler: u.requestHandler, InterceptedNodes: u.cacher, @@ -183,7 +178,7 @@ func (u *userAccountsSyncer) syncDataTrie(rootHash []byte, address []byte, ctx c TimeoutHandler: u.timeoutHandler, MaxHardCapForMissingNodes: u.maxHardCapForMissingNodes, CheckNodesOnDisk: u.checkNodesOnDisk, - AccLeavesChannels: iteratorChannelsForDataTries, + AccLeavesChan: nil, // not used for data tries } trieSyncer, err := trie.CreateTrieSyncer(arg, u.trieSyncerVersion) if err != nil { diff --git a/state/syncer/validatorAccountsSyncer.go b/state/syncer/validatorAccountsSyncer.go index 3554312e0d8..856d3ddc2cc 100644 --- a/state/syncer/validatorAccountsSyncer.go +++ b/state/syncer/validatorAccountsSyncer.go @@ -5,7 +5,6 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-go/common" - "github.com/multiversx/mx-chain-go/common/errChan" "github.com/multiversx/mx-chain-go/epochStart" "github.com/multiversx/mx-chain-go/process/factory" "github.com/multiversx/mx-chain-go/trie/statistics" @@ -75,11 +74,12 @@ func (v *validatorAccountsSyncer) SyncAccounts(rootHash []byte) error { go v.printStatisticsAndUpdateMetrics(ctx) - leavesChannels := &common.TrieIteratorChannels{ - LeavesChan: nil, - ErrChan: errChan.NewErrChanWrapper(), - } - err := v.syncMainTrie(rootHash, factory.ValidatorTrieNodesTopic, ctx, leavesChannels) + err := v.syncMainTrie( + rootHash, + factory.ValidatorTrieNodesTopic, + ctx, + nil, // not used for validator accounts syncer + ) if err != nil { return err } diff --git a/trie/depthFirstSync.go b/trie/depthFirstSync.go index 2152892dc53..8924bc29f1a 100644 --- a/trie/depthFirstSync.go +++ b/trie/depthFirstSync.go @@ -32,7 +32,7 @@ type depthFirstTrieSyncer struct { checkNodesOnDisk bool nodes *trieNodesHandler requestedHashes map[string]*request - accLeavesChannels *common.TrieIteratorChannels + accLeavesChan chan core.KeyValueHolder } // NewDepthFirstTrieSyncer creates a new instance of trieSyncer that uses the depth-first algorithm @@ -60,7 +60,7 @@ func NewDepthFirstTrieSyncer(arg ArgTrieSyncer) (*depthFirstTrieSyncer, error) { timeoutHandler: arg.TimeoutHandler, maxHardCapForMissingNodes: arg.MaxHardCapForMissingNodes, checkNodesOnDisk: arg.CheckNodesOnDisk, - accLeavesChannels: arg.AccLeavesChannels, + accLeavesChan: arg.AccLeavesChan, } return d, nil @@ -254,7 +254,7 @@ func (d *depthFirstTrieSyncer) storeTrieNode(element node) error { d.trieSyncStatistics.AddNumBytesReceived(uint64(numBytes)) d.updateStats(uint64(numBytes), element) - writeLeafNodeToChan(element, d.accLeavesChannels.LeavesChan) + writeLeafNodeToChan(element, d.accLeavesChan) return nil } diff --git a/trie/doubleListSync.go 
b/trie/doubleListSync.go index 315f1e966a0..88e9717e689 100644 --- a/trie/doubleListSync.go +++ b/trie/doubleListSync.go @@ -44,7 +44,7 @@ type doubleListTrieSyncer struct { existingNodes map[string]node missingHashes map[string]struct{} requestedHashes map[string]*request - accLeavesChannels *common.TrieIteratorChannels + leavesChan chan core.KeyValueHolder } // NewDoubleListTrieSyncer creates a new instance of trieSyncer that uses 2 list for keeping the "margin" nodes. @@ -75,7 +75,7 @@ func NewDoubleListTrieSyncer(arg ArgTrieSyncer) (*doubleListTrieSyncer, error) { timeoutHandler: arg.TimeoutHandler, maxHardCapForMissingNodes: arg.MaxHardCapForMissingNodes, checkNodesOnDisk: arg.CheckNodesOnDisk, - accLeavesChannels: arg.AccLeavesChannels, + leavesChan: arg.AccLeavesChan, } return d, nil @@ -210,7 +210,7 @@ func (d *doubleListTrieSyncer) processExistingNodes() error { return err } - writeLeafNodeToChan(element, d.accLeavesChannels.LeavesChan) + writeLeafNodeToChan(element, d.leavesChan) d.timeoutHandler.ResetWatchdog() diff --git a/trie/sync.go b/trie/sync.go index bfff5ad8bb7..23f12ae9916 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -42,7 +42,7 @@ type trieSyncer struct { trieSyncStatistics data.SyncStatisticsHandler timeoutHandler TimeoutHandler maxHardCapForMissingNodes int - accLeavesChannels *common.TrieIteratorChannels + leavesChan chan core.KeyValueHolder } const maxNewMissingAddedPerTurn = 10 @@ -60,7 +60,7 @@ type ArgTrieSyncer struct { MaxHardCapForMissingNodes int CheckNodesOnDisk bool TimeoutHandler TimeoutHandler - AccLeavesChannels *common.TrieIteratorChannels + AccLeavesChan chan core.KeyValueHolder } // NewTrieSyncer creates a new instance of trieSyncer @@ -89,7 +89,7 @@ func NewTrieSyncer(arg ArgTrieSyncer) (*trieSyncer, error) { trieSyncStatistics: arg.TrieSyncStatistics, timeoutHandler: arg.TimeoutHandler, maxHardCapForMissingNodes: arg.MaxHardCapForMissingNodes, - accLeavesChannels: arg.AccLeavesChannels, + leavesChan: arg.AccLeavesChan, } return ts, nil @@ -123,9 +123,6 @@ func checkArguments(arg ArgTrieSyncer) error { if arg.MaxHardCapForMissingNodes < 1 { return fmt.Errorf("%w provided: %v", ErrInvalidMaxHardCapForMissingNodes, arg.MaxHardCapForMissingNodes) } - if arg.AccLeavesChannels == nil { - return ErrNilTrieIteratorChannels - } return nil } @@ -253,7 +250,7 @@ func (ts *trieSyncer) checkIfSynced() (bool, error) { return false, err } - writeLeafNodeToChan(currentNode, ts.accLeavesChannels.LeavesChan) + writeLeafNodeToChan(currentNode, ts.leavesChan) ts.timeoutHandler.ResetWatchdog() From a714373fbde63ea079cd77c4684882248a606c6f Mon Sep 17 00:00:00 2001 From: ssd04 Date: Wed, 17 May 2023 16:28:36 +0300 Subject: [PATCH 19/21] fix trie sync tests --- trie/depthFirstSync_test.go | 8 ++------ trie/doubleListSync_test.go | 7 +------ trie/sync_test.go | 26 ++------------------------ 3 files changed, 5 insertions(+), 36 deletions(-) diff --git a/trie/depthFirstSync_test.go b/trie/depthFirstSync_test.go index 42227dea6f9..faf577ca851 100644 --- a/trie/depthFirstSync_test.go +++ b/trie/depthFirstSync_test.go @@ -11,7 +11,6 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-go/common" - "github.com/multiversx/mx-chain-go/common/errChan" "github.com/multiversx/mx-chain-go/errors" "github.com/multiversx/mx-chain-go/storage" "github.com/stretchr/testify/assert" @@ -111,10 +110,7 @@ func TestDepthFirstTrieSyncer_StartSyncingNewTrieShouldWork(t *testing.T) { arg := 
createMockArgument(time.Minute) arg.RequestHandler = createRequesterResolver(trSource, arg.InterceptedNodes, nil) - arg.AccLeavesChannels = &common.TrieIteratorChannels{ - LeavesChan: make(chan core.KeyValueHolder, 110), - ErrChan: errChan.NewErrChanWrapper(), - } + arg.AccLeavesChan = make(chan core.KeyValueHolder, 110) d, _ := NewDepthFirstTrieSyncer(arg) ctx, cancelFunc := context.WithTimeout(context.Background(), time.Second*30) @@ -147,7 +143,7 @@ func TestDepthFirstTrieSyncer_StartSyncingNewTrieShouldWork(t *testing.T) { numLeavesOnChan := 0 go func() { - for range arg.AccLeavesChannels.LeavesChan { + for range arg.AccLeavesChan { numLeavesOnChan++ wg.Done() } diff --git a/trie/doubleListSync_test.go b/trie/doubleListSync_test.go index 577d2ef0269..9aa2a2b449d 100644 --- a/trie/doubleListSync_test.go +++ b/trie/doubleListSync_test.go @@ -11,7 +11,6 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-go/common" - "github.com/multiversx/mx-chain-go/common/errChan" "github.com/multiversx/mx-chain-go/config" "github.com/multiversx/mx-chain-go/errors" "github.com/multiversx/mx-chain-go/storage" @@ -216,10 +215,6 @@ func TestDoubleListTrieSyncer_StartSyncingNewTrieShouldWork(t *testing.T) { arg := createMockArgument(time.Minute) arg.RequestHandler = createRequesterResolver(trSource, arg.InterceptedNodes, nil) - arg.AccLeavesChannels = &common.TrieIteratorChannels{ - LeavesChan: make(chan core.KeyValueHolder, 110), - ErrChan: errChan.NewErrChanWrapper(), - } d, _ := NewDoubleListTrieSyncer(arg) ctx, cancelFunc := context.WithTimeout(context.Background(), time.Second*30) @@ -252,7 +247,7 @@ func TestDoubleListTrieSyncer_StartSyncingNewTrieShouldWork(t *testing.T) { numLeavesOnChan := 0 go func() { - for range arg.AccLeavesChannels.LeavesChan { + for range arg.AccLeavesChan { numLeavesOnChan++ wg.Done() } diff --git a/trie/sync_test.go b/trie/sync_test.go index aada347e4f6..a016a571609 100644 --- a/trie/sync_test.go +++ b/trie/sync_test.go @@ -10,8 +10,6 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data" - "github.com/multiversx/mx-chain-go/common" - "github.com/multiversx/mx-chain-go/common/errChan" "github.com/multiversx/mx-chain-go/testscommon" "github.com/multiversx/mx-chain-go/testscommon/hashingMocks" trieMock "github.com/multiversx/mx-chain-go/testscommon/trie" @@ -30,11 +28,6 @@ func createMockArgument(timeout time.Duration) ArgTrieSyncer { }, } - leavesChannels := &common.TrieIteratorChannels{ - LeavesChan: make(chan core.KeyValueHolder, common.TrieLeavesChannelDefaultCapacity), - ErrChan: errChan.NewErrChanWrapper(), - } - return ArgTrieSyncer{ RequestHandler: &testscommon.RequestHandlerStub{}, InterceptedNodes: testscommon.NewCacherMock(), @@ -46,7 +39,7 @@ func createMockArgument(timeout time.Duration) ArgTrieSyncer { TrieSyncStatistics: statistics.NewTrieSyncStatistics(), TimeoutHandler: testscommon.NewTimeoutHandlerMock(timeout), MaxHardCapForMissingNodes: 500, - AccLeavesChannels: leavesChannels, + AccLeavesChan: make(chan core.KeyValueHolder, 100), } } @@ -152,17 +145,6 @@ func TestNewTrieSyncer(t *testing.T) { assert.True(t, errors.Is(err, ErrInvalidMaxHardCapForMissingNodes)) }) - t.Run("nil accounts leaves channels", func(t *testing.T) { - t.Parallel() - - arg := createMockArgument(time.Minute) - arg.AccLeavesChannels = nil - - ts, err := NewTrieSyncer(arg) - assert.True(t, 
check.IfNil(ts)) - assert.True(t, errors.Is(err, ErrNilTrieIteratorChannels)) - }) - t.Run("should work", func(t *testing.T) { t.Parallel() @@ -255,10 +237,6 @@ func TestTrieSync_FoundInStorageShouldNotRequest(t *testing.T) { arg.DB = trieStorage arg.Marshalizer = testMarshalizer arg.Hasher = testHasher - arg.AccLeavesChannels = &common.TrieIteratorChannels{ - LeavesChan: make(chan core.KeyValueHolder, 110), - ErrChan: errChan.NewErrChanWrapper(), - } ts, err := NewTrieSyncer(arg) require.Nil(t, err) @@ -271,7 +249,7 @@ func TestTrieSync_FoundInStorageShouldNotRequest(t *testing.T) { numLeavesOnChan := 0 go func() { - for range arg.AccLeavesChannels.LeavesChan { + for range arg.AccLeavesChan { numLeavesOnChan++ wg.Done() } From 096e95268734190dd693f9f05dd5937c12ae2876 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Fri, 19 May 2023 14:12:00 +0300 Subject: [PATCH 20/21] fixes after review: renamings and small optimizations --- common/channels.go | 4 ++-- state/syncer/baseAccountsSyncer.go | 4 ++-- state/syncer/userAccountsSyncer.go | 4 ++-- trie/depthFirstSync.go | 6 +++--- trie/depthFirstSync_test.go | 4 ++-- trie/doubleListSync.go | 2 +- trie/doubleListSync_test.go | 2 +- trie/sync.go | 16 +++++++++++----- trie/sync_test.go | 4 ++-- trie/trieStorageManager.go | 18 +++++++++--------- trie/trieStorageManagerWithoutCheckpoints.go | 2 +- trie/trieStorageManagerWithoutSnapshot.go | 2 +- 12 files changed, 37 insertions(+), 31 deletions(-) diff --git a/common/channels.go b/common/channels.go index cade1183496..ba240d76b7b 100644 --- a/common/channels.go +++ b/common/channels.go @@ -10,8 +10,8 @@ func GetClosedUnbufferedChannel() chan struct{} { return ch } -// SafelyCloseKeyValueHolderChan will close the channel if not nil -func SafelyCloseKeyValueHolderChan(ch chan core.KeyValueHolder) { +// CloseKeyValueHolderChan will close the channel if not nil +func CloseKeyValueHolderChan(ch chan core.KeyValueHolder) { if ch != nil { close(ch) } diff --git a/state/syncer/baseAccountsSyncer.go b/state/syncer/baseAccountsSyncer.go index 5956fd62976..af0ef1fb456 100644 --- a/state/syncer/baseAccountsSyncer.go +++ b/state/syncer/baseAccountsSyncer.go @@ -92,7 +92,7 @@ func (b *baseAccountsSyncer) syncMainTrie( rootHash []byte, trieTopic string, ctx context.Context, - accLeavesChan chan core.KeyValueHolder, + leavesChan chan core.KeyValueHolder, ) error { atomic.AddInt32(&b.numMaxTries, 1) @@ -111,7 +111,7 @@ func (b *baseAccountsSyncer) syncMainTrie( TimeoutHandler: b.timeoutHandler, MaxHardCapForMissingNodes: b.maxHardCapForMissingNodes, CheckNodesOnDisk: b.checkNodesOnDisk, - AccLeavesChan: accLeavesChan, + LeavesChan: leavesChan, } trieSyncer, err := trie.CreateTrieSyncer(arg, b.trieSyncerVersion) if err != nil { diff --git a/state/syncer/userAccountsSyncer.go b/state/syncer/userAccountsSyncer.go index 7f79b53a65c..34d3d8a8638 100644 --- a/state/syncer/userAccountsSyncer.go +++ b/state/syncer/userAccountsSyncer.go @@ -131,7 +131,7 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error { leavesChannels.ErrChan.WriteInChanNonBlocking(err) } - common.SafelyCloseKeyValueHolderChan(leavesChannels.LeavesChan) + common.CloseKeyValueHolderChan(leavesChannels.LeavesChan) wgSyncMainTrie.Done() }() @@ -178,7 +178,7 @@ func (u *userAccountsSyncer) syncDataTrie(rootHash []byte, address []byte, ctx c TimeoutHandler: u.timeoutHandler, MaxHardCapForMissingNodes: u.maxHardCapForMissingNodes, CheckNodesOnDisk: u.checkNodesOnDisk, - AccLeavesChan: nil, // not used for data tries + LeavesChan: nil, // not used for 
data tries } trieSyncer, err := trie.CreateTrieSyncer(arg, u.trieSyncerVersion) if err != nil { diff --git a/trie/depthFirstSync.go b/trie/depthFirstSync.go index 8924bc29f1a..2af9bbb5e72 100644 --- a/trie/depthFirstSync.go +++ b/trie/depthFirstSync.go @@ -32,7 +32,7 @@ type depthFirstTrieSyncer struct { checkNodesOnDisk bool nodes *trieNodesHandler requestedHashes map[string]*request - accLeavesChan chan core.KeyValueHolder + leavesChan chan core.KeyValueHolder } // NewDepthFirstTrieSyncer creates a new instance of trieSyncer that uses the depth-first algorithm @@ -60,7 +60,7 @@ func NewDepthFirstTrieSyncer(arg ArgTrieSyncer) (*depthFirstTrieSyncer, error) { timeoutHandler: arg.TimeoutHandler, maxHardCapForMissingNodes: arg.MaxHardCapForMissingNodes, checkNodesOnDisk: arg.CheckNodesOnDisk, - accLeavesChan: arg.AccLeavesChan, + leavesChan: arg.LeavesChan, } return d, nil @@ -254,7 +254,7 @@ func (d *depthFirstTrieSyncer) storeTrieNode(element node) error { d.trieSyncStatistics.AddNumBytesReceived(uint64(numBytes)) d.updateStats(uint64(numBytes), element) - writeLeafNodeToChan(element, d.accLeavesChan) + writeLeafNodeToChan(element, d.leavesChan) return nil } diff --git a/trie/depthFirstSync_test.go b/trie/depthFirstSync_test.go index faf577ca851..4fc6d9194aa 100644 --- a/trie/depthFirstSync_test.go +++ b/trie/depthFirstSync_test.go @@ -110,7 +110,7 @@ func TestDepthFirstTrieSyncer_StartSyncingNewTrieShouldWork(t *testing.T) { arg := createMockArgument(time.Minute) arg.RequestHandler = createRequesterResolver(trSource, arg.InterceptedNodes, nil) - arg.AccLeavesChan = make(chan core.KeyValueHolder, 110) + arg.LeavesChan = make(chan core.KeyValueHolder, 110) d, _ := NewDepthFirstTrieSyncer(arg) ctx, cancelFunc := context.WithTimeout(context.Background(), time.Second*30) @@ -143,7 +143,7 @@ func TestDepthFirstTrieSyncer_StartSyncingNewTrieShouldWork(t *testing.T) { numLeavesOnChan := 0 go func() { - for range arg.AccLeavesChan { + for range arg.LeavesChan { numLeavesOnChan++ wg.Done() } diff --git a/trie/doubleListSync.go b/trie/doubleListSync.go index 88e9717e689..cfd7120e7f8 100644 --- a/trie/doubleListSync.go +++ b/trie/doubleListSync.go @@ -75,7 +75,7 @@ func NewDoubleListTrieSyncer(arg ArgTrieSyncer) (*doubleListTrieSyncer, error) { timeoutHandler: arg.TimeoutHandler, maxHardCapForMissingNodes: arg.MaxHardCapForMissingNodes, checkNodesOnDisk: arg.CheckNodesOnDisk, - leavesChan: arg.AccLeavesChan, + leavesChan: arg.LeavesChan, } return d, nil diff --git a/trie/doubleListSync_test.go b/trie/doubleListSync_test.go index 9aa2a2b449d..a519db35d2e 100644 --- a/trie/doubleListSync_test.go +++ b/trie/doubleListSync_test.go @@ -247,7 +247,7 @@ func TestDoubleListTrieSyncer_StartSyncingNewTrieShouldWork(t *testing.T) { numLeavesOnChan := 0 go func() { - for range arg.AccLeavesChan { + for range arg.LeavesChan { numLeavesOnChan++ wg.Done() } diff --git a/trie/sync.go b/trie/sync.go index 23f12ae9916..5acd55c6b44 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -60,7 +60,7 @@ type ArgTrieSyncer struct { MaxHardCapForMissingNodes int CheckNodesOnDisk bool TimeoutHandler TimeoutHandler - AccLeavesChan chan core.KeyValueHolder + LeavesChan chan core.KeyValueHolder } // NewTrieSyncer creates a new instance of trieSyncer @@ -89,7 +89,7 @@ func NewTrieSyncer(arg ArgTrieSyncer) (*trieSyncer, error) { trieSyncStatistics: arg.TrieSyncStatistics, timeoutHandler: arg.TimeoutHandler, maxHardCapForMissingNodes: arg.MaxHardCapForMissingNodes, - leavesChan: arg.AccLeavesChan, + leavesChan: arg.LeavesChan, } return 
ts, nil @@ -372,11 +372,17 @@ func trieNode( } func writeLeafNodeToChan(element node, ch chan core.KeyValueHolder) { + if ch == nil { + return + } + leafNodeElement, isLeaf := element.(*leafNode) - if isLeaf && ch != nil { - trieLeaf := keyValStorage.NewKeyValStorage(leafNodeElement.Key, leafNodeElement.Value) - ch <- trieLeaf + if !isLeaf { + return } + + trieLeaf := keyValStorage.NewKeyValStorage(leafNodeElement.Key, leafNodeElement.Value) + ch <- trieLeaf } func (ts *trieSyncer) requestNodes() uint32 { diff --git a/trie/sync_test.go b/trie/sync_test.go index a016a571609..3b783f90c11 100644 --- a/trie/sync_test.go +++ b/trie/sync_test.go @@ -39,7 +39,7 @@ func createMockArgument(timeout time.Duration) ArgTrieSyncer { TrieSyncStatistics: statistics.NewTrieSyncStatistics(), TimeoutHandler: testscommon.NewTimeoutHandlerMock(timeout), MaxHardCapForMissingNodes: 500, - AccLeavesChan: make(chan core.KeyValueHolder, 100), + LeavesChan: make(chan core.KeyValueHolder, 100), } } @@ -249,7 +249,7 @@ func TestTrieSync_FoundInStorageShouldNotRequest(t *testing.T) { numLeavesOnChan := 0 go func() { - for range arg.AccLeavesChan { + for range arg.LeavesChan { numLeavesOnChan++ wg.Done() } diff --git a/trie/trieStorageManager.go b/trie/trieStorageManager.go index 56048f1665b..99fa6895bb7 100644 --- a/trie/trieStorageManager.go +++ b/trie/trieStorageManager.go @@ -330,19 +330,19 @@ func (tsm *trieStorageManager) TakeSnapshot( ) { if iteratorChannels.ErrChan == nil { log.Error("programming error in trieStorageManager.TakeSnapshot, cannot take snapshot because errChan is nil") - common.SafelyCloseKeyValueHolderChan(iteratorChannels.LeavesChan) + common.CloseKeyValueHolderChan(iteratorChannels.LeavesChan) stats.SnapshotFinished() return } if tsm.IsClosed() { - common.SafelyCloseKeyValueHolderChan(iteratorChannels.LeavesChan) + common.CloseKeyValueHolderChan(iteratorChannels.LeavesChan) stats.SnapshotFinished() return } if bytes.Equal(rootHash, common.EmptyTrieHash) { log.Trace("should not snapshot an empty trie") - common.SafelyCloseKeyValueHolderChan(iteratorChannels.LeavesChan) + common.CloseKeyValueHolderChan(iteratorChannels.LeavesChan) stats.SnapshotFinished() return } @@ -363,7 +363,7 @@ func (tsm *trieStorageManager) TakeSnapshot( case tsm.snapshotReq <- snapshotEntry: case <-tsm.closer.ChanClose(): tsm.ExitPruningBufferingMode() - common.SafelyCloseKeyValueHolderChan(iteratorChannels.LeavesChan) + common.CloseKeyValueHolderChan(iteratorChannels.LeavesChan) stats.SnapshotFinished() } } @@ -380,19 +380,19 @@ func (tsm *trieStorageManager) SetCheckpoint( ) { if iteratorChannels.ErrChan == nil { log.Error("programming error in trieStorageManager.SetCheckpoint, cannot set checkpoint because errChan is nil") - common.SafelyCloseKeyValueHolderChan(iteratorChannels.LeavesChan) + common.CloseKeyValueHolderChan(iteratorChannels.LeavesChan) stats.SnapshotFinished() return } if tsm.IsClosed() { - common.SafelyCloseKeyValueHolderChan(iteratorChannels.LeavesChan) + common.CloseKeyValueHolderChan(iteratorChannels.LeavesChan) stats.SnapshotFinished() return } if bytes.Equal(rootHash, common.EmptyTrieHash) { log.Trace("should not set checkpoint for empty trie") - common.SafelyCloseKeyValueHolderChan(iteratorChannels.LeavesChan) + common.CloseKeyValueHolderChan(iteratorChannels.LeavesChan) stats.SnapshotFinished() return } @@ -410,7 +410,7 @@ func (tsm *trieStorageManager) SetCheckpoint( case tsm.checkpointReq <- checkpointEntry: case <-tsm.closer.ChanClose(): tsm.ExitPruningBufferingMode() - 
common.SafelyCloseKeyValueHolderChan(iteratorChannels.LeavesChan) + common.CloseKeyValueHolderChan(iteratorChannels.LeavesChan) stats.SnapshotFinished() } } @@ -418,7 +418,7 @@ func (tsm *trieStorageManager) SetCheckpoint( func (tsm *trieStorageManager) finishOperation(snapshotEntry *snapshotsQueueEntry, message string) { tsm.ExitPruningBufferingMode() log.Trace(message, "rootHash", snapshotEntry.rootHash) - common.SafelyCloseKeyValueHolderChan(snapshotEntry.iteratorChannels.LeavesChan) + common.CloseKeyValueHolderChan(snapshotEntry.iteratorChannels.LeavesChan) snapshotEntry.stats.SnapshotFinished() } diff --git a/trie/trieStorageManagerWithoutCheckpoints.go b/trie/trieStorageManagerWithoutCheckpoints.go index 1aceb707ec7..975a9a10111 100644 --- a/trie/trieStorageManagerWithoutCheckpoints.go +++ b/trie/trieStorageManagerWithoutCheckpoints.go @@ -30,7 +30,7 @@ func (tsm *trieStorageManagerWithoutCheckpoints) SetCheckpoint( stats common.SnapshotStatisticsHandler, ) { if iteratorChannels != nil { - common.SafelyCloseKeyValueHolderChan(iteratorChannels.LeavesChan) + common.CloseKeyValueHolderChan(iteratorChannels.LeavesChan) } stats.SnapshotFinished() diff --git a/trie/trieStorageManagerWithoutSnapshot.go b/trie/trieStorageManagerWithoutSnapshot.go index 7ac0348e0cf..7e538eaf184 100644 --- a/trie/trieStorageManagerWithoutSnapshot.go +++ b/trie/trieStorageManagerWithoutSnapshot.go @@ -38,7 +38,7 @@ func (tsm *trieStorageManagerWithoutSnapshot) PutInEpochWithoutCache(key []byte, // TakeSnapshot does nothing, as snapshots are disabled for this implementation func (tsm *trieStorageManagerWithoutSnapshot) TakeSnapshot(_ string, _ []byte, _ []byte, iteratorChannels *common.TrieIteratorChannels, _ chan []byte, stats common.SnapshotStatisticsHandler, _ uint32) { if iteratorChannels != nil { - common.SafelyCloseKeyValueHolderChan(iteratorChannels.LeavesChan) + common.CloseKeyValueHolderChan(iteratorChannels.LeavesChan) } stats.SnapshotFinished() } From 64cb479ba7daf295afc340217b7689e060b13725 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Mon, 22 May 2023 14:19:08 +0300 Subject: [PATCH 21/21] remove doubled error log --- state/syncer/userAccountsSyncer.go | 1 - 1 file changed, 1 deletion(-) diff --git a/state/syncer/userAccountsSyncer.go b/state/syncer/userAccountsSyncer.go index 34d3d8a8638..fec67fe8fc6 100644 --- a/state/syncer/userAccountsSyncer.go +++ b/state/syncer/userAccountsSyncer.go @@ -258,7 +258,6 @@ func (u *userAccountsSyncer) syncAccountDataTries( log.Trace("sync data trie", "roothash", trieRootHash) err := u.syncDataTrie(trieRootHash, address, ctx) if err != nil { - log.Error("sync data trie: error found", "roothash", trieRootHash, "error", err.Error()) leavesChannels.ErrChan.WriteInChanNonBlocking(err) } atomic.AddInt32(&u.numTriesSynced, 1)
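
Illustrative sketch (not part of the patches): the end state of this series is a producer/consumer flow — the main-trie syncer writes every resolved leaf into a buffered leaves channel and closes that channel when it finishes, while the accounts syncer drains the channel and starts one data-trie sync per account leaf, funnelling any error into a shared error channel that is checked once after both sides are done. The Go program below is a minimal, self-contained approximation of that flow; the leaf struct, the syncMainTrie/syncDataTrie stubs and the sync.Once error aggregation are simplified stand-ins for core.KeyValueHolder, the real trie syncers and the ErrChan wrapper, and the throttler plus statistics reporting used in the actual syncer are omitted.

package main

import (
	"fmt"
	"sync"
)

// leaf is a simplified stand-in for core.KeyValueHolder (key = account address, value = serialized account).
type leaf struct {
	key   []byte
	value []byte
}

// syncMainTrie stands in for the real main-trie syncer: it resolves nodes and
// pushes each leaf it encounters into leavesChan; the caller closes the channel.
func syncMainTrie(leavesChan chan<- leaf) error {
	for i := 0; i < 5; i++ {
		leavesChan <- leaf{key: []byte(fmt.Sprintf("addr-%d", i)), value: []byte("account")}
	}
	return nil
}

// syncDataTrie stands in for syncing a single account's data trie.
func syncDataTrie(l leaf) error {
	fmt.Printf("syncing data trie for %s\n", l.key)
	return nil
}

func main() {
	leavesChan := make(chan leaf, 100) // buffered, in the spirit of TrieLeavesChannelSyncCapacity
	var firstErr error
	var errOnce sync.Once

	// Producer: sync the main trie, record a possible error, and always close the
	// leaves channel so the consumer's range loop below can terminate.
	var wgMain sync.WaitGroup
	wgMain.Add(1)
	go func() {
		defer wgMain.Done()
		defer close(leavesChan)
		if err := syncMainTrie(leavesChan); err != nil {
			errOnce.Do(func() { firstErr = err })
		}
	}()

	// Consumer: drain leaves as they arrive and sync one data trie per leaf,
	// overlapping data-trie work with the ongoing main-trie download.
	var wgData sync.WaitGroup
	for l := range leavesChan {
		wgData.Add(1)
		go func(l leaf) {
			defer wgData.Done()
			if err := syncDataTrie(l); err != nil {
				errOnce.Do(func() { firstErr = err })
			}
		}(l)
	}
	wgData.Wait()
	wgMain.Wait()

	if firstErr != nil {
		fmt.Println("sync failed:", firstErr)
		return
	}
	fmt.Println("main trie and data tries synced")
}

Running the sketch prints one "syncing data trie" line per produced leaf and then the final success line; in the real syncer the same shape is realized with LeavesChan plus ErrChan.WriteInChanNonBlocking/ReadFromChanNonBlocking, and the storer is only marked as synced and active after the error channel has been checked.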