Skip to content

Commit

Permalink
Merge pull request #4463 from ElrondNetwork/consensus-multikey-integr…
Browse files Browse the repository at this point in the history
…ation

Integrated multikey in consensus
  • Loading branch information
iulianpascalau authored Sep 20, 2022
2 parents d61698d + ba20a63 commit 7fe1d70
Show file tree
Hide file tree
Showing 55 changed files with 1,777 additions and 573 deletions.
50 changes: 34 additions & 16 deletions consensus/broadcast/commonMessenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ type delayedBroadcaster interface {
SetValidatorData(data *delayedBroadcastData) error
SetHeaderForValidator(vData *validatorHeaderBroadcastData) error
SetBroadcastHandlers(
mbBroadcast func(mbData map[uint32][]byte) error,
txBroadcast func(txData map[string][][]byte) error,
headerBroadcast func(header data.HeaderHandler) error,
mbBroadcast func(mbData map[uint32][]byte, pkBytes []byte) error,
txBroadcast func(txData map[string][][]byte, pkBytes []byte) error,
headerBroadcast func(header data.HeaderHandler, pkBytes []byte) error,
) error
Close()
}
Expand All @@ -39,25 +39,25 @@ type commonMessenger struct {
marshalizer marshal.Marshalizer
hasher hashing.Hasher
messenger consensus.P2PMessenger
privateKey crypto.PrivateKey
shardCoordinator sharding.Coordinator
peerSignatureHandler crypto.PeerSignatureHandler
delayedBlockBroadcaster delayedBroadcaster
keysHandler consensus.KeysHandler
}

// CommonMessengerArgs holds the arguments for creating commonMessenger instance
type CommonMessengerArgs struct {
Marshalizer marshal.Marshalizer
Hasher hashing.Hasher
Messenger consensus.P2PMessenger
PrivateKey crypto.PrivateKey
ShardCoordinator sharding.Coordinator
PeerSignatureHandler crypto.PeerSignatureHandler
HeadersSubscriber consensus.HeadersPoolSubscriber
InterceptorsContainer process.InterceptorsContainer
MaxDelayCacheSize uint32
MaxValidatorDelayCacheSize uint32
AlarmScheduler core.TimersScheduler
KeysHandler consensus.KeysHandler
}

func checkCommonMessengerNilParameters(
Expand All @@ -72,9 +72,6 @@ func checkCommonMessengerNilParameters(
if check.IfNil(args.Messenger) {
return spos.ErrNilMessenger
}
if check.IfNil(args.PrivateKey) {
return spos.ErrNilPrivateKey
}
if check.IfNil(args.ShardCoordinator) {
return spos.ErrNilShardCoordinator
}
Expand All @@ -93,13 +90,17 @@ func checkCommonMessengerNilParameters(
if args.MaxDelayCacheSize == 0 || args.MaxValidatorDelayCacheSize == 0 {
return spos.ErrInvalidCacheSize
}
if check.IfNil(args.KeysHandler) {
return ErrNilKeysHandler
}

return nil
}

// BroadcastConsensusMessage will send on consensus topic the consensus message
func (cm *commonMessenger) BroadcastConsensusMessage(message *consensus.Message) error {
signature, err := cm.peerSignatureHandler.GetPeerSignature(cm.privateKey, message.OriginatorPid)
privateKey := cm.keysHandler.GetHandledPrivateKey(message.PubKey)
signature, err := cm.peerSignatureHandler.GetPeerSignature(privateKey, message.OriginatorPid)
if err != nil {
return err
}
Expand All @@ -114,18 +115,18 @@ func (cm *commonMessenger) BroadcastConsensusMessage(message *consensus.Message)
consensusTopic := common.ConsensusTopic +
cm.shardCoordinator.CommunicationIdentifier(cm.shardCoordinator.SelfId())

cm.messenger.Broadcast(consensusTopic, buff)
cm.broadcast(consensusTopic, buff, message.PubKey)

return nil
}

// BroadcastMiniBlocks will send on miniblocks topic the cross-shard miniblocks
func (cm *commonMessenger) BroadcastMiniBlocks(miniBlocks map[uint32][]byte) error {
func (cm *commonMessenger) BroadcastMiniBlocks(miniBlocks map[uint32][]byte, pkBytes []byte) error {
for k, v := range miniBlocks {
miniBlocksTopic := factory.MiniBlocksTopic +
cm.shardCoordinator.CommunicationIdentifier(k)

cm.messenger.Broadcast(miniBlocksTopic, v)
cm.broadcast(miniBlocksTopic, v, pkBytes)
}

if len(miniBlocks) > 0 {
Expand All @@ -138,7 +139,7 @@ func (cm *commonMessenger) BroadcastMiniBlocks(miniBlocks map[uint32][]byte) err
}

// BroadcastTransactions will send on transaction topic the transactions
func (cm *commonMessenger) BroadcastTransactions(transactions map[string][][]byte) error {
func (cm *commonMessenger) BroadcastTransactions(transactions map[string][][]byte, pkBytes []byte) error {
dataPacker, err := partitioning.NewSimpleDataPacker(cm.marshalizer)
if err != nil {
return err
Expand All @@ -155,7 +156,7 @@ func (cm *commonMessenger) BroadcastTransactions(transactions map[string][][]byt
}

for _, buff := range packets {
cm.messenger.Broadcast(topic, buff)
cm.broadcast(topic, buff, pkBytes)
}
}

Expand All @@ -172,12 +173,13 @@ func (cm *commonMessenger) BroadcastTransactions(transactions map[string][][]byt
func (cm *commonMessenger) BroadcastBlockData(
miniBlocks map[uint32][]byte,
transactions map[string][][]byte,
pkBytes []byte,
extraDelayForBroadcast time.Duration,
) {
time.Sleep(extraDelayForBroadcast)

if len(miniBlocks) > 0 {
err := cm.BroadcastMiniBlocks(miniBlocks)
err := cm.BroadcastMiniBlocks(miniBlocks, pkBytes)
if err != nil {
log.Warn("commonMessenger.BroadcastBlockData: broadcast miniblocks", "error", err.Error())
}
Expand All @@ -186,7 +188,7 @@ func (cm *commonMessenger) BroadcastBlockData(
time.Sleep(common.ExtraDelayBetweenBroadcastMbsAndTxs)

if len(transactions) > 0 {
err := cm.BroadcastTransactions(transactions)
err := cm.BroadcastTransactions(transactions, pkBytes)
if err != nil {
log.Warn("commonMessenger.BroadcastBlockData: broadcast transactions", "error", err.Error())
}
Expand Down Expand Up @@ -223,3 +225,19 @@ func (cm *commonMessenger) extractMetaMiniBlocksAndTransactions(

return metaMiniBlocks, metaTransactions
}

func (cm *commonMessenger) broadcast(topic string, data []byte, pkBytes []byte) {
if cm.keysHandler.IsOriginalPublicKeyOfTheNode(pkBytes) {
cm.messenger.Broadcast(topic, data)
return
}

skBytes, pid, err := cm.keysHandler.GetP2PIdentity(pkBytes)
if err != nil {
log.Error("setup error in commonMessenger.broadcast - public key is managed but does not contain p2p sign info",
"pk", pkBytes, "error", err)
return
}

cm.messenger.BroadcastUsingPrivateKey(topic, data, pid, skBytes)
}
Loading

0 comments on commit 7fe1d70

Please sign in to comment.