Skip to content

Commit

Permalink
[FAB-872] Gossip multiChannel support
Browse files Browse the repository at this point in the history
This commit links gossip/channel To the gossip implementation, and:
1) Adds routing logic to the gossip dissemination function
   that disseminates messages according to the message channel
   and whether it should be routed only in the organization or not.
2) Changes the gossip.Gossip API to accomodate to multi-channel changes
3) Changes the gossip tests to accomodate to multi-channel environment
4) Adds tests that ensure that peers do NOT get blocks of channels
    they haven't joined them with a JoinChannel invocation.
5) Adds a test that disseminates blocks from all peers to all peers.
6) Since I have lots of tests now, made them run in parallel, to save time

Change-Id: Ibe6ad886def11e18f2be70b80dd19e4e8395b16f
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Jan 9, 2017
1 parent a5b12f2 commit fdf2f7a
Show file tree
Hide file tree
Showing 15 changed files with 1,112 additions and 310 deletions.
3 changes: 2 additions & 1 deletion core/committer/noopssinglechain/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/hyperledger/fabric/core/committer/txvalidator"
"github.com/hyperledger/fabric/core/peer"
"github.com/hyperledger/fabric/events/producer"
gossipcommon "github.com/hyperledger/fabric/gossip/common"
gossip_proto "github.com/hyperledger/fabric/gossip/proto"
"github.com/hyperledger/fabric/gossip/service"
"github.com/hyperledger/fabric/protos/common"
Expand Down Expand Up @@ -195,7 +196,7 @@ func (d *DeliverService) readUntilClose() {
logger.Debug("Validating block, chainID", d.chainID)
validator.Validate(t.Block)

numberOfPeers := len(service.GetGossipService().GetPeers())
numberOfPeers := len(service.GetGossipService().PeersOfChannel(gossipcommon.ChainID(d.chainID)))
// Create payload with a block received
payload := createPayload(seqNum, t.Block)
// Use payload to create gossip message
Expand Down
63 changes: 63 additions & 0 deletions gossip/filter/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package filter

import (
"github.com/hyperledger/fabric/gossip/comm"
"github.com/hyperledger/fabric/gossip/discovery"
"github.com/hyperledger/fabric/gossip/util"
)

// RoutingFilter defines a predicate on a NetworkMember
// It is used to assert whether a given NetworkMember should be
// selected for be given a message
type RoutingFilter func(discovery.NetworkMember) bool

// CombineRoutingFilters returns the logical AND of given routing filters
func CombineRoutingFilters(filters ...RoutingFilter) RoutingFilter {
return func(member discovery.NetworkMember) bool {
for _, filter := range filters {
if !filter(member) {
return false
}
}
return true
}
}

// SelectPeers returns a slice of peers that match a list of routing filters
func SelectPeers(k int, peerPool []discovery.NetworkMember, filters ...RoutingFilter) []*comm.RemotePeer {
var indices []int
if len(peerPool) <= k {
indices = make([]int, len(peerPool))
for i := 0; i < len(peerPool); i++ {
indices[i] = i
}
} else {
indices = util.GetRandomIndices(k, len(peerPool)-1)
}

var remotePeers []*comm.RemotePeer
for _, index := range indices {
peer := peerPool[index]
if CombineRoutingFilters(filters ...)(peer) {
remotePeers = append(remotePeers, &comm.RemotePeer{PKIID: peer.PKIid, Endpoint: peer.Endpoint})
}

}
return remotePeers
}
13 changes: 12 additions & 1 deletion gossip/gossip/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package gossip

import (
"fmt"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -44,6 +45,10 @@ type batchingEmitter interface {
// latency: the maximum delay that each message can be stored without being forwarded
// cb: a callback that is called in order for the forwarding to take place
func newBatchingEmitter(iterations, burstSize int, latency time.Duration, cb emitBatchCallback) batchingEmitter {
if iterations < 0 {
panic(fmt.Errorf("Got a negative iterations number"))
}

p := &batchingEmitterImpl{
cb: cb,
delay: latency,
Expand All @@ -54,7 +59,10 @@ func newBatchingEmitter(iterations, burstSize int, latency time.Duration, cb emi
stopFlag: int32(0),
}

go p.periodicEmit()
if iterations != 0 {
go p.periodicEmit()
}

return p
}

Expand Down Expand Up @@ -126,6 +134,9 @@ func (p *batchingEmitterImpl) Size() int {
}

func (p *batchingEmitterImpl) Add(message interface{}) {
if p.iterations == 0 {
return
}
p.lock.Lock()
defer p.lock.Unlock()

Expand Down
5 changes: 2 additions & 3 deletions gossip/gossip/certstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ func (m *membershipSvcMock) GetMembership() []discovery.NetworkMember {
}

func TestCertStoreBadSignature(t *testing.T) {
t.Parallel()
badSignature := func(nonce uint64) comm.ReceivedMessage {
return createUpdateMessage(nonce, createBadlySignedUpdateMessage())
}
Expand All @@ -90,7 +89,6 @@ func TestCertStoreBadSignature(t *testing.T) {
}

func TestCertStoreMismatchedIdentity(t *testing.T) {
t.Parallel()
mismatchedIdentity := func(nonce uint64) comm.ReceivedMessage {
return createUpdateMessage(nonce, createMismatchedUpdateMessage())
}
Expand All @@ -99,7 +97,6 @@ func TestCertStoreMismatchedIdentity(t *testing.T) {
}

func TestCertStoreShouldSucceed(t *testing.T) {
t.Parallel()
totallyFineIdentity := func(nonce uint64) comm.ReceivedMessage {
return createUpdateMessage(nonce, createValidUpdateMessage())
}
Expand Down Expand Up @@ -129,6 +126,8 @@ func testCertificateUpdate(t *testing.T, updateFactory func(uint64) comm.Receive
Mediator: pullMediator,
}, identity.NewIdentityMapper(&naiveCryptoService{}), api.PeerIdentityType("SELF"), &naiveCryptoService{})

defer pullMediator.Stop()

wg := sync.WaitGroup{}
wg.Add(1)
sentHello := false
Expand Down
40 changes: 10 additions & 30 deletions gossip/gossip/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/hyperledger/fabric/gossip/comm"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"
"github.com/hyperledger/fabric/gossip/filter"
"github.com/hyperledger/fabric/gossip/gossip/msgstore"
"github.com/hyperledger/fabric/gossip/gossip/pull"
"github.com/hyperledger/fabric/gossip/proto"
Expand Down Expand Up @@ -163,7 +164,7 @@ func NewGossipChannel(mcs api.MessageCryptoService, chainID common.ChainID, adap
gc.blocksPuller.Remove(m.(*proto.GossipMessage))
})

gc.stateInfoMsgStore = msgstore.NewMessageStore(comparator, func(m interface{}) {})
gc.stateInfoMsgStore = NewStateInfoMessageStore()
gc.blocksPuller = gc.createBlockPuller()

gc.ConfigureChannel(joinMsg)
Expand Down Expand Up @@ -219,7 +220,10 @@ func (gc *gossipChannel) GetPeers() []discovery.NetworkMember {

func (gc *gossipChannel) requestStateInfo() {
req := gc.createStateInfoRequest()
endpoints := selectPeers(gc.GetConf().PullPeerNum, gc.GetMembership(), gc.IsMemberInChan)
endpoints := filter.SelectPeers(gc.GetConf().PullPeerNum, gc.GetMembership(), gc.IsSubscribed)
if len(endpoints) == 0 {
endpoints = filter.SelectPeers(gc.GetConf().PullPeerNum, gc.GetMembership(), gc.IsMemberInChan)
}
gc.Send(req, endpoints...)
}

Expand Down Expand Up @@ -531,31 +535,7 @@ func (gc *gossipChannel) UpdateStateInfo(msg *proto.GossipMessage) {
atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(1))
}

// selectPeers returns a slice of peers that match a list of routing filters
func selectPeers(k int, peerPool []discovery.NetworkMember, filters ...func(discovery.NetworkMember) bool) []*comm.RemotePeer {
var indices []int
if len(peerPool) < k {
indices = make([]int, len(peerPool))
for i := 0; i < len(peerPool); i++ {
indices[i] = i
}
} else {
indices = util.GetRandomIndices(k, len(peerPool)-1)
}

var remotePeers []*comm.RemotePeer
for _, index := range indices {
peer := peerPool[index]
passesFilters := true
for _, filter := range filters {
if !filter(peer) {
passesFilters = false
}
}
if passesFilters {
remotePeers = append(remotePeers, &comm.RemotePeer{PKIID: peer.PKIid, Endpoint: peer.Endpoint})
}

}
return remotePeers
}
// NewStateInfoMessageStore returns a MessageStore
func NewStateInfoMessageStore() msgstore.MessageStore {
return msgstore.NewMessageStore(proto.NewGossipMessageComparator(0), func(m interface{}) {})
}
165 changes: 165 additions & 0 deletions gossip/gossip/chanstate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package gossip

import (
"sync"
"sync/atomic"
"time"

"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/comm"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"
"github.com/hyperledger/fabric/gossip/gossip/channel"
"github.com/hyperledger/fabric/gossip/gossip/msgstore"
"github.com/hyperledger/fabric/gossip/gossip/pull"
"github.com/hyperledger/fabric/gossip/proto"
"github.com/hyperledger/fabric/gossip/util"
)

type channelState struct {
stopping int32
sync.RWMutex
channels map[string]channel.GossipChannel
g *gossipServiceImpl
}

func (cs *channelState) stop() {
if cs.isStopping() {
return
}
atomic.StoreInt32(&cs.stopping, int32(1))
cs.Lock()
defer cs.Unlock()
for _, gc := range cs.channels {
gc.Stop()
}
}

func (cs *channelState) isStopping() bool {
return atomic.LoadInt32(&cs.stopping) == int32(1)
}

func (cs *channelState) getGossipChannelByChainID(chainID common.ChainID) channel.GossipChannel {
if cs.isStopping() {
return nil
}
cs.Lock()
defer cs.Unlock()
return cs.channels[string(chainID)]
}

func (cs *channelState) joinChannel(joinMsg api.JoinChannelMessage, chainID common.ChainID) {
if cs.isStopping() {
return
}
cs.Lock()
defer cs.Unlock()
if gc, exists := cs.channels[string(chainID)]; !exists {
cs.channels[string(chainID)] = channel.NewGossipChannel(cs.g.mcs, chainID, &gossipAdapterImpl{gossipServiceImpl: cs.g, Discovery: cs.g.disc}, joinMsg)
} else {
gc.ConfigureChannel(joinMsg)
}
}

type gossipAdapterImpl struct {
*gossipServiceImpl
discovery.Discovery
}

func (ga *gossipAdapterImpl) GetConf() channel.Config {
return channel.Config{
ID: ga.conf.ID,
MaxBlockCountToStore: ga.conf.MaxBlockCountToStore,
PublishStateInfoInterval: ga.conf.PublishStateInfoInterval,
PullInterval: ga.conf.PullInterval,
PullPeerNum: ga.conf.PullPeerNum,
RequestStateInfoInterval: ga.conf.RequestStateInfoInterval,
}
}

// Gossip gossips a message
func (ga *gossipAdapterImpl) Gossip(msg *proto.GossipMessage) {
ga.gossipServiceImpl.emitter.Add(msg)
}

// ValidateStateInfoMessage returns error if a message isn't valid
// nil otherwise
func (ga *gossipAdapterImpl) ValidateStateInfoMessage(msg *proto.GossipMessage) error {
return ga.gossipServiceImpl.validateStateInfoMsg(msg.GetStateInfo())
}

// OrgByPeerIdentity extracts the organization identifier from a peer's identity
func (ga *gossipAdapterImpl) OrgByPeerIdentity(identity api.PeerIdentityType) api.OrgIdentityType {
return ga.gossipServiceImpl.secAdvisor.OrgByPeerIdentity(identity)
}

// GetOrgOfPeer returns the organization identifier of a certain peer
func (ga *gossipAdapterImpl) GetOrgOfPeer(PKIID common.PKIidType) api.OrgIdentityType {
return ga.gossipServiceImpl.getOrgOfPeer(PKIID)
}

// Adapter enables the gossipChannel
// to communicate with gossipServiceImpl.

// Adapter connects a GossipChannel to the gossip implementation
type Adapter interface {

// GetConf returns the configuration
// of the GossipChannel
GetConf() Config

// Gossip gossips a message
Gossip(*proto.GossipMessage)

// DeMultiplex publishes a message to subscribers
DeMultiplex(interface{})

// GetMembership returns the peers that are considered alive
GetMembership() []discovery.NetworkMember

// Send sends a message to a list of peers
Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer)

// ValidateStateInfoMessage returns error if a message isn't valid
// nil otherwise
ValidateStateInfoMessage(*proto.GossipMessage) error

// OrgByPeerIdentity extracts the organization identifier from a peer's identity
OrgByPeerIdentity(identity api.PeerIdentityType) api.OrgIdentityType

// GetOrgOfPeer returns the organization identifier of a certain peer
GetOrgOfPeer(common.PKIidType) api.OrgIdentityType
}

type gossipChannel struct {
Adapter
sync.RWMutex
shouldGossipStateInfo int32
stopChan chan struct{}
stateInfoMsg *proto.GossipMessage
orgs []api.OrgIdentityType
joinMsg api.JoinChannelMessage
blockMsgStore msgstore.MessageStore
stateInfoMsgStore msgstore.MessageStore
chainID common.ChainID
blocksPuller pull.Mediator
logger *util.Logger
stateInfoPublishScheduler *time.Ticker
stateInfoRequestScheduler *time.Ticker
}
Loading

0 comments on commit fdf2f7a

Please sign in to comment.