diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index 00880d73b9b..b1f7e673273 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -938,17 +938,44 @@ func (g *gossipServiceImpl) createCertStorePuller() pull.Mediator { g.logger.Warning("Failed associating PKI-ID with certificate:", err) } g.logger.Info("Learned of a new certificate:", idMsg.Cert) - } adapter := pull.PullAdapter{ Sndr: g.comm, MemSvc: g.disc, IdExtractor: pkiIDFromMsg, MsgCons: certConsumer, + DigFilter: g.sameOrgOrOurOrgPullFilter, } return pull.NewPullMediator(conf, adapter) } +func (g *gossipServiceImpl) sameOrgOrOurOrgPullFilter(msg proto.ReceivedMessage) func(string) bool { + peersOrg := g.secAdvisor.OrgByPeerIdentity(msg.GetConnectionInfo().Identity) + if len(peersOrg) == 0 { + g.logger.Warning("Failed determining organization of", msg.GetConnectionInfo()) + return func(_ string) bool { + return false + } + } + + // If the peer is from our org, gossip all identities + if bytes.Equal(g.selfOrg, peersOrg) { + return func(_ string) bool { + return true + } + } + return func(item string) bool { + pkiID := common.PKIidType(item) + msgsOrg := g.getOrgOfPeer(pkiID) + if len(msgsOrg) == 0 { + g.logger.Warning("Failed determining organization of", pkiID) + return false + } + // Peer from our org or identity from our org or identity from peer's org + return bytes.Equal(msgsOrg, g.selfOrg) || bytes.Equal(msgsOrg, peersOrg) + } +} + func (g *gossipServiceImpl) createStateInfoMsg(metadata []byte, chainID common.ChainID) (*proto.SignedGossipMessage, error) { pkiID := g.comm.GetPKIid() stateInfMsg := &proto.StateInfo{ diff --git a/gossip/gossip/orgs_test.go b/gossip/gossip/orgs_test.go index 988c7e1b6e4..1feedd56519 100644 --- a/gossip/gossip/orgs_test.go +++ b/gossip/gossip/orgs_test.go @@ -19,11 +19,10 @@ package gossip import ( "bytes" "fmt" - "testing" - "time" - "sync" "sync/atomic" + "testing" + "time" "github.com/hyperledger/fabric/bccsp/factory" "github.com/hyperledger/fabric/gossip/api" @@ -271,6 +270,10 @@ func TestConfidentiality(t *testing.T) { peersInOrg := 3 externalEndpointsInOrg := 2 + // orgA: {12610, 12611, 12612} + // orgB: {12613, 12614, 12615} + // orgC: {12616, 12617, 12618} + // orgD: {12619, 12620, 12621} orgs := []string{"A", "B", "C", "D"} channels := []string{"C0", "C1", "C2", "C3"} isOrgInChan := func(org string, channel string) bool { @@ -335,15 +338,16 @@ func TestConfidentiality(t *testing.T) { finished := int32(0) var wg sync.WaitGroup - membershipMsgs := func(o interface{}) bool { + membershipAndIdentitiesMsgs := func(o interface{}) bool { msg := o.(proto.ReceivedMessage).GetGossipMessage() - return msg.IsAliveMsg() || msg.GetMemRes() != nil + identitiesPull := msg.IsPullMsg() && msg.GetPullMsgType() == proto.PullMsgType_IDENTITY_MSG + return msg.IsAliveMsg() || msg.GetMemRes() != nil || identitiesPull } // Listen to all peers membership messages and forward them to the inspection channel // where they will be inspected, and the test would fail if a confidentiality violation is found for _, p := range peers { wg.Add(1) - _, msgs := p.Accept(membershipMsgs, true) + _, msgs := p.Accept(membershipAndIdentitiesMsgs, true) peerNetMember := p.(*gossipServiceImpl).selfNetworkMember() targetORg := string(cs.OrgByPeerIdentity(api.PeerIdentityType(peerNetMember.InternalEndpoint))) go func(targetOrg string, msgs <-chan proto.ReceivedMessage) { @@ -380,11 +384,15 @@ func TestConfidentiality(t *testing.T) { if isOrgInChan(org, ch) { for _, p := range peers { p.JoinChan(joinChanMsgsByChan[ch], common.ChainID(ch)) + p.UpdateChannelMetadata([]byte{}, common.ChainID(ch)) } } } } + // Sleep a bit, to let peers gossip with each other + time.Sleep(time.Second * 7) + assertMembership := func() bool { for _, org := range orgs { for i, p := range orgs2Peers[org] { @@ -448,13 +456,44 @@ func extractOrgsFromMsg(msg *proto.GossipMessage, sec api.SecurityAdvisor) []str if msg.IsAliveMsg() { return []string{string(sec.OrgByPeerIdentity(api.PeerIdentityType(msg.GetAliveMsg().Membership.PkiId)))} } + orgs := map[string]struct{}{} - alive := msg.GetMemRes().Alive - dead := msg.GetMemRes().Dead - for _, envp := range append(alive, dead...) { - msg, _ := envp.ToGossipMessage() - orgs[string(sec.OrgByPeerIdentity(api.PeerIdentityType(msg.GetAliveMsg().Membership.PkiId)))] = struct{}{} + + if msg.IsPullMsg() { + if msg.IsDigestMsg() || msg.IsDataReq() { + var digests []string + if msg.IsDigestMsg() { + digests = msg.GetDataDig().Digests + } else { + digests = msg.GetDataReq().Digests + } + + for _, dig := range digests { + org := sec.OrgByPeerIdentity(api.PeerIdentityType(dig)) + orgs[string(org)] = struct{}{} + } + } + + if msg.IsDataUpdate() { + for _, identityMsg := range msg.GetDataUpdate().Data { + gMsg, _ := identityMsg.ToGossipMessage() + id := string(gMsg.GetPeerIdentity().Cert) + org := sec.OrgByPeerIdentity(api.PeerIdentityType(id)) + orgs[string(org)] = struct{}{} + } + } + } + + if msg.GetMemRes() != nil { + alive := msg.GetMemRes().Alive + dead := msg.GetMemRes().Dead + for _, envp := range append(alive, dead...) { + msg, _ := envp.ToGossipMessage() + orgs[string(sec.OrgByPeerIdentity(api.PeerIdentityType(msg.GetAliveMsg().Membership.PkiId)))] = struct{}{} + } + } + res := []string{} for org := range orgs { res = append(res, org)