From ff8b3e4b90566349ee4e261bf49794480d52403a Mon Sep 17 00:00:00 2001 From: grapebaba <281165273@qq.com> Date: Fri, 17 Feb 2017 16:51:19 +0800 Subject: [PATCH] [FAB-2206]Make gossip discovery configurable Change-Id: I98977440727e9ab8108fe94b3a1c0659e539ae85 Signed-off-by: grapebaba <281165273@qq.com> --- gossip/discovery/discovery_impl.go | 55 ++++++++++++++++++--------- gossip/discovery/discovery_test.go | 61 +++++++++++++++++++++++++----- gossip/gossip/gossip_test.go | 4 +- peer/core.yaml | 7 +++- 4 files changed, 96 insertions(+), 31 deletions(-) diff --git a/gossip/discovery/discovery_impl.go b/gossip/discovery/discovery_impl.go index f54ccdfce6b..853d3316f61 100644 --- a/gossip/discovery/discovery_impl.go +++ b/gossip/discovery/discovery_impl.go @@ -27,23 +27,22 @@ import ( "github.com/hyperledger/fabric/gossip/util" proto "github.com/hyperledger/fabric/protos/gossip" "github.com/op/go-logging" + "github.com/spf13/viper" ) const defaultHelloInterval = time.Duration(5) * time.Second -var aliveTimeInterval = defaultHelloInterval -var aliveExpirationTimeout = 5 * aliveTimeInterval -var aliveExpirationCheckInterval = time.Duration(aliveExpirationTimeout / 10) -var reconnectInterval = aliveExpirationTimeout +var aliveExpirationCheckInterval time.Duration -// SetAliveTimeInternal sets the alive time interval -func SetAliveTimeInternal(interval time.Duration) { - aliveTimeInterval = interval +// SetAliveTimeInterval sets the alive time interval +func SetAliveTimeInterval(interval time.Duration) { + viper.Set("peer.gossip.aliveTimeInterval", interval) } -// SetExpirationTimeout sets the expiration timeout -func SetExpirationTimeout(timeout time.Duration) { - aliveExpirationTimeout = timeout +// SetAliveExpirationTimeout sets the expiration timeout +func SetAliveExpirationTimeout(timeout time.Duration) { + viper.Set("peer.gossip.aliveExpirationTimeout", timeout) + aliveExpirationCheckInterval = time.Duration(timeout / 10) } // SetAliveExpirationCheckInterval sets the expiration check interval @@ -53,7 +52,7 @@ func SetAliveExpirationCheckInterval(interval time.Duration) { // SetReconnectInterval sets the reconnect interval func SetReconnectInterval(interval time.Duration) { - reconnectInterval = interval + viper.Set("peer.gossip.reconnectInterval", interval) } type timestamp struct { @@ -177,7 +176,7 @@ func (d *gossipDiscoveryImpl) connect2BootstrapPeers(endpoints []string) { }(endpoint) } wg.Wait() - time.Sleep(reconnectInterval) + time.Sleep(getReconnectInterval()) } } @@ -482,8 +481,8 @@ func (d *gossipDiscoveryImpl) periodicalReconnectToDead() { } wg.Wait() - d.logger.Debug("Sleeping", reconnectInterval) - time.Sleep(reconnectInterval) + d.logger.Debug("Sleeping", getReconnectInterval()) + time.Sleep(getReconnectInterval()) } } @@ -531,7 +530,7 @@ func (d *gossipDiscoveryImpl) periodicalCheckAlive() { defer d.logger.Debug("Stopped") for !d.toDie() { - time.Sleep(aliveExpirationCheckInterval) + time.Sleep(getAliveExpirationCheckInterval()) dead := d.getDeadMembers() if len(dead) > 0 { d.logger.Debugf("Got %v dead members: %v", len(dead), dead) @@ -581,7 +580,7 @@ func (d *gossipDiscoveryImpl) getDeadMembers() []common.PKIidType { dead := []common.PKIidType{} for id, last := range d.aliveLastTS { elapsedNonAliveTime := time.Since(last.lastSeen) - if elapsedNonAliveTime.Nanoseconds() > aliveExpirationTimeout.Nanoseconds() { + if elapsedNonAliveTime.Nanoseconds() > getAliveExpirationTimeout().Nanoseconds() { d.logger.Warning("Haven't heard from", id, "for", elapsedNonAliveTime) dead = append(dead, common.PKIidType(id)) } @@ -593,8 +592,8 @@ func (d *gossipDiscoveryImpl) periodicalSendAlive() { defer d.logger.Debug("Stopped") for !d.toDie() { - d.logger.Debug("Sleeping", aliveTimeInterval) - time.Sleep(aliveTimeInterval) + d.logger.Debug("Sleeping", getAliveTimeInterval()) + time.Sleep(getAliveTimeInterval()) d.comm.Gossip(d.createAliveMessage()) } } @@ -803,3 +802,23 @@ func before(a *timestamp, b *proto.PeerTime) bool { return (uint64(a.incTime.UnixNano()) == b.IncNumber && a.seqNum < b.SeqNum) || uint64(a.incTime.UnixNano()) < b.IncNumber } + +func getAliveTimeInterval() time.Duration { + return util.GetDurationOrDefault("peer.gossip.aliveTimeInterval", defaultHelloInterval) +} + +func getAliveExpirationTimeout() time.Duration { + return util.GetDurationOrDefault("peer.gossip.aliveExpirationTimeout", 5*getAliveTimeInterval()) +} + +func getAliveExpirationCheckInterval() time.Duration { + if aliveExpirationCheckInterval != 0 { + return aliveExpirationCheckInterval + } + + return time.Duration(getAliveExpirationTimeout() / 10) +} + +func getReconnectInterval() time.Duration { + return util.GetDurationOrDefault("peer.gossip.reconnectInterval", getAliveExpirationTimeout()) +} diff --git a/gossip/discovery/discovery_test.go b/gossip/discovery/discovery_test.go index c312b49bc5b..7b403be8a8b 100644 --- a/gossip/discovery/discovery_test.go +++ b/gossip/discovery/discovery_test.go @@ -20,6 +20,7 @@ import ( "fmt" "io" "net" + "strings" "sync" "sync/atomic" "testing" @@ -27,6 +28,7 @@ import ( "github.com/hyperledger/fabric/gossip/common" proto "github.com/hyperledger/fabric/protos/gossip" + "github.com/spf13/viper" "github.com/stretchr/testify/assert" "golang.org/x/net/context" "google.golang.org/grpc" @@ -34,6 +36,14 @@ import ( var timeout = time.Second * time.Duration(15) +func init() { + aliveTimeInterval := time.Duration(time.Millisecond * 100) + SetAliveTimeInterval(aliveTimeInterval) + SetAliveExpirationTimeout(10 * aliveTimeInterval) + SetAliveExpirationCheckInterval(aliveTimeInterval) + SetReconnectInterval(10 * aliveTimeInterval) +} + type dummyCommModule struct { id string presumeDead chan common.PKIidType @@ -141,13 +151,6 @@ func (comm *dummyCommModule) CloseConn(peer *NetworkMember) { comm.conns[peer.Endpoint].Close() } -func init() { - aliveTimeInterval = time.Duration(time.Millisecond * 100) - aliveExpirationTimeout = 10 * aliveTimeInterval - aliveExpirationCheckInterval = aliveTimeInterval - reconnectInterval = aliveExpirationTimeout -} - func (g *gossipInstance) GossipStream(stream proto.Gossip_GossipStreamServer) error { for { gMsg, err := stream.Recv() @@ -348,12 +351,12 @@ func TestInitiateSync(t *testing.T) { if atomic.LoadInt32(&toDie) == int32(1) { return } - time.Sleep(aliveExpirationTimeout / 3) + time.Sleep(getAliveExpirationTimeout() / 3) inst.InitiateSync(9) } }() } - time.Sleep(aliveExpirationTimeout * 4) + time.Sleep(getAliveExpirationTimeout() * 4) assertMembership(t, instances, nodeNum-1) atomic.StoreInt32(&toDie, int32(1)) stopInstances(t, instances) @@ -472,7 +475,7 @@ func TestConvergence(t *testing.T) { } assertMembership(t, instances, 3) - connector := createDiscoveryInstance(4623, fmt.Sprintf("d13"), []string{bootPeer(4611), bootPeer(4615), bootPeer(4619)}) + connector := createDiscoveryInstance(4623, "d13", []string{bootPeer(4611), bootPeer(4615), bootPeer(4619)}) instances = append(instances, connector) assertMembership(t, instances, 12) connector.Stop() @@ -481,6 +484,44 @@ func TestConvergence(t *testing.T) { stopInstances(t, instances) } +func TestConfigFromFile(t *testing.T) { + preAliveTimeInterval := getAliveTimeInterval() + preAliveExpirationTimeout := getAliveExpirationTimeout() + preAliveExpirationCheckInterval := getAliveExpirationCheckInterval() + preReconnectInterval := getReconnectInterval() + + // Recover the config values in order to avoid impacting other tests + defer func() { + SetAliveTimeInterval(preAliveTimeInterval) + SetAliveExpirationTimeout(preAliveExpirationTimeout) + SetAliveExpirationCheckInterval(preAliveExpirationCheckInterval) + SetReconnectInterval(preReconnectInterval) + }() + + // Verify if using default values when config is missing + viper.Reset() + aliveExpirationCheckInterval = 0 * time.Second + assert.Equal(t, time.Duration(5)*time.Second, getAliveTimeInterval()) + assert.Equal(t, time.Duration(25)*time.Second, getAliveExpirationTimeout()) + assert.Equal(t, time.Duration(25)*time.Second/10, getAliveExpirationCheckInterval()) + assert.Equal(t, time.Duration(25)*time.Second, getReconnectInterval()) + + //Verify reading the values from config file + viper.Reset() + aliveExpirationCheckInterval = 0 * time.Second + viper.SetConfigName("core") + viper.SetEnvPrefix("CORE") + viper.AddConfigPath("./../../peer") + viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) + viper.AutomaticEnv() + err := viper.ReadInConfig() + assert.NoError(t, err) + assert.Equal(t, time.Duration(5)*time.Second, getAliveTimeInterval()) + assert.Equal(t, time.Duration(25)*time.Second, getAliveExpirationTimeout()) + assert.Equal(t, time.Duration(25)*time.Second/10, getAliveExpirationCheckInterval()) + assert.Equal(t, time.Duration(25)*time.Second, getReconnectInterval()) +} + func waitUntilOrFail(t *testing.T, pred func() bool) { start := time.Now() limit := start.UnixNano() + timeout.Nanoseconds() diff --git a/gossip/gossip/gossip_test.go b/gossip/gossip/gossip_test.go index c03fc923df4..187aafa0187 100644 --- a/gossip/gossip/gossip_test.go +++ b/gossip/gossip/gossip_test.go @@ -43,9 +43,9 @@ var testWG = sync.WaitGroup{} func init() { aliveTimeInterval := time.Duration(1000) * time.Millisecond - discovery.SetAliveTimeInternal(aliveTimeInterval) + discovery.SetAliveTimeInterval(aliveTimeInterval) discovery.SetAliveExpirationCheckInterval(aliveTimeInterval) - discovery.SetExpirationTimeout(aliveTimeInterval * 10) + discovery.SetAliveExpirationTimeout(aliveTimeInterval * 10) discovery.SetReconnectInterval(aliveTimeInterval * 5) testWG.Add(7) diff --git a/peer/core.yaml b/peer/core.yaml index e1d63053662..1fa289527b9 100644 --- a/peer/core.yaml +++ b/peer/core.yaml @@ -114,7 +114,12 @@ peer: requestWaitTime: 1s # Time to wait before pull engine ends pull (unit: second) responseWaitTime: 2s - + # Alive check interval(unit: second) + aliveTimeInterval: 5s + # Alive expiration timeout(unit: second) + aliveExpirationTimeout: 25s + # Reconnect interval(unit: second) + reconnectInterval: 25s # This is an endpoint that is published to peers outside of the organization. # If this isn't set, the peer will not be known to other organizations. externalEndpoint: