From eb437dabe6ca6bdfee7c27dab3cb5ce81c04e187 Mon Sep 17 00:00:00 2001 From: Artem Barger Date: Sun, 19 Nov 2017 10:12:12 +0200 Subject: [PATCH] [FAB-2641] Prevent loop while gossiping msgs Currently while gossip forward the message during distribution it selects peers from the current membership view, while not filtering the peer which delivered the message. As a consequence message might be send to the source peer one again. This commit extends the peers selection logic by adding the check which allows to avoid sending same message to the source peer back again. Change-Id: I76ceafba2335c27e1186f511d3cdab8a24d600af Signed-off-by: Artem Barger --- gossip/common/common.go | 9 +++ gossip/discovery/discovery.go | 6 +- gossip/discovery/discovery_impl.go | 8 +- gossip/discovery/discovery_test.go | 91 ++++++++++++++++++--- gossip/gossip/channel/channel.go | 5 +- gossip/gossip/channel/channel_test.go | 26 ++++++ gossip/gossip/chanstate.go | 15 +++- gossip/gossip/gossip.go | 7 ++ gossip/gossip/gossip_impl.go | 112 +++++++++++++++++++------- gossip/gossip/gossip_test.go | 63 +++++++++++++++ 10 files changed, 294 insertions(+), 48 deletions(-) diff --git a/gossip/common/common.go b/gossip/common/common.go index 72e10afb557..0b382dd2024 100644 --- a/gossip/common/common.go +++ b/gossip/common/common.go @@ -6,6 +6,8 @@ SPDX-License-Identifier: Apache-2.0 package common +import "bytes" + func init() { // This is just to satisfy the code coverage tool // miss any methods @@ -18,6 +20,13 @@ func init() { // which is the security identifier of a peer type PKIidType []byte +// IsNotSameFilter generate filter function which +// provides a predicate to identify whenever current id +// equals to another one. +func (this PKIidType) IsNotSameFilter(that PKIidType) bool { + return !bytes.Equal(this, that) +} + // MessageAcceptor is a predicate that is used to // determine in which messages the subscriber that created the // instance of the MessageAcceptor is interested in. diff --git a/gossip/discovery/discovery.go b/gossip/discovery/discovery.go index a9e2ee871c3..9d6718a58b6 100644 --- a/gossip/discovery/discovery.go +++ b/gossip/discovery/discovery.go @@ -55,13 +55,17 @@ type CommService interface { Ping(peer *NetworkMember) bool // Accept returns a read-only channel for membership messages sent from remote peers - Accept() <-chan *proto.SignedGossipMessage + Accept() <-chan proto.ReceivedMessage // PresumedDead returns a read-only channel for peers that are presumed to be dead PresumedDead() <-chan common.PKIidType // CloseConn orders to close the connection with a certain peer CloseConn(peer *NetworkMember) + + // Forward sends message to the next hop, excluding the hop + // from which message was initially received + Forward(msg proto.ReceivedMessage) } // NetworkMember is a peer's representation diff --git a/gossip/discovery/discovery_impl.go b/gossip/discovery/discovery_impl.go index c66a58c64d2..d456c0d6f18 100644 --- a/gossip/discovery/discovery_impl.go +++ b/gossip/discovery/discovery_impl.go @@ -309,10 +309,11 @@ func (d *gossipDiscoveryImpl) handleMessages() { } } -func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) { - if m == nil { +func (d *gossipDiscoveryImpl) handleMsgFromComm(msg proto.ReceivedMessage) { + if msg == nil { return } + m := msg.GetGossipMessage() if m.GetAliveMsg() == nil && m.GetMemRes() == nil && m.GetMemReq() == nil { d.logger.Warning("Got message with wrong type (expected Alive or MembershipResponse or MembershipRequest message):", m.GossipMessage) return @@ -345,13 +346,12 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) { } if m.IsAliveMsg() { - if !d.msgStore.Add(m) { return } d.handleAliveMessage(m) - d.comm.Gossip(m) + d.comm.Forward(msg) return } diff --git a/gossip/discovery/discovery_test.go b/gossip/discovery/discovery_test.go index b231bcbd0bb..f13c716dbe7 100644 --- a/gossip/discovery/discovery_test.go +++ b/gossip/discovery/discovery_test.go @@ -43,6 +43,31 @@ func init() { maxConnectionAttempts = 10000 } +type dummyReceivedMessage struct { + msg *proto.SignedGossipMessage + info *proto.ConnectionInfo +} + +func (*dummyReceivedMessage) Respond(msg *proto.GossipMessage) { + panic("implement me") +} + +func (rm *dummyReceivedMessage) GetGossipMessage() *proto.SignedGossipMessage { + return rm.msg +} + +func (*dummyReceivedMessage) GetSourceEnvelope() *proto.Envelope { + panic("implement me") +} + +func (rm *dummyReceivedMessage) GetConnectionInfo() *proto.ConnectionInfo { + return rm.info +} + +func (*dummyReceivedMessage) Ack(err error) { + panic("implement me") +} + type dummyCommModule struct { msgsReceived uint32 msgsSent uint32 @@ -52,7 +77,7 @@ type dummyCommModule struct { streams map[string]proto.Gossip_GossipStreamClient conns map[string]*grpc.ClientConn lock *sync.RWMutex - incMsgs chan *proto.SignedGossipMessage + incMsgs chan proto.ReceivedMessage lastSeqs map[string]uint64 shouldGossip bool mock *mock.Mock @@ -101,6 +126,17 @@ func (comm *dummyCommModule) Gossip(msg *proto.SignedGossipMessage) { } } +func (comm *dummyCommModule) Forward(msg proto.ReceivedMessage) { + if !comm.shouldGossip { + return + } + comm.lock.Lock() + defer comm.lock.Unlock() + for _, conn := range comm.streams { + conn.Send(msg.GetGossipMessage().Envelope) + } +} + func (comm *dummyCommModule) SendToPeer(peer *NetworkMember, msg *proto.SignedGossipMessage) { comm.lock.RLock() _, exists := comm.streams[peer.Endpoint] @@ -152,7 +188,7 @@ func (comm *dummyCommModule) Ping(peer *NetworkMember) bool { return true } -func (comm *dummyCommModule) Accept() <-chan *proto.SignedGossipMessage { +func (comm *dummyCommModule) Accept() <-chan proto.ReceivedMessage { return comm.incMsgs } @@ -217,7 +253,12 @@ func (g *gossipInstance) GossipStream(stream proto.Gossip_GossipStreamServer) er } lgr.Debug(g.Discovery.Self().Endpoint, "Got message:", gMsg) - g.comm.incMsgs <- gMsg + g.comm.incMsgs <- &dummyReceivedMessage{ + msg: gMsg, + info: &proto.ConnectionInfo{ + ID: common.PKIidType("testID"), + }, + } atomic.AddUint32(&g.comm.msgsReceived, 1) if aliveMsg := gMsg.GetAliveMsg(); aliveMsg != nil { @@ -294,7 +335,7 @@ func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []st comm := &dummyCommModule{ conns: make(map[string]*grpc.ClientConn), streams: make(map[string]proto.Gossip_GossipStreamClient), - incMsgs: make(chan *proto.SignedGossipMessage, 1000), + incMsgs: make(chan proto.ReceivedMessage, 1000), presumeDead: make(chan common.PKIidType, 10000), id: id, detectedDead: make(chan string, 10000), @@ -364,7 +405,12 @@ func TestBadInput(t *testing.T) { DataMsg: &proto.DataMessage{}, }, }).NoopSign() - inst.Discovery.(*gossipDiscoveryImpl).handleMsgFromComm(s) + inst.Discovery.(*gossipDiscoveryImpl).handleMsgFromComm(&dummyReceivedMessage{ + msg: s, + info: &proto.ConnectionInfo{ + ID: common.PKIidType("testID"), + }, + }) } func TestConnect(t *testing.T) { @@ -944,7 +990,12 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) { // Handling Alive for i := 0; i < peersNum; i++ { for k := 0; k < peersNum; k++ { - instances[i].discoveryImpl().handleMsgFromComm(aliveMsgs[k]) + instances[i].discoveryImpl().handleMsgFromComm(&dummyReceivedMessage{ + msg: aliveMsgs[k], + info: &proto.ConnectionInfo{ + ID: common.PKIidType(fmt.Sprintf("d%d", i)), + }, + }) } } @@ -1006,7 +1057,12 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) { // Handling new Alive set for i := 0; i < peersNum; i++ { for k := 0; k < peersNum; k++ { - instances[i].discoveryImpl().handleMsgFromComm(newAliveMsgs[k]) + instances[i].discoveryImpl().handleMsgFromComm(&dummyReceivedMessage{ + msg: newAliveMsgs[k], + info: &proto.ConnectionInfo{ + ID: common.PKIidType(fmt.Sprintf("d%d", i)), + }, + }) } } @@ -1041,7 +1097,12 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) { return k == i }, func(k int) { - instances[i].discoveryImpl().handleMsgFromComm(memReqMsgs[k]) + instances[i].discoveryImpl().handleMsgFromComm(&dummyReceivedMessage{ + msg: memReqMsgs[k], + info: &proto.ConnectionInfo{ + ID: common.PKIidType(fmt.Sprintf("d%d", i)), + }, + }) }) } @@ -1053,7 +1114,12 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) { // Processing old (later) Alive messages for i := 0; i < peersNum; i++ { for k := 0; k < peersNum; k++ { - instances[i].discoveryImpl().handleMsgFromComm(aliveMsgs[k]) + instances[i].discoveryImpl().handleMsgFromComm(&dummyReceivedMessage{ + msg: aliveMsgs[k], + info: &proto.ConnectionInfo{ + ID: common.PKIidType(fmt.Sprintf("d%d", i)), + }, + }) } } @@ -1074,7 +1140,12 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) { MemRes: msg, }, }).NoopSign() - instances[i].discoveryImpl().handleMsgFromComm(sMsg) + instances[i].discoveryImpl().handleMsgFromComm(&dummyReceivedMessage{ + msg: sMsg, + info: &proto.ConnectionInfo{ + ID: common.PKIidType(fmt.Sprintf("d%d", i)), + }, + }) } } diff --git a/gossip/gossip/channel/channel.go b/gossip/gossip/channel/channel.go index ea9c0c6a8d3..463d427de81 100644 --- a/gossip/gossip/channel/channel.go +++ b/gossip/gossip/channel/channel.go @@ -92,6 +92,9 @@ type Adapter interface { // Gossip gossips a message in the channel Gossip(message *proto.SignedGossipMessage) + // Forward sends a message to the next hops + Forward(message proto.ReceivedMessage) + // DeMultiplex de-multiplexes an item to subscribers DeMultiplex(interface{}) @@ -541,7 +544,7 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) { if added { // Forward the message - gc.Gossip(msg.GetGossipMessage()) + gc.Forward(msg) // DeMultiplex to local subscribers gc.DeMultiplex(m) diff --git a/gossip/gossip/channel/channel_test.go b/gossip/gossip/channel/channel_test.go index 42128aa4a94..bbbe14b19dd 100644 --- a/gossip/gossip/channel/channel_test.go +++ b/gossip/gossip/channel/channel_test.go @@ -186,6 +186,10 @@ func (ga *gossipAdapterMock) Gossip(msg *proto.SignedGossipMessage) { ga.Called(msg) } +func (ga *gossipAdapterMock) Forward(msg proto.ReceivedMessage) { + ga.Called(msg) +} + func (ga *gossipAdapterMock) DeMultiplex(msg interface{}) { ga.Called(msg) } @@ -309,6 +313,7 @@ func TestMsgStoreNotExpire(t *testing.T) { adapter.On("GetMembership").Return([]discovery.NetworkMember{peer2, peer3}) adapter.On("DeMultiplex", mock.Anything) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("GetConf").Return(conf) gc := NewGossipChannel(pkiID1, orgInChannelA, cs, channelA, adapter, jcm) @@ -395,6 +400,7 @@ func TestLeaveChannel(t *testing.T) { cs.On("VerifyBlock", mock.Anything).Return(nil) adapter := new(gossipAdapterMock) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("DeMultiplex", mock.Anything) members := []discovery.NetworkMember{ {PKIid: pkiIDInOrg1}, @@ -505,6 +511,7 @@ func TestChannelMsgStoreEviction(t *testing.T) { adapter := new(gossipAdapterMock) configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("DeMultiplex", mock.Anything).Run(func(arg mock.Arguments) { }) @@ -595,6 +602,7 @@ func TestChannelPull(t *testing.T) { adapter := new(gossipAdapterMock) configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("DeMultiplex", mock.Anything).Run(func(arg mock.Arguments) { msg := arg.Get(0).(*proto.SignedGossipMessage) if !msg.IsDataMsg() { @@ -654,6 +662,7 @@ func TestChannelPullAccessControl(t *testing.T) { adapter.On("GetMembership").Return([]discovery.NetworkMember{peer1, peer2, peer3}) adapter.On("DeMultiplex", mock.Anything) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("GetConf").Return(conf) sentHello := int32(0) @@ -722,6 +731,7 @@ func TestChannelPeerNotInChannel(t *testing.T) { adapter := new(gossipAdapterMock) configureAdapter(adapter) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("Send", mock.Anything, mock.Anything) adapter.On("DeMultiplex", mock.Anything) gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}) @@ -854,6 +864,7 @@ func TestChannelIsSubscribed(t *testing.T) { configureAdapter(adapter) gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("Send", mock.Anything, mock.Anything) adapter.On("DeMultiplex", mock.Anything) gc.HandleMessage(&receivedMsg{msg: createStateInfoMsg(10, pkiIDInOrg1, channelA), PKIID: pkiIDInOrg1}) @@ -870,6 +881,7 @@ func TestChannelAddToMessageStore(t *testing.T) { configureAdapter(adapter) gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("Send", mock.Anything, mock.Anything) adapter.On("DeMultiplex", mock.Anything).Run(func(arg mock.Arguments) { demuxedMsgs <- arg.Get(0).(*proto.SignedGossipMessage) @@ -922,6 +934,7 @@ func TestChannelBlockExpiration(t *testing.T) { configureAdapter(adapter) gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("Send", mock.Anything, mock.Anything) adapter.On("DeMultiplex", mock.Anything).Run(func(arg mock.Arguments) { demuxedMsgs <- arg.Get(0).(*proto.SignedGossipMessage) @@ -1012,6 +1025,7 @@ func TestChannelBadBlocks(t *testing.T) { adapter := new(gossipAdapterMock) configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}) adapter.On("DeMultiplex", mock.Anything).Run(func(args mock.Arguments) { @@ -1050,6 +1064,7 @@ func TestChannelPulledBadBlocks(t *testing.T) { configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) adapter.On("DeMultiplex", mock.Anything) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}) gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDInOrg1, channelA)}) @@ -1075,6 +1090,7 @@ func TestChannelPulledBadBlocks(t *testing.T) { cs.On("VerifyBlock", mock.Anything).Return(errors.New("Bad block")) adapter = new(gossipAdapterMock) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("DeMultiplex", mock.Anything) configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) gc = NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}) @@ -1095,6 +1111,7 @@ func TestChannelPulledBadBlocks(t *testing.T) { cs.On("VerifyBlock", mock.Anything).Return(nil) adapter = new(gossipAdapterMock) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("DeMultiplex", mock.Anything) configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) gc = NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}) @@ -1119,6 +1136,7 @@ func TestChannelPulledBadBlocks(t *testing.T) { adapter = new(gossipAdapterMock) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("DeMultiplex", mock.Anything) configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) gc = NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}) @@ -1148,6 +1166,7 @@ func TestChannelStateInfoSnapshot(t *testing.T) { configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{}) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) sentMessages := make(chan *proto.GossipMessage, 10) adapter.On("Send", mock.Anything, mock.Anything) adapter.On("ValidateStateInfoMessage", mock.Anything).Return(nil) @@ -1264,6 +1283,7 @@ func TestInterOrgExternalEndpointDisclosure(t *testing.T) { adapter.On("GetOrgOfPeer", pkiID2).Return(orgInChannelA) adapter.On("GetOrgOfPeer", pkiID3).Return(api.OrgIdentityType("ORG2")) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("DeMultiplex", mock.Anything) configureAdapter(adapter) jcm := &joinChanMsg{ @@ -1429,6 +1449,7 @@ func TestChannelReconfigureChannel(t *testing.T) { gc.ConfigureChannel(api.JoinChannelMessage(newJoinChanMsg)) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("Send", mock.Anything, mock.Anything) adapter.On("DeMultiplex", mock.Anything) @@ -1517,6 +1538,7 @@ func TestGossipChannelEligibility(t *testing.T) { } adapter.On("GetMembership").Return(members) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("Send", mock.Anything, mock.Anything) adapter.On("DeMultiplex", mock.Anything) adapter.On("GetConf").Return(conf) @@ -1634,6 +1656,7 @@ func TestChannelGetPeers(t *testing.T) { cs := &cryptoService{} adapter := new(gossipAdapterMock) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("Send", mock.Anything, mock.Anything) adapter.On("DeMultiplex", mock.Anything) members := []discovery.NetworkMember{ @@ -1683,6 +1706,7 @@ func TestOnDemandGossip(t *testing.T) { adapter.On("Gossip", mock.Anything).Run(func(mock.Arguments) { gossipedEvents <- struct{}{} }) + adapter.On("Forward", mock.Anything) gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, api.JoinChannelMessage(&joinChanMsg{})) defer gc.Stop() select { @@ -1706,6 +1730,7 @@ func TestOnDemandGossip(t *testing.T) { adapter.On("Gossip", mock.Anything).Run(func(mock.Arguments) { gossipedEvents <- struct{}{} }) + adapter.On("Forward", mock.Anything) gc.(*gossipChannel).Adapter = adapter select { case <-gossipedEvents: @@ -1733,6 +1758,7 @@ func TestChannelPullWithDigestsFilter(t *testing.T) { adapter := new(gossipAdapterMock) configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) adapter.On("Gossip", mock.Anything) + adapter.On("Forward", mock.Anything) adapter.On("DeMultiplex", mock.Anything).Run(func(arg mock.Arguments) { msg := arg.Get(0).(*proto.SignedGossipMessage) if !msg.IsDataMsg() { diff --git a/gossip/gossip/chanstate.go b/gossip/gossip/chanstate.go index cfac4b0d7c6..73bf523a98d 100644 --- a/gossip/gossip/chanstate.go +++ b/gossip/gossip/chanstate.go @@ -129,7 +129,20 @@ func (ga *gossipAdapterImpl) GetConf() channel.Config { // Gossip gossips a message func (ga *gossipAdapterImpl) Gossip(msg *proto.SignedGossipMessage) { - ga.gossipServiceImpl.emitter.Add(msg) + ga.gossipServiceImpl.emitter.Add(&emittedGossipMessage{ + SignedGossipMessage: msg, + filter: func(_ common.PKIidType) bool { + return true + }, + }) +} + +// Forward sends message to the next hops +func (ga *gossipAdapterImpl) Forward(msg proto.ReceivedMessage) { + ga.gossipServiceImpl.emitter.Add(&emittedGossipMessage{ + SignedGossipMessage: msg.GetGossipMessage(), + filter: msg.GetConnectionInfo().ID.IsNotSameFilter, + }) } func (ga *gossipAdapterImpl) Send(msg *proto.SignedGossipMessage, peers ...*comm.RemotePeer) { diff --git a/gossip/gossip/gossip.go b/gossip/gossip/gossip.go index 451a7a0364f..1f0533443f7 100644 --- a/gossip/gossip/gossip.go +++ b/gossip/gossip/gossip.go @@ -73,6 +73,13 @@ type Gossip interface { Stop() } +// emittedGossipMessage encapsulates signed gossip message to compose +// with routing filter to be used while message is forwarded +type emittedGossipMessage struct { + *proto.SignedGossipMessage + filter func(id common.PKIidType) bool +} + // SendCriteria defines how to send a specific message type SendCriteria struct { Timeout time.Duration // Timeout defines the time to wait for acknowledgements diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index 7748bbfdbaa..90798d92186 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -360,7 +360,10 @@ func (g *gossipServiceImpl) handleMessage(m proto.ReceivedMessage) { // in case it's a StateInfo message if g.isInMyorg(discovery.NetworkMember{PKIid: m.GetConnectionInfo().ID}) && msg.IsStateInfoMsg() { if g.stateInfoMsgStore.Add(msg) { - g.emitter.Add(msg) + g.emitter.Add(&emittedGossipMessage{ + SignedGossipMessage: msg, + filter: m.GetConnectionInfo().ID.IsNotSameFilter, + }) } } if !g.toDie() { @@ -409,7 +412,7 @@ func (g *gossipServiceImpl) forwardDiscoveryMsg(msg proto.ReceivedMessage) { recover() }() - g.discAdapter.incChan <- msg.GetGossipMessage() + g.discAdapter.incChan <- msg } // validateMsg checks the signature of the message if exists, @@ -436,9 +439,9 @@ func (g *gossipServiceImpl) validateMsg(msg proto.ReceivedMessage) bool { } func (g *gossipServiceImpl) sendGossipBatch(a []interface{}) { - msgs2Gossip := make([]*proto.SignedGossipMessage, len(a)) + msgs2Gossip := make([]*emittedGossipMessage, len(a)) for i, e := range a { - msgs2Gossip[i] = e.(*proto.SignedGossipMessage) + msgs2Gossip[i] = e.(*emittedGossipMessage) } g.gossipBatch(msgs2Gossip) } @@ -455,25 +458,25 @@ func (g *gossipServiceImpl) sendGossipBatch(a []interface{}) { // to the same set of peers. // The rest of the messages that have no restrictions on their destinations can be sent // to any group of peers. -func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage) { +func (g *gossipServiceImpl) gossipBatch(msgs []*emittedGossipMessage) { if g.disc == nil { g.logger.Error("Discovery has not been initialized yet, aborting!") return } - var blocks []*proto.SignedGossipMessage - var stateInfoMsgs []*proto.SignedGossipMessage - var orgMsgs []*proto.SignedGossipMessage - var leadershipMsgs []*proto.SignedGossipMessage + var blocks []*emittedGossipMessage + var stateInfoMsgs []*emittedGossipMessage + var orgMsgs []*emittedGossipMessage + var leadershipMsgs []*emittedGossipMessage isABlock := func(o interface{}) bool { - return o.(*proto.SignedGossipMessage).IsDataMsg() + return o.(*emittedGossipMessage).IsDataMsg() } isAStateInfoMsg := func(o interface{}) bool { - return o.(*proto.SignedGossipMessage).IsStateInfoMsg() + return o.(*emittedGossipMessage).IsStateInfoMsg() } aliveMsgsWithNoEndpointAndInOurOrg := func(o interface{}) bool { - msg := o.(*proto.SignedGossipMessage) + msg := o.(*emittedGossipMessage) if !msg.IsAliveMsg() { return false } @@ -481,10 +484,10 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage) { return member.Endpoint == "" && g.isInMyorg(discovery.NetworkMember{PKIid: member.PkiId}) } isOrgRestricted := func(o interface{}) bool { - return aliveMsgsWithNoEndpointAndInOurOrg(o) || o.(*proto.SignedGossipMessage).IsOrgRestricted() + return aliveMsgsWithNoEndpointAndInOurOrg(o) || o.(*emittedGossipMessage).IsOrgRestricted() } isLeadershipMsg := func(o interface{}) bool { - return o.(*proto.SignedGossipMessage).IsLeadershipMsg() + return o.(*emittedGossipMessage).IsLeadershipMsg() } // Gossip blocks @@ -508,15 +511,19 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage) { peerSelector = gc.IsMemberInChan } + peerSelector = filter.CombineRoutingFilters(peerSelector, func(member discovery.NetworkMember) bool { + return stateInfMsg.filter(member.PKIid) + }) + peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership(), peerSelector) - g.comm.Send(stateInfMsg, peers2Send...) + g.comm.Send(stateInfMsg.SignedGossipMessage, peers2Send...) } // Gossip messages restricted to our org orgMsgs, msgs = partitionMessages(isOrgRestricted, msgs) peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership(), g.isInMyorg) for _, msg := range orgMsgs { - g.comm.Send(msg, peers2Send...) + g.comm.Send(msg.SignedGossipMessage, g.removeSelfLoop(msg, peers2Send)...) } // Finally, gossip the remaining messages @@ -526,8 +533,11 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage) { continue } selectByOriginOrg := g.peersByOriginOrgPolicy(discovery.NetworkMember{PKIid: msg.GetAliveMsg().Membership.PkiId}) - peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership(), selectByOriginOrg) - g.sendAndFilterSecrets(msg, peers2Send...) + selector := filter.CombineRoutingFilters(selectByOriginOrg, func(member discovery.NetworkMember) bool { + return msg.filter(member.PKIid) + }) + peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership(), selector) + g.sendAndFilterSecrets(msg.SignedGossipMessage, peers2Send...) } } @@ -549,19 +559,19 @@ func (g *gossipServiceImpl) sendAndFilterSecrets(msg *proto.SignedGossipMessage, } // gossipInChan gossips a given GossipMessage slice according to a channel's routing policy. -func (g *gossipServiceImpl) gossipInChan(messages []*proto.SignedGossipMessage, chanRoutingFactory channelRoutingFilterFactory) { +func (g *gossipServiceImpl) gossipInChan(messages []*emittedGossipMessage, chanRoutingFactory channelRoutingFilterFactory) { if len(messages) == 0 { return } totalChannels := extractChannels(messages) var channel common.ChainID - var messagesOfChannel []*proto.SignedGossipMessage + var messagesOfChannel []*emittedGossipMessage for len(totalChannels) > 0 { // Take first channel channel, totalChannels = totalChannels[0], totalChannels[1:] // Extract all messages of that channel grabMsgs := func(o interface{}) bool { - return bytes.Equal(o.(*proto.SignedGossipMessage).Channel, channel) + return bytes.Equal(o.(*emittedGossipMessage).Channel, channel) } messagesOfChannel, messages = partitionMessages(grabMsgs, messages) if len(messagesOfChannel) == 0 { @@ -585,11 +595,23 @@ func (g *gossipServiceImpl) gossipInChan(messages []*proto.SignedGossipMessage, // Send the messages to the remote peers for _, msg := range messagesOfChannel { - g.comm.Send(msg, peers2Send...) + filteredPeers := g.removeSelfLoop(msg, peers2Send) + g.comm.Send(msg.SignedGossipMessage, filteredPeers...) } } } +// removeSelfLoop deletes from the list of peers peer which has sent the message +func (g *gossipServiceImpl) removeSelfLoop(msg *emittedGossipMessage, peers []*comm.RemotePeer) []*comm.RemotePeer { + var result []*comm.RemotePeer + for _, peer := range peers { + if msg.filter(peer.PKIID) { + result = append(result, peer) + } + } + return result +} + // SendByCriteria sends a given message to all peers that match the given SendCriteria func (g *gossipServiceImpl) SendByCriteria(msg *proto.SignedGossipMessage, criteria SendCriteria) error { if criteria.Timeout == 0 { @@ -673,7 +695,12 @@ func (g *gossipServiceImpl) Gossip(msg *proto.GossipMessage) { if g.conf.PropagateIterations == 0 { return } - g.emitter.Add(sMsg) + g.emitter.Add(&emittedGossipMessage{ + SignedGossipMessage: sMsg, + filter: func(_ common.PKIidType) bool { + return true + }, + }) } // Send sends a message to remote peers @@ -818,9 +845,23 @@ func (g *gossipServiceImpl) newDiscoveryAdapter() *discoveryAdapter { if g.conf.PropagateIterations == 0 { return } - g.emitter.Add(msg) + g.emitter.Add(&emittedGossipMessage{ + SignedGossipMessage: msg, + filter: func(_ common.PKIidType) bool { + return true + }, + }) }, - incChan: make(chan *proto.SignedGossipMessage), + forwardFunc: func(message proto.ReceivedMessage) { + if g.conf.PropagateIterations == 0 { + return + } + g.emitter.Add(&emittedGossipMessage{ + SignedGossipMessage: message.GetGossipMessage(), + filter: message.GetConnectionInfo().ID.IsNotSameFilter, + }) + }, + incChan: make(chan proto.ReceivedMessage), presumedDead: g.presumedDead, disclosurePolicy: g.disclosurePolicy, } @@ -832,8 +873,9 @@ type discoveryAdapter struct { stopping int32 c comm.Comm presumedDead chan common.PKIidType - incChan chan *proto.SignedGossipMessage + incChan chan proto.ReceivedMessage gossipFunc func(message *proto.SignedGossipMessage) + forwardFunc func(message proto.ReceivedMessage) disclosurePolicy discovery.DisclosurePolicy } @@ -854,6 +896,14 @@ func (da *discoveryAdapter) Gossip(msg *proto.SignedGossipMessage) { da.gossipFunc(msg) } +func (da *discoveryAdapter) Forward(msg proto.ReceivedMessage) { + if da.toDie() { + return + } + + da.forwardFunc(msg) +} + func (da *discoveryAdapter) SendToPeer(peer *discovery.NetworkMember, msg *proto.SignedGossipMessage) { if da.toDie() { return @@ -895,7 +945,7 @@ func (da *discoveryAdapter) Ping(peer *discovery.NetworkMember) bool { return err == nil } -func (da *discoveryAdapter) Accept() <-chan *proto.SignedGossipMessage { +func (da *discoveryAdapter) Accept() <-chan proto.ReceivedMessage { return da.incChan } @@ -1272,9 +1322,9 @@ func (g *gossipServiceImpl) peersByOriginOrgPolicy(peer discovery.NetworkMember) // partitionMessages receives a predicate and a slice of gossip messages // and returns a tuple of two slices: the messages that hold for the predicate // and the rest -func partitionMessages(pred common.MessageAcceptor, a []*proto.SignedGossipMessage) ([]*proto.SignedGossipMessage, []*proto.SignedGossipMessage) { - s1 := []*proto.SignedGossipMessage{} - s2 := []*proto.SignedGossipMessage{} +func partitionMessages(pred common.MessageAcceptor, a []*emittedGossipMessage) ([]*emittedGossipMessage, []*emittedGossipMessage) { + s1 := []*emittedGossipMessage{} + s2 := []*emittedGossipMessage{} for _, m := range a { if pred(m) { s1 = append(s1, m) @@ -1287,7 +1337,7 @@ func partitionMessages(pred common.MessageAcceptor, a []*proto.SignedGossipMessa // extractChannels returns a slice with all channels // of all given GossipMessages -func extractChannels(a []*proto.SignedGossipMessage) []common.ChainID { +func extractChannels(a []*emittedGossipMessage) []common.ChainID { channels := []common.ChainID{} for _, m := range a { if len(m.Channel) == 0 { diff --git a/gossip/gossip/gossip_test.go b/gossip/gossip/gossip_test.go index d8f2f69bb2e..3f8f75c40f5 100644 --- a/gossip/gossip/gossip_test.go +++ b/gossip/gossip/gossip_test.go @@ -50,6 +50,7 @@ var tests = []func(t *testing.T){ TestConfidentiality, TestAnchorPeer, TestBootstrapPeerMisConfiguration, + TestNoMessagesSelfLoop, } func init() { @@ -568,6 +569,68 @@ func TestMembership(t *testing.T) { } +func TestNoMessagesSelfLoop(t *testing.T) { + t.Parallel() + defer testWG.Done() + portPrefix := 17610 + + boot := newGossipInstance(portPrefix, 0, 100) + boot.JoinChan(&joinChanMsg{}, common.ChainID("A")) + boot.UpdateChannelMetadata(createMetadata(1), common.ChainID("A")) + + peer := newGossipInstance(portPrefix, 1, 100, 0) + peer.JoinChan(&joinChanMsg{}, common.ChainID("A")) + peer.UpdateChannelMetadata(createMetadata(1), common.ChainID("A")) + + // Wait until both peers get connected + waitUntilOrFail(t, checkPeersMembership(t, []Gossip{peer}, 1)) + _, commCh := boot.Accept(func(msg interface{}) bool { + return msg.(proto.ReceivedMessage).GetGossipMessage().IsDataMsg() + }, true) + + wg := sync.WaitGroup{} + wg.Add(2) + + // Make sure sending peer is not getting his own + // message back + go func(ch <-chan proto.ReceivedMessage) { + defer wg.Done() + for { + select { + case msg := <-ch: + { + if msg.GetGossipMessage().IsDataMsg() { + t.Fatal("Should not receive data message back, got", msg) + } + } + // Waiting for 2 seconds to make sure we won't + // get message back w.h.p. + case <-time.After(2 * time.Second): + { + return + } + } + } + }(commCh) + + peerCh, _ := peer.Accept(acceptData, false) + + // Ensure recipient gets his message + go func(ch <-chan *proto.GossipMessage) { + defer wg.Done() + <-ch + }(peerCh) + + boot.Gossip(createDataMsg(uint64(2), []byte{}, common.ChainID("A"))) + waitUntilOrFailBlocking(t, wg.Wait) + + stop := func() { + stopPeers([]Gossip{peer, boot}) + } + + waitUntilOrFailBlocking(t, stop) +} + func TestDissemination(t *testing.T) { t.Parallel() defer testWG.Done()