diff --git a/gossip/common/common.go b/gossip/common/common.go index 72e10afb557..0b382dd2024 100644 --- a/gossip/common/common.go +++ b/gossip/common/common.go @@ -6,6 +6,8 @@ SPDX-License-Identifier: Apache-2.0 package common +import "bytes" + func init() { // This is just to satisfy the code coverage tool // miss any methods @@ -18,6 +20,13 @@ func init() { // which is the security identifier of a peer type PKIidType []byte +// IsNotSameFilter generate filter function which +// provides a predicate to identify whenever current id +// equals to another one. +func (this PKIidType) IsNotSameFilter(that PKIidType) bool { + return !bytes.Equal(this, that) +} + // MessageAcceptor is a predicate that is used to // determine in which messages the subscriber that created the // instance of the MessageAcceptor is interested in. diff --git a/gossip/discovery/discovery.go b/gossip/discovery/discovery.go index a9e2ee871c3..9d6718a58b6 100644 --- a/gossip/discovery/discovery.go +++ b/gossip/discovery/discovery.go @@ -55,13 +55,17 @@ type CommService interface { Ping(peer *NetworkMember) bool // Accept returns a read-only channel for membership messages sent from remote peers - Accept() <-chan *proto.SignedGossipMessage + Accept() <-chan proto.ReceivedMessage // PresumedDead returns a read-only channel for peers that are presumed to be dead PresumedDead() <-chan common.PKIidType // CloseConn orders to close the connection with a certain peer CloseConn(peer *NetworkMember) + + // Forward sends message to the next hop, excluding the hop + // from which message was initially received + Forward(msg proto.ReceivedMessage) } // NetworkMember is a peer's representation diff --git a/gossip/discovery/discovery_impl.go b/gossip/discovery/discovery_impl.go index c66a58c64d2..d456c0d6f18 100644 --- a/gossip/discovery/discovery_impl.go +++ b/gossip/discovery/discovery_impl.go @@ -309,10 +309,11 @@ func (d *gossipDiscoveryImpl) handleMessages() { } } -func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) { - if m == nil { +func (d *gossipDiscoveryImpl) handleMsgFromComm(msg proto.ReceivedMessage) { + if msg == nil { return } + m := msg.GetGossipMessage() if m.GetAliveMsg() == nil && m.GetMemRes() == nil && m.GetMemReq() == nil { d.logger.Warning("Got message with wrong type (expected Alive or MembershipResponse or MembershipRequest message):", m.GossipMessage) return @@ -345,13 +346,12 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) { } if m.IsAliveMsg() { - if !d.msgStore.Add(m) { return } d.handleAliveMessage(m) - d.comm.Gossip(m) + d.comm.Forward(msg) return } diff --git a/gossip/discovery/discovery_test.go b/gossip/discovery/discovery_test.go index b231bcbd0bb..f13c716dbe7 100644 --- a/gossip/discovery/discovery_test.go +++ b/gossip/discovery/discovery_test.go @@ -43,6 +43,31 @@ func init() { maxConnectionAttempts = 10000 } +type dummyReceivedMessage struct { + msg *proto.SignedGossipMessage + info *proto.ConnectionInfo +} + +func (*dummyReceivedMessage) Respond(msg *proto.GossipMessage) { + panic("implement me") +} + +func (rm *dummyReceivedMessage) GetGossipMessage() *proto.SignedGossipMessage { + return rm.msg +} + +func (*dummyReceivedMessage) GetSourceEnvelope() *proto.Envelope { + panic("implement me") +} + +func (rm *dummyReceivedMessage) GetConnectionInfo() *proto.ConnectionInfo { + return rm.info +} + +func (*dummyReceivedMessage) Ack(err error) { + panic("implement me") +} + type dummyCommModule struct { msgsReceived uint32 msgsSent uint32 @@ -52,7 +77,7 @@ type dummyCommModule struct { streams map[string]proto.Gossip_GossipStreamClient conns map[string]*grpc.ClientConn lock *sync.RWMutex - incMsgs chan *proto.SignedGossipMessage + incMsgs chan proto.ReceivedMessage lastSeqs map[string]uint64 shouldGossip bool mock *mock.Mock @@ -101,6 +126,17 @@ func (comm *dummyCommModule) Gossip(msg *proto.SignedGossipMessage) { } } +func (comm *dummyCommModule) Forward(msg proto.ReceivedMessage) { + if !comm.shouldGossip { + return + } + comm.lock.Lock() + defer comm.lock.Unlock() + for _, conn := range comm.streams { + conn.Send(msg.GetGossipMessage().Envelope) + } +} + func (comm *dummyCommModule) SendToPeer(peer *NetworkMember, msg *proto.SignedGossipMessage) { comm.lock.RLock() _, exists := comm.streams[peer.Endpoint] @@ -152,7 +188,7 @@ func (comm *dummyCommModule) Ping(peer *NetworkMember) bool { return true } -func (comm *dummyCommModule) Accept() <-chan *proto.SignedGossipMessage { +func (comm *dummyCommModule) Accept() <-chan proto.ReceivedMessage { return comm.incMsgs } @@ -217,7 +253,12 @@ func (g *gossipInstance) GossipStream(stream proto.Gossip_GossipStreamServer) er } lgr.Debug(g.Discovery.Self().Endpoint, "Got message:", gMsg) - g.comm.incMsgs <- gMsg + g.comm.incMsgs <- &dummyReceivedMessage{ + msg: gMsg, + info: &proto.ConnectionInfo{ + ID: common.PKIidType("testID"), + }, + } atomic.AddUint32(&g.comm.msgsReceived, 1) if aliveMsg := gMsg.GetAliveMsg(); aliveMsg != nil { @@ -294,7 +335,7 @@ func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []st comm := &dummyCommModule{ conns: make(map[string]*grpc.ClientConn), streams: make(map[string]proto.Gossip_GossipStreamClient), - incMsgs: make(chan *proto.SignedGossipMessage, 1000), + incMsgs: make(chan proto.ReceivedMessage, 1000), presumeDead: make(chan common.PKIidType, 10000), id: id, detectedDead: make(chan string, 10000), @@ -364,7 +405,12 @@ func TestBadInput(t *testing.T) { DataMsg: &proto.DataMessage{}, }, }).NoopSign() - inst.Discovery.(*gossipDiscoveryImpl).handleMsgFromComm(s) + inst.Discovery.(*gossipDiscoveryImpl).handleMsgFromComm(&dummyReceivedMessage{ + msg: s, + info: &proto.ConnectionInfo{ + ID: common.PKIidType("testID"), + }, + }) } func TestConnect(t *testing.T) { @@ -944,7 +990,12 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) { // Handling Alive for i := 0; i < peersNum; i++ { for k := 0; k < peersNum; k++ { - instances[i].discoveryImpl().handleMsgFromComm(aliveMsgs[k]) + instances[i].discoveryImpl().handleMsgFromComm(&dummyReceivedMessage{ + msg: aliveMsgs[k], + info: &proto.ConnectionInfo{ + ID: common.PKIidType(fmt.Sprintf("d%d", i)), + }, + }) } } @@ -1006,7 +1057,12 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) { // Handling new Alive set for i := 0; i < peersNum; i++ { for k := 0; k < peersNum; k++ { - instances[i].discoveryImpl().handleMsgFromComm(newAliveMsgs[k]) + instances[i].discoveryImpl().handleMsgFromComm(&dummyReceivedMessage{ + msg: newAliveMsgs[k], + info: &proto.ConnectionInfo{ + ID: common.PKIidType(fmt.Sprintf("d%d", i)), + }, + }) } } @@ -1041,7 +1097,12 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) { return k == i }, func(k int) { - instances[i].discoveryImpl().handleMsgFromComm(memReqMsgs[k]) + instances[i].discoveryImpl().handleMsgFromComm(&dummyReceivedMessage{ + msg: memReqMsgs[k], + info: &proto.ConnectionInfo{ + ID: common.PKIidType(fmt.Sprintf("d%d", i)), + }, + }) }) } @@ -1053,7 +1114,12 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) { // Processing old (later) Alive messages for i := 0; i < peersNum; i++ { for k := 0; k < peersNum; k++ { - instances[i].discoveryImpl().handleMsgFromComm(aliveMsgs[k]) + instances[i].discoveryImpl().handleMsgFromComm(&dummyReceivedMessage{ + msg: aliveMsgs[k], + info: &proto.ConnectionInfo{ + ID: common.PKIidType(fmt.Sprintf("d%d", i)), + }, + }) } } @@ -1074,7 +1140,12 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) { MemRes: msg, }, }).NoopSign() - instances[i].discoveryImpl().handleMsgFromComm(sMsg) + instances[i].discoveryImpl().handleMsgFromComm(&dummyReceivedMessage{ + msg: sMsg, + info: &proto.ConnectionInfo{ + ID: common.PKIidType(fmt.Sprintf("d%d", i)), + }, + }) } } diff --git a/gossip/gossip/channel/channel.go b/gossip/gossip/channel/channel.go index ea9c0c6a8d3..463d427de81 100644 --- a/gossip/gossip/channel/channel.go +++ b/gossip/gossip/channel/channel.go @@ -92,6 +92,9 @@ type Adapter interface { // Gossip gossips a message in the channel Gossip(message *proto.SignedGossipMessage) + // Forward sends a message to the next hops + Forward(message proto.ReceivedMessage) + // DeMultiplex de-multiplexes an item to subscribers DeMultiplex(interface{}) @@ -541,7 +544,7 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) { if added { // Forward the message - gc.Gossip(msg.GetGossipMessage()) + gc.Forward(msg) // DeMultiplex to local subscribers gc.DeMultiplex(m) diff --git a/gossip/gossip/channel/channel_test.go b/gossip/gossip/channel/channel_test.go index 42128aa4a94..bbbe14b19dd 100644 --- a/gossip/gossip/channel/channel_test.go +++ b/gossip/gossip/channel/channel_test.go @@ -186,6 +186,10 @@ func (ga *gossipAdapterMock) Gossip(msg *proto.SignedGossipMessage) { ga.Called(msg) } +func (ga *gossipAdapterMock) Forward(msg proto.ReceivedMessage) { + ga.Called(msg) +} + func (ga *gossipAdapterMock) DeMultiplex(msg interface{}) { ga.Called(msg) } @@ -309,6 +313,7 @@ func TestMsgStoreNotExpire(t *testing.T) { adapter.On("GetMembership").Return([]discovery.NetworkMember{peer2, peer3}) adapter.On("DeMultiplex", mock.Anything) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("GetConf").Return(conf) gc := NewGossipChannel(pkiID1, orgInChannelA, cs, channelA, adapter, jcm) @@ -395,6 +400,7 @@ func TestLeaveChannel(t *testing.T) { cs.On("VerifyBlock", mock.Anything).Return(nil) adapter := new(gossipAdapterMock) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("DeMultiplex", mock.Anything) members := []discovery.NetworkMember{ {PKIid: pkiIDInOrg1}, @@ -505,6 +511,7 @@ func TestChannelMsgStoreEviction(t *testing.T) { adapter := new(gossipAdapterMock) configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("DeMultiplex", mock.Anything).Run(func(arg mock.Arguments) { }) @@ -595,6 +602,7 @@ func TestChannelPull(t *testing.T) { adapter := new(gossipAdapterMock) configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("DeMultiplex", mock.Anything).Run(func(arg mock.Arguments) { msg := arg.Get(0).(*proto.SignedGossipMessage) if !msg.IsDataMsg() { @@ -654,6 +662,7 @@ func TestChannelPullAccessControl(t *testing.T) { adapter.On("GetMembership").Return([]discovery.NetworkMember{peer1, peer2, peer3}) adapter.On("DeMultiplex", mock.Anything) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("GetConf").Return(conf) sentHello := int32(0) @@ -722,6 +731,7 @@ func TestChannelPeerNotInChannel(t *testing.T) { adapter := new(gossipAdapterMock) configureAdapter(adapter) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("Send", mock.Anything, mock.Anything) adapter.On("DeMultiplex", mock.Anything) gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}) @@ -854,6 +864,7 @@ func TestChannelIsSubscribed(t *testing.T) { configureAdapter(adapter) gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("Send", mock.Anything, mock.Anything) adapter.On("DeMultiplex", mock.Anything) gc.HandleMessage(&receivedMsg{msg: createStateInfoMsg(10, pkiIDInOrg1, channelA), PKIID: pkiIDInOrg1}) @@ -870,6 +881,7 @@ func TestChannelAddToMessageStore(t *testing.T) { configureAdapter(adapter) gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("Send", mock.Anything, mock.Anything) adapter.On("DeMultiplex", mock.Anything).Run(func(arg mock.Arguments) { demuxedMsgs <- arg.Get(0).(*proto.SignedGossipMessage) @@ -922,6 +934,7 @@ func TestChannelBlockExpiration(t *testing.T) { configureAdapter(adapter) gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("Send", mock.Anything, mock.Anything) adapter.On("DeMultiplex", mock.Anything).Run(func(arg mock.Arguments) { demuxedMsgs <- arg.Get(0).(*proto.SignedGossipMessage) @@ -1012,6 +1025,7 @@ func TestChannelBadBlocks(t *testing.T) { adapter := new(gossipAdapterMock) configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}) adapter.On("DeMultiplex", mock.Anything).Run(func(args mock.Arguments) { @@ -1050,6 +1064,7 @@ func TestChannelPulledBadBlocks(t *testing.T) { configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) adapter.On("DeMultiplex", mock.Anything) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}) gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDInOrg1, channelA)}) @@ -1075,6 +1090,7 @@ func TestChannelPulledBadBlocks(t *testing.T) { cs.On("VerifyBlock", mock.Anything).Return(errors.New("Bad block")) adapter = new(gossipAdapterMock) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("DeMultiplex", mock.Anything) configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) gc = NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}) @@ -1095,6 +1111,7 @@ func TestChannelPulledBadBlocks(t *testing.T) { cs.On("VerifyBlock", mock.Anything).Return(nil) adapter = new(gossipAdapterMock) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("DeMultiplex", mock.Anything) configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) gc = NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}) @@ -1119,6 +1136,7 @@ func TestChannelPulledBadBlocks(t *testing.T) { adapter = new(gossipAdapterMock) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("DeMultiplex", mock.Anything) configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) gc = NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}) @@ -1148,6 +1166,7 @@ func TestChannelStateInfoSnapshot(t *testing.T) { configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) sentMessages := make(chan *proto.GossipMessage, 10) adapter.On("Send", mock.Anything, mock.Anything) adapter.On("ValidateStateInfoMessage", mock.Anything).Return(nil) @@ -1264,6 +1283,7 @@ func TestInterOrgExternalEndpointDisclosure(t *testing.T) { adapter.On("GetOrgOfPeer", pkiID2).Return(orgInChannelA) adapter.On("GetOrgOfPeer", pkiID3).Return(api.OrgIdentityType("ORG2")) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("DeMultiplex", mock.Anything) configureAdapter(adapter) jcm := &joinChanMsg{ @@ -1429,6 +1449,7 @@ func TestChannelReconfigureChannel(t *testing.T) { gc.ConfigureChannel(api.JoinChannelMessage(newJoinChanMsg)) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("Send", mock.Anything, mock.Anything) adapter.On("DeMultiplex", mock.Anything) @@ -1517,6 +1538,7 @@ func TestGossipChannelEligibility(t *testing.T) { } adapter.On("GetMembership").Return(members) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("Send", mock.Anything, mock.Anything) adapter.On("DeMultiplex", mock.Anything) adapter.On("GetConf").Return(conf) @@ -1634,6 +1656,7 @@ func TestChannelGetPeers(t *testing.T) { cs := &cryptoService{} adapter := new(gossipAdapterMock) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("Send", mock.Anything, mock.Anything) adapter.On("DeMultiplex", mock.Anything) members := []discovery.NetworkMember{ @@ -1683,6 +1706,7 @@ func TestOnDemandGossip(t *testing.T) { adapter.On("Gossip", mock.Anything).Run(func(mock.Arguments) { gossipedEvents <- struct{}{} }) + adapter.On("Forward", mock.Anything) gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, api.JoinChannelMessage(&joinChanMsg{})) defer gc.Stop() select { @@ -1706,6 +1730,7 @@ func TestOnDemandGossip(t *testing.T) { adapter.On("Gossip", mock.Anything).Run(func(mock.Arguments) { gossipedEvents <- struct{}{} }) + adapter.On("Forward", mock.Anything) gc.(*gossipChannel).Adapter = adapter select { case <-gossipedEvents: @@ -1733,6 +1758,7 @@ func TestChannelPullWithDigestsFilter(t *testing.T) { adapter := new(gossipAdapterMock) configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("DeMultiplex", mock.Anything).Run(func(arg mock.Arguments) { msg := arg.Get(0).(*proto.SignedGossipMessage) if !msg.IsDataMsg() { diff --git a/gossip/gossip/chanstate.go b/gossip/gossip/chanstate.go index cfac4b0d7c6..73bf523a98d 100644 --- a/gossip/gossip/chanstate.go +++ b/gossip/gossip/chanstate.go @@ -129,7 +129,20 @@ func (ga *gossipAdapterImpl) GetConf() channel.Config { // Gossip gossips a message func (ga *gossipAdapterImpl) Gossip(msg *proto.SignedGossipMessage) { - ga.gossipServiceImpl.emitter.Add(msg) + ga.gossipServiceImpl.emitter.Add(&emittedGossipMessage{ + SignedGossipMessage: msg, + filter: func(_ common.PKIidType) bool { + return true + }, + }) +} + +// Forward sends message to the next hops +func (ga *gossipAdapterImpl) Forward(msg proto.ReceivedMessage) { + ga.gossipServiceImpl.emitter.Add(&emittedGossipMessage{ + SignedGossipMessage: msg.GetGossipMessage(), + filter: msg.GetConnectionInfo().ID.IsNotSameFilter, + }) } func (ga *gossipAdapterImpl) Send(msg *proto.SignedGossipMessage, peers ...*comm.RemotePeer) { diff --git a/gossip/gossip/gossip.go b/gossip/gossip/gossip.go index 451a7a0364f..1f0533443f7 100644 --- a/gossip/gossip/gossip.go +++ b/gossip/gossip/gossip.go @@ -73,6 +73,13 @@ type Gossip interface { Stop() } +// emittedGossipMessage encapsulates signed gossip message to compose +// with routing filter to be used while message is forwarded +type emittedGossipMessage struct { + *proto.SignedGossipMessage + filter func(id common.PKIidType) bool +} + // SendCriteria defines how to send a specific message type SendCriteria struct { Timeout time.Duration // Timeout defines the time to wait for acknowledgements diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index 7748bbfdbaa..90798d92186 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -360,7 +360,10 @@ func (g *gossipServiceImpl) handleMessage(m proto.ReceivedMessage) { // in case it's a StateInfo message if g.isInMyorg(discovery.NetworkMember{PKIid: m.GetConnectionInfo().ID}) && msg.IsStateInfoMsg() { if g.stateInfoMsgStore.Add(msg) { - g.emitter.Add(msg) + g.emitter.Add(&emittedGossipMessage{ + SignedGossipMessage: msg, + filter: m.GetConnectionInfo().ID.IsNotSameFilter, + }) } } if !g.toDie() { @@ -409,7 +412,7 @@ func (g *gossipServiceImpl) forwardDiscoveryMsg(msg proto.ReceivedMessage) { recover() }() - g.discAdapter.incChan <- msg.GetGossipMessage() + g.discAdapter.incChan <- msg } // validateMsg checks the signature of the message if exists, @@ -436,9 +439,9 @@ func (g *gossipServiceImpl) validateMsg(msg proto.ReceivedMessage) bool { } func (g *gossipServiceImpl) sendGossipBatch(a []interface{}) { - msgs2Gossip := make([]*proto.SignedGossipMessage, len(a)) + msgs2Gossip := make([]*emittedGossipMessage, len(a)) for i, e := range a { - msgs2Gossip[i] = e.(*proto.SignedGossipMessage) + msgs2Gossip[i] = e.(*emittedGossipMessage) } g.gossipBatch(msgs2Gossip) } @@ -455,25 +458,25 @@ func (g *gossipServiceImpl) sendGossipBatch(a []interface{}) { // to the same set of peers. // The rest of the messages that have no restrictions on their destinations can be sent // to any group of peers. -func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage) { +func (g *gossipServiceImpl) gossipBatch(msgs []*emittedGossipMessage) { if g.disc == nil { g.logger.Error("Discovery has not been initialized yet, aborting!") return } - var blocks []*proto.SignedGossipMessage - var stateInfoMsgs []*proto.SignedGossipMessage - var orgMsgs []*proto.SignedGossipMessage - var leadershipMsgs []*proto.SignedGossipMessage + var blocks []*emittedGossipMessage + var stateInfoMsgs []*emittedGossipMessage + var orgMsgs []*emittedGossipMessage + var leadershipMsgs []*emittedGossipMessage isABlock := func(o interface{}) bool { - return o.(*proto.SignedGossipMessage).IsDataMsg() + return o.(*emittedGossipMessage).IsDataMsg() } isAStateInfoMsg := func(o interface{}) bool { - return o.(*proto.SignedGossipMessage).IsStateInfoMsg() + return o.(*emittedGossipMessage).IsStateInfoMsg() } aliveMsgsWithNoEndpointAndInOurOrg := func(o interface{}) bool { - msg := o.(*proto.SignedGossipMessage) + msg := o.(*emittedGossipMessage) if !msg.IsAliveMsg() { return false } @@ -481,10 +484,10 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage) { return member.Endpoint == "" && g.isInMyorg(discovery.NetworkMember{PKIid: member.PkiId}) } isOrgRestricted := func(o interface{}) bool { - return aliveMsgsWithNoEndpointAndInOurOrg(o) || o.(*proto.SignedGossipMessage).IsOrgRestricted() + return aliveMsgsWithNoEndpointAndInOurOrg(o) || o.(*emittedGossipMessage).IsOrgRestricted() } isLeadershipMsg := func(o interface{}) bool { - return o.(*proto.SignedGossipMessage).IsLeadershipMsg() + return o.(*emittedGossipMessage).IsLeadershipMsg() } // Gossip blocks @@ -508,15 +511,19 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage) { peerSelector = gc.IsMemberInChan } + peerSelector = filter.CombineRoutingFilters(peerSelector, func(member discovery.NetworkMember) bool { + return stateInfMsg.filter(member.PKIid) + }) + peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership(), peerSelector) - g.comm.Send(stateInfMsg, peers2Send...) + g.comm.Send(stateInfMsg.SignedGossipMessage, peers2Send...) } // Gossip messages restricted to our org orgMsgs, msgs = partitionMessages(isOrgRestricted, msgs) peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership(), g.isInMyorg) for _, msg := range orgMsgs { - g.comm.Send(msg, peers2Send...) + g.comm.Send(msg.SignedGossipMessage, g.removeSelfLoop(msg, peers2Send)...) } // Finally, gossip the remaining messages @@ -526,8 +533,11 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage) { continue } selectByOriginOrg := g.peersByOriginOrgPolicy(discovery.NetworkMember{PKIid: msg.GetAliveMsg().Membership.PkiId}) - peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership(), selectByOriginOrg) - g.sendAndFilterSecrets(msg, peers2Send...) + selector := filter.CombineRoutingFilters(selectByOriginOrg, func(member discovery.NetworkMember) bool { + return msg.filter(member.PKIid) + }) + peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership(), selector) + g.sendAndFilterSecrets(msg.SignedGossipMessage, peers2Send...) } } @@ -549,19 +559,19 @@ func (g *gossipServiceImpl) sendAndFilterSecrets(msg *proto.SignedGossipMessage, } // gossipInChan gossips a given GossipMessage slice according to a channel's routing policy. -func (g *gossipServiceImpl) gossipInChan(messages []*proto.SignedGossipMessage, chanRoutingFactory channelRoutingFilterFactory) { +func (g *gossipServiceImpl) gossipInChan(messages []*emittedGossipMessage, chanRoutingFactory channelRoutingFilterFactory) { if len(messages) == 0 { return } totalChannels := extractChannels(messages) var channel common.ChainID - var messagesOfChannel []*proto.SignedGossipMessage + var messagesOfChannel []*emittedGossipMessage for len(totalChannels) > 0 { // Take first channel channel, totalChannels = totalChannels[0], totalChannels[1:] // Extract all messages of that channel grabMsgs := func(o interface{}) bool { - return bytes.Equal(o.(*proto.SignedGossipMessage).Channel, channel) + return bytes.Equal(o.(*emittedGossipMessage).Channel, channel) } messagesOfChannel, messages = partitionMessages(grabMsgs, messages) if len(messagesOfChannel) == 0 { @@ -585,11 +595,23 @@ func (g *gossipServiceImpl) gossipInChan(messages []*proto.SignedGossipMessage, // Send the messages to the remote peers for _, msg := range messagesOfChannel { - g.comm.Send(msg, peers2Send...) + filteredPeers := g.removeSelfLoop(msg, peers2Send) + g.comm.Send(msg.SignedGossipMessage, filteredPeers...) } } } +// removeSelfLoop deletes from the list of peers peer which has sent the message +func (g *gossipServiceImpl) removeSelfLoop(msg *emittedGossipMessage, peers []*comm.RemotePeer) []*comm.RemotePeer { + var result []*comm.RemotePeer + for _, peer := range peers { + if msg.filter(peer.PKIID) { + result = append(result, peer) + } + } + return result +} + // SendByCriteria sends a given message to all peers that match the given SendCriteria func (g *gossipServiceImpl) SendByCriteria(msg *proto.SignedGossipMessage, criteria SendCriteria) error { if criteria.Timeout == 0 { @@ -673,7 +695,12 @@ func (g *gossipServiceImpl) Gossip(msg *proto.GossipMessage) { if g.conf.PropagateIterations == 0 { return } - g.emitter.Add(sMsg) + g.emitter.Add(&emittedGossipMessage{ + SignedGossipMessage: sMsg, + filter: func(_ common.PKIidType) bool { + return true + }, + }) } // Send sends a message to remote peers @@ -818,9 +845,23 @@ func (g *gossipServiceImpl) newDiscoveryAdapter() *discoveryAdapter { if g.conf.PropagateIterations == 0 { return } - g.emitter.Add(msg) + g.emitter.Add(&emittedGossipMessage{ + SignedGossipMessage: msg, + filter: func(_ common.PKIidType) bool { + return true + }, + }) }, - incChan: make(chan *proto.SignedGossipMessage), + forwardFunc: func(message proto.ReceivedMessage) { + if g.conf.PropagateIterations == 0 { + return + } + g.emitter.Add(&emittedGossipMessage{ + SignedGossipMessage: message.GetGossipMessage(), + filter: message.GetConnectionInfo().ID.IsNotSameFilter, + }) + }, + incChan: make(chan proto.ReceivedMessage), presumedDead: g.presumedDead, disclosurePolicy: g.disclosurePolicy, } @@ -832,8 +873,9 @@ type discoveryAdapter struct { stopping int32 c comm.Comm presumedDead chan common.PKIidType - incChan chan *proto.SignedGossipMessage + incChan chan proto.ReceivedMessage gossipFunc func(message *proto.SignedGossipMessage) + forwardFunc func(message proto.ReceivedMessage) disclosurePolicy discovery.DisclosurePolicy } @@ -854,6 +896,14 @@ func (da *discoveryAdapter) Gossip(msg *proto.SignedGossipMessage) { da.gossipFunc(msg) } +func (da *discoveryAdapter) Forward(msg proto.ReceivedMessage) { + if da.toDie() { + return + } + + da.forwardFunc(msg) +} + func (da *discoveryAdapter) SendToPeer(peer *discovery.NetworkMember, msg *proto.SignedGossipMessage) { if da.toDie() { return @@ -895,7 +945,7 @@ func (da *discoveryAdapter) Ping(peer *discovery.NetworkMember) bool { return err == nil } -func (da *discoveryAdapter) Accept() <-chan *proto.SignedGossipMessage { +func (da *discoveryAdapter) Accept() <-chan proto.ReceivedMessage { return da.incChan } @@ -1272,9 +1322,9 @@ func (g *gossipServiceImpl) peersByOriginOrgPolicy(peer discovery.NetworkMember) // partitionMessages receives a predicate and a slice of gossip messages // and returns a tuple of two slices: the messages that hold for the predicate // and the rest -func partitionMessages(pred common.MessageAcceptor, a []*proto.SignedGossipMessage) ([]*proto.SignedGossipMessage, []*proto.SignedGossipMessage) { - s1 := []*proto.SignedGossipMessage{} - s2 := []*proto.SignedGossipMessage{} +func partitionMessages(pred common.MessageAcceptor, a []*emittedGossipMessage) ([]*emittedGossipMessage, []*emittedGossipMessage) { + s1 := []*emittedGossipMessage{} + s2 := []*emittedGossipMessage{} for _, m := range a { if pred(m) { s1 = append(s1, m) @@ -1287,7 +1337,7 @@ func partitionMessages(pred common.MessageAcceptor, a []*proto.SignedGossipMessa // extractChannels returns a slice with all channels // of all given GossipMessages -func extractChannels(a []*proto.SignedGossipMessage) []common.ChainID { +func extractChannels(a []*emittedGossipMessage) []common.ChainID { channels := []common.ChainID{} for _, m := range a { if len(m.Channel) == 0 { diff --git a/gossip/gossip/gossip_test.go b/gossip/gossip/gossip_test.go index d8f2f69bb2e..3f8f75c40f5 100644 --- a/gossip/gossip/gossip_test.go +++ b/gossip/gossip/gossip_test.go @@ -50,6 +50,7 @@ var tests = []func(t *testing.T){ TestConfidentiality, TestAnchorPeer, TestBootstrapPeerMisConfiguration, + TestNoMessagesSelfLoop, } func init() { @@ -568,6 +569,68 @@ func TestMembership(t *testing.T) { } +func TestNoMessagesSelfLoop(t *testing.T) { + t.Parallel() + defer testWG.Done() + portPrefix := 17610 + + boot := newGossipInstance(portPrefix, 0, 100) + boot.JoinChan(&joinChanMsg{}, common.ChainID("A")) + boot.UpdateChannelMetadata(createMetadata(1), common.ChainID("A")) + + peer := newGossipInstance(portPrefix, 1, 100, 0) + peer.JoinChan(&joinChanMsg{}, common.ChainID("A")) + peer.UpdateChannelMetadata(createMetadata(1), common.ChainID("A")) + + // Wait until both peers get connected + waitUntilOrFail(t, checkPeersMembership(t, []Gossip{peer}, 1)) + _, commCh := boot.Accept(func(msg interface{}) bool { + return msg.(proto.ReceivedMessage).GetGossipMessage().IsDataMsg() + }, true) + + wg := sync.WaitGroup{} + wg.Add(2) + + // Make sure sending peer is not getting his own + // message back + go func(ch <-chan proto.ReceivedMessage) { + defer wg.Done() + for { + select { + case msg := <-ch: + { + if msg.GetGossipMessage().IsDataMsg() { + t.Fatal("Should not receive data message back, got", msg) + } + } + // Waiting for 2 seconds to make sure we won't + // get message back w.h.p. + case <-time.After(2 * time.Second): + { + return + } + } + } + }(commCh) + + peerCh, _ := peer.Accept(acceptData, false) + + // Ensure recipient gets his message + go func(ch <-chan *proto.GossipMessage) { + defer wg.Done() + <-ch + }(peerCh) + + boot.Gossip(createDataMsg(uint64(2), []byte{}, common.ChainID("A"))) + waitUntilOrFailBlocking(t, wg.Wait) + + stop := func() { + stopPeers([]Gossip{peer, boot}) + } + + waitUntilOrFailBlocking(t, stop) +} + func TestDissemination(t *testing.T) { t.Parallel() defer testWG.Done()