Skip to content

Commit

Permalink
Merge branch 'rc/2022-july' into optimize-peer-auth-messages-management
Browse files Browse the repository at this point in the history
# Conflicts:
#	process/heartbeat/interceptedHeartbeat.go
  • Loading branch information
iulianpascalau committed Sep 1, 2022
2 parents 440f0ac + 8f960bd commit 3b74080
Show file tree
Hide file tree
Showing 16 changed files with 193 additions and 69 deletions.
110 changes: 84 additions & 26 deletions heartbeat/heartbeat.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion heartbeat/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,11 @@ func (monitor *heartbeatV2Monitor) parseMessage(pid core.PeerID, message interfa
return pubKeyHeartbeat, heartbeat.ErrShouldSkipValidator
}

pk := monitor.pubKeyConverter.Encode(peerInfo.PkBytes)
pkBytes := peerInfo.PkBytes
if stringType == string(common.ObserverList) {
pkBytes = heartbeatV2.GetPubkey()
}
pk := monitor.pubKeyConverter.Encode(pkBytes)
numInstances[pk]++

pubKeyHeartbeat = data.PubKeyHeartbeat{
Expand Down
15 changes: 11 additions & 4 deletions heartbeat/monitor/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func createHeartbeatMessage(active bool) *heartbeat.HeartbeatV2 {
Identity: "identity",
Nonce: 0,
PeerSubType: 0,
Pubkey: []byte("public key"),
}
}

