Skip to content

Commit

Permalink
Merge pull request #5226 from multiversx/trie-sync-improvements
Browse files Browse the repository at this point in the history
Trie sync optimizations
  • Loading branch information
ssd04 authored May 22, 2023
2 parents 5a1414d + 64cb479 commit 90977a3
Show file tree
Hide file tree
Showing 16 changed files with 273 additions and 183 deletions.
9 changes: 9 additions & 0 deletions common/channels.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
package common

import "github.com/multiversx/mx-chain-core-go/core"

// GetClosedUnbufferedChannel creates a fresh unbuffered 'chan struct{}', closes it right away
// and hands it back, so that a receive on the returned channel never blocks
func GetClosedUnbufferedChannel() chan struct{} {
	closedChan := make(chan struct{})
	close(closedChan)

	return closedChan
}

// CloseKeyValueHolderChan closes the provided channel; a nil channel is left untouched
func CloseKeyValueHolderChan(ch chan core.KeyValueHolder) {
	if ch == nil {
		return
	}

	close(ch)
}
4 changes: 4 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,10 @@ const (
// TrieLeavesChannelDefaultCapacity represents the default value to be used as capacity for getting all trie leaves on
// a channel
TrieLeavesChannelDefaultCapacity = 100

// TrieLeavesChannelSyncCapacity represents the value to be used as capacity for getting main trie
// leaf nodes for trie sync
TrieLeavesChannelSyncCapacity = 1000
)

