diff --git a/core/committer/noopssinglechain/client.go b/core/committer/noopssinglechain/client.go index 12ebba24046..dea69b5cd80 100644 --- a/core/committer/noopssinglechain/client.go +++ b/core/committer/noopssinglechain/client.go @@ -25,6 +25,7 @@ import ( "github.com/hyperledger/fabric/core/committer/txvalidator" "github.com/hyperledger/fabric/core/peer" "github.com/hyperledger/fabric/events/producer" + gossipcommon "github.com/hyperledger/fabric/gossip/common" gossip_proto "github.com/hyperledger/fabric/gossip/proto" "github.com/hyperledger/fabric/gossip/service" "github.com/hyperledger/fabric/protos/common" @@ -195,7 +196,7 @@ func (d *DeliverService) readUntilClose() { logger.Debug("Validating block, chainID", d.chainID) validator.Validate(t.Block) - numberOfPeers := len(service.GetGossipService().GetPeers()) + numberOfPeers := len(service.GetGossipService().PeersOfChannel(gossipcommon.ChainID(d.chainID))) // Create payload with a block received payload := createPayload(seqNum, t.Block) // Use payload to create gossip message diff --git a/gossip/filter/filter.go b/gossip/filter/filter.go new file mode 100644 index 00000000000..ce2b9838a7e --- /dev/null +++ b/gossip/filter/filter.go @@ -0,0 +1,63 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filter + +import ( + "github.com/hyperledger/fabric/gossip/comm" + "github.com/hyperledger/fabric/gossip/discovery" + "github.com/hyperledger/fabric/gossip/util" +) + +// RoutingFilter defines a predicate on a NetworkMember +// It is used to assert whether a given NetworkMember should be +// selected for be given a message +type RoutingFilter func(discovery.NetworkMember) bool + +// CombineRoutingFilters returns the logical AND of given routing filters +func CombineRoutingFilters(filters ...RoutingFilter) RoutingFilter { + return func(member discovery.NetworkMember) bool { + for _, filter := range filters { + if !filter(member) { + return false + } + } + return true + } +} + +// SelectPeers returns a slice of peers that match a list of routing filters +func SelectPeers(k int, peerPool []discovery.NetworkMember, filters ...RoutingFilter) []*comm.RemotePeer { + var indices []int + if len(peerPool) <= k { + indices = make([]int, len(peerPool)) + for i := 0; i < len(peerPool); i++ { + indices[i] = i + } + } else { + indices = util.GetRandomIndices(k, len(peerPool)-1) + } + + var remotePeers []*comm.RemotePeer + for _, index := range indices { + peer := peerPool[index] + if CombineRoutingFilters(filters ...)(peer) { + remotePeers = append(remotePeers, &comm.RemotePeer{PKIID: peer.PKIid, Endpoint: peer.Endpoint}) + } + + } + return remotePeers +} diff --git a/gossip/gossip/batcher.go b/gossip/gossip/batcher.go index 37f48b8c5cf..58bdf44f1ab 100644 --- a/gossip/gossip/batcher.go +++ b/gossip/gossip/batcher.go @@ -17,6 +17,7 @@ limitations under the License. package gossip import ( + "fmt" "sync" "sync/atomic" "time" @@ -44,6 +45,10 @@ type batchingEmitter interface { // latency: the maximum delay that each message can be stored without being forwarded // cb: a callback that is called in order for the forwarding to take place func newBatchingEmitter(iterations, burstSize int, latency time.Duration, cb emitBatchCallback) batchingEmitter { + if iterations < 0 { + panic(fmt.Errorf("Got a negative iterations number")) + } + p := &batchingEmitterImpl{ cb: cb, delay: latency, @@ -54,7 +59,10 @@ func newBatchingEmitter(iterations, burstSize int, latency time.Duration, cb emi stopFlag: int32(0), } - go p.periodicEmit() + if iterations != 0 { + go p.periodicEmit() + } + return p } @@ -126,6 +134,9 @@ func (p *batchingEmitterImpl) Size() int { } func (p *batchingEmitterImpl) Add(message interface{}) { + if p.iterations == 0 { + return + } p.lock.Lock() defer p.lock.Unlock() diff --git a/gossip/gossip/certstore_test.go b/gossip/gossip/certstore_test.go index fe953a85b1f..9ac46748ba7 100644 --- a/gossip/gossip/certstore_test.go +++ b/gossip/gossip/certstore_test.go @@ -81,7 +81,6 @@ func (m *membershipSvcMock) GetMembership() []discovery.NetworkMember { } func TestCertStoreBadSignature(t *testing.T) { - t.Parallel() badSignature := func(nonce uint64) comm.ReceivedMessage { return createUpdateMessage(nonce, createBadlySignedUpdateMessage()) } @@ -90,7 +89,6 @@ func TestCertStoreBadSignature(t *testing.T) { } func TestCertStoreMismatchedIdentity(t *testing.T) { - t.Parallel() mismatchedIdentity := func(nonce uint64) comm.ReceivedMessage { return createUpdateMessage(nonce, createMismatchedUpdateMessage()) } @@ -99,7 +97,6 @@ func TestCertStoreMismatchedIdentity(t *testing.T) { } func TestCertStoreShouldSucceed(t *testing.T) { - t.Parallel() totallyFineIdentity := func(nonce uint64) comm.ReceivedMessage { return createUpdateMessage(nonce, createValidUpdateMessage()) } @@ -129,6 +126,8 @@ func testCertificateUpdate(t *testing.T, updateFactory func(uint64) comm.Receive Mediator: pullMediator, }, identity.NewIdentityMapper(&naiveCryptoService{}), api.PeerIdentityType("SELF"), &naiveCryptoService{}) + defer pullMediator.Stop() + wg := sync.WaitGroup{} wg.Add(1) sentHello := false diff --git a/gossip/gossip/channel/channel.go b/gossip/gossip/channel/channel.go index 45279cd2f5c..810b6065f24 100644 --- a/gossip/gossip/channel/channel.go +++ b/gossip/gossip/channel/channel.go @@ -27,6 +27,7 @@ import ( "github.com/hyperledger/fabric/gossip/comm" "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/discovery" + "github.com/hyperledger/fabric/gossip/filter" "github.com/hyperledger/fabric/gossip/gossip/msgstore" "github.com/hyperledger/fabric/gossip/gossip/pull" "github.com/hyperledger/fabric/gossip/proto" @@ -163,7 +164,7 @@ func NewGossipChannel(mcs api.MessageCryptoService, chainID common.ChainID, adap gc.blocksPuller.Remove(m.(*proto.GossipMessage)) }) - gc.stateInfoMsgStore = msgstore.NewMessageStore(comparator, func(m interface{}) {}) + gc.stateInfoMsgStore = NewStateInfoMessageStore() gc.blocksPuller = gc.createBlockPuller() gc.ConfigureChannel(joinMsg) @@ -219,7 +220,10 @@ func (gc *gossipChannel) GetPeers() []discovery.NetworkMember { func (gc *gossipChannel) requestStateInfo() { req := gc.createStateInfoRequest() - endpoints := selectPeers(gc.GetConf().PullPeerNum, gc.GetMembership(), gc.IsMemberInChan) + endpoints := filter.SelectPeers(gc.GetConf().PullPeerNum, gc.GetMembership(), gc.IsSubscribed) + if len(endpoints) == 0 { + endpoints = filter.SelectPeers(gc.GetConf().PullPeerNum, gc.GetMembership(), gc.IsMemberInChan) + } gc.Send(req, endpoints...) } @@ -531,31 +535,7 @@ func (gc *gossipChannel) UpdateStateInfo(msg *proto.GossipMessage) { atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(1)) } -// selectPeers returns a slice of peers that match a list of routing filters -func selectPeers(k int, peerPool []discovery.NetworkMember, filters ...func(discovery.NetworkMember) bool) []*comm.RemotePeer { - var indices []int - if len(peerPool) < k { - indices = make([]int, len(peerPool)) - for i := 0; i < len(peerPool); i++ { - indices[i] = i - } - } else { - indices = util.GetRandomIndices(k, len(peerPool)-1) - } - - var remotePeers []*comm.RemotePeer - for _, index := range indices { - peer := peerPool[index] - passesFilters := true - for _, filter := range filters { - if !filter(peer) { - passesFilters = false - } - } - if passesFilters { - remotePeers = append(remotePeers, &comm.RemotePeer{PKIID: peer.PKIid, Endpoint: peer.Endpoint}) - } - - } - return remotePeers -} +// NewStateInfoMessageStore returns a MessageStore +func NewStateInfoMessageStore() msgstore.MessageStore { + return msgstore.NewMessageStore(proto.NewGossipMessageComparator(0), func(m interface{}) {}) +} \ No newline at end of file diff --git a/gossip/gossip/chanstate.go b/gossip/gossip/chanstate.go new file mode 100644 index 00000000000..75b278eb2cc --- /dev/null +++ b/gossip/gossip/chanstate.go @@ -0,0 +1,165 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gossip + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/hyperledger/fabric/gossip/api" + "github.com/hyperledger/fabric/gossip/comm" + "github.com/hyperledger/fabric/gossip/common" + "github.com/hyperledger/fabric/gossip/discovery" + "github.com/hyperledger/fabric/gossip/gossip/channel" + "github.com/hyperledger/fabric/gossip/gossip/msgstore" + "github.com/hyperledger/fabric/gossip/gossip/pull" + "github.com/hyperledger/fabric/gossip/proto" + "github.com/hyperledger/fabric/gossip/util" +) + +type channelState struct { + stopping int32 + sync.RWMutex + channels map[string]channel.GossipChannel + g *gossipServiceImpl +} + +func (cs *channelState) stop() { + if cs.isStopping() { + return + } + atomic.StoreInt32(&cs.stopping, int32(1)) + cs.Lock() + defer cs.Unlock() + for _, gc := range cs.channels { + gc.Stop() + } +} + +func (cs *channelState) isStopping() bool { + return atomic.LoadInt32(&cs.stopping) == int32(1) +} + +func (cs *channelState) getGossipChannelByChainID(chainID common.ChainID) channel.GossipChannel { + if cs.isStopping() { + return nil + } + cs.Lock() + defer cs.Unlock() + return cs.channels[string(chainID)] +} + +func (cs *channelState) joinChannel(joinMsg api.JoinChannelMessage, chainID common.ChainID) { + if cs.isStopping() { + return + } + cs.Lock() + defer cs.Unlock() + if gc, exists := cs.channels[string(chainID)]; !exists { + cs.channels[string(chainID)] = channel.NewGossipChannel(cs.g.mcs, chainID, &gossipAdapterImpl{gossipServiceImpl: cs.g, Discovery: cs.g.disc}, joinMsg) + } else { + gc.ConfigureChannel(joinMsg) + } +} + +type gossipAdapterImpl struct { + *gossipServiceImpl + discovery.Discovery +} + +func (ga *gossipAdapterImpl) GetConf() channel.Config { + return channel.Config{ + ID: ga.conf.ID, + MaxBlockCountToStore: ga.conf.MaxBlockCountToStore, + PublishStateInfoInterval: ga.conf.PublishStateInfoInterval, + PullInterval: ga.conf.PullInterval, + PullPeerNum: ga.conf.PullPeerNum, + RequestStateInfoInterval: ga.conf.RequestStateInfoInterval, + } +} + +// Gossip gossips a message +func (ga *gossipAdapterImpl) Gossip(msg *proto.GossipMessage) { + ga.gossipServiceImpl.emitter.Add(msg) +} + +// ValidateStateInfoMessage returns error if a message isn't valid +// nil otherwise +func (ga *gossipAdapterImpl) ValidateStateInfoMessage(msg *proto.GossipMessage) error { + return ga.gossipServiceImpl.validateStateInfoMsg(msg.GetStateInfo()) +} + +// OrgByPeerIdentity extracts the organization identifier from a peer's identity +func (ga *gossipAdapterImpl) OrgByPeerIdentity(identity api.PeerIdentityType) api.OrgIdentityType { + return ga.gossipServiceImpl.secAdvisor.OrgByPeerIdentity(identity) +} + +// GetOrgOfPeer returns the organization identifier of a certain peer +func (ga *gossipAdapterImpl) GetOrgOfPeer(PKIID common.PKIidType) api.OrgIdentityType { + return ga.gossipServiceImpl.getOrgOfPeer(PKIID) +} + +// Adapter enables the gossipChannel +// to communicate with gossipServiceImpl. + +// Adapter connects a GossipChannel to the gossip implementation +type Adapter interface { + + // GetConf returns the configuration + // of the GossipChannel + GetConf() Config + + // Gossip gossips a message + Gossip(*proto.GossipMessage) + + // DeMultiplex publishes a message to subscribers + DeMultiplex(interface{}) + + // GetMembership returns the peers that are considered alive + GetMembership() []discovery.NetworkMember + + // Send sends a message to a list of peers + Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer) + + // ValidateStateInfoMessage returns error if a message isn't valid + // nil otherwise + ValidateStateInfoMessage(*proto.GossipMessage) error + + // OrgByPeerIdentity extracts the organization identifier from a peer's identity + OrgByPeerIdentity(identity api.PeerIdentityType) api.OrgIdentityType + + // GetOrgOfPeer returns the organization identifier of a certain peer + GetOrgOfPeer(common.PKIidType) api.OrgIdentityType +} + +type gossipChannel struct { + Adapter + sync.RWMutex + shouldGossipStateInfo int32 + stopChan chan struct{} + stateInfoMsg *proto.GossipMessage + orgs []api.OrgIdentityType + joinMsg api.JoinChannelMessage + blockMsgStore msgstore.MessageStore + stateInfoMsgStore msgstore.MessageStore + chainID common.ChainID + blocksPuller pull.Mediator + logger *util.Logger + stateInfoPublishScheduler *time.Ticker + stateInfoRequestScheduler *time.Ticker +} diff --git a/gossip/gossip/gossip.go b/gossip/gossip/gossip.go index 57aad6aa1f6..b899c4d3a79 100644 --- a/gossip/gossip/gossip.go +++ b/gossip/gossip/gossip.go @@ -17,9 +17,10 @@ limitations under the License. package gossip import ( - "crypto/tls" "time" + "crypto/tls" + "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/comm" "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/discovery" @@ -32,11 +33,20 @@ type Gossip interface { // Send sends a message to remote peers Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer) - // GetPeers returns a mapping of endpoint --> []discovery.NetworkMember - GetPeers() []discovery.NetworkMember + // GetPeers returns the NetworkMembers considered alive + Peers() []discovery.NetworkMember + + // PeersOfChannel returns the NetworkMembers considered alive + // and also subscribed to the channel given + PeersOfChannel(common.ChainID) []discovery.NetworkMember // UpdateMetadata updates the self metadata of the discovery layer - UpdateMetadata([]byte) + // the peer publishes to other peers + UpdateMetadata(metadata []byte) + + // UpdateChannelMetadata updates the self metadata the peer + // publishes to other peers about its channel-related state + UpdateChannelMetadata(metadata []byte, chainID common.ChainID) // Gossip sends a message to other peers to the network Gossip(msg *proto.GossipMessage) @@ -47,28 +57,35 @@ type Gossip interface { // can be used to send a reply back to the sender Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan comm.ReceivedMessage) + // JoinChan makes the Gossip instance join a channel + JoinChan(joinMsg api.JoinChannelMessage, chainID common.ChainID) + // Stop stops the gossip component Stop() } // Config is the configuration of the gossip component type Config struct { - BindPort int - ID string - SelfEndpoint string - BootstrapPeers []string - PropagateIterations int - PropagatePeerNum int + BindPort int // Port we bind to, used only for tests + ID string // ID of this instance + SelfEndpoint string // Endpoint we publish to remote peers + BootstrapPeers []string // Peers we connect to at startup + PropagateIterations int // Number of times a message is pushed to remote peers + PropagatePeerNum int // Number of peers selected to push messages to - MaxMessageCountToStore int + MaxBlockCountToStore int // Maximum count of blocks we store in memory + StateInfoRetentionInterval time.Duration // TODO: this would be a maximum time a stateInfo message is kept until expired - MaxPropagationBurstSize int - MaxPropagationBurstLatency time.Duration + MaxPropagationBurstSize int // Max number of messages stored until it triggers a push to remote peers + MaxPropagationBurstLatency time.Duration // Max time between consecutive message pushes - PullInterval time.Duration - PullPeerNum int + PullInterval time.Duration // Determines frequency of pull phases + PullPeerNum int // Number of peers to pull from - PublishCertPeriod time.Duration + SkipBlockVerification bool // Should we skip verifying block messages or not - TLSServerCert *tls.Certificate + PublishCertPeriod time.Duration // Time from startup certificates are included in Alive messages + PublishStateInfoInterval time.Duration // Determines frequency of pushing state info messages to peers + RequestStateInfoInterval time.Duration // Determines frequency of pulling state info messages from peers + TLSServerCert *tls.Certificate // TLS certificate of the peer } diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index aa4545427c5..72a52fd2942 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -29,6 +29,8 @@ import ( "github.com/hyperledger/fabric/gossip/comm" "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/discovery" + "github.com/hyperledger/fabric/gossip/filter" + "github.com/hyperledger/fabric/gossip/gossip/channel" "github.com/hyperledger/fabric/gossip/gossip/msgstore" "github.com/hyperledger/fabric/gossip/gossip/pull" "github.com/hyperledger/fabric/gossip/identity" @@ -43,6 +45,8 @@ const ( acceptChanSize = 100 ) +type channelRoutingFilterFactory func(channel.GossipChannel) filter.RoutingFilter + type gossipServiceImpl struct { selfIdentity api.PeerIdentityType includeIdentityPeriod time.Time @@ -51,23 +55,26 @@ type gossipServiceImpl struct { presumedDead chan common.PKIidType disc discovery.Discovery comm comm.Comm + incTime time.Time + selfOrg api.OrgIdentityType *comm.ChannelDeMultiplexer - logger *util.Logger - stopSignal *sync.WaitGroup - conf *Config - toDieChan chan struct{} - stopFlag int32 - msgStore msgstore.MessageStore - emitter batchingEmitter - goRoutines []uint64 - discAdapter *discoveryAdapter - disSecAdap *discoverySecurityAdapter - mcs api.MessageCryptoService - blocksPuller pull.Mediator + logger *util.Logger + stopSignal *sync.WaitGroup + conf *Config + toDieChan chan struct{} + stopFlag int32 + emitter batchingEmitter + discAdapter *discoveryAdapter + secAdvisor api.SecurityAdvisor + chanState *channelState + disSecAdap *discoverySecurityAdapter + mcs api.MessageCryptoService + aliveMsgStore msgstore.MessageStore + stateInfoMsgStore msgstore.MessageStore } // NewGossipService creates a gossip instance attached to a gRPC server -func NewGossipService(conf *Config, s *grpc.Server, mcs api.MessageCryptoService, selfIdentity api.PeerIdentityType, dialOpts ...grpc.DialOption) Gossip { +func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvisor, mcs api.MessageCryptoService, selfIdentity api.PeerIdentityType, dialOpts ...grpc.DialOption) Gossip { var c comm.Comm var err error idMapper := identity.NewIdentityMapper(mcs) @@ -84,6 +91,9 @@ func NewGossipService(conf *Config, s *grpc.Server, mcs api.MessageCryptoService } g := &gossipServiceImpl{ + stateInfoMsgStore: channel.NewStateInfoMessageStore(), + selfOrg: secAdvisor.OrgByPeerIdentity(selfIdentity), + secAdvisor: secAdvisor, selfIdentity: selfIdentity, presumedDead: make(chan common.PKIidType, presumedDeadChanSize), idMapper: idMapper, @@ -96,10 +106,12 @@ func NewGossipService(conf *Config, s *grpc.Server, mcs api.MessageCryptoService toDieChan: make(chan struct{}, 1), stopFlag: int32(0), stopSignal: &sync.WaitGroup{}, - goRoutines: make([]uint64, 0), includeIdentityPeriod: time.Now().Add(conf.PublishCertPeriod), } + g.aliveMsgStore = msgstore.NewMessageStore(proto.NewGossipMessageComparator(0), func(m interface{}) {}) + + g.chanState = newChannelState(g) g.emitter = newBatchingEmitter(conf.PropagateIterations, conf.MaxPropagationBurstSize, conf.MaxPropagationBurstLatency, g.sendGossipBatch) @@ -110,12 +122,6 @@ func NewGossipService(conf *Config, s *grpc.Server, mcs api.MessageCryptoService Endpoint: conf.SelfEndpoint, PKIid: g.comm.GetPKIid(), Metadata: []byte{}, }, g.discAdapter, g.disSecAdap) - g.msgStore = msgstore.NewMessageStore(proto.NewGossipMessageComparator(g.conf.MaxMessageCountToStore), func(m interface{}) { - g.blocksPuller.Remove(m.(*proto.GossipMessage)) - }) - - g.blocksPuller = g.createBlockPuller() - g.certStore = newCertStore(g.createCertStorePuller(), idMapper, selfIdentity, mcs) g.logger.SetLevel(logging.WARNING) @@ -125,56 +131,39 @@ func NewGossipService(conf *Config, s *grpc.Server, mcs api.MessageCryptoService return g } -func (g *gossipServiceImpl) createBlockPuller() pull.Mediator { - conf := pull.PullConfig{ - MsgType: proto.PullMsgType_BlockMessage, - Channel: []byte(""), - Id: g.conf.SelfEndpoint, - PeerCountToSelect: g.conf.PullPeerNum, - PullInterval: g.conf.PullInterval, - Tag: proto.GossipMessage_EMPTY, - } - seqNumFromMsg := func(msg *proto.GossipMessage) string { - dataMsg := msg.GetDataMsg() - if dataMsg == nil || dataMsg.Payload == nil { - return "" - } - return fmt.Sprintf("%d", dataMsg.Payload.SeqNum) - } - blockConsumer := func(msg *proto.GossipMessage) { - dataMsg := msg.GetDataMsg() - if dataMsg == nil || dataMsg.Payload == nil { - g.logger.Warning("Invalid DataMessage:", dataMsg) - return - } - added := g.msgStore.Add(msg) - // if we can't add the message to the msgStore, - // no point in disseminating it to others... - if !added { - return - } - g.DeMultiplex(msg) +func newChannelState(g *gossipServiceImpl) *channelState { + return &channelState{ + stopping: int32(0), + channels: make(map[string]channel.GossipChannel), + g: g, } - return pull.NewPullMediator(conf, g.comm, g.disc, seqNumFromMsg, blockConsumer) +} + +func createCommWithoutServer(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper, identity api.PeerIdentityType, dialOpts ...grpc.DialOption) (comm.Comm, error) { + return comm.NewCommInstance(s, cert, idStore, identity, dialOpts...) } // NewGossipServiceWithServer creates a new gossip instance with a gRPC server -func NewGossipServiceWithServer(conf *Config, mcs api.MessageCryptoService, identity api.PeerIdentityType) Gossip { - return NewGossipService(conf, nil, mcs, identity) +func NewGossipServiceWithServer(conf *Config, secAdvisor api.SecurityAdvisor, mcs api.MessageCryptoService, identity api.PeerIdentityType) Gossip { + return NewGossipService(conf, nil, secAdvisor, mcs, identity) } func createCommWithServer(port int, idStore identity.Mapper, identity api.PeerIdentityType) (comm.Comm, error) { return comm.NewCommInstanceWithServer(port, idStore, identity) } -func createCommWithoutServer(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper, identity api.PeerIdentityType, dialOpts ...grpc.DialOption) (comm.Comm, error) { - return comm.NewCommInstance(s, cert, idStore, identity, dialOpts...) -} - func (g *gossipServiceImpl) toDie() bool { return atomic.LoadInt32(&g.stopFlag) == int32(1) } +func (g *gossipServiceImpl) JoinChan(joinMsg api.JoinChannelMessage, chainID common.ChainID) { + if err := g.secAdvisor.Verify(joinMsg); err != nil { + g.logger.Error("Failed verifying join channel message", joinMsg, "error:", err) + return + } + g.chanState.joinChannel(joinMsg, chainID) +} + func (g *gossipServiceImpl) handlePresumedDead() { defer g.logger.Debug("Exiting") g.stopSignal.Add(1) @@ -244,75 +233,118 @@ func (g *gossipServiceImpl) handleMessage(m comm.ReceivedMessage) { if g.toDie() { return } - g.logger.Info("Entering,", m) - defer g.logger.Info("Exiting") + if m == nil || m.GetGossipMessage() == nil { return } msg := m.GetGossipMessage() - if msg.IsAliveMsg() || msg.IsDataMsg() { - if msg.IsAliveMsg() { - if !g.disSecAdap.ValidateAliveMsg(msg.GetAliveMsg()) { - g.logger.Warning("AliveMessage", m.GetGossipMessage(), "isn't authentic. Discarding it") + g.logger.Debug("Entering,", m.GetPKIID(), "sent us", msg) + defer g.logger.Debug("Exiting") + + if !g.validateMsg(m) { + g.logger.Warning("Message", msg, "isn't valid") + return + } + + if msg.IsAliveMsg() { + am := msg.GetAliveMsg() + storedIdentity, _ := g.idMapper.Get(common.PKIidType(am.Membership.PkiID)) + // If peer's certificate is included inside AliveMessage, and we don't have a mapping between + // its PKI-ID and certificate, create a mapping for it now. + if identity := am.Identity; identity != nil && storedIdentity == nil { + err := g.idMapper.Put(common.PKIidType(am.Membership.PkiID), api.PeerIdentityType(identity)) + if err != nil { + g.logger.Warning("Failed adding identity of", am, "into identity store:", err) return } + g.logger.Info("Learned identity of", am.Membership.PkiID) + } - am := msg.GetAliveMsg() - storedIdentity, _ := g.idMapper.Get(common.PKIidType(am.Membership.PkiID)) - // If peer's certificate is included inside AliveMessage, and we don't have a mapping between - // its PKI-ID and certificate, create a mapping for it now. - if identity := am.Identity; identity != nil && storedIdentity == nil { - err := g.idMapper.Put(common.PKIidType(am.Membership.PkiID), api.PeerIdentityType(identity)) - if err != nil { - g.logger.Warning("Failed adding identity of", am, "into identity store:", err) - return - } - g.logger.Info("Learned identity of", am.Membership.PkiID) - } + added := g.aliveMsgStore.Add(msg) + if !added { + return } + g.emitter.Add(msg) + } - if msg.IsDataMsg() { - blockMsg := msg.GetDataMsg() - if blockMsg.Payload == nil { - g.logger.Warning("Empty block! Discarding it") - return + if msg.IsChannelRestricted() { + if gc := g.chanState.getGossipChannelByChainID(msg.Channel); gc == nil { + // If we're not in the channel but we should forward to peers of our org + if g.isInMyorg(discovery.NetworkMember{PKIid: m.GetPKIID()}) && msg.IsStateInfoMsg() { + if g.stateInfoMsgStore.Add(msg) { + g.emitter.Add(msg) + } } - if err := g.mcs.VerifyBlock(blockMsg); err != nil { - g.logger.Warning("Could not verify block", blockMsg.Payload.SeqNum, "Discarding it") - return + if !g.toDie() { + g.logger.Warning("No such channel", msg.Channel, "discarding message", msg) } + } else { + gc.HandleMessage(m) } + return + } - added := g.msgStore.Add(msg) - if added { - g.emitter.Add(msg) - if dataMsg := m.GetGossipMessage().GetDataMsg(); dataMsg != nil { - g.blocksPuller.Add(msg) - g.DeMultiplex(msg) - } + if selectOnlyDiscoveryMessages(m) { + g.forwardDiscoveryMsg(m) + } + + if msg.IsPullMsg() && msg.GetPullMsgType() == proto.PullMsgType_IdentityMsg { + g.certStore.handleMessage(m) + } +} + +func (g *gossipServiceImpl) forwardDiscoveryMsg(msg comm.ReceivedMessage) { + defer func() { // can be closed while shutting down + recover() + }() + g.discAdapter.incChan <- msg.GetGossipMessage() +} +// validateMsg checks the signature of the message if exists, +// and also checks that the tag matches the message type +func (g *gossipServiceImpl) validateMsg(msg comm.ReceivedMessage) bool { + if err := msg.GetGossipMessage().IsTagLegal(); err != nil { + g.logger.Warning("Tag of", msg.GetGossipMessage(), "isn't legal:", err) + return false + } + + if msg.GetGossipMessage().IsAliveMsg() { + if !g.disSecAdap.ValidateAliveMsg(msg.GetGossipMessage().GetAliveMsg()) { + return false } } - if selectOnlyDiscoveryMessages(m) { - g.forwardDiscoveryMsg(m) + if msg.GetGossipMessage().IsDataMsg() { + blockMsg := msg.GetGossipMessage().GetDataMsg() + if blockMsg.Payload == nil { + g.logger.Warning("Empty block! Discarding it") + return false + } + + // If we're configured to skip block validation, don't verify it + if g.conf.SkipBlockVerification { + return true + } + + if err := g.mcs.VerifyBlock(blockMsg); err != nil { + g.logger.Warning("Could not verify block", blockMsg.Payload.SeqNum, ":", err) + return false + } } - if msg.IsPullMsg() { - switch msgType := msg.GetPullMsgType(); msgType { - case proto.PullMsgType_BlockMessage: - g.blocksPuller.HandleMessage(m) - case proto.PullMsgType_IdentityMsg: - g.certStore.handleMessage(m) - default: - g.logger.Warning("Got invalid pull message type:", msgType) + if msg.GetGossipMessage().IsStateInfoMsg() { + stateInfoMsg := msg.GetGossipMessage().GetStateInfo() + if err := g.validateStateInfoMsg(stateInfoMsg); err != nil { + g.logger.Warning("StateInfo message", msg, "is found invalid:", err) + return false } } + return true } -func (g *gossipServiceImpl) forwardDiscoveryMsg(msg comm.ReceivedMessage) { +func (g *gossipServiceImpl) forwardToDiscoveryLayer(msg comm.ReceivedMessage) { defer func() { // can be closed while shutting down recover() }() @@ -324,26 +356,119 @@ func (g *gossipServiceImpl) sendGossipBatch(a []interface{}) { for i, e := range a { msgs2Gossip[i] = e.(*proto.GossipMessage) } - go g.gossipBatch(msgs2Gossip) -} - + g.gossipBatch(msgs2Gossip) +} + +// gossipBatch - This is the method that actually decides to which peers to gossip the message +// batch we possess. +// For efficiency, we first isolate all the messages that have the same routing policy +// and send them together, and only after that move to the next group of messages. +// i.e: we send all blocks of channel C to the same group of peers, +// and send all StateInfo messages to the same group of peers, etc. etc. +// When we send blocks, we send only to peers that advertised themselves in the channel. +// When we send StateInfo messages, we send to peers in the channel. +// When we send messages that are marked to be sent only within the org, we send all of these messages +// to the same set of peers. +// The rest of the messages that have no restrictions on their destinations can be sent +// to any group of peers. func (g *gossipServiceImpl) gossipBatch(msgs []*proto.GossipMessage) { if g.disc == nil { g.logger.Error("Discovery has not been initialized yet, aborting!") return } - peers2Send := pull.SelectEndpoints(g.conf.PropagatePeerNum, g.disc.GetMembership()) + + var blocks []*proto.GossipMessage + var stateInfoMsgs []*proto.GossipMessage + var orgMsgs []*proto.GossipMessage + + isABlock := func(o interface{}) bool { + return o.(*proto.GossipMessage).IsDataMsg() + } + isAStateInfoMsg := func(o interface{}) bool { + return o.(*proto.GossipMessage).IsStateInfoMsg() + } + isOrgRestricted := func(o interface{}) bool { + return o.(*proto.GossipMessage).IsOrgRestricted() + } + + // Gossip blocks + blocks, msgs = partitionMessages(isABlock, msgs) + g.gossipInChan(blocks, func(gc channel.GossipChannel) filter.RoutingFilter { + return filter.CombineRoutingFilters(gc.IsSubscribed, gc.IsMemberInChan, g.isInMyorg) + }) + + // Gossip StateInfo messages + stateInfoMsgs, msgs = partitionMessages(isAStateInfoMsg, msgs) + g.gossipInChan(stateInfoMsgs, func(gc channel.GossipChannel) filter.RoutingFilter { + return gc.IsMemberInChan + }) + + // Gossip messages restricted to our org + orgMsgs, msgs = partitionMessages(isOrgRestricted, msgs) + peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership(), g.isInMyorg) + for _, msg := range orgMsgs { + g.comm.Send(msg, peers2Send...) + } + + // Finally, gossip the remaining messages + peers2Send = filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership()) for _, msg := range msgs { g.comm.Send(msg, peers2Send...) } } +// gossipInChan gossips a given GossipMessage slice according to a channel's routing policy. +func (g *gossipServiceImpl) gossipInChan(messages []*proto.GossipMessage, chanRoutingFactory channelRoutingFilterFactory) { + if len(messages) == 0 { + return + } + totalChannels := extractChannels(messages) + var channel common.ChainID + var messagesOfChannel []*proto.GossipMessage + for len(totalChannels) > 0 { + // Take first channel + channel, totalChannels = totalChannels[0], totalChannels[1:] + // Extract all messages of that channel + grabMsgs := func(o interface{}) bool { + return bytes.Equal(o.(*proto.GossipMessage).Channel, channel) + } + messagesOfChannel, messages = partitionMessages(grabMsgs, messages) + // Grab channel object for that channel + gc := g.chanState.getGossipChannelByChainID(channel) + if gc == nil { + g.logger.Warning("Channel", channel, "wasn't found") + continue + } + // Select the peers to send the messages to + peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership(), chanRoutingFactory(gc)) + // Send the messages to the remote peers + for _, msg := range messagesOfChannel { + g.comm.Send(msg, peers2Send...) + } + } +} + // Gossip sends a message to other peers to the network func (g *gossipServiceImpl) Gossip(msg *proto.GossipMessage) { - g.logger.Info(msg) - if dataMsg := msg.GetDataMsg(); dataMsg != nil { - g.msgStore.Add(msg) - g.blocksPuller.Add(msg) + // Educate developers to Gossip messages with the right tags. + // See IsTagLegal() for wanted behavior. + if err := msg.IsTagLegal(); err != nil { + panic(err) + } + + if msg.IsChannelRestricted() { + gc := g.chanState.getGossipChannelByChainID(msg.Channel) + if gc == nil { + g.logger.Warning("Failed obtaining gossipChannel of", msg.Channel, "aborting") + return + } + if msg.IsDataMsg() { + gc.AddToMsgStore(msg) + } + } + + if g.conf.PropagateIterations == 0 { + return } g.emitter.Add(msg) } @@ -354,12 +479,25 @@ func (g *gossipServiceImpl) Send(msg *proto.GossipMessage, peers ...*comm.Remote } // GetPeers returns a mapping of endpoint --> []discovery.NetworkMember -func (g *gossipServiceImpl) GetPeers() []discovery.NetworkMember { +func (g *gossipServiceImpl) Peers() []discovery.NetworkMember { s := []discovery.NetworkMember{} for _, member := range g.disc.GetMembership() { s = append(s, member) } return s + +} + +// PeersOfChannel returns the NetworkMembers considered alive +// and also subscribed to the channel given +func (g *gossipServiceImpl) PeersOfChannel(channel common.ChainID) []discovery.NetworkMember { + gc := g.chanState.getGossipChannelByChainID(channel) + if gc == nil { + g.logger.Warning("No such channel", channel) + return nil + } + + return gc.GetPeers() } // Stop stops the gossip component @@ -375,9 +513,9 @@ func (g *gossipServiceImpl) Stop() { defer comWG.Done() g.comm.Stop() }() + g.chanState.stop() g.discAdapter.close() g.disc.Stop() - g.blocksPuller.Stop() g.certStore.stop() g.toDieChan <- struct{}{} g.emitter.Stop() @@ -386,16 +524,30 @@ func (g *gossipServiceImpl) Stop() { comWG.Wait() } -// UpdateMetadata updates the self metadata of the discovery layer func (g *gossipServiceImpl) UpdateMetadata(md []byte) { g.disc.UpdateMetadata(md) } +// UpdateChannelMetadata updates the self metadata the peer +// publishes to other peers about its channel-related state +func (g *gossipServiceImpl) UpdateChannelMetadata(md []byte, chainID common.ChainID) { + gc := g.chanState.getGossipChannelByChainID(chainID) + if gc == nil { + g.logger.Warning("No such channel", chainID) + return + } + stateInfMsg, err := g.createStateInfoMsg(md, chainID) + if err != nil { + g.logger.Error("Failed creating StateInfo message") + return + } + gc.UpdateStateInfo(stateInfMsg) +} + // Accept returns a dedicated read-only channel for messages sent by other nodes that match a certain predicate. // If passThrough is false, the messages are processed by the gossip layer beforehand. // If passThrough is true, the gossip layer doesn't intervene and the messages // can be used to send a reply back to the sender - func (g *gossipServiceImpl) Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan comm.ReceivedMessage) { if passThrough { return nil, g.comm.Accept(acceptor) @@ -471,6 +623,9 @@ func (da *discoveryAdapter) toDie() bool { } func (da *discoveryAdapter) Gossip(msg *proto.GossipMessage) { + if da.toDie() { + return + } if msg.IsAliveMsg() && time.Now().Before(da.includeIdentityPeriod) { msg.GetAliveMsg().Identity = da.identity } @@ -604,6 +759,30 @@ func (g *gossipServiceImpl) peersWithEndpoints(endpoints ...string) []*comm.Remo return peers } +func (g *gossipServiceImpl) validateIdentityMsg(msg *proto.GossipMessage) error { + if msg.GetPeerIdentity() == nil { + return fmt.Errorf("Identity empty") + } + idMsg := msg.GetPeerIdentity() + pkiID := idMsg.PkiID + cert := idMsg.Cert + sig := idMsg.Sig + if bytes.Equal(g.idMapper.GetPKIidOfCert(api.PeerIdentityType(cert)), common.PKIidType(pkiID)) { + return fmt.Errorf("Calculated pkiID doesn't match identity") + } + idMsg.Sig = nil + b, err := prot.Marshal(idMsg) + if err != nil { + return fmt.Errorf("Failed marshalling: %v", err) + } + err = g.mcs.Verify(api.PeerIdentityType(cert), sig, b) + if err != nil { + return fmt.Errorf("Failed verifying message: %v", err) + } + idMsg.Sig = sig + return nil +} + func (g *gossipServiceImpl) createCertStorePuller() pull.Mediator { conf := pull.PullConfig{ MsgType: proto.PullMsgType_IdentityMsg, @@ -630,7 +809,109 @@ func (g *gossipServiceImpl) createCertStorePuller() pull.Mediator { if err != nil { g.logger.Warning("Failed associating PKI-ID with certificate:", err) } + g.logger.Info("Learned of a new certificate:", idMsg.Cert) } return pull.NewPullMediator(conf, g.comm, g.disc, pkiIDFromMsg, certConsumer) } + +func (g *gossipServiceImpl) createStateInfoMsg(metadata []byte, chainID common.ChainID) (*proto.GossipMessage, error) { + stateInfMsg := &proto.StateInfo{ + Metadata: metadata, + PkiID: g.comm.GetPKIid(), + Signature: nil, + Timestamp: &proto.PeerTime{ + IncNumber: uint64(g.incTime.UnixNano()), + SeqNum: uint64(time.Now().UnixNano()), + }, + } + + b, err := prot.Marshal(stateInfMsg) + if err != nil { + g.logger.Error("Failed marshalling StateInfo message:", err) + return nil, err + } + + sig, err := g.mcs.Sign(b) + if err != nil { + g.logger.Errorf("Failed signing StateInfo message:", err) + return nil, err + } + + stateInfMsg.Signature = sig + + return &proto.GossipMessage{ + Channel: chainID, + Nonce: 0, + Tag: proto.GossipMessage_CHAN_OR_ORG, + Content: &proto.GossipMessage_StateInfo{ + StateInfo: stateInfMsg, + }, + }, nil +} + +func (g *gossipServiceImpl) isInMyorg(member discovery.NetworkMember) bool { + if member.PKIid == nil { + return false + } + if org := g.getOrgOfPeer(member.PKIid); org != nil { + return bytes.Equal(g.selfOrg, org) + } + return false +} + +func (g *gossipServiceImpl) getOrgOfPeer(PKIID common.PKIidType) api.OrgIdentityType { + cert, err := g.idMapper.Get(PKIID) + if err != nil { + g.logger.Error("Failed getting certificate by PKIid:", PKIID, ":", err) + return nil + } + + return g.secAdvisor.OrgByPeerIdentity(cert) +} + +func (g *gossipServiceImpl) validateStateInfoMsg(msg *proto.StateInfo) error { + sig := msg.Signature + msg.Signature = nil + b, err := prot.Marshal(msg) + if err != nil { + return err + } + err = g.idMapper.Verify(msg.PkiID, sig, b) + msg.Signature = sig + return err +} + +// partitionMessages receives a predicate and a slice of gossip messages +// and returns a tuple of two slices: the messages that hold for the predicate +// and the rest +func partitionMessages(pred common.MessageAcceptor, a []*proto.GossipMessage) ([]*proto.GossipMessage, []*proto.GossipMessage) { + s1 := []*proto.GossipMessage{} + s2 := []*proto.GossipMessage{} + for _, m := range a { + if pred(m) { + s1 = append(s1, m) + } else { + s2 = append(s2, m) + } + } + return s1, s2 +} + +// extractChannels returns a slice with all channels +// of all given GossipMessages +func extractChannels(a []*proto.GossipMessage) []common.ChainID { + channels := []common.ChainID{} + for _, m := range a { + if len(m.Channel) == 0 { + continue + } + sameChan := func(a interface{}, b interface{}) bool { + return bytes.Equal(a.(common.ChainID), b.(common.ChainID)) + } + if util.IndexInSlice(channels, common.ChainID(m.Channel), sameChan) == -1 { + channels = append(channels, common.ChainID(m.Channel)) + } + } + return channels +} diff --git a/gossip/gossip/gossip_test.go b/gossip/gossip/gossip_test.go index 0703765673c..c7df9217780 100644 --- a/gossip/gossip/gossip_test.go +++ b/gossip/gossip/gossip_test.go @@ -36,8 +36,9 @@ import ( "github.com/stretchr/testify/assert" ) -var individualTimeout = time.Second * time.Duration(60) -var perTestTimeout = time.Second * time.Duration(180) +var timeout = time.Second * time.Duration(180) + +var testWG = sync.WaitGroup{} func init() { aliveTimeInterval := time.Duration(1000) * time.Millisecond @@ -45,10 +46,14 @@ func init() { discovery.SetAliveExpirationCheckInterval(aliveTimeInterval) discovery.SetExpirationTimeout(aliveTimeInterval * 10) discovery.SetReconnectInterval(aliveTimeInterval * 5) + + testWG.Add(6) + } -var portPrefix = 5610 -var testLock = sync.RWMutex{} +//var testLock = sync.RWMutex{} +var orgInChannelA = api.OrgIdentityType("ORG1") +var anchorPeerIdentity = api.PeerIdentityType("identityInOrg1") func acceptData(m interface{}) bool { if dataMsg := m.(*proto.GossipMessage).GetDataMsg(); dataMsg != nil { @@ -57,9 +62,37 @@ func acceptData(m interface{}) bool { return false } +type joinChanMsg struct { +} + +// GetTimestamp returns the timestamp of the message's creation +func (*joinChanMsg) GetTimestamp() time.Time { + return time.Now() +} + +// AnchorPeers returns all the anchor peers that are in the channel +func (*joinChanMsg) AnchorPeers() []api.AnchorPeer { + return []api.AnchorPeer{{Cert: anchorPeerIdentity}} +} + type naiveCryptoService struct { } +type orgCryptoService struct { +} + +// OrgByPeerIdentity returns the OrgIdentityType +// of a given peer identity +func (*orgCryptoService) OrgByPeerIdentity(identity api.PeerIdentityType) api.OrgIdentityType { + return orgInChannelA +} + +// Verify verifies a JoinChanMessage, returns nil on success, +// and an error on failure +func (*orgCryptoService) Verify(joinChanMsg api.JoinChannelMessage) error { + return nil +} + func (*naiveCryptoService) ValidateIdentity(peerIdentity api.PeerIdentityType) error { return nil } @@ -92,7 +125,7 @@ func (*naiveCryptoService) Verify(peerIdentity api.PeerIdentityType, signature, return nil } -func bootPeers(ids ...int) []string { +func bootPeers(portPrefix int, ids ...int) []string { peers := []string{} for _, id := range ids { peers = append(peers, fmt.Sprintf("localhost:%d", (id+portPrefix))) @@ -100,33 +133,36 @@ func bootPeers(ids ...int) []string { return peers } -func newGossipInstance(id int, maxMsgCount int, boot ...int) Gossip { +func newGossipInstance(portPrefix int, id int, maxMsgCount int, boot ...int) Gossip { port := id + portPrefix conf := &Config{ - BindPort: port, - BootstrapPeers: bootPeers(boot...), - ID: fmt.Sprintf("p%d", id), - MaxMessageCountToStore: maxMsgCount, - MaxPropagationBurstLatency: time.Duration(10) * time.Millisecond, - MaxPropagationBurstSize: 10, + BindPort: port, + BootstrapPeers: bootPeers(portPrefix, boot...), + ID: fmt.Sprintf("p%d", id), + MaxBlockCountToStore: maxMsgCount, + MaxPropagationBurstLatency: time.Duration(500) * time.Millisecond, + MaxPropagationBurstSize: 20, PropagateIterations: 1, PropagatePeerNum: 3, - PullInterval: time.Duration(4) * time.Second, + PullInterval: time.Duration(2) * time.Second, PullPeerNum: 5, SelfEndpoint: fmt.Sprintf("localhost:%d", port), PublishCertPeriod: time.Duration(4) * time.Second, + PublishStateInfoInterval: time.Duration(1) * time.Second, + RequestStateInfoInterval: time.Duration(1) * time.Second, } - return NewGossipServiceWithServer(conf, &naiveCryptoService{}, api.PeerIdentityType(conf.SelfEndpoint)) + g := NewGossipServiceWithServer(conf, &orgCryptoService{}, &naiveCryptoService{}, api.PeerIdentityType(conf.SelfEndpoint)) + return g } -func newGossipInstanceWithOnlyPull(id int, maxMsgCount int, boot ...int) Gossip { +func newGossipInstanceWithOnlyPull(portPrefix int, id int, maxMsgCount int, boot ...int) Gossip { port := id + portPrefix conf := &Config{ - BindPort: port, - BootstrapPeers: bootPeers(boot...), - ID: fmt.Sprintf("p%d", id), - MaxMessageCountToStore: maxMsgCount, - MaxPropagationBurstLatency: time.Duration(10) * time.Millisecond, + BindPort: port, + BootstrapPeers: bootPeers(portPrefix, boot...), + ID: fmt.Sprintf("p%d", id), + MaxBlockCountToStore: maxMsgCount, + MaxPropagationBurstLatency: time.Duration(1000) * time.Millisecond, MaxPropagationBurstSize: 10, PropagateIterations: 0, PropagatePeerNum: 0, @@ -134,21 +170,24 @@ func newGossipInstanceWithOnlyPull(id int, maxMsgCount int, boot ...int) Gossip PullPeerNum: 20, SelfEndpoint: fmt.Sprintf("localhost:%d", port), PublishCertPeriod: time.Duration(0) * time.Second, + PublishStateInfoInterval: time.Duration(1) * time.Second, + RequestStateInfoInterval: time.Duration(1) * time.Second, } - return NewGossipServiceWithServer(conf, &naiveCryptoService{}, api.PeerIdentityType(conf.SelfEndpoint)) + g := NewGossipServiceWithServer(conf, &orgCryptoService{}, &naiveCryptoService{}, api.PeerIdentityType(conf.SelfEndpoint)) + return g } func TestPull(t *testing.T) { + t.Parallel() + + portPrefix := 5610 t1 := time.Now() // Scenario: Turn off forwarding and use only pull-based gossip. // First phase: Ensure full membership view for all nodes // Second phase: Disseminate 10 messages and ensure all nodes got them - testLock.Lock() - defer testLock.Unlock() - - shortenedWaitTime := time.Duration(500) * time.Millisecond - algo.SetDigestWaitTime(shortenedWaitTime / 5) + shortenedWaitTime := time.Duration(200) * time.Millisecond + algo.SetDigestWaitTime(shortenedWaitTime) algo.SetRequestWaitTime(shortenedWaitTime) algo.SetResponseWaitTime(shortenedWaitTime) @@ -163,22 +202,26 @@ func TestPull(t *testing.T) { n := 5 msgsCount2Send := 10 - boot := newGossipInstanceWithOnlyPull(0, 100) + boot := newGossipInstanceWithOnlyPull(portPrefix, 0, 100) + boot.JoinChan(&joinChanMsg{}, common.ChainID("A")) + boot.UpdateChannelMetadata([]byte("bla bla"), common.ChainID("A")) peers := make([]Gossip, n) wg := sync.WaitGroup{} + wg.Add(n) for i := 1; i <= n; i++ { - wg.Add(1) go func(i int) { - pI := newGossipInstanceWithOnlyPull(i, 100, 0) + defer wg.Done() + pI := newGossipInstanceWithOnlyPull(portPrefix, i, 100, 0) + pI.JoinChan(&joinChanMsg{}, common.ChainID("A")) + pI.UpdateChannelMetadata([]byte{}, common.ChainID("A")) peers[i-1] = pI - wg.Done() }(i) } wg.Wait() knowAll := func() bool { for i := 1; i <= n; i++ { - neighborCount := len(peers[i-1].GetPeers()) + neighborCount := len(peers[i-1].Peers()) if n != neighborCount { return false } @@ -186,15 +229,13 @@ func TestPull(t *testing.T) { return true } - waitUntilOrFail(t, knowAll) - receivedMessages := make([]int, n) wg = sync.WaitGroup{} + wg.Add(n) for i := 1; i <= n; i++ { go func(i int) { acceptChan, _ := peers[i-1].Accept(acceptData, false) go func(index int, ch <-chan *proto.GossipMessage) { - wg.Add(1) defer wg.Done() for j := 0; j < msgsCount2Send; j++ { <-ch @@ -205,10 +246,10 @@ func TestPull(t *testing.T) { } for i := 1; i <= msgsCount2Send; i++ { - boot.Gossip(createDataMsg(uint64(i), []byte{}, "")) + boot.Gossip(createDataMsg(uint64(i), []byte{}, "", common.ChainID("A"))) } - time.Sleep(time.Duration(3) * time.Second) + waitUntilOrFail(t, knowAll) waitUntilOrFailBlocking(t, wg.Wait) receivedAll := func() bool { @@ -227,42 +268,48 @@ func TestPull(t *testing.T) { waitUntilOrFailBlocking(t, stop) - fmt.Println("Took", time.Since(t1)) + t.Log("Took", time.Since(t1)) atomic.StoreInt32(&stopped, int32(1)) - ensureGoroutineExit(t) + fmt.Println("<<>>") + testWG.Done() } func TestMembership(t *testing.T) { + t.Parallel() + portPrefix := 4610 t1 := time.Now() // Scenario: spawn 20 nodes and a single bootstrap node and then: // 1) Check full membership views for all nodes but the bootstrap node. // 2) Update metadata of last peer and ensure it propagates to all peers - testLock.Lock() - defer testLock.Unlock() stopped := int32(0) go waitForTestCompletion(&stopped, t) - n := 20 + n := 10 var lastPeer = fmt.Sprintf("localhost:%d", (n + portPrefix)) - boot := newGossipInstance(0, 100) + boot := newGossipInstance(portPrefix, 0, 100) + boot.JoinChan(&joinChanMsg{}, common.ChainID("A")) + boot.UpdateChannelMetadata([]byte{}, common.ChainID("A")) peers := make([]Gossip, n) wg := sync.WaitGroup{} + wg.Add(n) for i := 1; i <= n; i++ { - wg.Add(1) go func(i int) { - pI := newGossipInstance(i, 100, 0) + defer wg.Done() + pI := newGossipInstance(portPrefix, i, 100, 0) peers[i-1] = pI - wg.Done() + pI.JoinChan(&joinChanMsg{}, common.ChainID("A")) + pI.UpdateChannelMetadata([]byte{}, common.ChainID("A")) }(i) } waitUntilOrFailBlocking(t, wg.Wait) + t.Log("Peers started") seeAllNeighbors := func() bool { for i := 1; i <= n; i++ { - neighborCount := len(peers[i-1].GetPeers()) + neighborCount := len(peers[i-1].Peers()) if neighborCount != n { return false } @@ -270,62 +317,69 @@ func TestMembership(t *testing.T) { return true } + membershipEstablishTime := time.Now() waitUntilOrFail(t, seeAllNeighbors) + t.Log("membership established in", time.Since(membershipEstablishTime)) - fmt.Println("Updating metadata...") - + t.Log("Updating metadata...") // Change metadata in last node peers[len(peers)-1].UpdateMetadata([]byte("bla bla")) metaDataUpdated := func() bool { - if "bla bla" != string(metadataOfPeer(boot.GetPeers(), lastPeer)) { + if "bla bla" != string(metadataOfPeer(boot.Peers(), lastPeer)) { return false } for i := 0; i < n-1; i++ { - if "bla bla" != string(metadataOfPeer(peers[i].GetPeers(), lastPeer)) { + if "bla bla" != string(metadataOfPeer(peers[i].Peers(), lastPeer)) { return false } } return true } - + metadataDisseminationTime := time.Now() waitUntilOrFail(t, metaDataUpdated) + t.Log("Metadata dissemination took", time.Since(metadataDisseminationTime)) stop := func() { stopPeers(append(peers, boot)) } + stopTime := time.Now() waitUntilOrFailBlocking(t, stop) + t.Log("Stop took", time.Since(stopTime)) - fmt.Println("Took", time.Since(t1)) + t.Log("Took", time.Since(t1)) atomic.StoreInt32(&stopped, int32(1)) - - ensureGoroutineExit(t) + fmt.Println("<<>>") + testWG.Done() } func TestDissemination(t *testing.T) { + t.Parallel() + portPrefix := 3610 t1 := time.Now() // Scenario: 20 nodes and a bootstrap node. // The bootstrap node sends 10 messages and we count // that each node got 10 messages after a few seconds - testLock.Lock() - defer testLock.Unlock() stopped := int32(0) go waitForTestCompletion(&stopped, t) n := 10 msgsCount2Send := 10 - boot := newGossipInstance(0, 100) + boot := newGossipInstance(portPrefix, 0, 100) + boot.JoinChan(&joinChanMsg{}, common.ChainID("A")) + boot.UpdateChannelMetadata([]byte{}, common.ChainID("A")) peers := make([]Gossip, n) receivedMessages := make([]int, n) wg := sync.WaitGroup{} - + wg.Add(n) for i := 1; i <= n; i++ { - pI := newGossipInstance(i, 100, 0) + pI := newGossipInstance(portPrefix, i, 100, 0) peers[i-1] = pI - wg.Add(1) + pI.JoinChan(&joinChanMsg{}, common.ChainID("A")) + pI.UpdateChannelMetadata([]byte{}, common.ChainID("A")) acceptChan, _ := pI.Accept(acceptData, false) go func(index int, ch <-chan *proto.GossipMessage) { defer wg.Done() @@ -334,33 +388,60 @@ func TestDissemination(t *testing.T) { receivedMessages[index]++ } }(i-1, acceptChan) + // Change metadata in last node + if i == n { + pI.UpdateChannelMetadata([]byte("bla bla"), common.ChainID("A")) + } + } + var lastPeer = fmt.Sprintf("localhost:%d", (n + portPrefix)) + metaDataUpdated := func() bool { + if "bla bla" != string(metadataOfPeer(boot.PeersOfChannel(common.ChainID("A")), lastPeer)) { + return false + } + for i := 0; i < n-1; i++ { + if "bla bla" != string(metadataOfPeer(peers[i].PeersOfChannel(common.ChainID("A")), lastPeer)) { + return false + } + } + return true } + membershipTime := time.Now() waitUntilOrFail(t, checkPeersMembership(peers, n)) + t.Log("Membership establishment took", time.Since(membershipTime)) for i := 1; i <= msgsCount2Send; i++ { - boot.Gossip(createDataMsg(uint64(i), []byte{}, "")) + boot.Gossip(createDataMsg(uint64(i), []byte{}, "", common.ChainID("A"))) } + t2 := time.Now() waitUntilOrFailBlocking(t, wg.Wait) + t.Log("Block dissemination took", time.Since(t2)) + t2 = time.Now() + waitUntilOrFail(t, metaDataUpdated) + t.Log("Metadata dissemination took", time.Since(t2)) for i := 0; i < n; i++ { assert.Equal(t, msgsCount2Send, receivedMessages[i]) } - fmt.Println("Stopping peers") + t.Log("Stopping peers") stop := func() { stopPeers(append(peers, boot)) } + stopTime := time.Now() waitUntilOrFailBlocking(t, stop) - - fmt.Println("Took", time.Since(t1)) + t.Log("Stop took", time.Since(stopTime)) + t.Log("Took", time.Since(t1)) atomic.StoreInt32(&stopped, int32(1)) - ensureGoroutineExit(t) + fmt.Println("<<>>") + testWG.Done() } func TestMembershipConvergence(t *testing.T) { + t.Parallel() + portPrefix := 2610 // Scenario: Spawn 12 nodes and 3 bootstrap peers // but assign each node to its bootstrap peer group modulo 3. // Then: @@ -371,37 +452,35 @@ func TestMembershipConvergence(t *testing.T) { // 4)a) Ensure all nodes consider it as dead // 4)b) Ensure all node still know each other - testLock.Lock() - defer testLock.Unlock() - t1 := time.Now() stopped := int32(0) go waitForTestCompletion(&stopped, t) - boot0 := newGossipInstance(0, 100) - boot1 := newGossipInstance(1, 100) - boot2 := newGossipInstance(2, 100) + boot0 := newGossipInstance(portPrefix, 0, 100) + boot1 := newGossipInstance(portPrefix, 1, 100) + boot2 := newGossipInstance(portPrefix, 2, 100) peers := []Gossip{boot0, boot1, boot2} // 0: {3, 6, 9, 12} // 1: {4, 7, 10, 13} // 2: {5, 8, 11, 14} for i := 3; i < 15; i++ { - pI := newGossipInstance(i, 100, i%3) + pI := newGossipInstance(portPrefix, i, 100, i%3) peers = append(peers, pI) } waitUntilOrFail(t, checkPeersMembership(peers, 4)) + t.Log("Sets of peers connected successfully") - connectorPeer := newGossipInstance(15, 100, 0, 1, 2) + connectorPeer := newGossipInstance(portPrefix, 15, 100, 0, 1, 2) connectorPeer.UpdateMetadata([]byte("Connector")) fullKnowledge := func() bool { for i := 0; i < 15; i++ { - if 15 != len(peers[i].GetPeers()) { + if 15 != len(peers[i].Peers()) { return false } - if "Connector" != string(metadataOfPeer(peers[i].GetPeers(), "localhost:5625")) { + if "Connector" != string(metadataOfPeer(peers[i].Peers(), "localhost:2625")) { return false } } @@ -410,14 +489,14 @@ func TestMembershipConvergence(t *testing.T) { waitUntilOrFail(t, fullKnowledge) - fmt.Println("Stopping connector...") + t.Log("Stopping connector...") waitUntilOrFailBlocking(t, connectorPeer.Stop) - fmt.Println("Stopped") + t.Log("Stopped") time.Sleep(time.Duration(15) * time.Second) ensureForget := func() bool { for i := 0; i < 15; i++ { - if 14 != len(peers[i].GetPeers()) { + if 14 != len(peers[i].Peers()) { return false } } @@ -426,16 +505,16 @@ func TestMembershipConvergence(t *testing.T) { waitUntilOrFail(t, ensureForget) - connectorPeer = newGossipInstance(15, 100) - connectorPeer.UpdateMetadata([]byte("Connector")) - fmt.Println("Started connector") + connectorPeer = newGossipInstance(portPrefix, 15, 100) + connectorPeer.UpdateMetadata([]byte("Connector2")) + t.Log("Started connector") ensureResync := func() bool { for i := 0; i < 15; i++ { - if 15 != len(peers[i].GetPeers()) { + if 15 != len(peers[i].Peers()) { return false } - if "Connector" != string(metadataOfPeer(peers[i].GetPeers(), "localhost:5625")) { + if "Connector2" != string(metadataOfPeer(peers[i].Peers(), "localhost:2625")) { return false } } @@ -446,21 +525,205 @@ func TestMembershipConvergence(t *testing.T) { waitUntilOrFailBlocking(t, connectorPeer.Stop) - fmt.Println("Stopping peers") + t.Log("Stopping peers") stop := func() { stopPeers(peers) } waitUntilOrFailBlocking(t, stop) atomic.StoreInt32(&stopped, int32(1)) - fmt.Println("Took", time.Since(t1)) + t.Log("Took", time.Since(t1)) + fmt.Println("<<>>") + testWG.Done() +} + +func TestDataLeakage(t *testing.T) { + t.Parallel() + portPrefix := 1610 + // Scenario: spawn some nodes and let them all + // establish full membership. + // Then, have half be in channel A and half be in channel B. + // Ensure nodes only get messages of their channels. + + totalPeers := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} // THIS MUST BE EVEN AND NOT ODD + + stopped := int32(0) + go waitForTestCompletion(&stopped, t) + + n := len(totalPeers) + peers := make([]Gossip, n) + wg := sync.WaitGroup{} + for i := 0; i < n; i++ { + wg.Add(1) + go func(i int) { + totPeers := append([]int(nil), totalPeers[:i]...) + bootPeers := append(totPeers, totalPeers[i+1:]...) + peers[i] = newGossipInstance(portPrefix, i, 100, bootPeers...) + wg.Done() + }(i) + } + + waitUntilOrFailBlocking(t, wg.Wait) + waitUntilOrFail(t, checkPeersMembership(peers, n-1)) + + channels := []common.ChainID{common.ChainID("A"), common.ChainID("B")} + + for i, channel := range channels { + for j := 0; j < (n / 2); j++ { + instanceIndex := (n/2)*i + j + peers[instanceIndex].JoinChan(&joinChanMsg{}, channel) + t.Log(instanceIndex, "joined", string(channel)) + } + } + + channelAmetadata := []byte("some metadata of channel A") + channelBmetadata := []byte("some metadata on channel B") + + peers[0].UpdateChannelMetadata(channelAmetadata, channels[0]) + peers[n/2].UpdateChannelMetadata(channelBmetadata, channels[1]) + + // Wait until all peers have at least 1 peer in the per-channel view + seeChannelMetadata := func() bool { + for i, channel := range channels { + for j := 1; j < (n / 2); j++ { + instanceIndex := (n/2)*i + j + if len(peers[instanceIndex].PeersOfChannel(channel)) == 0 { + return false + } + } + } + return true + } + t1 := time.Now() + waitUntilOrFail(t, seeChannelMetadata) + t.Log("Metadata sync took", time.Since(t1)) + + for i, channel := range channels { + for j := 1; j < (n / 2); j++ { + instanceIndex := (n/2)*i + j + assert.Len(t, peers[instanceIndex].PeersOfChannel(channel), 1) + if i == 0 { + assert.Equal(t, channelAmetadata, peers[instanceIndex].PeersOfChannel(channel)[0].Metadata) + } else { + assert.Equal(t, channelBmetadata, peers[instanceIndex].PeersOfChannel(channel)[0].Metadata) + } + } + } + + gotMessages := func() { + var wg sync.WaitGroup + wg.Add(n - 2) + for i, channel := range channels { + for j := 1; j < (n / 2); j++ { + instanceIndex := (n/2)*i + j + go func(instanceIndex int, channel common.ChainID) { + incMsgChan, _ := peers[instanceIndex].Accept(acceptData, false) + msg := <-incMsgChan + assert.Equal(t, []byte(channel), []byte(msg.Channel)) + wg.Done() + }(instanceIndex, channel) + } + } + wg.Wait() + } + + t1 = time.Now() + peers[0].Gossip(createDataMsg(1, []byte{}, "", channels[0])) + peers[n/2].Gossip(createDataMsg(2, []byte{}, "", channels[1])) + waitUntilOrFailBlocking(t, gotMessages) + t.Log("Dissemination took", time.Since(t1)) + stop := func() { + stopPeers(peers) + } + stopTime := time.Now() + waitUntilOrFailBlocking(t, stop) + t.Log("Stop took", time.Since(stopTime)) + atomic.StoreInt32(&stopped, int32(1)) + fmt.Println("<<>>") + testWG.Done() +} + +func TestDisseminateAll2All(t *testing.T) { + // Scenario: spawn some nodes, have each node + // disseminate a block to all nodes. + // Ensure all blocks are received + + t.Parallel() + portPrefix := 6610 + stopped := int32(0) + go waitForTestCompletion(&stopped, t) + + totalPeers := []int{0, 1, 2, 3, 4, 5, 6} + n := len(totalPeers) + peers := make([]Gossip, n) + wg := sync.WaitGroup{} + + for i := 0; i < n; i++ { + wg.Add(1) + go func(i int) { + totPeers := append([]int(nil), totalPeers[:i]...) + bootPeers := append(totPeers, totalPeers[i+1:]...) + pI := newGossipInstance(portPrefix, i, 100, bootPeers...) + pI.JoinChan(&joinChanMsg{}, common.ChainID("A")) + pI.UpdateChannelMetadata([]byte{}, common.ChainID("A")) + peers[i] = pI + wg.Done() + }(i) + } + wg.Wait() + waitUntilOrFail(t, checkPeersMembership(peers, n-1)) + + bMutex := sync.WaitGroup{} + bMutex.Add(10 * n * (n - 1)) + + wg = sync.WaitGroup{} + wg.Add(n) + + reader := func(msgChan <-chan *proto.GossipMessage, i int) { + wg.Done() + for range msgChan { + bMutex.Done() + } + } + + for i := 0; i < n; i++ { + msgChan, _ := peers[i].Accept(acceptData, false) + go reader(msgChan, i) + } + + wg.Wait() + + for i := 0; i < n; i++ { + go func(i int) { + blockStartIndex := i * 10 + for j := 0; j < 10; j++ { + blockSeq := uint64(j + blockStartIndex) + peers[i].Gossip(createDataMsg(blockSeq, []byte{}, "", common.ChainID("A"))) + } + }(i) + } + waitUntilOrFailBlocking(t, bMutex.Wait) + + stop := func() { + stopPeers(peers) + } + waitUntilOrFailBlocking(t, stop) + atomic.StoreInt32(&stopped, int32(1)) + fmt.Println("<<>>") + testWG.Done() +} + +func TestEndedGoroutines(t *testing.T) { + t.Parallel() + testWG.Wait() ensureGoroutineExit(t) } -func createDataMsg(seqnum uint64, data []byte, hash string) *proto.GossipMessage { +func createDataMsg(seqnum uint64, data []byte, hash string, channel common.ChainID) *proto.GossipMessage { return &proto.GossipMessage{ - Nonce: 0, - Tag: proto.GossipMessage_EMPTY, + Channel: []byte(channel), + Nonce: 0, + Tag: proto.GossipMessage_CHAN_AND_ORG, Content: &proto.GossipMessage_DataMsg{ DataMsg: &proto.DataMessage{ Payload: &proto.Payload{ @@ -481,6 +744,10 @@ var waitForTestCompl = func(g goroutine) bool { return searchInStackTrace("waitForTestCompletion", g.stack) } +var gossipTest = func(g goroutine) bool { + return searchInStackTrace("gossip_test.go", g.stack) +} + var goExit = func(g goroutine) bool { return searchInStackTrace("runtime.goexit", g.stack) } @@ -494,7 +761,7 @@ var testingg = func(g goroutine) bool { } func shouldNotBeRunningAtEnd(gr goroutine) bool { - return !runTests(gr) && !goExit(gr) && !testingg(gr) && !clientConn(gr) && !waitForTestCompl(gr) + return !runTests(gr) && !goExit(gr) && !testingg(gr) && !waitForTestCompl(gr) && !gossipTest(gr) && !clientConn(gr) } func ensureGoroutineExit(t *testing.T) { @@ -504,17 +771,11 @@ func ensureGoroutineExit(t *testing.T) { for _, gr := range getGoRoutines() { if shouldNotBeRunningAtEnd(gr) { allEnded = false - continue } if shouldNotBeRunningAtEnd(gr) && i == 20 { assert.Fail(t, "Goroutine(s) haven't ended:", fmt.Sprintf("%v", gr.stack)) - for _, gr2 := range getGoRoutines() { - for _, ste := range gr2.stack { - t.Log(ste) - } - t.Log("") - } + util.PrintStackTrace() break } } @@ -535,7 +796,7 @@ func metadataOfPeer(members []discovery.NetworkMember, endpoint string) []byte { } func waitForTestCompletion(stopFlag *int32, t *testing.T) { - time.Sleep(perTestTimeout) + time.Sleep(timeout) if atomic.LoadInt32(stopFlag) == int32(1) { return } @@ -545,8 +806,8 @@ func waitForTestCompletion(stopFlag *int32, t *testing.T) { func stopPeers(peers []Gossip) { stoppingWg := sync.WaitGroup{} + stoppingWg.Add(len(peers)) for i, pI := range peers { - stoppingWg.Add(1) go func(i int, p_i Gossip) { defer stoppingWg.Done() p_i.Stop() @@ -592,12 +853,12 @@ type goroutine struct { func waitUntilOrFail(t *testing.T, pred func() bool) { start := time.Now() - limit := start.UnixNano() + individualTimeout.Nanoseconds() + limit := start.UnixNano() + timeout.Nanoseconds() for time.Now().UnixNano() < limit { if pred() { return } - time.Sleep(individualTimeout / 60) + time.Sleep(timeout / 60) } util.PrintStackTrace() assert.Fail(t, "Timeout expired!") @@ -610,7 +871,7 @@ func waitUntilOrFailBlocking(t *testing.T, f func()) { successChan <- struct{}{} }() select { - case <-time.NewTimer(individualTimeout).C: + case <-time.NewTimer(timeout).C: break case <-successChan: return @@ -631,7 +892,7 @@ func searchInStackTrace(searchTerm string, stack []string) bool { func checkPeersMembership(peers []Gossip, n int) func() bool { return func() bool { for _, peer := range peers { - if len(peer.GetPeers()) != n { + if len(peer.Peers()) != n { return false } } diff --git a/gossip/integration/integration.go b/gossip/integration/integration.go index 2397fcbadc0..0419bb267c2 100644 --- a/gossip/integration/integration.go +++ b/gossip/integration/integration.go @@ -31,6 +31,11 @@ import ( // This file is used to bootstrap a gossip instance for integration/demo purposes ONLY +// TODO: This is a temporary fix to make gossip multi-channel work +// because we don't support cross-organization gossip yet. +// this will be removed once we support gossip across orgs. +var orgId = []byte("ORG1") + func newConfig(selfEndpoint string, bootPeers ...string) *gossip.Config { port, err := strconv.ParseInt(strings.Split(selfEndpoint, ":")[1], 10, 64) if err != nil { @@ -40,7 +45,7 @@ func newConfig(selfEndpoint string, bootPeers ...string) *gossip.Config { BindPort: int(port), BootstrapPeers: bootPeers, ID: selfEndpoint, - MaxMessageCountToStore: 100, + MaxBlockCountToStore: 100, MaxPropagationBurstLatency: time.Millisecond * 50, MaxPropagationBurstSize: 3, PropagateIterations: 1, @@ -48,14 +53,15 @@ func newConfig(selfEndpoint string, bootPeers ...string) *gossip.Config { PullInterval: time.Second * 5, PullPeerNum: 3, SelfEndpoint: selfEndpoint, - PublishCertPeriod: time.Duration(time.Second * 4), + PublishCertPeriod: time.Duration(4) * time.Second, + RequestStateInfoInterval: time.Duration(4) * time.Second, } } // NewGossipComponent creates a gossip component that attaches itself to the given gRPC server func NewGossipComponent(endpoint string, s *grpc.Server, dialOpts []grpc.DialOption, bootPeers ...string) gossip.Gossip { conf := newConfig(endpoint, bootPeers...) - return gossip.NewGossipService(conf, s, &naiveCryptoService{}, []byte(endpoint), dialOpts...) + return gossip.NewGossipService(conf, s, &orgCryptoService{}, &naiveCryptoService{}, []byte(endpoint), dialOpts...) } type naiveCryptoService struct { @@ -91,3 +97,19 @@ func (cs *naiveCryptoService) Verify(vkID api.PeerIdentityType, signature, messa } return nil } + +type orgCryptoService struct { + +} + +// OrgByPeerIdentity returns the OrgIdentityType +// of a given peer identity +func (*orgCryptoService) OrgByPeerIdentity(identity api.PeerIdentityType) api.OrgIdentityType { + return orgId +} + +// Verify verifies a JoinChannelMessage, returns nil on success, +// and an error on failure +func (*orgCryptoService) Verify(joinChanMsg api.JoinChannelMessage) error { + return nil +} \ No newline at end of file diff --git a/gossip/integration/integration_test.go b/gossip/integration/integration_test.go index 56c08204ef2..93b2b825450 100644 --- a/gossip/integration/integration_test.go +++ b/gossip/integration/integration_test.go @@ -47,8 +47,8 @@ func TestNewGossipCryptoService(t *testing.T) { go s3.Serve(ll3) time.Sleep(time.Second * 5) - fmt.Println(g1.GetPeers()) - fmt.Println(g2.GetPeers()) - fmt.Println(g3.GetPeers()) + fmt.Println(g1.Peers()) + fmt.Println(g2.Peers()) + fmt.Println(g3.Peers()) time.Sleep(time.Second) } diff --git a/gossip/proto/extensions.go b/gossip/proto/extensions.go index 8340e100825..4f3820009e2 100644 --- a/gossip/proto/extensions.go +++ b/gossip/proto/extensions.go @@ -143,7 +143,7 @@ func (m *GossipMessage) IsStateInfoMsg() bool { return m.GetStateInfo() != nil } -// IsPullMsg returns whether this GossipMessage is a message that has belongs +// IsPullMsg returns whether this GossipMessage is a message that belongs // to the pull mechanism func (m *GossipMessage) IsPullMsg() bool { return m.GetDataReq() != nil || m.GetDataUpdate() != nil || diff --git a/gossip/service/gossip_service.go b/gossip/service/gossip_service.go index 77ac79141d3..b23307b95f9 100644 --- a/gossip/service/gossip_service.go +++ b/gossip/service/gossip_service.go @@ -21,9 +21,8 @@ import ( peerComm "github.com/hyperledger/fabric/core/comm" "github.com/hyperledger/fabric/core/committer" - "github.com/hyperledger/fabric/gossip/comm" + "github.com/hyperledger/fabric/gossip/api" gossipCommon "github.com/hyperledger/fabric/gossip/common" - "github.com/hyperledger/fabric/gossip/discovery" "github.com/hyperledger/fabric/gossip/gossip" "github.com/hyperledger/fabric/gossip/integration" "github.com/hyperledger/fabric/gossip/proto" @@ -39,6 +38,8 @@ var ( once sync.Once ) +type gossipSvc gossip.Gossip + // GossipService encapsulates gossip and state capabilities into single interface type GossipService interface { gossip.Gossip @@ -52,11 +53,16 @@ type GossipService interface { } type gossipServiceImpl struct { - gossip gossip.Gossip + gossipSvc chains map[string]state.GossipStateProvider lock sync.RWMutex } +// JoinChan makes the Gossip instance join a channel +func (g *gossipServiceImpl) JoinChan(joinMsg api.JoinChannelMessage, chainID gossipCommon.ChainID) { + // TODO: eventually we'll have only 1 JoinChannel method +} + var logger = logging.MustGetLogger("gossipService") // InitGossipService initialize gossip service @@ -72,8 +78,8 @@ func InitGossipService(endpoint string, s *grpc.Server, bootPeers ...string) { gossip := integration.NewGossipComponent(endpoint, s, dialOpts, bootPeers...) gossipServiceInstance = &gossipServiceImpl{ - gossip: gossip, - chains: make(map[string]state.GossipStateProvider), + gossipSvc: gossip, + chains: make(map[string]state.GossipStateProvider), } }) } @@ -83,11 +89,6 @@ func GetGossipService() GossipService { return gossipServiceInstance } -// Send sends a message to remote peers -func (g *gossipServiceImpl) Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer) { - g.gossip.Send(msg, peers...) -} - // JoinChannel joins the channel and initialize gossip state with given committer func (g *gossipServiceImpl) JoinChannel(commiter committer.Committer, block *common.Block) error { g.lock.Lock() @@ -98,32 +99,12 @@ func (g *gossipServiceImpl) JoinChannel(commiter committer.Committer, block *com } else { // Initialize new state provider for given committer logger.Debug("Creating state provider for chainID", chainID) - g.chains[chainID] = state.NewGossipStateProvider(g.gossip, commiter) + g.chains[chainID] = state.NewGossipStateProvider(g, commiter) } return nil } -// GetPeers returns a mapping of endpoint --> []discovery.NetworkMember -func (g *gossipServiceImpl) GetPeers() []discovery.NetworkMember { - return g.gossip.GetPeers() -} - -// UpdateMetadata updates the self metadata of the discovery layer -func (g *gossipServiceImpl) UpdateMetadata(data []byte) { - g.gossip.UpdateMetadata(data) -} - -// Gossip sends a message to other peers to the network -func (g *gossipServiceImpl) Gossip(msg *proto.GossipMessage) { - g.gossip.Gossip(msg) -} - -// Accept returns a channel that outputs messages from other peers -func (g *gossipServiceImpl) Accept(acceptor gossipCommon.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan comm.ReceivedMessage) { - return g.gossip.Accept(acceptor, false) -} - // GetBlock returns block for given chain func (g *gossipServiceImpl) GetBlock(chainID string, index uint64) *common.Block { g.lock.RLock() @@ -141,7 +122,8 @@ func (g *gossipServiceImpl) AddPayload(chainID string, payload *proto.Payload) e // Stop stops the gossip component func (g *gossipServiceImpl) Stop() { for _, ch := range g.chains { + logger.Info("Stopping chain", ch) ch.Stop() } - g.gossip.Stop() + g.gossipSvc.Stop() } diff --git a/gossip/state/state.go b/gossip/state/state.go index 2a13021c98a..74379b348ba 100644 --- a/gossip/state/state.go +++ b/gossip/state/state.go @@ -302,7 +302,7 @@ func (s *GossipStateProviderImpl) antiEntropy() { current, _ := s.committer.LedgerHeight() max, _ := s.committer.LedgerHeight() - for _, p := range s.gossip.GetPeers() { + for _, p := range s.gossip.Peers() { if state, err := FromBytes(p.Metadata); err == nil { if max < state.LedgerHeight { max = state.LedgerHeight @@ -328,7 +328,7 @@ func (s *GossipStateProviderImpl) antiEntropy() { func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64) { var peers []*comm.RemotePeer // Filtering peers which might have relevant blocks - for _, value := range s.gossip.GetPeers() { + for _, value := range s.gossip.Peers() { nodeMetadata, err := FromBytes(value.Metadata) if err == nil { if nodeMetadata.LedgerHeight >= end { diff --git a/gossip/state/state_test.go b/gossip/state/state_test.go index 9f0b61f38a2..f6df150aaa4 100644 --- a/gossip/state/state_test.go +++ b/gossip/state/state_test.go @@ -40,6 +40,24 @@ var ( logger, _ = logging.GetLogger("GossipStateProviderTest") ) +var orgId = []byte("ORG1") + +type orgCryptoService struct { + +} + +// OrgByPeerIdentity returns the OrgIdentityType +// of a given peer identity +func (*orgCryptoService) OrgByPeerIdentity(identity api.PeerIdentityType) api.OrgIdentityType { + return orgId +} + +// Verify verifies a JoinChannelMessage, returns nil on success, +// and an error on failure +func (*orgCryptoService) Verify(joinChanMsg api.JoinChannelMessage) error { + return nil +} + type naiveCryptoService struct { } @@ -105,7 +123,7 @@ func newGossipConfig(id int, maxMsgCount int, boot ...int) *gossip.Config { BindPort: port, BootstrapPeers: bootPeers(boot...), ID: fmt.Sprintf("p%d", id), - MaxMessageCountToStore: maxMsgCount, + MaxBlockCountToStore: maxMsgCount, MaxPropagationBurstLatency: time.Duration(10) * time.Millisecond, MaxPropagationBurstSize: 10, PropagateIterations: 1, @@ -114,12 +132,14 @@ func newGossipConfig(id int, maxMsgCount int, boot ...int) *gossip.Config { PullPeerNum: 5, SelfEndpoint: fmt.Sprintf("localhost:%d", port), PublishCertPeriod: 10 * time.Second, + RequestStateInfoInterval: 4 * time.Second, + PublishStateInfoInterval: 4 * time.Second, } } // Create gossip instance func newGossipInstance(config *gossip.Config) gossip.Gossip { - return gossip.NewGossipServiceWithServer(config, &naiveCryptoService{}, []byte(config.SelfEndpoint)) + return gossip.NewGossipServiceWithServer(config, &orgCryptoService{}, &naiveCryptoService{}, []byte(config.SelfEndpoint)) } // Create new instance of KVLedger to be used for testing @@ -270,7 +290,7 @@ func TestNewGossipStateProvider_SendingManyMessages(t *testing.T) { waitUntilTrueOrTimeout(t, func() bool { for _, p := range peersSet { - if len(p.g.GetPeers()) != bootstrapSetSize+standartPeersSize-1 { + if len(p.g.Peers()) != bootstrapSetSize+standartPeersSize-1 { logger.Debug("[XXXXXXX]: Peer discovery has not finished yet") return false }