Skip to content

Commit

Permalink
Merge pull request #4024 from ElrondNetwork/feat/refactor-peers-mbs
Browse files Browse the repository at this point in the history
Feat/refactor peers mbs
  • Loading branch information
bogdan-rosianu authored Aug 31, 2022
2 parents ae87e01 + ea57eb4 commit dd4089e
Show file tree
Hide file tree
Showing 146 changed files with 6,666 additions and 917 deletions.
6 changes: 6 additions & 0 deletions cmd/node/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,12 @@
SizeInBytes = 26214400 # 25MB per each pair (metachain, destinationShard)
Shards = 4

[ValidatorInfoPool]
Name = "ValidatorInfoPool"
Capacity = 10000
SizeInBytes = 31457280 #30MB
Shards = 4

#PublicKeyPeerId represents the main cache used to map Elrond block signing public keys to their associated peer id's.
[PublicKeyPeerId]
Name = "PublicKeyPeerId"
Expand Down
3 changes: 3 additions & 0 deletions cmd/node/config/enableEpochs.toml
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@
# SetSenderInEeiOutputTransferEnableEpoch represents the epoch when setting the sender in eei output transfers will be enabled
SetSenderInEeiOutputTransferEnableEpoch = 4

# RefactorPeersMiniBlocksEnableEpoch represents the epoch when refactor of the peers mini blocks will be enabled
RefactorPeersMiniBlocksEnableEpoch = 5

