Skip to content

Commit

Permalink
[FAB-2206]Make gossip discovery configurable
Browse files Browse the repository at this point in the history
Change-Id: I98977440727e9ab8108fe94b3a1c0659e539ae85
Signed-off-by: grapebaba <281165273@qq.com>
  • Loading branch information
GrapeBaBa committed Feb 23, 2017
1 parent 351f1b8 commit ff8b3e4
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 31 deletions.
55 changes: 37 additions & 18 deletions gossip/discovery/discovery_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,22 @@ import (
"github.com/hyperledger/fabric/gossip/util"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/op/go-logging"
"github.com/spf13/viper"
)

const defaultHelloInterval = time.Duration(5) * time.Second

var aliveTimeInterval = defaultHelloInterval
var aliveExpirationTimeout = 5 * aliveTimeInterval
var aliveExpirationCheckInterval = time.Duration(aliveExpirationTimeout / 10)
var reconnectInterval = aliveExpirationTimeout
var aliveExpirationCheckInterval time.Duration

// SetAliveTimeInternal sets the alive time interval
func SetAliveTimeInternal(interval time.Duration) {
aliveTimeInterval = interval
// SetAliveTimeInterval sets the alive time interval by storing the given
// duration in viper under the key "peer.gossip.aliveTimeInterval".
func SetAliveTimeInterval(interval time.Duration) {
viper.Set("peer.gossip.aliveTimeInterval", interval)
}

// SetExpirationTimeout sets the expiration timeout
func SetExpirationTimeout(timeout time.Duration) {
aliveExpirationTimeout = timeout
// SetAliveExpirationTimeout sets the expiration timeout by storing the given
// duration in viper under the key "peer.gossip.aliveExpirationTimeout".
//
// As a side effect it also overwrites the package-level
// aliveExpirationCheckInterval with one tenth of timeout, replacing whatever
// value that variable held before the call.
func SetAliveExpirationTimeout(timeout time.Duration) {
viper.Set("peer.gossip.aliveExpirationTimeout", timeout)
aliveExpirationCheckInterval = time.Duration(timeout / 10)
}

// SetAliveExpirationCheckInterval sets the expiration check interval
Expand All @@ -53,7 +52,7 @@ func SetAliveExpirationCheckInterval(interval time.Duration) {

// SetReconnectInterval sets the reconnect interval
func SetReconnectInterval(interval time.Duration) {
reconnectInterval = interval
viper.Set("peer.gossip.reconnectInterval", interval)
}

type timestamp struct {
Expand Down Expand Up @@ -177,7 +176,7 @@ func (d *gossipDiscoveryImpl) connect2BootstrapPeers(endpoints []string) {
}(endpoint)
}
wg.Wait()
time.Sleep(reconnectInterval)
time.Sleep(getReconnectInterval())
}
}

Expand Down Expand Up @@ -482,8 +481,8 @@ func (d *gossipDiscoveryImpl) periodicalReconnectToDead() {
}

wg.Wait()
d.logger.Debug("Sleeping", reconnectInterval)
time.Sleep(reconnectInterval)
d.logger.Debug("Sleeping", getReconnectInterval())
time.Sleep(getReconnectInterval())
}
}

