diff --git a/factory/heartbeatV2Components.go b/factory/heartbeatV2Components.go index 88d6ff91997..4fd410995f0 100644 --- a/factory/heartbeatV2Components.go +++ b/factory/heartbeatV2Components.go @@ -12,6 +12,7 @@ import ( "github.com/ElrondNetwork/elrond-go/heartbeat/monitor" "github.com/ElrondNetwork/elrond-go/heartbeat/processor" "github.com/ElrondNetwork/elrond-go/heartbeat/sender" + "github.com/ElrondNetwork/elrond-go/process/peer" "github.com/ElrondNetwork/elrond-go/update" ) @@ -179,6 +180,16 @@ func (hcf *heartbeatV2ComponentsFactory) Create() (*heartbeatV2Components, error return nil, err } + argPeerTypeProvider := peer.ArgPeerTypeProvider{ + NodesCoordinator: hcf.processComponents.NodesCoordinator(), + StartEpoch: hcf.processComponents.EpochStartTrigger().MetaEpoch(), + EpochStartEventNotifier: hcf.processComponents.EpochStartNotifier(), + } + peerTypeProvider, err := peer.NewPeerTypeProvider(argPeerTypeProvider) + if err != nil { + return nil, err + } + argsMonitor := monitor.ArgHeartbeatV2Monitor{ Cache: hcf.dataComponents.Datapool().Heartbeats(), PubKeyConverter: hcf.coreComponents.ValidatorPubKeyConverter(), @@ -187,6 +198,7 @@ func (hcf *heartbeatV2ComponentsFactory) Create() (*heartbeatV2Components, error MaxDurationPeerUnresponsive: time.Second * time.Duration(cfg.MaxDurationPeerUnresponsiveInSec), HideInactiveValidatorInterval: time.Second * time.Duration(cfg.HideInactiveValidatorIntervalInSec), ShardId: epochBootstrapParams.SelfShardID(), + PeerTypeProvider: peerTypeProvider, } heartbeatsMonitor, err := monitor.NewHeartbeatV2Monitor(argsMonitor) if err != nil { diff --git a/heartbeat/monitor/monitor.go b/heartbeat/monitor/monitor.go index e10368764d4..a47d60d27c1 100644 --- a/heartbeat/monitor/monitor.go +++ b/heartbeat/monitor/monitor.go @@ -30,6 +30,7 @@ type ArgHeartbeatV2Monitor struct { MaxDurationPeerUnresponsive time.Duration HideInactiveValidatorInterval time.Duration ShardId uint32 + PeerTypeProvider heartbeat.PeerTypeProviderHandler } type heartbeatV2Monitor struct { @@ -40,6 +41,7 @@ type heartbeatV2Monitor struct { maxDurationPeerUnresponsive time.Duration hideInactiveValidatorInterval time.Duration shardId uint32 + peerTypeProvider heartbeat.PeerTypeProviderHandler } // NewHeartbeatV2Monitor creates a new instance of heartbeatV2Monitor @@ -57,6 +59,7 @@ func NewHeartbeatV2Monitor(args ArgHeartbeatV2Monitor) (*heartbeatV2Monitor, err maxDurationPeerUnresponsive: args.MaxDurationPeerUnresponsive, hideInactiveValidatorInterval: args.HideInactiveValidatorInterval, shardId: args.ShardId, + peerTypeProvider: args.PeerTypeProvider, }, nil } @@ -81,6 +84,9 @@ func checkArgs(args ArgHeartbeatV2Monitor) error { return fmt.Errorf("%w on HideInactiveValidatorInterval, provided %d, min expected %d", heartbeat.ErrInvalidTimeDuration, args.HideInactiveValidatorInterval, minDuration) } + if check.IfNil(args.PeerTypeProvider) { + return heartbeat.ErrNilPeerTypeProvider + } return nil } @@ -141,7 +147,7 @@ func (monitor *heartbeatV2Monitor) parseMessage(pid core.PeerID, message interfa crtTime := time.Now() messageAge := monitor.getMessageAge(crtTime, payload.Timestamp) - stringType := peerInfo.PeerType.String() + stringType := monitor.computePeerType(peerInfo.PkBytes) if monitor.shouldSkipMessage(messageAge, stringType) { return pubKeyHeartbeat, heartbeat.ErrShouldSkipValidator } @@ -167,6 +173,16 @@ func (monitor *heartbeatV2Monitor) parseMessage(pid core.PeerID, message interfa return pubKeyHeartbeat, nil } +func (monitor *heartbeatV2Monitor) computePeerType(pk []byte) string { + peerType, _, err := monitor.peerTypeProvider.ComputeForPubKey(pk) + if err != nil { + log.Warn("heartbeatV2Monitor: computePeerType", "error", err) + return string(common.ObserverList) + } + + return string(peerType) +} + func (monitor *heartbeatV2Monitor) getMessageAge(crtTime time.Time, messageTimestamp int64) time.Duration { messageTime := time.Unix(messageTimestamp, 0) msgAge := crtTime.Sub(messageTime) diff --git a/heartbeat/monitor/monitor_test.go b/heartbeat/monitor/monitor_test.go index 4ac9f047be1..25657b1c789 100644 --- a/heartbeat/monitor/monitor_test.go +++ b/heartbeat/monitor/monitor_test.go @@ -12,6 +12,7 @@ import ( "github.com/ElrondNetwork/elrond-go/common" "github.com/ElrondNetwork/elrond-go/heartbeat" "github.com/ElrondNetwork/elrond-go/heartbeat/data" + "github.com/ElrondNetwork/elrond-go/heartbeat/mock" "github.com/ElrondNetwork/elrond-go/process" processMocks "github.com/ElrondNetwork/elrond-go/process/mock" "github.com/ElrondNetwork/elrond-go/testscommon" @@ -28,6 +29,7 @@ func createMockHeartbeatV2MonitorArgs() ArgHeartbeatV2Monitor { MaxDurationPeerUnresponsive: time.Second * 3, HideInactiveValidatorInterval: time.Second * 5, ShardId: 0, + PeerTypeProvider: &mock.PeerTypeProviderStub{}, } } @@ -115,6 +117,15 @@ func TestNewHeartbeatV2Monitor(t *testing.T) { assert.True(t, errors.Is(err, heartbeat.ErrInvalidTimeDuration)) assert.True(t, strings.Contains(err.Error(), "HideInactiveValidatorInterval")) }) + t.Run("nil peer type provider should error", func(t *testing.T) { + t.Parallel() + + args := createMockHeartbeatV2MonitorArgs() + args.PeerTypeProvider = nil + monitor, err := NewHeartbeatV2Monitor(args) + assert.True(t, check.IfNil(monitor)) + assert.Equal(t, heartbeat.ErrNilPeerTypeProvider, err) + }) t.Run("should work", func(t *testing.T) { t.Parallel() @@ -167,6 +178,42 @@ func TestHeartbeatV2Monitor_parseMessage(t *testing.T) { _, err := monitor.parseMessage("pid", message, nil) assert.Equal(t, heartbeat.ErrShouldSkipValidator, err) }) + t.Run("should work, peer type provider returns error", func(t *testing.T) { + t.Parallel() + + providedPkBytes := []byte("provided pk") + args := createMockHeartbeatV2MonitorArgs() + args.PeerShardMapper = &processMocks.PeerShardMapperStub{ + GetPeerInfoCalled: func(pid core.PeerID) core.P2PPeerInfo { + return core.P2PPeerInfo{ + PkBytes: providedPkBytes, + } + }, + } + args.PeerTypeProvider = &mock.PeerTypeProviderStub{ + ComputeForPubKeyCalled: func(pubKey []byte) (common.PeerType, uint32, error) { + return "", 0, errors.New("some error") + }, + } + monitor, _ := NewHeartbeatV2Monitor(args) + assert.False(t, check.IfNil(monitor)) + + numInstances := make(map[string]uint64) + message := createHeartbeatMessage(true) + providedPid := core.PeerID("pid") + providedMap := map[string]struct{}{ + providedPid.Pretty(): {}, + } + hb, err := monitor.parseMessage(providedPid, message, numInstances) + assert.Nil(t, err) + checkResults(t, *message, hb, true, providedMap, 0) + assert.Equal(t, 0, len(providedMap)) + pid := args.PubKeyConverter.Encode(providedPkBytes) + entries, ok := numInstances[pid] + assert.True(t, ok) + assert.Equal(t, uint64(1), entries) + assert.Equal(t, string(common.ObserverList), hb.PeerType) + }) t.Run("should work", func(t *testing.T) { t.Parallel() @@ -179,6 +226,12 @@ func TestHeartbeatV2Monitor_parseMessage(t *testing.T) { } }, } + expectedPeerType := common.EligibleList + args.PeerTypeProvider = &mock.PeerTypeProviderStub{ + ComputeForPubKeyCalled: func(pubKey []byte) (common.PeerType, uint32, error) { + return expectedPeerType, 0, nil + }, + } monitor, _ := NewHeartbeatV2Monitor(args) assert.False(t, check.IfNil(monitor)) @@ -196,6 +249,7 @@ func TestHeartbeatV2Monitor_parseMessage(t *testing.T) { entries, ok := numInstances[pid] assert.True(t, ok) assert.Equal(t, uint64(1), entries) + assert.Equal(t, string(expectedPeerType), hb.PeerType) }) }