From b8674cadc6d3bfefcc30a11ec28842044cab4fdd Mon Sep 17 00:00:00 2001 From: Adrian Dobrita Date: Tue, 21 May 2024 16:21:06 +0300 Subject: [PATCH 1/6] add system vm critical section --- vm/process/systemVM.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/vm/process/systemVM.go b/vm/process/systemVM.go index 6a3452304fa..90228e4adaa 100644 --- a/vm/process/systemVM.go +++ b/vm/process/systemVM.go @@ -6,9 +6,10 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" + vmcommon "github.com/multiversx/mx-chain-vm-common-go" + "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/vm" - vmcommon "github.com/multiversx/mx-chain-vm-common-go" ) type systemVM struct { @@ -18,6 +19,7 @@ type systemVM struct { asyncCallbackGasLock uint64 asyncCallStepCost uint64 mutGasLock sync.RWMutex + criticalSection sync.Mutex } // ArgsNewSystemVM defines the needed arguments to create a new system vm @@ -101,6 +103,9 @@ func (s *systemVM) RunSmartContractCreate(input *vmcommon.ContractCreateInput) ( // RunSmartContractCall executes a smart contract according to the input func (s *systemVM) RunSmartContractCall(input *vmcommon.ContractCallInput) (*vmcommon.VMOutput, error) { + s.criticalSection.Lock() + defer s.criticalSection.Unlock() + s.systemEI.CleanCache() s.systemEI.SetSCAddress(input.RecipientAddr) s.systemEI.AddTxValueToSmartContract(input.CallValue, input.RecipientAddr) From 8369e8d613d01c47dbf60f3042db61f0a35bcf4d Mon Sep 17 00:00:00 2001 From: Adrian Dobrita Date: Tue, 21 May 2024 16:28:13 +0300 Subject: [PATCH 2/6] add system vm critical section also on contract create --- vm/process/systemVM.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/vm/process/systemVM.go b/vm/process/systemVM.go index 90228e4adaa..ca0553cb714 100644 --- a/vm/process/systemVM.go +++ b/vm/process/systemVM.go @@ -70,6 +70,9 @@ func NewSystemVM(args ArgsNewSystemVM) (*systemVM, error) { // RunSmartContractCreate creates and saves a new smart contract to the trie func (s *systemVM) RunSmartContractCreate(input *vmcommon.ContractCreateInput) (*vmcommon.VMOutput, error) { + s.criticalSection.Lock() + defer s.criticalSection.Unlock() + if input == nil { return nil, vm.ErrInputArgsIsNil } From 3177a85df3360872d7e65944e6a3743fac54b16c Mon Sep 17 00:00:00 2001 From: Sorin Stanculeanu Date: Wed, 22 May 2024 00:36:09 +0300 Subject: [PATCH 3/6] copy validator status in epoch struct --- epochStart/metachain/stakingDataProvider.go | 19 +++++- .../metachain/stakingDataProvider_test.go | 64 ++++++++++++++++--- 2 files changed, 74 insertions(+), 9 deletions(-) diff --git a/epochStart/metachain/stakingDataProvider.go b/epochStart/metachain/stakingDataProvider.go index b655fbe1b16..aa0129a6f1a 100644 --- a/epochStart/metachain/stakingDataProvider.go +++ b/epochStart/metachain/stakingDataProvider.go @@ -580,7 +580,24 @@ func (sdp *stakingDataProvider) GetCurrentEpochValidatorStats() epochStart.Valid sdp.mutStakingData.RLock() defer sdp.mutStakingData.RUnlock() - return sdp.validatorStatsInEpoch + return copyValidatorStatsInEpoch(sdp.validatorStatsInEpoch) +} + +func copyValidatorStatsInEpoch(oldInstance epochStart.ValidatorStatsInEpoch) epochStart.ValidatorStatsInEpoch { + return epochStart.ValidatorStatsInEpoch{ + Eligible: copyMap(oldInstance.Eligible), + Waiting: copyMap(oldInstance.Waiting), + Leaving: copyMap(oldInstance.Leaving), + } +} + +func copyMap(oldMap map[uint32]int) map[uint32]int { + newMap := make(map[uint32]int, len(oldMap)) + for key, value := range oldMap { + newMap[key] = value + } + + return newMap } // IsInterfaceNil return true if underlying object is nil diff --git a/epochStart/metachain/stakingDataProvider_test.go b/epochStart/metachain/stakingDataProvider_test.go index e3bfc1e6259..e11bb45801e 100644 --- a/epochStart/metachain/stakingDataProvider_test.go +++ b/epochStart/metachain/stakingDataProvider_test.go @@ -7,6 +7,7 @@ import ( "fmt" "math/big" "strings" + "sync" "testing" "github.com/multiversx/mx-chain-core-go/core" @@ -465,16 +466,63 @@ func TestStakingDataProvider_PrepareStakingDataForRewards(t *testing.T) { func TestStakingDataProvider_FillValidatorInfo(t *testing.T) { t.Parallel() - owner := []byte("owner") - topUpVal := big.NewInt(828743) - basePrice := big.NewInt(100000) - stakeVal := big.NewInt(0).Add(topUpVal, basePrice) - numRunContractCalls := 0 + t.Run("should work", func(t *testing.T) { + t.Parallel() - sdp := createStakingDataProviderWithMockArgs(t, owner, topUpVal, stakeVal, &numRunContractCalls) + owner := []byte("owner") + topUpVal := big.NewInt(828743) + basePrice := big.NewInt(100000) + stakeVal := big.NewInt(0).Add(topUpVal, basePrice) + numRunContractCalls := 0 - err := sdp.FillValidatorInfo(&state.ValidatorInfo{PublicKey: []byte("bls key")}) - require.NoError(t, err) + sdp := createStakingDataProviderWithMockArgs(t, owner, topUpVal, stakeVal, &numRunContractCalls) + + err := sdp.FillValidatorInfo(&state.ValidatorInfo{PublicKey: []byte("bls key")}) + require.NoError(t, err) + }) + t.Run("concurrent calls should work", func(t *testing.T) { + t.Parallel() + + defer func() { + r := recover() + if r != nil { + require.Fail(t, "should have not panicked") + } + }() + + owner := []byte("owner") + topUpVal := big.NewInt(828743) + basePrice := big.NewInt(100000) + stakeVal := big.NewInt(0).Add(topUpVal, basePrice) + numRunContractCalls := 0 + + sdp := createStakingDataProviderWithMockArgs(t, owner, topUpVal, stakeVal, &numRunContractCalls) + + wg := sync.WaitGroup{} + numCalls := 100 + wg.Add(numCalls) + + for i := 0; i < numCalls; i++ { + go func(idx int) { + switch idx % 2 { + case 0: + err := sdp.FillValidatorInfo(&state.ValidatorInfo{ + PublicKey: []byte("bls key"), + List: string(common.EligibleList), + ShardId: 0, + }) + require.NoError(t, err) + case 1: + stats := sdp.GetCurrentEpochValidatorStats() + log.Info(fmt.Sprintf("%d", stats.Eligible[0])) + } + + wg.Done() + }(i) + } + + wg.Wait() + }) } func TestCheckAndFillOwnerValidatorAuctionData(t *testing.T) { From c9f1f93f1c2713331c8d7374027769bdef0cb6b7 Mon Sep 17 00:00:00 2001 From: Sorin Stanculeanu Date: Wed, 22 May 2024 10:53:05 +0300 Subject: [PATCH 4/6] further test + mutex fix --- process/peer/validatorsProvider.go | 4 ++ process/peer/validatorsProviderAuction.go | 13 ++--- process/peer/validatorsProvider_test.go | 69 +++++++++++++++++++++++ 3 files changed, 78 insertions(+), 8 deletions(-) diff --git a/process/peer/validatorsProvider.go b/process/peer/validatorsProvider.go index 7c3b8505310..8caff26430a 100644 --- a/process/peer/validatorsProvider.go +++ b/process/peer/validatorsProvider.go @@ -320,6 +320,10 @@ func shouldCombine(triePeerType common.PeerType, currentPeerType common.PeerType // ForceUpdate will trigger the update process of all caches func (vp *validatorsProvider) ForceUpdate() error { vp.updateCache() + + vp.auctionMutex.Lock() + defer vp.auctionMutex.Unlock() + return vp.updateAuctionListCache() } diff --git a/process/peer/validatorsProviderAuction.go b/process/peer/validatorsProviderAuction.go index 144ace850fb..a31a89f97e8 100644 --- a/process/peer/validatorsProviderAuction.go +++ b/process/peer/validatorsProviderAuction.go @@ -27,9 +27,10 @@ func (vp *validatorsProvider) GetAuctionList() ([]*common.AuctionListValidatorAP } func (vp *validatorsProvider) updateAuctionListCacheIfNeeded() error { - vp.auctionMutex.RLock() + vp.auctionMutex.Lock() + defer vp.auctionMutex.Unlock() + shouldUpdate := time.Since(vp.lastAuctionCacheUpdate) > vp.cacheRefreshIntervalDuration - vp.auctionMutex.RUnlock() if shouldUpdate { return vp.updateAuctionListCache() @@ -38,6 +39,7 @@ func (vp *validatorsProvider) updateAuctionListCacheIfNeeded() error { return nil } +// this func should be called under mutex protection func (vp *validatorsProvider) updateAuctionListCache() error { rootHash := vp.validatorStatistics.LastFinalizedRootHash() if len(rootHash) == 0 { @@ -49,19 +51,15 @@ func (vp *validatorsProvider) updateAuctionListCache() error { return err } - vp.auctionMutex.Lock() vp.cachedRandomness = rootHash - vp.auctionMutex.Unlock() newCache, err := vp.createValidatorsAuctionCache(validatorsMap) if err != nil { return err } - vp.auctionMutex.Lock() vp.lastAuctionCacheUpdate = time.Now() vp.cachedAuctionValidators = newCache - vp.auctionMutex.Unlock() return nil } @@ -96,10 +94,9 @@ func (vp *validatorsProvider) fillAllValidatorsInfo(validatorsMap state.ShardVal return err } +// this func should be called under mutex protection func (vp *validatorsProvider) getSelectedNodesFromAuction(validatorsMap state.ShardValidatorsInfoMapHandler) ([]state.ValidatorInfoHandler, error) { - vp.auctionMutex.RLock() randomness := vp.cachedRandomness - vp.auctionMutex.RUnlock() err := vp.auctionListSelector.SelectNodesFromAuctionList(validatorsMap, randomness) if err != nil { diff --git a/process/peer/validatorsProvider_test.go b/process/peer/validatorsProvider_test.go index 931567a2435..8bb56753660 100644 --- a/process/peer/validatorsProvider_test.go +++ b/process/peer/validatorsProvider_test.go @@ -1044,6 +1044,75 @@ func TestValidatorsProvider_GetAuctionList(t *testing.T) { require.Equal(t, expectedList, list) }) + t.Run("concurrent calls should only update cache once", func(t *testing.T) { + t.Parallel() + + args := createDefaultValidatorsProviderArg() + + args.CacheRefreshIntervalDurationInSec = time.Second * 5 + + expectedRootHash := []byte("root hash") + ctRootHashCalled := uint32(0) + ctSelectNodesFromAuctionList := uint32(0) + ctFillValidatorInfoCalled := uint32(0) + ctGetOwnersDataCalled := uint32(0) + ctComputeUnqualifiedNodes := uint32(0) + + args.ValidatorStatistics = &testscommon.ValidatorStatisticsProcessorStub{ + LastFinalizedRootHashCalled: func() []byte { + atomic.AddUint32(&ctRootHashCalled, 1) + return expectedRootHash + }, + GetValidatorInfoForRootHashCalled: func(rootHash []byte) (state.ShardValidatorsInfoMapHandler, error) { + require.Equal(t, expectedRootHash, rootHash) + return state.NewShardValidatorsInfoMap(), nil + }, + } + args.AuctionListSelector = &stakingcommon.AuctionListSelectorStub{ + SelectNodesFromAuctionListCalled: func(validatorsInfoMap state.ShardValidatorsInfoMapHandler, randomness []byte) error { + atomic.AddUint32(&ctSelectNodesFromAuctionList, 1) + require.Equal(t, expectedRootHash, randomness) + return nil + }, + } + args.StakingDataProvider = &stakingcommon.StakingDataProviderStub{ + FillValidatorInfoCalled: func(validator state.ValidatorInfoHandler) error { + atomic.AddUint32(&ctFillValidatorInfoCalled, 1) + return nil + }, + GetOwnersDataCalled: func() map[string]*epochStart.OwnerData { + atomic.AddUint32(&ctGetOwnersDataCalled, 1) + return nil + }, + ComputeUnQualifiedNodesCalled: func(validatorInfos state.ShardValidatorsInfoMapHandler) ([][]byte, map[string][][]byte, error) { + atomic.AddUint32(&ctComputeUnqualifiedNodes, 1) + return nil, nil, nil + }, + } + vp, _ := NewValidatorsProvider(args) + time.Sleep(args.CacheRefreshIntervalDurationInSec) + + numCalls := 100 + wg := sync.WaitGroup{} + wg.Add(numCalls) + + for i := 0; i < numCalls; i++ { + go func() { + list, err := vp.GetAuctionList() + require.NoError(t, err) + require.Empty(t, list) + + wg.Done() + }() + } + + wg.Wait() + + require.LessOrEqual(t, ctRootHashCalled, uint32(2)) // another call might be from constructor in startRefreshProcess.updateCache + + require.NoError(t, vp.Close()) + }) + } func createMockValidatorInfo() *state.ValidatorInfo { From 3363e309a4f4a1ea42c1cfea9f42eaa1e88aae45 Mon Sep 17 00:00:00 2001 From: Sorin Stanculeanu Date: Wed, 22 May 2024 11:07:55 +0300 Subject: [PATCH 5/6] further test + mutex fix for validatorsProvider as well --- process/peer/validatorsProvider.go | 15 +++++++------ process/peer/validatorsProvider_test.go | 28 ++++++++++++++++++------- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/process/peer/validatorsProvider.go b/process/peer/validatorsProvider.go index 8caff26430a..a7aa60ea7f5 100644 --- a/process/peer/validatorsProvider.go +++ b/process/peer/validatorsProvider.go @@ -129,9 +129,10 @@ func (vp *validatorsProvider) GetLatestValidators() map[string]*validator.Valida } func (vp *validatorsProvider) updateCacheIfNeeded() { - vp.lock.RLock() + vp.lock.Lock() + defer vp.lock.Unlock() + shouldUpdate := time.Since(vp.lastCacheUpdate) > vp.cacheRefreshIntervalDuration - vp.lock.RUnlock() if shouldUpdate { vp.updateCache() @@ -192,7 +193,10 @@ func (vp *validatorsProvider) epochStartEventHandler() nodesCoordinator.EpochSta func (vp *validatorsProvider) startRefreshProcess(ctx context.Context) { for { + vp.lock.Lock() vp.updateCache() + vp.lock.Unlock() + select { case epoch := <-vp.refreshCache: vp.lock.Lock() @@ -206,6 +210,7 @@ func (vp *validatorsProvider) startRefreshProcess(ctx context.Context) { } } +// this func should be called under mutex protection func (vp *validatorsProvider) updateCache() { lastFinalizedRootHash := vp.validatorStatistics.LastFinalizedRootHash() if len(lastFinalizedRootHash) == 0 { @@ -217,16 +222,12 @@ func (vp *validatorsProvider) updateCache() { log.Trace("validatorsProvider - GetLatestValidatorInfos failed", "error", err) } - vp.lock.RLock() epoch := vp.currentEpoch - vp.lock.RUnlock() newCache := vp.createNewCache(epoch, allNodes) - vp.lock.Lock() vp.lastCacheUpdate = time.Now() vp.cache = newCache - vp.lock.Unlock() } func (vp *validatorsProvider) createNewCache( @@ -319,7 +320,9 @@ func shouldCombine(triePeerType common.PeerType, currentPeerType common.PeerType // ForceUpdate will trigger the update process of all caches func (vp *validatorsProvider) ForceUpdate() error { + vp.lock.Lock() vp.updateCache() + vp.lock.Unlock() vp.auctionMutex.Lock() defer vp.auctionMutex.Unlock() diff --git a/process/peer/validatorsProvider_test.go b/process/peer/validatorsProvider_test.go index 8bb56753660..71da53f08e7 100644 --- a/process/peer/validatorsProvider_test.go +++ b/process/peer/validatorsProvider_test.go @@ -1092,23 +1092,37 @@ func TestValidatorsProvider_GetAuctionList(t *testing.T) { vp, _ := NewValidatorsProvider(args) time.Sleep(args.CacheRefreshIntervalDurationInSec) - numCalls := 100 + numCalls := 99 wg := sync.WaitGroup{} wg.Add(numCalls) for i := 0; i < numCalls; i++ { - go func() { - list, err := vp.GetAuctionList() - require.NoError(t, err) - require.Empty(t, list) + go func(idx int) { + switch idx % 3 { + case 0: + list, err := vp.GetAuctionList() + require.NoError(t, err) + require.Empty(t, list) + case 1: + err := vp.ForceUpdate() + require.NoError(t, err) + case 2: + _ = vp.GetLatestValidators() + } wg.Done() - }() + }(i) } wg.Wait() - require.LessOrEqual(t, ctRootHashCalled, uint32(2)) // another call might be from constructor in startRefreshProcess.updateCache + // expectedMaxNumCalls is: + // - 1 from constructor + // - 1 from GetAuctionList, should not update second time + // - 1 from GetLatestValidators, should not update second time + // - 33 calls * 2 from ForceUpdate, calling it twice/call + expectedMaxNumCalls := uint32(1 + 1 + 1 + 66) + require.LessOrEqual(t, ctRootHashCalled, expectedMaxNumCalls) require.NoError(t, vp.Close()) }) From 9e45d8c5f80e18c8aae1c392dacbd47029e215b5 Mon Sep 17 00:00:00 2001 From: Sorin Stanculeanu Date: Wed, 22 May 2024 11:19:10 +0300 Subject: [PATCH 6/6] use require.NotPanics --- .../metachain/stakingDataProvider_test.go | 47 +++++++++---------- 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/epochStart/metachain/stakingDataProvider_test.go b/epochStart/metachain/stakingDataProvider_test.go index e11bb45801e..6f5ad62868d 100644 --- a/epochStart/metachain/stakingDataProvider_test.go +++ b/epochStart/metachain/stakingDataProvider_test.go @@ -483,13 +483,6 @@ func TestStakingDataProvider_FillValidatorInfo(t *testing.T) { t.Run("concurrent calls should work", func(t *testing.T) { t.Parallel() - defer func() { - r := recover() - if r != nil { - require.Fail(t, "should have not panicked") - } - }() - owner := []byte("owner") topUpVal := big.NewInt(828743) basePrice := big.NewInt(100000) @@ -502,26 +495,28 @@ func TestStakingDataProvider_FillValidatorInfo(t *testing.T) { numCalls := 100 wg.Add(numCalls) - for i := 0; i < numCalls; i++ { - go func(idx int) { - switch idx % 2 { - case 0: - err := sdp.FillValidatorInfo(&state.ValidatorInfo{ - PublicKey: []byte("bls key"), - List: string(common.EligibleList), - ShardId: 0, - }) - require.NoError(t, err) - case 1: - stats := sdp.GetCurrentEpochValidatorStats() - log.Info(fmt.Sprintf("%d", stats.Eligible[0])) - } - - wg.Done() - }(i) - } + require.NotPanics(t, func() { + for i := 0; i < numCalls; i++ { + go func(idx int) { + switch idx % 2 { + case 0: + err := sdp.FillValidatorInfo(&state.ValidatorInfo{ + PublicKey: []byte("bls key"), + List: string(common.EligibleList), + ShardId: 0, + }) + require.NoError(t, err) + case 1: + stats := sdp.GetCurrentEpochValidatorStats() + log.Info(fmt.Sprintf("%d", stats.Eligible[0])) + } + + wg.Done() + }(i) + } - wg.Wait() + wg.Wait() + }) }) }