Expand Down Expand Up @@ -531,7 +530,7 @@ func (d *gossipDiscoveryImpl) periodicalCheckAlive() {
defer d.logger.Debug("Stopped")

for !d.toDie() {
time.Sleep(aliveExpirationCheckInterval)
time.Sleep(getAliveExpirationCheckInterval())
dead := d.getDeadMembers()
if len(dead) > 0 {
d.logger.Debugf("Got %v dead members: %v", len(dead), dead)
Expand Down Expand Up @@ -581,7 +580,7 @@ func (d *gossipDiscoveryImpl) getDeadMembers() []common.PKIidType {
dead := []common.PKIidType{}
for id, last := range d.aliveLastTS {
elapsedNonAliveTime := time.Since(last.lastSeen)
if elapsedNonAliveTime.Nanoseconds() > aliveExpirationTimeout.Nanoseconds() {
if elapsedNonAliveTime.Nanoseconds() > getAliveExpirationTimeout().Nanoseconds() {
d.logger.Warning("Haven't heard from", id, "for", elapsedNonAliveTime)
dead = append(dead, common.PKIidType(id))
}
Expand All @@ -593,8 +592,8 @@ func (d *gossipDiscoveryImpl) periodicalSendAlive() {
defer d.logger.Debug("Stopped")

for !d.toDie() {
d.logger.Debug("Sleeping", aliveTimeInterval)
time.Sleep(aliveTimeInterval)
d.logger.Debug("Sleeping", getAliveTimeInterval())
time.Sleep(getAliveTimeInterval())
d.comm.Gossip(d.createAliveMessage())
}
}
Expand Down Expand Up @@ -803,3 +802,23 @@ func before(a *timestamp, b *proto.PeerTime) bool {
return (uint64(a.incTime.UnixNano()) == b.IncNumber && a.seqNum < b.SeqNum) ||
uint64(a.incTime.UnixNano()) < b.IncNumber
}

// getAliveTimeInterval looks up "peer.gossip.aliveTimeInterval" through
// util.GetDurationOrDefault, supplying defaultHelloInterval as the default.
func getAliveTimeInterval() time.Duration {
	const key = "peer.gossip.aliveTimeInterval"
	return util.GetDurationOrDefault(key, defaultHelloInterval)
}

func getAliveExpirationTimeout() time.Duration {
return util.GetDurationOrDefault("peer.gossip.aliveExpirationTimeout", 5*getAliveTimeInterval())
}

// getAliveExpirationCheckInterval returns the package-level
// aliveExpirationCheckInterval when it is non-zero. When it is zero, the
// result is one tenth of the alive expiration timeout.
func getAliveExpirationCheckInterval() time.Duration {
	if aliveExpirationCheckInterval == 0 {
		// Nothing stored in the package variable: derive from the timeout.
		return getAliveExpirationTimeout() / 10
	}
	return aliveExpirationCheckInterval
}

func getReconnectInterval() time.Duration {
return util.GetDurationOrDefault("peer.gossip.reconnectInterval", getAliveExpirationTimeout())
}
61 changes: 51 additions & 10 deletions gossip/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,30 @@ import (
"fmt"
"io"
"net"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/hyperledger/fabric/gossip/common"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
"google.golang.org/grpc"
)

var timeout = time.Second * time.Duration(15)

// init shortens the discovery timings for the tests in this package: alive
// messages and expiration checks use a 100ms interval, while expiration and
// reconnect use ten times that.
func init() {
	const interval = 100 * time.Millisecond
	SetAliveTimeInterval(interval)
	SetAliveExpirationTimeout(10 * interval)
	SetAliveExpirationCheckInterval(interval)
	SetReconnectInterval(10 * interval)
}

type dummyCommModule struct {
id string
presumeDead chan common.PKIidType
Expand Down Expand Up @@ -141,13 +151,6 @@ func (comm *dummyCommModule) CloseConn(peer *NetworkMember) {
comm.conns[peer.Endpoint].Close()
}

// init shortens the discovery timings for the tests in this package by
// assigning the package-level interval variables directly.
func init() {
	aliveTimeInterval = 100 * time.Millisecond
	aliveExpirationTimeout = 10 * aliveTimeInterval
	aliveExpirationCheckInterval = aliveTimeInterval
	reconnectInterval = aliveExpirationTimeout
}

func (g *gossipInstance) GossipStream(stream proto.Gossip_GossipStreamServer) error {
for {
gMsg, err := stream.Recv()
Expand Down Expand Up @@ -348,12 +351,12 @@ func TestInitiateSync(t *testing.T) {
if atomic.LoadInt32(&toDie) == int32(1) {
return
}
time.Sleep(aliveExpirationTimeout / 3)
time.Sleep(getAliveExpirationTimeout() / 3)
inst.InitiateSync(9)
}
}()
}
time.Sleep(aliveExpirationTimeout * 4)
time.Sleep(getAliveExpirationTimeout() * 4)
assertMembership(t, instances, nodeNum-1)
atomic.StoreInt32(&toDie, int32(1))
stopInstances(t, instances)
Expand Down Expand Up @@ -472,7 +475,7 @@ func TestConvergence(t *testing.T) {
}

assertMembership(t, instances, 3)
connector := createDiscoveryInstance(4623, fmt.Sprintf("d13"), []string{bootPeer(4611), bootPeer(4615), bootPeer(4619)})
connector := createDiscoveryInstance(4623, "d13", []string{bootPeer(4611), bootPeer(4615), bootPeer(4619)})
instances = append(instances, connector)
assertMembership(t, instances, 12)
connector.Stop()
Expand All @@ -481,6 +484,44 @@ func TestConvergence(t *testing.T) {
stopInstances(t, instances)
}

// TestConfigFromFile checks the discovery interval getters twice: first
// against an empty viper configuration, then after loading the "core" config
// file searched for under ./../../peer.
func TestConfigFromFile(t *testing.T) {
	// Snapshot the current settings so they can be put back once the test is
	// done, keeping other tests unaffected.
	savedAlive := getAliveTimeInterval()
	savedExpiration := getAliveExpirationTimeout()
	savedCheck := getAliveExpirationCheckInterval()
	savedReconnect := getReconnectInterval()
	defer func() {
		SetAliveTimeInterval(savedAlive)
		SetAliveExpirationTimeout(savedExpiration)
		SetAliveExpirationCheckInterval(savedCheck)
		SetReconnectInterval(savedReconnect)
	}()

	const (
		wantAlive      = 5 * time.Second
		wantExpiration = 25 * time.Second
	)
	// Both phases of the test expect the same four values.
	checkIntervals := func() {
		assert.Equal(t, wantAlive, getAliveTimeInterval())
		assert.Equal(t, wantExpiration, getAliveExpirationTimeout())
		assert.Equal(t, wantExpiration/10, getAliveExpirationCheckInterval())
		assert.Equal(t, wantExpiration, getReconnectInterval())
	}

	// Phase 1: with no configuration present the getters must yield the
	// defaults.
	viper.Reset()
	aliveExpirationCheckInterval = 0
	checkIntervals()

	// Phase 2: the values must be picked up from the config file.
	viper.Reset()
	aliveExpirationCheckInterval = 0
	viper.SetConfigName("core")
	viper.SetEnvPrefix("CORE")
	viper.AddConfigPath("./../../peer")
	viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
	viper.AutomaticEnv()
	readErr := viper.ReadInConfig()
	assert.NoError(t, readErr)
	checkIntervals()
}

func waitUntilOrFail(t *testing.T, pred func() bool) {
start := time.Now()
limit := start.UnixNano() + timeout.Nanoseconds()
Expand Down
4 changes: 2 additions & 2 deletions gossip/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ var testWG = sync.WaitGroup{}

func init() {
aliveTimeInterval := time.Duration(1000) * time.Millisecond
discovery.SetAliveTimeInternal(aliveTimeInterval)
discovery.SetAliveTimeInterval(aliveTimeInterval)
discovery.SetAliveExpirationCheckInterval(aliveTimeInterval)
discovery.SetExpirationTimeout(aliveTimeInterval * 10)
discovery.SetAliveExpirationTimeout(aliveTimeInterval * 10)
discovery.SetReconnectInterval(aliveTimeInterval * 5)

testWG.Add(7)
Expand Down
7 changes: 6 additions & 1 deletion peer/core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,12 @@ peer:
requestWaitTime: 1s
# Time to wait before pull engine ends pull (unit: second)
responseWaitTime: 2s

# Interval between sending alive messages (unit: second)
aliveTimeInterval: 5s
# Time without an alive message after which a peer is considered dead (unit: second)
aliveExpirationTimeout: 25s
# Interval between reconnect attempts (unit: second)
reconnectInterval: 25s
# This is an endpoint that is published to peers outside of the organization.
# If this isn't set, the peer will not be known to other organizations.
externalEndpoint:
Expand Down

0 comments on commit ff8b3e4

Please sign in to comment.