# MaxNodesChangeEnableEpoch holds configuration for changing the maximum number of nodes and the enabling epoch
MaxNodesChangeEnableEpoch = [
{ EpochEnable = 0, MaxNumNodes = 36, NodesToShufflePerShard = 4 },
Expand Down
3 changes: 3 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ const PeerAuthenticationTopic = "peerAuthentication"
// ConnectionTopic represents the topic used when sending the new connection message data
const ConnectionTopic = "connection"

// ValidatorInfoTopic is the topic used for validatorInfo signaling
const ValidatorInfoTopic = "validatorInfo"

// PathShardPlaceholder represents the placeholder for the shard ID in paths
const PathShardPlaceholder = "[S]"

Expand Down
6 changes: 6 additions & 0 deletions common/enablers/enableEpochsHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (handler *enableEpochsHandler) EpochConfirmed(epoch uint32, _ uint64) {
handler.setFlagValue(epoch >= handler.enableEpochsConfig.CheckExecuteOnReadOnlyEnableEpoch, handler.checkExecuteOnReadOnlyFlag, "checkExecuteOnReadOnlyFlag")
handler.setFlagValue(epoch >= handler.enableEpochsConfig.SetSenderInEeiOutputTransferEnableEpoch, handler.setSenderInEeiOutputTransferFlag, "setSenderInEeiOutputTransferFlag")
handler.setFlagValue(epoch >= handler.enableEpochsConfig.ESDTMetadataContinuousCleanupEnableEpoch, handler.changeDelegationOwnerFlag, "changeDelegationOwnerFlag")
handler.setFlagValue(epoch >= handler.enableEpochsConfig.RefactorPeersMiniBlocksEnableEpoch, handler.refactorPeersMiniBlocksFlag, "refactorPeersMiniBlocksFlag")
}

func (handler *enableEpochsHandler) setFlagValue(value bool, flag *atomic.Flag, flagName string) {
Expand Down Expand Up @@ -203,6 +204,11 @@ func (handler *enableEpochsHandler) MiniBlockPartialExecutionEnableEpoch() uint3
return handler.enableEpochsConfig.MiniBlockPartialExecutionEnableEpoch
}

// RefactorPeersMiniBlocksEnableEpoch returns the epoch when refactor of peers mini blocks becomes active
func (handler *enableEpochsHandler) RefactorPeersMiniBlocksEnableEpoch() uint32 {
return handler.enableEpochsConfig.RefactorPeersMiniBlocksEnableEpoch
}

// IsInterfaceNil returns true if there is no value under the interface
func (handler *enableEpochsHandler) IsInterfaceNil() bool {
return handler == nil
Expand Down
7 changes: 7 additions & 0 deletions common/enablers/epochFlags.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type epochFlagsHolder struct {
checkExecuteOnReadOnlyFlag *atomic.Flag
setSenderInEeiOutputTransferFlag *atomic.Flag
changeDelegationOwnerFlag *atomic.Flag
refactorPeersMiniBlocksFlag *atomic.Flag
}

func newEpochFlagsHolder() *epochFlagsHolder {
Expand Down Expand Up @@ -163,6 +164,7 @@ func newEpochFlagsHolder() *epochFlagsHolder {
checkExecuteOnReadOnlyFlag: &atomic.Flag{},
setSenderInEeiOutputTransferFlag: &atomic.Flag{},
changeDelegationOwnerFlag: &atomic.Flag{},
refactorPeersMiniBlocksFlag: &atomic.Flag{},
}
}

Expand Down Expand Up @@ -608,3 +610,8 @@ func (holder *epochFlagsHolder) IsESDTNFTImprovementV1FlagEnabled() bool {
func (holder *epochFlagsHolder) IsChangeDelegationOwnerFlagEnabled() bool {
return holder.changeDelegationOwnerFlag.IsSet()
}

// IsRefactorPeersMiniBlocksFlagEnabled returns true if refactorPeersMiniBlocksFlag is enabled
func (holder *epochFlagsHolder) IsRefactorPeersMiniBlocksFlagEnabled() bool {
return holder.refactorPeersMiniBlocksFlag.IsSet()
}
2 changes: 2 additions & 0 deletions common/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ type EnableEpochsHandler interface {
CheckExecuteReadOnlyEnableEpoch() uint32
StorageAPICostOptimizationEnableEpoch() uint32
MiniBlockPartialExecutionEnableEpoch() uint32
RefactorPeersMiniBlocksEnableEpoch() uint32
IsSCDeployFlagEnabled() bool
IsBuiltInFunctionsFlagEnabled() bool
IsRelayedTransactionsFlagEnabled() bool
Expand Down Expand Up @@ -289,6 +290,7 @@ type EnableEpochsHandler interface {
IsESDTNFTImprovementV1FlagEnabled() bool
IsSetSenderInEeiOutputTransferFlagEnabled() bool
IsChangeDelegationOwnerFlagEnabled() bool
IsRefactorPeersMiniBlocksFlagEnabled() bool

IsInterfaceNil() bool
}
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ type Config struct {
WhiteListPool CacheConfig
WhiteListerVerifiedTxs CacheConfig
SmartContractDataPool CacheConfig
ValidatorInfoPool CacheConfig
TrieSyncStorage TrieSyncStorageConfig
EpochStartConfig EpochStartConfig
AddressPubkeyConverter PubkeyConfig
Expand Down
1 change: 1 addition & 0 deletions config/epochConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type EnableEpochs struct {
MiniBlockPartialExecutionEnableEpoch uint32
ESDTMetadataContinuousCleanupEnableEpoch uint32
SetSenderInEeiOutputTransferEnableEpoch uint32
RefactorPeersMiniBlocksEnableEpoch uint32
}

// GasScheduleByEpochs represents a gas schedule toml entry that will be applied from the provided epoch
Expand Down
3 changes: 3 additions & 0 deletions dataRetriever/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ const UnsignedTxPoolName = "uTxPool"

// RewardTxPoolName defines the name of the reward transactions pool
const RewardTxPoolName = "rewardTxPool"

// ValidatorsInfoPoolName defines the name of the validators info pool
const ValidatorsInfoPoolName = "validatorsInfoPool"
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,22 @@ type transactionMapCacher struct {
txsForBlock map[string]data.TransactionHandler
}

// NewCurrentBlockPool returns a new pool to be used for current block
func NewCurrentBlockPool() *transactionMapCacher {
// NewCurrentBlockTransactionsPool returns a new transactions pool to be used for the current block
func NewCurrentBlockTransactionsPool() *transactionMapCacher {
return &transactionMapCacher{
mutTxs: sync.RWMutex{},
txsForBlock: make(map[string]data.TransactionHandler),
}
}

// Clean creates a new pool
// Clean creates a new transaction pool
func (tmc *transactionMapCacher) Clean() {
tmc.mutTxs.Lock()
tmc.txsForBlock = make(map[string]data.TransactionHandler)
tmc.mutTxs.Unlock()
}

// GetTx returns the element saved for the hash
// GetTx gets the transaction for the given hash
func (tmc *transactionMapCacher) GetTx(txHash []byte) (data.TransactionHandler, error) {
tmc.mutTxs.RLock()
defer tmc.mutTxs.RUnlock()
Expand All @@ -43,7 +43,7 @@ func (tmc *transactionMapCacher) GetTx(txHash []byte) (data.TransactionHandler,
return tx, nil
}

// AddTx writes the tx to the map
// AddTx adds the transaction for the given hash
func (tmc *transactionMapCacher) AddTx(txHash []byte, tx data.TransactionHandler) {
if check.IfNil(tx) {
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestCurrentBlockPool_AddGetCleanTx(t *testing.T) {

txHash := []byte("hash")
tx := &transaction.Transaction{}
currentBlockPool := NewCurrentBlockPool()
currentBlockPool := NewCurrentBlockTransactionsPool()
require.False(t, currentBlockPool.IsInterfaceNil())

currentBlockPool.AddTx(txHash, tx)
Expand All @@ -31,5 +31,4 @@ func TestCurrentBlockPool_AddGetCleanTx(t *testing.T) {
txFromPool, err = currentBlockPool.GetTx(txHash)
require.Nil(t, txFromPool)
require.Equal(t, dataRetriever.ErrTxNotFoundInBlockPool, err)

}
60 changes: 60 additions & 0 deletions dataRetriever/dataPool/currentEpochValidatorInfoPool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package dataPool

import (
"sync"

"github.com/ElrondNetwork/elrond-go-core/core/check"
"github.com/ElrondNetwork/elrond-go/dataRetriever"
"github.com/ElrondNetwork/elrond-go/state"
)

var _ dataRetriever.ValidatorInfoCacher = (*validatorInfoMapCacher)(nil)

type validatorInfoMapCacher struct {
mutValidatorInfo sync.RWMutex
validatorInfoForEpoch map[string]*state.ShardValidatorInfo
}

// NewCurrentEpochValidatorInfoPool returns a new validator info pool to be used for the current epoch
func NewCurrentEpochValidatorInfoPool() *validatorInfoMapCacher {
return &validatorInfoMapCacher{
mutValidatorInfo: sync.RWMutex{},
validatorInfoForEpoch: make(map[string]*state.ShardValidatorInfo),
}
}

// Clean creates a new validator info pool
func (vimc *validatorInfoMapCacher) Clean() {
vimc.mutValidatorInfo.Lock()
vimc.validatorInfoForEpoch = make(map[string]*state.ShardValidatorInfo)
vimc.mutValidatorInfo.Unlock()
}

// GetValidatorInfo gets the validator info for the given hash
func (vimc *validatorInfoMapCacher) GetValidatorInfo(validatorInfoHash []byte) (*state.ShardValidatorInfo, error) {
vimc.mutValidatorInfo.RLock()
defer vimc.mutValidatorInfo.RUnlock()

validatorInfo, ok := vimc.validatorInfoForEpoch[string(validatorInfoHash)]
if !ok {
return nil, dataRetriever.ErrValidatorInfoNotFoundInEpochPool
}

return validatorInfo, nil
}

// AddValidatorInfo adds the validator info for the given hash
func (vimc *validatorInfoMapCacher) AddValidatorInfo(validatorInfoHash []byte, validatorInfo *state.ShardValidatorInfo) {
if check.IfNil(validatorInfo) {
return
}

vimc.mutValidatorInfo.Lock()
vimc.validatorInfoForEpoch[string(validatorInfoHash)] = validatorInfo
vimc.mutValidatorInfo.Unlock()
}

// IsInterfaceNil returns true if underlying object is nil
func (vimc *validatorInfoMapCacher) IsInterfaceNil() bool {
return vimc == nil
}
34 changes: 34 additions & 0 deletions dataRetriever/dataPool/currentEpochValidatorInfoPool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package dataPool

import (
"testing"

"github.com/ElrondNetwork/elrond-go/dataRetriever"
"github.com/ElrondNetwork/elrond-go/state"
"github.com/stretchr/testify/require"
)

func TestCurrentEpochValidatorInfoPool_AddGetCleanTx(t *testing.T) {
t.Parallel()

validatorInfoHash := []byte("hash")
validatorInfo := &state.ShardValidatorInfo{}
currentValidatorInfoPool := NewCurrentEpochValidatorInfoPool()
require.False(t, currentValidatorInfoPool.IsInterfaceNil())

currentValidatorInfoPool.AddValidatorInfo(validatorInfoHash, validatorInfo)
currentValidatorInfoPool.AddValidatorInfo(validatorInfoHash, nil)

validatorInfoFromPool, err := currentValidatorInfoPool.GetValidatorInfo([]byte("wrong hash"))
require.Nil(t, validatorInfoFromPool)
require.Equal(t, dataRetriever.ErrValidatorInfoNotFoundInEpochPool, err)

validatorInfoFromPool, err = currentValidatorInfoPool.GetValidatorInfo(validatorInfoHash)
require.Nil(t, err)
require.Equal(t, validatorInfo, validatorInfoFromPool)

currentValidatorInfoPool.Clean()
validatorInfoFromPool, err = currentValidatorInfoPool.GetValidatorInfo(validatorInfoHash)
require.Nil(t, validatorInfoFromPool)
require.Equal(t, dataRetriever.ErrValidatorInfoNotFoundInEpochPool, err)
}
94 changes: 58 additions & 36 deletions dataRetriever/dataPool/dataPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,38 @@ var _ dataRetriever.PoolsHolder = (*dataPool)(nil)
var log = logger.GetOrCreate("dataRetriever/dataPool")

type dataPool struct {
transactions dataRetriever.ShardedDataCacherNotifier
unsignedTransactions dataRetriever.ShardedDataCacherNotifier
rewardTransactions dataRetriever.ShardedDataCacherNotifier
headers dataRetriever.HeadersPool
miniBlocks storage.Cacher
peerChangesBlocks storage.Cacher
trieNodes storage.Cacher
trieNodesChunks storage.Cacher
currBlockTxs dataRetriever.TransactionCacher
smartContracts storage.Cacher
peerAuthentications storage.Cacher
heartbeats storage.Cacher
transactions dataRetriever.ShardedDataCacherNotifier
unsignedTransactions dataRetriever.ShardedDataCacherNotifier
rewardTransactions dataRetriever.ShardedDataCacherNotifier
headers dataRetriever.HeadersPool
miniBlocks storage.Cacher
peerChangesBlocks storage.Cacher
trieNodes storage.Cacher
trieNodesChunks storage.Cacher
currBlockTxs dataRetriever.TransactionCacher
currEpochValidatorInfo dataRetriever.ValidatorInfoCacher
smartContracts storage.Cacher
peerAuthentications storage.Cacher
heartbeats storage.Cacher
validatorsInfo dataRetriever.ShardedDataCacherNotifier
}

// DataPoolArgs represents the data pool's constructor structure
type DataPoolArgs struct {
Transactions dataRetriever.ShardedDataCacherNotifier
UnsignedTransactions dataRetriever.ShardedDataCacherNotifier
RewardTransactions dataRetriever.ShardedDataCacherNotifier
Headers dataRetriever.HeadersPool
MiniBlocks storage.Cacher
PeerChangesBlocks storage.Cacher
TrieNodes storage.Cacher
TrieNodesChunks storage.Cacher
CurrentBlockTransactions dataRetriever.TransactionCacher
SmartContracts storage.Cacher
PeerAuthentications storage.Cacher
Heartbeats storage.Cacher
Transactions dataRetriever.ShardedDataCacherNotifier
UnsignedTransactions dataRetriever.ShardedDataCacherNotifier
RewardTransactions dataRetriever.ShardedDataCacherNotifier
Headers dataRetriever.HeadersPool
MiniBlocks storage.Cacher
PeerChangesBlocks storage.Cacher
TrieNodes storage.Cacher
TrieNodesChunks storage.Cacher
CurrentBlockTransactions dataRetriever.TransactionCacher
CurrentEpochValidatorInfo dataRetriever.ValidatorInfoCacher
SmartContracts storage.Cacher
PeerAuthentications storage.Cacher
Heartbeats storage.Cacher
ValidatorsInfo dataRetriever.ShardedDataCacherNotifier
}

// NewDataPool creates a data pools holder object
Expand All @@ -65,6 +69,9 @@ func NewDataPool(args DataPoolArgs) (*dataPool, error) {
if check.IfNil(args.CurrentBlockTransactions) {
return nil, dataRetriever.ErrNilCurrBlockTxs
}
if check.IfNil(args.CurrentEpochValidatorInfo) {
return nil, dataRetriever.ErrNilCurrentEpochValidatorInfo
}
if check.IfNil(args.TrieNodes) {
return nil, dataRetriever.ErrNilTrieNodesPool
}
Expand All @@ -80,20 +87,25 @@ func NewDataPool(args DataPoolArgs) (*dataPool, error) {
if check.IfNil(args.Heartbeats) {
return nil, dataRetriever.ErrNilHeartbeatPool
}
if check.IfNil(args.ValidatorsInfo) {
return nil, dataRetriever.ErrNilValidatorInfoPool
}

return &dataPool{
transactions: args.Transactions,
unsignedTransactions: args.UnsignedTransactions,
rewardTransactions: args.RewardTransactions,
headers: args.Headers,
miniBlocks: args.MiniBlocks,
peerChangesBlocks: args.PeerChangesBlocks,
trieNodes: args.TrieNodes,
trieNodesChunks: args.TrieNodesChunks,
currBlockTxs: args.CurrentBlockTransactions,
smartContracts: args.SmartContracts,
peerAuthentications: args.PeerAuthentications,
heartbeats: args.Heartbeats,
transactions: args.Transactions,
unsignedTransactions: args.UnsignedTransactions,
rewardTransactions: args.RewardTransactions,
headers: args.Headers,
miniBlocks: args.MiniBlocks,
peerChangesBlocks: args.PeerChangesBlocks,
trieNodes: args.TrieNodes,
trieNodesChunks: args.TrieNodesChunks,
currBlockTxs: args.CurrentBlockTransactions,
currEpochValidatorInfo: args.CurrentEpochValidatorInfo,
smartContracts: args.SmartContracts,
peerAuthentications: args.PeerAuthentications,
heartbeats: args.Heartbeats,
validatorsInfo: args.ValidatorsInfo,
}, nil
}

Expand All @@ -102,6 +114,11 @@ func (dp *dataPool) CurrentBlockTxs() dataRetriever.TransactionCacher {
return dp.currBlockTxs
}

// CurrentEpochValidatorInfo returns the holder for current epoch validator info
func (dp *dataPool) CurrentEpochValidatorInfo() dataRetriever.ValidatorInfoCacher {
return dp.currEpochValidatorInfo
}

// Transactions returns the holder for transactions
func (dp *dataPool) Transactions() dataRetriever.ShardedDataCacherNotifier {
return dp.transactions
Expand Down Expand Up @@ -157,6 +174,11 @@ func (dp *dataPool) Heartbeats() storage.Cacher {
return dp.heartbeats
}

// ValidatorsInfo returns the holder for validators info
func (dp *dataPool) ValidatorsInfo() dataRetriever.ShardedDataCacherNotifier {
return dp.validatorsInfo
}

// Close closes all the components
func (dp *dataPool) Close() error {
var lastError error
Expand Down
Loading

0 comments on commit dd4089e

Please sign in to comment.