// ApiOutputFormat represents the format type returned by api
Expand Down
16 changes: 6 additions & 10 deletions state/syncer/baseAccountsSyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type baseAccountsSyncer struct {
timeoutHandler trie.TimeoutHandler
shardId uint32
cacher storage.Cacher
rootHash []byte
maxTrieLevelInMemory uint
name string
maxHardCapForMissingNodes int
Expand Down Expand Up @@ -93,15 +92,11 @@ func (b *baseAccountsSyncer) syncMainTrie(
rootHash []byte,
trieTopic string,
ctx context.Context,
) (common.Trie, error) {
b.rootHash = rootHash
leavesChan chan core.KeyValueHolder,
) error {
atomic.AddInt32(&b.numMaxTries, 1)

log.Trace("syncing main trie", "roothash", rootHash)
dataTrie, err := trie.NewTrie(b.trieStorageManager, b.marshalizer, b.hasher, b.maxTrieLevelInMemory)
if err != nil {
return nil, err
}

b.dataTries[string(rootHash)] = struct{}{}
arg := trie.ArgTrieSyncer{
Expand All @@ -116,22 +111,23 @@ func (b *baseAccountsSyncer) syncMainTrie(
TimeoutHandler: b.timeoutHandler,
MaxHardCapForMissingNodes: b.maxHardCapForMissingNodes,
CheckNodesOnDisk: b.checkNodesOnDisk,
LeavesChan: leavesChan,
}
trieSyncer, err := trie.CreateTrieSyncer(arg, b.trieSyncerVersion)
if err != nil {
return nil, err
return err
}

err = trieSyncer.StartSyncing(rootHash, ctx)
if err != nil {
return nil, err
return err
}

atomic.AddInt32(&b.numTriesSynced, 1)

log.Trace("finished syncing main trie", "roothash", rootHash)

return dataTrie.Recreate(rootHash)
return nil
}

func (b *baseAccountsSyncer) printStatisticsAndUpdateMetrics(ctx context.Context) {
Expand Down
4 changes: 2 additions & 2 deletions state/syncer/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ func CheckBaseAccountsSyncerArgs(args ArgsNewBaseAccountsSyncer) error {

// SyncAccountDataTries -
func (u *userAccountsSyncer) SyncAccountDataTries(
mainTrie common.Trie,
leavesChannels *common.TrieIteratorChannels,
ctx context.Context,
) error {
return u.syncAccountDataTries(mainTrie, ctx)
return u.syncAccountDataTries(leavesChannels, ctx)
}
77 changes: 37 additions & 40 deletions state/syncer/userAccountsSyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/multiversx/mx-chain-go/process/factory"
"github.com/multiversx/mx-chain-go/state"
"github.com/multiversx/mx-chain-go/trie"
"github.com/multiversx/mx-chain-go/trie/keyBuilder"
logger "github.com/multiversx/mx-chain-logger-go"
)

Expand Down Expand Up @@ -83,7 +82,6 @@ func NewUserAccountsSyncer(args ArgsNewUserAccountsSyncer) (*userAccountsSyncer,
timeoutHandler: timeoutHandler,
shardId: args.ShardId,
cacher: args.Cacher,
rootHash: nil,
maxTrieLevelInMemory: args.MaxTrieLevelInMemory,
name: fmt.Sprintf("user accounts for shard %s", core.GetShardIDString(args.ShardId)),
maxHardCapForMissingNodes: args.MaxHardCapForMissingNodes,
Expand Down Expand Up @@ -119,23 +117,40 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error {

go u.printStatisticsAndUpdateMetrics(ctx)

mainTrie, err := u.syncMainTrie(rootHash, factory.AccountTrieNodesTopic, ctx)
if err != nil {
return err
leavesChannels := &common.TrieIteratorChannels{
LeavesChan: make(chan core.KeyValueHolder, common.TrieLeavesChannelSyncCapacity),
ErrChan: errChan.NewErrChanWrapper(),
}

defer func() {
_ = mainTrie.Close()
wgSyncMainTrie := &sync.WaitGroup{}
wgSyncMainTrie.Add(1)

go func() {
err := u.syncMainTrie(rootHash, factory.AccountTrieNodesTopic, ctx, leavesChannels.LeavesChan)
if err != nil {
leavesChannels.ErrChan.WriteInChanNonBlocking(err)
}

common.CloseKeyValueHolderChan(leavesChannels.LeavesChan)

wgSyncMainTrie.Done()
}()

log.Debug("main trie synced, starting to sync data tries", "num data tries", len(u.dataTries))
err := u.syncAccountDataTries(leavesChannels, ctx)
if err != nil {
return err
}

wgSyncMainTrie.Wait()

err = u.syncAccountDataTries(mainTrie, ctx)
err = leavesChannels.ErrChan.ReadFromChanNonBlocking()
if err != nil {
return err
}

u.storageMarker.MarkStorerAsSyncedAndActive(mainTrie.GetStorageManager())
u.storageMarker.MarkStorerAsSyncedAndActive(u.trieStorageManager)

log.Debug("main trie and data tries synced", "main trie root hash", rootHash, "num data tries", len(u.dataTries))

return nil
}
Expand Down Expand Up @@ -163,6 +178,7 @@ func (u *userAccountsSyncer) syncDataTrie(rootHash []byte, address []byte, ctx c
TimeoutHandler: u.timeoutHandler,
MaxHardCapForMissingNodes: u.maxHardCapForMissingNodes,
CheckNodesOnDisk: u.checkNodesOnDisk,
LeavesChan: nil, // not used for data tries
}
trieSyncer, err := trie.CreateTrieSyncer(arg, u.trieSyncerVersion)
if err != nil {
Expand Down Expand Up @@ -202,40 +218,28 @@ func (u *userAccountsSyncer) updateDataTrieStatistics(trieSyncer trie.TrieSyncer
}

func (u *userAccountsSyncer) syncAccountDataTries(
mainTrie common.Trie,
leavesChannels *common.TrieIteratorChannels,
ctx context.Context,
) error {
defer u.printDataTrieStatistics()

mainRootHash, err := mainTrie.RootHash()
if err != nil {
return err
if leavesChannels == nil {
return trie.ErrNilTrieIteratorChannels
}

leavesChannels := &common.TrieIteratorChannels{
LeavesChan: make(chan core.KeyValueHolder, common.TrieLeavesChannelDefaultCapacity),
ErrChan: errChan.NewErrChanWrapper(),
}
err = mainTrie.GetAllLeavesOnChannel(leavesChannels, context.Background(), mainRootHash, keyBuilder.NewDisabledKeyBuilder())
if err != nil {
return err
}
defer u.printDataTrieStatistics()

var errFound error
errMutex := sync.Mutex{}
wg := sync.WaitGroup{}

for leaf := range leavesChannels.LeavesChan {
u.resetTimeoutHandlerWatchdog()

account := state.NewEmptyUserAccount()
err = u.marshalizer.Unmarshal(account, leaf.Value())
err := u.marshalizer.Unmarshal(account, leaf.Value())
if err != nil {
log.Trace("this must be a leaf with code", "err", err)
log.Trace("this must be a leaf with code", "leaf key", leaf.Key(), "err", err)
continue
}

if len(account.RootHash) == 0 {
if common.IsEmptyTrie(account.RootHash) {
continue
}

Expand All @@ -252,11 +256,9 @@ func (u *userAccountsSyncer) syncAccountDataTries(
defer u.throttler.EndProcessing()

log.Trace("sync data trie", "roothash", trieRootHash)
newErr := u.syncDataTrie(trieRootHash, address, ctx)
if newErr != nil {
errMutex.Lock()
errFound = newErr
errMutex.Unlock()
err := u.syncDataTrie(trieRootHash, address, ctx)
if err != nil {
leavesChannels.ErrChan.WriteInChanNonBlocking(err)
}
atomic.AddInt32(&u.numTriesSynced, 1)
log.Trace("finished sync data trie", "roothash", trieRootHash)
Expand All @@ -266,12 +268,7 @@ func (u *userAccountsSyncer) syncAccountDataTries(

wg.Wait()

err = leavesChannels.ErrChan.ReadFromChanNonBlocking()
if err != nil {
return err
}

return errFound
return nil
}

func (u *userAccountsSyncer) printDataTrieStatistics() {
Expand Down
64 changes: 30 additions & 34 deletions state/syncer/userAccountsSyncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@ import (
"testing"
"time"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/data"
"github.com/multiversx/mx-chain-core-go/hashing"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-go/api/mock"
"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/common/errChan"
"github.com/multiversx/mx-chain-go/config"
"github.com/multiversx/mx-chain-go/state"
"github.com/multiversx/mx-chain-go/state/syncer"
"github.com/multiversx/mx-chain-go/testscommon"
trieMocks "github.com/multiversx/mx-chain-go/testscommon/trie"
"github.com/multiversx/mx-chain-go/trie"
"github.com/multiversx/mx-chain-go/trie/hashesHolder"
"github.com/multiversx/mx-chain-go/trie/keyBuilder"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -171,43 +173,15 @@ func emptyTrie() common.Trie {
func TestUserAccountsSyncer_SyncAccountDataTries(t *testing.T) {
t.Parallel()

t.Run("failed to get trie root hash", func(t *testing.T) {
t.Run("nil leaves chan should fail", func(t *testing.T) {
t.Parallel()

expectedErr := errors.New("expected err")
tr := &trieMocks.TrieStub{
RootCalled: func() ([]byte, error) {
return nil, expectedErr
},
}

args := getDefaultUserAccountsSyncerArgs()
s, err := syncer.NewUserAccountsSyncer(args)
require.Nil(t, err)

err = s.SyncAccountDataTries(tr, context.TODO())
require.Equal(t, expectedErr, err)
})

t.Run("failed to get all leaves on channel", func(t *testing.T) {
t.Parallel()

expectedErr := errors.New("expected err")
tr := &trieMocks.TrieStub{
RootCalled: func() ([]byte, error) {
return []byte("rootHash"), nil
},
GetAllLeavesOnChannelCalled: func(leavesChannels *common.TrieIteratorChannels, ctx context.Context, rootHash []byte, keyBuilder common.KeyBuilder) error {
return expectedErr
},
}

args := getDefaultUserAccountsSyncerArgs()
s, err := syncer.NewUserAccountsSyncer(args)
require.Nil(t, err)

err = s.SyncAccountDataTries(tr, context.TODO())
require.Equal(t, expectedErr, err)
err = s.SyncAccountDataTries(nil, context.TODO())
require.Equal(t, trie.ErrNilTrieIteratorChannels, err)
})

t.Run("throttler cannot process and closed context should fail", func(t *testing.T) {
Expand Down Expand Up @@ -254,10 +228,21 @@ func TestUserAccountsSyncer_SyncAccountDataTries(t *testing.T) {
_ = tr.Update([]byte("ddog"), accountBytes)
_ = tr.Commit()

leavesChannels := &common.TrieIteratorChannels{
LeavesChan: make(chan core.KeyValueHolder, common.TrieLeavesChannelDefaultCapacity),
ErrChan: errChan.NewErrChanWrapper(),
}

rootHash, err := tr.RootHash()
require.Nil(t, err)

err = tr.GetAllLeavesOnChannel(leavesChannels, context.TODO(), rootHash, keyBuilder.NewDisabledKeyBuilder())
require.Nil(t, err)

ctx, cancel := context.WithCancel(context.TODO())
cancel()

err = s.SyncAccountDataTries(tr, ctx)
err = s.SyncAccountDataTries(leavesChannels, ctx)
require.Equal(t, data.ErrTimeIsOut, err)
})

Expand Down Expand Up @@ -300,7 +285,18 @@ func TestUserAccountsSyncer_SyncAccountDataTries(t *testing.T) {
_ = tr.Update([]byte("ddog"), accountBytes)
_ = tr.Commit()

err = s.SyncAccountDataTries(tr, context.TODO())
leavesChannels := &common.TrieIteratorChannels{
LeavesChan: make(chan core.KeyValueHolder, common.TrieLeavesChannelDefaultCapacity),
ErrChan: errChan.NewErrChanWrapper(),
}

rootHash, err := tr.RootHash()
require.Nil(t, err)

err = tr.GetAllLeavesOnChannel(leavesChannels, context.TODO(), rootHash, keyBuilder.NewDisabledKeyBuilder())
require.Nil(t, err)

err = s.SyncAccountDataTries(leavesChannels, context.TODO())
require.Nil(t, err)
})
}
Expand Down
10 changes: 7 additions & 3 deletions state/syncer/validatorAccountsSyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ func NewValidatorAccountsSyncer(args ArgsNewValidatorAccountsSyncer) (*validator
timeoutHandler: timeoutHandler,
shardId: core.MetachainShardId,
cacher: args.Cacher,
rootHash: nil,
maxTrieLevelInMemory: args.MaxTrieLevelInMemory,
name: "peer accounts",
maxHardCapForMissingNodes: args.MaxHardCapForMissingNodes,
Expand Down Expand Up @@ -75,12 +74,17 @@ func (v *validatorAccountsSyncer) SyncAccounts(rootHash []byte) error {

go v.printStatisticsAndUpdateMetrics(ctx)

mainTrie, err := v.syncMainTrie(rootHash, factory.ValidatorTrieNodesTopic, ctx)
err := v.syncMainTrie(
rootHash,
factory.ValidatorTrieNodesTopic,
ctx,
nil, // not used for validator accounts syncer
)
if err != nil {
return err
}

v.storageMarker.MarkStorerAsSyncedAndActive(mainTrie.GetStorageManager())
v.storageMarker.MarkStorerAsSyncedAndActive(v.trieStorageManager)

return nil
}
Expand Down
Loading

0 comments on commit 90977a3

Please sign in to comment.