Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix heartbeat monitor to properly set peer type #4410

Merged
merged 3 commits into from
Aug 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions factory/heartbeatV2Components.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ElrondNetwork/elrond-go/heartbeat/monitor"
"github.com/ElrondNetwork/elrond-go/heartbeat/processor"
"github.com/ElrondNetwork/elrond-go/heartbeat/sender"
"github.com/ElrondNetwork/elrond-go/process/peer"
"github.com/ElrondNetwork/elrond-go/update"
)

Expand Down Expand Up @@ -179,6 +180,16 @@ func (hcf *heartbeatV2ComponentsFactory) Create() (*heartbeatV2Components, error
return nil, err
}

argPeerTypeProvider := peer.ArgPeerTypeProvider{
NodesCoordinator: hcf.processComponents.NodesCoordinator(),
StartEpoch: hcf.processComponents.EpochStartTrigger().MetaEpoch(),
EpochStartEventNotifier: hcf.processComponents.EpochStartNotifier(),
}
peerTypeProvider, err := peer.NewPeerTypeProvider(argPeerTypeProvider)
if err != nil {
return nil, err
}

argsMonitor := monitor.ArgHeartbeatV2Monitor{
Cache: hcf.dataComponents.Datapool().Heartbeats(),
PubKeyConverter: hcf.coreComponents.ValidatorPubKeyConverter(),
Expand All @@ -187,6 +198,7 @@ func (hcf *heartbeatV2ComponentsFactory) Create() (*heartbeatV2Components, error
MaxDurationPeerUnresponsive: time.Second * time.Duration(cfg.MaxDurationPeerUnresponsiveInSec),
HideInactiveValidatorInterval: time.Second * time.Duration(cfg.HideInactiveValidatorIntervalInSec),
ShardId: epochBootstrapParams.SelfShardID(),
PeerTypeProvider: peerTypeProvider,
}
heartbeatsMonitor, err := monitor.NewHeartbeatV2Monitor(argsMonitor)
if err != nil {
Expand Down
18 changes: 17 additions & 1 deletion heartbeat/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type ArgHeartbeatV2Monitor struct {
MaxDurationPeerUnresponsive time.Duration
HideInactiveValidatorInterval time.Duration
ShardId uint32
PeerTypeProvider heartbeat.PeerTypeProviderHandler
}

type heartbeatV2Monitor struct {
Expand All @@ -40,6 +41,7 @@ type heartbeatV2Monitor struct {
maxDurationPeerUnresponsive time.Duration
hideInactiveValidatorInterval time.Duration
shardId uint32
peerTypeProvider heartbeat.PeerTypeProviderHandler
}

// NewHeartbeatV2Monitor creates a new instance of heartbeatV2Monitor
Expand All @@ -57,6 +59,7 @@ func NewHeartbeatV2Monitor(args ArgHeartbeatV2Monitor) (*heartbeatV2Monitor, err
maxDurationPeerUnresponsive: args.MaxDurationPeerUnresponsive,
hideInactiveValidatorInterval: args.HideInactiveValidatorInterval,
shardId: args.ShardId,
peerTypeProvider: args.PeerTypeProvider,
}, nil
}

Expand All @@ -81,6 +84,9 @@ func checkArgs(args ArgHeartbeatV2Monitor) error {
return fmt.Errorf("%w on HideInactiveValidatorInterval, provided %d, min expected %d",
heartbeat.ErrInvalidTimeDuration, args.HideInactiveValidatorInterval, minDuration)
}
if check.IfNil(args.PeerTypeProvider) {
return heartbeat.ErrNilPeerTypeProvider
}

return nil
}
Expand Down Expand Up @@ -141,7 +147,7 @@ func (monitor *heartbeatV2Monitor) parseMessage(pid core.PeerID, message interfa

crtTime := time.Now()
messageAge := monitor.getMessageAge(crtTime, payload.Timestamp)
stringType := peerInfo.PeerType.String()
stringType := monitor.computePeerType(peerInfo.PkBytes)
if monitor.shouldSkipMessage(messageAge, stringType) {
return pubKeyHeartbeat, heartbeat.ErrShouldSkipValidator
}
Expand All @@ -167,6 +173,16 @@ func (monitor *heartbeatV2Monitor) parseMessage(pid core.PeerID, message interfa
return pubKeyHeartbeat, nil
}

func (monitor *heartbeatV2Monitor) computePeerType(pk []byte) string {
peerType, _, err := monitor.peerTypeProvider.ComputeForPubKey(pk)
if err != nil {
log.Warn("heartbeatV2Monitor: computePeerType", "error", err)
return string(common.ObserverList)
}

return string(peerType)
}

func (monitor *heartbeatV2Monitor) getMessageAge(crtTime time.Time, messageTimestamp int64) time.Duration {
messageTime := time.Unix(messageTimestamp, 0)
msgAge := crtTime.Sub(messageTime)
Expand Down
54 changes: 54 additions & 0 deletions heartbeat/monitor/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ElrondNetwork/elrond-go/common"
"github.com/ElrondNetwork/elrond-go/heartbeat"
"github.com/ElrondNetwork/elrond-go/heartbeat/data"
"github.com/ElrondNetwork/elrond-go/heartbeat/mock"
"github.com/ElrondNetwork/elrond-go/process"
processMocks "github.com/ElrondNetwork/elrond-go/process/mock"
"github.com/ElrondNetwork/elrond-go/testscommon"
Expand All @@ -28,6 +29,7 @@ func createMockHeartbeatV2MonitorArgs() ArgHeartbeatV2Monitor {
MaxDurationPeerUnresponsive: time.Second * 3,
HideInactiveValidatorInterval: time.Second * 5,
ShardId: 0,
PeerTypeProvider: &mock.PeerTypeProviderStub{},
}
}

Expand Down Expand Up @@ -115,6 +117,15 @@ func TestNewHeartbeatV2Monitor(t *testing.T) {
assert.True(t, errors.Is(err, heartbeat.ErrInvalidTimeDuration))
assert.True(t, strings.Contains(err.Error(), "HideInactiveValidatorInterval"))
})
t.Run("nil peer type provider should error", func(t *testing.T) {
t.Parallel()

args := createMockHeartbeatV2MonitorArgs()
args.PeerTypeProvider = nil
monitor, err := NewHeartbeatV2Monitor(args)
assert.True(t, check.IfNil(monitor))
assert.Equal(t, heartbeat.ErrNilPeerTypeProvider, err)
})
t.Run("should work", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -167,6 +178,42 @@ func TestHeartbeatV2Monitor_parseMessage(t *testing.T) {
_, err := monitor.parseMessage("pid", message, nil)
assert.Equal(t, heartbeat.ErrShouldSkipValidator, err)
})
t.Run("should work, peer type provider returns error", func(t *testing.T) {
t.Parallel()

providedPkBytes := []byte("provided pk")
args := createMockHeartbeatV2MonitorArgs()
args.PeerShardMapper = &processMocks.PeerShardMapperStub{
GetPeerInfoCalled: func(pid core.PeerID) core.P2PPeerInfo {
return core.P2PPeerInfo{
PkBytes: providedPkBytes,
}
},
}
args.PeerTypeProvider = &mock.PeerTypeProviderStub{
ComputeForPubKeyCalled: func(pubKey []byte) (common.PeerType, uint32, error) {
return "", 0, errors.New("some error")
},
}
monitor, _ := NewHeartbeatV2Monitor(args)
assert.False(t, check.IfNil(monitor))

numInstances := make(map[string]uint64)
message := createHeartbeatMessage(true)
providedPid := core.PeerID("pid")
providedMap := map[string]struct{}{
providedPid.Pretty(): {},
}
hb, err := monitor.parseMessage(providedPid, message, numInstances)
assert.Nil(t, err)
checkResults(t, *message, hb, true, providedMap, 0)
assert.Equal(t, 0, len(providedMap))
pid := args.PubKeyConverter.Encode(providedPkBytes)
entries, ok := numInstances[pid]
assert.True(t, ok)
assert.Equal(t, uint64(1), entries)
assert.Equal(t, string(common.ObserverList), hb.PeerType)
})
t.Run("should work", func(t *testing.T) {
t.Parallel()

Expand All @@ -179,6 +226,12 @@ func TestHeartbeatV2Monitor_parseMessage(t *testing.T) {
}
},
}
expectedPeerType := common.EligibleList
args.PeerTypeProvider = &mock.PeerTypeProviderStub{
ComputeForPubKeyCalled: func(pubKey []byte) (common.PeerType, uint32, error) {
return expectedPeerType, 0, nil
},
}
monitor, _ := NewHeartbeatV2Monitor(args)
assert.False(t, check.IfNil(monitor))

Expand All @@ -196,6 +249,7 @@ func TestHeartbeatV2Monitor_parseMessage(t *testing.T) {
entries, ok := numInstances[pid]
assert.True(t, ok)
assert.Equal(t, uint64(1), entries)
assert.Equal(t, string(expectedPeerType), hb.PeerType)
})
}

Expand Down