Skip to content

Commit

Permalink
Merge "[FAB-2198] Introduce envelopes to gossip message"
Browse files Browse the repository at this point in the history
  • Loading branch information
jimthematrix authored and Gerrit Code Review committed Feb 13, 2017
2 parents 398e366 + 5dbe29e commit 2203c24
Show file tree
Hide file tree
Showing 20 changed files with 232 additions and 176 deletions.
19 changes: 1 addition & 18 deletions gossip/comm/comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Comm interface {

// Accept returns a dedicated read-only channel for messages sent by other nodes that match a certain predicate.
// Each message from the channel can be used to send a reply back to the sender
Accept(common.MessageAcceptor) <-chan ReceivedMessage
Accept(common.MessageAcceptor) <-chan proto.ReceivedMessage

// PresumedDead returns a read-only channel for node endpoints that are suspected to be offline
PresumedDead() <-chan common.PKIidType
Expand All @@ -63,20 +63,3 @@ type RemotePeer struct {
func (p *RemotePeer) String() string {
return fmt.Sprintf("%s, PKIid:%v", p.Endpoint, p.PKIID)
}

// ReceivedMessage is a GossipMessage wrapper that
// enables the user to send a message to the origin from which
// the ReceivedMessage was sent from.
// It also allows to know the identity of the sender
type ReceivedMessage interface {

// Respond sends a GossipMessage to the origin from which this ReceivedMessage was sent from
Respond(msg *proto.GossipMessage)

// GetGossipMessage returns the underlying GossipMessage
GetGossipMessage() *proto.GossipMessage

// GetPKIID returns the PKI-ID of the remote peer
// that sent the message
GetPKIID() common.PKIidType
}
8 changes: 4 additions & 4 deletions gossip/comm/comm_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity
deadEndpoints: make(chan common.PKIidType, 100),
stopping: int32(0),
exitChan: make(chan struct{}, 1),
subscriptions: make([]chan ReceivedMessage, 0),
subscriptions: make([]chan proto.ReceivedMessage, 0),
blackListedPKIIDs: make([]common.PKIidType, 0),
}
commInst.connStore = newConnStore(commInst, commInst.logger)
Expand Down Expand Up @@ -154,7 +154,7 @@ type commImpl struct {
exitChan chan struct{}
stopping int32
stopWG sync.WaitGroup
subscriptions []chan ReceivedMessage
subscriptions []chan proto.ReceivedMessage
blackListedPKIIDs []common.PKIidType
}

Expand Down Expand Up @@ -290,9 +290,9 @@ func (c *commImpl) Probe(remotePeer *RemotePeer) error {
return err
}

func (c *commImpl) Accept(acceptor common.MessageAcceptor) <-chan ReceivedMessage {
func (c *commImpl) Accept(acceptor common.MessageAcceptor) <-chan proto.ReceivedMessage {
genericChan := c.msgPublisher.AddChannel(acceptor)
specificChan := make(chan ReceivedMessage, 10)
specificChan := make(chan proto.ReceivedMessage, 10)

if c.isStopping() {
c.logger.Warning("Accept() called but comm module is stopping, returning empty channel")
Expand Down
16 changes: 8 additions & 8 deletions gossip/comm/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func newCommInstance(port int, sec api.MessageCryptoService) (Comm, error) {
return inst, err
}

func handshaker(endpoint string, comm Comm, t *testing.T, sigMutator func([]byte) []byte, pkiIDmutator func([]byte) []byte) <-chan ReceivedMessage {
func handshaker(endpoint string, comm Comm, t *testing.T, sigMutator func([]byte) []byte, pkiIDmutator func([]byte) []byte) <-chan proto.ReceivedMessage {
c := &commImpl{}
err := generateCertificates("key.pem", "cert.pem")
defer os.Remove("cert.pem")
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestBasic(t *testing.T) {
m1 := comm1.Accept(acceptAll)
m2 := comm2.Accept(acceptAll)
out := make(chan uint64, 2)
reader := func(ch <-chan ReceivedMessage) {
reader := func(ch <-chan proto.ReceivedMessage) {
m := <-ch
out <- m.GetGossipMessage().Nonce
}
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestBlackListPKIid(t *testing.T) {
defer comm3.Stop()
defer comm4.Stop()

reader := func(instance string, out chan uint64, in <-chan ReceivedMessage) {
reader := func(instance string, out chan uint64, in <-chan proto.ReceivedMessage) {
for {
msg := <-in
if msg == nil {
Expand Down Expand Up @@ -328,7 +328,7 @@ func TestResponses(t *testing.T) {
defer comm1.Stop()
defer comm2.Stop()

nonceIncrememter := func(msg ReceivedMessage) ReceivedMessage {
nonceIncrememter := func(msg proto.ReceivedMessage) proto.ReceivedMessage {
msg.GetGossipMessage().Nonce++
return msg
}
Expand Down Expand Up @@ -365,11 +365,11 @@ func TestAccept(t *testing.T) {
comm2, _ := newCommInstance(7612, naiveSec)

evenNONCESelector := func(m interface{}) bool {
return m.(ReceivedMessage).GetGossipMessage().Nonce%2 == 0
return m.(proto.ReceivedMessage).GetGossipMessage().Nonce%2 == 0
}

oddNONCESelector := func(m interface{}) bool {
return m.(ReceivedMessage).GetGossipMessage().Nonce%2 != 0
return m.(proto.ReceivedMessage).GetGossipMessage().Nonce%2 != 0
}

evenNONCES := comm1.Accept(evenNONCESelector)
Expand All @@ -381,7 +381,7 @@ func TestAccept(t *testing.T) {
out := make(chan uint64, defRecvBuffSize)
sem := make(chan struct{}, 0)

readIntoSlice := func(a *[]uint64, ch <-chan ReceivedMessage) {
readIntoSlice := func(a *[]uint64, ch <-chan proto.ReceivedMessage) {
for m := range ch {
*a = append(*a, m.GetGossipMessage().Nonce)
out <- m.GetGossipMessage().Nonce
Expand Down Expand Up @@ -422,7 +422,7 @@ func TestReConnections(t *testing.T) {
comm1, _ := newCommInstance(3611, naiveSec)
comm2, _ := newCommInstance(3612, naiveSec)

reader := func(out chan uint64, in <-chan ReceivedMessage) {
reader := func(out chan uint64, in <-chan proto.ReceivedMessage) {
for {
msg := <-in
if msg == nil {
Expand Down
14 changes: 11 additions & 3 deletions gossip/comm/mock/mock_comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type packetMock struct {
type channelMock struct {
accept common.MessageAcceptor

channel chan comm.ReceivedMessage
channel chan proto.ReceivedMessage
}

type commMock struct {
Expand Down Expand Up @@ -91,11 +91,19 @@ func (packet *packetMock) Respond(msg *proto.GossipMessage) {
}
}

// GetSourceMessage Returns the SignedGossipMessage the ReceivedMessage was
// constructed with
func (packet *packetMock) GetSourceMessage() *proto.SignedGossipMessage {
return nil
}

// GetGossipMessage returns the underlying GossipMessage
func (packet *packetMock) GetGossipMessage() *proto.GossipMessage {
return packet.msg.(*proto.GossipMessage)
}

// GetPKIID returns the PKI-ID of the remote peer
// that sent the message
func (packet *packetMock) GetPKIID() common.PKIidType {
return nil
}
Expand Down Expand Up @@ -151,8 +159,8 @@ func (mock *commMock) Probe(peer *comm.RemotePeer) error {

// Accept returns a dedicated read-only channel for messages sent by other nodes that match a certain predicate.
// Each message from the channel can be used to send a reply back to the sender
func (mock *commMock) Accept(accept common.MessageAcceptor) <-chan comm.ReceivedMessage {
ch := make(chan comm.ReceivedMessage)
func (mock *commMock) Accept(accept common.MessageAcceptor) <-chan proto.ReceivedMessage {
ch := make(chan proto.ReceivedMessage)
mock.acceptors = append(mock.acceptors, &channelMock{accept, ch})
return ch
}
Expand Down
4 changes: 2 additions & 2 deletions gossip/comm/mock/mock_comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func TestMockComm(t *testing.T) {
defer comm1.Stop()

msgCh := comm1.Accept(func(message interface{}) bool {
return message.(comm.ReceivedMessage).GetGossipMessage().GetStateRequest() != nil ||
message.(comm.ReceivedMessage).GetGossipMessage().GetStateResponse() != nil
return message.(proto.ReceivedMessage).GetGossipMessage().GetStateRequest() != nil ||
message.(proto.ReceivedMessage).GetGossipMessage().GetStateResponse() != nil
})

comm2 := NewCommMock(second.endpoint, members)
Expand Down
6 changes: 6 additions & 0 deletions gossip/comm/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ type ReceivedMessageImpl struct {
conn *connection
}

// GetSourceMessage Returns the SignedGossipMessage the ReceivedMessage was
// constructed with
func (m *ReceivedMessageImpl) GetSourceMessage() *proto.SignedGossipMessage {
return nil
}

// Respond sends a msg to the source that sent the ReceivedMessageImpl
func (m *ReceivedMessageImpl) Respond(msg *proto.GossipMessage) {
m.conn.send(msg, func(e error) {})
Expand Down
3 changes: 1 addition & 2 deletions gossip/election/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/comm"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"
proto "github.com/hyperledger/fabric/protos/gossip"
Expand Down Expand Up @@ -61,7 +60,7 @@ type gossip interface {
// If passThrough is false, the messages are processed by the gossip layer beforehand.
// If passThrough is true, the gossip layer doesn't intervene and the messages
// can be used to send a reply back to the sender
Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan comm.ReceivedMessage)
Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan proto.ReceivedMessage)

// Gossip sends a message to other peers to the network
Gossip(msg *proto.GossipMessage)
Expand Down
3 changes: 1 addition & 2 deletions gossip/election/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"time"

"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/comm"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"
proto "github.com/hyperledger/fabric/protos/gossip"
Expand Down Expand Up @@ -216,7 +215,7 @@ func (g *peerMockGossip) Peers() []discovery.NetworkMember {
return res
}

func (g *peerMockGossip) Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan comm.ReceivedMessage) {
func (g *peerMockGossip) Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan proto.ReceivedMessage) {
ch := make(chan *proto.GossipMessage, 100)
g.acceptorLock.Lock()
g.acceptors = append(g.acceptors, &mockAcceptor{
Expand Down
5 changes: 2 additions & 3 deletions gossip/gossip/certstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"sync"

"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/comm"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/gossip/pull"
"github.com/hyperledger/fabric/gossip/identity"
Expand Down Expand Up @@ -63,7 +62,7 @@ func newCertStore(puller pull.Mediator, idMapper identity.Mapper, selfIdentity a

puller.Add(certStore.createIdentityMessage())

puller.RegisterMsgHook(pull.ResponseMsgType, func(_ []string, msgs []*proto.GossipMessage, _ comm.ReceivedMessage) {
puller.RegisterMsgHook(pull.ResponseMsgType, func(_ []string, msgs []*proto.GossipMessage, _ proto.ReceivedMessage) {
for _, msg := range msgs {
pkiID := common.PKIidType(msg.GetPeerIdentity().PkiID)
cert := api.PeerIdentityType(msg.GetPeerIdentity().Cert)
Expand All @@ -78,7 +77,7 @@ func newCertStore(puller pull.Mediator, idMapper identity.Mapper, selfIdentity a
return certStore
}

func (cs *certStore) handleMessage(msg comm.ReceivedMessage) {
func (cs *certStore) handleMessage(msg proto.ReceivedMessage) {
if update := msg.GetGossipMessage().GetDataUpdate(); update != nil {
for _, m := range update.Data {
if !m.IsIdentityMsg() {
Expand Down
18 changes: 12 additions & 6 deletions gossip/gossip/certstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ type sentMsg struct {
mock.Mock
}

// GetSourceMessage Returns the SignedGossipMessage the ReceivedMessage was
// constructed with
func (s *sentMsg) GetSourceMessage() *proto.SignedGossipMessage {
return nil
}

func (s *sentMsg) Respond(msg *proto.GossipMessage) {
s.Called(msg)
}
Expand Down Expand Up @@ -80,30 +86,30 @@ func (m *membershipSvcMock) GetMembership() []discovery.NetworkMember {
}

func TestCertStoreBadSignature(t *testing.T) {
badSignature := func(nonce uint64) comm.ReceivedMessage {
badSignature := func(nonce uint64) proto.ReceivedMessage {
return createUpdateMessage(nonce, createBadlySignedUpdateMessage())
}

testCertificateUpdate(t, badSignature, false)
}

func TestCertStoreMismatchedIdentity(t *testing.T) {
mismatchedIdentity := func(nonce uint64) comm.ReceivedMessage {
mismatchedIdentity := func(nonce uint64) proto.ReceivedMessage {
return createUpdateMessage(nonce, createMismatchedUpdateMessage())
}

testCertificateUpdate(t, mismatchedIdentity, false)
}

func TestCertStoreShouldSucceed(t *testing.T) {
totallyFineIdentity := func(nonce uint64) comm.ReceivedMessage {
totallyFineIdentity := func(nonce uint64) proto.ReceivedMessage {
return createUpdateMessage(nonce, createValidUpdateMessage())
}

testCertificateUpdate(t, totallyFineIdentity, true)
}

func testCertificateUpdate(t *testing.T, updateFactory func(uint64) comm.ReceivedMessage, shouldSucceed bool) {
func testCertificateUpdate(t *testing.T, updateFactory func(uint64) proto.ReceivedMessage, shouldSucceed bool) {
config := pull.PullConfig{
MsgType: proto.PullMsgType_IdentityMsg,
PeerCountToSelect: 1,
Expand Down Expand Up @@ -257,7 +263,7 @@ func createValidUpdateMessage() *proto.GossipMessage {
return m
}

func createUpdateMessage(nonce uint64, idMsg *proto.GossipMessage) comm.ReceivedMessage {
func createUpdateMessage(nonce uint64, idMsg *proto.GossipMessage) proto.ReceivedMessage {
update := &proto.GossipMessage{
Tag: proto.GossipMessage_EMPTY,
Content: &proto.GossipMessage_DataUpdate{
Expand All @@ -271,7 +277,7 @@ func createUpdateMessage(nonce uint64, idMsg *proto.GossipMessage) comm.Received
return &sentMsg{msg: update}
}

func createDigest(nonce uint64) comm.ReceivedMessage {
func createDigest(nonce uint64) proto.ReceivedMessage {
digest := &proto.GossipMessage{
Tag: proto.GossipMessage_EMPTY,
Content: &proto.GossipMessage_DataDig{
Expand Down
6 changes: 3 additions & 3 deletions gossip/gossip/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type GossipChannel interface {
IsSubscribed(member discovery.NetworkMember) bool

// HandleMessage processes a message sent by a remote peer
HandleMessage(comm.ReceivedMessage)
HandleMessage(proto.ReceivedMessage)

// AddToMsgStore adds a given GossipMessage to the message store
AddToMsgStore(msg *proto.GossipMessage)
Expand Down Expand Up @@ -354,7 +354,7 @@ func (gc *gossipChannel) ConfigureChannel(joinMsg api.JoinChannelMessage) {
}

// HandleMessage processes a message sent by a remote peer
func (gc *gossipChannel) HandleMessage(msg comm.ReceivedMessage) {
func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
if !gc.verifyMsg(msg) {
return
}
Expand Down Expand Up @@ -500,7 +500,7 @@ func (gc *gossipChannel) createStateInfoSnapshot() *proto.GossipMessage {
}
}

func (gc *gossipChannel) verifyMsg(msg comm.ReceivedMessage) bool {
func (gc *gossipChannel) verifyMsg(msg proto.ReceivedMessage) bool {
if msg == nil {
gc.logger.Warning("Messsage is nil")
return false
Expand Down
6 changes: 6 additions & 0 deletions gossip/gossip/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ type receivedMsg struct {
mock.Mock
}

// GetSourceMessage Returns the SignedGossipMessage the ReceivedMessage was
// constructed with
func (m *receivedMsg) GetSourceMessage() *proto.SignedGossipMessage {
return nil
}

func (m *receivedMsg) GetGossipMessage() *proto.GossipMessage {
return m.msg
}
Expand Down
2 changes: 1 addition & 1 deletion gossip/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Gossip interface {
// If passThrough is false, the messages are processed by the gossip layer beforehand.
// If passThrough is true, the gossip layer doesn't intervene and the messages
// can be used to send a reply back to the sender
Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan comm.ReceivedMessage)
Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan proto.ReceivedMessage)

// JoinChan makes the Gossip instance join a channel
JoinChan(joinMsg api.JoinChannelMessage, chainID common.ChainID)
Expand Down
Loading

0 comments on commit 2203c24

Please sign in to comment.