Merge pull request #5354 from multiversx/update_metrics_for_managed_keys
Update metrics for managed keys
sstanculeanu authored Jun 21, 2023

2 parents 561065c + 729e925 commit c2f41cb
Showing 17 changed files with 375 additions and 181 deletions.
8 changes: 5 additions & 3 deletions consensus/spos/bls/subroundStartRound.go
@@ -159,10 +159,12 @@ func (sr *subroundStartRound) initCurrentRound() bool {
msg = " (my turn in multi-key)"
}
if leader == sr.SelfPubKey() {
msg = " (my turn)"
}
if len(msg) != 0 {
sr.AppStatusHandler().Increment(common.MetricCountLeader)
sr.AppStatusHandler().SetStringValue(common.MetricConsensusRoundState, "proposed")
sr.AppStatusHandler().SetStringValue(common.MetricConsensusState, "proposer")
msg = " (my turn)"
}

log.Debug("step 0: preparing the round",
@@ -181,10 +183,10 @@ func (sr *subroundStartRound) initCurrentRound() bool {
 		}
 		sr.AppStatusHandler().SetStringValue(common.MetricConsensusState, "not in consensus group")
 	} else {
-		if leader != sr.SelfPubKey() {
+		if leader != sr.SelfPubKey() && !sr.IsKeyManagedByCurrentNode([]byte(leader)) {
 			sr.AppStatusHandler().Increment(common.MetricCountConsensus)
+			sr.AppStatusHandler().SetStringValue(common.MetricConsensusState, "participant")
 		}
-		sr.AppStatusHandler().SetStringValue(common.MetricConsensusState, "participant")
 	}

 	err = sr.SigningHandler().Reset(pubKeys)
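
Taken together, the two hunks above change when the consensus metrics are written: the leader counter and the "proposer"/"proposed" values now fire whenever the round's leader key is either the node's own key or one of its managed keys, while the "participant" state is only set when the node neither owns nor manages the leader key. A minimal sketch of the resulting logic, reconstructed from this diff (the msg initialisation and the exact multi-key condition are assumed from the visible context, not shown in the hunks):

// Reconstructed sketch, not a verbatim copy of initCurrentRound.
msg := ""
if sr.IsKeyManagedByCurrentNode([]byte(leader)) { // condition assumed from the surrounding context
	msg = " (my turn in multi-key)"
}
if leader == sr.SelfPubKey() {
	msg = " (my turn)"
}
if len(msg) != 0 {
	// Leader metrics now cover both the node's own key and its managed keys.
	sr.AppStatusHandler().Increment(common.MetricCountLeader)
	sr.AppStatusHandler().SetStringValue(common.MetricConsensusRoundState, "proposed")
	sr.AppStatusHandler().SetStringValue(common.MetricConsensusState, "proposer")
}

// Later in the same function, for nodes that are in the consensus group:
if leader != sr.SelfPubKey() && !sr.IsKeyManagedByCurrentNode([]byte(leader)) {
	// Participant metrics are skipped when this node proposes, directly or through a managed key.
	sr.AppStatusHandler().Increment(common.MetricCountConsensus)
	sr.AppStatusHandler().SetStringValue(common.MetricConsensusState, "participant")
}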
221 changes: 221 additions & 0 deletions consensus/spos/bls/subroundStartRound_test.go
@@ -10,6 +10,7 @@ import (
"github.com/multiversx/mx-chain-go/consensus/spos"
"github.com/multiversx/mx-chain-go/consensus/spos/bls"
"github.com/multiversx/mx-chain-go/sharding/nodesCoordinator"
"github.com/multiversx/mx-chain-go/testscommon"
"github.com/multiversx/mx-chain-go/testscommon/shardingMocks"
"github.com/multiversx/mx-chain-go/testscommon/statusHandler"
"github.com/stretchr/testify/assert"
@@ -421,6 +422,226 @@ func TestSubroundStartRound_InitCurrentRoundShouldReturnTrue(t *testing.T) {
 	assert.True(t, r)
 }

+func TestSubroundStartRound_InitCurrentRoundShouldMetrics(t *testing.T) {
+	t.Parallel()
+
+	t.Run("not in consensus node", func(t *testing.T) {
+		t.Parallel()
+
+		wasCalled := false
+		container := mock.InitConsensusCore()
+		keysHandler := &testscommon.KeysHandlerStub{}
+		appStatusHandler := &statusHandler.AppStatusHandlerStub{
+			SetStringValueHandler: func(key string, value string) {
+				if key == common.MetricConsensusState {
+					wasCalled = true
+					assert.Equal(t, value, "not in consensus group")
+				}
+			},
+		}
+		ch := make(chan bool, 1)
+		consensusState := initConsensusStateWithKeysHandler(keysHandler)
+		consensusState.SetSelfPubKey("not in consensus")
+		sr, _ := spos.NewSubround(
+			-1,
+			bls.SrStartRound,
+			bls.SrBlock,
+			int64(85*roundTimeDuration/100),
+			int64(95*roundTimeDuration/100),
+			"(START_ROUND)",
+			consensusState,
+			ch,
+			executeStoredMessages,
+			container,
+			chainID,
+			currentPid,
+			appStatusHandler,
+		)
+
+		srStartRound, _ := bls.NewSubroundStartRound(
+			sr,
+			extend,
+			bls.ProcessingThresholdPercent,
+			displayStatistics,
+			executeStoredMessages,
+		)
+		srStartRound.Check()
+		assert.True(t, wasCalled)
+	})
+	t.Run("participant node", func(t *testing.T) {
+		t.Parallel()
+
+		wasCalled := false
+		container := mock.InitConsensusCore()
+		keysHandler := &testscommon.KeysHandlerStub{}
+		appStatusHandler := &statusHandler.AppStatusHandlerStub{
+			SetStringValueHandler: func(key string, value string) {
+				if key == common.MetricConsensusState {
+					wasCalled = true
+					assert.Equal(t, value, "participant")
+				}
+			},
+		}
+		ch := make(chan bool, 1)
+		consensusState := initConsensusStateWithKeysHandler(keysHandler)
+		sr, _ := spos.NewSubround(
+			-1,
+			bls.SrStartRound,
+			bls.SrBlock,
+			int64(85*roundTimeDuration/100),
+			int64(95*roundTimeDuration/100),
+			"(START_ROUND)",
+			consensusState,
+			ch,
+			executeStoredMessages,
+			container,
+			chainID,
+			currentPid,
+			appStatusHandler,
+		)
+
+		srStartRound, _ := bls.NewSubroundStartRound(
+			sr,
+			extend,
+			bls.ProcessingThresholdPercent,
+			displayStatistics,
+			executeStoredMessages,
+		)
+		srStartRound.Check()
+		assert.True(t, wasCalled)
+	})
+	t.Run("main key leader", func(t *testing.T) {
+		t.Parallel()
+
+		wasMetricConsensusStateCalled := false
+		wasMetricCountLeaderCalled := false
+		cntMetricConsensusRoundStateCalled := 0
+		container := mock.InitConsensusCore()
+		keysHandler := &testscommon.KeysHandlerStub{}
+		appStatusHandler := &statusHandler.AppStatusHandlerStub{
+			SetStringValueHandler: func(key string, value string) {
+				if key == common.MetricConsensusState {
+					wasMetricConsensusStateCalled = true
+					assert.Equal(t, value, "proposer")
+				}
+				if key == common.MetricConsensusRoundState {
+					cntMetricConsensusRoundStateCalled++
+					switch cntMetricConsensusRoundStateCalled {
+					case 1:
+						assert.Equal(t, value, "")
+					case 2:
+						assert.Equal(t, value, "proposed")
+					default:
+						assert.Fail(t, "should have been called only twice")
+					}
+				}
+			},
+			IncrementHandler: func(key string) {
+				if key == common.MetricCountLeader {
+					wasMetricCountLeaderCalled = true
+				}
+			},
+		}
+		ch := make(chan bool, 1)
+		consensusState := initConsensusStateWithKeysHandler(keysHandler)
+		leader, _ := consensusState.GetLeader()
+		consensusState.SetSelfPubKey(leader)
+		sr, _ := spos.NewSubround(
+			-1,
+			bls.SrStartRound,
+			bls.SrBlock,
+			int64(85*roundTimeDuration/100),
+			int64(95*roundTimeDuration/100),
+			"(START_ROUND)",
+			consensusState,
+			ch,
+			executeStoredMessages,
+			container,
+			chainID,
+			currentPid,
+			appStatusHandler,
+		)
+
+		srStartRound, _ := bls.NewSubroundStartRound(
+			sr,
+			extend,
+			bls.ProcessingThresholdPercent,
+			displayStatistics,
+			executeStoredMessages,
+		)
+		srStartRound.Check()
+		assert.True(t, wasMetricConsensusStateCalled)
+		assert.True(t, wasMetricCountLeaderCalled)
+		assert.Equal(t, 2, cntMetricConsensusRoundStateCalled)
+	})
+	t.Run("managed key leader", func(t *testing.T) {
+		t.Parallel()
+
+		wasMetricConsensusStateCalled := false
+		wasMetricCountLeaderCalled := false
+		cntMetricConsensusRoundStateCalled := 0
+		container := mock.InitConsensusCore()
+		keysHandler := &testscommon.KeysHandlerStub{}
+		appStatusHandler := &statusHandler.AppStatusHandlerStub{
+			SetStringValueHandler: func(key string, value string) {
+				if key == common.MetricConsensusState {
+					wasMetricConsensusStateCalled = true
+					assert.Equal(t, value, "proposer")
+				}
+				if key == common.MetricConsensusRoundState {
+					cntMetricConsensusRoundStateCalled++
+					switch cntMetricConsensusRoundStateCalled {
+					case 1:
+						assert.Equal(t, value, "")
+					case 2:
+						assert.Equal(t, value, "proposed")
+					default:
+						assert.Fail(t, "should have been called only twice")
+					}
+				}
+			},
+			IncrementHandler: func(key string) {
+				if key == common.MetricCountLeader {
+					wasMetricCountLeaderCalled = true
+				}
+			},
+		}
+		ch := make(chan bool, 1)
+		consensusState := initConsensusStateWithKeysHandler(keysHandler)
+		leader, _ := consensusState.GetLeader()
+		keysHandler.IsKeyManagedByCurrentNodeCalled = func(pkBytes []byte) bool {
+			return string(pkBytes) == leader
+		}
+		sr, _ := spos.NewSubround(
+			-1,
+			bls.SrStartRound,
+			bls.SrBlock,
+			int64(85*roundTimeDuration/100),
+			int64(95*roundTimeDuration/100),
+			"(START_ROUND)",
+			consensusState,
+			ch,
+			executeStoredMessages,
+			container,
+			chainID,
+			currentPid,
+			appStatusHandler,
+		)
+
+		srStartRound, _ := bls.NewSubroundStartRound(
+			sr,
+			extend,
+			bls.ProcessingThresholdPercent,
+			displayStatistics,
+			executeStoredMessages,
+		)
+		srStartRound.Check()
+		assert.True(t, wasMetricConsensusStateCalled)
+		assert.True(t, wasMetricCountLeaderCalled)
+		assert.Equal(t, 2, cntMetricConsensusRoundStateCalled)
+	})
+}
+
 func TestSubroundStartRound_GenerateNextConsensusGroupShouldReturnErr(t *testing.T) {
 	t.Parallel()

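The new sub-tests need only the existing consensus mocks plus testscommon.KeysHandlerStub, so they can be exercised in isolation with Go's standard test filter, for example:

go test -run TestSubroundStartRound_InitCurrentRoundShouldMetrics ./consensus/spos/bls/...
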
78 changes: 40 additions & 38 deletions factory/processing/blockProcessorCreator.go
@@ -423,6 +423,7 @@ func (pcf *processComponentsFactory) newShardBlockProcessor(
 		ReceiptsRepository:           receiptsRepository,
 		OutportDataProvider:          outportDataProvider,
 		BlockProcessingCutoffHandler: blockProcessingCutoffHandler,
+		ManagedPeersHolder:           pcf.crypto.ManagedPeersHolder(),
 	}
 	arguments := block.ArgShardProcessor{
 		ArgBaseProcessor: argumentsBaseProcessor,
@@ -837,6 +838,7 @@ func (pcf *processComponentsFactory) newMetaBlockProcessor(
 		ReceiptsRepository:           receiptsRepository,
 		OutportDataProvider:          outportDataProvider,
 		BlockProcessingCutoffHandler: blockProcessingCutoffhandler,
+		ManagedPeersHolder:           pcf.crypto.ManagedPeersHolder(),
 	}

 	esdtOwnerAddress, err := pcf.coreData.AddressPubKeyConverter().Decode(pcf.systemSCConfig.ESDTSystemSCConfig.OwnerAddress)
@@ -959,25 +961,25 @@ func (pcf *processComponentsFactory) createVMFactoryShard(
 	}

 	argsHook := hooks.ArgBlockChainHook{
-		Accounts:              accounts,
-		PubkeyConv:            pcf.coreData.AddressPubKeyConverter(),
-		StorageService:        pcf.data.StorageService(),
-		BlockChain:            pcf.data.Blockchain(),
-		ShardCoordinator:      pcf.bootstrapComponents.ShardCoordinator(),
-		Marshalizer:           pcf.coreData.InternalMarshalizer(),
-		Uint64Converter:       pcf.coreData.Uint64ByteSliceConverter(),
-		BuiltInFunctions:      builtInFuncs,
-		DataPool:              pcf.data.Datapool(),
-		CompiledSCPool:        pcf.data.Datapool().SmartContracts(),
-		WorkingDir:            pcf.flagsConfig.WorkingDir,
-		NFTStorageHandler:     nftStorageHandler,
-		GlobalSettingsHandler: globalSettingsHandler,
-		EpochNotifier:         pcf.coreData.EpochNotifier(),
-		EnableEpochsHandler:   pcf.coreData.EnableEpochsHandler(),
-		NilCompiledSCStore:    false,
-		ConfigSCStorage:       configSCStorage,
-		GasSchedule:           pcf.gasSchedule,
-		Counter:               counter,
+		Accounts:                 accounts,
+		PubkeyConv:               pcf.coreData.AddressPubKeyConverter(),
+		StorageService:           pcf.data.StorageService(),
+		BlockChain:               pcf.data.Blockchain(),
+		ShardCoordinator:         pcf.bootstrapComponents.ShardCoordinator(),
+		Marshalizer:              pcf.coreData.InternalMarshalizer(),
+		Uint64Converter:          pcf.coreData.Uint64ByteSliceConverter(),
+		BuiltInFunctions:         builtInFuncs,
+		DataPool:                 pcf.data.Datapool(),
+		CompiledSCPool:           pcf.data.Datapool().SmartContracts(),
+		WorkingDir:               pcf.flagsConfig.WorkingDir,
+		NFTStorageHandler:        nftStorageHandler,
+		GlobalSettingsHandler:    globalSettingsHandler,
+		EpochNotifier:            pcf.coreData.EpochNotifier(),
+		EnableEpochsHandler:      pcf.coreData.EnableEpochsHandler(),
+		NilCompiledSCStore:       false,
+		ConfigSCStorage:          configSCStorage,
+		GasSchedule:              pcf.gasSchedule,
+		Counter:                  counter,
 		MissingTrieNodesNotifier: notifier,
 	}

@@ -1010,25 +1012,25 @@ func (pcf *processComponentsFactory) createVMFactoryMeta(
 	globalSettingsHandler vmcommon.ESDTGlobalSettingsHandler,
 ) (process.VirtualMachinesContainerFactory, error) {
 	argsHook := hooks.ArgBlockChainHook{
-		Accounts:              accounts,
-		PubkeyConv:            pcf.coreData.AddressPubKeyConverter(),
-		StorageService:        pcf.data.StorageService(),
-		BlockChain:            pcf.data.Blockchain(),
-		ShardCoordinator:      pcf.bootstrapComponents.ShardCoordinator(),
-		Marshalizer:           pcf.coreData.InternalMarshalizer(),
-		Uint64Converter:       pcf.coreData.Uint64ByteSliceConverter(),
-		BuiltInFunctions:      builtInFuncs,
-		DataPool:              pcf.data.Datapool(),
-		CompiledSCPool:        pcf.data.Datapool().SmartContracts(),
-		ConfigSCStorage:       configSCStorage,
-		WorkingDir:            pcf.flagsConfig.WorkingDir,
-		NFTStorageHandler:     nftStorageHandler,
-		GlobalSettingsHandler: globalSettingsHandler,
-		EpochNotifier:         pcf.coreData.EpochNotifier(),
-		EnableEpochsHandler:   pcf.coreData.EnableEpochsHandler(),
-		NilCompiledSCStore:    false,
-		GasSchedule:           pcf.gasSchedule,
-		Counter:               counters.NewDisabledCounter(),
+		Accounts:                 accounts,
+		PubkeyConv:               pcf.coreData.AddressPubKeyConverter(),
+		StorageService:           pcf.data.StorageService(),
+		BlockChain:               pcf.data.Blockchain(),
+		ShardCoordinator:         pcf.bootstrapComponents.ShardCoordinator(),
+		Marshalizer:              pcf.coreData.InternalMarshalizer(),
+		Uint64Converter:          pcf.coreData.Uint64ByteSliceConverter(),
+		BuiltInFunctions:         builtInFuncs,
+		DataPool:                 pcf.data.Datapool(),
+		CompiledSCPool:           pcf.data.Datapool().SmartContracts(),
+		ConfigSCStorage:          configSCStorage,
+		WorkingDir:               pcf.flagsConfig.WorkingDir,
+		NFTStorageHandler:        nftStorageHandler,
+		GlobalSettingsHandler:    globalSettingsHandler,
+		EpochNotifier:            pcf.coreData.EpochNotifier(),
+		EnableEpochsHandler:      pcf.coreData.EnableEpochsHandler(),
+		NilCompiledSCStore:       false,
+		GasSchedule:              pcf.gasSchedule,
+		Counter:                  counters.NewDisabledCounter(),
 		MissingTrieNodesNotifier: syncer.NewMissingTrieNodesNotifier(),
 	}

1 change: 1 addition & 0 deletions integrationTests/testProcessorNode.go
@@ -2100,6 +2100,7 @@ func (tpn *TestProcessorNode) initBlockProcessor(stateCheckpointModulus uint) {
 		ReceiptsRepository:           &testscommon.ReceiptsRepositoryStub{},
 		OutportDataProvider:          &outport.OutportDataProviderStub{},
 		BlockProcessingCutoffHandler: &testscommon.BlockProcessingCutoffStub{},
+		ManagedPeersHolder:           &testscommon.ManagedPeersHolderStub{},
 	}

 	if check.IfNil(tpn.EpochStartNotifier) {
1 change: 1 addition & 0 deletions integrationTests/testSyncNode.go
@@ -103,6 +103,7 @@ func (tpn *TestProcessorNode) initBlockProcessorWithSync() {
 		ReceiptsRepository:           &testscommon.ReceiptsRepositoryStub{},
 		OutportDataProvider:          &outport.OutportDataProviderStub{},
 		BlockProcessingCutoffHandler: &testscommon.BlockProcessingCutoffStub{},
+		ManagedPeersHolder:           &testscommon.ManagedPeersHolderStub{},
 	}

 	if tpn.ShardCoordinator.SelfId() == core.MetachainShardId {
1 change: 0 additions & 1 deletion outport/process/outportDataProvider.go
@@ -50,7 +50,6 @@ type ArgPrepareOutportSaveBlockData struct {
 }

 type outportDataProvider struct {
-	isImportDBMode          bool
 	shardID                 uint32
 	numOfShards             uint32
 	alteredAccountsProvider AlteredAccountsProviderHandler
1 change: 1 addition & 0 deletions process/block/argProcessor.go
@@ -91,6 +91,7 @@ type ArgBaseProcessor struct {
 	ProcessedMiniBlocksTracker   process.ProcessedMiniBlocksTracker
 	ReceiptsRepository           receiptsRepository
 	BlockProcessingCutoffHandler cutoff.BlockProcessingCutoffHandler
+	ManagedPeersHolder           common.ManagedPeersHolder
 }

 // ArgShardProcessor holds all dependencies required by the process data factory in order to create
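
Since ArgBaseProcessor now carries a common.ManagedPeersHolder, constructors built from these arguments would normally validate it the same way as the other dependencies. A hypothetical sketch of such a guard, reusing the check.IfNil pattern already visible elsewhere in this diff (the error variable name and the surrounding validation function are assumptions, not shown here):

// Hypothetical nil-guard sketch; the repository's actual validation code is not part of this diff.
if check.IfNil(arguments.ManagedPeersHolder) {
	return process.ErrNilManagedPeersHolder // assumed error name
}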