diff --git a/gossip/comm/comm.go b/gossip/comm/comm.go index b06192c4ecc..d69eac19345 100644 --- a/gossip/comm/comm.go +++ b/gossip/comm/comm.go @@ -19,8 +19,8 @@ package comm import ( "fmt" + "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/proto" - "github.com/hyperledger/fabric/gossip/util" ) // Comm is an object that enables to communicate with other peers @@ -28,20 +28,20 @@ import ( type Comm interface { // GetPKIid returns this instance's PKI id - GetPKIid() PKIidType + GetPKIid() common.PKIidType // Send sends a message to remote peers Send(msg *proto.GossipMessage, peers ...*RemotePeer) // Probe probes a remote node and returns nil if its responsive - Probe(endpoint string, pkiID PKIidType) error + Probe(endpoint string, pkiID common.PKIidType) error // Accept returns a dedicated read-only channel for messages sent by other nodes that match a certain predicate. // Each message from the channel can be used to send a reply back to the sender - Accept(util.MessageAcceptor) <-chan ReceivedMessage + Accept(common.MessageAcceptor) <-chan ReceivedMessage // PresumedDead returns a read-only channel for node endpoints that are suspected to be offline - PresumedDead() <-chan PKIidType + PresumedDead() <-chan common.PKIidType // CloseConn closes a connection to a certain endpoint CloseConn(peer *RemotePeer) @@ -50,17 +50,13 @@ type Comm interface { Stop() // BlackListPKIid prohibits the module communicating with the given PKIid - BlackListPKIid(PKIid PKIidType) + BlackListPKIid(PKIid common.PKIidType) } -// PKIidType defines the type that holds the PKI-id -// which is the security identifier of a peer -type PKIidType []byte - // RemotePeer defines a peer's endpoint and its PKIid type RemotePeer struct { Endpoint string - PKIID PKIidType + PKIID common.PKIidType } // String converts a RemotePeer to a string diff --git a/gossip/comm/comm_impl.go b/gossip/comm/comm_impl.go index e7129ee4971..31fd7276e98 100644 --- a/gossip/comm/comm_impl.go +++ b/gossip/comm/comm_impl.go @@ -28,6 +28,7 @@ import ( "crypto/tls" "os" + "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/proto" "github.com/hyperledger/fabric/gossip/util" "github.com/op/go-logging" @@ -62,7 +63,7 @@ func (c *commImpl) SetDialOpts(opts ...grpc.DialOption) { } // NewCommInstanceWithServer creates a comm instance that creates an underlying gRPC server -func NewCommInstanceWithServer(port int, sec SecurityProvider, pkID PKIidType, dialOpts ...grpc.DialOption) (Comm, error) { +func NewCommInstanceWithServer(port int, sec SecurityProvider, pkID common.PKIidType, dialOpts ...grpc.DialOption) (Comm, error) { var ll net.Listener var s *grpc.Server var secOpt grpc.DialOption @@ -86,11 +87,11 @@ func NewCommInstanceWithServer(port int, sec SecurityProvider, pkID PKIidType, d gSrv: s, msgPublisher: NewChannelDemultiplexer(), lock: &sync.RWMutex{}, - deadEndpoints: make(chan PKIidType, 100), + deadEndpoints: make(chan common.PKIidType, 100), stopping: int32(0), exitChan: make(chan struct{}, 1), subscriptions: make([]chan ReceivedMessage, 0), - blackListedPKIIDs: make([]PKIidType, 0), + blackListedPKIIDs: make([]common.PKIidType, 0), } commInst.connStore = newConnStore(commInst, pkID, commInst.logger) @@ -112,7 +113,7 @@ func NewCommInstanceWithServer(port int, sec SecurityProvider, pkID PKIidType, d } // NewCommInstance creates a new comm instance that binds itself to the given gRPC server -func NewCommInstance(s *grpc.Server, sec SecurityProvider, PKIID PKIidType, dialOpts ...grpc.DialOption) (Comm, error) { +func NewCommInstance(s *grpc.Server, sec SecurityProvider, PKIID common.PKIidType, dialOpts ...grpc.DialOption) (Comm, error) { commInst, err := NewCommInstanceWithServer(-1, sec, PKIID) if err != nil { return nil, err @@ -128,7 +129,7 @@ type commImpl struct { connStore *connectionStore PKIID []byte port int - deadEndpoints chan PKIidType + deadEndpoints chan common.PKIidType msgPublisher *ChannelDeMultiplexer lock *sync.RWMutex lsnr net.Listener @@ -137,10 +138,10 @@ type commImpl struct { stopping int32 stopWG sync.WaitGroup subscriptions []chan ReceivedMessage - blackListedPKIIDs []PKIidType + blackListedPKIIDs []common.PKIidType } -func (c *commImpl) createConnection(endpoint string, expectedPKIID PKIidType) (*connection, error) { +func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) { c.logger.Debug("Entering", endpoint, expectedPKIID) defer c.logger.Debug("Exiting") if c.isStopping() { @@ -204,7 +205,7 @@ func (c *commImpl) Send(msg *proto.GossipMessage, peers ...*RemotePeer) { } } -func (c *commImpl) BlackListPKIid(PKIID PKIidType) { +func (c *commImpl) BlackListPKIid(PKIID common.PKIidType) { c.logger.Info("Entering", PKIID) defer c.logger.Info("Exiting") c.lock.Lock() @@ -213,7 +214,7 @@ func (c *commImpl) BlackListPKIid(PKIID PKIidType) { c.blackListedPKIIDs = append(c.blackListedPKIIDs, PKIID) } -func (c *commImpl) isPKIblackListed(p PKIidType) bool { +func (c *commImpl) isPKIblackListed(p common.PKIidType) bool { c.lock.RLock() defer c.lock.RUnlock() for _, pki := range c.blackListedPKIIDs { @@ -251,7 +252,7 @@ func (c *commImpl) isStopping() bool { return atomic.LoadInt32(&c.stopping) == int32(1) } -func (c *commImpl) Probe(endpoint string, pkiID PKIidType) error { +func (c *commImpl) Probe(endpoint string, pkiID common.PKIidType) error { if c.isStopping() { return fmt.Errorf("Stopping!") } @@ -274,7 +275,7 @@ func (c *commImpl) Probe(endpoint string, pkiID PKIidType) error { return err } -func (c *commImpl) Accept(acceptor util.MessageAcceptor) <-chan ReceivedMessage { +func (c *commImpl) Accept(acceptor common.MessageAcceptor) <-chan ReceivedMessage { genericChan := c.msgPublisher.AddChannel(acceptor) specificChan := make(chan ReceivedMessage, 10) @@ -311,7 +312,7 @@ func (c *commImpl) Accept(acceptor util.MessageAcceptor) <-chan ReceivedMessage return specificChan } -func (c *commImpl) PresumedDead() <-chan PKIidType { +func (c *commImpl) PresumedDead() <-chan common.PKIidType { return c.deadEndpoints } @@ -351,7 +352,7 @@ func (c *commImpl) Stop() { c.stopWG.Wait() } -func (c *commImpl) GetPKIid() PKIidType { +func (c *commImpl) GetPKIid() common.PKIidType { return c.PKIID } @@ -366,7 +367,7 @@ func extractRemoteAddress(stream stream) string { return remoteAddress } -func (c *commImpl) authenticateRemotePeer(stream stream) (PKIidType, error) { +func (c *commImpl) authenticateRemotePeer(stream stream) (common.PKIidType, error) { ctx := stream.Context() remoteAddress := extractRemoteAddress(stream) tlsUnique := ExtractTLSUnique(ctx) @@ -455,7 +456,7 @@ func (c *commImpl) Ping(context.Context, *proto.Empty) (*proto.Empty, error) { return &proto.Empty{}, nil } -func (c *commImpl) disconnect(pkiID PKIidType) { +func (c *commImpl) disconnect(pkiID common.PKIidType) { if c.isStopping() { return } @@ -485,7 +486,7 @@ func readWithTimeout(stream interface{}, timeout time.Duration) *proto.GossipMes } } -func createConnectionMsg(pkiID PKIidType, sig []byte) *proto.GossipMessage { +func createConnectionMsg(pkiID common.PKIidType, sig []byte) *proto.GossipMessage { return &proto.GossipMessage{ Nonce: 0, Content: &proto.GossipMessage_Conn{ diff --git a/gossip/comm/comm_test.go b/gossip/comm/comm_test.go index cc646f77705..0df31ee27f8 100644 --- a/gossip/comm/comm_test.go +++ b/gossip/comm/comm_test.go @@ -31,6 +31,7 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "github.com/hyperledger/fabric/gossip/common" ) func init() { @@ -91,7 +92,7 @@ func TestHandshake(t *testing.T) { clientTLSUnique := ExtractTLSUnique(stream.Context()) sig, err := naiveSec.Sign(clientTLSUnique) assert.NoError(t, err, "%v", err) - msg := createConnectionMsg(PKIidType("localhost:9610"), sig) + msg := createConnectionMsg(common.PKIidType("localhost:9610"), sig) stream.Send(msg) msg, err = stream.Recv() assert.NoError(t, err, "%v", err) @@ -150,7 +151,7 @@ func TestHandshake(t *testing.T) { } else { sig[0] = 0 } - msg = createConnectionMsg(PKIidType("localhost:9612"), sig) + msg = createConnectionMsg(common.PKIidType("localhost:9612"), sig) stream.Send(msg) msg, err = stream.Recv() assert.Equal(t, []byte("localhost:9611"), msg.GetConn().PkiID) diff --git a/gossip/comm/conn.go b/gossip/comm/conn.go index 0fe72a48d28..6a42772547e 100644 --- a/gossip/comm/conn.go +++ b/gossip/comm/conn.go @@ -24,17 +24,18 @@ import ( "github.com/hyperledger/fabric/gossip/proto" "github.com/hyperledger/fabric/gossip/util" "google.golang.org/grpc" + "github.com/hyperledger/fabric/gossip/common" ) type handler func(*proto.GossipMessage) type connFactory interface { - createConnection(endpoint string, pkiID PKIidType) (*connection, error) + createConnection(endpoint string, pkiID common.PKIidType) (*connection, error) } type connectionStore struct { logger *util.Logger // logger - selfPKIid PKIidType // pkiID of this peer + selfPKIid common.PKIidType // pkiID of this peer isClosing bool // whether this connection store is shutting down connFactory connFactory // creates a connection to remote peer sync.RWMutex // synchronize access to shared variables @@ -43,7 +44,7 @@ type connectionStore struct { // used to prevent concurrent connection establishment to the same remote endpoint } -func newConnStore(connFactory connFactory, pkiID PKIidType, logger *util.Logger) *connectionStore { +func newConnStore(connFactory connFactory, pkiID common.PKIidType, logger *util.Logger) *connectionStore { return &connectionStore{ connFactory: connFactory, isClosing: false, @@ -154,7 +155,7 @@ func (cs *connectionStore) shutdown() { wg.Wait() } -func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamServer, pkiID PKIidType) *connection { +func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamServer, pkiID common.PKIidType) *connection { cs.Lock() defer cs.Unlock() @@ -165,7 +166,7 @@ func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamSer return cs.registerConn(pkiID, serverStream) } -func (cs *connectionStore) registerConn(pkiID PKIidType, serverStream proto.Gossip_GossipStreamServer) *connection { +func (cs *connectionStore) registerConn(pkiID common.PKIidType, serverStream proto.Gossip_GossipStreamServer) *connection { conn := newConnection(nil, nil, nil, serverStream) conn.pkiID = pkiID conn.logger = cs.logger @@ -173,7 +174,7 @@ func (cs *connectionStore) registerConn(pkiID PKIidType, serverStream proto.Goss return conn } -func (cs *connectionStore) closeByPKIid(pkiID PKIidType) { +func (cs *connectionStore) closeByPKIid(pkiID common.PKIidType) { cs.Lock() defer cs.Unlock() if conn, exists := cs.pki2Conn[string(pkiID)]; exists { @@ -199,7 +200,7 @@ func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_Go type connection struct { outBuff chan *msgSending logger *util.Logger // logger - pkiID PKIidType // pkiID of the remote endpoint + pkiID common.PKIidType // pkiID of the remote endpoint handler handler // function to invoke upon a message reception conn *grpc.ClientConn // gRPC connection to remote endpoint cl proto.GossipClient // gRPC stub of remote endpoint diff --git a/gossip/comm/demux.go b/gossip/comm/demux.go index b6976d7fb20..c8ab5e392dd 100644 --- a/gossip/comm/demux.go +++ b/gossip/comm/demux.go @@ -20,7 +20,7 @@ import ( "sync" "sync/atomic" - "github.com/hyperledger/fabric/gossip/util" + "github.com/hyperledger/fabric/gossip/common" ) // ChannelDeMultiplexer is a struct that can receive channel registrations (AddChannel) @@ -42,7 +42,7 @@ func NewChannelDemultiplexer() *ChannelDeMultiplexer { } type channel struct { - pred util.MessageAcceptor + pred common.MessageAcceptor ch chan interface{} } @@ -66,7 +66,7 @@ func (m *ChannelDeMultiplexer) Close() { } // AddChannel registers a channel with a certain predicate -func (m *ChannelDeMultiplexer) AddChannel(predicate util.MessageAcceptor) chan interface{} { +func (m *ChannelDeMultiplexer) AddChannel(predicate common.MessageAcceptor) chan interface{} { m.lock.Lock() defer m.lock.Unlock() ch := &channel{ch: make(chan interface{}, 10), pred: predicate} diff --git a/gossip/common/common.go b/gossip/common/common.go new file mode 100644 index 00000000000..0d90c5e0fab --- /dev/null +++ b/gossip/common/common.go @@ -0,0 +1,23 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package common + +// PKIidType defines the type that holds the PKI-id +// which is the security identifier of a peer +type PKIidType []byte + +type MessageAcceptor func(interface{}) bool diff --git a/gossip/discovery/discovery.go b/gossip/discovery/discovery.go index 914db582a85..9fe1080d9a4 100644 --- a/gossip/discovery/discovery.go +++ b/gossip/discovery/discovery.go @@ -16,7 +16,10 @@ limitations under the License. package discovery -import "github.com/hyperledger/fabric/gossip/proto" +import ( + "github.com/hyperledger/fabric/gossip/proto" + "github.com/hyperledger/fabric/gossip/common" +) // CryptoService is an interface that the discovery expects to be implemented and passed on creation type CryptoService interface { @@ -43,20 +46,17 @@ type CommService interface { Accept() <-chan *proto.GossipMessage // PresumedDead returns a read-only channel for peers that are presumed to be dead - PresumedDead() <-chan PKIidType + PresumedDead() <-chan common.PKIidType // CloseConn orders to close the connection with a certain peer CloseConn(peer *NetworkMember) } -// PKIidType represents a peer's security identity -type PKIidType []byte - // NetworkMember is a peer's representation type NetworkMember struct { Endpoint string Metadata []byte - PKIid PKIidType + PKIid common.PKIidType } // Discovery is the interface that represents a discovery module diff --git a/gossip/discovery/discovery_impl.go b/gossip/discovery/discovery_impl.go index c78c67d901a..3f0cbe3f3cd 100644 --- a/gossip/discovery/discovery_impl.go +++ b/gossip/discovery/discovery_impl.go @@ -26,6 +26,7 @@ import ( "github.com/hyperledger/fabric/gossip/proto" "github.com/hyperledger/fabric/gossip/util" "github.com/op/go-logging" + "github.com/hyperledger/fabric/gossip/common" ) const defaultHelloInterval = time.Duration(5) * time.Second @@ -70,7 +71,7 @@ func (ts *timestamp) String() string { } type gossipDiscoveryImpl struct { - pkiID PKIidType + pkiID common.PKIidType endpoint string incTime uint64 metadata []byte @@ -189,7 +190,7 @@ func (d *gossipDiscoveryImpl) handlePresumedDeadPeers() { select { case deadPeer := <-d.comm.PresumedDead(): if d.isAlive(deadPeer) { - d.expireDeadMembers([]PKIidType{deadPeer}) + d.expireDeadMembers([]common.PKIidType{deadPeer}) } break case s := <-d.toDieChan: @@ -199,7 +200,7 @@ func (d *gossipDiscoveryImpl) handlePresumedDeadPeers() { } } -func (d *gossipDiscoveryImpl) isAlive(pkiID PKIidType) bool { +func (d *gossipDiscoveryImpl) isAlive(pkiID common.PKIidType) bool { d.lock.RLock() defer d.lock.RUnlock() _, alive := d.aliveLastTS[string(pkiID)] @@ -467,7 +468,7 @@ func (d *gossipDiscoveryImpl) getKnownPeers() [][]byte { peers := [][]byte{} for id := range d.id2Member { - peers = append(peers, PKIidType(id)) + peers = append(peers, common.PKIidType(id)) } return peers } @@ -496,7 +497,7 @@ func (d *gossipDiscoveryImpl) periodicalCheckAlive() { } } -func (d *gossipDiscoveryImpl) expireDeadMembers(dead []PKIidType) { +func (d *gossipDiscoveryImpl) expireDeadMembers(dead []common.PKIidType) { d.logger.Warning("Entering", dead) defer d.logger.Warning("Exiting") @@ -536,16 +537,16 @@ func (d *gossipDiscoveryImpl) expireDeadMembers(dead []PKIidType) { } } -func (d *gossipDiscoveryImpl) getDeadMembers() []PKIidType { +func (d *gossipDiscoveryImpl) getDeadMembers() []common.PKIidType { d.lock.RLock() defer d.lock.RUnlock() - dead := []PKIidType{} + dead := []common.PKIidType{} for id, last := range d.aliveLastTS { elapsedNonAliveTime := time.Since(last.lastSeen) if elapsedNonAliveTime.Nanoseconds() > aliveExpirationTimeout.Nanoseconds() { d.logger.Warning("Haven't heard from", id, "for", elapsedNonAliveTime) - dead = append(dead, PKIidType(id)) + dead = append(dead, common.PKIidType(id)) } } return dead @@ -730,7 +731,7 @@ func (d *gossipDiscoveryImpl) Stop() { d.toDieChan <- struct{}{} } -func equalPKIid(a, b PKIidType) bool { +func equalPKIid(a, b common.PKIidType) bool { return bytes.Equal(a, b) } diff --git a/gossip/discovery/discovery_test.go b/gossip/discovery/discovery_test.go index 6c1cec90c2f..c4314390d9a 100644 --- a/gossip/discovery/discovery_test.go +++ b/gossip/discovery/discovery_test.go @@ -30,13 +30,14 @@ import ( "github.com/stretchr/testify/assert" "golang.org/x/net/context" "google.golang.org/grpc" + "github.com/hyperledger/fabric/gossip/common" ) var timeout = time.Second * time.Duration(15) type dummyCommModule struct { id string - presumeDead chan PKIidType + presumeDead chan common.PKIidType detectedDead chan string streams map[string]proto.Gossip_GossipStreamClient conns map[string]*grpc.ClientConn @@ -125,7 +126,7 @@ func (comm *dummyCommModule) Accept() <-chan *proto.GossipMessage { return comm.incMsgs } -func (comm *dummyCommModule) PresumedDead() <-chan PKIidType { +func (comm *dummyCommModule) PresumedDead() <-chan common.PKIidType { return comm.presumeDead } @@ -221,7 +222,7 @@ func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []st conns: make(map[string]*grpc.ClientConn), streams: make(map[string]proto.Gossip_GossipStreamClient), incMsgs: make(chan *proto.GossipMessage, 1000), - presumeDead: make(chan PKIidType, 10000), + presumeDead: make(chan common.PKIidType, 10000), id: id, detectedDead: make(chan string, 10000), lock: &sync.RWMutex{}, diff --git a/gossip/gossip/gossip.go b/gossip/gossip/gossip.go index a115ac1256c..41136168c42 100644 --- a/gossip/gossip/gossip.go +++ b/gossip/gossip/gossip.go @@ -19,9 +19,9 @@ package gossip import ( "time" + "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/discovery" "github.com/hyperledger/fabric/gossip/proto" - "github.com/hyperledger/fabric/gossip/util" ) // Gossip is the interface of the gossip component @@ -37,7 +37,7 @@ type Gossip interface { Gossip(msg *proto.GossipMessage) // Accept returns a channel that outputs messages from other peers - Accept(util.MessageAcceptor) <-chan *proto.GossipMessage + Accept(common.MessageAcceptor) <-chan *proto.GossipMessage // Stop stops the gossip component Stop() @@ -45,18 +45,18 @@ type Gossip interface { // Config is the configuration of the gossip component type Config struct { - BindPort int - ID string - SelfEndpoint string - BootstrapPeers []string - PropagateIterations int - PropagatePeerNum int + BindPort int + ID string + SelfEndpoint string + BootstrapPeers []string + PropagateIterations int + PropagatePeerNum int - MaxMessageCountToStore int + MaxMessageCountToStore int MaxPropagationBurstSize int MaxPropagationBurstLatency time.Duration - PullInterval time.Duration - PullPeerNum int + PullInterval time.Duration + PullPeerNum int } diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index ccb69a42de3..752118765cc 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -23,6 +23,7 @@ import ( "time" "github.com/hyperledger/fabric/gossip/comm" + "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/discovery" "github.com/hyperledger/fabric/gossip/gossip/algo" "github.com/hyperledger/fabric/gossip/proto" @@ -31,7 +32,7 @@ import ( ) type gossipServiceImpl struct { - presumedDead chan discovery.PKIidType + presumedDead chan common.PKIidType disc discovery.Discovery comm comm.Comm *comm.ChannelDeMultiplexer @@ -50,7 +51,7 @@ type gossipServiceImpl struct { // NewGossipService creates a new gossip instance func NewGossipService(conf *Config, c comm.Comm, crypto discovery.CryptoService) Gossip { g := &gossipServiceImpl{ - presumedDead: make(chan discovery.PKIidType, 100), + presumedDead: make(chan common.PKIidType, 100), disc: nil, comm: c, conf: conf, @@ -69,7 +70,7 @@ func NewGossipService(conf *Config, c comm.Comm, crypto discovery.CryptoService) g.discAdapter = g.newDiscoveryAdapter() g.disc = discovery.NewDiscoveryService(conf.BootstrapPeers, discovery.NetworkMember{ - Endpoint: conf.SelfEndpoint, PKIid: discovery.PKIidType(g.comm.GetPKIid()), Metadata: []byte{}, + Endpoint: conf.SelfEndpoint, PKIid: g.comm.GetPKIid(), Metadata: []byte{}, }, g.discAdapter, crypto) g.pushPull = algo.NewPullEngine(g, conf.PullInterval) @@ -156,7 +157,7 @@ func (g *gossipServiceImpl) handlePresumedDead() { g.toDieChan <- s return case deadEndpoint := <-g.comm.PresumedDead(): - g.presumedDead <- discovery.PKIidType(deadEndpoint) + g.presumedDead <- deadEndpoint break } } @@ -441,7 +442,7 @@ func (g *gossipServiceImpl) UpdateMetadata(md []byte) { g.disc.UpdateMetadata(md) } -func (g *gossipServiceImpl) Accept(acceptor util.MessageAcceptor) <-chan *proto.GossipMessage { +func (g *gossipServiceImpl) Accept(acceptor common.MessageAcceptor) <-chan *proto.GossipMessage { inCh := g.AddChannel(acceptor) outCh := make(chan *proto.GossipMessage, 100) go func() { @@ -493,7 +494,7 @@ func (g *gossipServiceImpl) newDiscoveryAdapter() *discoveryAdapter { type discoveryAdapter struct { stopping int32 c comm.Comm - presumedDead chan discovery.PKIidType + presumedDead chan common.PKIidType incChan chan *proto.GossipMessage gossipFunc func(*proto.GossipMessage) } @@ -515,26 +516,26 @@ func (da *discoveryAdapter) SendToPeer(peer *discovery.NetworkMember, msg *proto if da.toDie() { return } - da.c.Send(msg, &comm.RemotePeer{PKIID: comm.PKIidType(peer.PKIid), Endpoint: peer.Endpoint}) + da.c.Send(msg, &comm.RemotePeer{PKIID: peer.PKIid, Endpoint: peer.Endpoint}) } func (da *discoveryAdapter) Ping(peer *discovery.NetworkMember) bool { - return da.c.Probe(peer.Endpoint, comm.PKIidType(peer.PKIid)) == nil + return da.c.Probe(peer.Endpoint, peer.PKIid) == nil } func (da *discoveryAdapter) Accept() <-chan *proto.GossipMessage { return da.incChan } -func (da *discoveryAdapter) PresumedDead() <-chan discovery.PKIidType { +func (da *discoveryAdapter) PresumedDead() <-chan common.PKIidType { return da.presumedDead } func (da *discoveryAdapter) CloseConn(peer *discovery.NetworkMember) { - da.c.CloseConn(&comm.RemotePeer{Endpoint: peer.Endpoint, PKIID: comm.PKIidType(peer.PKIid)}) + da.c.CloseConn(&comm.RemotePeer{Endpoint: peer.Endpoint, PKIID: peer.PKIid}) } -func equalPKIIds(a, b comm.PKIidType) bool { +func equalPKIIds(a, b common.PKIidType) bool { return bytes.Equal(a, b) } @@ -543,7 +544,7 @@ func (g *gossipServiceImpl) peersWithEndpoints(endpoints ...string) []*comm.Remo for _, member := range g.disc.GetMembership() { for _, endpoint := range endpoints { if member.Endpoint == endpoint { - peers = append(peers, &comm.RemotePeer{Endpoint: member.Endpoint, PKIID: comm.PKIidType(member.PKIid)}) + peers = append(peers, &comm.RemotePeer{Endpoint: member.Endpoint, PKIID: member.PKIid}) } } } diff --git a/gossip/util/misc.go b/gossip/util/misc.go index b715d002aa1..3b5e0413f3c 100644 --- a/gossip/util/misc.go +++ b/gossip/util/misc.go @@ -40,8 +40,6 @@ func IndexInSlice(array interface{}, o interface{}, equals Equals) int { return -1 } -type MessageAcceptor func(interface{}) bool - func numbericEqual(a interface{}, b interface{}) bool { return a.(int) == b.(int) }