From d633d6f3df91d3f4f14da8d70d09582e43d6b923 Mon Sep 17 00:00:00 2001 From: grapebaba <281165273@qq.com> Date: Tue, 14 Feb 2017 11:45:18 +0800 Subject: [PATCH] [FAB-2205]Make gossip comm configuable https://jira.hyperledger.org/browse/FAB-2205 Change-Id: Ifb12f5c9f269bd2bed5ee74d9aa84dee16b4ed4a Signed-off-by: grapebaba <281165273@qq.com> --- gossip/comm/comm_impl.go | 27 +++++++++++----------- gossip/comm/comm_test.go | 36 +++++++++++++++++++++++++----- gossip/comm/conn.go | 7 +++--- gossip/integration/integration.go | 37 +++++++++---------------------- gossip/util/misc.go | 21 ++++++++++++++++++ peer/core.yaml | 8 +++++++ 6 files changed, 89 insertions(+), 47 deletions(-) diff --git a/gossip/comm/comm_impl.go b/gossip/comm/comm_impl.go index b91e492c3df..bddbb92b1af 100644 --- a/gossip/comm/comm_impl.go +++ b/gossip/comm/comm_impl.go @@ -19,6 +19,7 @@ package comm import ( "bytes" "crypto/tls" + "errors" "fmt" "math/rand" "net" @@ -33,6 +34,7 @@ import ( "github.com/hyperledger/fabric/gossip/util" proto "github.com/hyperledger/fabric/protos/gossip" "github.com/op/go-logging" + "github.com/spf13/viper" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -47,8 +49,7 @@ const ( sendOverflowErr = "Send buffer overflow" ) -var errSendOverflow = fmt.Errorf(sendOverflowErr) -var dialTimeout = defDialTimeout +var errSendOverflow = errors.New(sendOverflowErr) func init() { rand.Seed(42) @@ -56,7 +57,7 @@ func init() { // SetDialTimeout sets the dial timeout func SetDialTimeout(timeout time.Duration) { - dialTimeout = timeout + viper.Set("peer.gossip.dialTimeout", timeout) } func (c *commImpl) SetDialOpts(opts ...grpc.DialOption) { @@ -75,7 +76,7 @@ func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity var certHash []byte if len(dialOpts) == 0 { - dialOpts = []grpc.DialOption{grpc.WithTimeout(dialTimeout)} + dialOpts = []grpc.DialOption{grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout))} } if port > 0 { @@ -168,7 +169,7 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT defer c.logger.Debug("Exiting") if c.isStopping() { - return nil, fmt.Errorf("Stopping") + return nil, errors.New("Stopping") } cc, err = grpc.Dial(endpoint, append(c.opts, grpc.WithBlock())...) if err != nil { @@ -188,7 +189,7 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT if expectedPKIID != nil && !bytes.Equal(pkiID, expectedPKIID) { // PKIID is nil when we don't know the remote PKI id's c.logger.Warning("Remote endpoint claims to be a different peer, expected", expectedPKIID, "but got", pkiID) - return nil, fmt.Errorf("Authentication failure") + return nil, errors.New("Authentication failure") } conn := newConnection(cl, cc, stream, nil) conn.pkiID = pkiID @@ -275,7 +276,7 @@ func (c *commImpl) Probe(remotePeer *RemotePeer) error { endpoint := remotePeer.Endpoint pkiID := remotePeer.PKIID if c.isStopping() { - return fmt.Errorf("Stopping") + return errors.New("Stopping") } c.logger.Debug("Entering, endpoint:", endpoint, "PKIID:", pkiID) cc, err := grpc.Dial(remotePeer.Endpoint, append(c.opts, grpc.WithBlock())...) @@ -407,15 +408,15 @@ func (c *commImpl) authenticateRemotePeer(stream stream) (common.PKIidType, erro c.logger.Debug("Sending", cMsg, "to", remoteAddress) stream.Send(cMsg) - m := readWithTimeout(stream, defConnTimeout) + m := readWithTimeout(stream, util.GetDurationOrDefault("peer.gossip.connTimeout", defConnTimeout)) if m == nil { c.logger.Warning("Timed out waiting for connection message from", remoteAddress) - return nil, fmt.Errorf("Timed out") + return nil, errors.New("Timed out") } receivedMsg := m.GetConn() if receivedMsg == nil { c.logger.Warning("Expected connection message but got", receivedMsg) - return nil, fmt.Errorf("Wrong type") + return nil, errors.New("Wrong type") } if receivedMsg.PkiID == nil { @@ -425,7 +426,7 @@ func (c *commImpl) authenticateRemotePeer(stream stream) (common.PKIidType, erro if c.isPKIblackListed(receivedMsg.PkiID) { c.logger.Warning("Connection attempt from", remoteAddress, "but it is black-listed") - return nil, fmt.Errorf("Black-listed") + return nil, errors.New("Black-listed") } c.logger.Debug("Received", receivedMsg, "from", remoteAddress) err = c.idMapper.Put(receivedMsg.PkiID, receivedMsg.Cert) @@ -456,7 +457,7 @@ func (c *commImpl) authenticateRemotePeer(stream stream) (common.PKIidType, erro func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error { if c.isStopping() { - return fmt.Errorf("Shutting down") + return errors.New("Shutting down") } PKIID, err := c.authenticateRemotePeer(stream) if err != nil { @@ -573,7 +574,7 @@ func createGRPCLayer(port int) (*grpc.Server, net.Listener, grpc.DialOption, []b } if len(cert.Certificate) == 0 { - panic(fmt.Errorf("Certificate chain is nil")) + panic(errors.New("Certificate chain is nil")) } returnedCertHash = certHashFromRawCert(cert.Certificate[0]) diff --git a/gossip/comm/comm_test.go b/gossip/comm/comm_test.go index 55131f8c5b3..dc32b7859fb 100644 --- a/gossip/comm/comm_test.go +++ b/gossip/comm/comm_test.go @@ -22,6 +22,7 @@ import ( "fmt" "math/rand" "os" + "strings" "sync" "testing" "time" @@ -29,7 +30,9 @@ import ( "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/identity" + "github.com/hyperledger/fabric/gossip/util" proto "github.com/hyperledger/fabric/protos/gossip" + "github.com/spf13/viper" "github.com/stretchr/testify/assert" "golang.org/x/net/context" "google.golang.org/grpc" @@ -38,7 +41,6 @@ import ( func init() { rand.Seed(42) - SetDialTimeout(time.Duration(300) * time.Millisecond) } func acceptAll(msg interface{}) bool { @@ -149,6 +151,23 @@ func handshaker(endpoint string, comm Comm, t *testing.T, sigMutator func([]byte return acceptChan } +func TestViperConfig(t *testing.T) { + viper.SetConfigName("core") + viper.SetEnvPrefix("CORE") + viper.AddConfigPath("./../../peer") + viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) + viper.AutomaticEnv() + err := viper.ReadInConfig() + if err != nil { // Handle errors reading the config file + panic(fmt.Errorf("Fatal error config file: %s \n", err)) + } + + assert.Equal(t, time.Duration(2)*time.Second, util.GetDurationOrDefault("peer.gossip.connTimeout", 0)) + assert.Equal(t, time.Duration(300)*time.Millisecond, util.GetDurationOrDefault("peer.gossip.dialTimeout", 0)) + assert.Equal(t, 20, util.GetIntOrDefault("peer.gossip.recvBuffSize", 0)) + assert.Equal(t, 20, util.GetIntOrDefault("peer.gossip.sendBuffSize", 0)) +} + func TestHandshake(t *testing.T) { t.Parallel() comm, _ := newCommInstance(9611, naiveSec) @@ -285,7 +304,7 @@ func TestParallelSend(t *testing.T) { defer comm1.Stop() defer comm2.Stop() - messages2Send := defRecvBuffSize + messages2Send := util.GetIntOrDefault("peer.gossip.recvBuffSize", defRecvBuffSize) wg := sync.WaitGroup{} go func() { @@ -378,7 +397,7 @@ func TestAccept(t *testing.T) { var evenResults []uint64 var oddResults []uint64 - out := make(chan uint64, defRecvBuffSize) + out := make(chan uint64, util.GetIntOrDefault("peer.gossip.recvBuffSize", defRecvBuffSize)) sem := make(chan struct{}, 0) readIntoSlice := func(a *[]uint64, ch <-chan proto.ReceivedMessage) { @@ -392,11 +411,11 @@ func TestAccept(t *testing.T) { go readIntoSlice(&evenResults, evenNONCES) go readIntoSlice(&oddResults, oddNONCES) - for i := 0; i < defRecvBuffSize; i++ { + for i := 0; i < util.GetIntOrDefault("peer.gossip.recvBuffSize", defRecvBuffSize); i++ { comm2.Send(createGossipMsg(), remotePeer(7611)) } - waitForMessages(t, out, defRecvBuffSize, "Didn't receive all messages sent") + waitForMessages(t, out, util.GetIntOrDefault("peer.gossip.recvBuffSize", defRecvBuffSize), "Didn't receive all messages sent") comm1.Stop() comm2.Stop() @@ -532,3 +551,10 @@ func waitForMessages(t *testing.T, msgChan chan uint64, count int, errMsg string } assert.Equal(t, count, c, errMsg) } + +func TestMain(m *testing.M) { + SetDialTimeout(time.Duration(300) * time.Millisecond) + + ret := m.Run() + os.Exit(ret) +} diff --git a/gossip/comm/conn.go b/gossip/comm/conn.go index 92a817cca7f..d0fb078168f 100644 --- a/gossip/comm/conn.go +++ b/gossip/comm/conn.go @@ -22,6 +22,7 @@ import ( "sync/atomic" "github.com/hyperledger/fabric/gossip/common" + "github.com/hyperledger/fabric/gossip/util" proto "github.com/hyperledger/fabric/protos/gossip" "github.com/op/go-logging" "google.golang.org/grpc" @@ -183,7 +184,7 @@ func (cs *connectionStore) closeByPKIid(pkiID common.PKIidType) { func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_GossipStreamClient, ss proto.Gossip_GossipStreamServer) *connection { connection := &connection{ - outBuff: make(chan *msgSending, defSendBuffSize), + outBuff: make(chan *msgSending, util.GetIntOrDefault("peer.gossip.sendBuffSize", defSendBuffSize)), cl: cl, conn: c, clientStream: cs, @@ -242,7 +243,7 @@ func (conn *connection) send(msg *proto.GossipMessage, onErr func(error)) { conn.Lock() defer conn.Unlock() - if len(conn.outBuff) == defSendBuffSize { + if len(conn.outBuff) == util.GetIntOrDefault("peer.gossip.sendBuffSize", defSendBuffSize) { go onErr(errSendOverflow) return } @@ -257,7 +258,7 @@ func (conn *connection) send(msg *proto.GossipMessage, onErr func(error)) { func (conn *connection) serviceConnection() error { errChan := make(chan error, 1) - msgChan := make(chan *proto.GossipMessage, defRecvBuffSize) + msgChan := make(chan *proto.GossipMessage, util.GetIntOrDefault("peer.gossip.recvBuffSize", defRecvBuffSize)) defer close(msgChan) // Call stream.Recv() asynchronously in readFromStream(), diff --git a/gossip/integration/integration.go b/gossip/integration/integration.go index 0f3567f2450..0bde2bb41a8 100644 --- a/gossip/integration/integration.go +++ b/gossip/integration/integration.go @@ -25,28 +25,13 @@ import ( "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/gossip" "github.com/hyperledger/fabric/gossip/identity" + "github.com/hyperledger/fabric/gossip/util" "github.com/spf13/viper" "google.golang.org/grpc" ) // This file is used to bootstrap a gossip instance and/or leader election service instance -func getIntOrDefault(key string, defVal int) int { - if viper.GetInt(key) == 0 { - return defVal - } else { - return viper.GetInt(key) - } -} - -func getDurationOrDefault(key string, defVal time.Duration) time.Duration { - if viper.GetDuration(key) == 0 { - return defVal - } else { - return viper.GetDuration(key) - } -} - func newConfig(selfEndpoint string, externalEndpoint string, bootPeers ...string) *gossip.Config { port, err := strconv.ParseInt(strings.Split(selfEndpoint, ":")[1], 10, 64) if err != nil { @@ -65,18 +50,18 @@ func newConfig(selfEndpoint string, externalEndpoint string, bootPeers ...string BindPort: int(port), BootstrapPeers: bootPeers, ID: selfEndpoint, - MaxBlockCountToStore: getIntOrDefault("peer.gossip.maxBlockCountToStore", 100), - MaxPropagationBurstLatency: getDurationOrDefault("peer.gossip.maxPropagationBurstLatency", 10*time.Millisecond), - MaxPropagationBurstSize: getIntOrDefault("peer.gossip.maxPropagationBurstSize", 10), - PropagateIterations: getIntOrDefault("peer.gossip.propagateIterations", 1), - PropagatePeerNum: getIntOrDefault("peer.gossip.propagatePeerNum", 3), - PullInterval: getDurationOrDefault("peer.gossip.pullInterval", 4*time.Second), - PullPeerNum: getIntOrDefault("peer.gossip.pullPeerNum", 3), + MaxBlockCountToStore: util.GetIntOrDefault("peer.gossip.maxBlockCountToStore", 100), + MaxPropagationBurstLatency: util.GetDurationOrDefault("peer.gossip.maxPropagationBurstLatency", 10*time.Millisecond), + MaxPropagationBurstSize: util.GetIntOrDefault("peer.gossip.maxPropagationBurstSize", 10), + PropagateIterations: util.GetIntOrDefault("peer.gossip.propagateIterations", 1), + PropagatePeerNum: util.GetIntOrDefault("peer.gossip.propagatePeerNum", 3), + PullInterval: util.GetDurationOrDefault("peer.gossip.pullInterval", 4*time.Second), + PullPeerNum: util.GetIntOrDefault("peer.gossip.pullPeerNum", 3), InternalEndpoint: selfEndpoint, ExternalEndpoint: externalEndpoint, - PublishCertPeriod: getDurationOrDefault("peer.gossip.publishCertPeriod", 10*time.Second), - RequestStateInfoInterval: getDurationOrDefault("peer.gossip.requestStateInfoInterval", 4*time.Second), - PublishStateInfoInterval: getDurationOrDefault("peer.gossip.publishStateInfoInterval", 4*time.Second), + PublishCertPeriod: util.GetDurationOrDefault("peer.gossip.publishCertPeriod", 10*time.Second), + RequestStateInfoInterval: util.GetDurationOrDefault("peer.gossip.requestStateInfoInterval", 4*time.Second), + PublishStateInfoInterval: util.GetDurationOrDefault("peer.gossip.publishStateInfoInterval", 4*time.Second), SkipBlockVerification: viper.GetBool("peer.gossip.skipBlockVerification"), TLSServerCert: cert, } diff --git a/gossip/util/misc.go b/gossip/util/misc.go index e3cb6e69708..688ac8663af 100644 --- a/gossip/util/misc.go +++ b/gossip/util/misc.go @@ -22,6 +22,9 @@ import ( "reflect" "runtime" "sync" + "time" + + "github.com/spf13/viper" ) // Equals returns whether a and b are the same @@ -146,3 +149,21 @@ func PrintStackTrace() { runtime.Stack(buf, true) fmt.Printf("%s", buf) } + +// GetIntOrDefault returns the int value from config if present otherwise default value +func GetIntOrDefault(key string, defVal int) int { + if val := viper.GetInt(key); val != 0 { + return val + } + + return defVal +} + +// GetIntOrDefault returns the Duration value from config if present otherwise default value +func GetDurationOrDefault(key string, defVal time.Duration) time.Duration { + if val := viper.GetDuration(key); val != 0 { + return val + } + + return defVal +} diff --git a/peer/core.yaml b/peer/core.yaml index c8ce6d9ecdc..fabea5a05bb 100644 --- a/peer/core.yaml +++ b/peer/core.yaml @@ -100,6 +100,14 @@ peer: skipBlockVerification: false # Should we ignore security or not ignoreSecurity: false + # Dial timeout(unit: second) + dialTimeout: 3s + # Connection timeout(unit: second) + connTimeout: 2s + # Buffer size of received messages + recvBuffSize: 20 + # Buffer size of sending messages + sendBuffSize: 20 # This is an endpoint that is published to peers outside of the organization. # If this isn't set, the peer will not be known to other organizations.