Expand Down Expand Up @@ -182,6 +183,7 @@ func TestHeartbeatV2Monitor_parseMessage(t *testing.T) {
t.Parallel()

providedPkBytes := []byte("provided pk")
providedPkBytesFromMessage := []byte("provided pk message")
args := createMockHeartbeatV2MonitorArgs()
args.PeerShardMapper = &processMocks.PeerShardMapperStub{
GetPeerInfoCalled: func(pid core.PeerID) core.P2PPeerInfo {
Expand All @@ -200,6 +202,7 @@ func TestHeartbeatV2Monitor_parseMessage(t *testing.T) {

numInstances := make(map[string]uint64)
message := createHeartbeatMessage(true)
message.Pubkey = providedPkBytesFromMessage
providedPid := core.PeerID("pid")
providedMap := map[string]struct{}{
providedPid.Pretty(): {},
Expand All @@ -208,11 +211,15 @@ func TestHeartbeatV2Monitor_parseMessage(t *testing.T) {
assert.Nil(t, err)
checkResults(t, *message, hb, true, providedMap, 0)
assert.Equal(t, 0, len(providedMap))
pid := args.PubKeyConverter.Encode(providedPkBytes)
entries, ok := numInstances[pid]
pkFromMsg := args.PubKeyConverter.Encode(providedPkBytesFromMessage)
entries, ok := numInstances[pkFromMsg]
assert.True(t, ok)
assert.Equal(t, uint64(1), entries)
assert.Equal(t, string(common.ObserverList), hb.PeerType)

pkFromPSM := args.PubKeyConverter.Encode(providedPkBytes)
_, ok = numInstances[pkFromPSM]
assert.False(t, ok)
})
t.Run("should work", func(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -245,8 +252,8 @@ func TestHeartbeatV2Monitor_parseMessage(t *testing.T) {
assert.Nil(t, err)
checkResults(t, *message, hb, true, providedMap, 0)
assert.Equal(t, 0, len(providedMap))
pid := args.PubKeyConverter.Encode(providedPkBytes)
entries, ok := numInstances[pid]
pk := args.PubKeyConverter.Encode(providedPkBytes)
entries, ok := numInstances[pk]
assert.True(t, ok)
assert.Equal(t, uint64(1), entries)
assert.Equal(t, string(expectedPeerType), hb.PeerType)
Expand Down
4 changes: 3 additions & 1 deletion heartbeat/processor/peerAuthenticationRequestsProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ func (processor *peerAuthenticationRequestsProcessor) startRequestingMessages(ct
requestsTimer := time.NewTimer(processor.delayBetweenRequests)
for {
if processor.isThresholdReached(sortedValidatorsKeys) {
log.Debug("received enough messages, closing peerAuthenticationRequestsProcessor go routine")
log.Debug("received enough messages, closing peerAuthenticationRequestsProcessor go routine",
"received", processor.peerAuthenticationPool.Len(),
"validators", len(sortedValidatorsKeys))
return
}

Expand Down
1 change: 1 addition & 0 deletions heartbeat/proto/heartbeat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ message HeartbeatV2 {
string Identity = 4;
uint64 Nonce = 5;
uint32 PeerSubType = 6;
bytes Pubkey = 7;
}

// PeerAuthentication represents the DTO used to pass peer authentication information such as public key, peer id,
Expand Down
26 changes: 26 additions & 0 deletions heartbeat/sender/baseSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/ElrondNetwork/elrond-go-core/core/check"
"github.com/ElrondNetwork/elrond-go-core/core/random"
"github.com/ElrondNetwork/elrond-go-core/marshal"
crypto "github.com/ElrondNetwork/elrond-go-crypto"
"github.com/ElrondNetwork/elrond-go/heartbeat"
)

Expand All @@ -24,6 +25,8 @@ type argBaseSender struct {
timeBetweenSends time.Duration
timeBetweenSendsWhenError time.Duration
thresholdBetweenSends float64
redundancyHandler heartbeat.NodeRedundancyHandler
privKey crypto.PrivateKey
}

type baseSender struct {
Expand All @@ -34,6 +37,10 @@ type baseSender struct {
timeBetweenSends time.Duration
timeBetweenSendsWhenError time.Duration
thresholdBetweenSends float64
redundancy heartbeat.NodeRedundancyHandler
privKey crypto.PrivateKey
publicKey crypto.PublicKey
observerPublicKey crypto.PublicKey
}

func createBaseSender(args argBaseSender) baseSender {
Expand All @@ -44,6 +51,10 @@ func createBaseSender(args argBaseSender) baseSender {
timeBetweenSends: args.timeBetweenSends,
timeBetweenSendsWhenError: args.timeBetweenSendsWhenError,
thresholdBetweenSends: args.thresholdBetweenSends,
redundancy: args.redundancyHandler,
privKey: args.privKey,
publicKey: args.privKey.GeneratePublic(),
observerPublicKey: args.redundancyHandler.ObserverPrivateKey().GeneratePublic(),
}
bs.timerHandler = &timerWrapper{
timer: time.NewTimer(bs.computeRandomDuration(bs.timeBetweenSends)),
Expand Down Expand Up @@ -72,6 +83,12 @@ func checkBaseSenderArgs(args argBaseSender) error {
return fmt.Errorf("%w for thresholdBetweenSends, receieved %f, min allowed %f, max allowed %f",
heartbeat.ErrInvalidThreshold, args.thresholdBetweenSends, minThresholdBetweenSends, maxThresholdBetweenSends)
}
if check.IfNil(args.privKey) {
return heartbeat.ErrNilPrivateKey
}
if check.IfNil(args.redundancyHandler) {
return heartbeat.ErrNilRedundancyHandler
}

return nil
}
Expand All @@ -84,3 +101,12 @@ func (bs *baseSender) computeRandomDuration(baseDuration time.Duration) time.Dur
ret := time.Duration(timeBetweenSendsInNano + int64(randThreshold))
return ret
}

func (bs *baseSender) getCurrentPrivateAndPublicKeys() (crypto.PrivateKey, crypto.PublicKey) {
shouldUseOriginalKeys := !bs.redundancy.IsRedundancyNode() || (bs.redundancy.IsRedundancyNode() && !bs.redundancy.IsMainMachineActive())
if shouldUseOriginalKeys {
return bs.privKey, bs.publicKey
}

return bs.redundancy.ObserverPrivateKey(), bs.observerPublicKey
}
4 changes: 4 additions & 0 deletions heartbeat/sender/baseSender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"testing"
"time"

"github.com/ElrondNetwork/elrond-go/heartbeat/mock"
"github.com/ElrondNetwork/elrond-go/testscommon"
"github.com/ElrondNetwork/elrond-go/testscommon/cryptoMocks"
"github.com/ElrondNetwork/elrond-go/testscommon/p2pmocks"
"github.com/stretchr/testify/assert"
)
Expand All @@ -17,6 +19,8 @@ func createMockBaseArgs() argBaseSender {
timeBetweenSends: time.Second,
timeBetweenSendsWhenError: time.Second,
thresholdBetweenSends: 0.1,
privKey: &cryptoMocks.PrivateKeyStub{},
redundancyHandler: &mock.RedundancyHandlerStub{},
}
}

Expand Down
9 changes: 8 additions & 1 deletion heartbeat/sender/heartbeatSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ func newHeartbeatSender(args argHeartbeatSender) (*heartbeatSender, error) {
versionNumber: args.versionNumber,
nodeDisplayName: args.nodeDisplayName,
identity: args.identity,
currentBlockProvider: args.currentBlockProvider,
peerSubType: args.peerSubType,
currentBlockProvider: args.currentBlockProvider,
}, nil
}

Expand Down Expand Up @@ -101,13 +101,20 @@ func (sender *heartbeatSender) execute() error {
nonce = currentBlock.GetNonce()
}

_, pk := sender.getCurrentPrivateAndPublicKeys()
pkBytes, err := pk.ToByteArray()
if err != nil {
return err
}

msg := &heartbeat.HeartbeatV2{
Payload: payloadBytes,
VersionNumber: sender.versionNumber,
NodeDisplayName: sender.nodeDisplayName,
Identity: sender.identity,
Nonce: nonce,
PeerSubType: uint32(sender.peerSubType),
Pubkey: pkBytes,
}

msgBytes, err := sender.marshaller.Marshal(msg)
Expand Down
Loading

0 comments on commit 3b74080

Please sign in to comment.